Apache Spark: WindowSpec & Window
WindowSpec is a window specification that defines which rows are included in a window (frame), i.e. the set of rows that are associated with the current row by some relation.
WindowSpec takes the following when created:
- Partition specification (Seq[Expression]) which defines which records are in the same partition. With no partition defined, all records belong to a single partition.
- Ordering specification (Seq[SortOrder]) which defines how records in a partition are ordered and, in turn, the position of a record in a partition. The ordering can be ascending (ASC in SQL or asc in Scala) or descending (DESC or desc).
- Frame specification (WindowFrame) which defines the rows to be included in the frame for the current row, based on their relative position to the current row. For example, "the three rows preceding the current row to the current row" describes a frame including the current input row and the three rows appearing before it.
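To make the frame idea concrete outside Spark, here is a minimal plain-Python sketch (not the Spark API) of a "three rows preceding the current row to the current row" frame, used here with a running sum:

```python
def frame_sum(values, preceding=3):
    # For each row i, the frame holds up to `preceding` earlier rows
    # plus the current row itself, truncated at the partition start.
    out = []
    for i in range(len(values)):
        frame = values[max(0, i - preceding): i + 1]
        out.append(sum(frame))
    return out

print(frame_sum([1, 2, 3, 4, 5]))  # [1, 3, 6, 10, 14]
```

Note how the first rows have smaller frames because there are fewer than three rows before them.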
You use Window object to create a WindowSpec.
import org.apache.spark.sql.expressions.Window
scala> val byHTokens = Window.partitionBy('token startsWith "h")
byHTokens: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@574985d8
Once the initial version of a WindowSpec is created, you use its methods to further configure the window specification.
| Method | Description |
|---|---|
| orderBy | orderBy(cols: Column*): WindowSpec / orderBy(colName: String, colNames: String*): WindowSpec |
| partitionBy | partitionBy(cols: Column*): WindowSpec / partitionBy(colName: String, colNames: String*): WindowSpec |
| rangeBetween | rangeBetween(start: Column, end: Column): WindowSpec / rangeBetween(start: Long, end: Long): WindowSpec |
| rowsBetween | rowsBetween(start: Long, end: Long): WindowSpec |
With a window specification fully defined, you use the Column.over operator to associate the WindowSpec with an aggregate or window function.
scala> :type windowSpec
org.apache.spark.sql.expressions.WindowSpec
import org.apache.spark.sql.functions.rank
val c = rank over windowSpec
Window Definitions
PARTITION BY Clause
The PARTITION BY clause partitions data into groups based on a sequence of expressions. It is more or less equivalent to the GROUP BY clause in standard aggregations. If no partition is specified, the operation is applied to all records.
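To see the analogy outside Spark, here is a plain-Python sketch (not the Spark API): a GROUP BY-style aggregation collapses each group to one row, while a PARTITION BY-style window keeps every row and attaches the group-level result to it.

```python
from collections import defaultdict

rows = [("a", 1), ("a", 2), ("b", 5)]

# GROUP BY-style aggregation: one output row per group.
groups = defaultdict(list)
for key, value in rows:
    groups[key].append(value)
group_sums = {key: sum(vs) for key, vs in groups.items()}
print(group_sums)  # {'a': 3, 'b': 5}

# PARTITION BY-style window: every input row kept, group sum attached.
windowed = [(key, value, group_sums[key]) for key, value in rows]
print(windowed)  # [('a', 1, 3), ('a', 2, 3), ('b', 5, 5)]
```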
ORDER BY Clause
The ORDER BY clause orders data based on a sequence of expressions. It is required for window functions that depend on the order of rows, such as LAG / LEAD or FIRST / LAST.
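To show why ordering matters, here is a plain-Python sketch (not the Spark API) of LAG and LEAD over an ordered partition: LAG looks a fixed number of rows back and LEAD looks forward, so their results are only meaningful once the rows have a defined order.

```python
def lag(values, offset=1, default=None):
    # LAG(offset): value `offset` rows before the current row, or default.
    return [default] * offset + values[:-offset] if offset else values[:]

def lead(values, offset=1, default=None):
    # LEAD(offset): value `offset` rows after the current row, or default.
    return values[offset:] + [default] * offset if offset else values[:]

ordered = sorted([3, 1, 2])  # ORDER BY defines who the neighbors are
print(lag(ordered))   # [None, 1, 2]
print(lead(ordered))  # [2, 3, None]
```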
ROWS BETWEEN and RANGE BETWEEN Clauses
The ROWS BETWEEN and RANGE BETWEEN clauses define, respectively, the number of rows and the range of values to be included in a single window frame. Each frame definition contains two parts:
- window frame preceding (UNBOUNDED PRECEDING, CURRENT ROW, or a value)
- window frame following (UNBOUNDED FOLLOWING, CURRENT ROW, or a value)
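The difference between the two frame kinds can be illustrated outside Spark with a plain-Python sketch (not the Spark API): ROWS bounds count physical rows around the current position, while RANGE bounds compare the ordering values themselves.

```python
# values must already be sorted by the ORDER BY key.
values = [1, 2, 4, 7, 8]

def rows_frame(values, i, preceding, following):
    # ROWS BETWEEN: bounds are row counts relative to the current position.
    return values[max(0, i - preceding): i + following + 1]

def range_frame(values, i, preceding, following):
    # RANGE BETWEEN: bounds are value offsets relative to the current value.
    current = values[i]
    return [v for v in values if current - preceding <= v <= current + following]

print(rows_frame(values, 2, 1, 1))   # [2, 4, 7] -> one row on either side
print(range_frame(values, 2, 1, 1))  # [4] -> only values within [3, 5]
```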
In raw SQL both values should be positive numbers:
OVER (ORDER BY ... ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) -- include one preceding and 5 following rows
In DataFrame DSL values should be negative for preceding and positive for following range:
Window.orderBy($"foo").rangeBetween(-10, 15) // Take rows where `foo` is between current - 10 and current + 15.
For unbounded windows use -sys.maxsize / sys.maxsize in Python and Long.MinValue / Long.MaxValue in Scala, respectively.
The default frame specification depends on other aspects of a given window definition:
- if the ORDER BY clause is specified and the function accepts the frame specification, then the frame specification is defined by RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW,
- otherwise the frame specification is defined by ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.
The first rule has some interesting consequences. For example, last("foo").over(Window.orderBy($"foo")) will always return the current row's foo.
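This consequence can be checked outside Spark with a plain-Python sketch (not the Spark API) of the default RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW frame, assuming distinct ordering values: the last element of each row's frame is the row itself.

```python
def last_over_default_frame(ordered_values):
    # Default frame with ORDER BY: UNBOUNDED PRECEDING .. CURRENT ROW,
    # so the frame at row i is ordered_values[: i + 1], and its last
    # element is always the current row's own value.
    return [ordered_values[: i + 1][-1] for i in range(len(ordered_values))]

foo = sorted([3, 1, 2])
print(last_over_default_frame(foo))  # [1, 2, 3] -- each row's own value
```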
It is also important to note that, at the moment, the performance impact of unbounded frame definitions is asymmetric: UNBOUNDED FOLLOWING is significantly more expensive than UNBOUNDED PRECEDING. See SPARK-8816.
Example Usage
Select the row with maximum value per group:
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
w = Window.partitionBy("group").orderBy(col("value").desc())
(df
.withColumn("rn", row_number().over(w))
.where(col("rn") == 1))
Select rows with value larger than the group average:
import org.apache.spark.sql.functions.avg
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy($"group")
df.withColumn("group_avg", avg($"value").over(w)).where($"value" > $"group_avg")
Compute the sliding average of value over a [-3, +3] rows window per group, ordered by date:
w <- windowPartitionBy(df$group) %>% orderBy(df$date) %>% rowsBetween(-3, 3)
df %>% withColumn("sliding_mean", over(avg(df$value), w))
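The semantics of the [-3, +3] row frame can be checked outside Spark with a plain-Python sketch (not the Spark API):

```python
def sliding_mean(values, preceding=3, following=3):
    # rowsBetween(-3, 3): the frame is up to 3 rows back and 3 rows
    # ahead of the current row, truncated at the partition edges.
    out = []
    for i in range(len(values)):
        frame = values[max(0, i - preceding): i + following + 1]
        out.append(sum(frame) / len(frame))
    return out

print(sliding_mean([1, 2, 3, 4, 5, 6, 7, 8]))
# [2.5, 3.0, 3.5, 4.0, 5.0, 5.5, 6.0, 6.5]
```

Frames near the partition edges are smaller, so the first and last few averages are computed over fewer than seven rows.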
Requirements and Performance Considerations
In Spark < 2.0.0 window functions are supported only with HiveContext. Since Spark 2.0.0, Spark provides a native window functions implementation that is independent of Hive.
As a rule of thumb window functions should always contain PARTITION BY clause. Without it all data will be moved to a single partition:
val df = sc.parallelize((1 to 100).map(x => (x, x)), 10).toDF("id", "x")
val w = Window.orderBy($"x")
df.rdd.glom.map(_.size).collect
// Array[Int] = Array(10, 10, 10, 10, 10, 10, 10, 10, 10, 10)
df.withColumn("foo", lag("x", 1).over(w)).rdd.glom.map(_.size).collect
// Array[Int] = Array(100)