fbpx
See all blog posts

Consuming CDC with Java and Go

If you are a regular visitor of our blog, you probably heard about Change Data Capture (CDC) in Scylla. It’s a feature that allows you to track and react to changes made to data in your cluster. In Scylla 4.3, we are confident to say that CDC is production-ready. Along with that, we are releasing libraries for Java and Go which will simplify writing applications that will read from Scylla’s CDC.

We believe these two languages will appeal best to our highly opinionated developer base. Java is the older, more established and more broadly-used language. Golang is newer, supports better concurrency handling and faster performance.

In this blog post, we will cover both scylla-cdc-java, a Java library, and scylla-cdc-go, a Go library. These two libraries serve as frameworks for writing applications that process changes in the CDC log. You will learn what kind of challenges they help you avoid, and how to use them to write applications that print changes happening to a table in real-time.

If you want to read the reasoning behind our approach, read the next section “Why Use a Library?” Or, if you want to jump right in and get started using your favorite programming language, here are some handy links:

Why Use a Library?

Scylla’s design of CDC is based on the concept of CDC log tables. For every table whose changes you wish to track, an associated CDC log table is created. We refer to this new table as the CDC log table and the original table as a base table. Every time you modify your data in the base table — insert, update or delete — this fact is recorded by inserting one or more rows to the corresponding CDC log table.

This approach makes it possible to use tools that already exist in order to read from a CDC log. Everything is accessible through CQL and the schema of CDC log tables is documented by us, so it’s possible to write an application consuming CDC with the help of a driver (or even cqlsh).

However, the CDC log format is more complicated than a single queue of events. You need to know the design of Scylla CDC well in order to implement an application that is performant and robust. Fortunately, our libraries will handle those concerns for you. You can use their convenient API so that you can concentrate on writing the business logic of your application.

Challenges

Streams

A CDC log table does not form a single queue of events – instead, it is divided into multiple queues. This partitioning is defined by a set of streams. Each stream defines a set of partition keys such that all partitions in that stream are stored on the same set of replicas, and on the same shard. In turn, for each stream, the CDC log table has a partition that contains events about changes to partitions within that stream.

Thanks to this layout of data, Scylla can co-locate entries in a CDC log table with affected partitions in the base table. More specifically, if a partition is modified, the information will be put into the CDC log and will be stored on the same node as the partition in the base table. This reduces the number of nodes participating in a write to the base table, improves consistency between the base table and CDC table, and enables better performance of the cluster.

Generations

To make things even more complicated, the topology of the cluster may change during its lifetime. Because it modifies the token ring, this can break the co-location property of the CDC log data. In order to maintain good performance and consistency, Scylla changes the partitioning scheme of the CDC log after such an event. A new “generation” will be computed – a new set of stream IDs that will be used in CDC logs. At a scheduled point in time, Scylla stops writing to partitions marked with previous stream IDs – the old generation – and starts using the new set of stream IDs.

The scylla-cdc-java and scylla-cdc-go libraries manage the complexity of generations and streams. They guarantee that, within a single stream, your application will process changes in order. They also make sure that all changes from streams of the previous generation are processed before moving to reading from streams of the next generation. This is necessary to ensure that no record is missed.

If you are interested in learning more about generations and streams, check out our documentation on CDC streams and generations.

Getting Started with Java

Let’s see how to use the Java library. We will build an application that prints changes happening to a table in real-time. You can see the final code here.

Installing the library

The latest version of the Scylla CDC Java library is available here. Please follow the installation instructions and add the library as a dependency to your Java project.

Setting up the CDC consumer

First, we establish a connection to the Scylla cluster using the Scylla Java Driver. We’re using the driver in version 3.x but the newer driver from 4.x branch works as well:

Having established a connection, we have to specify which tables of the CDC log we want to read. The provided names should be of the base tables, not the CDC log tables (e.g. ks.t not ks.t_scylla_cdc_log):

To consume changes, we specify a class that implements RawChangeConsumer interface (here by using a lambda). The consumer returns a CompletableFuture, so you can react to CDC changes and perform some I/O or processing.

