Change Data Capture (CDC) is a feature that allows users to track and react to changes in their dataset. CDC became production ready (GA) in ScyllaDB Open Source 4.3.
ScyllaDB’s implementation of CDC exposes a CQL-compatible interface that makes it possible to use existing tools or drivers to process CDC data. However, due to the unique way in which ScyllaDB distributes CDC data across the cluster, the implementation of shard-awareness in some drivers might get confused and send requests to incorrect nodes or shards when reading CDC data. In this blog post, we will describe what causes this confusion, why it happens and how we solved it on the driver side.
Change Data Capture
In ScyllaDB’s implementation CDC is enabled on a per-table basis. For each CDC-enabled table, a separate table called “CDC log” is created. Every time data is modified in the base table, a row is being appended to the CDC log table.
Inside a CDC log table, rows are organized into multiple partitions called “streams“. Each stream corresponds to a portion of the token ring (similarly to a vnode). In fact, a single stream corresponds to a part of a vnode which is owned by a single shard of that vnode’s primary replica. After a partition is changed in the base table, a stream is chosen based on the partition’s primary key, and then a row record describing this change is appended to that stream. Such partitioning into streams makes sure that a partition in the base table is stored on the same replicas as the CDC log rows describing changes made to it. This colocation property makes sure that the number of replicas participating in a write operation made on the base table does not increase.
How are Stream IDs Generated?
When the cluster is created or its topology changes, it must calculate a set of streams based on the current token ring. The stream ID will be used as a partition key in the CDC log table, so in order to achieve the colocation property mentioned in the previous paragraph, the token calculated from the stream has to belong to the vnode and the shard associated with that stream.
If you recall that ScyllaDB calculates tokens from partition keys using the MurmurHash3 algorithm, you will realize that calculating a set of streams for a token ring boils down to reversing the hash. In older versions of ScyllaDB, we used the following, simple algorithm:
- For each pair of vnode and shard, initialize an empty bucket.
- Repeat until no buckets are left empty:
- Generate a random 64-bit number,
- Use MurmurHash3 to calculate the token of that number,
- If the bucket corresponding to the token is empty, put the number to that bucket. If not, discard the number.
This method of calculating stream sets was quite computationally expensive. It is a Las Vegas-type algorithm – it is guaranteed to compute a valid set of streams, but it may take a long time to do it if we are unlucky. In order to make sure that the algorithm finishes in a sensible time frame, it would be stopped after executing a number of iterations proportional to the number of vnodes times the number of shards. Because of that, we would sometimes fail to compute IDs for some streams, and CDC log rows for those streams would have to be put into another stream. Because of that, those CDC log rows would have a different set of replicas than corresponding rows in the base table — which breaks the colocation property and causes some of the writes to the base table to be more expensive.
A New Approach
Instead of fighting the hash function, we decided to come up with a format of a stream ID which will be easy to compute and tell ScyllaDB to compute tokens from stream IDs using a different algorithm.
Currently, the stream ID is a 128-bit number, composed like this:
128 64 27 4 0 | <token:64> | <random:38> | <index:22> | <version:4> |
Given a vnode and a shard, this format is easy to compute as it’s easy for ScyllaDB to generate a token which belongs to this vnode and shard. The token is directly encoded into the stream ID so no hashing is needed to get it. The algorithm of calculating a set of streams becomes deterministic, and always manages to compute all streams in the set in a reasonable time.
Unfortunately, this defeats token-awareness and shard-awareness in drivers which do not understand this -— they will wrongly use MurmurHash3 to compute the token. Queries issued on a CDC log table will still work but will be sent to a wrong node or a wrong shard, resulting in greater latency.
Solving the Issue in the Driver
The solution on the driver side is very simple — we just needed to teach the drivers how to detect tables with custom partitioning schemes and how to correctly compute a token for a partition key in such a table. In the case of CDC log tables the partition key is just the stream ID, therefore the token can be extracted by taking the last 64 bits.
The detection process is fully automatic – you just need to make sure that the version of the driver you use supports it.
We are planning to add this feature to all drivers that are maintained by ScyllaDB. As of writing this blog, two of our drivers already have the support for this feature merged and released:
- Our fork of GoCQL, starting from version 1.5.0.
- Our Java driver, starting from versions 3.10.2-scylla-0 and 4.9.0-scylla-1.
And issues have been opened for our shard-aware C++ (#11), Python (#96) and Rust (#194) drivers.
Get Connected to Our Community
Our priorities are driven by user feedback. So to stay in touch with the latest developments in our CDC implementation and advocate for your favorite programming language drivers, make sure you join our Slack channel, or join the conversation in our user mailing list.