
Operationalizing PySpark Data Science Models on Pivotal Cloud Foundry

Joint work by Andreas Fleig (Senior Software Engineer) and Dat Tran (Senior Data Scientist).

For many people, Apache Spark is the tool of first choice for dealing with large data sets. It is particularly popular among data scientists due to its simple and elegant machine learning API. However, while it is very easy to train and cross-validate machine learning models on Spark, deploying models, e.g. as RESTful API services, is not trivial, and we had yet to find a solution that makes deployment blindingly easy. This is commonly known as the last mile problem. Pivotal Cloud Foundry, meanwhile, allows software developers to deploy their applications in a matter of seconds. We wondered if Pivotal Cloud Foundry could provide the same benefits for Spark applications as well.

Apache Spark Ecosystem

Apache Spark provides a simpler abstraction layer to work with distributed data than the Hadoop MapReduce framework. Spark itself, like Hadoop, has its own ecosystem consisting of several components (Figure 1):

Figure 1: Apache Spark Ecosystem.

In our case, Spark’s machine learning library, called MLlib, is of particular interest. MLlib provides common algorithms for supervised and unsupervised learning tasks. Handy utilities for feature engineering and model evaluation are also part of the library. The library comes with two main APIs: an RDD-based API and a DataFrame-based API. The former is expected to be removed soon, though.


From Model Building to Model Operationalization

At a high level, a data science project can be roughly split into two phases: an exploration phase and a production phase. In the exploration phase, we experiment with different models and approaches to solve the problem. In the production phase, we want to expose our models as predictive APIs in, for example, a web application. This is particularly important, as only models that are in production bring any value.

In the case of Spark, we train a model on large datasets (typically batches of data) in a distributed cluster environment (e.g. on AWS) during the exploration phase. Once a model is built, we perform predictions during the production phase, where low-latency requests play a critical role. The production phase is also characterized by small amounts of input data. There are several approaches we can take. For example, MLlib offers the possibility to export models to production with the Predictive Model Markup Language (PMML). This approach has various limitations, however, including that not all models can be exported to PMML and that it is currently restricted to the RDD-based API, which is going to be deprecated soon.

Another solution is to use MLeap, which is described as a common serialization format and execution engine for machine learning pipelines. Essentially, model pipelines trained in Spark are exported and can then run outside of Spark via a helper library that includes some of Spark’s core and MLlib components. The main advantage of MLeap is that its dependencies are more lightweight than Spark’s and it uses a faster data structure than Spark to execute ML pipelines. However, as with PMML, not all models can be exported, and pulling in yet another library adds overhead to a project rather than fitting in naturally.

Another alternative, which our team has favored in the past, is to re-implement the models in another language or environment. We extract the specific characteristics of the model, such as the weights and intercept for linear regression or the cluster centers for k-means, and use them to implement the same math in our application. This is not trivial, however, as some models are quite complicated to re-implement. It would therefore be great if there were a simpler way to deploy models.
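For a model as simple as linear regression, such a re-implementation boils down to a dot product plus the intercept. A rough sketch of the idea, given an already fitted Spark linear regression model (variable names purely illustrative):

# Extract the learned parameters from the fitted Spark model ...
weights = list(model.coefficients)
intercept = model.intercept

# ... and re-implement the same math outside of Spark
def predict(features):
    return intercept + sum(w * f for w, f in zip(weights, features))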


Model Deployment on Cloud Foundry

In an earlier post on the Pivotal Engineering Journal, we described how to deploy a machine learning model using Pivotal Cloud Foundry. In that case, we deployed a Flask app that detects handwritten digits using a deep learning model trained with Keras. This was easy thanks to the official Python buildpack. For this reason, we wondered whether it would be possible to extend the buildpack to run PySpark applications (PySpark being Spark’s Python API) on Pivotal Cloud Foundry. The reason to focus on Python alone, despite the fact that Spark also supports Scala, Java and R, is its popularity among data scientists. Moreover, Python is the primary language used by our team.

Buildpack

The PySpark buildpack is based on the Python buildpack and adds a Java Runtime Environment (JRE) and Apache Spark. This allows developers to leverage Conda or PyPI packages as well as the libraries that come with Spark/PySpark. Apps can just assume that Spark is available and need no further configuration – deploying the whole solution becomes super easy.

With Spark installed, we can use spark-submit to run the application locally, in standalone mode, on each app instance. While this approach sets a limit on the problem size, it allows us to auto-scale apps using Pivotal Cloud Foundry’s elastic runtime. If the model is pushed along with the rest of the app, our app doesn’t even need a central model store and should scale very well. Further, it benefits from all the other production-grade behaviors that Pivotal Cloud Foundry adds to our app for free, such as effortless deployment, logging and operations in general.


Using the Buildpack

Due to the nature of Spark, there are a few things we need to take care of when deploying our PySpark apps. During the staging step of cf push, the buildpack downloads both Spark and a JRE, so the app needs enough disk space and memory to accommodate everything. We recommend starting with a 2GB disk quota and 2GB of memory when using the default configuration. We can then reduce the quotas by tuning our settings a little more, for example by lowering the amount of memory Spark uses per executor process (1GB by default).

Downloading Spark takes some time, so the staging process will be a bit slower than usual. The buildpack tries to cache both Spark and the JRE. Depending on the Pivotal Cloud Foundry version or the dea_next.staging_disk_limit_mb setting in your Pivotal Cloud Foundry environment, the size of the build artifacts cache might exceed the disk limit and result in a push error. For that case, we have added an option to disable the cache (see README). Once the app is started, creating a SparkSession can take a few seconds (since the release of Spark 2.0, the main entry point is a SparkSession instead of a SparkContext). We recommend creating the session upfront and reusing it between requests.
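For example, a session created once at start-up and then shared between request handlers might look roughly like this (a minimal sketch, assuming Spark runs in local mode on the app instance; the app name is illustrative):

from pyspark.sql import SparkSession

# Created once when the app boots, then reused for every request
spark_session = (SparkSession.builder
                 .appName("pyspark-model-api")
                 .master("local[*]")
                 .getOrCreate())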

Finally, the Spark versions used to build and load models should match. The version installed by the buildpack can be configured in the configuration file spark_runtime.txt.


Example

Now we want to show a simple application using the new PySpark buildpack. The full example can be found here. In this example, we will train a very simple linear regression model using self-generated data.

The goal of linear regression is simply to model the relationship between a dependent variable y and one or more explanatory variables denoted as X (see Figure 2):

Figure 2: Linear regression equation.
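In standard notation, this relationship is usually written as

y = β₀ + β₁·x₁ + … + βₚ·xₚ + ε

where the β’s are the coefficients to be estimated and ε is an error term.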

In our case, we use only one explanatory variable for simplicity. First, we generate the data (Figure 3):

import numpy as np

# 100 data points with normally distributed noise around the line y = 0.5 + 0.3x
x = np.arange(100)
error = np.random.normal(0, size=(100,))
y = 0.5 + 0.3 * x + error

Figure 3: Generated data with normally distributed errors.

Figure 3 shows the self-generated data with normally distributed errors. Then we need to convert the data to a Spark DataFrame and shape it into the right format so that we can use the MLlib API:

import pandas as pd
from pyspark.ml.linalg import Vectors

# Collect the generated arrays into a pandas DataFrame first
data = pd.DataFrame([(i, j) for i, j in zip(x, y)], columns=["x", "y"])

# spark_session is a SparkSession created earlier, e.g. via SparkSession.builder.getOrCreate()
data_spark = spark_session.createDataFrame(data)

# MLlib expects a label column, an (optional) weight column and a features vector
df = spark_session.createDataFrame(
    data_spark.rdd.map(lambda row: (row[1], 0.5, Vectors.dense(row[0]))),
    ["label", "weight", "features"])

Note: Usually, we would load data stored on a distributed file system like HDFS or in an MPP database like Pivotal Greenplum, in which case some of these steps would look different. Moreover, the data size would also be much larger.
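For instance, reading from a distributed store would typically replace the createDataFrame calls with something along these lines (the path is purely illustrative):

# Read a Parquet dataset directly from HDFS into a Spark DataFrame
data_spark = spark_session.read.parquet("hdfs://namenode:8020/data/linear_regression")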

Now we can fit the model:

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", weightCol="weight")
model = lr.fit(df)

The fitted parameters can then be inspected with the following; since we generated the data with an intercept of 0.5 and a slope of 0.3, the estimates should come out close to those values:

model.coefficients
model.intercept

You can find an instance of a test run in our notebook. Figure 4 also shows the fitted line together with our generated data:

Figure 4: Fitted line with the generated data.

We can now store the model. The model is stored in Parquet format, which we will push along with our app to Pivotal Cloud Foundry. In general, it would be better to serialize the model, for example using pickle, and then store it in a cache layer, but this is not so easy due to the distributed nature of Spark.
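Saving and later reloading the model might look roughly like this (a sketch; the path "lr_model" is an illustrative name, not the one from our repository):

from pyspark.ml.regression import LinearRegressionModel

# Persist the fitted model; Spark writes Parquet files plus metadata
model.write().overwrite().save("lr_model")

# Later, e.g. at app start-up, load it back (the Spark versions must match)
model = LinearRegressionModel.load("lr_model")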

Finally, we used Flask to create a simple app that takes an integer value as input and returns a JSON object with the prediction; a minimal sketch of such an app is shown below. We pushed this app to Pivotal Web Services (PWS), a hosted version of Pivotal Cloud Foundry. The video below shows the entire process of pushing the app to PWS:
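In this sketch, the route, variable names and model path are illustrative rather than the exact code from our example repository:

from flask import Flask, jsonify
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegressionModel
from pyspark.sql import SparkSession

app = Flask(__name__)

# Create the SparkSession and load the model once, at start-up
spark_session = SparkSession.builder.appName("lr-api").getOrCreate()
model = LinearRegressionModel.load("lr_model")

@app.route("/predict/<int:x>")
def predict(x):
    # Wrap the input in a one-row DataFrame with a "features" column
    df = spark_session.createDataFrame([(Vectors.dense([float(x)]),)], ["features"])
    prediction = model.transform(df).collect()[0].prediction
    return jsonify({"prediction": prediction})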

Here is also a video demonstrating the app in action:

Summary

We hope you found this article helpful. The example is simple, but it serves well as a skeleton project that can easily be extended. We would love to see people using our buildpack to build real use cases. For example, you could now build a recommender system with Spark and deploy it within seconds. If you build anything using our buildpack, please share it and let us know!


Learn More