Missing Data Imputation in Scala
Imputation:
In statistics, imputation is the process of replacing missing data with substituted values. When substituting for a data point, it is known as “unit imputation”; when substituting for a component of a data point, it is known as “item imputation”.
There are many ways to approach missing data. The most common, I believe, is to ignore it. But making no choice means that your statistical software is choosing for you. Most of the time, your software is choosing listwise deletion, which may or may not be a bad choice depending on why and how much data are missing.
Another common approach among those who are paying attention is imputation. Imputation simply means replacing the missing values with an estimate, then analyzing the full data set as if the imputed values were actual observed values.
How do you choose that estimate? The following are common methods:
Mean imputation
Simply calculate the mean of the observed values for that variable across all individuals who are non-missing, and substitute that mean for each missing value.
It has the advantage of keeping the same mean and the same sample size, but many, many disadvantages. Pretty much every method listed below is better than mean imputation.
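For reference, mean imputation itself is only a couple of lines in Spark. A minimal sketch, assuming a DataFrame df with a hypothetical numeric column "age" and org.apache.spark.sql.functions._ in scope:

// Fill nulls in the hypothetical "age" column with its mean
val meanAge = df.select(avg(col("age"))).first().getDouble(0)
val meanImputed = df.na.fill(Map("age" -> meanAge))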
Missing-data imputation algorithm
- Read the data
- Get all column names and their types
- Replace every missing-value marker (NA, N.A., N.A//, "") with null
- Set a Boolean value for each column indicating whether it contains nulls: true for columns that contain null, otherwise false
- If the column type is string, find the most frequent word in that column; otherwise, calculate the average of that column
- Impute the most frequent word for string-typed columns and the average for numeric columns
Implementation:
Read the data:
The file format is CSV, so we use the Databricks CSV (spark-csv) format parser, which returns a DataFrame.
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/user/test/test.csv")
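spark-csv is not part of Spark core, so the dependency has to be added to the build. With sbt that can look like the line below; the 1.5.0 version is an assumption, pick the release matching your Spark and Scala versions:

// build.sbt: pull in the Databricks CSV parser (version is an assumption)
libraryDependencies += "com.databricks" %% "spark-csv" % "1.5.0"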
Get all column names and their types: df.dtypes returns the columns as (name, type) pairs, for example ("uname", "StringType").
val dataTypeList = df.dtypes.toList
Replace all missing values (NA, N.A., N.A//, "") with null: the blankAsNull function checks whether a column's value is one of the missing-value markers and, if it is, replaces it with null.
def blankAsNull(x: String): Column = {
  when(col(x) === "N.A//" || col(x) === "N.A.//" || col(x) === "N.A." ||
       col(x) === "NA" || col(x) === "", null)
    .otherwise(col(x))
}
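One way to apply it across every string column at once, leaving other columns untouched; a sketch that the later snippets refer to as nullifiedDF:

// Nullify missing-value markers in all string columns; pass other columns through
val nullifiedDF = df.select(df.dtypes.map { case (name, tpe) =>
  if (tpe.startsWith("Str")) blankAsNull(name).alias(name) else col(name)
}: _*)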
Set a Boolean per column to identify missing data: to check whether a column contains null values, the countNull function counts the nulls in that column.
def countNull(c: String): Column = {
  sum(col(c).isNull.cast("integer")).alias(c)
}
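Evaluating these expressions in a single select gives one row of per-column null counts, from which the Boolean flags from step 4 of the algorithm can be derived. A minimal sketch, assuming nullifiedDF from the previous step:

// One pass over the data: null count per column, then a column -> Boolean map
val nullCountRow = nullifiedDF.select(nullifiedDF.columns.map(countNull): _*).first()
val hasNullMap = nullifiedDF.columns.zipWithIndex
  .map { case (name, i) => name -> (nullCountRow.getLong(i) > 0) }
  .toMap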
Finding the most frequent word: group the null-replaced DataFrame by the column name, apply count, sort descending by count, and take the first element. Filtering out nulls first keeps the missing-value marker itself from being picked as the most frequent word.
//finding all string columns and the most frequent word in each
val stringColumnList = dataTypeList.filter(x => x._2.startsWith("Str")).map(x => x._1)
val frequencyList = stringColumnList.map(x =>
  nullifiedDF.filter(col(x).isNotNull)
    .groupBy(x).count()
    .sort(desc("count"))
    .first().get(0).asInstanceOf[String]
)
Calculate the average: just calculate avg(column_name) for each numeric column.
//finding numeric columns and calculating their averages
val numberColumnList = dataTypeList
  .filter(x => x._2.startsWith("Integer") || x._2.startsWith("Long") || x._2.startsWith("Double"))
  .map(x => x._1)
val numbersAVGExpr = numberColumnList.map(x => avg(col(x)).alias(x))
val numberAVGDF = nullifiedDF.select(numbersAVGExpr: _*)
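The fill step below expects two maps from column name to replacement value, which the excerpts never construct; one plausible way to build them from the values computed above (a sketch):

// column -> most frequent word, and column -> average value, for na.fill
val stringMap = stringColumnList.zip(frequencyList).toMap
val avgRow = numberAVGDF.first()
val numberMap = numberColumnList.zipWithIndex
  .map { case (c, i) => c -> avgRow.getDouble(i) }
  .toMap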
Impute the data: fill the most frequent word into the string columns and the average into the numeric columns.
val improvedDF = nullifiedDF.na.fill(stringMap)
val imputed = improvedDF.na.fill(numberMap)
Full code:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.log4j.{Logger, Level}

object MissingImputation {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Missing Data Imputation").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    Logger.getRootLogger().setLevel(Level.ERROR)

    // Read the CSV file into a DataFrame
    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("resources/sample.csv")

    // Column (name, type) pairs
    val dataTypeList = df.dtypes.toList
    println(dataTypeList)

    // Replace missing-value markers with null in string columns; pass other columns through
    val stringColumnList = dataTypeList.filter(x => x._2.startsWith("Str")).map(x => x._1)
    val nullifyExprs = dataTypeList.map { case (name, tpe) =>
      if (tpe.startsWith("Str")) blankAsNull(name).alias(name) else col(name)
    }
    val nullifiedDF = df.select(nullifyExprs: _*)

    // Most frequent non-null word per string column
    val frequencyList = stringColumnList.map { c =>
      nullifiedDF.filter(col(c).isNotNull)
        .groupBy(c).count()
        .sort(desc("count"))
        .first().get(0).asInstanceOf[String]
    }

    // Average per numeric column
    val numberColumnList = dataTypeList
      .filter(x => x._2.startsWith("Integer") || x._2.startsWith("Long") || x._2.startsWith("Double"))
      .map(x => x._1)
    val numberAVGDF = nullifiedDF.select(numberColumnList.map(c => avg(col(c)).alias(c)): _*)
    val avgRow = numberAVGDF.first()

    // Build column -> fill-value maps and impute
    val stringMap = stringColumnList.zip(frequencyList).toMap
    val numberMap = numberColumnList.zipWithIndex.map { case (c, i) => c -> avgRow.getDouble(i) }.toMap
    val improvedDF = nullifiedDF.na.fill(stringMap)
    val imputed = improvedDF.na.fill(numberMap)

    imputed.printSchema()
    imputed.take(20).foreach(x => println(x.toString()))
  }

  def blankAsNull(x: String): Column = {
    when(col(x) === "N.A//" || col(x) === "N.A.//" || col(x) === "N.A." ||
         col(x) === "NA" || col(x) === "", null)
      .otherwise(col(x))
  }
}