Part 7: Window Functions

Window functions were introduced to the ISO/ANSI SQL Standard in 2003. U-SQL adopts a subset of window functions as defined by the ANSI SQL Standard.

Window functions are used to perform computations over sets of rows called windows. Windows are defined by the OVER clause. Window functions solve some key scenarios in a highly efficient manner.

This tutorial uses two sample datasets to walk you through some sample scenarios where you can apply window functions.

The window functions are categorized into:

  • Reporting aggregation functions, such as SUM or AVG
  • Ranking functions, such as DENSE_RANK, ROW_NUMBER, NTILE, and RANK
  • Analytic functions, such as cumulative distribution and percentiles, or functions that access data from a previous row in the same result set without the use of a self-join

Sample data

QueryLog sample dataset

QueryLog represents a list of what people searched for in a search engine. Each query log includes:

  • Query – What the user was searching for.
  • Latency – How fast the query came back to the user, in milliseconds.
  • Vertical – What kind of content the user was interested in (Web links, Images, Videos).

Copy and paste the following script into your U-SQL project for constructing the QueryLog rowset:

@querylog = 
    SELECT * FROM ( VALUES
        ("Banana"  , 300, "Image" ),
        ("Cherry"  , 300, "Image" ),
        ("Durian"  , 500, "Image" ),
        ("Apple"   , 100, "Web"   ),
        ("Fig"     , 200, "Web"   ),
        ("Papaya"  , 200, "Web"   ),
        ("Avocado" , 300, "Web"   ),
        ("Cherry"  , 400, "Web"   ),
        ("Durian"  , 500, "Web"   ) )
    AS T(Query,Latency,Vertical);

Employees sample dataset

The Employees dataset includes the following fields:

  • EmpID – Employee ID
  • EmpName – Employee name
  • DeptName – Department name
  • DeptID – Department ID
  • Salary – Employee salary

Copy and paste the following script into your U-SQL project for constructing the Employees rowset:

@employees = 
    SELECT * FROM ( VALUES
        (1, "Noah",   "Engineering", 100, 10000),
        (2, "Sophia", "Engineering", 100, 20000),
        (3, "Liam",   "Engineering", 100, 30000),
        (4, "Emma",   "HR",          200, 10000),
        (5, "Jacob",  "HR",          200, 10000),
        (6, "Olivia", "HR",          200, 10000),
        (7, "Mason",  "Executive",   300, 50000),
        (8, "Ava",    "Marketing",   400, 15000),
        (9, "Ethan",  "Marketing",   400, 10000) )
    AS T(EmpID, EmpName, DeptName, DeptID, Salary);

Compare window functions to Grouping

Windowing and grouping are conceptually related but also different. It is helpful to understand this relationship.

Use aggregation and Grouping

The following query uses an aggregation to calculate the total salary for all employees:

@result = 
    SELECT 
        SUM(Salary) AS TotalSalary
    FROM @employees;

The result is a single row with a single column. The $165000 is the sum of the Salary values from the whole table.

TotalSalary
165000

The following statement uses the GROUP BY clause to calculate the total salary for each department:

@result=
    SELECT DeptName, SUM(Salary) AS SalaryByDept
    FROM @employees
    GROUP BY DeptName;

The results are:

DeptName     SalaryByDept
Engineering  60000
Executive    50000
HR           30000
Marketing    25000

The sum of the SalaryByDept column is $165000, which matches the amount in the previous script. In both cases there are fewer output rows than input rows. Without GROUP BY, the aggregation collapses all the rows into a single row. With GROUP BY, there are N output rows, where N is the number of distinct DeptName values that appear in the data. In this case, you get 4 rows in the output.
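The row-count difference can be made concrete with a small plain-Python model of the two queries above. This is only an illustration of the semantics, not U-SQL; the sample rows are copied from @employees, and the helper names `total_salary` and `salary_by_dept` are hypothetical.

```python
from collections import defaultdict

# Rows copied from the @employees sample: (EmpID, EmpName, DeptName, DeptID, Salary)
EMPLOYEES = [
    (1, "Noah", "Engineering", 100, 10000),
    (2, "Sophia", "Engineering", 100, 20000),
    (3, "Liam", "Engineering", 100, 30000),
    (4, "Emma", "HR", 200, 10000),
    (5, "Jacob", "HR", 200, 10000),
    (6, "Olivia", "HR", 200, 10000),
    (7, "Mason", "Executive", 300, 50000),
    (8, "Ava", "Marketing", 400, 15000),
    (9, "Ethan", "Marketing", 400, 10000),
]
DEPT, SALARY = 2, 4  # column positions in each tuple


def total_salary(rows):
    """SUM(Salary) with no GROUP BY: every row collapses into one value."""
    return sum(row[SALARY] for row in rows)


def salary_by_dept(rows):
    """SUM(Salary) ... GROUP BY DeptName: one output entry per distinct DeptName."""
    totals = defaultdict(int)
    for row in rows:
        totals[row[DEPT]] += row[SALARY]
    return dict(totals)


by_dept = salary_by_dept(EMPLOYEES)
print(total_salary(EMPLOYEES))              # 165000
print(len(EMPLOYEES), "->", len(by_dept))   # 9 -> 4
```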

Use a window function

The OVER clause in the following sample is empty. This defines the “window” to include all rows. The SUM in this example is applied to the OVER clause that it precedes. You could read this query as: “The sum of Salary over a window of all rows”.

@result=
    SELECT
        EmpName,
        SUM(Salary) OVER( ) AS SalaryAllDepts
    FROM @employees;

Unlike GROUP BY, there are as many output rows as input rows:

EmpName  SalaryAllDepts
Noah     165000
Sophia   165000
Liam     165000
Emma     165000
Jacob    165000
Olivia   165000
Mason    165000
Ava      165000
Ethan    165000

The value of 165000 (the total of all salaries) is placed in each output row. That total comes from the “window” of all rows, so it includes all the salaries. The next example demonstrates how to refine the “window” to list all the employees, the department, and the total salary for the department. PARTITION BY is added to the OVER clause.

@result=
    SELECT
        EmpName, 
        DeptName,
        SUM(Salary) OVER( PARTITION BY DeptName ) AS SalaryByDept
    FROM @employees;

The results are:

EmpName  DeptName     SalaryByDept
Noah     Engineering  60000
Sophia   Engineering  60000
Liam     Engineering  60000
Mason    Executive    50000
Emma     HR           30000
Jacob    HR           30000
Olivia   HR           30000
Ava      Marketing    25000
Ethan    Marketing    25000

Again, there are as many output rows as input rows. However, each row now holds the total salary for its own department.
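A plain-Python sketch of the two windowed queries above shows why the row count is preserved: the window total is computed once per partition and then attached to every row, instead of replacing the rows. The function names are hypothetical, and the code models the semantics only; it is not how U-SQL executes the query.

```python
from collections import defaultdict

# Rows copied from the @employees sample: (EmpID, EmpName, DeptName, DeptID, Salary)
EMPLOYEES = [
    (1, "Noah", "Engineering", 100, 10000),
    (2, "Sophia", "Engineering", 100, 20000),
    (3, "Liam", "Engineering", 100, 30000),
    (4, "Emma", "HR", 200, 10000),
    (5, "Jacob", "HR", 200, 10000),
    (6, "Olivia", "HR", 200, 10000),
    (7, "Mason", "Executive", 300, 50000),
    (8, "Ava", "Marketing", 400, 15000),
    (9, "Ethan", "Marketing", 400, 10000),
]
DEPT, SALARY = 2, 4


def sum_over_all(rows):
    """SUM(Salary) OVER (): the window is every row, so each row gets the grand total."""
    grand_total = sum(row[SALARY] for row in rows)
    return [row + (grand_total,) for row in rows]


def sum_over_dept(rows):
    """SUM(Salary) OVER (PARTITION BY DeptName): each row gets its department's total."""
    totals = defaultdict(int)
    for row in rows:                                     # pass 1: total per partition
        totals[row[DEPT]] += row[SALARY]
    return [row + (totals[row[DEPT]],) for row in rows]  # pass 2: one output row per input row


for row in sum_over_dept(EMPLOYEES):
    print(row[1], row[DEPT], row[-1])  # first line: Noah Engineering 60000
```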

Reporting aggregation functions

Window functions also support the following aggregates:

  • COUNT
  • SUM
  • MIN
  • MAX
  • AVG
  • STDEV
  • VAR
  • STDEVP
  • VARP

The syntax:

<AggregateFunction>( [DISTINCT] <expression>) [<OVER_clause>]

Note:

  • By default, aggregate functions, except COUNT, ignore null values.
  • When aggregate functions are specified along with the OVER clause, the ORDER BY clause is not allowed in the OVER clause.

SUM

The following example adds a total salary by department to each input row:

@result=
    SELECT
        *,
        SUM(Salary) OVER( PARTITION BY DeptName ) AS TotalByDept
    FROM @employees;

The results are:

EmpID  EmpName  DeptName     DeptID  Salary  TotalByDept
1      Noah     Engineering  100     10000   60000
2      Sophia   Engineering  100     20000   60000
3      Liam     Engineering  100     30000   60000
7      Mason    Executive    300     50000   50000
4      Emma     HR           200     10000   30000
5      Jacob    HR           200     10000   30000
6      Olivia   HR           200     10000   30000
8      Ava      Marketing    400     15000   25000
9      Ethan    Marketing    400     10000   25000

COUNT

The following example adds an extra field to each row to show the total number of employees in each department.

@result =
    SELECT 
        *,
        COUNT(*) OVER(PARTITION BY DeptName) AS CountByDept
    FROM @employees;

The results are:

EmpID  EmpName  DeptName     DeptID  Salary  CountByDept
1      Noah     Engineering  100     10000   3
2      Sophia   Engineering  100     20000   3
3      Liam     Engineering  100     30000   3
7      Mason    Executive    300     50000   1
4      Emma     HR           200     10000   3
5      Jacob    HR           200     10000   3
6      Olivia   HR           200     10000   3
8      Ava      Marketing    400     15000   2
9      Ethan    Marketing    400     10000   2

MIN and MAX

The following example adds an extra field to each row to show the lowest salary of each department:

@result =
    SELECT
        *,
        MIN(Salary) OVER ( PARTITION BY DeptName ) AS MinSalary
    FROM @employees;

The results are:

EmpID  EmpName  DeptName     DeptID  Salary  MinSalary
1      Noah     Engineering  100     10000   10000
2      Sophia   Engineering  100     20000   10000
3      Liam     Engineering  100     30000   10000
7      Mason    Executive    300     50000   50000
4      Emma     HR           200     10000   10000
5      Jacob    HR           200     10000   10000
6      Olivia   HR           200     10000   10000
8      Ava      Marketing    400     15000   10000
9      Ethan    Marketing    400     10000   10000

Analytic functions

Analytic functions are used to understand the distributions of values in windows. The most common scenario for using analytic functions is the computation of percentiles.

Supported analytic window functions

  • CUME_DIST
  • PERCENT_RANK
  • PERCENTILE_CONT
  • PERCENTILE_DISC

CUME_DIST

CUME_DIST computes the relative position of a specified value in a group of values.

For a column “X”, it calculates the fraction of rows in the same window that have an X less than or equal to the current X.

For a row R, assuming ascending ordering, the CUME_DIST of R is the number of rows with values lower than or equal to the value of R, divided by the number of rows evaluated in the partition or query result set. CUME_DIST returns numbers in the range 0 < x <= 1.

CUME_DIST()
    OVER (
        [PARTITION BY <identifier, > …[n]]
        ORDER BY <identifier, > …[n] [ASC|DESC]
    ) AS <alias>

The following example uses the CUME_DIST function to compute the latency percentile for each query within a vertical.

@result=
    SELECT
        *,
        CUME_DIST() OVER(PARTITION BY Vertical ORDER BY Latency) AS CumeDist
    FROM @querylog;

The results:

Query    Latency  Vertical  CumeDist
Durian   500      Image     1
Banana   300      Image     0.666666666666667
Cherry   300      Image     0.666666666666667
Durian   500      Web       1
Cherry   400      Web       0.833333333333333
Avocado  300      Web       0.666666666666667
Fig      200      Web       0.5
Papaya   200      Web       0.5
Apple    100      Web       0.166666666666667

  • There are 6 rows in the partition where the partition key is “Web” (the 4th row of the results and below).
  • There are 6 rows with a value equal to or lower than 500, so the CUME_DIST equals 6/6 = 1.
  • There are 5 rows with a value equal to or lower than 400, so the CUME_DIST equals 5/6 ≈ 0.83.
  • There are 4 rows with a value equal to or lower than 300, so the CUME_DIST equals 4/6 ≈ 0.67.
  • There are 3 rows with a value equal to or lower than 200, so the CUME_DIST equals 3/6 = 0.5. Two of these rows share the same latency value (200).
  • There is 1 row with a value equal to or lower than 100, so the CUME_DIST equals 1/6 ≈ 0.17.
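The counting rule in the bullets above can be checked with a short plain-Python sketch. It assumes ascending ORDER BY and no NULL values (U-SQL treats NULLs as the lowest values, which this sketch does not model); `cume_dist` is a hypothetical helper, not part of any U-SQL API.

```python
# Rows copied from the @querylog sample: (Query, Latency, Vertical)
QUERYLOG = [
    ("Banana", 300, "Image"),
    ("Cherry", 300, "Image"),
    ("Durian", 500, "Image"),
    ("Apple", 100, "Web"),
    ("Fig", 200, "Web"),
    ("Papaya", 200, "Web"),
    ("Avocado", 300, "Web"),
    ("Cherry", 400, "Web"),
    ("Durian", 500, "Web"),
]
LATENCY, VERTICAL = 1, 2


def cume_dist(rows):
    """CUME_DIST() OVER (PARTITION BY Vertical ORDER BY Latency), ascending order.

    For each row: (rows in the same partition with Latency <= this row's Latency)
    divided by (rows in the partition). Quadratic, which is fine for a demo.
    """
    result = []
    for row in rows:
        partition = [r[LATENCY] for r in rows if r[VERTICAL] == row[VERTICAL]]
        at_or_below = sum(1 for v in partition if v <= row[LATENCY])
        result.append(row + (at_or_below / len(partition),))
    return result


for query, latency, vertical, dist in cume_dist(QUERYLOG):
    print(f"{query:8} {latency:4} {vertical:6} {dist:.2f}")
```

Tied rows (Fig and Papaya) get the same value because both count every row at or below 200.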

Usage notes:

  • Tie values always evaluate to the same cumulative distribution value.
  • NULL values are treated as the lowest possible values.
  • You must specify the ORDER BY clause to calculate CUME_DIST.
  • CUME_DIST is similar to the PERCENT_RANK function.
  • Note: An ORDER BY clause is not allowed in a SELECT statement that is not followed by OUTPUT, so the ORDER BY clause in the OUTPUT statement determines the display order of the resulting rowset.

PERCENTILE_CONT & PERCENTILE_DISC

These two functions calculate a percentile based on a continuous or discrete distribution of the column values.

  • PERCENTILE_CONT calculates a percentile based on a continuous distribution. It will interpolate values.
  • PERCENTILE_DISC calculates the percentile based on a discrete distribution. It will always return one of the input values and will not interpolate a value.

Syntax

[PERCENTILE_CONT | PERCENTILE_DISC] ( numeric_literal )
    WITHIN GROUP ( ORDER BY <identifier> [ASC | DESC] )
    OVER ( [PARTITION BY <identifier,>…[n] ] ) AS <alias>

  • numeric_literal – The percentile to compute, in the range [0.0, 1.0].
  • WITHIN GROUP ( ORDER BY <identifier> [ASC | DESC] ) – Within each partition, computes the percentile over identifier. The default sort order is ascending.
  • OVER ( [PARTITION BY <identifier,>…[n] ] ) – Defines the partitions.

Note: Any nulls in the data set are ignored.

PERCENTILE_CONT vs PERCENTILE_DISC

You can see how PERCENTILE_CONT and PERCENTILE_DISC differ in the following example, which finds the median (percentile = 0.50) value of Latency within each Vertical.

@result =
    SELECT
        Query,
        Latency,
        Vertical,
        PERCENTILE_CONT(0.5) 
            WITHIN GROUP (ORDER BY Latency)
            OVER ( PARTITION BY Vertical ) AS PercentileCont50,
        PERCENTILE_DISC(0.5)
            WITHIN GROUP (ORDER BY Latency)
            OVER ( PARTITION BY Vertical ) AS PercentileDisc50
    FROM @querylog;

The results:

Query    Latency  Vertical  PercentileCont50  PercentileDisc50
Banana   300      Image     300               300
Cherry   300      Image     300               300
Durian   500      Image     300               300
Apple    100      Web       250               200
Fig      200      Web       250               200
Papaya   200      Web       250               200
Avocado  300      Web       250               200
Cherry   400      Web       250               200
Durian   500      Web       250               200

Look at the median for the Web vertical.

  • PERCENTILE_CONT gives the median as 250, even though no query in the Web vertical had a latency of 250.
  • PERCENTILE_DISC gives the median for Web as 200, which is an actual value found in the input rows.
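The difference between the two functions comes down to interpolation. The plain-Python sketch below assumes the standard SQL definitions: PERCENTILE_CONT interpolates linearly at the 0-based position p * (n - 1) of the sorted values, and PERCENTILE_DISC returns the first sorted value whose cumulative share reaches p. These definitions reproduce the table above for p = 0.5, but treat them as an assumption rather than a statement of U-SQL's implementation; the helper names are hypothetical.

```python
import math

# Rows copied from the @querylog sample: (Query, Latency, Vertical)
QUERYLOG = [
    ("Banana", 300, "Image"),
    ("Cherry", 300, "Image"),
    ("Durian", 500, "Image"),
    ("Apple", 100, "Web"),
    ("Fig", 200, "Web"),
    ("Papaya", 200, "Web"),
    ("Avocado", 300, "Web"),
    ("Cherry", 400, "Web"),
    ("Durian", 500, "Web"),
]


def percentile_cont(values, p):
    """Continuous percentile: interpolate linearly at 0-based position p * (n - 1)."""
    if not 0.0 <= p <= 1.0:
        raise ValueError("p must be in [0.0, 1.0]")
    ordered = sorted(values)
    pos = p * (len(ordered) - 1)
    lo, hi = math.floor(pos), math.ceil(pos)
    return ordered[lo] + (ordered[hi] - ordered[lo]) * (pos - lo)


def percentile_disc(values, p):
    """Discrete percentile: the first sorted value whose cumulative share i / n reaches p.
    The result is always one of the input values."""
    if not 0.0 <= p <= 1.0:
        raise ValueError("p must be in [0.0, 1.0]")
    ordered = sorted(values)
    n = len(ordered)
    for i, value in enumerate(ordered, start=1):
        if i / n >= p:
            return value
    return ordered[-1]  # unreachable for p <= 1.0, since n / n == 1.0


# Group the latencies by Vertical (the PARTITION BY key)
latencies = {}
for _query, latency, vertical in QUERYLOG:
    latencies.setdefault(vertical, []).append(latency)

for vertical, values in latencies.items():
    print(vertical, percentile_cont(values, 0.5), percentile_disc(values, 0.5))
# Image 300.0 300
# Web 250.0 200
```

For Web, the 0-based position 2.5 falls halfway between 200 and 300, which is where PERCENTILE_CONT's 250 comes from.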

PERCENT_RANK

PERCENT_RANK calculates the relative rank of a row within a group of rows. PERCENT_RANK is used to evaluate the relative standing of a value within a rowset or partition. The values returned by PERCENT_RANK range from 0 to 1, inclusive.

PERCENT_RANK( )
    OVER (
        [PARTITION BY <identifier, > …[n]]
        ORDER BY <identifier, > …[n] [ASC|DESC]
    ) AS <alias>

Notes

  • The first row in any set has a PERCENT_RANK of 0.
  • NULL values are treated as the lowest possible values.
  • PERCENT_RANK is similar to the CUME_DIST function. Unlike CUME_DIST, PERCENT_RANK is always 0 for the first row.

The following example uses the PERCENT_RANK function to compute the latency percentile for each query within a vertical. The PARTITION BY clause is specified to partition the rows in the result set by the vertical. The ORDER BY clause in the OVER clause orders the rows in each partition. The value returned by the PERCENT_RANK function represents the rank of the queries’ latency within a vertical as a percentage.

@result=
    SELECT
        *,
        PERCENT_RANK() 
            OVER (PARTITION BY Vertical ORDER BY Latency) AS PercentRank
    FROM @querylog;

Query    Latency  Vertical  PercentRank
Banana   300      Image     0
Cherry   300      Image     0
Durian   500      Image     1
Apple    100      Web       0
Fig      200      Web       0.2
Papaya   200      Web       0.2
Avocado  300      Web       0.6
Cherry   400      Web       0.8
Durian   500      Web       1
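PERCENT_RANK can be derived from RANK: in standard SQL it is (RANK - 1) / (number of rows in the partition - 1), which explains why the first row is always 0 and why an untied highest value gets 1. The plain-Python sketch below assumes that formula (it matches every value in the table above) and uses a hypothetical `percent_rank` helper; NULL handling is not modeled.

```python
# Rows copied from the @querylog sample: (Query, Latency, Vertical)
QUERYLOG = [
    ("Banana", 300, "Image"),
    ("Cherry", 300, "Image"),
    ("Durian", 500, "Image"),
    ("Apple", 100, "Web"),
    ("Fig", 200, "Web"),
    ("Papaya", 200, "Web"),
    ("Avocado", 300, "Web"),
    ("Cherry", 400, "Web"),
    ("Durian", 500, "Web"),
]
LATENCY, VERTICAL = 1, 2


def percent_rank(rows):
    """PERCENT_RANK() OVER (PARTITION BY Vertical ORDER BY Latency), ascending.

    Uses (RANK - 1) / (partition size - 1); a single-row partition gets 0.
    """
    result = []
    for row in rows:
        partition = [r[LATENCY] for r in rows if r[VERTICAL] == row[VERTICAL]]
        # RANK() = 1 + number of rows in the partition with a strictly lower value
        rank = 1 + sum(1 for v in partition if v < row[LATENCY])
        n = len(partition)
        result.append(row + ((rank - 1) / (n - 1) if n > 1 else 0.0,))
    return result


for query, latency, vertical, pct in percent_rank(QUERYLOG):
    print(query, latency, vertical, pct)
```

Comparing with CUME_DIST: Apple in the Web vertical gets 1/6 from CUME_DIST but 0 from PERCENT_RANK.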

Window Ranking Functions

Ranking functions return a ranking value (a long) for each row in each partition as defined by the PARTITION BY and OVER clauses. The ordering of the rank is controlled by the ORDER BY in the OVER clause.

The following are supported ranking functions:

  • RANK
  • DENSE_RANK
  • ROW_NUMBER
  • NTILE

Syntax:

[RANK() | DENSE_RANK() | ROW_NUMBER() | NTILE(<numgroups>)]
    OVER (
    [PARTITION BY <identifier, > …[n]]
    [ORDER BY <identifier, > …[n] [ASC|DESC]]
    ) AS <alias>

The ORDER BY clause is optional for ranking functions. If ORDER BY is specified, it determines the order of the ranking. If ORDER BY is not specified, U-SQL assigns values based on the order in which it reads the records, so the resulting row number, rank, or dense rank values are nondeterministic.

NTILE requires an expression that evaluates to a positive integer. This number specifies the number of groups into which each partition must be divided. The <numgroups> argument is used only with the NTILE ranking function.

For more details on the OVER clause, see the U-SQL reference.

ROW_NUMBER, RANK, and DENSE_RANK

ROW_NUMBER, RANK, and DENSE_RANK all assign numbers to rows in a window. Rather than cover them separately, it’s more intuitive to see how they respond to the same input.

@result =
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY Vertical ORDER BY Latency) AS RowNumber,
        RANK() OVER (PARTITION BY Vertical ORDER BY Latency) AS Rank,
        DENSE_RANK() OVER (PARTITION BY Vertical ORDER BY Latency) AS DenseRank
    FROM @querylog;

Note the OVER clauses are identical. The result:

Query    Latency  Vertical  RowNumber  Rank  DenseRank
Banana   300      Image     1          1     1
Cherry   300      Image     2          1     1
Durian   500      Image     3          3     2
Apple    100      Web       1          1     1
Fig      200      Web       2          2     2
Papaya   200      Web       3          2     2
Avocado  300      Web       4          4     3
Cherry   400      Web       5          5     4
Durian   500      Web       6          6     5

ROW_NUMBER

Within each window (Vertical, either Image or Web), the row number increases by 1 in order of Latency.

RANK

Different from ROW_NUMBER(), RANK() takes into account the value of the Latency which is specified in the ORDER BY clause for the window.

In the Image vertical, RANK starts with (1,1,3) because the first two values for Latency are the same. The next value is 3 because the Latency value has moved on to 500. The key point is that although duplicate values are given the same rank, the next distinct value “skips” to the current ROW_NUMBER value. You can see this pattern repeat with the sequence (2,2,4) in the Web vertical.

DENSE_RANK

DENSE_RANK is just like RANK, except that it doesn’t “skip” to the current ROW_NUMBER; instead, it moves to the next number in the sequence. Notice the sequences (1,1,2) and (2,2,3) in the sample.
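The three numbering rules fit in a few lines of plain Python. The sketch works on one partition whose ORDER BY values are already sorted; the helper `rank_partition` is hypothetical. Python's `sorted` is stable, so tied rows keep their input order here, whereas U-SQL makes no such promise for ROW_NUMBER among ties.

```python
# Rows copied from the @querylog sample: (Query, Latency, Vertical)
QUERYLOG = [
    ("Banana", 300, "Image"),
    ("Cherry", 300, "Image"),
    ("Durian", 500, "Image"),
    ("Apple", 100, "Web"),
    ("Fig", 200, "Web"),
    ("Papaya", 200, "Web"),
    ("Avocado", 300, "Web"),
    ("Cherry", 400, "Web"),
    ("Durian", 500, "Web"),
]


def rank_partition(ordered_values):
    """Return (ROW_NUMBER, RANK, DENSE_RANK) for each value of one partition,
    where ordered_values is already sorted by the ORDER BY key."""
    out = []
    rank = dense_rank = 0
    previous = object()  # sentinel: never equal to a real value
    for row_number, value in enumerate(ordered_values, start=1):
        if value != previous:    # a new distinct value starts here
            rank = row_number    # RANK jumps to the current row number
            dense_rank += 1      # DENSE_RANK moves to the next integer
            previous = value
        out.append((row_number, rank, dense_rank))
    return out


for vertical in ("Image", "Web"):
    latencies = sorted(lat for _q, lat, v in QUERYLOG if v == vertical)
    print(vertical, rank_partition(latencies))
```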

Remarks

  • If ORDER BY is not specified, then the ranking function is applied to the rowset without any ordering. This results in nondeterministic behavior in how the ranking function is applied.
  • There is no guarantee that the rows returned by a query using ROW_NUMBER will be ordered exactly the same with each execution unless all of the following conditions are true:
      • Values of the partitioned column are unique.
      • Values of the ORDER BY columns are unique.
      • Combinations of values of the partition column and ORDER BY columns are unique.

NTILE

NTILE distributes the rows in an ordered partition into a specified number of groups. The groups are numbered, starting at one.

The following example splits the set of rows in each partition (vertical) into 4 groups in the order of the query latency, and returns the group number for each row.

The Image vertical has 3 rows, so it has only 3 groups.

The Web vertical has 6 rows. Dividing 6 rows into 4 groups leaves 2 extra rows, which are distributed to the first two groups. That’s why groups 1 and 2 have 2 rows each, while groups 3 and 4 have only 1 row each.

@result =
    SELECT
        *,
        NTILE(4) OVER(PARTITION BY Vertical ORDER BY Latency) AS Quartile
    FROM @querylog;

The results:

Query    Latency  Vertical  Quartile
Banana   300      Image     1
Cherry   300      Image     2
Durian   500      Image     3
Apple    100      Web       1
Fig      200      Web       1
Papaya   200      Web       2
Avocado  300      Web       2
Cherry   400      Web       3
Durian   500      Web       4

NTILE takes a parameter (“numgroups”). Numgroups is a positive int or long constant expression that specifies the number of groups into which each partition must be divided.

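If the number of rows in the partition is evenly divisible by numgroups, then the groups will have equal size. Otherwise, the group sizes differ by one row, and the larger groups come first, as the Web vertical shows.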
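The group sizes follow from integer division: every group gets floor(rows / numgroups) rows, and the remainder is handed out one row at a time to the lowest-numbered groups. The plain-Python sketch below assumes exactly that rule, which matches the Image and Web results above; `ntile` is a hypothetical helper that returns the group number for each row position of an ordered partition.

```python
def ntile(num_rows, num_groups):
    """Group number (1-based) for each position of an ordered partition with
    num_rows rows, as NTILE(num_groups) would assign it under the rule above."""
    if num_groups <= 0:
        raise ValueError("num_groups must be a positive integer")
    base, extra = divmod(num_rows, num_groups)
    groups = []
    for group in range(1, num_groups + 1):
        # the first `extra` groups each absorb one of the leftover rows
        size = base + (1 if group <= extra else 0)
        groups.extend([group] * size)
    return groups


print(ntile(3, 4))  # Image vertical: [1, 2, 3]
print(ntile(6, 4))  # Web vertical:   [1, 1, 2, 2, 3, 4]
```

When the partition has fewer rows than numgroups (the Image case), the trailing groups are simply empty.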

Assign Globally Unique Row Number

It’s often useful to assign a globally unique number to each row. This is easy (and more efficient than using a reducer) with the ranking functions.

@result =
    SELECT
        *,
        ROW_NUMBER() OVER () AS RowNumber
    FROM @querylog;

Top N records per group via RANK, DENSE_RANK, or ROW_NUMBER

Many users want to select only TOP n rows per group. This is not possible with the traditional GROUP BY.

You have seen the following example at the beginning of the Ranking functions section. It doesn’t show top N records for each partition:

@result =
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY Vertical ORDER BY Latency) AS RowNumber,
        RANK() OVER (PARTITION BY Vertical ORDER BY Latency) AS Rank,
        DENSE_RANK() OVER (PARTITION BY Vertical ORDER BY Latency) AS DenseRank
    FROM @querylog;

The results:

Query    Latency  Vertical  RowNumber  Rank  DenseRank
Banana   300      Image     1          1     1
Cherry   300      Image     2          1     1
Durian   500      Image     3          3     2
Apple    100      Web       1          1     1
Fig      200      Web       2          2     2
Papaya   200      Web       3          2     2
Avocado  300      Web       4          4     3
Cherry   400      Web       5          5     4
Durian   500      Web       6          6     5

TOP N with DENSE RANK

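The following example returns, for each group, the rows whose DENSE_RANK is 3 or less, that is, the rows with the three lowest distinct latency values. DENSE_RANK leaves no gaps in the sequential rank numbering of rows in each windowing partition.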

@result =
    SELECT
        *,
        DENSE_RANK() 
            OVER (PARTITION BY Vertical ORDER BY Latency) AS DenseRank
    FROM @querylog;

@result =
    SELECT *
    FROM @result
    WHERE DenseRank <= 3;

The results:

Query    Latency  Vertical  DenseRank
Banana   300      Image     1
Cherry   300      Image     1
Durian   500      Image     2
Apple    100      Web       1
Fig      200      Web       2
Papaya   200      Web       2
Avocado  300      Web       3

TOP N with RANK

@result =
    SELECT
        *,
        RANK() 
            OVER (PARTITION BY Vertical ORDER BY Latency) AS Rank
    FROM @querylog;

@result =
    SELECT *
    FROM @result
    WHERE Rank <= 3;

The results:

Query    Latency  Vertical  Rank
Banana   300      Image     1
Cherry   300      Image     1
Durian   500      Image     3
Apple    100      Web       1
Fig      200      Web       2
Papaya   200      Web       2

TOP N with ROW_NUMBER

@result =
    SELECT
        *,
        ROW_NUMBER() 
            OVER (PARTITION BY Vertical ORDER BY Latency) AS RowNumber
    FROM @querylog;

@result =
    SELECT *
    FROM @result
    WHERE RowNumber <= 3;

The results:

Query    Latency  Vertical  RowNumber
Banana   300      Image     1
Cherry   300      Image     2
Durian   500      Image     3
Apple    100      Web       1
Fig      200      Web       2
Papaya   200      Web       3

Any N Records per group

Just as you sometimes want the “TOP n rows per group”, sometimes you just want “ANY n rows per group”.

For example, “list any 10 customers per zipcode”.

@result =
    SELECT
        *,
        ROW_NUMBER() 
            OVER (PARTITION BY ZipCode ORDER BY 1) AS RowNumber
    FROM @customers;

@result =
    SELECT *
    FROM @result
    WHERE RowNumber <= 10;

This technique makes use of the “ORDER BY 1” pattern – which effectively means no ordering is performed.
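Conceptually, the “ANY n” query keeps the first n rows of each partition in whatever order they happen to arrive. The plain-Python sketch below models that with the @querylog rows and n = 2, rather than the @customers rowset from the example (whose contents are not given); `any_n_per_group` is a hypothetical helper. Python iterates in a fixed order, so its result is repeatable, while in U-SQL the surviving rows can differ between runs.

```python
from collections import defaultdict

# Rows copied from the @querylog sample: (Query, Latency, Vertical)
QUERYLOG = [
    ("Banana", 300, "Image"),
    ("Cherry", 300, "Image"),
    ("Durian", 500, "Image"),
    ("Apple", 100, "Web"),
    ("Fig", 200, "Web"),
    ("Papaya", 200, "Web"),
    ("Avocado", 300, "Web"),
    ("Cherry", 400, "Web"),
    ("Durian", 500, "Web"),
]


def any_n_per_group(rows, key, n):
    """Keep at most n rows per key, taking rows in the order they arrive.

    Which rows survive depends only on arrival order, not on any column value,
    which is what the "ORDER BY 1" pattern expresses.
    """
    kept_per_key = defaultdict(int)
    kept = []
    for row in rows:
        k = key(row)
        if kept_per_key[k] < n:
            kept_per_key[k] += 1
            kept.append(row)
    return kept


# Any 2 queries per Vertical
print(any_n_per_group(QUERYLOG, key=lambda row: row[2], n=2))
```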

Getting the Rows that have the maximum (or minimum) value for a Column within a partition

Another scenario that the ranking functions handle easily is finding the row that contains the maximum value in a partition.

Returning to our original input data set, imagine we want to partition by Vertical and within each vertical find the row that has the maximum value for latency.

Query    Latency  Vertical
Banana   300      Image
Cherry   300      Image
Durian   500      Image
Apple    100      Web
Fig      200      Web
Papaya   200      Web
Avocado  300      Web
Cherry   400      Web
Durian   500      Web

The desired output is as follows, because 500 is the maximum latency in both the Image and Web verticals:

Query    Latency  Vertical
Durian   500      Image
Durian   500      Web

The U-SQL that accomplishes this uses ROW_NUMBER:

@results =
     SELECT
         Query,
         Latency,
         Vertical,
         ROW_NUMBER() OVER (PARTITION BY Vertical ORDER BY Latency DESC) AS rn
     FROM @querylog;

@results =
     SELECT
         Query,
         Latency,
         Vertical
     FROM @results
     WHERE rn==1;

To retrieve the row with the minimum value for each partition, change DESC to ASC in the OVER clause.
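The same “sort, then keep row number 1 per partition” idea can be modeled in plain Python. In the sketch below (hypothetical helper `extreme_row_per_partition`), `largest=False` plays the role of switching DESC to ASC. Ties are broken by input order because Python's `sorted` is stable even with `reverse=True`; ROW_NUMBER gives no such guarantee, so with tied extreme values U-SQL may return any one of the tied rows.

```python
# Rows copied from the @querylog sample: (Query, Latency, Vertical)
QUERYLOG = [
    ("Banana", 300, "Image"),
    ("Cherry", 300, "Image"),
    ("Durian", 500, "Image"),
    ("Apple", 100, "Web"),
    ("Fig", 200, "Web"),
    ("Papaya", 200, "Web"),
    ("Avocado", 300, "Web"),
    ("Cherry", 400, "Web"),
    ("Durian", 500, "Web"),
]
LATENCY, VERTICAL = 1, 2


def extreme_row_per_partition(rows, largest=True):
    """One row per Vertical with the largest (or smallest) Latency.

    Mirrors ROW_NUMBER() OVER (PARTITION BY Vertical ORDER BY Latency DESC) == 1:
    sort, then keep the first row seen for each partition.
    """
    ordered = sorted(rows, key=lambda r: r[LATENCY], reverse=largest)
    first = {}
    for row in ordered:
        first.setdefault(row[VERTICAL], row)  # the first row per partition is "rn == 1"
    return list(first.values())


print(extreme_row_per_partition(QUERYLOG))                 # maximum latency per vertical
print(extreme_row_per_partition(QUERYLOG, largest=False))  # minimum (the ASC variant)
```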