This tutorial assumes you have a proper environment setup to access the DAPLAB cluster and that you have a basic understanding of Spark.

This page will familiarize you with PySpark, the Python API for Spark. We will first experiment with the PySpark interpreter, then launch a real PySpark job.


PySpark is the Spark API for Python. The key differences with the Java and Scala APIs are that transformations are written as plain Python functions (usually lambdas) and that RDDs hold Python objects instead of JVM objects.

Spark basic concepts are explained in our Spark tutorial.


Here are some pretty good tutorials and resources online:


PySpark is available on the DAPLAB. To use it, follow these simple steps:

1) choose which Python version you will use (instructions here)

2) set the PySpark interpreter to the correct Python version (python2.7 or python3):

export PYSPARK_PYTHON=/usr/bin/python3 # we will use python3 for this tutorial

3) set the Spark home variable:

export SPARK_HOME=/usr/hdp/current/spark-client/
export PYTHONPATH=${SPARK_HOME}/python:${SPARK_HOME}/python/build:${SPARK_HOME}/python/lib/

4) make the Hadoop libraries available to PySpark:

export HADOOP_HOME=/usr/hdp/current/hadoop

To make this easier, you can put all the export statements in your .bashrc. Don't forget to reload it afterwards: source ~/.bashrc.

You are ready to go!

PySpark interpreter

Launch the interpreter with the command pyspark. In the interpreter, the Spark context is already created for us and available through the sc variable. By default, it runs applications locally with a single core.
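
To check that everything is wired up, you can run a tiny job directly in the interpreter (a quick sanity check, nothing more):

sc.version                          # version of Spark backing the shell
sc.parallelize(range(100)).sum()    # runs a small local job, should return 4950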


Before starting, download the text of Alice in Wonderland and load it into HDFS (from the terminal, not PySpark):

hdfs dfs -put 11.txt.utf-8 /user/USERNAME/alice_in_wonderland.txt

In PySpark, we will count the occurrences of every word and list the ten most frequent ones.

First, load the content of the file into an RDD using sc.textFile and remove blank lines:

alice = "/user/USERNAME/alice_in_wonderland.txt"
lines = sc.textFile(alice).filter(lambda x: len(x) > 0)

We have the RDD, let's work on it:

import re
words = lines.flatMap(lambda x: re.split(r" +", x)).filter(lambda x: len(x) > 0)
counts = word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.take(10)

    [('hastily', 7), ('ornamented', 2), ('volunteer', 1), ('sea.', 3), ("'tis", 2), ('Just', 5), ('dark', 3), ("can,'", 1), ("procession,'", 1), ('positively', 1)]

So, first we split the lines into words and filter out empty words. Then we create a tuple with count 1 for each word and ask Spark to reduce by key: the key here is the first field of the tuple (the word). The lambda argument of reduceByKey defines what to do with the values: here we just sum them.
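
If reduceByKey is new to you, here is a toy example on a small hand-made RDD, just to illustrate the mechanics:

pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
pairs.reduceByKey(lambda a, b: a + b).collect()  # e.g. [('a', 2), ('b', 1)]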

We can rewrite it (and improve it) in a single pipeline of transformations like this:

import re

wordcounts = lambda x: re.sub(r'[^A-Za-z]+', ' ', x).lower()) \
        .flatMap(lambda x: x.split()) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(lambda x,y:x+y) \
        .map(lambda x:(x[1],x[0])) \


    [(1818, 'the'), (940, 'and'), (809, 'to'), (690, 'a'), (631, 'of'), (610, 'it'), (553, 'she'), (545, 'i'), (481, 'you'), (462, 'said')]

Here, we replace every non-alphabetic character with a space, lowercase everything, and sort the results in descending order of count. To go further, add a filter stage to drop all words shorter than four characters.
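
A possible way to do it (one sketch among others) is to insert the filter right after the flatMap:

wordcounts = x: re.sub(r'[^A-Za-z]+', ' ', x).lower()) \
        .flatMap(lambda x: x.split()) \
        .filter(lambda x: len(x) >= 4) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(lambda x, y: x + y) \
        .map(lambda x: (x[1], x[0])) \
        .sortByKey(False)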

Working with DataFrames

In Spark, a DataFrame is a distributed collection of data organized into named columns. Users can use DataFrame API to perform various relational operations on both external data sources (Cassandra, SQL, etc) and Spark’s built-in distributed collections without providing specific procedures for processing data.

The advantages? You get a more structured RDD, and programs based on the DataFrame API are automatically optimized by Spark's built-in optimizer, Catalyst.

For more information, have a look at this article.

First, download flights-2008.csv and put it in your HDFS folder. It contains flight information in CSV format.

For this example, we will use spark-csv, which requires starting the console with the command pyspark --packages com.databricks:spark-csv_2.11:1.1.0

df ="com.databricks.spark.csv").option("header", "true").load("flights-2008.csv")

Here, we just loaded the CSV file. Since it has headers, Spark can automatically infer the column names, but all columns are treated as strings. To change that, we define a function which casts a column, then inserts it back into the schema:

def convertColumn(df, name, new_type):
    # 1. rename col
    # 2. cast the column into new_type
    # 3. add the casted column to the df, under name
    # 4. drop the old column
    df_1 = df.withColumnRenamed(name, "swap")
    return df_1.withColumn(name, df_1["swap"].cast(new_type)).drop("swap")

df = convertColumn(df, "FlightNum", "int")
df = convertColumn(df, "DepDelay", "int")
df = convertColumn(df, "ArrDelay", "int")

Ok, now let's play with the data:

from pyspark.sql import functions as F # use SQL concepts
averageDelays = df.groupBy(df["FlightNum"]).agg(F.avg(df["ArrDelay"]), F.avg(df["DepDelay"]))  # show the first 20 rows


|FlightNum|     avg(ArrDelay)|avg(DepDelay)|
|     3231|               2.0|         19.0|
|     3831|              17.0|         25.0|
|      231|               5.0|         12.0|
|     2631|              45.0|         60.0|
|      631|               6.0|          4.0|
|     1232|              11.0|         12.0|
|     2032|              20.0|         26.0|
|     2632|              32.0|         47.0|
|      632|              63.0|         77.0|
|     3232|              21.5|         34.0|
|     1032|              22.0|         20.0|
|      232|              35.0|         31.0|
|      832|              -7.0|         -4.0|
|     3832|              96.0|        101.0|
|      432|              24.0|         12.0|
|      433|30.666666666666668|         22.0|
|     3233|               3.0|          2.0|
|       33|              17.0|         24.5|
|     1033|               0.0|         17.0|
|     1034|               9.0|         -5.0|

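averageDelays is itself a DataFrame, so you can keep chaining operations. For example (a small sketch), giving the aggregated columns friendlier names with alias makes it easy to list the flights with the worst average arrival delay:

delays = df.groupBy(df["FlightNum"]).agg(
    F.avg(df["ArrDelay"]).alias("avgArrDelay"),
    F.avg(df["DepDelay"]).alias("avgDepDelay"))
delays.orderBy(delays["avgArrDelay"].desc()).show(5)  # the five worst flight numbers on average
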
Other examples (see the doc for all available functions):

# Correlation between two columns
df.corr("ArrDelay", "DepDelay") # = 0.9647874675196805
# compute statistics for numeric columns
# select distinct values"FlightNum").distinct().count()
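
Filtering and selecting columns follow the same pattern; for example (a quick sketch using the columns we cast above):

# flights that arrived more than an hour late
df.filter(df["ArrDelay"] > 60).select("FlightNum", "ArrDelay").show(5)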

PySpark standalone programs

To use the power of Spark on YARN clusters, we must write a PySpark standalone application and launch it with spark-submit.


The basic template of a PySpark application is the following:

## Spark Application - execute with spark-submit

## Imports
from pyspark import SparkConf, SparkContext

## Module Constants
APP_NAME = "My Spark Application"

## Closure Functions

## Main functionality

def main(sc):
    pass  # your application code goes here

if __name__ == "__main__":
    # the app name can be used to track the app
    # status with yarn application --list or through
    # the spark UI
    conf = SparkConf().setAppName(APP_NAME)

    # the line below is optional: you can also configure
    # the master mode with the option spark-submit --master
    # conf = conf.setMaster("local[*]")

    # create the spark context
    sc   = SparkContext(conf=conf)

    # Execute Main functionality
    main(sc)
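
As an illustration, here is how the Alice word count from the interpreter section could be packaged with this template (a minimal sketch; the file name is illustrative and the HDFS path is the one used earlier):

## - standalone version of the word count, executed with spark-submit

from pyspark import SparkConf, SparkContext

APP_NAME = "Alice Word Count"
INPUT = "/user/USERNAME/alice_in_wonderland.txt"

def main(sc):
    lines = sc.textFile(INPUT).filter(lambda x: len(x) > 0)
    counts = lines.flatMap(lambda x: x.split()) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    # print the ten most frequent words
    print( x: (x[1], x[0])).sortByKey(False).take(10))

if __name__ == "__main__":
    conf = SparkConf().setAppName(APP_NAME)
    sc = SparkContext(conf=conf)
    main(sc)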

launch commands

!! On the DAPLAB, don't forget to load the correct Python module and to export the environment variables as described at the beginning of this article before proceeding.

# basic submission using default values

# more complex example:
spark-submit --master yarn-client \
    --num-executors 10 \
    --driver-memory 20G \
    --executor-memory 10g \
    --conf spark.akka.frameSize=100 \
    --conf spark.shuffle.manager=SORT \
    --conf spark.yarn.executor.memoryOverhead=4096 \