The CDC consumer is started multi-threaded, with a configurable number of threads. Each thread will read a distinct subset of the CDC log (partitioned based on Vnodes). Those multiple threads will cumulatively read the entire CDC log. All changes related to the same row (more generally the same partition key) will appear on the same thread. Note that after a topology change (adding or removing nodes from the Scylla cluster) this mapping will be reset.

Next, we create an instance of RawChangeConsumerProvider which returns a RawChangeConsumer for each thread. We could write the provider in two ways:

  1. A single consumer shared by all threads. With such a provider, a single consumer will receive rows read from all worker threads that read the CDC log. Note that the consumer should be thread-safe. Below is an example of such a provider:

  1. Separate consumer for each thread. With such a provider, a separate consumer will be created for each worker thread. Those multiple consumers will cumulatively read the entire CDC log. Because each consumer receives changes from a single worker thread, they don’t have to be thread-safe. Note that after the topology change (adding or removing a node from the Scylla cluster), consumers are recreated. Below is an example of such a provider:

Finally, we can build a CDCConsumer instance and start it! If we are finished consuming the changes, we should call the stop() method.

Consuming CDC changes

Let’s implement the printChange(RawChange change) method and see what information is available about the change. The RawChange object represents a single row of CDC log. First, we get information about the change id: its stream id and time:

Those accessors correspond to cdc$stream_id and cdc$time columns.

We can get the operation type (if it was an INSERT, UPDATE, etc.):

In each RawChange there is information about the schema of the change – column names, data types, whether the column is part of the primary key:

There are two types of columns inside ChangeSchema:

CDC log columns can be easily accessed by RawChange helper methods (such as getTTL(), getId()). Let’s concentrate on non-CDC columns (those are from the base table) and iterate over them:

We can also read the value of a given cell (column) in the change:

If we know the type of a given cell, we can get the value as a specific type:

Full Example

You can read the full source code here. You can run it using the following commands:

Where SOURCE is the IP address of the cluster.

Getting Started with Go

Now let’s look at how this same sort of CDC reader application can be implemented in Go. You can read the source code for this example here.

Installing the library

To install the library, simply run the following command:

If you use Go modules, make sure to run the command from your project’s directory.

The library uses gocql to read CDC from the cluster. For optimal performance, make sure you use our gocql fork. The fork has some Scylla-specific optimizations which result in better latency. Recently, it has gained optimizations for CDC, too – we will elaborate on that in a later blog post. You can learn how to switch to our fork here.

Setting up the CDC consumer

Like in the case of scylla-cdc-java, first we need to establish a connection to the Scylla cluster. To do that, we create a gocql session:

Next, we need to prepare a configuration for the scylla-cdc-go library. It is necessary to provide at least a session, list of fully qualified base table names, and a change consumer factory — we will come back to the last one in a moment. Table names need to be fully qualified and point to the base tables, not CDC log tables (e.g. ks.t, not ks.t_scylla_cdc_log). For a good measure, we also provide a logger, so that the library tells us what it is doing.

Now, we need to define a consumer type. Each instance of the consumer will be associated with a single CDC stream and will process changes from that stream in chronological order.

As mentioned before, the library processes generations one after another. When it starts processing a generation, it stops consumers for the old generation, and creates a new consumer for each stream in the new one. The library manages the lifetime of change consumers for you, therefore you need to provide a change consumer factory.

When the library starts processing a generation, it spawns a number of goroutines. Each goroutine periodically polls a fixed subset of streams, and feeds fetched changes to the consumers associated with those streams. This means that each consumer is associated with a single goroutine for its entire lifetime.

Keeping the multi-goroutine model in mind, there are two approaches to writing consumers and consumer factories:

  1. If your consumer is simple and stateless, you can model it as a function. In such case, you can easily create a factory for such a consumer by using a library-provided function:

  1. If your consumer needs to keep some state or run some custom logic on its creation or deletion, then you need to put in more work — you need to create a type for both consumer and a factory which will implement scyllacdc.ChangeConsumer and scyllacdc.ChangeConsumerFactory respectively:

