May2

Analyzing flight delays with Scylla on top of Spark

Subscribe to Our Blog

In this article, we will demonstrate how to use Spark Scala API with Scylla to get instant results. We will demonstrate how to extract the average arrival/departure delays of flights or cancellations during one year from the public dataset of RITA; namely, the average arrival delay, the average departure delay, the average departure/arrival delay, and flight cancellation for each air carrier.

Analyzing Flight Delays With Scylla on Top of Spark The data we downloaded from RITA contains the flight arrival and departure information for all commercial flights operated by large air carriers within the U.S. from 1987 to 2008. This dataset holds approximately 120 million records in comma-separated value (CSV) format, while the size in uncompressed format is 120 GB. The project consists of twological modules:

  1. Loader is a module for loading big data from /tmp/2008.csv file into ScyllaDB.
  2. Extractor is a module that processes the requested data and extracts it from the huge datasets. To demonstrate our case, we will run several queries using Spark Scala API. For example, we will analyze destinations and carriers with the highest flight delays and cancellations.

Scylla on Spark: Step-by-Step

Prerequisites

Deploy Scylla

You can choose any of these options to get started with Scylla.

For this example, we are using docker (default configuration):

$ docker run --name scylla -p 7000:7000 -p 7001:7001 -p 9042:9042 -p 9160:9160 -p 10000:10000 -d scylladb/scylla

Run nodetool and cqlsh

Verify Scylla is OK:

$ docker exec -it scylla nodetool status
$ docker logs scylla | tail

Check cql is OK:

$ docker exec -it scylla cqlsh

Configure Scylla (*Optional)

By default, Scylla utilizes all CPUs and memory. To configure the resource limits, use these commands:

--smp, --memory, --cpuset. 

Here is an example with strict limitations for Scylla:

$ docker run --name scylla -p 7000:7000 -p 7001:7001 -p 9042:9042 -p 9160:9160 -p 10000:10000 -d scylladb/scylla --memory 4G --cpuset 0-2,4

Prepare Data

Download and extract the data from the archive. For our example (in order to make this even more interesting), we have chosen to use public datasets; specifically, data from the year 2008 from the Statistical Computing Statistical Graphics site.

To do this, let’s run the following commands in a terminal:

$ wget http://stat-computing.org/dataexpo/2009/2008.csv.bz2
$ mv 2008.csv.bz2 /tmp
$ bzip2 -d /tmp/2008.csv.bz2

The next step is to run in cqlsh the scripts:

cqlsh> CREATE KEYSPACE spark_scylla WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

cqlsh> USE spark_scylla;

cqlsh> CREATE TABLE rita (year int, month int, day int, origin text, dest text, carrier text, cancelled boolean, tail text, arrdelay int, depdelay int, distance int, flight int, PRIMARY KEY ((year, month, day), origin, dest, carrier, cancelled, tail));

year - 1987-2008 
month - 1-12 
day - DayofMonth - 1-31 
carrier - unique carrier code 
flight - flight number 
tail - plane tail number 
arrdelay - arrival delay, in minutes 
depdelay - departure delay, in minutes 
origin - origin IATA airport code 
dest - destination IATA airport code 
distance - in miles 
cancelled - was the flight cancelled?

Prepare Java, Scala, and Sbt Installation

OpenJDK:

To install OpenJDK in Ubuntu or Debian using apt-get, execute the following command in a terminal:

$ sudo apt-get install openjdk-8-jdk

To install OpenJDK in Fedora, Oracle, or Redhat using yum, run the following command in a terminal:

$ su -c "yum install java-1.8.0-openjdk-devel"

Sbt:

To install Sbt in Ubuntu or Debian, execute the following commands in a terminal:

$ echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
$ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823
$ sudo apt-get update
$ sudo apt-get install sbt

To install Sbt in Fedora, Oracle, or Red Hat, execute the following commands in a terminal:

$ curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
$ sudo yum install sbt

Scala:

To install Scala, run the following commands in a terminal:

$ wget http://www.scala-lang.org/files/archive/scala-2.11.8.tgz
$ sudo mkdir /usr/local/src/scala
$ sudo cp scala-2.11.8.tgz /usr/local/src/scala
$ cd /usr/local/src/scala
$ sudo tar xvf /usr/local/src/scala/scala-2.11.8.tgz
$ cd ~

Add the following text to the file

~/.bashrc: export SCALA_HOME=/usr/local/src/scala/scala-2.11.8 export PATH=$SCALA_HOME/bin:$PATH . .bashrc

Prepare Spark

