A while ago I was asked to solve a simple exercise to get me through the first step of a job application (which it did). I had had some experience with PySpark 1.6 but had never played around with PySpark 2.0, so I figured I would give it a try. Since there are not that many simple (and complete) examples of ML with PySpark 2.0, I decided to share my Python notebook as well as some personal comments on the whole process. The point of the exercise was to create a model that, given only three simple variables, predicts the price at which a house would be sold. The variables were the lease duration, the property type, and whether or not the property was in London. The dataset can be found here. Obviously, getting a model to perform well on only these three variables was not the point of the exercise. Indeed, the much more interesting task was to understand the data and explain why you would use and evaluate your model under one assumption or another. The full notebook can be found here. Do have a look and let me know if you disagree with something. Anyway, let's get back to the point of this post and talk about PySpark 2.0.
Right off the bat there are some differences in how you initialize Spark. While in 1.6 you would work with contexts, in 2.0 you start off by defining a SparkSession:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("Example").getOrCreate()
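For comparison, the 1.6-style setup went roughly like this (a sketch of the old contexts, not code from the notebook):

from pyspark import SparkContext
from pyspark.sql import SQLContext

# in 1.6 you would create a SparkContext and wrap it in an SQLContext
sc = SparkContext("local", "Example")
sqlContext = SQLContext(sc)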
The second cool thing was opening CSVs. In 1.6 you would either have to read and parse the file by hand or use an external library such as spark-csv. Spark 2.0 lets you do this with one simple command:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType, IntegerType, StringType

dataset = spark.read.csv("pp-complete.csv", header=False)

# we could infer the schema, but let's do that by hand
# and define and select our variables
dataset = dataset.select(
    col("_c2").cast(DateType()).alias("date"),
    col("_c1").cast(IntegerType()).alias("price"),
    col("_c4").alias("type"),
    col("_c6").alias("duration"),
    col("_c13").alias("isLondon"))

# let's also create a new column as a transformation of an old one;
# return a string ("True"/"False") so the StringIndexer below can handle it
isLondonFunc = udf(lambda s: str(s is not None and "LONDON" in s), StringType())
dataset = dataset.withColumn("isLondon", isLondonFunc(dataset.isLondon))
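For contrast, the 1.6 route with the external spark-csv package looked roughly like this (a sketch, assuming the package is on the classpath and the old-style sqlContext from above):

# 1.6 needed the com.databricks:spark-csv package for the same job
old = sqlContext.read.format("com.databricks.spark.csv") \
    .option("header", "false") \
    .load("pp-complete.csv")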
The next step would be to do some EDA and get a feeling for how the data looks. I'll leave that aside for now, but feel free to look at the notebook for more details. Once we have our data ready (now renamed filteredLast12, keeping only the last 12 months of 2015) we can start modeling! With 2.0, pipelines became easier to use, so let's give them a go. First we need to index the categories of our variables so that we can use them with Spark's ML models, and then we can join them together in a feature vector, as the pipeline code below shows.
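Before the pipeline, though, here is a rough sketch of how filteredLast12 might be built, assuming a cut-off of the 2015 calendar year (the notebook has the exact filter, which may differ):

from pyspark.sql.functions import year

# keep only the sales registered in 2015
filteredLast12 = dataset.filter(year(col("date")) == 2015)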
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# we need to convert categorical variables to numeric ones first
isLondonIndexer = StringIndexer(inputCol="isLondon", outputCol="isLondonI")
durationIndexer = StringIndexer(inputCol="duration", outputCol="durationI")
typeIndexer = StringIndexer(inputCol="type", outputCol="typeI")

# then we convert these into a feature vector
assembler = VectorAssembler(inputCols=["isLondonI", "durationI", "typeI"], outputCol="features")

# and we pick a simple model to test out
regressor = DecisionTreeRegressor(featuresCol="features", labelCol="price")

pipeline = Pipeline(stages=[isLondonIndexer, durationIndexer, typeIndexer, assembler, regressor])
model = pipeline.fit(filteredLast12)
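One thing these snippets gloss over is where the testing DataFrame used in the next step comes from. A minimal sketch would be a random split of filteredLast12 before fitting (the 80/20 ratio and the seed here are assumptions of mine):

# hold out part of the data to evaluate the model on later
training, testing = filteredLast12.randomSplit([0.8, 0.2], seed=42)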
Once the model is trained, we can use the pipeline to transform our test data and predict its outcome in one go, and then use the "new" evaluators to score the error of our predictions:
predictions = model.transform(testing)

modelEvaluator = RegressionEvaluator(labelCol="price")
rmseError = modelEvaluator.evaluate(predictions)  # RMSE by default
maeError = modelEvaluator.evaluate(predictions, {modelEvaluator.metricName: "mae"})
It is usually a good idea to compare your model against a baseline, especially when using something like RMSE. To do this you can simply add a new column with the baseline prediction to your predictions DataFrame and point a new evaluator at it via the "predictionCol" parameter. You can find more about this in the notebook linked above.
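As a rough sketch of that idea, using the mean price as a naive baseline (the baseline choice and the column name here are assumptions of mine, not necessarily what the notebook does):

from pyspark.sql.functions import avg, lit

# a naive baseline: always predict the mean price seen in the data
meanPrice = filteredLast12.agg(avg("price")).first()[0]
baselinePredictions = predictions.withColumn("baselinePrediction", lit(meanPrice))

# evaluate the baseline column instead of the model's "prediction" column
baselineEvaluator = RegressionEvaluator(labelCol="price", predictionCol="baselinePrediction")
baselineError = baselineEvaluator.evaluate(baselinePredictions)  # RMSE by default

If the decision tree cannot beat a constant guess like this, the three variables are not buying us much.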
I hope you have found this little tutorial helpful! I would have loved to add some grid search and cross validation snippets to the code but I’ll leave that for another day :).
Have fun!