Build Low Latency, Windowless Event Processing Pipelines with Quine and ScyllaDB

Matthew Cullum, 20 minutes

Build an event processing pipeline that scales to millions of events/second with sub-millisecond latencies all while ingesting multiple streams and remaining resilient in the face of host failures. We will present a production-ready reference architecture that ingests from multiple Kafka streams, and produces results to downstream Kafka topics. Common use cases include fraud detection, customer 360, and data enrichment.

Video Transcript

My name is Matthew Cullum. I’m the director of engineering at thatDot. I’m here to talk about how Quine solves the challenge of resiliently processing complex events using a graph in real time and at scale.

A little about myself: I have 18 years of experience in enterprise software, 12 years leading the architecture and technology for industrial automation platforms and distributed control systems.

All right, let’s get started. I’m here to talk about this really hard problem called complex event processing: why a graph data structure is the ideal way to represent the problem domain, but also why graph data structures have historically been unable to maintain performance as they scale up. Then we’ll design an architecture together that does what was once thought next to impossible, and show some test data demonstrating our desired results.

Our goal: a piece of software that sits squarely between multiple data sources, like high-volume Kafka topics, and that scales in a linear fashion to meet any enterprise-scale data volume. We’d like it to represent the integration of these streams as a graph data structure with no need for time windows, meaning the graph is stateful and can theoretically grow forever. We should be able to perform complex queries on the events as they stream in, in real time, as each event is being merged into the graph, and then send the query results as they are realized to one or more downstream targets.

This is a tall order. Think about it: each event is a data package, potentially a large JSON record, and what we’re saying is we want to build a system that can extract, transform, and load these events into a graph data structure of potentially infinite size without impacting the latency of ingest. We want to do this at ingest rates that the largest enterprises are struggling with today. We want all data in the graph accessible in constant time, and we want to perform complex queries on that graph in real time so we can send those results downstream. Oh, and by the way, we want this to be resilient to infrastructure and network failures, and we want to do this on cost-optimized hardware. Sound impossible? I’m going to show you how to do it in under 20 minutes.

Before we begin designing the architecture, though, I want to point out this is not academic. Quine exists, and it does everything we’re setting out to accomplish. It can ingest from multiple sources at millions of events per second. It integrates those events into a highly performant graph data structure and performs query computations on the nodes in the graph in real time, as those nodes and edges change in response to events. It then streams the results to downstream targets. It scales linearly in terms of both ingest rate and query performance, it’s resilient to infrastructure failure, and it does all of that on cost-effective infrastructure.

Let’s start with our problem space: complex event processing.

Imagine a Kafka or Pulsar stream of events. Imagine two or three of them, or five of them. Each one is streaming events, again perhaps composed of large JSON records, and we want to observe all of these streams at once, consume each event in each stream at the same time, and make inferences from the data, both contemporaneously and in the past. Some use cases include fraud detection and cybersecurity.

The brute-force way would be to dump everything into a massive database and run batch queries to check for known patterns, but this would have two very big problems. First, the queries would be very slow, and to control the runtime of the queries we’d have to introduce time windows. A time window is when you truncate data because it no longer fits into memory or because you need to limit the computation time of a complex query. Losing data or observations should not be acceptable. The second problem is that in many complex event processing use cases, by the time these queries return it might be too late to respond to the event. Ideally, when an event comes through a stream that allows us to make an observation, we’d really prefer to act on it immediately, in real time. If that observation indicates fraud or a cybersecurity breach, we need to know now, not tomorrow morning.

Ignoring technical limitations or historical bias, what is the best way to represent this problem? Well, it’s a graph data structure. It allows categorical data to be represented naturally rather than encoding it alongside numerical data. It is much easier to express complex patterns and relationships. SQL queries for many complex patterns require recursive subqueries that can get very hard to reason about; they’re error-prone and very slow. Graph query languages like Cypher provide a natural way to express complex patterns easily. If we want to draw complex conclusions and we want to reason about the data, then a graph data structure is ideal.

We can have nodes, which represent proper entities from the streams; properties on the nodes, which can contain metadata about the entities; and edges between the nodes, which represent relationships. As each event comes in, we translate it into nodes, properties, and edges and then merge them into the graph. With one unified graph of all events from all streams, we can then continuously walk the graph looking for the specific patterns we’re interested in.

Obviously we can’t ignore technical limitations, so what have been the limitations of graph databases?

First, graph databases are unable to process event streams at or exceeding a million events per second. In fact, most struggle to reach 10,000 events per second. Enterprises today have use cases which exceed a million events per second and would prefer to push their systems to 10 or 15 million events per second.

Second, current graph databases have computational limitations. Some focus on optimizing around persisting the data as quickly as possible, but then any query requires almost, or at worst, a full scan of all nodes into memory when walking the graph to perform computation and pattern matching. Some focus on trying to keep as much of the graph in memory as possible so pattern matching can be very fast, but then they’re limited by how large the graph can be, or they have to fall back to scanning from the persistor. Of course, some have gotten clever and have found ways to compromise on the trade-offs, but they all still have trade-offs between how fast they can ingest data and how fast they can perform pattern-matching computations on the graph. We don’t want to compromise. We want both. We want to scale linearly along both dimensions.

Let’s put some real numbers behind our goals. We want to ingest 1 million events per second, and we want to scale linearly in terms of ingest rate and query processing rate, so we can have confidence in our ability to scale to that of the largest enterprises, exceeding 10 million events per second. We want to match a four-node complex query in real time. This pattern should be relatively rare, making it harder to find: about two percent of the stream might complete the pattern. But even so, at a million events per second that’s still 21,000 successful query matches per second, and we want to stream these results to a downstream target in real time. Finally, for completeness, we want to do all of this while introducing host and network failures, on cost-optimized infrastructure, just to make it realistic.

Let’s start by designing a new architecture. What if we started with a database which already has similar characteristics, like ScyllaDB? It scales linearly to enormous scales, it has incredibly low latency on queries, and it is highly resilient to infrastructure failure. But it isn’t a graph; it’s a key-value store. Let’s run with that anyway. What if the properties and edges of each node in our graph data structure were a value, and each node had a uniquely hashed key based on the properties which make it unique? Then we’d be able to create and retrieve any node in constant time. So we could build a graph data structure over a key-value store: each node in our graph would be a key-value pair, where the key is uniquely hashed from the node’s properties and the value is the properties and edges on the node.

This might work for having a very large graph, but complex event processing requires us to constantly walk the graph looking for complex patterns. When is the best time to perform the computation of a query? It’s when a node is changing and already in memory. But any query worth running, especially in complex event processing, must span multiple nodes. That’s the point.

For those of you who know how a language compiler works, you’ll know that languages get parsed into an abstract syntax tree, and a tree data structure is just a graph without cycles. What if we took the syntax tree from a query and refactored it by the distinct nodes in the query, or in the pattern? Here’s some Cypher. It’s a graph query language, and it uses ASCII art to define a pattern instead of a SQL SELECT statement. This one describes looking for a three-node pattern where each node is of type login, connected by edges of type next, where the first two have a result property equal to failure and the third has a result property equal to success. But this is equivalent to three node queries. The first says: find a node of type login where the result property equals failure, and follow its next edges looking for a neighbor of type login where the result property equals failure; then follow that neighbor’s next edges looking for a neighbor of type login, this time where the result property equals success. It can be refactored into a tree of node-level queries.

Can we guarantee that every query can be refactored into a tree of computations per logical node it’s describing? As it turns out, yes, you can do this. It does not matter if our query had a fork in it or branched in multiple ways; this process would still work. We can always translate a query into a tree of node queries, one per logical node in a pattern. This is a huge observation, because it means we can embed our queries in the graph. Every time a node changes during ingest, that node will be brought into memory anyway, and while it’s already in memory we can check if those changes have an impact on any of the registered queries in the system. We can then cache these node query results on each node to improve performance when those results change.
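
The refactoring of the login pattern into node-level queries can be sketched in a few lines of Python. This is a toy illustration under stated assumptions, not Quine’s implementation: the Node and NodeQuery classes, the matches function, and the in-memory dictionary graph are all hypothetical names invented here, and the Cypher in the comment is reconstructed from the spoken description rather than copied from the slide.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional

# Pattern described in the talk (syntax reconstructed, an assumption):
#   MATCH (a:login {result: "failure"})-[:next]->(b:login {result: "failure"})
#         -[:next]->(c:login {result: "success"})


@dataclass
class Node:
    type: str
    props: Dict[str, str]
    next_ids: List[str] = field(default_factory=list)  # outgoing "next" edges


@dataclass
class NodeQuery:
    """A query about ONE node, plus an optional sub-query for its 'next' neighbors."""
    type: str
    result: str
    child: Optional["NodeQuery"] = None


def matches(graph: Dict[str, Node], node_id: str, q: NodeQuery) -> bool:
    """Check the local condition, then delegate the rest of the pattern to neighbors."""
    node = graph[node_id]
    if node.type != q.type or node.props.get("result") != q.result:
        return False
    if q.child is None:
        return True
    return any(matches(graph, nbr, q.child) for nbr in node.next_ids)


# The three-node pattern as a tree (here a simple chain) of node-level queries.
pattern = NodeQuery("login", "failure",
                    NodeQuery("login", "failure",
                              NodeQuery("login", "success")))

graph = {
    "e1": Node("login", {"result": "failure"}, ["e2"]),
    "e2": Node("login", {"result": "failure"}, ["e3"]),
    "e3": Node("login", {"result": "success"}),
}

# Only e1 is the root of a complete failure -> failure -> success chain.
roots = [node_id for node_id in graph if matches(graph, node_id, pattern)]
print(roots)
```

Each NodeQuery only ever inspects one node and asks its neighbors about the remainder of the pattern, which is what makes it possible to evaluate the pieces wherever a node happens to be in memory.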

This would enable us to perform query computations internally to the graph as it changes, not as part of an external process that needs to scan every node in the graph looking for pattern matches. And since the nodes are already in memory, it will be very fast.

As I said before, Quine does exactly what we’re looking to achieve. Let’s look under the hood of Quine and compare it to what we were just designing.

Quine ingests data from multiple sources, such as Kafka, Pulsar, and Kinesis. On ingest, each event is translated and merged into an ever-evolving graph representation of the streams. The Cypher query language is used to define how to extract and transform an event into the graph. We’ve extended our implementation of Cypher to support idFrom, a hashing function used to generate a consistent and unique ID for every node in the graph based on the properties of the node which make it unique. This allows nodes, no matter how long it’s been since they were last updated or created, to be loaded into memory in constant time. We persist all changes into a pluggable persistor. We support several different persistors; ScyllaDB, due to its low latency, clustering, and scaling capabilities, is an excellent choice.
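
The idea of deriving a node’s key from the values that make it unique can be sketched as follows. This is a minimal sketch under stated assumptions: the id_from function below is a hypothetical Python stand-in and does not reproduce Quine’s actual idFrom hashing scheme, the merge_event helper is invented for illustration, and a plain dictionary stands in for the persistor.

```python
import hashlib
import json
from typing import Any, Dict


def id_from(*identifying_values: Any) -> str:
    """Hypothetical stand-in: derive a stable key from the values that identify a node."""
    canonical = json.dumps(identifying_values, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Stand-in for the key-value persistor: key -> {"props": ..., "edges": ...}
store: Dict[str, Dict[str, Any]] = {}


def merge_event(event: Dict[str, Any]) -> str:
    """Merge an event into the node for its user; the same user always maps to the same key."""
    key = id_from("user", event["user"])
    node = store.setdefault(key, {"props": {}, "edges": []})
    node["props"].update(event)
    return key


k1 = merge_event({"user": "alice", "result": "failure"})
k2 = merge_event({"user": "alice", "result": "success"})

# Both events landed on one node, found by a single key lookup rather than a scan.
print(k1 == k2, len(store))
```

Because the key is computed from the event itself, any stream can locate or create the same node without coordination, and lookup cost does not depend on how long ago the node was last touched.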

As the graph changes, it runs live computations for any registered queries. We call these standing queries. This makes Quine stateful with respect to the streams and event-driven with respect to the computation of standing queries. Again, Cypher is the language used to write standing queries, just like for ingest. If you know Cypher, you already know how to interact with Quine; if not, Cypher is easy to pick up and well documented.

Standing queries are translated, as we already discussed, into a tree of node queries. These node queries are re-evaluated as nodes change, and when a node query result is observed, its neighbors which have subscribed to that result are notified. This means even a node which has not been updated in months can be reloaded into memory in constant time because a neighbor was just updated, causing a standing query to result in a match. There are no time windows, so we don’t lose observations, and this is because we’ve embedded the query in the graph.
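
The subscribe-and-notify behavior of standing queries can be illustrated with a small event-driven sketch. Everything here is an assumption made for illustration: the names (STEPS, on_change, set_result, add_next), the per-node cache layout, and the single hard-coded failure, failure, success chain are invented, and Quine’s real standing query machinery is considerably more general.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple

# Required "result" value at each position of the chain pattern.
STEPS = ["failure", "failure", "success"]

props: Dict[str, str] = {}                           # node id -> "result" property
next_edges: Dict[str, Set[str]] = defaultdict(set)   # node id -> successor ids
subscribers: Dict[str, Set[str]] = defaultdict(set)  # node id -> predecessors to notify
cache: Dict[Tuple[str, int], bool] = {}              # (node id, step) -> cached node-query result
matches: List[str] = []                              # emitted standing-query results


def evaluate(node: str, step: int) -> bool:
    """Does `node` satisfy the pattern from position `step` onward, per neighbors' caches?"""
    if props.get(node) != STEPS[step]:
        return False
    if step == len(STEPS) - 1:
        return True
    return any(cache.get((nbr, step + 1), False) for nbr in next_edges[node])


def on_change(node: str) -> None:
    """Re-evaluate the node queries on one node; notify subscribers only if a result changed."""
    changed = False
    for step in range(len(STEPS)):
        new = evaluate(node, step)
        if cache.get((node, step), False) != new:
            cache[(node, step)] = new
            changed = True
            if step == 0 and new:
                matches.append(node)  # the full pattern is now rooted at this node
    if changed:
        for pred in subscribers[node]:
            on_change(pred)


def set_result(node: str, result: str) -> None:
    props[node] = result
    on_change(node)


def add_next(src: str, dst: str) -> None:
    next_edges[src].add(dst)
    subscribers[dst].add(src)
    on_change(src)


# Events arrive out of order; e1 is never touched again after its own event.
set_result("e3", "success")
set_result("e1", "failure")
set_result("e2", "failure")
add_next("e1", "e2")
add_next("e2", "e3")  # completes the chain; e2 notifies e1, which emits the match

print(matches)
```

The final edge only touches e2, yet the match is emitted at e1 because e1 subscribed to e2’s cached result. No scan of the graph and no time window is involved.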

But of course we want Quine to be embedded within our streams, which means the standing query results need to go somewhere. These results are streamed out to downstream targets to be reacted upon in real time. Those of you who understand back pressure and why it is so important in distributed streaming systems are likely aware that ScyllaDB put a lot of energy into this. ScyllaDB is designed to back pressure from the physical hardware. Similarly, Quine is designed to back pressure. You can insert it into your enterprise-scale streams and have confidence in its performance and resiliency.

All right, let’s test this. We put Quine to the test to see if it could achieve the complex event processing challenge at enterprise scale. Let’s remind ourselves of our goal: to ingest 1 million events per second, and to scale linearly in terms of both ingest rate and query processing rate so we can have confidence in our ability to scale beyond, to 10-plus million events per second. We want to match 21,000 four-node complex queries per second. We want to stream these results to a downstream target in real time. And for completeness, we want to do all of this while introducing failures, on cost-optimized hardware.

We performed an initial test last fall. We created a Python script for a real-world customer use case, which was designed to produce a four-node pattern just over two percent of the time. We used this script to generate eight billion events and pre-populated a Kafka cluster. We did it this way because we didn’t want to have to build a cluster capable of generating over a million events per second just to pass through Kafka. Instead we pre-populated it, and every time we performed our test, Quine connected to Kafka as a new client, allowing it to start from the beginning of the stream. Eight billion events is enough to sustain a million events per second for just over two hours. We initially provisioned a Quine cluster composed of 140 30-core hosts and a database cluster composed of 66 32-core high-mem hosts, and we turned replication down to one for the database because we wanted a worst-case failure scenario.
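
The test-plan figures above can be checked with a few lines of arithmetic. The only assumption is the exact match rate: the talk says "just over two percent", and 21 matches per 1,000 events (2.1 percent) is the value that reproduces the quoted 21,000 matches per second.

```python
events_total = 8_000_000_000   # events pre-populated in Kafka
target_rate = 1_000_000        # events per second
matches_per_thousand = 21      # assumed: 2.1%, i.e. "just over two percent"

# How long the pre-populated stream lasts at the target ingest rate.
duration_hours = events_total / target_rate / 3600

# Expected standing-query matches per second at the target ingest rate.
matches_per_second = target_rate * matches_per_thousand // 1000

print(f"{duration_hours:.2f} hours, {matches_per_second} matches/second")
```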

And here are the results. We were successful. We killed Quine hosts and database hosts. We introduced high latency, stopping and resuming a Quine host and a database host for a whole minute. The system always recovered back to a million events per second. In most cases the events had no measurable impact; in the rest, recovery was quick. We showed we can resiliently ingest 1 million events per second. And here are the standing query results: we can compute 21,000 four-node complex queries per second, keeping up with ingest.

But we’re not yet done. We can cost-optimize our infrastructure more, and we still need to demonstrate linear scaling. We spent some time tweaking and tuning the system to understand its performance characteristics and realized we might have over-provisioned both the Quine and ScyllaDB clusters in our original run. We came to suspect we could achieve the same result with 120 16-core Quine hosts and 40 16-core high-mem ScyllaDB hosts. That’s a 54 percent decrease in Quine hardware and a 70 percent decrease in ScyllaDB hardware for the same result. That would bring the GCP cost down 62 percent, to $50 an hour, to perform enterprise-scale complex event processing at a sustained 1 million events per second.
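
The hardware reductions quoted above follow from the total core counts of the two cluster configurations. This small check uses only the host and core numbers from the talk; the helper names are invented for illustration, and it deliberately does not attempt to reproduce the cost figure, which depends on GCP pricing not given here.

```python
def total_cores(hosts: int, cores_per_host: int) -> int:
    return hosts * cores_per_host


def percent_decrease(before: int, after: int) -> float:
    return 100 * (before - after) / before


quine_before = total_cores(140, 30)   # initial Quine cluster
quine_after = total_cores(120, 16)    # cost-optimized Quine cluster
scylla_before = total_cores(66, 32)   # initial ScyllaDB cluster
scylla_after = total_cores(40, 16)    # cost-optimized ScyllaDB cluster

quine_drop = percent_decrease(quine_before, quine_after)
scylla_drop = percent_decrease(scylla_before, scylla_after)

print(f"Quine: {quine_before} -> {quine_after} cores ({quine_drop:.0f}% decrease)")
print(f"ScyllaDB: {scylla_before} -> {scylla_after} cores ({scylla_drop:.0f}% decrease)")
```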

Well, we clearly had to do that test, and here are the results. With the reduced cluster sizes we still achieved 1 million events per second and computed 21,000 four-node complex queries per second on cost-optimized hardware, in part thanks to sub-millisecond latency from ScyllaDB.

As we were preparing for that run, we decided to provision Quine and ScyllaDB with different-size clusters to understand how Quine scales. And indeed, Quine scales linearly along both dimensions: ingest rate and standing query results rate.

So there you have it. Complex event processing is hard. Graphs haven’t been able to meet the challenge until now, but technologies like Quine are finally catching up. The challenge of complex event processing at enterprise scale can now be met. Not only did we achieve the goal, but we proved we can scale beyond what was once impossible. We exceeded the performance of every other graph database when considering both ingest rate and computational results per second. And when compared to other complex event processing solutions, not having time windows puts Quine in a league of its own. These results are exciting. We’re confident that we can absolutely scale linearly to exceed 10 million events per second.

I want to thank you for listening. I hope you learned something today. Go to Getting Started to check us out. Let’s keep in touch. [Applause]
