Stream Processing with ScyllaDB – No Message Queue Involved!

27 minutes

In This NoSQL Presentation

At Palo Alto Networks, we process terabytes of events each day. One of our many challenges is to understand which of those events (which might come from various different sensors) actually describe the same story but from many different viewpoints. Traditionally, such a system would need some sort of a database to store the events, and a message queue to notify consumers about new events that arrived into the system. We wanted to mitigate the cost and operational overhead of deploying yet another stateful component to our system, and designed a solution that uses ScyllaDB as the database for the events *and* as a message queue that allows our consumers to consume the correct events each time.

Daniel Belenky, Principal Software Engineer, Palo Alto Networks

Daniel is an experienced software developer. For the past year he’s been working at Palo Alto Networks, developing fast data processing solutions.

Video Transcript

Hello, everyone. My name is Daniel Belenky, and today I will be talking with you about stream processing with ScyllaDB. I’m a Principal Software Engineer at Palo Alto Networks. My background is in Kubernetes, virtualization, distributed applications, big data and stream processing solutions. So let’s just start. The agenda for today: we’ll start with a brief introduction of the product and my team.

Then we will talk about the challenge that we were trying to solve, what other solutions were considered, which solution we ended up with, and how ScyllaDB helped us solve the problem. So let’s start with a brief introduction. Our product is a network security product. It performs analytics, detection and response on millions of records per second. It handles multiple data sources and different data schemas, and we have to provide insights in a near real-time time frame, because this is a security product: when something goes wrong, when there is a threat on the system, we have to respond. My team develops the initial data pipelines that receive the data from our endpoints; we clean the data, process it, and prepare it for further analysis in other parts of the system. One of the major tasks my team is facing is building stories. Building stories, in our terminology, means the following: we receive multiple event types from multiple different data sources, and each of those data sources can actually describe the same network session, but from a different viewpoint on the network. Eventually we want to be able to say, “This event from the firewall, this event from the endpoint and this event from the cloud provider are all basically telling the same story, but from different viewpoints.” And we want to fuse those events. By fuse, I mean we want to extract the data from each of those individual events and produce one big, enriched event that contains the important data from all of the smaller individual events we’ve seen. Our technology stack is mostly Golang and Python, and we deploy on Kubernetes.
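As a rough sketch of what “fusing” events into one enriched story might look like in the team’s Python stack (the field names `id`, `source`, `event_time` and `payload` are hypothetical; the talk does not show the real event schema):

```python
def fuse_events(events):
    """Fuse discrete events that describe the same network session
    into one big, enriched event (a "story").

    `events` is a list of dicts; all field names here are hypothetical.
    """
    story = {
        "event_ids": [e["id"] for e in events],
        "sources": sorted({e["source"] for e in events}),
        # The story starts at the earliest of the individual event times.
        "start_time": min(e["event_time"] for e in events),
        "payload": {},
    }
    # Merge the important data from every viewpoint into one payload;
    # the first event to provide a field wins.
    for e in events:
        for key, value in e.get("payload", {}).items():
            story["payload"].setdefault(key, value)
    return story
```

The point is only the shape of the operation: many small events in, one enriched event out.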
So what was the challenge we were facing? I will try to explain. On the screen you can see a bell. This bell represents a single event. So let’s say there is an event from some sensor. Say this is a router, and at a certain time this router emits some message: “I routed some packet from route A to route B.” Let’s look at this event. In this particular example, it is a DNS query. You can see that this message actually consists of two events, not just one, and this event arrived at our system. Then, after 1 second, another sensor sent us another message. This sensor happens to be some custom system that told us that someone performed a log-in and someone else performed a sign-up. Each of those events has a unique ID, and as you can see, those events have different structures. Similar structures, but different; in real life they can be completely different. Then, after 8 minutes, we received another event from a third sensor, this time some HTTP logs. All those events, which arrived at different times, can actually represent the same session, the same activity on the network. This slide starts where the previous slide stopped: we have data that came from different sources at different times. We take this data and normalize it to a canonical form that is ready to be processed by the rest of the system, and the problem is that now we have millions of normalized but unassociated entries. By unassociated, I mean we have a lot of discrete events, but I don’t yet know which of those events together tell me something. So the question is: how do we associate discrete entries that describe the same network session? Let’s see why this is a challenge in the first place.
So first, clock skew. Different sensors don’t have to be in the same data center, on the same network or on the same computer. Even if we synchronize their clocks, they will not always be synchronized to the millisecond, or even to the second. Another thing I don’t think I’ve mentioned yet: because we’re a security company, we provide each of our customers a unique deployment, so no two customers will be on the same deployment. And of course deployments vary in size. We have deployments that receive several bytes or kilobytes per second, and some very large deployments that process gigabytes per second, and our solution has to be optimized to handle all those kinds of load. Then there is the sensor’s viewpoint of the session. We call this directionality, and by directionality I mean that each sensor is not always aware of the world around it. A sensor may say, “I received some message, I forwarded some message,” but that doesn’t always mean the direction is from point A to point B; maybe the sensor actually reported the reverse route. And we have zero tolerance for data loss, which is very important for us. If we lose data that is pushed to us, that’s it: it’s gone forever, we never see it again, and we can miss threats. And of course there is the continuous out-of-order stream. Sensors send us data at different times, and the event time is not equal to the ingestion time. The event time is the actual time when the event occurred, and the ingestion time is the time when the event was sent to our system. Then there is the processing time, when we were actually able to start working on the event. Those three are not always the same; in most cases they are not, and we will see it in an example.
So let’s look at a graph; it’s actually a timeline. You have time zero up to time five, and the event time, ingestion time and processing time. The gray dot represents an event in time, and you can see that this event was ingested, meaning sent to us, and we were able to process it right when it was sent. Then there is another event. This event occurred at time one, but it was not sent to us. Why not? I don’t know. Maybe the user who generated this event was on an airplane, or on a train that suddenly lost connection to the Internet, and that’s okay. Now we have another event that was generated by another user or another part of the network, and it was ingested and sent to us, and then suddenly we received the previous event, the blue one. Then we got another event that was sent to us right when it was generated. Finally, we finished processing the first event. Now we can process the green event, and only then do we get to process the blue event. So as you can see already, processing does not have to be ordered: there is nothing that guarantees that the order in which we receive the events will be the order in which they are processed. Then we process the yellow event. Now let’s say that the gray event and the blue event, which are marked with Xes, actually describe the same network session. They are discrete events, but they belong to the same story, and if we are able to find that relation, we will be able to gain more insight into what happened. So what do we need in order to take different discrete events, put them together and build a story, to say that those events are telling us something about the same session? We need to receive a stream of events, of course.
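The distinction between these timestamps can be shown with a minimal sketch (the event names and times loosely follow the slide’s colored dots, but the values are illustrative):

```python
from dataclasses import dataclass

@dataclass
class Event:
    name: str
    event_time: int      # when the event occurred at the sensor
    ingestion_time: int  # when it was actually sent to our system

# The "blue" event occurred early but was ingested late
# (e.g. the sensor was offline for a while).
events = [
    Event("gray", event_time=0, ingestion_time=0),
    Event("blue", event_time=1, ingestion_time=3),
    Event("green", event_time=2, ingestion_time=2),
]

# Ordering by ingestion time is not the same as ordering by event time,
# so the order in which we receive events guarantees nothing about
# the order in which they occurred.
by_ingestion = [e.name for e in sorted(events, key=lambda e: e.ingestion_time)]
by_event = [e.name for e in sorted(events, key=lambda e: e.event_time)]
```

Here `by_ingestion` and `by_event` come out in different orders, which is exactly the out-of-order problem described above.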
We will have to wait some amount of time to allow related events to arrive, because we can’t process each event right away; as we’ve seen, it might take some time for later events to arrive. We have to decide somehow which events are related to each other and which are not, and we have to publish the results. Those are the basic, initial system requirements for our solution. There are two other requirements, which are more business requirements. Our deployment has to be single tenant, as I mentioned before, so we must provide complete isolation; we have no room for mistakes here at all. And we have to support all those discrete single-tenant deployments, which of course come in different sizes: some with kilobytes per hour or per second, and several very, very large deployments with hundreds of gigabytes of data. All of this has to be done at a reasonable cost. So let’s talk a bit about our solutions. The first, most straightforward solution, and maybe the easiest to implement, is to use a relational database; there are many offerings. How would this solution look? Our normalized data comes to us already in the canonical form we’ve processed it into. We store it in some relational database, distributed or not, depending on the size. Then we can have some periodic task that runs complex queries on this relational database; those queries basically implement our business logic to find which events are related to each other, which events describe the same story. We take the result and publish it, and then other parts of the system can consume this data and work on it. Simple, and a working solution. The pros of this solution: it is a very simple implementation, right? We almost don’t have to write anything.
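The relational approach can be sketched with SQLite standing in for whatever relational database would really be used. The “business logic” query here, grouping events that share a source/destination pair within a time bucket, is a made-up stand-in for the real correlation rules, which the talk doesn’t detail:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE events (
        event_id TEXT,
        src TEXT,
        dst TEXT,
        event_time INTEGER
    )
""")
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?, ?)",
    [
        ("a", "10.0.0.1", "10.0.0.2", 100),  # same pair, close in time
        ("b", "10.0.0.1", "10.0.0.2", 103),
        ("c", "10.0.0.9", "10.0.0.2", 500),  # unrelated event
    ],
)

# Periodic task: group events sharing the same (src, dst) pair within a
# 10-second bucket and treat each multi-event group as a candidate story.
rows = conn.execute("""
    SELECT src, dst, event_time / 10 AS bucket,
           GROUP_CONCAT(event_id) AS story_members
    FROM events
    GROUP BY src, dst, bucket
    HAVING COUNT(*) > 1
""").fetchall()
```

This is the whole appeal of the approach: the database does the heavy lifting, and the application only publishes the query result.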
All we have to do is deploy a database and write some query, which can be complex, but okay: we write it once and that’s it, basically. It works. It’s a solution, but there are problems. There is operational overhead: we have to maintain this database over time. There is limited performance: relational databases are often much slower than NoSQL databases like ScyllaDB, and if the data model allows us to use a NoSQL database, it will be much faster to use one. And there is the operational cost, because now we have to run complex queries, which require much more CPU, and it will cost us more to deploy this. So we went to another solution: using ScyllaDB and Kafka. I’m saying Kafka, but it can be any message queue; we tried it with Kafka. If someone is not familiar with Kafka, I don’t want to spend too much time on it, but it is basically a message queue: you can send messages, you can have multiple publishers and multiple subscribers, and you can communicate that way. Just as before, we have our normalized data in canonical form. We send the records to ScyllaDB, and in parallel we publish the keys that will later allow us to fetch those event records from ScyllaDB. Each row represents one event, from different sources of course. The first question someone might ask is: why don’t you publish the records directly to Kafka? The problem is that those records can be big, several megabytes in size, and we can’t afford to run them through Kafka, because in order to work fast Kafka has to work from memory, and we don’t have that much memory to give it. So then we have multiple consumers that read the data from a Kafka topic. This data again contains just the keys: just enough data to allow those consumers to fetch the records from ScyllaDB.
So those consumers fetch the records from ScyllaDB, compute the stories, that is, the relations between those events, and publish the stories so that other components of the system can consume them. Again, this solution has pros and cons. By the way, we actually implemented and tried all the solutions that I’m showing; you can see that the first solution is the easiest to implement, this one is a bit harder, and the last one is a bit more complicated. The pros of this solution: very high throughput compared to the relational database with the batch queries, and one less database to maintain, because we were able to get rid of the relational database. ScyllaDB was already part of our system; we were using it prior to this solution, so that’s a good thing. The cons: we have to write our own logic to find those correlations. We can no longer rely on a database to do the heavy lifting for us; we have to do it ourselves, and this is quite complex logic. We have a more complex architecture and deployment, because now we have Kafka: we send the data to ScyllaDB and, in parallel, send the keys to Kafka, and then someone has to consume them. It makes the system a bit more complex. And the biggest problem for us, what was, at least from our point of view, a big no, is that we would have to maintain 1,000 Kafka deployments, because we have to provide Kafka for every customer. When you combine this requirement with the fact that we have zero tolerance for data loss, it means we have to run Kafka in a distributed fashion.
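The store-the-record, queue-the-key pattern described here can be sketched with in-memory stand-ins (a dict for ScyllaDB, a `queue.Queue` for the Kafka topic; the class and method names are invented for illustration):

```python
import queue

class KeyOnlyPipeline:
    """Sketch of the ScyllaDB + message-queue design: full records go
    to the database, and only small keys go through the queue, because
    records can be several megabytes, too big to push through Kafka."""

    def __init__(self):
        self.db = {}                # stands in for ScyllaDB
        self.topic = queue.Queue()  # stands in for a Kafka topic

    def produce(self, event_id, record):
        self.db[event_id] = record  # store the (possibly large) record
        self.topic.put(event_id)    # publish only the key, in parallel

    def consume(self):
        event_id = self.topic.get() # read the key from the topic...
        return self.db[event_id]    # ...then fetch the full record
```

The queue carries only notifications; the database remains the single source of truth for the payloads.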
It means that even the smallest customer would have to have maybe two or three Kafka instances, which we didn’t want, and of course someone has to manage this whole thing, and if anyone has ever tried to manage Kafka, you know it’s not an easy task. The third solution, which I will not go into in too much detail, is basically just like the solution with Kafka, but we tried to replace Kafka with some cloud-managed queue. It didn’t work too well, because this queue was much slower than Kafka; I think I can say that the performance was comparable to what we had with the relational database. Some of the pros: we have a higher throughput compared to the relational database, one less database to maintain, and we didn’t have to maintain Kafka deployments. But the cons were much lower performance compared to Kafka, and as I said, a higher throughput than the relational database, but not something that was worth the overhead of writing our own logic to build those stories. So what is the solution that we eventually used to solve this problem? Before I get to it, I want to say that this solution will not work for everyone. It works for our use case because ScyllaDB was already there: we were using it for other parts of the system, so it was simply available to us at that point in time, and of course we didn’t want to manage thousands of Kafka deployments. So how we solved it: we have our normalized data in canonical form as before, and we store the records in ScyllaDB, and only in ScyllaDB. Of course the data is sharded into hundreds of shards.
This is actually true for the other two solutions as well. How we stored the data in ScyllaDB is the interesting part. The partition key is the shard number, to allow different workers to work on different shards in parallel, plus the insert time. This insert time is a timestamp with a certain resolution, say up to 1 second, and not just an arbitrary millisecond timestamp. The clustering key is the event ID; this is used later to fetch dedicated events. We have multiple consumers fetching records from ScyllaDB, and how they fetch records is the interesting part. We will see it in the next slides, but basically the query they run tells ScyllaDB, “Give me all the data that you have for this shard with the given timestamp,” where, as I said, the resolution is 1 second, for example. ScyllaDB returns all the records to us, we compute the stories ourselves, and we publish the stories for other components in the system to consume. The pros of this solution: first, since we already have ScyllaDB deployed, we don’t have to add any new creatures to the system, no new systems. That’s always good. We have high throughput compared to the relational database approach. I wouldn’t say it was faster than the solution with Kafka, but it was comparable, and I can say that right now ScyllaDB is not the bottleneck for us; our bottleneck is the disks. And it’s still one less database to maintain, because we were able to get rid of the relational database we had, and there is no need to maintain Kafka deployments, which is always good. But there are of course some downsides. Our code is much more complex, and we have to keep our producers’ and consumers’ clocks synchronized up to a certain resolution.
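A sketch of what this table and its queries might look like in CQL, held as statement strings the way a Python producer or worker might define them. The table and column names are guesses reconstructed from the description, not the real schema, and the timestamp truncation helper is likewise illustrative:

```python
# Hypothetical CQL for the design described above: the compound partition
# key is (shard, insert_time), where insert_time is truncated to a fixed
# resolution (e.g. 1 second), and the clustering key is the event ID.
CREATE_EVENTS = """
CREATE TABLE IF NOT EXISTS events (
    shard int,
    insert_time timestamp,   -- truncated to 1-second resolution
    event_id uuid,
    payload blob,
    PRIMARY KEY ((shard, insert_time), event_id)
)
"""

INSERT_EVENT = """
INSERT INTO events (shard, insert_time, event_id, payload)
VALUES (?, ?, ?, ?)
"""

# A worker reads exactly one (shard, second) partition at a time:
# "give me all the data for this shard with the given timestamp".
SELECT_PARTITION = """
SELECT event_id, payload FROM events
WHERE shard = ? AND insert_time = ?
"""

def truncate_to_second(ts_ms):
    """Truncate a millisecond timestamp to 1-second resolution, so every
    event inserted within the same second lands in the same partition."""
    return ts_ms - (ts_ms % 1000)
```

Because the partition key includes the truncated time, a full partition is exactly one shard-second of data, which is what makes the "read one bucket at a time" consumption pattern possible.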
It’s not that hard a requirement, because we can always read from the past: just add a small delay to the consumers, so that we make sure they never read ahead of the producers. So let’s see a detailed overview of this solution and how it works. On the left side you can see all the events, and on the right side you can see our components, the workers that actually build the stories. They run a query on ScyllaDB when they start. We have another table called read offsets, and this is where each of those workers stores the last offset, the last timestamp it reached with its reading. Each worker basically takes the last state that it has for its own shard, and ScyllaDB returns it. You can see that for shard one, the read offset is 1,000, whatever this time is; shard two has another read offset, and shard three has a different read offset. Then the producers run a query that looks like this: we insert into our data table the shard, the insert time and of course the payload. Then the workers, which actually run in an endless loop, take the data from ScyllaDB. As you can see, those events were actually sent to the same shard, and each worker, when it finishes computing the results, commits the last read offset to ScyllaDB; the same goes for the other workers as well. Then we have another event arriving, of course, and we read it as well. So what were our final results? We’ve been able to reduce the operational cost by a lot, actually. We reduced the operational complexity, because we didn’t add another system; we actually removed a system from our deployment. And we’ve been able to improve our performance, which translates into further reduced operational costs. And that’s basically it.
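The worker’s read-offset loop can be sketched with in-memory stand-ins: here `store` replaces the ScyllaDB events table (mapping a `(shard, bucket)` partition to its events), `offsets` replaces the read-offsets table, and the function name, `delay` value and bucket arithmetic are all hypothetical:

```python
def process_ready_buckets(shard, now, store, offsets, publish,
                          resolution=1, delay=5):
    """One pass of a worker's endless loop: resume from the last
    committed read offset for this shard, process every time bucket
    that is old enough to be safe to read, and commit the new offset.

    Reading only buckets at least `delay` seconds in the past keeps the
    consumer behind the producers despite small clock skew.
    """
    last = offsets.get(shard, 0)          # last committed read offset
    while last + resolution <= now - delay:
        bucket = last + resolution
        # "Give me all the data for this shard and this timestamp."
        for event in store.get((shard, bucket), []):
            publish(event)                # build + publish the story
        offsets[shard] = bucket           # commit the read offset
        last = bucket
```

A real worker would call this in an endless loop, sleeping `resolution` seconds between passes; here it is a bounded function so the bucket-by-bucket behavior is easy to follow.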
Thank you very much for staying until now, and stay in touch.
