
Deep Dive into the ScyllaDB Spark Migrator


Update: We now have a full masterclass on NoSQL migration, including a detailed look at ScyllaDB Migrator in action. You can watch it (free + on-demand) here: NoSQL Data Migration Masterclass.

Another week, another Spark and ScyllaDB post! This time, we’re back again with the ScyllaDB Spark Migrator; we’ll take a short tour through its innards to see how it is implemented.

  • Read why we implemented the ScyllaDB Spark Migrator in this blog.

Overview

When developing the Migrator, we had several design goals in mind. First, the Migrator should be highly efficient in terms of resource usage. Resource efficiency in the land of Spark applications usually translates to avoiding data shuffles between nodes. Data shuffles are destructive to Spark’s performance, as they incur more I/O costs. Moreover, shuffles usually get slower as more nodes are added (which is the opposite of the scaling model we like!).

Beyond resource efficiency, the Migrator was designed to perform decently out of the box with relatively little tuning: the default configuration splits the source table into 256 ranges that are transferred in parallel; on each executor, 8 connections are opened to Cassandra and 16 to ScyllaDB; rows are fetched in batches of 1000 and TTL/WRITETIME timestamps are preserved. Of course, these parameters can be tuned using the configuration file.

With these goals in mind, let’s recap how the Migrator works:

  • When launched, the Migrator reads the schema definition for the source table from Cassandra;
  • The schema is used to create the CQL selection;
  • If timestamp preservation is enabled, the TTL and WRITETIME timestamps for non-key columns are also added to the CQL projection;
  • The rows are fetched in chunks from Cassandra, and each chunk is written to ScyllaDB;
  • As rows are written, the token ranges that have been processed are tracked and periodically saved.

Sounds pretty straightforward! We’ll dive into how these steps are implemented in the following sections.

Using the table definition to create the DataFrame schema

As we’ve mentioned in the post about DataFrames, every DataFrame in Spark has an associated schema. The schema is used to validate the queries against the DataFrame, optimize them and so forth. When the data source for the DataFrame is structured, it is very sensible to infer the schema from the data source.

When creating a DataFrame from a Cassandra table, the Cassandra connector will happily infer the schema for us. However, a DataFrame created that way is limited to the table’s columns as they are. The Migrator needs to add additional expressions to the table scan, so we’re going to build the schema manually using the connector’s infrastructure.

We start off by creating an instance of CassandraConnector. This class bundles the management of connections to Cassandra for the driver and the executors:
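
For example, a minimal sketch of building one from Spark’s configuration (the contact point below is a placeholder):

```scala
import org.apache.spark.SparkConf
import com.datastax.spark.connector.cql.CassandraConnector

// Build the connector from Spark configuration properties;
// the host value is a placeholder.
val sparkConf = new SparkConf()
  .set("spark.cassandra.connection.host", "cassandra-host")
  .set("spark.cassandra.connection.port", "9042")

val connector = CassandraConnector(sparkConf)
```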

The connector’s configuration can be derived from Spark’s configuration. The actual initialization in the Migrator is slightly different, as we extract the configuration parameters from a YAML file rather than through Spark’s configuration mechanism. With the connector defined, we can use it to build the schema.

The connector provides a data type called TableDef:
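
Abridged, its shape is roughly the following (some fields and members are omitted here):

```scala
case class TableDef(
    keyspaceName: String,
    tableName: String,
    partitionKey: Seq[ColumnDef],
    clusteringColumns: Seq[ColumnDef],
    regularColumns: Seq[ColumnDef]) {

  // All of the table's columns in a single sequence
  def allColumns: Seq[ColumnDef] =
    partitionKey ++ clusteringColumns ++ regularColumns
}
```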

The TableDef data type represents everything we need to know about the structure of a Cassandra/ScyllaDB table. Most importantly, it includes several sequences of ColumnDef instances that describe the columns of the table.

Now, TableDef is a plain case class, so we could construct it manually (and that could be useful when creating new tables from DataFrames not originating in Cassandra), but in this case, we’d like to infer it from an existing table. There’s a handy method for this:
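
For example (the keyspace and table names are placeholders):

```scala
import com.datastax.spark.connector.cql.Schema

val tableDef = Schema.tableFromCassandra(connector, "my_keyspace", "my_table")
```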

tableFromCassandra will use the connector to fetch the schema data from Cassandra and create the TableDef instance. There’s actually another method, Schema.fromCassandra, that can fetch the definitions for all keyspaces and all tables on Cassandra, but we won’t use it.

Using the TableDef instance, we should be able to construct the DataFrame schema. These schemas are specified as a StructType; this is a data type representing a record:
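
Abridged, the relevant Spark SQL definitions look roughly like this:

```scala
case class StructType(fields: Array[StructField]) extends DataType

case class StructField(
    name: String,
    dataType: DataType,
    nullable: Boolean = true,
    metadata: Metadata = Metadata.empty)
```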

So essentially, each ColumnDef should map to a StructField. There’s one last piece missing from this puzzle: how do we actually perform that mapping? How do we convert the ColumnType associated with a ColumnDef into a Spark SQL DataType?

Luckily, the DataTypeConverter object has what we need:
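
Roughly, the method in question:

```scala
// From the connector's DataTypeConverter (abridged)
def toStructField(column: ColumnDef): StructField =
  StructField(
    column.columnName,
    catalystDataType(column.columnType, nullable = true)
  )
```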

This function will take any ColumnDef and spit back a StructField. So we can just do the following to create our StructType schema:
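
Something along these lines (using the tableDef value from the previous step):

```scala
import org.apache.spark.sql.cassandra.DataTypeConverter
import org.apache.spark.sql.types.StructType

val schema = StructType(tableDef.allColumns.map(DataTypeConverter.toStructField))
```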

Great! So now, we can move forward with actually modifying the schema for our needs.

Building a custom query for a table scan

The motivation for building the schema manually is to include in it the expressions for TTL and WRITETIME timestamps for the individual columns; we need to copy over those timestamps along with the column values. Let’s start by creating the custom query itself. We’re going to use the lower-level RDD interface, as only that interface supports specifying custom selections.

We can modify the columns included in the CQL projection by using the select method on the CassandraRDD. This is the definition of select:
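
Abridged:

```scala
// On CassandraRDD: narrows the CQL projection to the given column references.
def select(columns: ColumnRef*): Self
```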

We haven’t talked about ColumnRef yet – this is a data type describing column references in CQL projections. We can build the basic projection by using the ref field present on the ColumnDef:
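
For example:

```scala
import com.datastax.spark.connector.ColumnRef

val baseProjection: Seq[ColumnRef] = tableDef.allColumns.map(_.ref)
```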

Now, to include the TTL and WRITETIME timestamps of each column in the projection, we could do something like the following:
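
A sketch, using the connector’s TTL and WriteTime column references and aliasing them with a _ttl/_writetime suffix:

```scala
import com.datastax.spark.connector.{ColumnRef, TTL, WriteTime}

val naiveProjection: Seq[ColumnRef] = tableDef.allColumns.flatMap { column =>
  Seq(
    column.ref,
    TTL(column.columnName, Some(column.columnName + "_ttl")),
    WriteTime(column.columnName, Some(column.columnName + "_writetime"))
  )
}
```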

There’s one problem: we can’t retrieve these timestamps for columns that are part of the primary key for the table; so we need to only add the timestamp extraction for regular columns. The TableDef data type differentiates between the columns (allColumns is just a concatenation of all the ColumnDef instances), so we can write the projection as follows:
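
A sketch along those lines:

```scala
import com.datastax.spark.connector.{ColumnRef, TTL, WriteTime}

val projection: Seq[ColumnRef] =
  tableDef.partitionKey.map(_.ref) ++
    tableDef.clusteringColumns.map(_.ref) ++
    tableDef.regularColumns.flatMap { column =>
      Seq(
        column.ref,
        TTL(column.columnName, Some(column.columnName + "_ttl")),
        WriteTime(column.columnName, Some(column.columnName + "_writetime"))
      )
    }
```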

And this projection can be fed to the select method on the CassandraRDD:
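
For example (keyspace and table names are placeholders; the connector provides an implicit row reader for CassandraSQLRow):

```scala
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra.CassandraSQLRow

val rdd = spark.sparkContext
  .cassandraTable[CassandraSQLRow]("my_keyspace", "my_table")
  .select(projection: _*)
```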

What’s CassandraSQLRow, you ask? This is a data type, provided by the Cassandra connector, that implements the interface of Spark’s internal Row: a sequence of heterogeneously-typed named fields that backs DataFrames. The rows returned by the Cassandra Java driver are converted into this data type.

Finally, we need to take care of the DataFrame schema as well: it too must be modified to include the fields for the TTL and WRITETIME timestamps. This is done similarly by transforming the sequence of StructFields:
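
A sketch of that transformation, adding a nullable Long field for the TTL and WRITETIME of every regular column:

```scala
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val regularColumnNames = tableDef.regularColumns.map(_.columnName).toSet

val schemaWithTimestamps = StructType(
  schema.fields.flatMap { field =>
    if (!regularColumnNames.contains(field.name)) Seq(field)
    else
      Seq(
        field,
        StructField(field.name + "_ttl", LongType, nullable = true),
        StructField(field.name + "_writetime", LongType, nullable = true)
      )
  }
)
```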

Note that each field is checked against the names of the regular columns; only if it is indeed a non-key column is the schema extended to include its timestamp fields.

Excellent! We now have an RDD that uses a custom projection to query the columns and their timestamps, and a DataFrame schema. We need to put them together and create the DataFrame; we don’t actually want to work with the RDD. To do so, we’ll use the createDataset method available on the SparkSession:
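
A sketch, assuming CassandraSQLRow values can be used as Spark SQL Rows directly (as described above):

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val df: DataFrame =
  spark.createDataset(rdd.map(r => r: Row))(RowEncoder(schemaWithTimestamps))
```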

The RowEncoder is (yet another) data type that tells Spark’s Catalyst engine how to convert the RDD’s data to Spark’s internal binary representation.

Preserving TTLs and WRITETIMEs for individual cells

Our next challenge is to figure out how to actually write different WRITETIME/TTL timestamps to each column of the transferred rows. See, the problem is that when issuing a CQL insert, you can only specify one timestamp that is applied to all non-key columns being written:
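
For example, a statement along these lines (the keyspace, table and key column names are illustrative):

```sql
INSERT INTO ks.tbl (pk, reg1, reg2)
VALUES (1, 'a', 'b')
USING TTL 86400;
```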

This statement would apply the 24 hour TTL value to both reg1 and reg2 values. The solution to this is to issue a separate CQL statement for each group of values within a row with the same TTL and WRITETIME. For example, assuming we need to copy a row that consists of the following values:
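
Say the row looks like this (the values, TTLs and WRITETIMEs are made up for the example):

```
column | value | TTL (seconds) | WRITETIME (microseconds)
pk     | 1     |       -       |         -
reg1   | 'a'   |     86400     |  1579000000000000
reg2   | 'b'   |     43200     |  1579000001000000
reg3   | 'c'   |     86400     |  1579000002000000
reg4   | 'd'   |     86400     |  1579000002000000
```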

To copy such a row, we would need to issue the following CQL statements:
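
Continuing the illustrative example above:

```sql
INSERT INTO ks.tbl (pk, reg1) VALUES (1, 'a')
  USING TTL 86400 AND TIMESTAMP 1579000000000000;

INSERT INTO ks.tbl (pk, reg2) VALUES (1, 'b')
  USING TTL 43200 AND TIMESTAMP 1579000001000000;

INSERT INTO ks.tbl (pk, reg3, reg4) VALUES (1, 'c', 'd')
  USING TTL 86400 AND TIMESTAMP 1579000002000000;
```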

Note how the insertions of reg3 and reg4 ended up in the same CQL statement; this is because they have the same TTL and WRITETIME values, while the other values create unique combinations of TTL and WRITETIME. When ScyllaDB processes these CQL statements, it’ll merge the values into the same row determined by the key.

The DataFrame that contains the rows must use a fixed schema; the CQL statements we’ve shown contain the columns of the primary key, and a subset of (or possibly the entire set of) regular columns. We’ll need to represent the fact that some columns are unset on the rows. To do so, we’ll use the CassandraOption[T] data type provided by the Cassandra connector.

This data type is similar to Scala’s built-in Option[T] data type, but contains another term in the sum type. Here’s the definition:
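
Abridged:

```scala
sealed trait CassandraOption[+A] extends Product with Serializable

object CassandraOption {
  case class Value[+A](value: A) extends CassandraOption[A]
  case object Unset extends CassandraOption[Nothing]
  case object Null extends CassandraOption[Nothing]
}
```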

We can use this data type to represent the three states a column in a CQL DML statement can be in: set to a value, unset, or set to null (which means the cell will be cleared in the row).

To implement this row splitting operation, we’ll use the flatMap function present on DataFrames, that allows the use of plain Scala functions to process the rows in the DataFrame. This is the definition of flatMap:
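
Abridged:

```scala
// On Dataset[T]: U must have an Encoder so Spark knows how to serialize
// the produced elements.
def flatMap[U: Encoder](func: T => TraversableOnce[U]): Dataset[U]
```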

Recall that DataFrames are a type alias for Dataset[Row]. TraversableOnce is a trait from the Scala collections library representing collections that can be processed once (or more). We can use flatMap to transform the rows in the DataFrame to collections of elements for which a Spark encoder exists. The collections will be flattened into the resulting DataFrame.

So, we’ll write a function of the form Row => List[Row] that will expand each original row into rows that use the same TTL and WRITETIME values. Since access to rows is by index, we’ll first create maps that contain the indices for the primary keys and the regular columns.

This function will create the maps we need:
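
A sketch of such a function (the name and exact shapes are illustrative): for every primary key column it records the column’s index in the row, and for every regular column it records the indices of the value itself, its _ttl field and its _writetime field:

```scala
import org.apache.spark.sql.types.StructType
import com.datastax.spark.connector.cql.TableDef

def fieldIndices(
    schema: StructType,
    tableDef: TableDef): (Map[String, Int], Map[String, (Int, Int, Int)]) = {
  val indexOf = schema.fieldNames.zipWithIndex.toMap

  val primaryKeyIndices =
    (tableDef.partitionKey ++ tableDef.clusteringColumns)
      .map(c => c.columnName -> indexOf(c.columnName))
      .toMap

  val regularColumnIndices =
    tableDef.regularColumns.map { c =>
      val name = c.columnName
      name -> ((indexOf(name), indexOf(name + "_ttl"), indexOf(name + "_writetime")))
    }.toMap

  (primaryKeyIndices, regularColumnIndices)
}
```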

Note that the index map for the regular columns contains the index for the original column, the TTL column and the WRITETIME column. To actually perform the row splitting operation, we’ll use this function:
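
A sketch of the splitting function (names are illustrative; the output row layout of key columns, then CassandraOption-wrapped regular columns, then the per-row TTL and WRITETIME, is an assumption based on the description that follows):

```scala
import com.datastax.spark.connector.types.CassandraOption
import org.apache.spark.sql.Row

def explodeRow(
    row: Row,
    primaryKeyIndices: Map[String, Int],
    regularColumnIndices: Map[String, (Int, Int, Int)]): List[Row] =
  if (regularColumnIndices.isEmpty) List(row)
  else {
    val keyValues =
      primaryKeyIndices.toList.sortBy(_._2).map { case (_, idx) => row.get(idx) }

    regularColumnIndices.toList
      .map { case (name, (valueIdx, ttlIdx, writetimeIdx)) =>
        val ttl = if (row.isNullAt(ttlIdx)) None else Some(row.getLong(ttlIdx))
        val writetime = if (row.isNullAt(writetimeIdx)) None else Some(row.getLong(writetimeIdx))
        (name, row.get(valueIdx), ttl, writetime)
      }
      .groupBy { case (_, _, ttl, writetime) => (ttl, writetime) } // group fields by timestamps
      .map { case ((ttl, writetime), fields) =>
        val present = fields.map { case (name, value, _, _) => name -> value }.toMap
        // Columns absent from this group stay Unset so the write leaves them
        // untouched; explicit nulls clear the cell.
        val regularValues = regularColumnIndices.keys.toList.sorted.map { name =>
          present.get(name) match {
            case Some(null)  => CassandraOption.Null
            case Some(value) => CassandraOption.Value(value)
            case None        => CassandraOption.Unset
          }
        }
        Row.fromSeq(
          keyValues ++ regularValues ++
            Seq(ttl.map(Long.box).orNull, writetime.map(Long.box).orNull))
      }
      .toList
  }
```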

This one is slightly more involved, so let’s describe each of the transformations it performs. We start by checking if there are any regular columns; if there aren’t, and the row is composed entirely of primary key columns, we just return the row in a list.

When we do have regular columns, we first transform the index map to a list of column name, value, TTL value and WRITETIME value. Next, we group the fields by their TTL and WRITETIME values and discard the timestamps from the resulting map’s value type. Lastly, we construct a row from each field group by adding the primary key values, the regular column values wrapped in CassandraOption and finally adding the TTL and WRITETIME values.

To actually use these functions, we first need to use Spark’s broadcasting mechanism to send a copy of the field index maps to each executor. This is done as follows:
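
For example:

```scala
val primaryKeyIndicesBcast = spark.sparkContext.broadcast(primaryKeyIndices)
val regularColumnIndicesBcast = spark.sparkContext.broadcast(regularColumnIndices)
```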

The broadcasting mechanism is an optimization for when the bodies of DataFrame transformation functions need to reference read-only values. To transform the DataFrame, we call flatMap as follows:
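
A sketch, assuming explodedSchema describes the shape of the split rows (key columns, regular columns, and the per-row ttl and writetime fields) and that a Row encoder can be derived from it:

```scala
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val explodedDf = df.flatMap { row =>
  explodeRow(row, primaryKeyIndicesBcast.value, regularColumnIndicesBcast.value)
}(RowEncoder(explodedSchema))
```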

Finally, we need to tell the connector to save the DataFrame to ScyllaDB. We’ll drop down again to the RDD API as that offers more control over how the CQL statements are constructed. We create a ColumnSelector value that describes which columns should be written; the columns are created from the original schema that we loaded, before adding the colName_ttl columns:
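
A sketch, where originalSchema stands for the schema as it was before the _ttl/_writetime fields were added:

```scala
import com.datastax.spark.connector.{ColumnName, ColumnRef, SomeColumns}

val columnSelector = SomeColumns(
  originalSchema.fields.map(f => ColumnName(f.name): ColumnRef): _*
)
```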

We also create a WriteConf value that describes how the connector should perform the writes. Critically, we tell the connector which column should be used for the TTL and WRITETIME values for each row:
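
A sketch; "ttl" and "writetime" here are the per-row fields produced by the row-splitting step:

```scala
import com.datastax.spark.connector.writer.{TTLOption, TimestampOption, WriteConf}

val writeConf = WriteConf
  .fromSparkConf(spark.sparkContext.getConf)
  .copy(
    ttl = TTLOption.perRow("ttl"),
    timestamp = TimestampOption.perRow("writetime"))
```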

And we pass all of this to the saveToCassandra method:
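
A sketch, assuming the connector’s SqlRowWriter is used to write Spark SQL Rows (the target keyspace and table names are placeholders):

```scala
import com.datastax.spark.connector._
import com.datastax.spark.connector.writer.{RowWriterFactory, SqlRowWriter}
import org.apache.spark.sql.Row

implicit val rowWriterFactory: RowWriterFactory[Row] = SqlRowWriter.Factory

explodedDf.rdd.saveToCassandra(
  "target_keyspace",
  "target_table",
  columnSelector,
  writeConf)
```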

Whew! That’s the beefiest part of the Migrator.

Tracking token ranges that have been transferred

The next part we’ll discuss is how the Migrator keeps track of which token ranges have already been transferred. This functionality required some modification of the Cassandra connector to propagate this information to the Migrator, which is why the Migrator relies on a fork of the connector.

There are two sides to this feature: first, we must keep track of token ranges that have been transferred, and periodically write them to a savepoint file; second, when resuming a migration, we must use the savepoint file to skip token ranges that have already been transferred. Let’s start with the first part.

We’ve seen Spark’s broadcast variables in the previous section; these are read-only values that are sent from the driver to all executors. Spark also provides accumulators: variables that are readable and writable from both the executors and the driver.

We’ve modified the connector to store transferred token ranges in an accumulator. This accumulator is then periodically read on the driver and its contents are stored in a savepoint file. To implement an accumulator, we can inherit from Spark’s AccumulatorV2 abstract class:
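
Abridged:

```scala
// org.apache.spark.util.AccumulatorV2 (abridged)
abstract class AccumulatorV2[IN, OUT] extends Serializable {
  def isZero: Boolean
  def copy(): AccumulatorV2[IN, OUT]
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: AccumulatorV2[IN, OUT]): Unit
  def value: OUT
}
```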

The contract is that the accumulator can read values of type IN, update an internal state in a thread-safe way, and output values of type OUT, which would typically be an aggregation of IN values. Our accumulator will set IN = Set[CqlTokenRange[_, _]] and OUT = Set[CqlTokenRange[_, _]].

These are the important parts of the implementation:
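
A sketch of such an accumulator, along the lines described below:

```scala
import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.util.AccumulatorV2
import com.datastax.spark.connector.rdd.partitioner.CqlTokenRange

class TokenRangeAccumulator(initial: Set[CqlTokenRange[_, _]] = Set.empty)
    extends AccumulatorV2[Set[CqlTokenRange[_, _]], Set[CqlTokenRange[_, _]]] {

  // Mutable reference holding an immutable Set of transferred ranges
  private val ranges = new AtomicReference[Set[CqlTokenRange[_, _]]](initial)

  override def isZero: Boolean = ranges.get.isEmpty
  override def copy(): TokenRangeAccumulator = new TokenRangeAccumulator(ranges.get)
  override def reset(): Unit = ranges.set(Set.empty)

  // Atomically merge in a newly transferred set of token ranges
  override def add(v: Set[CqlTokenRange[_, _]]): Unit =
    ranges.getAndUpdate(current => current ++ v)

  override def merge(
      other: AccumulatorV2[Set[CqlTokenRange[_, _]], Set[CqlTokenRange[_, _]]]): Unit =
    add(other.value)

  override def value: Set[CqlTokenRange[_, _]] = ranges.get
}
```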

The core idea is that we use an AtomicReference to store the current set of transferred token ranges. AtomicReference is a thread-safe, mutable reference cell; Scala’s Set is immutable, so it can be safely stored there and swapped atomically. Whenever we need to add another set of token ranges that have been transferred, we use getAndUpdate to update the set atomically. To extract the set, we can use the get method on the reference.

To use the accumulator, we’ve modified the connector’s TableWriter class; specifically, when writing one of the RDD’s partitions, the writer tests if the partition is a CassandraPartition (this is true for the Migrator, but not always, and thus is not reflected in the types), and extracts its ranges:
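
A sketch of that check (CassandraPartition carries the CQL token ranges it covers):

```scala
import org.apache.spark.Partition
import com.datastax.spark.connector.rdd.partitioner.{CassandraPartition, CqlTokenRange}

def partitionTokenRanges(partition: Partition): Option[Set[CqlTokenRange[_, _]]] =
  partition match {
    case p: CassandraPartition[_, _] => Some(p.tokenRanges.toSet)
    case _                           => None
  }
```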

We call this function in the writeInternal method and add the transferred ranges into the accumulator after writing them:
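
Schematically, inside the forked writer (tokenRangeAccumulator stands for however the accumulator is handed to the TableWriter):

```scala
// After the rows of this partition have been written successfully,
// record the token ranges it covered.
partitionTokenRanges(partition).foreach { transferred =>
  tokenRangeAccumulator.add(transferred)
}
```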

The full implementation can be seen here.

Before starting the DataFrame write, which blocks the calling thread until it completes, a scheduled thread is set up to read the accumulator periodically and write its contents to the savepoint file:
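
A sketch; dumpSavepoint and savepointIntervalSeconds are illustrative names (the former would serialize the accumulator’s current set of token ranges to the savepoint file):

```scala
import java.util.concurrent.{Executors, TimeUnit}

val scheduler = Executors.newScheduledThreadPool(1)
scheduler.scheduleAtFixedRate(
  new Runnable {
    override def run(): Unit = dumpSavepoint(tokenRangeAccumulator.value)
  },
  savepointIntervalSeconds,
  savepointIntervalSeconds,
  TimeUnit.SECONDS)
```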

Now, the second part of this feature is actually skipping token ranges that have already been transferred. The connector operates by dividing the token ranges between the executors and then fetching the rows corresponding to each range. We can filter the token ranges by passing in a function of the form (Long, Long) => Boolean, which determines whether a given range should be transferred.

This is the relevant implementation in CassandraTableScanRDD#compute:
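
Schematically (tokenRangeFilter is the (Long, Long) => Boolean predicate described above; startTokenOf and endTokenOf stand in for however the bounds are read from each CqlTokenRange):

```scala
// Keep only the ranges the predicate accepts; the rest were already transferred.
val tokenRangesToFetch =
  partition.tokenRanges.filter(r => tokenRangeFilter(startTokenOf(r), endTokenOf(r)))
```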

These token ranges are then fetched as they were before our modifications. This is all that’s needed to skip the token ranges that were already transferred!

Summary

The source code for the Migrator can be found here, and the source code for the modified Cassandra connector here (see the latest commits for the changes made). Some of the snippets shown here differ slightly from the code on GitHub, but the core principles remain the same.