To install Apache Spark, just do the following in a terminal of the server:

$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz
$ tar xvf spark-2.1.0-bin-hadoop2.7.tgz
$ cd spark-2.1.0-bin-hadoop2.7

Prepare an Application

The data from RITA is a large dataset: there are more than 120 million records, and it is 1.6 gigabytes of space compressed. We’re taking only 7 million values for our example.

To prepare the application, we have to download the project rita-analyzer from github repository. First, get the folder rita-analyzer.

$ cd ~/
$ git clone https://github.com/scylladb/scylla-code-samples.git
$ cd scylla-code-samples/rita-analyzer

Second, we need to build the application. To do so, we have to execute the following command in the rita-analyzer directory:

$ sbt assembly 

This will download all the necessary dependencies, build our example, and create an output jar-file in:

target/scala-2.11/rita-analyzer-assembly-1.0.jar

Third, we need to configure the file spark-scylla.conf. If we are going to run Spark and Scylla on the same node, we will have to limit the amount of cores used by Spark to only 2 cores by adding the following line in the file spark-scylla.conf:

spark.master local 
…….

where local means to run Spark locally with one worker thread (i.e. no parallelism at all).

The next step is to load previously downloaded data from the /tmp/2008.csv file into ScyllaDB and execute the following command in the Spark directory:

$ ./bin/spark-submit --properties-file /path/to/rita-analyzer/spark-scylla.conf --class Loader /path/to/rita-analyzer/target/scala-2.11/rita-analyzer-assembly-1.0.jar

We will download the data from the file /tmp/2008.csv into Scylla in batches by 1000 rows. At the code level, this functionality will look like this:

while (work) { 
 data.take(1000).toList match { 
 case Nil => work = false 
 case xs: List[String] => 
 val rita = xs.map(line => { 
 val cols = line.split(",") 
 Some(Rita(cols(0).toInt, cols(1).toInt, cols(2).toInt, 
 cols(16), cols(17), cols(8), cols(21).toRitaBoolean, cols(10), 
 cols(14).toRitaInt, cols(15).toRitaInt, cols(18).toInt, cols(9).toInt)) 
 }) 
sc.parallelize(rita).filter(_.isDefined).map(_.get).saveToCassandra(keyspace, tableName, someColumns) 
 } 
}

