
In part one we explained how to create a training model. In this part, we use that model to make predictions, i.e., to flag which machines in our dataset should be taken out of service for maintenance.

First, here is how to submit the job to Spark with spark-submit. You pass it:

  • the jar file that contains com.bmc.lr.makePrediction
  • which data file to read, i.e., the one you generated in Part I
  • which model to use, i.e., the one you saved in Part I

spark-submit \
--class com.bmc.lr.makePrediction \
--master local[*] \
hdfs://localhost:9000/maintenance/lr-assembly-1.0.jar \
hdfs://localhost:9000/maintenance/2018.04.20.15.48.54.csv \
hdfs://localhost:9000/maintenance/maintenance_model

Code

Here are the usual imports and package name.

package com.bmc.lr

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer}
import org.apache.spark.mllib.linalg.{Vector, Vectors}

Read the command line arguments and set up the Spark and SQL contexts. We put everything in the object makePrediction so that spark-submit can run its main() method via --class com.bmc.lr.makePrediction.

object makePrediction {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("lr")
    val sc = new SparkContext(conf)

    val file = args(0)
    val modelFile = args(1)

    val sqlContext = new SQLContext(sc)

We use the Databricks CSV package (com.databricks.spark.csv) to read the input file and create an org.apache.spark.sql.DataFrame. Databricks will also come in handy later for saving the output to Hadoop, since that takes only one line of code.

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("delimiter", ";")
  .load(file)

At this point df looks like this:

+------------------+------+------------------+------------------+------------------+----+----------+
|          lifetime|broken|       pressureInd|       moistureInd|    temperatureInd|team|  provider|
+------------------+------+------------------+------------------+------------------+----+----------+
|17.792044609594086|     0|104.25543343273085| 88.67373165099184|122.75111030066334|  47|Ford F-750|
| 69.39805687060903|     0| 93.34561942201603| 86.62996015487022|  84.9796059428202|  34|Ford F-750|
| 84.53664924532875|     0|110.64579687466193|125.89351825805036| 58.34915688191312|   8|Ford F-750|
+------------------+------+------------------+------------------+------------------+----+----------+
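Since we relied on inferSchema, it is worth a quick check that Spark actually inferred numeric types for the feature columns before we assemble them into vectors. This is just an optional sanity check, not part of the original program:

// Print each column name and the type Spark inferred from the CSV.
// lifetime, pressureInd, moistureInd, and temperatureInd should all be doubles.
df.printSchema()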

org.apache.spark.ml.feature.VectorAssembler transforms the columns in featureCols into a single vector column. We need the data in this format to plug into LogisticRegressionModel.transform(), which takes the features (lifetime, pressureInd, moistureInd, temperatureInd) and the label (broken, which we rename to label for clarity) as it predicts which machines will fail.

Then we take the broken (1 or 0) column and run it through org.apache.spark.ml.feature.StringIndexer, which produces a single new column, label. org.apache.spark.ml.feature.StringIndexer.fit() fits the indexer to the data, i.e., it builds the index of label values; the predictions come later, when we call the model's transform().

You can see what the features vectors look like after we create df3 (see below).

val featureCols =  Array("lifetime", "pressureInd", "moistureInd", "temperatureInd")

val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")

val labelIndexer = new StringIndexer().setInputCol("broken").setOutputCol("label")

val df2 = assembler.transform(df)

val df3 = labelIndexer.fit(df2).transform(df2)

Here we show the features column (albeit chopped off in the display). If you recall, these are like the dense vectors we have been plugging into the training models.

df3.select("features").show()
+--------------------+
|            features|
+--------------------+
|[17.7920446095940...|
|[69.3980568706090...|
|[84.5366492453287...|
|[71.6002965259175...|
|[46.8176995523900...|

Load the saved org.apache.spark.ml.classification.LogisticRegressionModel from the Hadoop file system. We trained this model in the first program.

val model = LogisticRegressionModel.load(modelFile)
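Before scoring, you can optionally peek at what was loaded. Assuming Spark 1.6 or later, where LogisticRegressionModel exposes coefficients and intercept, a minimal sketch:

// Print the fitted coefficients (one per feature) and the intercept
// to confirm the model loaded from HDFS looks sane.
println("coefficients: " + model.coefficients)
println("intercept: " + model.intercept)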

Here we select just the columns that we want (dropping features, since it is hard to print and redundant anyway). Then we show only those machines that require maintenance, i.e., those whose predicted value of broken is 1 (true).

val predictions = model.transform(df3)

val df4 = predictions.select("team", "provider", "pressureInd", "moistureInd", "temperatureInd", "label", "prediction")

val df5 = df4.filter("prediction=1")

df5.show()

With df5.show() we see that 4 machines need maintenance:

+----+----------+------------------+------------------+-----------------+-----+----------+
|team|  provider|       pressureInd|       moistureInd|   temperatureInd|label|prediction|
+----+----------+------------------+------------------+-----------------+-----+----------+
|  34|Ford F-750| 93.34561942201603| 86.62996015487022| 84.9796059428202|  0.0|       1.0|
|   8|Ford F-750|110.64579687466193|125.89351825805036|58.34915688191312|  0.0|       1.0|
|  83|Ford F-750| 55.77009802003853|   66.832777175712|125.2982705340028|  0.0|       1.0|
|   2|Ford F-750|  84.1763960666348|  82.1342684415311|57.73202884026434|  0.0|       1.0|
+----+----------+------------------+------------------+-----------------+-----+----------+
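model.transform() also adds rawPrediction and probability columns (under the default column names). If you want to see how confident the model is about each machine it flags, a short optional sketch:

// Show the predicted probability next to the prediction for the
// machines flagged as needing maintenance.
predictions.select("team", "provider", "probability", "prediction")
  .filter("prediction = 1")
  .show(10, false)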

Now we see another example of how useful Databricks is, by letting us write the predictions as a .csv to Hadoop. We save the output in a file whose name is the current date and time. Note also that we use java.util.Date and java.text.SimpleDateFormat, since that is the easiest way to produce the date format we want.

import java.util.Date
import java.text.SimpleDateFormat

val date = new Date()
val dformat = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss")
val csvFile = "hdfs://localhost:9000/maintenance/" + dformat.format(date) + ".csv"

df5.write.format("com.databricks.spark.csv").option("header", "true").save(csvFile)

}
}

Now we show the data:

hadoop fs -cat hdfs://localhost:9000/maintenance/2018.05.16.09.07.33.csv/part-00000-7a9a4de6-ab90-4cd4-b9f5-d0d30c881b2c-c000.csv
team,provider,pressureInd,moistureInd,temperatureInd,label,prediction
34,Ford F-750,93.34561942201603,86.62996015487022,84.9796059428202,0.0,1.0
8,Ford F-750,110.64579687466193,125.89351825805036,58.34915688191312,0.0,1.0
83,Ford F-750,55.77009802003853,66.832777175712,125.2982705340028,0.0,1.0
2,Ford F-750,84.1763960666348,82.1342684415311,57.73202884026434,0.0,1.0
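Note that Spark writes the CSV as a directory of part files, which is why we had to cat the part-00000-… file above. If you would rather end up with a single part file, one common tweak is to coalesce to one partition before the write (at the cost of funneling all the output through one task):

// Collapse the DataFrame to one partition so the save produces a
// single part file inside the output directory.
df5.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(csvFile)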

Last updated: 05/25/2018

These postings are my own and do not necessarily represent BMC's position, strategies, or opinion.


About the author

Walker Rowe

Walker Rowe is a freelance tech writer and programmer. He specializes in big data, analytics, and programming languages. Find him on LinkedIn or Upwork.