Apache Zeppelin: Big data prototyping and visualization in no time

Thursday, Jul 28th, 2016

Lately the name Zeppelin has crossed our minds several times. Given the near-daily release of new big data tools and the often disappointing impression you get when diving into them, we silently ignored Zeppelin for a while. After continued encouragement from several colleagues, we finally decided to take a look at Apache's latest flying machine: can it make us fly?

What is Apache Zeppelin?

So what is Apache Zeppelin? Users of IPython notebooks are already familiar with the concept of an interactive web-based computational environment. Apache Zeppelin provides a web-based notebook that enables interactive data analytics. The main focus of Zeppelin lies on data ingestion, discovery, analytics, visualization and collaboration. Though IPython notebooks can also be used for data analytics with Spark, they do not provide the out-of-the-box data optimizations that are built into Zeppelin.

The general interpreter concept allows any language or data processing backend to be plugged into Zeppelin. Though many interpreters exist out of the box (Spark, Python, Hive, Cassandra, ElasticSearch, Flink, …), you can also write your own interpreter if needed. We will focus on Spark, both in Scala and Python, for the remainder of this article.

Installation

Excited? Let’s get our hands dirty. Zeppelin can be installed and configured in multiple ways depending on your needs. As we are focusing on Spark for this session, we’ll cover three installation types:

  • Local Zeppelin - Local Spark cluster
  • Local Zeppelin - Remote Spark cluster
  • Remote Zeppelin - Remote Spark cluster (AWS)

Local Zeppelin - Local Spark cluster

The easiest way is a local installation of Zeppelin with the embedded local Spark version that is included out of the box. You can download a binary package with all interpreters here. This demo is based on version 0.6.0, the latest version at the time of writing. Unpack the package somewhere on your computer and start Zeppelin:

bin/zeppelin-daemon.sh start

That's about it: visit Zeppelin at http://localhost:8080 in your web browser.

By default the local Spark interpreter uses the embedded Spark version, which depends on the Zeppelin build. You can configure another Spark version by setting the SPARK_HOME variable on your system or in the conf/zeppelin-env.sh file. Read the documentation for more information about configuring Spark for Zeppelin.
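
For example, pointing Zeppelin at an existing Spark installation only takes one export in conf/zeppelin-env.sh (the path below is just a placeholder for your own installation):

# conf/zeppelin-env.sh
# Use an existing Spark installation instead of the embedded one (path is a placeholder)
export SPARK_HOME=/opt/spark-1.6.2-bin-hadoop2.6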

Local Zeppelin - Remote Spark cluster

If you really want to crunch Big data, you’ll need an actual cluster. You can simply connect to a remote cluster by specifying the HADOOP_CONF_DIR in the conf/zeppelin-env.sh file. The Hadoop configuration basically only needs a Yarn resource manager address to connect to the remote cluster. The Spark master URL needs to be configured as yarn-client as Zeppelin does not work in yarn-cluster mode. Note that your notebook is running local (so you can read and write from and to your local file system) but the Spark jobs are actually running on the cluster.
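
A minimal configuration could look like the following sketch (the directory is a placeholder for wherever your Hadoop client configuration lives; the master can also be set to yarn-client in the Spark interpreter settings instead):

# conf/zeppelin-env.sh
# Hadoop client configuration containing the YARN resource manager address (path is a placeholder)
export HADOOP_CONF_DIR=/etc/hadoop/conf
# Zeppelin only works in yarn-client mode, not yarn-cluster
export MASTER=yarn-client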

Although Zeppelin should connect to a remote cluster easily, we encountered some issues while connecting our local Zeppelin to a remote AWS cluster. There seems to be an open issue when connecting Zeppelin to a YARN cluster. For the time being it's recommended to use the embedded Zeppelin in AWS EMR, as discussed in the next paragraph.

Remote Zeppelin - Remote Spark cluster (AWS)

By far the easiest option is to spin up an AWS EMR cluster with Zeppelin already attached. The only downside is that you're bound to the Amazon-supported version of Zeppelin, which is unfortunately still 0.5.6 for now.

Sign in with your AWS account and create a new cluster in the EMR services. You'll have to choose the advanced options tab and manually select Zeppelin from the available software packages.

Hadoop, Spark and Zeppelin should be enough for a basic setup. Besides the software packages, you can leave all other settings at their defaults, but don't forget to select an EC2 key pair so you can connect to your cluster.
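
If you prefer the command line over the console, something along these lines should give a comparable cluster; note that the release label, the application name (older EMR releases call it Zeppelin-Sandbox) and the instance settings below are assumptions, so check them against the EMR release you pick:

# Release label, application name and instance settings are assumptions,
# adjust them to the EMR release and cluster size you actually want.
aws emr create-cluster \
  --name "zeppelin-demo" \
  --release-label emr-4.7.2 \
  --applications Name=Hadoop Name=Spark Name=Zeppelin-Sandbox \
  --instance-type m3.xlarge \
  --instance-count 3 \
  --ec2-attributes KeyName=my-key-pair \
  --use-default-roles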

Set up a dynamic port forwarding tunnel as described on the EMR cluster page to reach the web interfaces running on your cluster. That should be all: visit the Zeppelin URL listed in the connection information on the AWS cluster page.
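
The tunnel itself boils down to a single SSH command (the key file and master host name are placeholders, and 8157 is just an arbitrary local port to point your browser's SOCKS proxy at, as described in the EMR documentation):

# Dynamic port forwarding to the EMR master node; keep this running while you work
ssh -i ~/my-key-pair.pem -N -D 8157 hadoop@ec2-xx-xx-xx-xx.compute-1.amazonaws.com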

Usage

Alright, Zeppelin is up and running, so we're ready to write some code. The Spark interpreter automatically provides a SparkContext, an SQLContext and a ZeppelinContext. The latter is used to share dependencies and objects among the interpreters. Note that Python and Scala share the same SparkContext and SQLContext, which is a good thing as it enables sharing RDDs and Spark SQL tables between both languages.
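
As a quick illustration (assuming the default variable names sc, sqlContext and z), the contexts are simply there, and an object put into the ZeppelinContext from a Scala paragraph can be read back from a PySpark paragraph:

// Scala paragraph: nothing to set up, the contexts are injected for you
println(sc.version)
println(sqlContext)
z.put("threshold", 100000)   // share a value through the ZeppelinContext

%pyspark
# PySpark paragraph: same SparkContext, and the shared value is visible here too
print(sc.version)
print(z.get("threshold"))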

Given the name of the tool, it was pretty straightforward to select an appropriate data set to analyse: we used an airline data set that contains American flight information from 1987 to 2008. The total size of the data set is about 12 GB; as a starting point, upload all files to an S3 bucket.

Our demo notebook is available on GitHub. You'll have to import the JSON file into your running Zeppelin. For those of you who prefer not to spin up their own Zeppelin, a readable image of the complete notebook is available here.

Spark

Let's write our first Spark notebook. The default Zeppelin interpreter is Spark (with Scala); other interpreters can be specified at the beginning of each paragraph: %python, %pyspark, %sql, %elasticsearch, ...

Parsing a CSV file into a DataFrame is natively supported as of Spark 2.0. As Zeppelin does not yet support Spark 2, at least not out of the box, we'll need to load the Databricks spark-csv dependency to work with CSV files in Spark. As of version 0.6.0 of Zeppelin the dependency management features have been completely refactored and are shared among the different interpreters. As described in the documentation it is pretty easy to add a new dependency. If you are working with version 0.5.6 or earlier, you'll need to add dependencies via the ZeppelinContext:

%dep
z.reset()
z.load("com.databricks:spark-csv_2.10:1.4.0")

Make sure to run the above paragraph first whenever you start or restart the Spark interpreter; you'll get strange errors otherwise. Okay, we're all set to load the data into a Spark DataFrame now:

val inputPath =  "s3://zeppelin-blog/data/*"
val airTraffic = sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .option("delimiter", ",")
        .option("inferSchema", "true")
        .load(inputPath)

We could add some additional features for our analysis and visualizations:

import org.apache.spark.sql.functions.udf
import org.joda.time.format.DateTimeFormat

// Derive the day of the year from the separate day, month and year columns
val calcDayOfYear = udf(
    (dayOfMonth: Int, month: Int, year: Int) => {
        val dateFormat = DateTimeFormat.forPattern("dd/MM/yyyy")
        dateFormat.parseDateTime(s"$dayOfMonth/$month/$year").getDayOfYear()
    }
)
// Combine origin and destination airports into a single route column
val calcRoute = udf(
    (origin: String, dest: String) => s"$origin - $dest"
)
val featuredTraffic = airTraffic
    .withColumn("DayOfYear", calcDayOfYear(airTraffic("DayOfMonth"), airTraffic("Month"), airTraffic("Year")))
    .withColumn("Route", calcRoute(airTraffic("Origin"), airTraffic("Dest")))

Spark SQL

Register the DataFrame as a temporary Spark SQL table and cache it so repeated queries run faster.

featuredTraffic.registerTempTable("air_traffic")
sqlContext.cacheTable("air_traffic")

Let the fun begin: Zeppelin has a built-in query and visualization tool for querying Spark SQL tables. Just use the %sql interpreter and play with the visuals:

%sql
select DayOfYear, count(*) as NrOfFlights, avg(DepDelay) as AvgDepDelay, avg(ArrDelay) as AvgArrDelay from air_traffic group by DayOfYear having NrOfFlights > 100000

We've just launched a query on 12 GB of data and received an instant (okay, it took 17 minutes but we could scale up if needed) visual result!

Another cool feature of Zeppelin is the support of dynamic forms to integrate user input into your notebooks. You could for example make an interactive query where the airport and day of the week are configurable by the user.
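
A sketch of what such a paragraph could look like, using Zeppelin's ${name=default} dynamic form syntax (the column names come from the airline data set, the default values JFK and 1 are arbitrary):

%sql
-- The ${...} placeholders render as input fields above the result; the defaults are arbitrary
select Month, count(*) as NrOfFlights, avg(DepDelay) as AvgDepDelay
from air_traffic
where Origin = '${airport=JFK}' and DayOfWeek = ${dayOfWeek=1}
group by Month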

It may seem pretty standard, but we enable interactive Spark queries with a visual UI this way, which is not that common with the currently available big data tools. Read the docs for more information about dynamic form integration in Zeppelin.

Python

Besides Scala you can also use PySpark, though the Scala API is more mature in my opinion. A cool thing about Zeppelin is that you can mix and match both, as they share the same SparkContext. Since Zeppelin itself has a rather limited set of visuals, we can easily reuse our SQL table from above and build other visuals with Python libraries. As most Python visualization libraries work on Pandas DataFrames, there is a very handy toPandas() function that converts a Spark DataFrame into a Pandas DataFrame. That's basically all we need to build a heatmap in Python based on our Spark SQL table:

df = sqlContext.sql("SELECT Dest, Month, count(*) as NrOfFlights, avg(ArrDelay) as AvgArrDelay FROM air_traffic group by Dest, Month")
data = df.toPandas() 

Note that we used a little workaround to use matplotlib inline here.
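
A rough sketch of what that paragraph could look like; the pivoting and plotting details are our own choices, and printing the figure as inline SVG through %html output is one of the common matplotlib workarounds in Zeppelin at the time of writing:

%pyspark
import StringIO
import matplotlib
matplotlib.use('Agg')  # headless backend, the interpreter has no display attached
import matplotlib.pyplot as plt

# Pivot the Pandas DataFrame into a destination x month matrix of average arrival delays
pivot = data.pivot(index='Dest', columns='Month', values='AvgArrDelay')

fig, ax = plt.subplots(figsize=(12, 8))
heatmap = ax.pcolor(pivot.fillna(0).values, cmap='RdYlGn_r')
fig.colorbar(heatmap)
ax.set_xlabel('Month')
ax.set_ylabel('Destination')

# Render the figure as SVG and hand it to Zeppelin's display system via %html
img = StringIO.StringIO()
fig.savefig(img, format='svg')
img.seek(0)
print("%html <div style='width:900px'>" + img.getvalue() + "</div>")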

Finally, let's build a word cloud of all routes, weighted by the number of flights.
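
We won't reproduce the exact paragraph here, but the idea is roughly the following; the wordcloud package and its generate_from_frequencies(dict) signature are assumptions about the installed library version, and the aggregation query is our own:

%pyspark
# Assumes the wordcloud package is installed on the driver and that
# generate_from_frequencies accepts a dict (true for recent versions).
from wordcloud import WordCloud
import StringIO
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

# Count flights per route and weigh the words in the cloud by that count
routes = sqlContext.sql(
    "SELECT Route, count(*) as NrOfFlights FROM air_traffic GROUP BY Route"
).toPandas()
frequencies = dict(zip(routes['Route'], routes['NrOfFlights']))

wordcloud = WordCloud(width=1200, height=600).generate_from_frequencies(frequencies)

# Same inline SVG trick as for the heatmap above
fig, ax = plt.subplots(figsize=(12, 6))
ax.imshow(wordcloud, interpolation='bilinear')
ax.axis('off')
img = StringIO.StringIO()
fig.savefig(img, format='svg')
img.seek(0)
print("%html <div style='width:900px'>" + img.getvalue() + "</div>")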

That was our brief introduction to Apache Zeppelin. There are some more advanced features as well, like sharing your notebooks or scheduling notebook runs, but by now you should have a good understanding of the basic functionality.

Conclusions

Apache Zeppelin certainly convinced us as a prototyping tool for (big) data analysis. Besides the readily available SparkContext and SQLContext and the ability to mix and match Scala and Python, the querying features with automatic visualizations are a great plus for instant data exploration. There are still some minor bugs, but we believe Zeppelin could become a de facto standard for big data analysis in the near future. We are certainly curious about your feedback on this blog and the tool. Have fun with Zeppelin!

Joris
Data Architect
