Machine Learning: Logistic Regression using Apache Spark
In this blog post, I’ll help you get started with Apache Spark’s spark.ml Logistic Regression by predicting whether someone makes more or less than $50,000 a year.
Classification
Classification is a family of supervised machine learning algorithms that identify which category an item belongs to, based on labeled examples of known items. Classification takes a set of data with known labels and pre-determined features and learns how to label new records based on that information. Features are the “if questions” that you ask. The label is the answer to those questions.
Logistic Regression
Logistic regression is a popular method to predict a binary response. It is a special case of generalized linear models that predicts the probability of the outcome. Logistic regression measures the relationship between the Y “label” and the X “features” by estimating probabilities using a logistic function. The model outputs a probability, which is then used to predict the label class.
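Concretely, the logistic (sigmoid) function squashes a weighted sum of the features into the range (0, 1):

P(y = 1 | x) = 1 / (1 + e^-(w·x + b))

where w is the learned weight vector and b the intercept. If the predicted probability exceeds a threshold (0.5 by default in Spark), the model predicts class 1; otherwise it predicts class 0.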
Spark ML:
Spark ML provides a uniform set of high-level APIs built on top of DataFrames. The main concepts in Spark ML are listed below; a short sketch of how they fit together follows the list.
- DataFrame: The ML API uses DataFrames from Spark SQL as an ML dataset.
- Transformer: A Transformer is an algorithm which transforms one DataFrame into another DataFrame. For example, turning a DataFrame with features into a DataFrame with predictions.
- Estimator: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. For example, training/tuning on a DataFrame and producing a model.
- Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify a ML workflow.
- ParamMaps: Parameters to choose from, sometimes called a “parameter grid” to search over.
- Evaluator: Metric to measure how well a fitted Model does on held-out test data.
- CrossValidator: Identifies the best ParamMap and re-fits the Estimator using the best ParamMap and the entire dataset.
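Here is a minimal sketch of how these pieces compose (this is not part of the project code that follows; trainingData stands in for any labeled DataFrame):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val lr = new LogisticRegression().setFeaturesCol("features").setLabelCol("label")
val pipeline = new Pipeline().setStages(Array(lr))

// ParamMaps: a grid of candidate parameter values to search over.
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.01, 0.1))
  .addGrid(lr.maxIter, Array(10, 50))
  .build()

// CrossValidator: picks the best ParamMap by the Evaluator's metric,
// then re-fits the Estimator on the whole dataset with that ParamMap.
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator())
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

// val cvModel = cv.fit(trainingData)  // trainingData: a DataFrame with "features" and "label"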
Background:
Before going further, we need to take a little detour. Most ML algorithms operate on numerical data, so we need to do things like convert a gender column that holds Male / Female / Unknown into binary values. We do this by creating two columns.
- The first column indicates whether the person is male: a one means the person is male, a zero means the person is not male.
- The second column indicates whether the person is female: a one means the person is female, a zero means the person is not female.
Notice that we don’t need a third column for Unknown: if there is a zero in both the Male and Female columns, we know the gender is Unknown. In Spark, StringIndexer first converts the categories to numerical indices, and this process of decomposing a single category column into many columns of binary values is then accomplished by the OneHotEncoderEstimator.
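For example, the three gender values decompose into two binary columns like this (is_male / is_female are illustrative names; Spark actually packs these bits into a single sparse vector column, as shown below):

gender  | is_male | is_female
--------|---------|----------
Male    |    1    |     0
Female  |    0    |     1
Unknown |    0    |     0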
StringIndexer
A StringIndexer converts categories to numbers. The numbers range from 0 to the number of categories minus one. The most frequent category gets index 0, the second most frequent gets index 1, and so on.
// preparing data
val sampleData: Seq[(Int, String)] = Seq(
  (0, "a"), (1, "b"), (2, "b"), (3, "c"), (4, "c"),
  (5, "c"), (6, "d"), (7, "d"), (8, "d"), (9, "d"))
val dataFrame = spark.createDataFrame(sampleData).toDF("id", "category")
dataFrame.show()
+---+--------+
| id|category|
+---+--------+
|  0|       a|
|  1|       b|
|  2|       b|
|  3|       c|
|  4|       c|
|  5|       c|
|  6|       d|
|  7|       d|
|  8|       d|
|  9|       d|
+---+--------+
// applying StringIndexer
import org.apache.spark.ml.feature.StringIndexer

val stringIndexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
val modelStringIndexer = stringIndexer.fit(dataFrame)
val transformedDataFrame = modelStringIndexer.transform(dataFrame)
transformedDataFrame.select("category", "categoryIndex")
  .distinct()
  .orderBy("categoryIndex")
  .show()
+--------+-------------+
|category|categoryIndex|
+--------+-------------+
|       d|          0.0|
|       c|          1.0|
|       b|          2.0|
|       a|          3.0|
+--------+-------------+
OneHotEncoderEstimator
One-hot encoding maps a categorical feature, represented as a label index, to a binary vector with at most a single one-value indicating the presence of a specific feature value from among the set of all feature values. This encoding allows algorithms that expect continuous features, such as logistic regression, to use categorical features. For string-type input data, it is common to encode the categorical features with StringIndexer first. Note that by default the encoder drops the last category (dropLast = true), so four categories produce a vector of size 3, and the last-indexed category (“a”, index 3.0) is represented by the all-zeros vector.
import org.apache.spark.ml.feature.OneHotEncoderEstimator

val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array("categoryIndex"))
  .setOutputCols(Array("categoryIndexVec"))
val model = encoder.fit(transformedDataFrame)
val encoded = model.transform(transformedDataFrame)
encoded.show()
+---+--------+-------------+----------------+
| id|category|categoryIndex|categoryIndexVec|
+---+--------+-------------+----------------+
|  0|       a|          3.0|       (3,[],[])|
|  1|       b|          2.0|   (3,[2],[1.0])|
|  2|       b|          2.0|   (3,[2],[1.0])|
|  3|       c|          1.0|   (3,[1],[1.0])|
|  4|       c|          1.0|   (3,[1],[1.0])|
|  5|       c|          1.0|   (3,[1],[1.0])|
|  6|       d|          0.0|   (3,[0],[1.0])|
|  7|       d|          0.0|   (3,[0],[1.0])|
|  8|       d|          0.0|   (3,[0],[1.0])|
|  9|       d|          0.0|   (3,[0],[1.0])|
+---+--------+-------------+----------------+
SparseVector
new SparseVector(size: Int, indices: Array[Int], values: Array[Double])
val sparse = new SparseVector(4, Array(1, 3), Array(3.0, 4.0))   // Spark 1.0.0
val sparse1 = Vectors.sparse(4, Array(1, 3), Array(3.0, 4.0))    // Spark 2.0.0
- size: size of the vector.
- indices: index array, assumed to be strictly increasing.
- values: value array; must have the same length as the indices array.
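As a quick check (a minimal sketch using the Spark 2.x org.apache.spark.ml.linalg API), the sparse vector above is just a compact encoding of the dense vector [0.0, 3.0, 0.0, 4.0]:

import org.apache.spark.ml.linalg.Vectors

val sparse1 = Vectors.sparse(4, Array(1, 3), Array(3.0, 4.0))
// toArray materializes all four positions, filling the unlisted ones with 0.0
println(sparse1.toArray.mkString("[", ", ", "]"))  // prints [0.0, 3.0, 0.0, 4.0]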
VectorAssembler
VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees. VectorAssembler accepts the following input column types: all numeric types, boolean type, and vector type. In each row, the values of the input columns are concatenated into a vector in the specified order.
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val assembler = new VectorAssembler()
  .setInputCols(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")

val output = assembler.transform(dataset)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)
Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'
+-----------------------+-------+
|features               |clicked|
+-----------------------+-------+
|[18.0,1.0,0.0,10.0,0.5]|1.0    |
+-----------------------+-------+
Dataset Review:
The Adult dataset we are going to use is publicly available at the UCI Machine Learning Repository. It derives from census data and consists of information about 48,842 individuals and their annual income. We will use this information to predict whether an individual earns >50K or <=50K a year. The dataset is rather clean and consists of both numeric and categorical variables.
Attribute Information:
- age: continuous
- workclass: Private, Self-emp-not-inc, Self-emp-inc, Federal-gov, Local-gov, State-gov, Without-pay, Never-worked
- fnlwgt: continuous
- education: Bachelors, Some-college, 11th, HS-grad, Prof-school, Assoc-acdm, Assoc-voc…
- education-num: continuous
- marital-status: Married-civ-spouse, Divorced, Never-married, Separated, Widowed, Married-spouse-absent…
- occupation: Tech-support, Craft-repair, Other-service, Sales, Exec-managerial, Prof-specialty, Handlers-cleaners…
- relationship: Wife, Own-child, Husband, Not-in-family, Other-relative, Unmarried
- race: White, Asian-Pac-Islander, Amer-Indian-Eskimo, Other, Black
- sex: Female, Male
- capital-gain: continuous
- capital-loss: continuous
- hours-per-week: continuous
- native-country: United-States, Cambodia, England, Puerto-Rico, Canada, Germany…
Target/Label: <=50K, >50K
Load Data:
def load(filePath: String): Dataset[Row] = {
  val columns = Seq("age", "workclass", "fnlwgt", "education", "education_num",
    "marital_status", "occupation", "relationship", "race", "sex",
    "capital_gain", "capital_loss", "hours_per_week", "native_country", "income")
  spark.read.format("csv")
    .option("inferSchema", true)
    .load(filePath)
    .toDF(columns: _*)
}
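The output below comes from loading the file and inspecting the first rows and the inferred schema, along these lines (the local file name adult.data.txt is an assumption; use whatever name you saved the UCI file under):

val adultIncomeDs = load("adult.data.txt").na.drop()  // drop rows with missing values
adultIncomeDs.show(10, false)
adultIncomeDs.printSchema()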
+---+-----------------+--------+----------+-------------+----------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+
|age|workclass        |fnlwgt  |education |education_num|marital_status        |occupation        |relationship  |race  |sex    |capital_gain|capital_loss|hours_per_week|native_country|income|
+---+-----------------+--------+----------+-------------+----------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+
|39 | State-gov       |77516.0 | Bachelors|13.0         | Never-married        | Adm-clerical     | Not-in-family| White| Male  |2174.0      |0.0         |40.0          | United-States| <=50K|
|50 | Self-emp-not-inc|83311.0 | Bachelors|13.0         | Married-civ-spouse   | Exec-managerial  | Husband      | White| Male  |0.0         |0.0         |13.0          | United-States| <=50K|
|38 | Private         |215646.0| HS-grad  |9.0          | Divorced             | Handlers-cleaners| Not-in-family| White| Male  |0.0         |0.0         |40.0          | United-States| <=50K|
|53 | Private         |234721.0| 11th     |7.0          | Married-civ-spouse   | Handlers-cleaners| Husband      | Black| Male  |0.0         |0.0         |40.0          | United-States| <=50K|
|28 | Private         |338409.0| Bachelors|13.0         | Married-civ-spouse   | Prof-specialty   | Wife         | Black| Female|0.0         |0.0         |40.0          | Cuba         | <=50K|
|37 | Private         |284582.0| Masters  |14.0         | Married-civ-spouse   | Exec-managerial  | Wife         | White| Female|0.0         |0.0         |40.0          | United-States| <=50K|
|49 | Private         |160187.0| 9th      |5.0          | Married-spouse-absent| Other-service    | Not-in-family| Black| Female|0.0         |0.0         |16.0          | Jamaica      | <=50K|
|52 | Self-emp-not-inc|209642.0| HS-grad  |9.0          | Married-civ-spouse   | Exec-managerial  | Husband      | White| Male  |0.0         |0.0         |45.0          | United-States| >50K |
|31 | Private         |45781.0 | Masters  |14.0         | Never-married        | Prof-specialty   | Not-in-family| White| Female|14084.0     |0.0         |50.0          | United-States| >50K |
|42 | Private         |159449.0| Bachelors|13.0         | Married-civ-spouse   | Exec-managerial  | Husband      | White| Male  |5178.0      |0.0         |40.0          | United-States| >50K |
+---+-----------------+--------+----------+-------------+----------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+
only showing top 10 rows

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: double (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: double (nullable = true)
 |-- capital_loss: double (nullable = true)
 |-- hours_per_week: double (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)
Preprocess Data
Since we are going to try algorithms like logistic regression, we will have to convert the categorical variables in the dataset into numeric variables. There are two ways to do this.
- Category Indexing
- One-Hot Encoding
Here, we will use a combination of StringIndexer and OneHotEncoderEstimator to convert the categorical variables. The OneHotEncoderEstimator will return a SparseVector.
import org.apache.spark.ml.feature.StringIndexer

val categoricalColumns = Seq("workclass", "education", "marital_status", "occupation",
  "relationship", "race", "sex", "native_country")
val indexers = categoricalColumns
  .map(colName => new StringIndexer()
    .setInputCol(colName)
    .setOutputCol(colName + "_indexed"))
  .toArray

// We use StringIndexer again to encode our label ("income") to a label index.
val labelStringIdx = new StringIndexer()
  .setInputCol("income")
  .setOutputCol("label")
val allIndexers = indexers.toList ++ List(labelStringIdx)
// preparing the one-hot encoder
import org.apache.spark.ml.feature.OneHotEncoderEstimator

val encoder = new OneHotEncoderEstimator()
  .setInputCols(categoricalColumns.map(_ + "_indexed").toArray)
  .setOutputCols(categoricalColumns.map(name => s"${name}_vec").toArray)
Next, we will use the VectorAssembler to combine all the feature columns into a single vector column. This includes both the numeric columns and the one-hot encoded binary vector columns in our dataset.
val numericCols = Array("age", "fnlwgt", "education_num", "capital_gain",
  "capital_loss", "hours_per_week")
val combined = categoricalColumns.map(name => s"${name}_vec") ++ numericCols

val vectorAssembler = new VectorAssembler()
  .setInputCols(combined.toArray)
  .setOutputCol("features")
Train the model and make predictions
Code:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, StringIndexer, VectorAssembler}
import org.apache.spark.sql.{Dataset, Row, SparkSession}

object LogisticRegression {

  val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("ML project")
    .getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")

  def load(filePath: String): Dataset[Row] = {
    val columns = Seq("age", "workclass", "fnlwgt", "education", "education_num",
      "marital_status", "occupation", "relationship", "race", "sex",
      "capital_gain", "capital_loss", "hours_per_week", "native_country", "income")
    spark.read
      .format("csv")
      .option("inferSchema", true)
      .load(filePath)
      .toDF(columns: _*)
  }

  def main(args: Array[String]): Unit = {
    val fileName = "adult.data.txt"
    val filePath = getClass.getResource(s"/$fileName").getPath
    val adultIncomeDs = load(filePath).na.drop()
    // adultIncomeDs.show(10, false)
    // adultIncomeDs.printSchema()

    import spark.implicits._

    // StringIndexer: convert each categorical column to a numeric index column.
    val categoricalColumns = Seq("workclass", "education", "marital_status", "occupation",
      "relationship", "race", "sex", "native_country")
    val indexers = categoricalColumns
      .map(colName => new StringIndexer()
        .setInputCol(colName)
        .setOutputCol(colName + "_indexed"))
      .toArray
    val labelStringIdx = new StringIndexer()
      .setInputCol("income")
      .setOutputCol("label")
    val allIndexers = indexers.toList ++ List(labelStringIdx)

    // OneHotEncoderEstimator: convert the indexed categorical variables;
    // it will return a SparseVector per column.
    val encoder = new OneHotEncoderEstimator()
      .setInputCols(categoricalColumns.map(_ + "_indexed").toArray)
      .setOutputCols(categoricalColumns.map(name => s"${name}_vec").toArray)

    val numericCols = Array("age", "fnlwgt", "education_num", "capital_gain",
      "capital_loss", "hours_per_week")
    val combined = categoricalColumns.map(name => s"${name}_vec") ++ numericCols
    val vectorAssembler = new VectorAssembler()
      .setInputCols(combined.toArray)
      .setOutputCol("features")

    val lr = new LogisticRegression()
    lr.setFeaturesCol("features")
    lr.setLabelCol("label")
    lr.setMaxIter(10)

    val allStages = allIndexers ++ List(encoder, vectorAssembler, lr)
    val pipeline = new Pipeline().setStages(allStages.toArray)

    val Array(trainingData, testData) = adultIncomeDs.randomSplit(Array(0.7, 0.3), seed = 100)
    val model = pipeline.fit(trainingData)
    val predictions = model.transform(testData)
    predictions.printSchema()

    val results = predictions.select("label", "prediction", "probability", "age", "occupation")
    results.show(10, false)
    results.select("label").distinct().show()

    // BinaryClassificationEvaluator's default metric is the area under the ROC curve.
    val evaluator = new BinaryClassificationEvaluator().setRawPredictionCol("rawPrediction")
    val areaUnderROC = evaluator.evaluate(predictions)
    println(s"Area under ROC: $areaUnderROC")

    // For the confusion matrix and accuracy, convert to an RDD and use MulticlassMetrics.
    import org.apache.spark.mllib.evaluation.MulticlassMetrics
    val predictionAndLabels = results.select($"prediction", $"label").as[(Double, Double)].rdd
    val metrics = new MulticlassMetrics(predictionAndLabels)
    println("Confusion matrix:")
    println(metrics.confusionMatrix)
    println(s"Accuracy: ${metrics.accuracy}")
  }
}
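One note on the evaluation: BinaryClassificationEvaluator does not compute accuracy; its default metric is the area under the ROC curve, which is why the code above prints it as such and falls back to MulticlassMetrics for the confusion matrix and accuracy. The metric can also be set explicitly, e.g.:

// "areaUnderROC" is the default; "areaUnderPR" is the other supported metric.
val evaluator = new BinaryClassificationEvaluator()
  .setRawPredictionCol("rawPrediction")
  .setMetricName("areaUnderROC")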