Design Pattern Tricks for PySpark

Hi there!

Apache Spark was originally written in Scala, but Python developers love its wrapper, known as PySpark. You can work with RDDs and DataFrames in Python too.

We, the data science team at Talentica, love PySpark and rely mostly on Spark clusters for data processing and related work. Recently, we faced a challenge that is important to address.

Spark Context

Whenever you need it, you can initialize the Spark context in a .py file and reuse it. However, you might get an error when returning the Spark context from a function like this one:

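from pyspark import SparkConf, SparkContext


def get_spark_context():
    conf = SparkConf().setAppName('demo-app-name').set('spark.executor.memory', '8g').set('spark.driver.memory', '4g')
    # Constructing SparkContext directly raises if a context is already active
    sc = SparkContext(master='local[10]', conf=conf).getOrCreate()
    return sc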

The error looks like this:

ValueError: Cannot run multiple SparkContexts at once;
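
To make the failure mode concrete, here is a minimal sketch that runs without Spark. `OneAtATimeContext` is a hypothetical stand-in (not part of PySpark) that only mimics the rule behind the error: a second constructor call fails while a context is still active, whereas a get-or-create style class method hands back the existing one.

```python
class OneAtATimeContext:
    """Hypothetical stand-in that mimics a single-active-instance rule."""

    _active = None  # the one live instance, if any

    def __init__(self, app_name):
        if OneAtATimeContext._active is not None:
            # Mirrors the message quoted above; the real check lives inside PySpark.
            raise ValueError("Cannot run multiple SparkContexts at once")
        self.app_name = app_name
        OneAtATimeContext._active = self

    @classmethod
    def get_or_create(cls, app_name):
        """Return the live instance, constructing one only if none exists."""
        if cls._active is None:
            cls(app_name)
        return cls._active


def get_context_naive():
    # Calls the constructor on every call, like get_spark_context() above.
    return OneAtATimeContext("demo-app-name")


def get_context_safe():
    # Reuses the live context instead of constructing a second one.
    return OneAtATimeContext.get_or_create("demo-app-name")


first = get_context_naive()
try:
    get_context_naive()
    second_call_failed = False
except ValueError:
    second_call_failed = True

reused = get_context_safe()
```

The second naive call fails because it goes through the constructor again, while the safe variant returns the object that already exists.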

Alternatively, you may get away without this error if you create the context as a global in a .py file and import it wherever you need it.

You might notice that every import triggers a re-initialization of the Spark context. Even if there is no error at run time, the whole data processing step gets slower under heavy use.

To overcome this issue, we turned to object-oriented design principles and implemented the Singleton pattern for the Spark context.

Enough with the story for now; let's get right back to the code.

__author__ = "mrityunjay.kumar@talentica.com"

from pyspark import SQLContext, SparkContext, SparkConf


class GetSparkContext:
    """ A singleton backbone for Spark Context Creation"""

    class __internal:
        """ Implementation of the singleton interface """

        def __init__(self):
            self.conf = SparkConf().setMaster('local[10]'). \
                setAppName('app-name'). \
                set('spark.executor.memory', '16g'). \
                set('spark.driver.memory', '16g')
            # getOrCreate is a class method: it returns the active context if
            # one exists and only creates a new one otherwise
            self.sc = SparkContext.getOrCreate(conf=self.conf)
            self.sql_context = SQLContext(self.sc)

        def get_instance(self):
            """ instance retrieval method, return spark context """
            return self.sc

        def get_sql_context(self):
            """ instance retrieval method, return sql context for dataframes """
            return self.sql_context

    # storage for the instance reference
    __spark_instance = None

    def __init__(self):
        """ Create singleton Spark instance """
        # Check whether we already have an instance
        if GetSparkContext.__spark_instance is None:
            # Create instance
            GetSparkContext.__spark_instance = GetSparkContext.__internal()

        # Store instance reference as the only member in the handle
        self.__dict__['SparkInstance'] = GetSparkContext.__spark_instance

    def __getattr__(self, attr):
        """ Delegate access to implementation """
        return getattr(self.__spark_instance, attr)

This class lets you access both the Spark context and the SQL context without running into the error shown above.

Sample use in Python:

# for spark instance
sc = GetSparkContext().get_instance()
# for sql context
sql_context = GetSparkContext().get_sql_context()
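
If you want to convince yourself that repeated handles really share one instance, the same handle-and-implementation pattern can be sketched without Spark. Everything here is illustrative: `SharedResource`, `_Impl`, and `init_count` are hypothetical names, and a plain `object()` stands in for the Spark context.

```python
class SharedResource:
    """Handle class: every instance delegates to one shared implementation."""

    class _Impl:
        init_count = 0  # counts how many times the expensive setup ran

        def __init__(self):
            type(self).init_count += 1
            self.context = object()  # placeholder for a SparkContext

        def get_instance(self):
            return self.context

    _shared = None  # storage for the single implementation object

    def __init__(self):
        # Build the implementation only on the first handle creation.
        if SharedResource._shared is None:
            SharedResource._shared = SharedResource._Impl()

    def __getattr__(self, attr):
        # Called only for attributes missing on the handle itself.
        return getattr(SharedResource._shared, attr)


a = SharedResource().get_instance()
b = SharedResource().get_instance()
```

Both calls create a fresh handle, but the setup in `_Impl.__init__` runs once and `a` and `b` refer to the same object, which is the behaviour the class above relies on.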

Hope this helps.