ScyllaDB 5.0 Lightning Talks, Part 1: Building in Resiliency

53 minutes


In This NoSQL Presentation

Discover the new features and capabilities of ScyllaDB Open Source 5.0 directly from the engineers who developed it. This second block of lightning talks will cover the following topics:
  • New IO Scheduler and Disk Parallelism
  • Per-Service-Level Timeouts
  • Better Workload Estimation for Backpressure and Out-of-Memory Conditions
  • Large Partition Handling Improvements
  • Optimizing Reverse Queries

ScyllaDB Engineers, ScyllaDB

Hear from the engineers who develop ScyllaDB.

Video Transcript

Hello, everyone, and welcome to the final lightning talk session about what’s new in ScyllaDB 5.0. And there’s a nice bunch of presentations here. I’ll talk about the I/O scheduler, how we worked to improve our understanding of disks and embed that into the I/O scheduler. There will be a talk about how ScyllaDB supports SLAs so you can run multiple workloads on one cluster.

There will be a talk about our improvements to handling out-of-memory conditions so that you can run workloads with high concurrency without fear of running out of memory. We’ll talk about the ongoing improvements to large partition support and how the performance of workloads with large partitions can now be very similar to workloads with small partitions. And we’ll have a talk about reversed queries and how we improved reversed queries to the point that they have comparable performance to forward queries. And now onto the first presentation, my own, about the I/O scheduler. Hello, everyone. So in this mini session, I’ll talk about I/O scheduling in ScyllaDB 5.0, what’s new, what you can expect from it. A little bit about me. I’m the original maintainer of Linux KVM, the kernel-based virtual machine, and for 8 or 9 years now, I’ve been at ScyllaDB working on Seastar, the underlying I/O framework, and I’m also co-founder of the company. So why I/O scheduling? A database has to balance many different competing workloads. So first and foremost, your reads. You want your reads to go to the disk quickly so that they can be completed and the data returned to you. But there are also maintenance workloads that are competing with them. One of them is compaction, where the database takes multiple SSTable files and makes them into one large SSTable to reduce the read amplification. And the other maintenance workload is repair or bootstrap and decommission, where there is work to move data to new nodes, remove data from old nodes and synchronize data between existing nodes. And it is important not to have one of these workloads affect the others in a negative way, or you will see latency shoot up while you’re, say, starting a new node. So we must keep the bytes flowing to the disk. And this is a diagram of how the I/O scheduler works. 
We have queues for serving reads, for serving compactions (which include both reads and writes), and a maintenance queue for bootstrap and decommission of nodes and repair between nodes. And these queues all connect to the scheduler, which restricts the flow of data to the disk, and it must be very careful: if the scheduler does not restrict the flow of data enough, then it can cause congestion on the disk. And if it restricts the data too much, then the disk is performing below its ability, and you will get higher latency. And when the scheduler restricts the data, it is moving the congestion from the disk to itself, and you might ask, “What’s the point of this action? You are just moving the congestion from one point to another.” And the reason is so that we can select which queues will be congested and which queues will not be congested. So if the disk is uncongested, then we can make sure that the I/Os from reads keep flowing to the disk and I/Os from compaction and maintenance are blocked by the I/O scheduler or throttled, and the congestion is only passed to them and not to the read queue. Of course in different scenarios, we might actually want to congest the read queue in order to allow maintenance operations to complete in a timely manner, but that is outside the scope here. So far what we’ve talked about was present in existing ScyllaDB versions. The I/O scheduler is not new by any means. What is new is that we have improved our understanding of the disk model, and we’ve applied this improved understanding to the I/O scheduler. So this chart is quite complex. There’s a lot to take in, and I’ll start by explaining it. So here we are measuring an I3en disk from Amazon, and on the x-axis we are varying the bandwidth from 0 bytes per second to a gigabyte per second. And on the y-axis, we are varying the read I/Os per second, from zero ops per second to about 250,000 ops per second. 
And you will note that this is a matrix, so every point on the chart represents an experiment that we performed on the disks. For example, the point that my mouse cursor is pointing at is at around 300 megabytes per second bandwidth and 100,000 I/Os per second read operations. And the color indicates the latency. And on the color bar on the right, you can see the different latencies that you can get from reads. At the low end, in the cyan color, we have about 100 microseconds, and the purple is about 5 milliseconds, and you can see that at the 50th percentile, which is what the upper chart represents, we usually get very close to 100 microseconds of latency. And you might ask, “What is the white area?” The white area is experiments which were performed but did not achieve the required bandwidth and the required I/Os. So for example, if we tried to get 800 megabytes per second and 150,000 reads per second simultaneously, in this area of the chart, then this doesn’t work. The disk is not able to serve this workload, and something must be traded away. The lower chart is similar, so it’s the same experiment where we vary the write bandwidth from 0 to 1 gigabyte per second and vary the read I/Os from zero to 250,000 operations per second. But here, we are measuring the 95th percentile latency, and you can see that there is an impact when both the write bandwidth and the read IOPS are higher. Here in the purple area, the latency is around 5 milliseconds, so that’s quite high latency. And the takeaway from this chart is that we must stay below this diagonal line. And this is what the new scheduler embeds. It is able to calculate where we are on the chart, and there’s even more intricacies because not all read IOPS are the same. Some are reading small amounts of data, and some are reading large amounts of data. 
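The “stay below the diagonal” takeaway can be illustrated with a toy model (this is an illustration only, not ScyllaDB’s actual cost model, which also accounts for request sizes and other factors): treat the disk as saturated when normalized write bandwidth plus normalized read IOPS approaches 1.

```python
# Toy model of the saturation diagonal from the chart above.
# Assumed, illustrative limits for an i3en-style disk:
# ~1 GB/s of write bandwidth and ~250,000 read IOPS.
MAX_BW_BPS = 1_000_000_000
MAX_READ_IOPS = 250_000

def disk_utilization(write_bps, read_iops):
    # Normalized position relative to the diagonal:
    # 1.0 is the edge of what the disk can sustain.
    return write_bps / MAX_BW_BPS + read_iops / MAX_READ_IOPS

def should_throttle(write_bps, read_iops, headroom=0.9):
    # In this model, the scheduler throttles compaction and
    # maintenance I/O before the disk itself congests,
    # keeping some headroom below the diagonal.
    return disk_utilization(write_bps, read_iops) >= headroom

# 300 MB/s + 100k reads/s sits comfortably below the diagonal...
print(should_throttle(300_000_000, 100_000))   # False
# ...while 800 MB/s + 150k reads/s is beyond what the disk can serve.
print(should_throttle(800_000_000, 150_000))   # True
```

The two sample points match the chart walkthrough above: the first lands in the colored region, the second in the white (unachievable) region.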
So it takes all of this into its calculation and determines where we are on the line, and if it determines that we are close to the diagonal line, then it restricts the flow of I/O even more, and this allows the disk to stay uncongested, and it allows the read operations to flow through while the compaction operations and the maintenance operations are delayed. There will be more in a dedicated talk about that, but that’s it in a nutshell. And so let’s recap what we did here. So first, we collected lots of information about the disk. Each such experiment takes several hours, and there are more than 400 data points in each chart, and we did that for multiple disks. We used this information to build a more accurate disk model, certainly more accurate than the one that we had before. And we made adjustments to the I/O scheduler so that it follows the new model and applies it to the read and write activities that the database performs. So thanks, everyone, and if there are questions, you can ask on the mailing list or in our Slack channel. You can follow me on Twitter or reach me by email. Thanks. Next up is Eliran with a short presentation about service levels. These allow you to define separate workloads with different parameters. So here’s Eliran. Thanks. Hi, my name is Eliran, a software team leader at ScyllaDB, and I’m here to share some of the results of an activity we had in the past year aiming at reducing overload impact on ScyllaDB. Overload is something that may happen even on a correctly configured cluster, and the main concern is not preventing it, but to keep serving requests despite the overload, with performance as predictable as possible. When thinking about it, once we eliminate all of overload’s obviously undesired effects, like crashes, stalls and disconnections, we still have to deal with a simple fact of life: the cluster at the moment can’t fulfill every request according to the user’s expectations. 
Of course, there is a whole class of solutions that works right out of the box, some of them presented in the past, the most popular being “throw more money at it,” and a derivative of that approach is to keep a worst-case-sized data center for every workload. In the future, we might have elasticity to deal with this at runtime, but even when we do, it is not guaranteed to solve all of our problems. We came to realize two things. The first is that in the real world, where we have limited amounts of money and processing power, overload is a fact of life, so it is better to optimize behavior under overload than to aim for an overload-free environment. And the second is that sometimes, the point at which this overload hurts the user can be stretched. Moreover, there are some workloads that are not really hurt by overload but only affected by it. Workloads can have different properties depending on the client software or user profile that drives them. They can have different kinds of parallelism, small versus large or correlated versus uncorrelated, latency distribution expectations and some more subjective properties like priorities, which are more about user perspective. We have already introduced a means of dealing with at least the last example: workload prioritization. At its core, this feature lets users specify the importance of workloads relative to each other. This in turn reduces the impact workloads have on each other by distributing some of the system’s resources according to the user preferences. This alone has the potential to balance between some workloads and mitigate some cross-workload impact, as shown in this graph taken from our blog post about workload prioritization. It shows how workload prioritization can be used to mitigate the impact of analytics on the latency of an interactive workload. Unfortunately, in our latest efforts, we discovered that it’s not enough. 
This relative way of expressing workload priorities is not expressive enough on its own. For starters, different workloads have different and explicit expectations about latencies, which means that mitigation is not enough. There are some configurations that quite apparently can’t hold globally in multi-workload environments, time-outs being one of the easiest to reason about, but there are others, and we will touch on one more later. In addition, some of the very advanced techniques that we use today can be further tweaked to accommodate this difference in workloads, the reader concurrency semaphore being one of them. I will not get into the details, but you are welcome to watch Botond’s talk about it later. With this understanding in mind, we started to try and figure out what is missing. How can we improve our behavior under overload? We very quickly came to realize that before improving the behavior, we should make sure that this is actually an improvement. Meaning, given that we are overloaded, the newly implemented behavior is the one the user expects. After going over some real-life examples from our bugs and issues, we found that ScyllaDB needs some hints about what the expected behavior for a specific workload is, and so workload characterization was created. The following example is a classic use case where we are lacking the information to deal with overload, or at least where we used to lack it. We want to support a simple web server application. It has two workloads. The main workload consists of queries triggered by users clicking or navigating on some areas of the web page. The second workload is some analytics being run periodically to collect statistics or to aggregate some information to present to all users. The users behind the main workload expect high responsiveness. That translates to low latency, and it means that they will have short time-outs. 
Another thing to notice is that you can’t prevent users from just clicking over and over again because of what appears to them as a page being stuck. Failing to set a low enough time-out on the server side can also trigger a whole retry avalanche effect that would appear on the server side as very high or unbounded concurrency. On the other hand, we have the secondary workload that makes a series of computations and can be, and probably is, designed with limited concurrency, which means that it can be controlled with measures like throttling. This workload is a lot less sensitive to latency, and it is more throughput oriented. As I mentioned, for the main workload it is more suitable to have a very small time-out, and for the secondary workload, we need large time-outs to accommodate for always-full queues. Even when the main interactive workload’s time-out is configured, there is still the interactive user, who can’t be configured. If they click over and over again, there is little you can do about it except for having an accounting mechanism on another layer of the system, which means more development effort. However, there is only one server-side time-out configuration, and it should be less than the client one, or we can have a retry avalanche in the extreme case and wasted resources in the less extreme case. We can’t optimize for both, and whatever choice we make will be suboptimal for one of the workloads. We need to also decide on a proper response to overload. For an interactive workload, it is probably beneficial to fail early by shedding load if we see that our in-flight requests are going to time out or are starting to pile up, for example, while for analytics we should delay some responses or even wait for a time-out to naturally happen on the client side, since this serves as a back pressure mechanism. There are a lot of useful characteristics that can hint to ScyllaDB about expected behavior, not all of which are implemented, of course. 
The web server example demonstrates that there is some information about workloads that can help ScyllaDB to behave better and to stretch the overload limit further. It can help us utilize our cluster better and reduce the administrative effort, while providing valuable metrics such as time-outs per workload, and better isolation capabilities. It is also beneficial to characterize workloads in order to size the cluster correctly, and in the future, it can also help us to employ elasticity in a smarter way. A way to express those workload properties has existed in our enterprise version for a while now, and it has now been extended and backported to our open-source version as well. This concept is called a service level. Service levels can be attached to roles. When a user logs into the system, all of the service levels that are attached to the user and to their granted roles are combined to form the workload characteristics. Then in turn, ScyllaDB tweaks its behavior for requests that are sent in this session, which is now tagged with specific workload characteristics. Applying this to the web server example, for the main workload we need low time-outs with load shedding as our overload response, and we would like to have a lot of dedicated resources available whenever this workload needs them. For the secondary workload, we can have pretty large time-outs to accommodate for always-full queues. We would like to throttle requests under load so the computation is stable and controllable. And finally, we would like the workload to have very little dedicated resources and to mostly use otherwise unused resources to achieve better cluster utilization. The aforementioned requirements can already be expressed in ScyllaDB as shown here. Here are the commands for the main and secondary workload as they were really implemented. This breakdown of commands demonstrates how we would express our expectations for each workload. It is already fully implemented. 
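The commands shown on the slide are not reproduced in the transcript. As a rough sketch, ScyllaDB’s service-level CQL looks like the following; the service level names, role names, and timeout values here are hypothetical illustrations, not the exact commands from the slide:

```sql
-- Interactive main workload: short timeout, shed load under pressure.
CREATE SERVICE LEVEL IF NOT EXISTS main_workload
    WITH timeout = 500ms AND workload_type = 'interactive';

-- Analytics workload: long timeout, throttled under load.
CREATE SERVICE LEVEL IF NOT EXISTS analytics_workload
    WITH timeout = 60s AND workload_type = 'batch';

-- Service levels take effect by being attached to roles.
ATTACH SERVICE LEVEL main_workload TO web_user;
ATTACH SERVICE LEVEL analytics_workload TO analytics_user;
```

When a client authenticates as one of these roles, its session is tagged with the combined workload characteristics, as described above.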
Workload characterization is still a work in progress. The service level mechanism gives us a way to easily add more advanced configuration options in the future. There are still a lot of future improvements that can be implemented on a per-workload basis. But according to what we’ve learned, per-workload characterization is one of the cornerstones of utilizing the cluster in full on one hand and doing the right thing in the presence of overload on the other. Thank you. Thanks a lot, Eliran. Next up is Botond with an explanation about how we’re improving ScyllaDB to deal with memory management. ScyllaDB is able to run many reads in parallel, and each read can take a very large amount of memory, and so it’s a large job to manage all of the memory for all of those concurrent reads, and Botond will explain how we improved this for ScyllaDB 5.0. Hi, everyone. In this presentation, I’m going to go over recent improvements to the out-of-memory resilience of reads on the replica. My name is Botond, and I am a software engineer working at ScyllaDB since 2017 as a member of the storage team. So what’s the basic idea here? We want to control the concurrency of reads on the replica, with the goal of keeping concurrency within a useful limit and of course to avoid resource exhaustion. This happens via a semaphore which is dual-limited, with count and memory resources. Each read consumes one count and a fixed amount of memory on admission. As the read progresses, its memory consumption is tracked and consumed from the semaphore’s memory units. The goal here is that we don’t want to allow any more reads once the memory is exhausted. We have a separate semaphore for each scheduling group on each shard, and semaphores are created with a fixed amount of count resources and an amount of memory that is some percentage of the shard’s total amount of memory. Two percent of the shard’s memory is a typical value for the amount of memory resources. 
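A dual-limited semaphore of this kind can be sketched as follows. This is a simplified illustration of the admission scheme just described, not ScyllaDB’s actual C++ implementation; the class and method names are made up:

```python
# Sketch of a dual-limited reader concurrency semaphore: each read
# consumes 1 count and a fixed amount of memory on admission, and
# its real memory use is then charged against the memory units.

class ReaderSemaphore:
    def __init__(self, count_limit, memory_limit):
        self.count = count_limit
        self.memory = memory_limit

    def try_admit(self, initial_memory):
        # A read is admitted only if both resources are available.
        if self.count >= 1 and self.memory >= initial_memory:
            self.count -= 1
            self.memory -= initial_memory
            return True
        return False  # the read must queue until resources free up

    def consume(self, amount):
        # Called as an admitted read allocates buffers.
        self.memory -= amount

    def release(self, total_memory):
        # Called when a read finishes, returning its resources.
        self.count += 1
        self.memory += total_memory

# e.g. a user-read semaphore: 100 counts, 2% of a 1 GiB shard
sem = ReaderSemaphore(count_limit=100, memory_limit=(1 << 30) * 2 // 100)
```

Once either resource runs out, further reads queue at the semaphore instead of being admitted, which is what bounds memory consumption.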
As for count, the user read semaphore, which is used for user reads, has 100 counts, and the internal semaphores get 10 counts. We depend on accurate tracking of the memory consumption of reads to be able to determine whether a new read can be admitted or not. So it is crucial that we track the memory consumption of as many aspects of reads as practical. This is an area that we’ve improved a lot recently. I/O buffer tracking had a gap where buffers were tracked only after I/O completed, not when they were allocated. We now track all buffers used for I/O and for parsing raw data coming from I/O from the very moment they are allocated. We also track the internal buffers of readers. You can think of readers as abstract objects that do the actual reading, and their output is a stream. So these improvements vastly improve the effectiveness of our memory-based concurrency control. There are still more aspects of reads that are untracked, but extending the coverage has diminishing returns. For example, fixing the tracking of I/O buffers was a three-line change and brought huge improvements, but on the other hand, tracking the buffers of readers was a lot of work, and in some workloads, in particular very small cells, the effect is barely noticeable. We have also addressed the most common causes of out-of-memory conditions directly. These are, in particular, unpaged and reversed queries, or even worse, the combination. Both can consume an unbounded amount of memory internally, and a single unpaged read can easily cause an out-of-memory condition on its own. So we have introduced a targeted fix for these in the form of a pair of limits, a soft and a hard limit. When one of these reads crosses the soft limit, we print a warning in the log, and when it crosses the hard limit, we abort the read. 
The goal here is to allow users to set a very strict soft limit, notice that they have problematic queries, and adjust their application. And then once they do that, they can tweak the hard limits to be stricter as well. Another big piece of work that we’ve done is to move the semaphore to the front. Historically, we only applied concurrency control to reads that actually had to go to the disk. As you may know, we have a cache that represents the content of the disk, and optimally, your read would hit the cache and would not have to go to the disk. So you can see two pictures of the before and after state. On the pictures, you can see a reader tree for a typical read. On the top level, we have a memtable reader and a cache reader, and their output has to be merged, and this is done by a combined reader. Like I said, the cache represents the content of the disk, so if you have a cache miss, the cache itself creates the disk readers, and the read from the disk will happen through the cache, populating the cache during the process of executing the read. We might read from more than one SSTable, in which case we will have one reader object per SSTable, and again, their output will be merged using a combined reader. So as you can see, the concurrency control, represented here by the purple restricted reader, is injected between the cache and the disk, and therefore it’s only activated when the disk readers are created. The dotted purple therefore represents the readers that are actually covered by concurrency control. I suppose the assumption here was that in-memory reads complete so quickly that they don’t really need any concurrency control, because they can’t effectively run concurrently with each other, but this assumption was invalidated, and in certain workloads, we started to see out-of-memory conditions caused by cache reads, so we had to move the semaphore to the very front. 
Reads now have to pass admission before the reader objects are even created. Despite all the improvements I just discussed, problems can still happen. Previously, debugging out-of-memory and concurrency-semaphore-related bugs was only possible with core dumps. Core dumps are a pain to work with. First of all, they have to be available, which is not always the case; sometimes when memory runs out, you only see the bad_alloc exception messages spewed in the logs. To help with investigating these conditions, we have added diagnostics features around them. For example, when memory runs out, ScyllaDB will dump a report about the state of its memory allocator. By default, this only happens when critical allocations fail, those that will eventually cause a crash, but this can be configured to be dumped on any bad_alloc. We added a similar report dump to the concurrency semaphore. This is dumped when the semaphore times out, when the reader that’s queued in the semaphore to be admitted times out, to be exact, or when the queue grows to such a size that it overflows. We have a limit for the size of the queue. So these reports are dumped to the logs, which is very important because logs are very easily obtainable, and then they can be copied into a GitHub issue or bug report, and they can kick-start the investigation. We had some concrete cases where just the report from the log was enough to find the bug. With all the discussed improvements, we are at quite a good place now. Out-of-memory crashes are actually quite rare. We are also currently working on improving other weak spots that we know about. There might also still be corner cases hiding, so we are not letting our guard down. That was it. Thank you very much. Thanks a lot, Botond. Next up is Tomasz with a presentation about how we improved ScyllaDB’s large partition support to the point where it really doesn’t matter if you have small partitions or large partitions. 
We’ve done this work over the years, and the latest installment in the saga brings it almost to a close, so here is Tomasz. Thank you, Avi. I am Tomasz Grabiec. I’m a software engineer at ScyllaDB. I have been working here for over 8 years as a core developer, working on all things around the core. Now, what’s the new feature in ScyllaDB 5.0 related to SSTable indexing? We added automatic caching of SSTable indexes, which I will explain later, but the bottom line is that reads from disk are now much faster than before, especially for large partitions. So first, let’s have a look at what indexes are and how they are used during reads. When we have a read from an SSTable, we first need to locate the position in the data file where the data should be read from, and the index is a data structure which allows the read to narrow down the position. It can be represented as a binary search tree, and that’s how I’m showing it on this diagram and across the presentation. Actually, it has a more complicated structure than this, but it doesn’t matter; this is a good enough abstraction. Another thing about the index is that it has multiple layers. We have the partition key index, which is at the top, and the partition key index has an entry for every partition inside the SSTable. And below that, we have a clustering key index, which has a variable size depending on the partition size. Some partitions may have larger clustering key indexes. Some partitions don’t have any. Here, for simplicity, I’m showing it as having the same size for every partition. Another thing which can be distinguished in the index is the partition key index summary, which is the top part of the partition key index. This is the part of the partition key index which is always loaded in RAM. What goes into it is decided when the SSTable is written to disk. It’s stored in a separate file on disk, and it’s a sort of static caching for the index. 
We choose it to be small enough so that it fits in memory; currently we use a ratio of one to 20,000, meaning the ratio of summary size to data file size, because ScyllaDB aims at a memory-to-disk size ratio of one to 100. This makes sure that for every deployment, we can fit all summary files in memory, but of course it may mean that we don’t use as much RAM as we could, and as a result, the reads are not as fast as they could be. Which brings us to the problem with this caching, which is that every read which goes to an SSTable has to touch the disk in order to read the indexes, because there is always a part of the index which has to be traversed by the read. This of course increases utilization of the disk, adds latency to the query and reduces throughput. Another problem with the old way of things was that only the summary was cached. The clustering key index was not cached, meaning that if you had a large partition, then the whole clustering key index had to be read and traversed from disk, and you experienced diminished caching. If your partitions were large enough, that would mean effectively no caching for the index. And the solution which we implemented in ScyllaDB 5.0 is to dynamically cache all of the index, meaning that whenever a read goes to disk, the cache is populated during the read, in a read-through manner, with the data which was read from the index file. This way, the index can be reused by later reads. The cached index is evicted on memory pressure automatically, and the old static caching is still kept in place. We didn’t make the summary dynamically cached; it’s still always in memory. So this diagram here shows, for example, why caching is beneficial. Here you have two reads which go to disk, the green one and the red one, which go to different locations in a data file. 
However, as they go to the data file through the index, they share most of the path, and if we cache the index for the green read, for example, which was earlier, then the red read can leverage the fact that it was cached and avoid I/O during its search. So here is an example of what kind of improvement you can expect for a large partition. Suppose you have a partition of 10 gigabytes in size with 10 million rows and an index file size of 5 megabytes. A single read from an SSTable which contains this partition would have to execute the following I/O requests: two 32-kilobyte requests for the partition index, 17 4-kilobyte requests to search inside the clustering index and then two requests to read from the actual data file, and the total for this operation would be 196 kilobytes and 21 I/O requests, and I measured this on my disk to take 20 milliseconds. Now if the index cache is populated for this read, meaning it’s hot, then we can avoid the I/O requests for the partition index part and the clustering index part, and we’re left only with the I/O for the data file, which means more than half of the bandwidth is saved. Only two I/O requests are made, and latency is much lower; I measured 0.2 milliseconds. Here is a similar example on the same data set, but this time it’s a throughput test. I used scylla-bench to measure throughput on a single node, single shard with 4 gigabytes, with a concurrency of 100 threads, and I measured an improvement of throughput from 2,000 rows per second to 6,000 rows per second. The improvement is about 3X, and it stems directly from the fact that, as you can see here, the bandwidth is reduced about three times, so the throughput is expected to also increase about three times, since in this test we are bound by this bandwidth. Now a little bit about implementation details. There are actually two kinds of caches which were added, and the main one is the index file page cache, which is similar to the Linux page cache, over the index file. 
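The cold-versus-hot arithmetic in this example can be checked directly. The two data-file reads are assumed here to be 32 kilobytes each, which is what makes the stated totals add up:

```python
KB = 1024

# Cold read: partition key index, clustering key index binary search,
# then the data file itself (data read sizes assumed to be 32 KB each).
cold = [32 * KB] * 2 + [4 * KB] * 17 + [32 * KB] * 2
print(len(cold), sum(cold) // KB)   # 21 requests, 196 KB total

# Hot read: the index is served from cache, only the data file is read.
hot = [32 * KB] * 2
print(len(hot), sum(hot) // KB)     # 2 requests, 64 KB total

# More than half of the bandwidth (132 of 196 KB) is saved.
assert sum(cold) - sum(hot) == 132 * KB
```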
We added a cache which stores chunks of the index file in memory with a 4-kilobyte chunk size. It’s of course populated on demand, read-through, and evicted automatically, and this kind of cache serves the purpose of caching the clustering key index. The clustering key index has an on-disk representation which is random access, so we don’t need to have a parsed representation in memory. We can use the same representation which is stored on disk. And the index file page cache makes sure that as we read the index file, we don’t have to touch the disk each time. And this diagram here shows you an approximation of how the caching would look if you consider the cache to be a binary search tree. Basically, those vertical chunks of the tree can be either cached or not cached, and as you binary search through the index, the vertical chunks of the tree will be inserted into the cache. There is another kind of cache which was added. The caching for partition key index pages is done separately. What is a partition key index page? If you recall the partition key index summary introduced earlier, then consider that the partition key index is divided by the partition key index summary into subtrees, and every such subtree is called a partition key index page. This page has an on-disk representation which is not searchable, unlike the clustering key index. So whenever we access those pages, whenever we access this part of the index, we have to parse the whole page. We added caching for those pages so that when we need an individual partition from the page, we don’t need to parse the whole page every time, and thus we can save on CPU time. So those two caches share the space in memory with each other. 
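A read-through, chunked page cache of this shape can be sketched as follows. This is a simplified stand-in, not ScyllaDB’s implementation; the names are invented, and real eviction is driven by memory pressure rather than a fixed chunk budget:

```python
# Sketch of a read-through page cache over an index file:
# fixed 4 KiB chunks, populated on demand, LRU eviction.
from collections import OrderedDict

CHUNK = 4096

class IndexPageCache:
    def __init__(self, read_chunk, max_chunks):
        self._read_chunk = read_chunk      # fn: chunk_no -> bytes (disk read)
        self._max_chunks = max_chunks
        self._cache = OrderedDict()        # chunk_no -> bytes, in LRU order
        self.misses = 0

    def read(self, offset, length):
        """Serve a byte range, touching disk only for uncached chunks."""
        out = bytearray()
        first = offset // CHUNK
        last = (offset + length - 1) // CHUNK
        for chunk_no in range(first, last + 1):
            if chunk_no in self._cache:
                self._cache.move_to_end(chunk_no)      # refresh LRU position
            else:
                self.misses += 1
                self._cache[chunk_no] = self._read_chunk(chunk_no)
                if len(self._cache) > self._max_chunks:
                    self._cache.popitem(last=False)    # evict least recent
            out += self._cache[chunk_no]
        start = offset % CHUNK
        return bytes(out[start:start + length])
```

In the real system, this index file page cache and the partition key index page cache do not have fixed capacities; they share memory with the row cache.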
They compete for space, and they also compete for space with the existing cache called the row cache, which is the cache for the data combined from all SSTables which we have on disk. All those caches behave in a similar manner, meaning they use all the available free space. There are no separate tunables to set limits for each of them. They automatically grow and expand to use all available free space, and because they compete, we need some policy for choosing which entries to evict. Currently we implemented a single LRU as the eviction policy, so every cache has the same rights, and for example, if you have no reads from disk and therefore no need to cache SSTable indexes, then the row cache can take up all the space. This is maybe not optimal for all workloads, so this is room for improvement in future versions. And thank you for listening. That's all I had. If you have questions, feel free to contact me. Now back to Avi. Thank you very much, Tomasz, and next up is Kamil with a presentation about reversed queries. These are something that most people will never have encountered, but those that did may have some pain from that encounter, and we did a lot of work to alleviate that pain in case you want to use reversed queries. So here is Kamil. Hi. Thank you, Avi, for the introduction. Recently, ScyllaDB gained a new implementation of reversed queries, which I'm going to talk about in this presentation. My name is Kamil. I'm a software engineer at ScyllaDB. A reversed query is a query in which the specified clustering key ordering is different from the ordering in the schema. There are two possible orderings, ascending and descending. If your schema was created with ascending order, a query with descending order is a reversed query and vice versa. For example, suppose your schema specifies ascending order.
In your query, if you don’t specify the order or explicitly say for it to be ascending, you get a regular forward query, and your data will be sorted according to the clustering key in ascending order. On the other hand, if you specify the order to be descending, this is a reversed query. Your data will be sorted according to the clustering key in descending order. The old implementation of reversed queries had significant problems. I’ll illustrate it using SSTable reads. Suppose you want to query a range of clustering keys from six to 16. The old implementation would start by performing a regular forward query on this range, fetching it entirely into memory. It would then iterate over rows in the range in reverse and construct the page, then return it. All those rows fetched from the query’s range which don’t belong to the first page are wasted work. We throw them away. When the second page is requested, we do the same thing but with a smaller query range. Now this solution had a quadratic complexity problem. Suppose that there are 10 pages of data. We would read pages one to 10 into memory and return page 10. When the second page is requested, we would read pages one to nine into memory and return page nine, and so on until we return the first page last. For N pages, we would read a quadratic number of pages, on the order of N squared. This solution could maybe be improved by caching the result of the initial query after the first page and getting back to it when the second page is requested, but we don’t know how long we’re going to wait for the second page, and in the meantime, the cache may be cleared or invalidated. And we may not even want to cache the result in the first place due to the second problem, which is that the entire range may consume huge amounts of memory. Just to return that single page, we need to fetch the entire queried range, and it may not even fit in memory, causing the read to fail.
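The page-count arithmetic behind that quadratic behavior is easy to sketch. This is a toy cost model of what the talk describes, not the actual read path:

```python
def pages_read_old(n):
    """Old reversed reads: returning the k-th page from the end requires a
    forward read of the remaining k pages, so returning all n pages costs
    n + (n - 1) + ... + 1 page reads in total."""
    return sum(range(1, n + 1))  # = n * (n + 1) // 2, quadratic in n

def pages_read_new(n):
    """New reversed reads: each page is read roughly once, so the cost is linear."""
    return n

# For the 10-page example above: 55 page reads versus 10.
print(pages_read_old(10), pages_read_new(10))  # 55 10
```

At 100 pages the gap widens to 5,050 reads versus 100, which is why large reversed reads degraded so sharply under the old scheme.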
With sufficiently large partitions and sufficiently large queries, this old implementation would simply not work. The new implementation solves these two problems. Both SSTable and MemTable reversed reads were improved. For illustration, I will focus on SSTables since that’s where most of the data resides, and in my opinion, that’s where the most interesting changes happened. Back to our example, the queried range is six to 16. To return the first page, we first need to find the last row of the queried range, in this example, key 16. We start by consulting the index to find the nearest row before 16 that the index knows of. Let’s say that the index knows where 14 is. We fetch a chunk of data into memory starting at the position given by the index. At this point, it looks just like a forward query. We parse row 14, which gives us the position of row 15, which we didn’t know until now because the sizes of rows are not constant in general, and now we can parse row 15, then row 16. If there is any remaining data in the buffer, we can discard it, as we only care about the queried range. Now suppose we need more rows for our page. We fetch a chunk of data into memory, but this time before row 14, and now we face a problem. Where does row 13 start in this buffer? Thankfully, the SSTable mc format comes to the rescue. In the mc format and newer ones, every row stores the size of the previous row, which we use to learn the position of row 13 so we can parse it. We can continue like this, fetching more buffers if necessary, until we fill the page. Finally, we can drop any unnecessary data and return the page reversed. Note that this is a simplified picture, but it gives a rough idea of how things work today when reading in reverse from SSTables in the mc and newer formats. An important part of this implementation is the previous row size metadata, which was not available in the ka and la SSTable formats. If your data is stored in those older formats, we keep using the old method.
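The backward walk enabled by the previous-row-size metadata can be illustrated with a toy encoding. The byte layout below (a fixed header holding the previous record's size and the payload size) is invented for this sketch; the real mc format is considerably richer:

```python
import struct

def encode_rows(payloads):
    """Encode rows where each row header stores the previous row's total size,
    loosely modeling the mc-format metadata described above (hypothetical layout)."""
    out, prev_size = b"", 0
    for p in payloads:
        record = struct.pack("<II", prev_size, len(p)) + p  # 8-byte header + payload
        out += record
        prev_size = len(record)
    return out

def walk_backward(buf, start_of_last_row):
    """Yield row payloads from last to first by hopping backward using the
    stored previous-row sizes, without parsing the buffer from the front."""
    pos = start_of_last_row
    while True:
        prev_size, size = struct.unpack_from("<II", buf, pos)
        yield buf[pos + 8 : pos + 8 + size]
        if prev_size == 0:   # first row has no predecessor
            break
        pos -= prev_size

rows = [b"row13", b"row14", b"row15", b"row16"]
buf = encode_rows(rows)
last_start = len(buf) - (8 + len(rows[-1]))  # position of the last record
print(list(walk_backward(buf, last_start)))  # [b'row16', b'row15', b'row14', b'row13']
```

The point of the sketch is that once we know where any one row starts, its header tells us where the previous one starts, so a reversed read never has to re-scan the range from the front.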
The new implementation features linear complexity and much better memory consumption, proportional to the page size. We no longer need to fetch the entire queried range into memory at once. Reversed reads from MemTables previously worked similarly to SSTables. We would perform a forward read of the entire queried range, then return the actual requested page. Now we perform a direct reverse traversal of the MemTable structure. Note that reversed queries are not allowed for range scans, only for single-partition queries. Now let’s look at some numbers. For the purposes of this presentation, I did a very simple benchmark to see how this new implementation performs compared to the old one. I used a simple single-node setup on my laptop. I didn’t set up a larger cluster since most of the changes happened in the storage layer, in the code where we read SSTables and MemTables. I created partitions of different sizes, 10 megs, 15 megs, 20 and so on up to 110 megs, and queried them forward and backward. I did this using the open-source 4.5 branch and the master branch. The schema had two integer columns for the partition and clustering keys and one text column so I could insert larger rows. The only reason for this was to reduce the time to insert the data. For each partition, I’m running these queries with and without the order by clause, with bypass cache so we exercise the SSTables, and in two versions, with and without a row limit. The row limit version makes the result fit in a single page. The specific number of rows is not that important. And the no-limit version gives us the entire partition. I performed each query 10 times, took the mean and standard deviation and plotted error bars. By the way, I’m using the Python driver, which is not the fastest of drivers, so it may cause a bit of overhead. On the x-axis, we see the partition size in megabytes, and on the y-axis, we see the query duration in milliseconds. This is a graph for the row limit select, so we fetch only a single page.
As we can see, forward query duration does not depend on the partition size. It’s roughly constant, here about 3 milliseconds. The results are pretty much the same for master and 4.5. But for reversed queries, there’s a significant difference. On 4.5, the duration of a reversed query increases linearly with the partition size even though we’re always fetching the same number of rows, a single page. And at 100 megabytes, reversed queries start to fail because there is a memory consumption limit which we exceed. On the master branch, however, reversed queries act similarly to forward queries. They are a bit worse than forward queries. Here, the duration is about 5 milliseconds. But it does not depend on the partition size. Now if we drop the row limit, so we query the whole partition, this is what we get. The duration of forward queries increases linearly with the partition size, which is expected since we’re querying the whole partition. On 4.5, the duration of a reversed query is a quadratic function of the partition size, but on master, it’s again linear, as with forward queries. Summarizing, in ScyllaDB 4.5 and older, reversed queries had quadratic time complexity with respect to the size of the queried range. Memory consumption was linear. For sufficiently large ranges, the query would simply have to fail. mc and newer SSTable formats allow a better implementation, and in the upcoming release, the time complexity of reversed queries is linear with respect to the size of the queried range, and memory consumption is linear with respect to the page size, so even if your range is large, you can still perform reversed queries on it. Thank you. Now back to Avi. Wow, thanks, Kamil, for this presentation about reversed queries. And that wraps it up for ScyllaDB Summit 2022. Stay tuned for more exciting developments from ScyllaDB. You can subscribe to the mailing list and to our Slack channel to get them in real time, and we’ll meet you at ScyllaDB Summit 2023.
Thanks, everyone, hope you enjoyed it.
