In this article, we will demonstrate how to use the Spark Scala API with ScyllaDB to get instant results. Using the public RITA dataset, we will extract, for each air carrier over one year: the average arrival delay, the average departure delay, the combined average departure/arrival delay, and the number of flight cancellations.
The data we downloaded from RITA contains the flight arrival and departure information for all commercial flights operated by large air carriers within the U.S. from 1987 to 2008. The dataset holds approximately 120 million records in comma-separated value (CSV) format and is 120 GB uncompressed. The project consists of two logical modules:
- Loader is a module for loading the data from the /tmp/2008.csv file into ScyllaDB.
- Extractor is a module that processes and extracts the requested data from the huge dataset. To demonstrate our case, we will run several queries using the Spark Scala API; for example, we will analyze the destinations and carriers with the highest flight delays and cancellations.
ScyllaDB on Spark: Step-by-Step
Prerequisites
Deploy ScyllaDB
You can choose any of these options to get started with ScyllaDB.
For this example, we are using docker (default configuration):
$ docker run --name scylla -p 7000:7000 -p 7001:7001 -p 9042:9042 -p 9160:9160 -p 10000:10000 -d scylladb/scylla
Run nodetool and cqlsh
Verify that ScyllaDB is up:
$ docker exec -it scylla nodetool status
$ docker logs scylla | tail
Check that CQL works:
$ docker exec -it scylla cqlsh
Configure ScyllaDB (Optional)
By default, ScyllaDB utilizes all CPUs and all memory. To limit its resources, use the --smp, --memory, and --cpuset options.
Here is an example that imposes strict limits on ScyllaDB:
$ docker run --name scylla -p 7000:7000 -p 7001:7001 -p 9042:9042 -p 9160:9160 -p 10000:10000 -d scylladb/scylla --memory 4G --cpuset 0-2,4
Prepare Data
Download and extract the data from the archive. For our example (to make this even more interesting), we have chosen a public dataset; specifically, the data for the year 2008 from the Statistical Computing and Statistical Graphics Data Expo site.
To do this, let’s run the following commands in a terminal:
$ wget http://stat-computing.org/dataexpo/2009/2008.csv.bz2
$ mv 2008.csv.bz2 /tmp
$ bzip2 -d /tmp/2008.csv.bz2
The next step is to run the following scripts in cqlsh:
cqlsh> CREATE KEYSPACE spark_scylla WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
cqlsh> USE spark_scylla;
cqlsh> CREATE TABLE rita (year int, month int, day int, origin text, dest text, carrier text, cancelled boolean, tail text, arrdelay int, depdelay int, distance int, flight int, PRIMARY KEY ((year, month, day), origin, dest, carrier, cancelled, tail));
year - 1987-2008
month - 1-12
day - day of month, 1-31
carrier - unique carrier code
flight - flight number
tail - plane tail number
arrdelay - arrival delay, in minutes
depdelay - departure delay, in minutes
origin - origin IATA airport code
dest - destination IATA airport code
distance - in miles
cancelled - was the flight cancelled?
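The columns above map naturally to a Scala case class that the Spark-Cassandra connector can persist. Here is a minimal sketch; the field order follows the Rita constructor call in the Loader code shown later, but the actual project class may differ in detail:

```scala
// Mirrors the rita table: one field per column, named after the
// ScyllaDB columns so the connector can map them automatically.
case class Rita(
  year: Int,          // 1987-2008
  month: Int,         // 1-12
  day: Int,           // day of month, 1-31
  origin: String,     // origin IATA airport code
  dest: String,       // destination IATA airport code
  carrier: String,    // unique carrier code
  cancelled: Boolean, // was the flight cancelled?
  tail: String,       // plane tail number
  arrdelay: Int,      // arrival delay, in minutes
  depdelay: Int,      // departure delay, in minutes
  distance: Int,      // in miles
  flight: Int         // flight number
)
```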
Prepare Java, Scala, and Sbt
OpenJDK:
To install OpenJDK in Ubuntu or Debian using apt-get, execute the following command in a terminal:
$ sudo apt-get install openjdk-8-jdk
To install OpenJDK in Fedora, Oracle Linux, or Red Hat using yum, run the following command in a terminal:
$ su -c "yum install java-1.8.0-openjdk-devel"
Sbt:
To install Sbt in Ubuntu or Debian, execute the following commands in a terminal:
$ echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
$ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823
$ sudo apt-get update
$ sudo apt-get install sbt
To install Sbt in Fedora, Oracle, or Red Hat, execute the following commands in a terminal:
$ curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
$ sudo yum install sbt
Scala:
To install Scala, run the following commands in a terminal:
$ wget http://www.scala-lang.org/files/archive/scala-2.11.8.tgz
$ sudo mkdir /usr/local/src/scala
$ sudo cp scala-2.11.8.tgz /usr/local/src/scala
$ cd /usr/local/src/scala
$ sudo tar xvf /usr/local/src/scala/scala-2.11.8.tgz
$ cd ~
Add the following lines to the ~/.bashrc file:
export SCALA_HOME=/usr/local/src/scala/scala-2.11.8
export PATH=$SCALA_HOME/bin:$PATH
Then reload it:
$ . ~/.bashrc
Prepare Spark
To install Apache Spark, run the following commands in a terminal:
$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz
$ tar xvf spark-2.1.0-bin-hadoop2.7.tgz
$ cd spark-2.1.0-bin-hadoop2.7
Prepare an Application
The data from RITA is a large dataset: there are more than 120 million records, taking 1.6 gigabytes of space compressed. We are taking only the roughly 7 million rows of the 2008 file for our example.
To prepare the application, download the rita-analyzer project from the GitHub repository. First, get the rita-analyzer folder:
$ cd ~/
$ git clone https://github.com/scylladb/scylla-code-samples.git
$ cd scylla-code-samples/rita-analyzer
Second, we need to build the application. To do so, execute the following command in the rita-analyzer directory:
$ sbt assembly
This will download all the necessary dependencies, build our example, and create an output JAR file at:
target/scala-2.11/rita-analyzer-assembly-1.0.jar
Third, we need to configure the file spark-scylla.conf. If we are going to run Spark and ScyllaDB on the same node, we have to limit the number of cores used by Spark via the following line in spark-scylla.conf:
spark.master local
…….
Here, local runs Spark locally with one worker thread (i.e., no parallelism at all); local[2] would run it with two worker threads.
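A fuller spark-scylla.conf might look like the sketch below; the spark.cassandra.connection.host property tells the Spark-Cassandra connector where to find the ScyllaDB node, and the values shown are illustrative, not the project's actual settings:

```
# spark-scylla.conf (illustrative values)
spark.master                      local[2]
spark.cassandra.connection.host   127.0.0.1
```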
The next step is to load the previously downloaded data from the /tmp/2008.csv file into ScyllaDB by executing the following command in the Spark directory:
$ ./bin/spark-submit --properties-file /path/to/rita-analyzer/spark-scylla.conf --class Loader /path/to/rita-analyzer/target/scala-2.11/rita-analyzer-assembly-1.0.jar
We load the data from the file /tmp/2008.csv into ScyllaDB in batches of 1,000 rows. At the code level, this functionality looks like this:
while (work) {
  data.take(1000).toList match {
    case Nil => work = false  // no more lines: stop
    case xs: List[String] =>
      // Parse each CSV line into a Rita row.
      val rita = xs.map(line => {
        val cols = line.split(",")
        Some(Rita(cols(0).toInt, cols(1).toInt, cols(2).toInt,
          cols(16), cols(17), cols(8), cols(21).toRitaBoolean, cols(10),
          cols(14).toRitaInt, cols(15).toRitaInt, cols(18).toInt, cols(9).toInt))
      })
      // Persist the batch to ScyllaDB via the Spark-Cassandra connector.
      sc.parallelize(rita).filter(_.isDefined).map(_.get).saveToCassandra(keyspace, tableName, someColumns)
  }
}
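The Loader relies on toRitaBoolean and toRitaInt extensions to cope with the dataset's quirks: the cancelled column is encoded as 0/1, and the delay columns hold "NA" for flights that never departed. Here is a hedged sketch of how such helpers could be written; the project's actual implementation may differ:

```scala
// Hypothetical parsing helpers for RITA CSV fields; the project's
// actual implementations may differ in detail.
implicit class RitaField(s: String) {
  // Cancelled is encoded as "1" (cancelled) or "0" (flown).
  def toRitaBoolean: Boolean = s.trim == "1"
  // Delay columns contain "NA" for flights that never departed; treat as 0.
  def toRitaInt: Int = if (s.trim == "NA") 0 else s.trim.toInt
}
```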
In the terminal output, we will see some Spark log data and can watch the process in the Spark UI (http://localhost:4040). We can check the data in ScyllaDB while the app is running or after it completes (here we limit the output to the first 10 rows):
SELECT * from rita limit 10;
The output should look like this:
 year | month | day | origin | dest | carrier | cancelled | tail   | arrdelay | depdelay | distance | flight
------+-------+-----+--------+------+---------+-----------+--------+----------+----------+----------+--------
 2008 |     6 |  10 | ABE    | ATL  | EV      |     False | N821AS |      -16 |       -4 |      692 |   4598
 2008 |     6 |  10 | ABE    | ATL  | EV      |     False | N977EV |      -12 |       -6 |      692 |   4171
 2008 |     6 |  10 | ABE    | CLE  | XE      |     False | N27506 |      186 |      186 |      339 |   2594
etc...
Run the Application
In the examples below, we will use the Spark Scala API with ScyllaDB to extract from the RITA dataset the average arrival/departure delays of flights and their cancellations during one year.
We will describe the following five use cases:
- Top 3 destinations with the highest average arrival delay
- Top 3 origins with the highest average departure delay
- Top 3 carriers with the highest average departure and arrival delay
- Top 3 carriers with the highest flight cancellation
- Top 3 carriers with minimal average departure delays
1. Top 3 destinations with the highest average arrival delay
Here, we will determine the destinations with the highest average arrival delay. For each such destination, we also pull the carriers with the highest delay and the origins from which arrivals are most delayed. The algorithm is as follows:
1. Filter by condition: field arrdelay > 0
2. Group by field dest
3. For each group:
3.1. Calculate average arrival delay List[{rows of rita data}].map(x => x.arrdelay).sum / List[{rows of rita data}].size
3.2. Group by field carrier
3.2.1. For each subgroup, calculate [average arrival delay] like we did in step 3.1 for groups
3.2.2. Sort by [average arrival delay] descending
3.2.3. Take first 3 records
3.3. Group by field origin
3.3.1. For each subgroup, calculate [average arrival delay] like we did in step 3.1 for groups
3.3.2. Sort by [average arrival delay] descending
3.3.3. Take first 3 records
4. Sort by [average arrival delay] descending
5. Take the first 3 records.
In the output, the carrier and airport codes stand for:
MQ – American Eagle Airlines (now Envoy Air)
MKE – General Mitchell Airport, Milwaukee, Wisconsin
GRB – Green Bay-Austin Straubel Airport, Green Bay, Wisconsin
ORD – O’Hare International Airport, Chicago, Illinois
Here is the source code of the described case on Spark:
rdd
  .filter(_.arrdelay > 0)
  .groupBy(p => p.dest)
  .map({ case (dest, xs) =>
    DestDelay(
      dest = dest,
      delay = xs.map(x => x.arrdelay).sum / xs.size, // average delay
      carrier = xs.groupBy(_.carrier).map { gc =>    // group by carrier
        NameDelay(gc._1, gc._2.map(_.arrdelay).sum / gc._2.size)
      }.toList.sortWith(_.delay > _.delay).take(3),  // sort carriers by average delay DESC, take top 3
      origin = xs.groupBy(_.origin).map { gc =>      // group by origin
        NameDelay(gc._1, gc._2.map(_.arrdelay).sum / gc._2.size)
      }.toList.sortWith(_.delay > _.delay).take(3)   // sort origins by average delay DESC, take top 3
    )
  })
  .sortBy(_.delay, ascending = false)
  .take(3)
The source code can be found here.
2. Top 3 origins with the highest average departure delay
In the next example, we determine the origins (places of departure) with the highest average departure delay. For each, we also pull the carriers with the highest departure delay and the destinations toward which departures are most delayed.
We follow the same steps as in use case #1; the results below are produced by the algorithm scripted in this source code.
Origin: CMX, departure average delay: 92 mins
... Top 3 carriers with highest departure average delay:
...... carrier: 9E, departure average delay: 92 mins
... Top 3 destinations with highest departure average delay:
...... from destination: MSP, departure average delay: 92 mins
etc…
Where:
CMX – Houghton County Memorial Airport
9E – Endeavor Air
MSP – Minneapolis–Saint Paul International Airport
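The core of this query mirrors use case #1, with origin and depdelay swapped in for dest and arrdelay. Here is a minimal sketch of the aggregation over plain Scala collections (a Spark RDD supports the same groupBy/map/sortBy combinators; the Row class and sample data are illustrative, not the project's types):

```scala
// Illustrative row type: only the fields this query needs.
case class Row(origin: String, carrier: String, depdelay: Int)

val rows = List(
  Row("CMX", "9E", 92), Row("CMX", "9E", 92),
  Row("ORD", "MQ", 30), Row("MSP", "OO", 10))

// Top origins by average departure delay.
val topOrigins = rows
  .filter(_.depdelay > 0)      // keep only delayed departures
  .groupBy(_.origin)           // group rows by origin airport
  .map { case (origin, xs) =>  // average delay per origin
    (origin, xs.map(_.depdelay).sum / xs.size)
  }
  .toList
  .sortBy(t => -t._2)          // sort by average delay, descending
  .take(3)                     // keep the top 3
```

With the sample rows above, topOrigins ranks CMX first, matching the CMX result shown earlier.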
3. Top 3 carriers with the highest average departure and arrival delay
Here, we determine the carriers with the maximum average departure/arrival delay. You can find the algorithm’s source code here.
And, here is the output:
Carrier: YV, departure average delay: 55 mins, arrival average delay: 57 mins
Carrier: B6, departure average delay: 54 mins, arrival average delay: 58 mins
Carrier: OH, departure average delay: 48 mins, arrival average delay: 52 mins
etc…
Where:
YV – Mesa Airlines
B6 – JetBlue Airways
OH – PSA Airlines
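Use case #3 tracks two metrics per carrier rather than one. A minimal sketch over plain Scala collections (Row and the sample data are illustrative; a Spark RDD supports the same combinators):

```scala
// Illustrative row type with both delay fields.
case class Row(carrier: String, depdelay: Int, arrdelay: Int)

val rows = List(
  Row("YV", 56, 57), Row("B6", 54, 58),
  Row("OH", 48, 52), Row("WN", 10, 12))

// Average departure and arrival delay per carrier, worst first.
val worstCarriers = rows
  .filter(r => r.depdelay > 0 && r.arrdelay > 0)  // delayed flights only
  .groupBy(_.carrier)
  .map { case (carrier, xs) =>
    (carrier,
     xs.map(_.depdelay).sum / xs.size,            // average departure delay
     xs.map(_.arrdelay).sum / xs.size)            // average arrival delay
  }
  .toList
  .sortBy(t => -(t._2 + t._3))                    // rank by combined average delay
  .take(3)
```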
4. Top 3 carriers with the highest flight cancellation
Determine the carriers with the maximum amount of flight cancellations. Here is the algorithm’s source code.
And, here is the output:
Carrier: MQ, flight cancellation count: 14006
Carrier: OO, flight cancellation count: 12226
Carrier: AA, flight cancellation count: 11757
etc…
Where:
MQ – American Eagle Airlines
OO – SkyWest Airlines
AA – American Airlines
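Counting cancellations is simpler than averaging delays: filter on the cancelled flag and count per carrier. A minimal sketch over plain Scala collections (Row and the sample data are illustrative):

```scala
// Illustrative row type for the cancellation query.
case class Row(carrier: String, cancelled: Boolean)

val rows = List(
  Row("MQ", true), Row("MQ", true), Row("MQ", true),
  Row("OO", true), Row("OO", true),
  Row("AA", true), Row("AA", false), Row("WN", false))

// Cancellation count per carrier, highest first.
val topCancelled = rows
  .filter(_.cancelled)  // keep only cancelled flights
  .groupBy(_.carrier)
  .map { case (carrier, xs) => (carrier, xs.size) }
  .toList
  .sortBy(t => -t._2)   // most cancellations first
  .take(3)
```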
5. Top 3 carriers with minimal average departure delays
We find the carriers with minimal average departure delay the same way as in use case #3, but sorting in ascending order; the results below are produced using the described algorithm with this source code.
Here is the output:
Carrier: HA, total departure delay: 1, total arrival delay: 2
Carrier: US, total departure delay: 5, total arrival delay: 2
Carrier: F9, total departure delay: 5, total arrival delay: 6
etc…
Where:
HA – Hawaiian Airlines
US – US Airways
F9 – Frontier Airlines
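The only change relative to the "worst carriers" queries is the sort direction: ascending instead of descending. A minimal sketch over plain Scala collections (Row and the sample data are illustrative):

```scala
// Illustrative row type for the minimal-delay query.
case class Row(carrier: String, depdelay: Int)

val rows = List(
  Row("HA", 1), Row("HA", 1), Row("US", 5),
  Row("F9", 6), Row("YV", 55))

// Carriers with the smallest average departure delay.
val bestCarriers = rows
  .groupBy(_.carrier)
  .map { case (carrier, xs) => (carrier, xs.map(_.depdelay).sum / xs.size) }
  .toList
  .sortBy(_._2)  // ascending: smallest average delay first
  .take(3)
```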
To get these results, run the following command in the Spark directory:
$ ./bin/spark-submit --properties-file /path/to/rita-analyzer/spark-scylla.conf --class Extractor /path/to/rita-analyzer/target/scala-2.11/rita-analyzer-assembly-1.0.jar
Final Notes
While ScyllaDB cannot reduce your own travel delays (not yet, at least!), it can certainly reduce your latency, cluster size, and other metrics. In the airline industry, fast reading of data is key for analyzing and presenting events in near real time. ScyllaDB is a high-performing database that allows you to do just that. Plus, developers don’t need to change data structures for their existing Cassandra projects. They can apply them directly to ScyllaDB.