In the terminal output, we will see some Spark log data and can watch this process in Spark UI (http://localhost:4040). We can check the data in Scylla while the app is running or after its completion (we limited the output by 10 first rows):

SELECT * from rita limit 10;

The output should look like this:

year | month | day | origin | dest | carrier | cancelled | tail   | arrdelay | depdelay | distance | flight
------+-------+-----+--------+------+---------+-----------+--------+----------+----------+----------+--------
2008 |     6 |  10 |    ABE |  ATL |      EV |     False | N821AS |      -16 |       -4 |      692 |   4598
2008 |     6 |  10 |    ABE |  ATL |      EV |     False | N977EV |      -12 |       -6 |      692 |   4171
2008 |     6 |  10 |    ABE |  CLE |      XE |     False | N27506 |      186 |      186 |      339 |   2594
etc...

Run the Application

In the first example below, we will demonstrate how to use Spark Scala API with ScyllaDB to get the data from the relevant public dataset from RITA for the average arrival/departure delays of flights and their cancellations during one year.

We will describe the following five use cases:

  1. Top 3 destinations with the highest average arrival delay
  2. Top 3 origins with the highest average departure delay
  3. Top 3 carriers with the highest average departure and arrival delay
  4. Top 3 carriers with the highest flight cancellation
  5. Top 3 carriers with minimal average departure delays

1. Top 3 destinations with the highest average arrival delay

Here, we will determine the destinations with the maximum average arrival delay. To do so we pull the data about carriers with the maximum delay for the chosen places of arrival and places of departure, from which there is the greatest delay in arrival. Find below the algorithm and instructions for how to do that:

1. Filter by condition: field arrdelay > 0
2. Group by field dest
3. For each group:
3.1. Calculate average arrival delay List[{rows of rita data}].map(x => x.arrdelay).sum / List[{rows of rita data}].size
3.2. Group by field carrier
3.2.1. For each subgroup, calculate [average arrival delay] like we did in step 3.1 for groups
3.2.2. Sort by [average arrival delay] descending
3.2.3. Take first 3 records
3.3. Group by field origin
3.3.1. For each subgroup, calculate [average arrival delay] like we did in step 3.1 for groups
3.3.2. Sort by [average arrival delay] descending
3.3.3. Take first 3 records
4. Sort by [average arrival delay] descending
5. Take first 3 records, and the output will look like this:

Dest: MQT (Michigan airport), arrival average delay: 68 mins … Top 3 carriers with highest arrival average delay: …… carrier: MQ, arrival average delay: 68 mins … Top 3 origins with highest arrival average delay: …… from origin: MKE, arrival average delay: 78 mins …… from origin: GRB, arrival average delay: 73 mins …… from origin: ORD, arrival average delay: 54 mins etc…

Where:
MQ – Envoy Air
MKE – General Mitchell Airport, Milwaukee, Wisconsin
GRB – Green Bay-Austin Straubel Airport, Austin, Wisconsin
ORD – O’Hare International Airport, Chicago, Illinois

Here is the source code of the described case on Spark:

rdd 
.filter(_.arrdelay > 0) 
.groupBy(p => p.dest) 
.map({ case (dest, xs) => 
 DestDelay( 
 dest = dest, 
 delay = xs.map(x => x.arrdelay).sum / xs.size, // average delay 
 carrier = xs.groupBy(_.carrier).map { gc => // group by carrier 
 NameDelay(gc._1, gc._2.map(_.arrdelay).sum / gc._2.size) 
 }.toList.sortWith(_.delay > _.delay).take(3), // sort carrier's list by average delay DESC, take top 3 
 origin = xs.groupBy(_.origin).map { gc => // group by origin 
 NameDelay(gc._1, gc._2.map(_.arrdelay).sum / gc._2.size) 
 }.toList.sortWith(_.delay > _.delay).take(3) // sort origin's list by average delay DESC, take top 3 
 
 }) 
 .sortBy(_.delay, ascending = false) 
 .take(3) 
)

The source code can be found here.

2. Top 3 origins with the highest average departure delay

In the next example, we determine the places of departure with the maximum average delay of departure. To do so, we get the data about the carriers with the maximum delay of departure from places of departure.

We follow the same steps as in use case #1, and the results below are what you get using the algorithm scripted with this source code.

Origin: CMX, departure average delay: 92 mins
... Top 3 carriers with highest departure average delay:
...... carrier: 9E, departure average delay: 92 mins
... Top 3 destinations with highest departure average delay:
...... from destination: MSP, departure average delay: 92 mins
etc…

Where:
CMX – Houghton County Memorial Airport
9E – Endeavor Air
MSP – Minneapolis–Saint Paul International Airport

3. Top 3 carriers with the highest average departure and arrival delay

Determine the carriers with the maximum average delay in departure / arrival. You can find the algorithm’s source code here.

And, here is the output:

Carrier: YV, departure average delay: 55 mins, arrival average delay: 57 mins
Carrier: B6, departure average delay: 54 mins, arrival average delay: 58 mins 
Carrier: OH, departure average delay: 48 mins, arrival average delay: 52 mins 
etc…

Where:
YV – Mesa Airlines
B6 – JetBlue Airways
OH – PSA Airlines

4. Top 3 carriers with the highest flight cancellation:

Determine the carriers with the maximum amount of flight cancellations. Here is the algorithm’s source code.

And, here is the output:

Carrier: MQ, flight cancellation count: 14006 
Carrier: OO, flight cancellation count: 12226 
Carrier: AA, flight cancellation count: 11757 
etc…

Where:
MQ – American Eagle Airlines
OO – SkyWest Airlines
AA – American Airlines

5. Top 3 carriers with minimal average departure delays

We find the carriers with minimal average departure delay the same way as in use case #3, and the results below you get using the described algorithm with this source code.

Here is the output:

Carrier: HA, total departure delay: 1, total arrival delay: 2 
Carrier: US, total departure delay: 5, total arrival delay: 2 
Carrier: F9, total departure delay: 5, total arrival delay: 6 
etc…

Where:
HA- Hawaiian Airlines
US- US Airways
F9 – Frontier Airlines

Thereby, to get such results, we have to run the following command in the Spark directory in the terminal:

$ ./bin/spark-submit --properties-file /path/to/rita-analyzer/spark-scylla.conf --class Extractor /path/to/rita-analyzer/target/scala-2.11/rita-analyzer-assembly-1.0.jar

Final Notes

While Scylla cannot reduce your own travel delays (not yet, at least!), it can certainly reduce your latency, cluster size, and other metrics. In the airline industry, fast reading of data is key for analyzing and presenting events in near real time. Scylla is a high-performing database that allows you to do just that. Plus, developers don’t need to change data structures for their existing Cassandra projects. They can apply them directly to Scylla.


Tags: