Text Clustering at Scale

Spark Logo

Hello there! Today we will take an overview of Databricks clusters and see how to run a text clustering model using a Community Edition account.

Prerequisite

A basic understanding of programming in Python or Scala. Knowledge of or experience with Java, SQL, or PySpark can be beneficial but is not essential.

Objective

After reading this blog, readers will be able to:

  • Use the core Spark APIs to operate on text data.
  • Build data pipelines and query large data sets using Spark SQL and DataFrames.
  • Use core Spark MLlib to perform clustering.

Text Clustering

Clustering text data is a simple concept backed by elegant algorithms, and many Python libraries provide APIs for it. In particular, K-Means clustering is commonly used to group textual data into clusters.

scikit-learn approach


from sklearn.cluster import KMeans
# X is the feature matrix of shape (n_documents, n_features),
# e.g. TF-IDF vectors (see the sketch below)
# Number of clusters
kmeans = KMeans(n_clusters=20)
# Fitting the input data
kmeans = kmeans.fit(X)
# Getting the cluster labels
labels = kmeans.predict(X)
# Centroid values
centroids = kmeans.cluster_centers_
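
The snippet above assumes X is already a document-term matrix. For completeness, here is one way it might be built from raw documents with scikit-learn's TF-IDF vectorizer; a minimal sketch, where the documents list is a hypothetical placeholder:

from sklearn.feature_extraction.text import TfidfVectorizer

# hypothetical corpus; in practice this would be your 10K lines of text
documents = [
    "spark makes distributed computing approachable",
    "kmeans groups similar documents together",
    "tf-idf turns raw text into feature vectors",
]

# build a sparse TF-IDF matrix with a capped vocabulary size
vectorizer = TfidfVectorizer(max_features=100)
X = vectorizer.fit_transform(documents)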

When it starts to give trouble with performance

The clustering API in the scikit-learn library requires the whole dataset to be loaded into local/physical memory; once memory runs short, heavy swapping blocks I/O and can almost freeze your machine.

In my scenario, I was testing with 10K lines and 100 features. The whole process took around 10-12 minutes on my Mac (SSD: 256 GB / RAM: 8 GB). At that point, there was no need to think about a distributed computing environment.

However, when I handled another scenario with the same 10K lines but 4,000 features, the process took around 1.5 hours to complete; sometimes the Python process was killed or got stuck in a dead I/O state.
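
As a rough back-of-the-envelope check of my own (not a measurement from the original runs), even the dense representation of that matrix is already sizeable before K-Means makes any working copies:

# approximate footprint of a dense float64 matrix of 10K docs x 4,000 features
n_docs, n_features, bytes_per_float = 10_000, 4_000, 8
print(n_docs * n_features * bytes_per_float / 1024 ** 2, "MiB")  # ~305 MiB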

PySpark comes to the rescue

PySpark is the well-known Python wrapper around the Spark core API for computing in a distributed environment. After reading a bit about PySpark, I was able to complete my text clustering task in around 10-12 minutes. The trick is that instead of loading all the data into memory, we store it in an RDD and pass that RDD to the KMeans API in PySpark MLlib.

Take a look at the code

from pyspark import SparkContext, StorageLevel
from pyspark.sql import SQLContext
from pyspark.ml.feature import HashingTF, Tokenizer, RegexTokenizer, StopWordsRemover, CountVectorizer, NGram, IDF, Normalizer
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.linalg import Vectors

# create a Spark Context
sc = SparkContext(master='local[12]')
sc.setSystemProperty('spark.executor.memory', '8g')
sc.setSystemProperty('spark.driver.memory', '8g')
data_as_RDD = sc.textFile("file.txt")
sqlcontext = SQLContext(sc)

# the RDD is now loaded (lazily)
# you can inspect it with print(data_as_RDD.take(1))

# since we are using the MLlib/ML Pipeline APIs,
# it is recommended to work with DataFrames

# create an index: zipWithIndex yields (line, index) pairs
rdd = data_as_RDD.zipWithIndex()
# convert the RDD to a DataFrame
docDF = sqlcontext.createDataFrame(rdd, ["text", "id"])


# convert to feature vectors
# step 1: tokenize
reTokenizer = RegexTokenizer(inputCol="text", outputCol="words")
tokenized_df = reTokenizer.transform(docDF)

# step 2: vectorize
# TF (hashing trick)
hashingTF = HashingTF(inputCol="words", outputCol="tf", numFeatures=20)
featurizedData = hashingTF.transform(tokenized_df)
# IDF is an estimator: fit it first, then transform
idf = IDF(minDocFreq=3, inputCol="tf", outputCol="idf")
idfModel = idf.fit(featurizedData)
idf_DF = idfModel.transform(featurizedData)

# very important: convert the ml SparseVectors to mllib vectors,
# else KMeans.train complains about the vector type
vec_rdd = idf_DF.rdd.map(lambda x: Vectors.fromML(x["idf"]))

# KMEANS
model = KMeans.train(vec_rdd, k=20)
labels = model.predict(vec_rdd).collect()
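
To make the output easier to inspect, the predicted labels can be paired back with the document ids on the driver; a minimal sketch, assuming the variables from the block above:

# pull the document ids back to the driver and pair them with the labels;
# both come from the same shuffle-free lineage, so their order should line up
ids = docDF.rdd.map(lambda row: row["id"]).collect()
id_to_cluster = list(zip(ids, labels))
print(id_to_cluster[:5])

# quick look at the cluster sizes
from collections import Counter
print(Counter(labels).most_common(5))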

Why we need a dedicated compute cluster

"A computer cluster is a set of loosely or tightly connected computers that work together so that, in many respects, they can be viewed as a single system." (Wikipedia)

Some of the reasons are:

  • High availability (HA)
  • Load balancing in a scalable manner
  • Redundancy and fault tolerance

A friendly introduction to Databricks


DataBricks Logo

Databricks is a company founded by the creators of Apache Spark that aims to help clients with cloud-based big data processing using Spark. It has developed a web-based platform for working with Spark that provides automated cluster management and IPython-style notebooks.

We will not go beyond our aim by explaining the inner workings of Databricks; interested readers can follow up on this blog for more details.

How I leveraged a Databricks Community account for personal ML projects

I have been a fan of Databricks from the very start, and I am amazed by its performance and reliability.

To complete our task for text clustering, we have to understand certain topics:

  1. Workspace

    • This holds all your user-defined content (notebooks, folders, libraries)
    • Under your account, create a notebook by right-clicking inside the workspace
    • The notebook is IPython-style
    • You can't run any code unless your cluster instance is running
  2. Clusters

    • You must create a cluster to run notebooks
    • You may get a prompt to attach the cluster to one or many notebooks
  3. Data

    • Generally, you will use the default database
    • You can create multiple tables with unique names
    • The table will be instantly accessible in your notebook
    • You can query the table with SQL statements to get data out of it
    • To load your own data, click on create a new table (see the sketch after this list for reading the uploaded file directly)
      • Drop files to upload, or browse.
      • Load from Amazon S3, etc.
    • After finishing the import, do the following
      • Create a table with the UI
      • Select the cluster
      • Click on the preview to see the rows
      • It will automatically figure out the file type
      • I have used both JSON and CSV
      • Rename columns if required
    • Finish
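
As an aside, files uploaded through the UI also land on DBFS itself, so they can be read directly without going through the table step. A minimal sketch, with a hypothetical path and file name (uploads typically end up under /FileStore/tables/):

# read the uploaded file straight from DBFS
# (hypothetical path; check the exact path shown by the upload dialog)
raw_df = spark.read.csv("/FileStore/tables/my_upload.csv", header=True, inferSchema=True)
display(raw_df)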

Running Text Clustering in a Databricks Notebook

from pyspark import SparkContext
from pyspark.sql import SQLContext
# the notebook is already attached to a cluster, so reuse the existing
# SparkContext instead of creating a new one as we did locally
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
lines = spark.sql("SELECT * FROM <table_name>")

# to view the data
display(lines)

# to complete our clustering task
# we have to select the `text` column;
# x[0] gives the raw text of each row
rdd = lines.select("text").rdd.map(lambda x: x[0])

# re-create the (text, id) DataFrame by zipping in an index
docDF = spark.createDataFrame(rdd.zipWithIndex(), schema=["text", "id"])
# Proceed with the same code as earlier.

Result: the whole session completes in 1.334 minutes, including the clustering step.

Data/Model Export from Databricks

You can leverage Databricks' internal DBFS (Databricks File System) for file storage.

# open the DBFS path in write mode; `lines` should be an iterable of strings
with open('/dbfs/tmp/out.csv', 'w') as fs:
    fs.writelines(lines)
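
To check that the file actually landed on DBFS, and to move it somewhere downloadable, the notebook's built-in dbutils helpers can be used; a minimal sketch:

# list the directory we just wrote to
dbutils.fs.ls("dbfs:/tmp/")
# copy the file into FileStore, whose contents are reachable via the
# workspace's /files/ URL in the browser
dbutils.fs.cp("dbfs:/tmp/out.csv", "dbfs:/FileStore/out.csv")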

If you want models or output files exported to your local machine, you might have to go via S3.
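
For the S3 route, one common pattern is to set the S3A credentials on the Hadoop configuration and write the DataFrame straight to a bucket; a hedged sketch, where the bucket name and keys are placeholders:

# set AWS credentials for the s3a:// filesystem (placeholders below)
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<AWS_ACCESS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<AWS_SECRET_KEY>")

# write the queried table out to the bucket as CSV
lines.write.csv("s3a://<your-bucket>/text-clustering/out", mode="overwrite")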

There is a huge collection of features and external integrations that Databricks supports; feel free to explore them (link).

Summary

Local environment: MacBook Pro (SSD: 256 GB / RAM: 8 GB)

  • Text clustering with 10K lines and 100 features using sklearn takes 12 minutes to run
  • Text clustering with 10K lines and 4K features using sklearn takes more than 1.5 hours to run
  • Text clustering with 10K lines and 4K features using PySpark takes 10-12 minutes to run
  • Text clustering with 100K lines and 10K features using PySpark breaks with an error

Databricks Cluster (Community Edition)

  • Text clustering with 10K lines and 100 features using PySpark takes 34.96 seconds to run, excluding the display cells
  • Text clustering with 10K lines and 4K features using PySpark takes 1.344 minutes to run, including all cells

Enjoy

Mrityunjay