Aggregations at Scale for ShareChat — Using Kafka Streams and ScyllaDB

Charan Movva | 25 minutes

ShareChat is a social media app with ~180 million MAU and 50 million DAU. We capture and aggregate various engagement metrics such as likes, views, shares, and comments at a post level to curate better content for our users. For these engagement metrics, we operate at a scale of 55k-60k writes/sec and 290k-300k reads/sec. Because these metrics directly impact users, we need a datastore that offers low latencies and is highly available, resilient, and scalable, ideally at an optimal cost. This talk covers how we met those criteria using our in-house Kafka setup, Kafka Streams, and ScyllaDB.



Video Transcript

Thanks for joining! In this presentation I'll be covering how ShareChat is tackling the problem of engagement counters with Kafka Streams and ScyllaDB. We'll briefly go over what ShareChat is, the scale at which we operate, and why we chose streaming as a solution.

We'll also look at a few requirements around the problem, the architecture we currently have in place along with some of its internals, how ScyllaDB is helping us solve part of the problem, and our experience with ScyllaDB so far.

ShareChat is India's largest homegrown social media platform, offering content creation and consumption in 15 Indian languages. We are on a path to build an inclusive community that encourages and empowers each individual to share their journey and valuable experiences in the comfort of their native language. Currently we operate at a scale of 125 million users a month, our users share content and experiences adding up to around 1.3 billion+ shares a month, and a typical user spends approximately 31 minutes on the app.

To understand what engagement events are and the criticality around them: while serving content to our end users, we collect a lot of events about the way they interact with the app. These include view events, i.e. which post the user viewed, and whether they found that post engaging by liking, sharing, or commenting on it. So it is evident that we capture a lot of events at the post level and across multiple levels of engagement, where engagement is the combination of likes, shares, comments, etc. The scale of engagement events alone ranges from 370k to 440k ops per second. These events are also critical in nature: we want to show them back to our users on the feed as a feedback mechanism, and they play a role in curating better content for our users through our data science and machine learning models.

The basic question arises: why did we go with streaming? If we look at the other paradigms available for handling this problem, the first is request-response. It is the lowest-latency option, with frequent updates reaching the database, but at the magnitude of events we have, it would put too much on the database, with more connections and unnecessary writes or updates on the DB. Why do I say unnecessary writes or updates? Because, as shown on the slide, 12,500 and 12,599 are effectively the same: we shorten the number and show it as 12.5K likes on the feed. So the update from 12,500 to 12,599 doesn't really have to happen in real time, and this only grows with the order of the number. It is evident that, to reduce database calls, we could aggregate over a set of events for some time and then make a single call, reducing the number of operations on the database.

Then comes batch processing, the high-throughput option. Here we set up a process that runs at a certain interval and executes on the data that has accumulated since the last execution. This brings a different set of problems for us: because the process only runs at a certain interval, we end up with stale data for early engagement, i.e. for a post that has just started receiving engagement. For those counters the updates are crucial, because no shortening of the number will happen for a while when showing them on the feed.

Then we have stream processing: processing of a data stream or event stream, which by definition is an unbounded data set, infinite and ever-growing, with new records continuously arriving. This essentially fills the gap between the request-response and batching worlds, because it offers continuous and non-blocking processing of the data. That is why we went with streaming.

Our main requirements: first, windowed aggregation, where we aggregate all events received over a certain window, whether the last five minutes, the last ten minutes, or the last five seconds. Second, support for multiple aggregation windows: for a counter in the range of 1 to 1,000 we need a near-instant aggregation window, on the order of less than five seconds, whereas for a counter greater than 100k we are fine with an aggregation window on the order of minutes; as mentioned earlier, for large counters we can wait over the aggregations for some time before performing the operation. Next comes ease of configuration for new windows in the future: we have configurations in place that meet our product needs today, and we want to support new requirements that might come our way. Then triggers: as a product, we want to send notifications to users whenever their post crosses a certain milestone, for different engagement levels, and we want to solve that as part of these aggregations, given that we are essentially solving the problem of updating the counters. Finally, the system should support easy onboarding of new counters, new triggers, and new windows at a counter level in the near future. A minimal sketch of such a window configuration is shown below.
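To make the multiple-windows requirement concrete, here is a minimal sketch in Java of the kind of mapping described above. The exact thresholds, the three-tier split, and the class and method names are illustrative assumptions, not ShareChat's actual configuration.

```java
import java.time.Duration;

// Hypothetical mapping from a counter's current magnitude to its aggregation
// window ("priority"). Thresholds and durations are illustrative assumptions.
public final class WindowConfig {

    public static Duration windowFor(long currentValue) {
        if (currentValue < 1_000) {
            // small counters: near-real-time updates matter, keep the window tiny
            return Duration.ofSeconds(5);
        } else if (currentValue < 100_000) {
            // mid-range counters: a moderate window is acceptable
            return Duration.ofSeconds(30);
        }
        // large counters are displayed shortened (e.g. 12.5K), so minutes are fine
        return Duration.ofMinutes(5);
    }

    public static void main(String[] args) {
        System.out.println(windowFor(250));        // PT5S
        System.out.println(windowFor(42_000));     // PT30S
        System.out.println(windowFor(1_200_000));  // PT5M
    }
}
```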

This is the high-level architecture diagram. Each engagement event has some business logic associated with it, and these events are captured by different services on the backend. The backend services send the events to our Kafka cluster, which is an in-house setup. The reason for the business logic at the different services is that we also capture and derive various internal counters, which help us understand the content better and help in decision making when deciding whether to show a post on the feed. We have different topics for different use cases. The aggregation service runs on Kubernetes; the similar blocks of the aggregation service shown in the picture denote the N pods that we run. The core logic around windowed aggregations and triggers is handled by the Kafka Streams API, as we are using the Kafka Streams domain-specific language. Once the aggregations are done, we update the counter's aggregated value in our key-value store, which is ScyllaDB in this case, and also publish a changelog to a Kafka topic. The sketch below illustrates that final step.
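Here is a rough sketch of that last step (update ScyllaDB, then publish a changelog record), assuming the DataStax Java driver and a plain Kafka producer. The keyspace, table, topic name, key format, and class names are all hypothetical, used only to illustrate the shape of the sink.

```java
import com.datastax.oss.driver.api.core.CqlSession;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical sink: persist the aggregated value, then emit a changelog event.
// "counters.engagement" and "counter-changelog" are illustrative names.
public final class CounterSink {
    private final CqlSession session;
    private final KafkaProducer<String, String> producer;

    public CounterSink(CqlSession session, KafkaProducer<String, String> producer) {
        this.session = session;
        this.producer = producer;
    }

    public void persist(String postId, String counter, long aggregatedValue) {
        // 1. Upsert the latest aggregated value into ScyllaDB (key-value style access)
        session.execute(
            "UPDATE counters.engagement SET value = ? WHERE post_id = ? AND counter = ?",
            aggregatedValue, postId, counter);

        // 2. Publish a changelog record so downstream consumers can react to the change
        producer.send(new ProducerRecord<>(
            "counter-changelog", postId + ":" + counter, Long.toString(aggregatedValue)));
    }
}
```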

So what is actually happening under the hood? We are leveraging the features Kafka has to offer: we don't have to worry about messages or events for a given post ID and a specific counter being processed in two different pods or instances, because that is taken care of by Kafka partitioning. We use the combination of the entity ID (the post ID) and the counter as the partition key, so by Kafka's definition, messages for a given post ID and counter will always land in the same partition and will eventually be processed by the same consumer. A minimal producer-side sketch of this keying is shown below.
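A minimal sketch of producing an event keyed by post ID and counter, assuming a plain String-serialized Kafka producer; the topic name, key format, and payload are illustrative assumptions.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public final class EngagementEventProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Partition key = entityId (post ID) + counter, so every event for the same
            // (post, counter) pair lands in the same partition and is processed by the
            // same consumer instance.
            String partitionKey = "post_123" + ":" + "like";
            String payload = "{\"delta\":1}";
            producer.send(new ProducerRecord<>("engagement-events", partitionKey, payload));
        }
    }
}
```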

On to the Streams API: we use the Kafka Streams domain-specific language to handle most of the complex logic around windowing and aggregations. Any stream processing application defines and executes a topology, and we define ours as part of the aggregation service shown in the previous diagram. A topology is nothing but a DAG, a directed acyclic graph, and it involves a series of transformations that every event moves through from input to output; at the end we update the aggregated value in the DB.

If we look at the topology we have created, moving from left to right: we consume the events from an engagement topic and perform a basic filtering operation to check whether the counter is registered with the system. If it is not, we filter it out into an "unregistered" topic for monitoring and logging purposes. We define the aggregation window for a particular range: for example, any counter whose previous value is between 100 and 1,000 should be aggregated in a window of five seconds. On this basis we branch the main event stream into different streams with different windows to aggregate over, deciding based on the previous value of the counter; the three priority branches you see correspond to the different aggregation windows. Once we branch them out, we create the different windowed streams from them.

The aggregations need to maintain state around the events processed so far for those windowed streams, and handling state has its own set of issues: what if the instance or pod shuts down, restarts, or gets replaced by another instance due to rebalancing? This is something Kafka Streams handles very well. The so-called local state is stored in memory using embedded RocksDB, which also persists the data to disk for quick recovery during restarts, and these state changes are also sent to a Kafka topic, so when a node goes down the local state can easily be recreated. We then get an output stream of aggregations: we merge all those streams and update the aggregated values for the counters in the data store. We could have skipped this step and gotten away without merging the streams before processing them; it is the same thing at the end of the day. Also, even before updating the counters in the DB, we log the changes to a changelog topic.

If we briefly look at some of the important pieces of the code: first, we create a stream object, passing the input topic we consume from and the key-value Serde used for consuming records out of the topic and deserializing them into input objects. As mentioned on the previous slide, we branch the streams by aggregation window, which gives us the counter priority streams, and within a particular priority stream we further group by key, providing a key-value Serde for serialization; this returns a grouped stream configured with that Serde. We then loop over the streams at a priority (i.e. window) level, and after grouping by key we define a window for aggregation. The window size is the seconds threshold we configured. Grace is a way of telling the window to also consider out-of-order events that arrive up to the given threshold after the end of the window, and we went ahead with a grace period of zero. Why zero? Grace is a trade-off between completeness and how long the aggregation stays open, and we care most about the completeness of our data; but our events will never be out of order, because we use wall-clock time as the event time. Going back to the first point, the timestamp extractor we pass there internally uses wall-clock time, i.e. system time, so the events we process are effectively guaranteed to be in order. It boils down to the problem and use case one is solving; for us, prioritizing completeness while having in-order events meant we could go with grace as zero. A minimal sketch of this part of the topology follows.
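Here is a minimal sketch of that topology in the Kafka Streams DSL. The topic names, the EngagementEvent type and its Serde, the registered-counter check, and the branch thresholds are assumptions for illustration; the shape (wall-clock timestamp extractor, filter, branch by priority, groupByKey, five-second tumbling window with zero grace) follows the description above.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

public final class EngagementTopology {

    // Assumed event shape: counter name, the counter's previous value, and the increment.
    public record EngagementEvent(String counter, long previousValue, int delta) { }

    public static TimeWindowedKStream<String, EngagementEvent> build(
            StreamsBuilder builder,
            Serde<EngagementEvent> eventSerde,     // assumed custom Serde
            Set<String> registeredCounters) {      // assumed registry of known counters

        KStream<String, EngagementEvent> events = builder.stream(
            "engagement-events",
            Consumed.with(Serdes.String(), eventSerde)
                    // wall-clock time is used as the event time, so events arrive in order
                    .withTimestampExtractor(new WallclockTimestampExtractor()));

        // Unregistered counters are routed to a side topic for monitoring/logging.
        events.filterNot((k, v) -> registeredCounters.contains(v.counter()))
              .to("unregistered-counters", Produced.with(Serdes.String(), eventSerde));

        // Branch by the counter's previous value; each branch gets its own window size.
        Map<String, KStream<String, EngagementEvent>> branches = events
            .filter((k, v) -> registeredCounters.contains(v.counter()))
            .split(Named.as("priority-"))
            .branch((k, v) -> v.previousValue() < 1_000,   Branched.as("p0")) // ~5s window
            .branch((k, v) -> v.previousValue() < 100_000, Branched.as("p1")) // larger window
            .defaultBranch(Branched.as("p2"));                                // minutes

        // For the highest-priority branch: group by the (postId:counter) key and apply a
        // five-second tumbling window with zero grace (events are guaranteed in order).
        return branches.get("priority-p0")
            .groupByKey(Grouped.with(Serdes.String(), eventSerde))
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(5), Duration.ZERO));
    }
}
```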

Back to the first point on this slide: as mentioned earlier, the windowed aggregation requires maintaining state, and the local store where it is maintained is configured with a Materialized object. We also assign it a name by appending the counter's priority level, i.e. the particular window it belongs to, and again provide a key-value Serde for serializing and deserializing the results of the aggregation; the aggregated value is an integer, which is why we use the integer Serde as the value Serde. Then we start the aggregation: the stream is split into multiple tumbling windows, and the events in each window are aggregated. As part of this we also provide the supplier interfaces to initialize and aggregate the counters. By default the stream emits results whenever new results are available from the window, but we are only interested in the final aggregated value that comes at the end of the window, so we pass an extra argument to suppress all results until the window closes. The suppress buffer config we use is unbounded, i.e. unconstrained by size, and one has to use this configuration a bit carefully: if there isn't enough heap memory, you will end up seeing out-of-memory issues. The result of the aggregation is a table with the key and time window as the partition key and the aggregation result as the value; using toStream we turn that table back into a stream of events, after which we update the values in the DB.

With this in place, we saw roughly a 7:1 ratio between the events we received as input and the writes happening on the DB, which is around an 80% reduction in messages, and that is a good win for us. A minimal sketch of this aggregation and suppression step follows.
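The sketch below shows the aggregation and suppression step, assuming the windowed stream from the earlier topology sketch. The method name, the generic event type, the delta extractor, and the store name are hypothetical; printing the final value stands in for the real write to ScyllaDB and the changelog publish.

```java
import java.util.function.ToIntFunction;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.WindowStore;

public final class EngagementAggregation {

    // Aggregate one windowed priority stream and emit only the final per-window value.
    // E is the (assumed) engagement event type; `delta` extracts the increment from it.
    static <E> void aggregateAndEmit(TimeWindowedKStream<String, E> windowed,
                                     ToIntFunction<E> delta,
                                     String storeName) {

        KTable<Windowed<String>, Integer> aggregated = windowed.aggregate(
            () -> 0,                                                 // initializer supplier
            (key, event, total) -> total + delta.applyAsInt(event),  // aggregator
            Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as(storeName)
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Integer()));                  // aggregated value is an int

        aggregated
            // Emit only the final result when the window closes. The unbounded buffer
            // must fit in heap memory, otherwise this can cause out-of-memory errors.
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .foreach((windowedKey, total) ->
                // windowedKey.key() is the "postId:counter" partition key; in the real
                // service this is where the DB update and changelog publish happen
                System.out.printf("%s @ %s -> %d%n",
                    windowedKey.key(), windowedKey.window().startTime(), total));
    }
}
```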

So we have solved the problem of multiple updates using Kafka Streams and aggregations. However, even though we have reduced the write operations, we still have to serve this data to our end users, and the read scale is on the order of 100K to 200K ops, sometimes going beyond 300K ops per second. So we need a data store that can serve our needs with the best latency numbers possible. Not that we had any major problem with latency; it is just us looking for the best solution out there. A minimal sketch of the read path is shown after this paragraph.
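A minimal read-path sketch with the DataStax Java driver: fetching the current aggregated value of one counter for one post, as the feed would. The keyspace, table, and column names are illustrative assumptions matching the earlier sink sketch.

```java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;

// Hypothetical read path for the feed. Keyspace, table, and column names are illustrative.
public final class CounterReader {
    private final CqlSession session;
    private final PreparedStatement read;

    public CounterReader(CqlSession session) {
        this.session = session;
        // Prepared once and reused; reads are keyed by (post_id, counter).
        this.read = session.prepare(
            "SELECT value FROM counters.engagement WHERE post_id = ? AND counter = ?");
    }

    public long get(String postId, String counter) {
        Row row = session.execute(read.bind(postId, counter)).one();
        return (row == null) ? 0L : row.getLong("value");
    }
}
```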

Why ScyllaDB? It is fast: we continuously see sub-millisecond latencies on this counters cluster. Not only that, we have better monitoring around the database, with metric visibility at the data center, cluster, instance, and shard level. This gives us better visibility and helps in pinpointing and understanding issues around any abnormalities or mishaps that don't match our expectations. It also comes at a better cost: we have seen at least a 50% reduction in database costs with ScyllaDB. To put it simply, we think it is the best.

We run a three-node cluster for the engagement counters, and each node has 48 vCPUs and 350+ GB of memory. At the top of this image you can see that the p99 read and write latencies are in microseconds, and this is with a good amount of read and write ops on the cluster. The remaining part of the image shows that we have metrics at the instance and shard level, where we can visualize which particular shards are taking more load, which helps in modeling and understanding the data. We tested a similar setup, and it was capable of serving 1.2 million ops per second. How did we do it? We started replicating the events into a parallel cluster by creating a backlog of events, and we saw that even with the load touching 90 percent, the cluster stayed intact and there was no major impact on the latency front.

Here is another screenshot, from our cluster migration, when we were moving from the older database (which was not ScyllaDB) to ScyllaDB. This was during a migration job that was exporting data from the previous store to ScyllaDB, with ongoing writes happening at the same time. You can clearly see that the load on each node is touching 100 percent, and when we validated the data across the two databases, we saw a 99.99% match, which is four nines.

So that is how ScyllaDB has helped us a great deal in optimizing what we had earlier, in order to serve our end users better. We work as a team, and this could not have been possible without the contributions of these bright minds from engineering, DevOps, and leadership. You can learn more about the operational challenges and the problems we encountered in our blog post. Hosting the engagement counters on ScyllaDB was one of the first and biggest bets for us; we are in awe of what this database has to offer, and it has led to much greater adoption of ScyllaDB in the organization.

That's it from me. Thanks for tuning in. Thank you so much. [Applause]
