Hi there!
Apache Spark was originally written in Scala, but Python developers love its wrapper, known as PySpark, which lets you work with RDDs and DataFrames in Python as well.
We, the data science team @Talentica, love PySpark and rely on Spark clusters for data processing and related work. Recently, we faced a challenge that we felt was important to address.
Spark Context
from pyspark import SparkConf, SparkContext

def get_spark_context():
    conf = SparkConf().setAppName('app-name').set('spark.executor.memory', '8g').set('spark.driver.memory', '4g')
    sc = SparkContext(master='local[10]', conf=conf).getOrCreate()
    return sc
Whenever needed, you can initialize the Spark Context in your .py file with a function like this and reuse it. However, the moment such a function is called again to return the Spark context, you might get this error:
ValueError: Cannot run multiple SparkContexts at once;
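For illustration, here is a small sketch of our own, reusing the get_spark_context() function above: the second call tries to construct a brand-new SparkContext while the first one is still active, which is exactly what triggers this error.
# two parts of a pipeline each ask for a context
sc1 = get_spark_context()   # first call creates the SparkContext
sc2 = get_spark_context()   # constructor runs again -> ValueError: Cannot run multiple SparkContexts at once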
Alternatively, you may get away with this error by creating the context as a global variable in a .py file and importing it again and again wherever it is needed. You might notice, though, that every fresh initialization of that module sets up the Spark context all over again; even if there is no error at run time, the whole data processing step gets slower under heavy use.
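A minimal sketch of that global-variable approach, assuming a hypothetical module named spark_globals.py:
# spark_globals.py (hypothetical name): the context is created once, at import time
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName('app-name').set('spark.executor.memory', '8g').set('spark.driver.memory', '4g')
sc = SparkContext(master='local[10]', conf=conf)

# in every other module:
# from spark_globals import sc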
So, to overcome this issue, we laid our hopes on object-oriented design principles and implemented the Singleton pattern for the Spark context.
Enough with the story for now; coming right back to the code.
__author__ = "mrityunjay.kumar@talentica.com"
from pyspark import SQLContext, SparkContext, SparkConf
class GetSparkContext:
    """ A singleton backbone for Spark Context creation """

    class __internal:
        """ Implementation of the singleton interface """

        def __init__(self):
            self.conf = SparkConf().setAppName('app-name'). \
                set('spark.executor.memory', '16g'). \
                set('spark.driver.memory', '16g')
            # correct way to initiate spark context
            self.sc = SparkContext(master='local[10]', conf=self.conf).getOrCreate()
            self.sql_context = SQLContext(self.sc)

        def get_instance(self):
            """ instance retrieval method, return spark context """
            return self.sc

        def get_sql_context(self):
            """ instance retrieval method, return sql context for dataframes """
            return self.sql_context

    # storage for the instance reference
    __spark_instance = None

    def __init__(self):
        """ Create singleton Spark instance """
        # check whether we already have an instance
        if GetSparkContext.__spark_instance is None:
            # create the one and only instance
            GetSparkContext.__spark_instance = GetSparkContext.__internal()
        # store instance reference as the only member in the handle
        self.__dict__['SparkInstance'] = GetSparkContext.__spark_instance

    def __getattr__(self, attr):
        """ Delegate attribute access to the implementation """
        return getattr(self.__spark_instance, attr)
This class will let you access both the Spark context and the SQL context without any errors or warnings.
Sample Use:
# for spark instance
sc = GetSparkContext().get_instance()
# for sql context
sql_context = GetSparkContext().get_sql_context()
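No matter how many times GetSparkContext() is constructed, the underlying SparkContext is built only once. A quick sanity check of our own along these lines makes that visible:
# repeated handles share one and the same SparkContext
sc_a = GetSparkContext().get_instance()
sc_b = GetSparkContext().get_instance()
assert sc_a is sc_b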
Hope this helps.