In part one we explained how to create a training model. In this part we show how to make predictions to show which machines in our dataset should be taken out of service for maintenance.

First, here is how to submit the job to Spark with spark-submit:

  • jar file that contains
  • what file to read, i.e., the one you just generated in Part I (put link to previous article)
  • which model to use i.e., the one you just generated in Part I (put link to previous article)

--master local[*] hdfs://localhost:9000/maintenance/lr-assembly-1.0.jar


Here are the usual imports and package name.


import org.apache.spark.sql.SQLContext
import{BinaryLogisticRegressionSummary, LogisticRegression}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import{VectorAssembler, StringIndexer}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext

Read the command line arguments and set up the Spark and SQL context. Create the object makePrediction so that we can instantiate class from spark-submit.

object makePrediction {

def main(args: Array[String]): Unit = {
 val conf = new SparkConf().setAppName("lr")
 val sc = new SparkContext(conf)
var modelFile = args(1);
var file = args(0);

val sqlContext = new SQLContext(sc)

Use databricks to read the input file and create a org.apache.spark.sql.DataFrame. Later databricks will be useful to save the output to Hadoop as it requires only line of code.

val df ="com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").option("delimiter",";").load(file)

At this point df looks like this:

lifetime|broken| pressureInd|       moistureInd|    temperatureInd|team|  provider|
|17.792044609594086| 0|104.25543343273085| 88.67373165099184|122.75111030066334|  47|Ford F-750|
| 69.39805687060903| 0| 93.34561942201603| 86.62996015487022|  84.9796059428202|  34|Ford F-750|
| 84.53664924532875| 0|110.64579687466193|125.89351825805036| 58.34915688191312|   8|Ford F-750| transforms the features in featureCols into a vector column. We need it in this format to plug into LogisticRegressionModel.transform() which takes the features(lifetime, pressureInd, moistureInd, temperatureInd) and labels (broken which we rename to label for clarity) as a step to predict which machines will fail.

Then we take the broken (1 or 0) label and put that into to contain a single value, the label. fits the data to the model, meaning make the predictions.

You can see what the features vectors look like after we create df3 (see below).

val featureCols =  Array("lifetime", "pressureInd", "moistureInd", "temperatureInd")

val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")

val labelIndexer = new StringIndexer().setInputCol("broken").setOutputCol("label")

val df2 = assembler.transform(df)

val df3 =

Here we show the features column (albeit it is chopped off in the display). If you recall this is like the dense vectors we have been using to plug into to training models."features").show()
|            features|

Load saved model from Hadoop file system. We trained this model in the first program.

val model = LogisticRegressionModel.load(modelFile)

Here we select just the columns that we want (getting rid of features, since it is hard to print and view and redundant too). Then we show only those machines that require maintenance, i.e., those whose predicted value of broken is 1 (true).

val predictions = model.transform(df3)

var df4 = ("team", "provider", "pressureInd", "moistureInd", "temperatureInd", "label", "prediction")

val df5 = df4.filter("prediction=1")

With we see that 4 machines need maintenance:

|team|  provider|       pressureInd|       moistureInd|   temperatureInd|label|prediction|
|  34|Ford F-750| 93.34561942201603| 86.62996015487022| 84.9796059428202|  0.0|       1.0|
|   8|Ford F-750|110.64579687466193|125.89351825805036|58.34915688191312|  0.0|       1.0|
|  83|Ford F-750| 55.77009802003853|   66.832777175712|125.2982705340028|  0.0|       1.0|
|   2|Ford F-750|  84.1763960666348|  82.1342684415311|57.73202884026434|  0.0|       1.0|

Now we see another example of how useful databricks is by legging us write the predictions in a .csv to Hadoop. We save the output in a file whose name is the date. Note also that we import java.util.Date since it is the easiest way to create different date formats.

import java.util.Date
import java.text.SimpleDateFormat
val date = new Date()
var dformat:SimpleDateFormat = new SimpleDateFormat("");
val csvFile = "hdfs://localhost:9000/maintenance/" + dformat.format(date) + ".csv"
df5.write.format("com.databricks.spark.csv").option("header", "true").save(csvFile)


Now we show the data:

hadoop fs -cat hdfs://localhost:9000/maintenance/2018.
34,Ford F-750,93.34561942201603,86.62996015487022,84.9796059428202,0.0,1.0
8,Ford F-750,110.64579687466193,125.89351825805036,58.34915688191312,0.0,1.0
83,Ford F-750,55.77009802003853,66.832777175712,125.2982705340028,0.0,1.0
2,Ford F-750,84.1763960666348,82.1342684415311,57.73202884026434,0.0,1.0

Wikibon: Automate your Big Data pipeline

Learn how data management experts throughout the industry are transforming their Big Data infrastructure for maximum business impact.
Download Now ›
Last updated: 05/25/2018

These postings are my own and do not necessarily represent BMC's position, strategies, or opinion.

See an error or have a suggestion? Please let us know by emailing

About the author

Walker Rowe

Walker Rowe

Walker Rowe is an American freelance tech writer and programmer living in Tunisia. He specializes in big data, analytics, and programming languages. Find him on LinkedIn or Upwork.