Machine Learning, AI and Programming

Designing a Social Network Site like Twitter

In this post we look at designing a social networking site similar to Twitter. We will not design every feature of the site, only the important ones. The most important feature on Twitter is the Feed (home timeline and profile timeline).

Snapshot of my Twitter Feed

The feeds on Twitter drive user engagement, so they need to be designed in a scalable way that can handle 500M tweets per day, i.e. roughly 200B tweets a year.

Note that the ideas and discussions presented in this post do not necessarily mean that Twitter uses or has used them. This is just a design exercise.

The other, less central but still important, features are Search, Follower Recommendations, Trending hashtags, Promoted content (Ads), URL Shortener, Notifications etc. Apart from designing feeds we will also look into how we can use machine learning to solve some of Twitter's problems, e.g.

  • Ranking tweets/re-tweets on the home timeline (relevancy of a user or their tweet to their followers).
  • Follower recommendations.
  • Detecting tweets by bots.
  • Detecting and blocking offensive content in tweets.
  • Predicting which tweets will go viral (many re-tweets).
  • Identifying celebrities.
  • Targeting users with promoted content.
  • Analysing tweets to identify political events, natural disasters etc.

Let's start with Feeds.

Designing feeds is conceptually very simple, because all your home timeline needs to show are the tweets from the people you follow, ranked by some order (e.g. time of posting) along with 4 different kinds of actions for each tweet : Like, Re-tweet, Reply and Share. Each action is associated with an action count (e.g. number of likes from people who follow the author of the tweet).

On clicking an individual tweet, it opens up a tweet details page, which contains the original tweet as well as the 4 different actions and their counts. Additionally it shows all the replies to the tweet. The "Reply" object is just an extension of the "Tweet" object, because each reply also has the same 4 kinds of actions with action counts.

Note : One can reply to a reply. But we need to limit the depth to say maximum 5 levels.

A "Tweet" class object :

{tweet id, user id, date-time, like(), re-tweet(), reply(), share(), get_like_cnts(), get_retweet_cnts(), get_reply_cnts(), get_share_cnts()}

The "Reply" and "Retweet" classes inherit from the above class. All three object types can be stored in the same database table, with flags indicating whether a row is a tweet, a reply or a retweet, because the frontend needs a different click action or display option for each.
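As a rough illustration, the class hierarchy above could be sketched in Python as follows. The field names mirror the schema in this post; the `depth` field, the `MAX_REPLY_DEPTH` constant and the way counters are updated are assumptions made only for this sketch.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional

MAX_REPLY_DEPTH = 5  # assumed constant for the reply depth limit noted above


@dataclass
class Tweet:
    tweet_id: int
    user_id: int
    text: str
    time_posted: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    like_cnt: int = 0
    retweet_cnt: int = 0
    reply_cnt: int = 0
    share_cnt: int = 0
    depth: int = 0  # 0 for a top-level tweet, parent depth + 1 for a reply

    def like(self) -> None:
        self.like_cnt += 1

    def share(self) -> None:
        self.share_cnt += 1

    def reply(self, tweet_id: int, user_id: int, text: str) -> "Reply":
        # Refuse to nest replies deeper than the configured limit.
        if self.depth >= MAX_REPLY_DEPTH:
            raise ValueError("reply depth limit reached")
        self.reply_cnt += 1
        return Reply(tweet_id, user_id, text,
                     depth=self.depth + 1, original_tweet_id=self.tweet_id)

    def retweet(self, tweet_id: int, user_id: int) -> "Retweet":
        self.retweet_cnt += 1
        return Retweet(tweet_id, user_id, self.text,
                       original_tweet_id=self.tweet_id)


@dataclass
class Reply(Tweet):
    # Points at the tweet (or reply) being replied to.
    original_tweet_id: Optional[int] = None


@dataclass
class Retweet(Tweet):
    # Points at the tweet being retweeted.
    original_tweet_id: Optional[int] = None
```

Because a `Reply` is itself a `Tweet`, it carries the same four actions and counters, which is what lets replies to replies work without a separate type.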

A very basic relational database schema for feeds :

  • Table for Tweet :
    • tweet_id, user_id, tweet_text, time_posted, is_tweet, is_retweet, is_reply, like_cnt, share_cnt, retweet_cnt, reply_cnt, original_tweet_id
  • Table for User :
    • user_id, first_name, last_name, dob, username, enc_password, etc.
  • Table for Follower :
    • user_id_1, user_id_2
    • user_id_1 follows user_id_2, for bi-directional follows, there will be two entries.
  • Table for Activity :
    • activity_id, activity_name, user_id_1, user_id_2, date_time.
    • user_id_1 [Likes/Retweets/Replies/Shares] user_id_2.

The tables are indexed on the fields used for lookups, e.g. user_id and original_tweet_id in "Tweet" and both columns in "Follower".

Whenever a user A tweets, the tweet gets inserted into the "Tweet" table with the is_tweet field set to 1. Now when user B opens the app, the system fetches all users whom B follows from the "Follower" table. For all those user_ids, it fetches their tweets from the "Tweet" table, then merges and sorts them on the "time_posted" field.
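The read path just described can be tried out on a toy scale with SQLite. This is only a sketch: the table and column names follow the schema above, while the helper names (`post_tweet`, `follow`, `home_timeline`), the integer timestamps and the trimmed-down column list are assumptions for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE tweet (
    tweet_id          INTEGER PRIMARY KEY,
    user_id           INTEGER NOT NULL,
    tweet_text        TEXT    NOT NULL,
    time_posted       INTEGER NOT NULL,
    is_tweet          INTEGER NOT NULL DEFAULT 1,
    original_tweet_id INTEGER
);
CREATE TABLE follower (
    user_id_1 INTEGER NOT NULL,  -- user_id_1 follows user_id_2
    user_id_2 INTEGER NOT NULL,
    PRIMARY KEY (user_id_1, user_id_2)
);
CREATE INDEX idx_tweet_user_time ON tweet (user_id, time_posted);
""")


def post_tweet(user_id, text, time_posted):
    """Insert a plain tweet and return its generated tweet_id."""
    cur = conn.execute(
        "INSERT INTO tweet (user_id, tweet_text, time_posted) VALUES (?, ?, ?)",
        (user_id, text, time_posted),
    )
    return cur.lastrowid


def follow(follower_id, followee_id):
    conn.execute("INSERT INTO follower VALUES (?, ?)", (follower_id, followee_id))


def home_timeline(user_id, limit=20):
    """Pull model: join followees with their tweets, newest first."""
    rows = conn.execute(
        """
        SELECT t.tweet_id, t.user_id, t.tweet_text
        FROM follower AS f
        JOIN tweet AS t ON t.user_id = f.user_id_2
        WHERE f.user_id_1 = ? AND t.is_tweet = 1
        ORDER BY t.time_posted DESC
        LIMIT ?
        """,
        (user_id, limit),
    )
    return rows.fetchall()


follow(2, 1)
follow(2, 3)
post_tweet(1, "first", 100)
post_tweet(3, "second", 200)
post_tweet(4, "not followed", 300)
timeline = home_timeline(2)
print(timeline)  # [(2, 3, 'second'), (1, 1, 'first')]
```

The join and sort happen on every read, which is exactly the cost that becomes a problem at the scale discussed next.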

Whenever user B likes a tweet by user A, a row for the activity "Like" with the user_ids of B and A is added to the "Activity" table. The like_cnt field of the corresponding tweet in the "Tweet" table is also incremented by 1.

The other activities (Share, Reply and Retweet) work similarly. For Reply and Retweet we also need to set the is_reply and is_retweet flags respectively in the "Tweet" table.

In the tweet details page, all replies and retweets are fetched from the "Tweet" table, for the corresponding tweet id. The field 'original_tweet_id' is the field which is used to search for the replies and retweets.

Since each reply and each retweet is a row of its own in the table, we fetch all rows with original_tweet_id = <current tweet id>.

Replies to replies can be found the same way using the 'original_tweet_id' field.

Looks pretty simple !!! Then what are the challenges to the above schema ?

  • The number of reads (people browsing timelines) is much higher than the number of writes (people tweeting): about 300K queries per second for reads vs. 6K queries per second for writes.
  • There will be more than 500-600B rows in the "Tweet" table, which puts the tweet text alone in the hundreds of terabytes, before counting indexes and replicas.
    • Each tweet is 140 chars at up to 4 bytes per char = 560 bytes.
    • Total size approx. = 500 x 10^9 x 560 = 280 x 10^12 bytes = 280 terabytes.
  • There are approx. 200-300M registered users and each user has on average 100-200 followers. These numbers are highly skewed (power law), as celebrities have followers in the millions. The number of pairwise rows in the "Follower" table would be in the tens of billions (e.g. 300M x 200 = 60B).
  • Fetching the required user_ids from the "Follower" table, then fetching all tweets for each user_id, then combining and sorting those tweets on time_posted and sending the result over HTTP could take well over a few minutes before the timeline is populated and displayed to the user.
  • Table joins are theoretically faster than writing the grouping and sorting logic in the application code, but joining tables with hundreds of billions of rows is not computationally feasible.
  • Maintaining such large databases on a single machine is a big problem. The machine can fail and bring the entire site down with it.
  • Vertical scalability is another problem. More than 100K users can be simultaneously accessing their timelines, and for each user the combined size of all tweets to be sorted can be greater than 100 MB. Thus for 100K users, this comes to around 10TB of RAM.
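The back-of-the-envelope numbers in this list are easy to recompute. The snippet below is only a calculator for the figures quoted in this post (500B rows, 140 characters at up to 4 bytes each, 6K writes per second, 100K concurrent users at 100 MB each); the variable names are made up for the sketch and decimal units (1 TB = 10^12 bytes) are assumed.

```python
TB = 10**12  # decimal terabyte, assumed unit for these estimates

# Storage for the tweet text alone.
tweet_rows = 500 * 10**9
bytes_per_tweet = 140 * 4          # 140 chars, up to 4 bytes per char
tweet_table_tb = tweet_rows * bytes_per_tweet / TB

# Writes per day implied by 6K writes per second.
writes_per_day = 6_000 * 24 * 60 * 60

# RAM needed if 100K users each sort 100 MB of tweets at the same time.
concurrent_users = 100_000
bytes_per_sort = 100 * 10**6
sort_ram_tb = concurrent_users * bytes_per_sort / TB

print(tweet_table_tb)   # 280.0
print(writes_per_day)   # 518400000
print(sort_ram_tb)      # 10.0
```

The 6K writes per second figure works out to about 518M tweets per day, which agrees with the 500M tweets per day quoted at the start of the post.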

The problem with large databases can be solved using sharding and replication.

  • Sharding means partitioning a large table into blocks (based on time_posted or user_id etc.) and storing each partition on a different server. Additionally, the data on each server is replicated 3-4 times. Now even if one partition fails, it will not bring down the entire DB, and since we already have replicas of that partition, we can fail over to one of them.
  • To read from sharded tables, we need to know which database servers hold the rows needed for the query.

Database Sharding

  • For e.g. if the Tweet table is partitioned on time_posted, then fetching all rows for a given user_id will probably require accessing all DB servers, because a user can post tweets every day. Whereas if the table is partitioned on user_id, accessing the one server corresponding to that user_id is enough. Thus one needs to select the shard key carefully.
  • To quickly locate the database server corresponding to a shard key, one can use consistent hashing, i.e. a mapping of shard keys to DB servers such that if a DB server fails, only the keys stored on that server need to be remapped and the rest of the mapping is not affected.

Consistent Hashing
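A minimal sketch of a consistent hash ring, assuming user_id is the shard key. The class name, the use of MD5 and the choice of 100 virtual nodes per server are illustrative assumptions, not something this design prescribes.

```python
import bisect
import hashlib


class ConsistentHashRing:
    """Maps shard keys to servers placed on a hash ring."""

    def __init__(self, servers, vnodes=100):
        self.vnodes = vnodes   # virtual nodes per server, to even out load
        self._ring = []        # sorted list of (hash, server) points
        for server in servers:
            self.add_server(server)

    @staticmethod
    def _hash(key):
        return int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16)

    def add_server(self, server):
        for i in range(self.vnodes):
            point = (self._hash(f"{server}#{i}"), server)
            bisect.insort(self._ring, point)

    def remove_server(self, server):
        self._ring = [(h, s) for h, s in self._ring if s != server]

    def get_server(self, shard_key):
        if not self._ring:
            raise LookupError("no servers on the ring")
        h = self._hash(str(shard_key))
        # First point clockwise from the key's hash, wrapping around.
        idx = bisect.bisect_right(self._ring, (h, "")) % len(self._ring)
        return self._ring[idx][1]


ring = ConsistentHashRing(["db1", "db2", "db3"])
before = {user_id: ring.get_server(user_id) for user_id in range(1000)}
ring.remove_server("db2")  # simulate a failed server
after = {user_id: ring.get_server(user_id) for user_id in range(1000)}
moved = sum(1 for u in before if before[u] != after[u])
print(f"{moved} of {len(before)} keys changed servers")
```

Only the keys that lived on the removed server move; every other key keeps its original server, which is the property that makes this preferable to `hash(key) % number_of_servers`.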

Choice of DB :

  • NoSQL vs. Relational SQL ? Joins on normalised tables, as in relational SQL, are computationally and memory intensive; it is better to de-normalize the data into NoSQL DB's like MongoDB or Cassandra.
  • Graph DB for storing social network connections. Graph is a natural abstraction for a social network.
    • One of the challenges with graph databases is horizontal scaling, i.e. sharding or partitioning the database.
    • In case of relational DB's like MySQL, each record (row) is independent of the others and can be effectively partitioned based on the values of one of the indexed columns.
    • The nodes in a graph are not independent, as there is a relation defined by the edge connecting a pair of nodes.
    • Sharding based on node values can cause too much network communication for users who have many followers or followees, because each follower node or followee node can be in a different shard.
    • One possible way is to store, for each node, all nodes within 3 edges of it in the same shard. This minimizes network communication, but it duplicates nodes across shards, because each node will act as a root node in one shard and as a neighboring node in other shards.

Sample Graph DB Relation

Computing the tweets to be shown on the timeline of the logged-in user at the moment the user requests his/her home page is known as the Pull based mechanism, because the user is requesting (pulling) the timeline. We have seen that computing timelines in real time like this has its drawbacks and will lead to very high latency, because there are approx. 300K QPS for timelines.

Can we take advantage of the fact that there are only 6000 write requests per second as compared to 300K read requests per second ?

The other method is the Push based mechanism, where instead of computing the timeline during read time, the timelines for users are computed during writes.

Whenever a user posts a tweet, the tweet is inserted into the timelines of all his/her followers. But how are the timelines stored ? Each user's timeline is stored in an in-memory database like Redis. Whenever a new tweet comes from a user_id, the "Follower" table or a GraphDB is scanned for all followers of that user_id, and then, based on the user_ids of the followers, the Redis clusters corresponding to these user_ids are determined. Each Redis cluster implements a limited size queue, i.e. it holds only the most recent 1000 tweets that need to be shown in the timeline.

Whenever a user requests his timeline, all the tweet objects are fetched from his corresponding Redis cluster and sent over HTTP. No further computation is involved in this process, as everything is pre-computed.

A user generally does not browse more than 100-200 tweets per session, so caching the most recent 1000 tweets makes sense. If the user does scroll beyond 1000 tweets, the remaining tweets can be computed and fetched directly from the DB's as explained above, but this would be slow.
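The push (fan-out on write) flow can be sketched without a real Redis instance. Below, a plain dictionary of bounded deques stands in for the per-user Redis timelines and another dictionary stands in for the "Follower" table; both stand-ins, and the function names, are assumptions made for this sketch.

```python
from collections import defaultdict, deque

TIMELINE_SIZE = 1000  # only the most recent 1000 tweets are cached per user

# author_id -> set of follower ids (stand-in for the "Follower" table)
followers = defaultdict(set)
# user_id -> bounded timeline, newest first (stand-in for per-user Redis data)
timelines = defaultdict(lambda: deque(maxlen=TIMELINE_SIZE))


def follow(follower_id, followee_id):
    followers[followee_id].add(follower_id)


def post_tweet(author_id, tweet_id):
    """Write path: push the tweet into every follower's timeline."""
    for follower_id in followers[author_id]:
        # appendleft on a full deque silently drops the oldest entry
        timelines[follower_id].appendleft(tweet_id)


def home_timeline(user_id, count=100):
    """Read path: no joins or sorting, just return the cached entries."""
    return list(timelines[user_id])[:count]
```

With Redis itself, a comparable effect is commonly achieved by pushing onto a list and trimming it to a fixed length, but the exact data structure is a design choice this post leaves open.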

So what could be the challenges with a Push based mechanism ?

  • Keeping a Redis cluster in memory for every user.
    • Each Redis cluster contains 1000 tweets from the user's followees.
    • Twitter uses UTF-8 encoded tweets, which take up to 4 bytes per character. Thus for 140 characters, the size of a tweet is at most 560 bytes.
    • Assuming most of the data is taken up by the text, the total size of a Redis cluster is 1000x560 bytes = 560KB (approx.)
    • If we keep one instance for each of the 300M users, then the total size comes to around 170TB (all in memory !!!)
    • Keep in-memory clusters only for active users (users logged in at least once in the last 5 days).
    • For other users, compute timelines in real time from the DB whenever an inactive user logs back in, then persist the data in the Redis cache again.
    • Other options : LRU or LFU caching mechanism.
  • High Fan-Out. Celebrities have followers in the millions, so each tweet by one of these celebrities requires updating millions of Redis clusters.
    • Multiple celebrities tweeting simultaneously can exhaust the available write threads. If a queue is used, it can quickly outgrow its capacity.
    • Compute a relevancy score of the tweet for each user. The relevancy score is a function of the number of interactions of the user with the celebrity, the similarity of the tweet to other tweets the user interacted with, whether a bi-directional relation is present, the similarity of the celebrity to other users the user interacted with, whether the user is active, etc.
    • Prioritise followers based on the relevancy score.
  • Inconsistent state.
    • In the high fan-out scenario above, it might happen that a reply to the tweet from a follower (with a high relevancy score) reaches the Redis cluster of another follower (with a low relevancy score) before the original tweet does.
    • Always check that the original tweet exists in the Redis cluster of the user before showing the replies.
  • High write throughputs during political events and natural disasters.
    • The available HTTP processes will be exhausted and the system will freeze.
    • Need queue based implementation for writes. Asynchronous writes.
    • Priority Queue ? Prioritise tweets before retweets and replies.

Let's look at some Machine Learning applications for this social networking site. Two of the most important applications, because they directly impact user engagement on the website, are :

  1. Ranking tweets/re-tweets on the home timeline.
  2. Recommending followers.

Ranking Tweets :

  • Most common approach is to order by time, newest to oldest. This is a very logical way to do it.
  • When can a time ordered timeline be a bad choice for a user ?
    • Too many tweets from a single followee in succession. The tweeter is probably breaking up a long message because of the 140 character limit, or it is just a bot posting on behalf of the account.
    • Possibly group all such tweets into a single tweet and show some kind of collapsible UI to display them if the user wants to.
    • Not many relevant tweets for the user. The user might have followed some political figures just to expand his knowledge about politics, but most of his interactions are more academic, i.e. with mathematicians or scientists. So during a political event his timeline will be flooded with tweets/retweets from those political figures.
    • Showing tweets based only on relevancy, with no time factor, will cause a stale tweet effect, because every time the user opens his timeline he gets to see the same tweets.
    • A better strategy would be to filter relevant tweets that have arrived since the user's last session.
  • Create a dataset of which tweets the user interacts with (likes, retweets, shares etc.) as positive examples and the remaining tweets, which the user does not interact with, as negative examples. Train a binary supervised model emitting the probability that a tweet will be interacted with by the user.
    • The features for the model would be the text (n-grams) of the tweet, the author of the tweet, number of likes, shares and retweets for current tweet, number of interactions of the user with the author in the past etc.
    • Normalize the features.
    • Train a neural network model with output as 0 or 1. 0 for negative examples and 1 for positive.
    • For negative examples, sample as many examples as there are positives, because negative examples would otherwise far outnumber them.
    • The model needs to be trained incrementally on the incoming stream of examples.
    • An NN architecture allows incremental training by initialising the weights with those learnt by the previous model.
  • But the user may not necessarily interact with every tweet he likes.
  • How often the user has interacted with users similar to the tweeter ?
  • Similarity between users can be found out by some of the following simple strategies :
    • Number of common followees and followers.
    • Number of edges connecting the two users (can be uni or bi directional) and average lengths of paths connecting them.
    • Fraction of times both users have interacted with the same tweet. The fraction is over the maximum or the total number of tweets interacted with.
    • Number of times a common followee interacts with both users' tweets, or number of times both users interact with a common follower on a single day.
    • Number of times they have interacted with each other.
    • Similarity of tweets/retweets/shares. Average pairwise similarity between pairs of tweets from user A and B.
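Two of the similarity signals listed above are simple set computations. The sketch below uses Jaccard overlap for common followees or followers, which is one reasonable way to normalise the raw count and is an assumption here, and the "fraction over the maximum" variant for co-interactions; the function names are made up.

```python
def jaccard(a, b):
    """Overlap of two id sets, e.g. followees of user A vs. user B."""
    a, b = set(a), set(b)
    union = a | b
    return len(a & b) / len(union) if union else 0.0


def co_interaction_fraction(tweets_a, tweets_b):
    """Share of tweets both users interacted with, over the larger set."""
    a, b = set(tweets_a), set(tweets_b)
    denom = max(len(a), len(b))
    return len(a & b) / denom if denom else 0.0


followees_a = {1, 2, 3}
followees_b = {2, 3, 4}
print(jaccard(followees_a, followees_b))                      # 0.5
print(co_interaction_fraction({10, 11, 12, 13}, {11, 13}))    # 0.5
```

Signals like these can be fed in as extra features to the interaction model, or combined into a single user-to-user similarity score.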

Recommending Followers :

  • One simple strategy is closure. If I follow user A and user A follows user B, then recommend user B to me. The closure can happen at multiple depths in the graph.
  • Number of common followees and followers. If users A and B have a very high number of common followers, then users A and B are likely to be very similar.
  • Number of followers and followees who are very similar. For example, if user A has the set of followers {X} and user B has the set of followers {Y}, and the average similarity (defined above) between users in X and users in Y is high, then user A and user B are probably also similar and can be recommended to each other.

Twitter Recommendations
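The closure strategy at depth two (followees of my followees) can be sketched as below. The graph is a plain dictionary from a user to the set of users they follow; ranking candidates by how many of my followees follow them is an assumption added for the sketch.

```python
from collections import Counter


def recommend_by_closure(follows, user, top_k=3):
    """Recommend users followed by the people `user` already follows."""
    already = follows.get(user, set())
    counts = Counter()
    for followee in already:
        for candidate in follows.get(followee, set()):
            if candidate != user and candidate not in already:
                counts[candidate] += 1
    # Most-supported candidates first, ties broken by name for determinism.
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return [candidate for candidate, _ in ranked[:top_k]]


follows = {
    "me": {"A", "B"},
    "A": {"C", "D", "me"},
    "B": {"C", "A"},
    "C": set(),
}
print(recommend_by_closure(follows, "me"))  # ['C', 'D']
```

Here C is recommended first because both A and B follow C, while D is followed only by A.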

  • PageRank based recommendation algorithm. Each node has a score indicating how likely it is that a random walk along the edges, started from any node, lands on this node. For a recommendation, we are interested in how likely a walk started at user A is to end up at user B.
    • Each vertex or node in the graph represents a user. The edge between two nodes represents the "follows" relation. An edge from A to B implies that A follows B.
    • Note that we do not need to explicitly weight the edges with the number of interactions etc. because we are interested in the relative importance of a node as compared to other nodes.
    • The number of incoming or outgoing edges of a node is implicit in determining its importance. PageRank goes beyond the number of incoming or outgoing edges and also takes into account the importance of the nodes the edges are coming in from or going out to.
    • Each node is given an initial score = 1/Total number of nodes.
    • In each iteration t, we update the score for a node X as

P_t(X) = (1-d) + d * (\frac{P_{t-1}(A)}{3} + \frac{P_{t-1}(B)}{2} + \frac{P_{t-1}(C)}{5}), where d=0.85 is the damping factor

PageRank for node X computed based on nodes A, B, and C

    • Probability of landing on user X by following a set of users randomly is given by the above equation, because X can only be reached from either users A, B or C and the importance of user X is determined by the importance of A, B and C.
    • To recommend a user to user A, do a random walk along the edges, following the distribution of the PageRanks of the nodes. This will lead to a node which A does not follow but which is important in the network.
    • A modification to the above algorithm is personalised PageRank. The PageRanks assigned to the nodes in the above algorithm are global, i.e. the score for a node defines how important it is to the entire network but not to a particular user.
  • The SALSA algorithm is also an iterative algorithm like PageRank. Here it builds on personalized PageRank.
    • The idea in personalized PageRank is that for a user A, we compute the PageRank score for all nodes, but the random walk keeps returning to user A, so the scores reflect importance relative to A.
    • Create a bipartite graph from the users in the "circle of trust" of user A, i.e. users reachable by a random walk starting from user A, such that users in the left partition are followers of the users in the right partition.
    • In each iteration, go from a node X in left partition to a node Y in the right partition (by random walk) and then come back again from Y to a node X' (again random walk) in left partition.

P_t(X) = (1-d) + d * (\frac{P_{t-1}(Y_1)}{3} + \frac{P_{t-1}(Y_2)}{2} + \frac{P_{t-1}(Y_3)}{4})

P_{t-1}(Y_1) = (1-d) + d * (\frac{P_{t-1}(X_1)}{3} + \frac{P_{t-1}(X_2)}{2})

P_{t-1}(Y_2) = (1-d) + d * (\frac{P_{t-1}(X_3)}{3})

P_{t-1}(Y_3) = (1-d) + d * (\frac{P_{t-1}(X_2)}{2} + \frac{P_{t-1}(X_3)}{3})

Personalized PageRank
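To make the two PageRank flavours concrete, here is a small power-iteration sketch. It is not the SALSA algorithm and not necessarily what Twitter runs. It assumes the normalised form of the update, where the constant term is (1-d) times a "teleport" distribution that sums to 1, which differs from the plain (1-d) term in the formulas above. Global PageRank uses a uniform teleport distribution, and the personalized variant teleports only to the user we are recommending for (often called random walk with restart). The function and parameter names are made up for the sketch.

```python
def pagerank(follows, d=0.85, personalize=None, iterations=100):
    """Power iteration over a dict: user -> set of users they follow."""
    nodes = set(follows)
    for targets in follows.values():
        nodes.update(targets)
    nodes = sorted(nodes)
    n = len(nodes)

    if personalize is None:
        teleport = {v: 1.0 / n for v in nodes}            # global PageRank
    else:
        teleport = {v: 1.0 if v == personalize else 0.0   # restart at one user
                    for v in nodes}

    score = {v: 1.0 / n for v in nodes}  # initial score = 1 / number of nodes
    for _ in range(iterations):
        new = {v: (1 - d) * teleport[v] for v in nodes}
        for u in nodes:
            out = follows.get(u, ())
            if out:
                # u passes an equal share of its score to everyone it follows
                share = d * score[u] / len(out)
                for v in out:
                    new[v] += share
            else:
                # u follows nobody: hand its score back via the teleport step
                for v in nodes:
                    new[v] += d * score[u] * teleport[v]
        score = new
    return score


follows = {"A": {"B", "C"}, "B": {"C"}, "C": {"A"}, "D": {"C"}}

global_scores = pagerank(follows)
most_important = max(global_scores, key=global_scores.get)

# Recommendations for D: highest personalized score among users D does not follow.
personal = pagerank(follows, personalize="D")
candidates = [v for v in personal if v != "D" and v not in follows["D"]]
recommended = sorted(candidates, key=lambda v: -personal[v])
print(most_important, recommended)
```

In this toy graph C comes out as the most important node globally, since A, B and D all follow it, while for user D the personalized scores rank A ahead of B among the users D does not yet follow.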

One can continue to explore many different approaches for ranking tweets or recommending followers, but each approach must be implementable in a scalable way; otherwise, with the traffic described above, the system will freeze. For e.g. in the WTF paper the SALSA algorithm is implemented on a graph database under the assumption that the entire graph fits into the memory of a single machine.

Due to shortage of time and to prevent this post from becoming too long, I am deferring the other machine learning problems to later posts.