Before our application can run, we need to do one more thing — actually start the reading process. We create a CDC reader object by using the configuration we prepared earlier and then start it:

Consuming CDC changes

No matter which method from the previous section you use, consuming changes boils down to analyzing a scyllacdc.Change object. Let’s use the first approach — we will implement the printerConsumer method and see what kind of information the change object offers.

A scyllacdc.Change object corresponds to a set of rows from the CDC table sharing the same cdc$stream_id and cdc$time column. Both of those parameters are available as fields of the change object:

A change partitions its rows into three groups: Delta, Preimage and Postimage. The Delta rows contain information about the change itself – which rows and columns were modified, and what kinds of modifications were applied to them. Preimage and Postimage rows represent the state of modified rows before and after the change. The last two groups will only appear if you enabled them in your CDC options for that table.

Rows from each group are represented by the scyllacdc.ChangeRow type. It provides a number of convenience methods.

First, you can use (*ChangeRow).Columns() to learn about the columns of the CDC log. It contains information about both CDC-specific and non-CDC-specific columns:

Next, there are methods for retrieving information about changes from the row. First, for partition keys and clustering keys, it is recommended to use (*ChangeRow).GetValue(column string) which directly returns the value of the column.

However, information about changes made to regular or static columns is usually split across multiple CDC log columns. For example, a column of name “v” and type “int” will be represented as two columns: “v” and “cdc$deleted_v”. Instead of fetching each column separately with GetValue, you can use convenience functions which already do it for you. In case of an int column, (*ChangeRow).GetAtomicChange(column string) will be the right function to use, but there are variants for non-atomic types such as tuples, collections and user defined types, too — refer to the documentation to learn more about them.

Finally, change row implements the Stringer interface. You can use it in fmt.Printf to have the row pretty-printed to the standard output:

Saving progress

Sometimes, your application will have to be stopped either due to a planned or an unplanned event. It might be desirable to keep track of how much data was processed in each stream and regularly save progress. The scylla-cdc-go library provides optional facilities which will help you avoid repeating unnecessary work in case your application was stopped. You can configure the library to either not save progress at all, store progress in a Scylla cluster, or use a user-defined mechanism for saving information about the progress.

Good to Go!

That’s it! It is a functional, albeit simplistic example of an application which reads from the CDC log. The application polls CDC streams, processes changes from each stream in order, and executes your callback for each change. It also takes care of topology changes out of the box.

Again, you can find all the code for this example here. You can run it using the following commands:

Where SOURCE is the IP address of the cluster.

Further Reading

In this blog, we have explained what problems the scylla-cdc-java and scylla-cdc-go libraries solve and how to write a simple application with each. If you would like to learn more, check out the links below:

  • Replicator example application in the scylla-cdc-java repository. It is an advanced application that replicates a table from one Scylla cluster to another one using the CDC log and scylla-cdc-java library.
  • Example applications in scylla-cdc-go repository. The repository currently contains two examples: “simple-printer”, which prints changes from a particular schema, “printer”, which is the same as the example presented in the blog, and “replicator”, which is a relatively complex application which replicates changes from one cluster to another.
  • API reference for scylla-cdc-go. Includes slightly more sophisticated examples which, unlike the example in this blog, cover saving progress.
  • CDC documentation. Knowledge about the design of Scylla’s CDC can be helpful in understanding the concepts in the documentation for both the Java and Go libraries. The parts about the CDC log schema and representation of data in the log is especially useful.
  • Change Data Capture (CDC) lesson in Scylla University.
  • ScyllaDB users slack. We will be happy to answer your questions about the CDC on the #cdc channel.

We hope all that talk about consuming data has managed to whet your appetite for CDC!

Happy and fruitful coding!

 

 

 

Piotr Grabowski

About Piotr Grabowski

From a young age, Piotr participated in many competitive programming contests. Always striving to stay on top of the latest business and tech news, he never finishing a day without checking his RSS reader! Piotr holds a BSc in Computer Science from the University of Warsaw and is now pursuing an MSc.

Piotr Dulikowski

About Piotr Dulikowski

Software Engineer at ScyllaDB. Previously worked on internal frameworks for SDNs.