Close-to-the-metal architecture handles millions of OPS with predictable single-digit millisecond latencies.
Learn MoreP99 CONF is the event on all things performance. Join us online Oct 23-24 — Registration is free
Close-to-the-metal architecture handles millions of OPS with predictable single-digit millisecond latencies.
Learn MoreScyllaDB is purpose-built for data-intensive apps that require high throughput & predictable low latency.
Learn MoreLevel up your skills with our free NoSQL database courses.
Take a CourseOur blog keeps you up to date with recent news about the ScyllaDB NoSQL database and related technologies, success stories and developer how-tos.
Read MoreToday we are going to talk about our journey using PostgresDB to ScyllaDB for our next Generation query engine. I’m Dan Harris and I’m here with my colleague Sebastian. We’re software engineers at Coralogix working on our next Generation distributed query engine.
uh so we have a couple sort of core requirements here one is like low latency right part of the point of this was to limit the amount of time we spend dealing with metadata so that was sort of the first most important requirement uh it needs to be scalable both uh in terms of how uh how much we can scale up the read and write um capacity and also uh uh storage uh how we can scale these the sort of underlying storage um so as an example for just one of our larger customers they write about 2 000 parquet files per hour about 50 000 per day that’s 15 terabytes of parquet data per day and 20 Gigabytes of just uh just parquet and that’s one day for One customer um so just storing the metadata is something that used
um and we we sort of started this uh an initial implementation on top of postgres understanding at the time that that uh a non-distributed um engine would would not really be sufficient for the long run
um so there’s a couple sort of key key things that we store in the metastore so the first thing is what we call blocks so this is basically one row group and one parquet file um and it’s it’s basically just some high level metadata about about that uh that row group so you know the URL and object storage for that uh for that file the road group index of the block um some uh very light metadata about what’s in that file like the minimum maximum time stamp for records in the file and total size and number of rows
um and then additionally we want to store away uh we want to sort of provide from the metastore a way to uh sort of prune what we actually need to read based off uh something that we can do when we’re listing the underlying blocks um so uh uh a cheap ish way to do that is to use Bloom filters so essentially we want to support something like full text search um so we basically when we’re adjusting these files into our system uh we can sort of build a bloom filter for all the distinct tokens that we find in the file and then based off a particular query we can use those Bloom filters to actually uh prune the data that we need to scan to begin with um so uh we we need to put this in storage somewhere so that it’s uh sort of quickly readable uh so we we sort of Break um you know we use a basic uh block split Bloom filter um setup where basically you have uh the bloom filter broken into 32 byte blocks and then we can sort of store those independently uh so that we don’t need to read the entire Bloom filter during uh query time and then finally we store the the column metadata so this is basically for each column in the parquet file we grab just that chunk of metadata and we store that in the uh in the metastore so uh the the files that we’re writing are quite wide sometimes as many as 20 000 columns so by reading only the metadata that we need to we can we can uh really reduce the amount of i o we need to do on any given query
all right so uh I’m gonna hand it over to uh Sebastian who’s going to sort of tell you a little bit of how we implemented this on top of this ledb
uh so let’s start with how we modeled the blocks um so what you see here is an example of a block URL you can see the blue part is the the customer’s top level bucket and inside the pocket things are partitioned by by our and so the question is what should the primary key be in this case well we could use the table URL but the problem there is that some customers have way more parquet files and other customers so you know we want to keep things balanced and you know that’s why we have decided to not go for the table URL uh we could use a block URL in the row Group which uniquely defines a block but it would be very hard to you know list all the blocks for a given day you know because the timestamp is basically not in the key here
we could then use table URL an hour and that works because you know if you have 24 hours to create you can just uh grade quite easily we made one addition and I created a segmentation key so we can easily find a given block uh in in an hour so if you want to delete something that’s also um you know pretty easy let’s move on to something more interesting the bloom filters so as Dan mentioned um you know we need to verify that certain bits are Set uh questions how do you do that so that doesn’t have anything like custom for that that we can easily use uh so the solution is to read the bloom filters and process them in the application the problem here is that we have sometimes 50 000 blocks per day for a customer uh times 262 kilobytes which is the size of one bloom filter and we have 12 gigabytes of data which is just too much to pull back into the application for one query so what we do is we uh we chunk the bloom filter because as I also mentioned you know we don’t need the whole Bloom filter we just need like part of them uh depending on the tokens so we chunk then we split them into rows and then you know per token that we need to find we basically only need to read 1.6 megabytes uh which is way more doable
and then another question what should the plan with keepy well you know we could say like we’re gonna group because one block has one Bloomfield in it so you use uh the the block identifier as the primary key and then use the chunk index as the other key but um and that works you have 8192 chunks times 32 bytes is 262 kilobytes so pretty uh evenly spent and per partition it’s also easy to insert you know you can everything’s in the every boom filters in the same partition so you can just do a single batch query uh the problem is you know we’re reading the the blocks so uh sometimes we’re reading 50 000 blocks and you need to know the ID of the identifier of the block before you can you know read the bloom filter so if you’re doing a listing great for 24 hours you know of course you can pipeline it as soon as blocks come in you can start reading Bloom filters but you still have this dependency which is a bit annoying also you have to access really a lot of partitions like if you have 50 000 blocks you have 50 000 uh Bloom filter partitions that you have to access and you know even if you even necess super fast it’s still going to be hard to do that in less than a second uh so we decided for something else we basically uh you know same as with the Box we partitioned by the table URL the hour and we add the index to the partition key and the trick there is because in our application we know we need the fifth token we need the 10th token we need the nth token uh so it’s pretty fast to just like you know we read the whole partition basically uh this way uh for an entire hour and you know that gives us like 24 hours times five uh tokens is 120 partitions compared to 50 000 partitions that’s a lot better we also have a dependency on box anymore so we don’t need to have the block ID before we read the bloom filter which which is also nice you can just do both things in parallel and then at the end we can match them together but also some downsides because now we have to like insert uh our single Bloom filter in 8192 different partitions and also if you want to delete one sim we have to access like a lot of partitions but you know reading is is more important for us than writing um well you know these queries have to be fast so we really it’s very important and also you know in the future we might want to investigate a bit more of an optimal chunking now we spend to 8 000 plus rows maybe we can split into thousand rows and fetch a bit too much data into our application and uh you know it will still be fine probably and will potentially uh or will definitely speed up our inserts but what we really would like to do in the future is you know just move the work to the server basically and start using udfs so user-defined functions you know we already have everything written in Rust so we can you know we’ve been really nice if we can start using uh the udfs and basically you know we don’t have to send anything back to the application anymore and you know that gives us a bit more leeway to to play with the chunking that we would need to do
yeah also you know just some advice I I wasn’t uh I wasn’t used to um you know working with solo Cassandra but you know it looks a bit like SQL I know it says no sequel but still you know you have this uh ID error that is still SQL and so uh one uh they I figured out after we were already importing uh data for a day that you know I messed up Min and Max and I thought oh okay you know how am I gonna fix this and I thought you know maybe I can rename the columns to something else and then somehow uh make it work again I promise you cannot rename columns uh they’re not clustering key so they had to find something else that maybe I’ll add new columns and do an update blocks sets to the old value and unfortunately this also doesn’t work um it works in a normal SQL and postgres but yeah I couldn’t hear so we ended up dropping a drunk kid in the table and starting over again and because that seemed less work than just you know writing migration code at that point uh so yeah I don’t know what advice is here don’t write any books but that’s not that easy of course so just be careful um you know modeling and writing the code is one thing so we uh we use uh the ScyllaDB use a lot of facilityb components so we’ve written that code and rust uh using the 3db rust drive but one of you know the main things attracting us to Sila was also the operator for kubernetes we’re pretty big kubernetes users and you know having something that’s easy to set up um was definitely a priority for us and yeah we also use ScyllaDB monitoring the manager so you know these components really helped us to make the production ready and yeah I you know from deciding oh we might want to use Sila we were basically more or less production ready in two months which is quite nice
yeah cost is very important to us because the whole point of this project is to provide a low cost alternative for customers how to create their data their logs data their metrics and So currently we’re I think using the absolute minimum that is sufficient for us so we have adrenal cluster but acpu 32 gigs of memory um we’re on AWS so we’re able to use on graviton which is nice because it saves some costs then a more controversial decision is that we’re using EBS volumes a gp3 not spectacular numbers I would say um but yeah we’re trying to make it work this way and we’ll see how long we can manage um yeah so for the block listing maybe some some numbers so we have about four to five terabytes on each note currently we’re only um doing this for One customer uh we’re doing about ten thousand rights per second and the latency is a lower than one millisecond which is nice for Block listing you know it very much depends on the customer and the query and and how long it is if it’s like one hour so about two thousand per k files and we’re using Bloom filters it’s less than 20 milliseconds for 50 000 files it’s less than 500 milliseconds latency but bear in mind that we also do you know the checking of the bits and stuff and we could probably optimize it further but you know for 50 000 4K files less than 500 milliseconds is just good enough for us because the actual query uh that comes afterwards will be quite a bit longer so but I’m sure there’s room for improvement there
metadata we have a bit of an issue there because the p50 is really good but we do notice uh that you know this bit of a high tail latency and yeah the the problem is there that you know if you have 50 000 per k files and our executors are fetching all of these in parallel and um yeah you know we just have a very large amount of computer and queries and we’re not using the best disks so you know we assume that that is what’s causing uh the issue for us all right thanks Sebastian um all right so what’s the what’s the what are the Lessons Learned here um so you know I think the biggest uh the biggest difference between Scylla DB and using uh you know other RDP rdb dbms systems is that you really have to think very carefully about uh how you’re partitioning and partition sizes um so we went through quite a few iterations as Sebastian mentioned figuring out what our partition and clustering keys were and it’s it’s really important to Performance um and then you know you have to think again about your read write patterns right so are you read how do you write heavy do you need a mixed workload of lots of writes and reads um you know in our case we we are quite right-handedly because we are constantly ingesting data but we also need to prioritize um uh the reads because uh read latency is very very important to our you know product and customers and bright latency is we have a lot more leeway there um and you know we’re quite happy with the the fact that we could get uh the block listing um in and sort of get uh uh you know P99 um uh latency there under 500 milliseconds while you’re doing basically Bloom filtering with um across tens of thousands of uh individual blocks um but for reading the column metadata you know it’s been a bit more mixed right so it’s when it uh the p50 latency is fantastic right being able to do that in less than five milliseconds but uh the hightail latency is uh has been sort of problematic um and then along those lines uh you know we’ve we were sort of warned not to use uh try not to use EBS but at least we didn’t listen and we probably should have so uh you know if you’re considering using uh Sila it would probably be a good idea to look at sort of um and you know instances that have local ssds instead of trying to trying to use EBS volumes
oh all right thanks um so our contact information is there if you have any questions or like to reach out please just uh drop us a line sometime thanks [Applause]
Apache® and Apache Cassandra® are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. Amazon DynamoDB® and Dynamo Accelerator® are trademarks of Amazon.com, Inc. No endorsements by The Apache Software Foundation or Amazon.com, Inc. are implied by the use of these marks.