Apache Spark: WindowSpec & Window
`WindowSpec` is a window specification that defines which rows are included in the window (frame), i.e. the set of rows that are associated with the current row by some relation.
A `WindowSpec` takes the following when created:
- Partition specification (`Seq[Expression]`) which defines which records are in the same partition. With no partition defined, all records belong to a single partition.
- Ordering specification (`Seq[SortOrder]`) which defines how records in a partition are ordered, which in turn defines the position of a record in a partition. The ordering could be ascending (`ASC` in SQL or `asc` in Scala) or descending (`DESC` or `desc`).
- Frame specification (`WindowFrame`) which defines the rows to be included in the frame for the current row, based on their relative position to the current row. For example, "the three rows preceding the current row to the current row" describes a frame including the current input row and the three rows appearing before the current row.
You use the `Window` object to create a `WindowSpec`.
```scala
import org.apache.spark.sql.expressions.Window

scala> val byHTokens = Window.partitionBy('token startsWith "h")
byHTokens: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@574985d8
```
Once the initial version of a `WindowSpec` is created, you use the following methods to further configure the window specification.
Method | Description
---|---
`orderBy` | `orderBy(cols: Column*): WindowSpec` <br> `orderBy(colName: String, colNames: String*): WindowSpec`
`partitionBy` | `partitionBy(cols: Column*): WindowSpec` <br> `partitionBy(colName: String, colNames: String*): WindowSpec`
`rangeBetween` | `rangeBetween(start: Column, end: Column): WindowSpec` <br> `rangeBetween(start: Long, end: Long): WindowSpec`
`rowsBetween` | `rowsBetween(start: Long, end: Long): WindowSpec`
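For illustration, here is a minimal sketch (not from the original text) of how these methods chain; the `group` and `value` column names are hypothetical, and it defines the `windowSpec` value referenced in the next snippet.

```scala
// Sketch: chaining the WindowSpec configuration methods; `group` and `value`
// are hypothetical column names.
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import spark.implicits._   // for the $"..." column syntax (already in scope in spark-shell)

val windowSpec: WindowSpec = Window
  .partitionBy($"group")                                       // rows with the same `group` share a partition
  .orderBy($"value".desc)                                      // order rows within each partition
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)   // frame: partition start up to the current row
```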
With a window specification fully defined, you use the `Column.over` operator to associate the `WindowSpec` with an aggregate or window function.
```scala
scala> :type windowSpec
org.apache.spark.sql.expressions.WindowSpec

import org.apache.spark.sql.functions.rank
val c = rank over windowSpec
```
Window Definitions
PARTITION BY Clause
The `PARTITION BY` clause partitions data into groups based on a sequence of expressions. It is more or less equivalent to the `GROUP BY` clause in standard aggregations. If no partitioning is provided, the operation is applied to all records as a single partition.
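As a rough illustration (not from the original text; `df`, `group` and `value` are hypothetical names), the difference from `GROUP BY` is that a window aggregate keeps every input row:

```scala
// Sketch: window aggregate with PARTITION BY vs. a regular GROUP BY aggregation.
import org.apache.spark.sql.functions.avg
import org.apache.spark.sql.expressions.Window

val byGroup = Window.partitionBy($"group")

df.withColumn("group_avg", avg($"value").over(byGroup))   // one output row per input row
df.groupBy($"group").agg(avg($"value"))                   // one output row per group
```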
ORDER BY Clause
The `ORDER BY` clause orders data based on a sequence of expressions. It is required for window functions that depend on the order of rows, such as `LAG`/`LEAD` or `FIRST`/`LAST`.
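A small sketch of why the ordering matters (hypothetical `df` with `group`, `time` and `value` columns):

```scala
// Sketch: lag/lead are only meaningful once rows in a partition have a defined order.
import org.apache.spark.sql.functions.{lag, lead}
import org.apache.spark.sql.expressions.Window

val ordered = Window.partitionBy($"group").orderBy($"time")

df.withColumn("prev_value", lag($"value", 1).over(ordered))
  .withColumn("next_value", lead($"value", 1).over(ordered))
```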
ROWS BETWEEN and RANGE BETWEEN Clauses
The `ROWS BETWEEN` and `RANGE BETWEEN` clauses define, respectively, the number of rows and the range of values (relative to the current row's ordering value) to be included in a single window frame. Each frame definition contains two parts:
- a window frame preceding boundary (`UNBOUNDED PRECEDING`, `CURRENT ROW`, or a value)
- a window frame following boundary (`UNBOUNDED FOLLOWING`, `CURRENT ROW`, or a value)
In raw SQL both values should be positive numbers:

```sql
OVER (ORDER BY ... ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) -- include one preceding and five following rows
```
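For context, a hedged sketch of how such a frame clause sits inside a complete query submitted through `spark.sql` (the `records` view and its columns are hypothetical):

```scala
// Sketch: the ROWS BETWEEN clause is part of the OVER clause of a window function.
df.createOrReplaceTempView("records")

spark.sql("""
  SELECT *,
         avg(value) OVER (
           PARTITION BY `group`
           ORDER BY `date`
           ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING
         ) AS sliding_avg
  FROM records
""")
```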
In the DataFrame DSL, the value for the preceding boundary should be negative and the value for the following boundary positive:

```scala
Window.orderBy($"foo").rangeBetween(-10, 15)  // take rows where `foo` is between current - 10 and current + 15
```
For unbounded windows one should use `-sys.maxsize` / `sys.maxsize` in Python and `Long.MinValue` / `Long.MaxValue` in Scala.
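Since Spark 2.1 the `Window.unboundedPreceding`, `Window.unboundedFollowing` and `Window.currentRow` constants can be used instead of the raw Long values. A sketch of a running total over an unbounded preceding frame (hypothetical `df` and column names):

```scala
// Sketch: running total using an unbounded preceding frame.
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.expressions.Window

val runningTotal = Window
  .partitionBy($"group")
  .orderBy($"date")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn("running_sum", sum($"value").over(runningTotal))
```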
The default frame specification depends on other aspects of a given window definition:
- if the `ORDER BY` clause is specified and the function accepts the frame specification, then the frame specification is defined by `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`,
- otherwise the frame specification is defined by `ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING`.
The first rule has an interesting consequence: `last("foo").over(Window.orderBy($"foo"))` will always return the current row's `foo`, because the default frame ends at the current row.
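A sketch that makes this consequence concrete (hypothetical `df` with a `foo` column); with an explicit frame covering the whole partition, `last` behaves as one might initially expect:

```scala
// Sketch: the default frame ends at the current row, so last("foo") returns the current foo;
// an explicit unbounded frame returns the last foo of the whole ordered partition.
import org.apache.spark.sql.functions.last
import org.apache.spark.sql.expressions.Window

val defaultFrame  = Window.orderBy($"foo")
val explicitFrame = Window.orderBy($"foo")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn("current_foo", last("foo").over(defaultFrame))
  .withColumn("last_foo",    last("foo").over(explicitFrame))
```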
It is also important to note that, as of this writing, the performance impact of unbounded frame definitions is asymmetric, with `UNBOUNDED FOLLOWING` being significantly more expensive than `UNBOUNDED PRECEDING`. See SPARK-8816.
Example Usage
Select the row with the maximum `value` per `group`:
```python
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

w = Window().partitionBy("group").orderBy(col("value").desc())

(df
    .withColumn("rn", row_number().over(w))
    .where(col("rn") == 1))
```
Select rows with `value` larger than the average for `group`:

```scala
import org.apache.spark.sql.functions.avg
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"group")

df.withColumn("group_avg", avg($"value").over(w))
  .where($"value" > $"group_avg")
```
Compute the sliding average of `value` with a window of [-3, +3] rows per `group`, ordered by `date`:

```r
w <- window.partitionBy(df$group) %>% orderBy(df$date) %>% rowsBetween(-3, 3)

df %>% withColumn("sliding_mean", over(avg(df$value), w))
```
Requirements and Performance Considerations
In Spark < 2.0.0 window functions are supported only with a `HiveContext`. Since Spark 2.0.0, Spark provides a native window functions implementation that is independent of Hive.
As a rule of thumb, window functions should always contain a `PARTITION BY` clause. Without it, all data will be moved to a single partition:
```scala
import org.apache.spark.sql.functions.lag
import org.apache.spark.sql.expressions.Window

val df = sc.parallelize((1 to 100).map(x => (x, x)), 10).toDF("id", "x")
val w = Window.orderBy($"x")

df.rdd.glom.map(_.size).collect
// Array[Int] = Array(10, 10, 10, 10, 10, 10, 10, 10, 10, 10)

df.withColumn("foo", lag("x", 1).over(w)).rdd.glom.map(_.size).collect
// Array[Int] = Array(100)
```
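For comparison, a hedged sketch of the same computation with a partitioning expression added (the `$"id" % 10` bucketing is made up for illustration); the data then stays distributed across shuffle partitions instead of collapsing into one:

```scala
// Sketch: adding PARTITION BY keeps the data distributed.
import org.apache.spark.sql.functions.lag

val wPartitioned = Window.partitionBy($"id" % 10).orderBy($"x")

df.withColumn("foo", lag("x", 1).over(wPartitioned)).rdd.glom.map(_.size).collect
// rows are now spread over multiple shuffle partitions rather than a single one
```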