Hooking up Spark and ScyllaDB: Part 3

Spark and ScyllaDB: Spark DataFrames in ScyllaDB

Welcome back! Last time, we discussed how Spark executes our queries and how Spark’s DataFrame and SQL APIs can be used to read data from ScyllaDB. That concluded the data-querying portion of the series; in this post, we will see how data from DataFrames can be written back to ScyllaDB.

As always, we have a code sample repository containing a docker-compose.yaml file with all the necessary services we’ll need. After you’ve cloned it, start up the services with docker-compose:
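A detached start should be enough; the service definitions come from the repository’s compose file.

```bash
# Start all services defined in the repository's docker-compose.yaml in the background.
docker-compose up -d
```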

After that is done, launch the Spark shell as in the previous posts in order to run the samples in the post:
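The exact command is in the sample repository and the previous posts; a typical invocation would look something like the following, where the Spark service name, the connector version and the ScyllaDB hostname are assumptions you should adjust to your setup.

```bash
# Launch the Spark shell inside the Spark master container
# (service name, connector version and hostname are illustrative;
# use the ones from the sample repository).
docker-compose exec spark-master \
  spark-shell \
    --packages datastax:spark-cassandra-connector:2.3.1-s_2.11 \
    --conf spark.cassandra.connection.host=scylla
```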

Saving static DataFrames

Let’s start with a simple example. We’ll create a DataFrame from a list of objects of the same type, and see how it can be written to ScyllaDB. Here’s the definition for our DataFrame; note that the comments in the snippet indicate the resulting values as you’d see them in the Spark shell:
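The snippet below is a sketch with illustrative data; the exact objects in the sample repository may differ, but the shape is the same. The trailing comments show what printSchema would report in the Spark shell.

```scala
// A simple case class with primitives, a list and a map.
case class Person(
  id: Int,
  name: String,
  numbers: List[Int],
  attributes: Map[String, String])

val df = spark.createDataFrame(List(
  Person(1, "Anne",  List(1, 2, 3), Map("color" -> "red")),
  Person(2, "Brian", List(4, 5),    Map("color" -> "green")),
  Person(3, "Clara", List(6),       Map("color" -> "blue"))
))

df.printSchema()
// root
//  |-- id: integer (nullable = false)
//  |-- name: string (nullable = true)
//  |-- numbers: array (nullable = true)
//  |    |-- element: integer (containsNull = false)
//  |-- attributes: map (nullable = true)
//  |    |-- key: string
//  |    |-- value: string (valueContainsNull = true)
```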

The data types are fairly rich, containing primitives, lists and maps. It’ll be interesting to see how they are translated when writing the DataFrame into ScyllaDB.

Next, add the following imports from the DataStax Cassandra connector for Spark to enrich the Spark API:
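Two imports do the enrichment: the first adds cassandraFormat to DataFrame readers and writers, the second adds helpers such as createCassandraTable, which we’ll use later in the post.

```scala
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._
```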

Now, similarly to how DataFrames are read from ScyllaDB using spark.read.cassandraFormat (see the previous post for the details), they can be written into ScyllaDB by using a DataFrameWriter available on the DataFrame:
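Throughout these examples, test and people are illustrative keyspace and table names; substitute the ones from your own setup.

```scala
// Build a DataFrameWriter targeting the test.people table.
// Nothing is executed at this point.
val writer = df.write.cassandraFormat("people", "test")
```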

Spark, as usual, is not running anything at this point. To actually run the write operation, we use writer.save:
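Running it is a one-liner; as the next paragraph explains, this first attempt will fail because the target table does not exist yet.

```scala
writer.save()
```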

Unfortunately (or fortunately), the Cassandra connector will not automatically create tables for us when saving to non-existent targets. Let’s fire up cqlsh in another terminal:
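A keyspace and table matching the illustrative schema above might look like this; adjust the names and replication settings to your environment.

```cql
CREATE KEYSPACE IF NOT EXISTS test
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE IF NOT EXISTS test.people (
  id         int PRIMARY KEY,
  name       text,
  numbers    list<int>,
  attributes map<text, text>
);
```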

We can go back to our Spark shell and try running writer.save() again. If all went well, no exceptions should be thrown and nothing will be printed. When the Spark shell prompt returns, try running the following query in cqlsh – you should see similar results:
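With the illustrative names used in this post, a plain SELECT is enough.

```cql
SELECT * FROM test.people;
```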

Great! So that was fairly easy. All we had to do was match the column names and types to the DataFrame’s schema. Let’s see what happens when a column’s type is mismatched; create another table with the name column’s type changed to bigint:
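Using the illustrative schema, that would be something like the following.

```cql
CREATE TABLE IF NOT EXISTS test.people_mismatched (
  id         int PRIMARY KEY,
  name       bigint,
  numbers    list<int>,
  attributes map<text, text>
);
```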

And write the DataFrame to it:
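The write is the same as before, just pointed at the mismatched table.

```scala
df.write.cassandraFormat("people_mismatched", "test").save()
```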

You should see a fairly large number of stack traces scrolling by, with a NumberFormatException buried somewhere within them. It’s not very informative; we can infer from our simple example that the problem is with the name column, but with a larger application and schema this might be harder.

When column names are mismatched, the error message is slightly friendlier: the exception lists the columns that could not be found in the target table, which points straight at the offending column.

Some type mismatches won’t even throw exceptions; the connector will happily coerce booleans to text, for example, when saving boolean fields from DataFrames into text columns. This is not great for data quality. The astute reader will also note that the NumberFormatException from before would not be thrown for strings that only contain numbers, which means that some datasets might behave perfectly well while others would fail.

The connector contains some useful infrastructure that we could use to implement programmatic checks for schema compatibility. For example, the TableDef data type represents a ScyllaDB table’s schema. We can convert a Spark DataFrame’s schema to a TableDef like so:
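A sketch of how this might look with the 2.x connector; the keyspace and table names passed to fromDataFrame are illustrative and only affect the generated definition.

```scala
import com.datastax.spark.connector.cql.TableDef
import com.datastax.driver.core.ProtocolVersion

// Derive a ScyllaDB table definition from the DataFrame's schema.
val fromDf = TableDef.fromDataFrame(df, "test", "people_from_df", ProtocolVersion.NEWEST_SUPPORTED)

// The CQL CREATE TABLE statement this definition corresponds to:
fromDf.cql
```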

The fromDataFrame function maps every Spark type to the corresponding ScyllaDB type. It also picks the first column in the DataFrame’s schema as the primary key for the resulting table definition.

Alternatively, we can also retrieve a TableDef from ScyllaDB itself using the Schema data type. This time, we need to initialize the connector’s session manually and then use it to retrieve the schema:
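One way to do this with the 2.x connector is sketched below: build a CassandraConnector from the Spark configuration and ask Schema.fromCassandra for the table’s metadata.

```scala
import com.datastax.spark.connector.cql.{CassandraConnector, Schema}

// Build a connector from the Spark shell's configuration.
val connector = CassandraConnector(spark.sparkContext.getConf)

// Fetch the schema of the (illustrative) test.people table from ScyllaDB.
val schema = Schema.fromCassandra(connector, Some("test"), Some("people"))
val existingTableDef = schema.tables.head
```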

The TableDef contains the definitions of all columns in the table, the partition key, indices and so forth. When checking schemas programmatically, you should of course compare those fields rather than the generated CQL strings.

So, we can formulate a naive solution based on these data types for checking whether our schemas are compatible. This could serve as runtime validation prior to starting our data transformation jobs. For a complete solution, you’d also need to take into consideration which data types are sensibly assignable to other data types; smallint is assignable to a bigint, for example, but not the other way around.
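A naive version of such a check might compare the two TableDef values column by column, for example along these lines.

```scala
import com.datastax.spark.connector.cql.TableDef

// Naive check: every column derived from the DataFrame must exist in the
// target table with exactly the same type. A complete implementation would
// also accept safe widenings (e.g. smallint into bigint).
def schemasCompatible(fromDataFrame: TableDef, target: TableDef): Boolean =
  fromDataFrame.columns.forall { col =>
    target.columnByName.get(col.columnName).exists(_.columnType == col.columnType)
  }

schemasCompatible(fromDf, existingTableDef)
```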

To end this section, let’s see how a new table can be created from a DataFrame. The createCassandraTable method can be used to create a new, empty table in ScyllaDB using the DataFrame’s schema:
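For example, the following creates an (illustrative) test.people_copy table, using id as the partition key.

```scala
// Create a new, empty table whose schema is derived from the DataFrame.
// No data is copied; only the CREATE TABLE statement is issued.
df.createCassandraTable(
  "test",
  "people_copy",
  partitionKeyColumns = Some(Seq("id"))
)
```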

NOTE: The support for writing RDDs back to ScyllaDB is more extensive than that for DataFrames; for example, when writing RDDs, one can specify that elements should be appended to list columns rather than replacing them. The DataFrame API is somewhat lagging behind in this regard. We will expand more on this in a subsequent post in this series, in which we will describe the ScyllaDB Migrator project.

Execution details

Now that we know how to execute the write operations, it’d be good to understand what is technically happening as they are executed. As you might recall, the RDDs that underlie DataFrames are composed of partitions; when executing a transformation on a DataFrame, the transformation is executed on each partition in parallel (assuming there are adequate compute resources).

Write operations are no different. They are executed in parallel on each partition by translating the rows of each partition into INSERT statements. This can be seen clearly by using ScyllaDB’s tracing capabilities. Let’s truncate our table and turn on probabilistic tracing with a probability of 1 using nodetool:
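In cqlsh, TRUNCATE test.people; empties the (illustrative) table; tracing is then enabled on the ScyllaDB node itself. How you reach nodetool depends on your setup; in the docker-compose environment it runs inside the ScyllaDB container.

```bash
# Trace every request until further notice (probability 1.0).
nodetool settraceprobability 1
```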

We’ll execute the write operation again, but this time, we will create a much larger DataFrame using a small helper function:
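A hypothetical helper along these lines will do; it generates n Person rows with the same schema as before and writes them to the table.

```scala
import scala.util.Random

// Generate a DataFrame of `n` synthetic Person rows.
def randomPeople(n: Int) =
  spark.createDataFrame((1 to n).toList.map { i =>
    Person(
      i,
      Random.alphanumeric.take(8).mkString,
      List.fill(3)(Random.nextInt(100)),
      Map("color" -> Random.nextInt(16).toHexString))
  })

// Write a much larger dataset so the traced INSERTs are easy to spot.
randomPeople(100000).write
  .cassandraFormat("people", "test")
  .mode("append")
  .save()
```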

We can reset the tracing probability to 0 now using the same nodetool subcommand. To view the tracing results, we can query the system_traces.sessions table:
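Something like the following will show the traced sessions.

```cql
SELECT session_id, command, request, started_at
FROM system_traces.sessions
LIMIT 10;
```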

The results on your local instance should look similar; you should see many entries in the sessions table for execution of INSERT statements. The connector will prepare the statement for inserting the data and execute batches of inserts that reuse the prepared statement.

Another interesting aspect of the execution of the table writes is the use of laziness. Say we’re reading back the big table we just wrote into a DataFrame, and we’d like to write it back to a new table, like so:
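With the illustrative names used so far, and assuming the target table already exists (for instance, the people_copy table created earlier), that might look like this.

```scala
// Lazily read the big table back...
val people = spark.read.cassandraFormat("people", "test").load()

// ...and write it into another table with the same schema.
people.write
  .cassandraFormat("people_copy", "test")
  .mode("append")
  .save()
```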

Instead of reading the entire source table into Spark’s memory and only then writing it back to ScyllaDB, the connector will lazily fetch batches of rows from ScyllaDB and pipe them into the writer. If you recall, RDDs are defined by a function Iterator[T] => Iterator[U]. When the DataFrame is created in the first line of the snippet, the connector creates an Iterator (see here) that, when pulled, fetches the next batch from ScyllaDB.

When the DataFrame is written into ScyllaDB on the last line, that iterator has not been pulled yet; no data has been fetched from ScyllaDB. The TableWriter class in the connector will create another iterator (see here), on top of the source iterator, that will build batches of INSERT statements.

The overall effect is that the loop that iterates over the batch iterator and inserts the batches causes the source to lazily fetch data from ScyllaDB. This means that only the data needed for the batch currently being inserted is fetched into memory. That’s a very useful property that you can exploit for building ETL processes!

It should be noted that this property only holds if the stages between the source and the writer contain no wide transformations. Such transformations cause a shuffle to be performed (see the previous post for more details), and the entire table would subsequently be loaded into memory.

Summary

You should now be equipped to write Spark jobs that can execute queries on ScyllaDB, create DataFrames from the results of those queries and write the DataFrames back to ScyllaDB — into existing tables or tables that need to be created.

Up until now, all the workloads we’ve covered are typically described as batch workloads: the entirety of the dataset is available up front for processing and its size is known. In our next post, we will discuss streaming workloads, in which those two conditions aren’t necessarily true. We’ll put together an application that uses everything we’ve seen up until now to process streaming workloads, write them into ScyllaDB and serve queries using the data from ScyllaDB. Stay tuned!

Continue to Part 4 of 4: Structured Streaming