
Deploying PySpark jobs using DC/OS

Jan 10, 2018

Matt Jarvis

D2iQ


 
More and more industries are venturing into the realm of big and fast data processing in order to better understand their customers and react more quickly to changes in their businesses. Standardized data processing architectures like the SMACK stack make it easier for any company to become a tech company, since they can learn from the early-adopters in their industries. DC/OS simplifies things further by automating the best practices for installing and running common components of the SMACK stack such as Apache Spark, Mesos, Cassandra, and Kafka.
 
However, the challenges of pioneering a new data strategy aren't solved by simply installing a data processing pipeline. You'll want to process some test data to make sure that the pipeline you've chosen works for you, and you want to make sure that you can deploy your jobs into your production environment.
 
PySpark for Natural Language Processing Pipelines
 
I've recently been working with PySpark, building a natural language processing pipeline demo for DC/OS. This has been a great learning experience, and PySpark provides an easier entry point into the world of Spark programming for a systems guy like myself than having to learn Java or Scala.
 
When you're developing Spark jobs, testing locally is very different from deploying to a cluster, so once you think you've got a working job it isn't always straightforward to work out how to deploy it to a Spark cluster on DC/OS. The Spark docs also aren't particularly clear with respect to Python, especially where dependent libraries are involved - a very different situation from the Java world of uploading a jar file - and since most data folks use Scala or Java, Google isn't necessarily your friend either.
 
So let's look at the problem space. I have a PySpark job which needs a couple of external Python libraries, numpy and kafka, and also needs an additional Spark module, spark-sql-kafka.
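To make the moving parts concrete, here's a rough sketch of the shape of such a job. This is purely illustrative rather than the actual demo code - the broker address, topic name and sink are placeholders:

from pyspark.sql import SparkSession
import numpy as np   # external Python library
import kafka         # external Python library (Kafka client), imported here just to flag the dependency

spark = SparkSession.builder.appName("spark_kafka").getOrCreate()

# Reading Kafka as a streaming source is what needs the spark-sql-kafka
# connector on the Spark side
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")   # placeholder
      .option("subscribe", "mytopic")                        # placeholder
      .load())

# Dump the raw stream to the console, just so the sketch does something
query = df.writeStream.format("console").start()
query.awaitTermination()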
 
When I'm running it locally, I have the Python libraries installed, in this case in a virtualenv, and I supply the --packages argument on the spark-submit command line to add the spark-sql-kafka package at runtime.
 
 
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 spark_kafka.py
 
 
When I move it to DC/OS, my first problem is how to include my dependent Python libraries. Spark has a CLI argument, --py-files, which takes a comma-separated list of .py, .zip or .egg files to put on the Python path, but listing individual files would be pretty unwieldy for even a small module. If we look at the contents of our two dependent packages, we can see they are made up of a whole bunch of different files:
 
 
Mattbook-Pro:site-packages matt$ ls kafka*
 
kafka:
 
__init__.py     client_async.py     codec.py        conn.py         context.pyc     future.py       producer        structs.pyc     version.py
 
__init__.pyc        client_async.pyc    codec.pyc       conn.pyc        coordinator     future.pyc      protocol        util.py         version.pyc
 
client.py       cluster.py      common.py       consumer        errors.py       metrics         serializer      util.pyc
 
client.pyc      cluster.pyc     common.pyc      context.py      errors.pyc      partitioner     structs.py      vendor
 
 
 
kafka-1.3.5.dist-info:
 
DESCRIPTION.rst INSTALLER METADATA RECORD WHEEL metadata.json top_level.txt
 
Mattbook-Pro:site-packages matt$ ls numpy*
 
numpy:
 
__config__.py _distributor_init.py _import_tools.py compat distutils f2py ma polynomial testing
 
__config__.pyc _distributor_init.pyc _import_tools.pyc core doc fft matlib.py random tests
 
__init__.py _globals.py add_newdocs.py ctypeslib.py dual.py lib matlib.pyc setup.py version.py
 
__init__.pyc _globals.pyc add_newdocs.pyc ctypeslib.pyc dual.pyc linalg matrixlib setup.pyc version.pyc
 
 
 
numpy-1.13.3.dist-info:
 
DESCRIPTION.rst INSTALLER METADATA RECORD WHEEL metadata.json top_level.txt
 
 
So obviously we need a different approach. There appears to be some support for virtualenvs directly in Spark, but the only documentation I could find looked very specific to YARN, and given my limited knowledge of Spark I didn't think I could make it work in the time I had available.
 
I then came across this great post on best practices with PySpark, which, although slightly too much for my small project, did give me a clue as to the direction I should be going in. Python can treat a zip file as a directory and import modules and functions from it just like any other directory.
 
Now, with my --py-files argument and a small code change to my Spark job, I can include the modules I want and deploy them to my DC/OS cluster.
 
So, let's start at the start and work through our development pipeline.
 
First, we'll create a virtualenv:
 
 
Mattbook-Pro:pyspark matt$ virtualenv venv
 
New python executable in /Users/matt/pyspark/venv/bin/python
 
Installing setuptools, pip, wheel...done.
 
 
Now we'll activate it:
 
 
Mattbook-Pro:pyspark matt$ source venv/bin/activate
 
(venv) Mattbook-Pro:pyspark matt$
 
 
Now let's deploy our required libraries into our new virtualenv:
 
 
(venv) Mattbook-Pro:pyspark matt$ pip install kafka
 
Collecting kafka
 
Using cached kafka-1.3.5-py2.py3-none-any.whl
 
Installing collected packages: kafka
 
Successfully installed kafka-1.3.5
 
(venv) Mattbook-Pro:pyspark matt$ pip install numpy
 
Collecting numpy
 
Using cached numpy-1.13.3-cp27-cp27m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl
 
Installing collected packages: numpy
 
Successfully installed numpy-1.13.3
 
 
Virtualenv has installed those libraries into venv/lib/python2.7/site-packages/, so let's find those packages and add them to a zip file. We don't need anything else out of our virtualenv, just the two installed packages.
 
 
(venv) Mattbook-Pro:pyspark matt$ cd venv/lib/python2.7/site-packages/
 
(venv) Mattbook-Pro:site-packages matt$ zip -r libs.zip numpy kafka
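Before shipping libs.zip anywhere, it's worth a quick sanity check that Python really will import from it. This is just a hypothetical check script, run from a directory containing libs.zip outside the virtualenv; the pure-Python kafka package is the easy one to verify this way:

import sys

# Put the zip on the module search path, just as the Spark job will do
sys.path.insert(0, 'libs.zip')

import kafka
# If the import really came from the archive, __file__ will point inside libs.zip
print(kafka.__version__, kafka.__file__)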
 
 
So if we supply our libs.zip along with our Spark job using the --py-files argument, the zip file will be deployed to the same directory as the Spark job itself. In order for our Spark job to include those libraries, we need to make a small change to our PySpark code so that it adds the zip file to its module search path:
 
 
import sys
 
import os
 
if os.path.exists('libs.zip'):
    sys.path.insert(0, 'libs.zip')
 
 
We make this change at the top of our Spark job, before any of the imports contained in libs.zip; it simply says that if the zip file exists, add it to the path using sys.path.insert.
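Putting that together, the top of the deployed job ends up looking something like this sketch - the imports from the earlier local version now sit below the path tweak:

import sys
import os

# libs.zip is shipped alongside the job via --py-files; make it importable first
if os.path.exists('libs.zip'):
    sys.path.insert(0, 'libs.zip')

# Only after the path tweak can the shipped packages be imported
import kafka
import numpy as np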
 
The DC/OS Spark CLI extension supports the --py-files argument, so we can simply deploy our libs.zip somewhere accessible and include the URL in our DC/OS command line. In my case, I'm going to use GitHub to host both the job and the libs.zip, so the start of my command will be something like:
 
dcos spark run --submit-args="--py-files=https://raw.githubusercontent.com/mattj-io/spark_nlp/master/libs.zip https://raw.githubusercontent.com/mattj-io/spark_nlp/master/spark_kafka.py"
 
Note that any arguments for Spark itself must directly precede the job on the DC/OS CLI; anything after the job is interpreted as arguments to be passed to the job itself.
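For instance, if the job took a topic name as a parameter - a hypothetical example, not something this demo actually does - it would go after the job URL and arrive in the job's sys.argv:

import sys

# e.g. dcos spark run --submit-args="... spark_kafka.py mytopic"
# everything placed after the job URL lands here
topic = sys.argv[1]   # 'mytopic'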
 
My next problem is that I need to include a Spark library, spark-sql-kafka, for which I only have Maven coordinates. The DC/OS Spark CLI extension doesn't support the --packages switch, so how can I pass this into Spark? Yes, I could log into the cluster and start running things by hand, but my aim here is full automation, so I want to drive everything through the DC/OS CLI.
 
After a lot of hair pulling and googling, I noticed that the DC/OS CLI's --submit-args accepts a --conf switch for setting Spark configuration values:
 
--conf=PROP=VALUE ... Custom Spark configuration properties.
 
This then led me to the Spark docs, and after trawling through the Spark configuration reference, I found the configuration entry spark.jars.packages:
 
Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths. The coordinates should be groupId:artifactId:version. If spark.jars.ivySettings is given artifacts will be resolved according to the configuration in the file, otherwise artifacts will be searched for in the local maven repo, then maven central and finally any additional remote repositories given by the command-line option --repositories. For more details, see Advanced Dependency Management.
 
This looked like exactly what I needed, so now my DC/OS command line looked like:
 
dcos spark run --submit-args="--py-files=https://raw.githubusercontent.com/mattj-io/spark_nlp/master/libs.zip --conf=spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 https://raw.githubusercontent.com/mattj-io/spark_nlp/master/spark_kafka.py"
 
When I ran this, it actually worked! Well, nearly. After a few minutes, my job died with the Spark logs complaining about no Spark Context. Heading back to the Spark docs, I realised that the default driver memory allocation is only 1GB, so I upped that to 2GB in my command line, and the next run worked perfectly!
 
My final command line looked like this:
 
dcos spark run --submit-args="--driver-memory 2048M --py-files=https://raw.githubusercontent.com/mattj-io/spark_nlp/master/libs.zip --conf=spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 https://raw.githubusercontent.com/mattj-io/spark_nlp/master/spark_kafka.py"
 
So there we have it, a working method for deploying PySpark jobs to DC/OS, along with dependent libraries and additional Spark packages.
