Part 5: Working with FileSets

Reading and Writing Files: Built-in Extractors

U-SQL has three built-in extractors that handle text files:

  • Extractors.Csv() reads comma-separated value (CSV) files.
  • Extractors.Tsv() reads tab-separated value (TSV) files.
  • Extractors.Text() reads delimited text files with a configurable delimiter.

Extractors.Csv() and Extractors.Tsv() behave exactly like Extractors.Text(), except that each defaults to the delimiter appropriate for its format.
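
For example, the following two statements read the same file identically (a minimal sketch; the file name /input/data.tsv and its two columns are assumptions for illustration):

@a =
    EXTRACT user string,
            id   string
    FROM "/input/data.tsv"
    USING Extractors.Tsv();

// The same read, spelled out with Extractors.Text() and an explicit tab delimiter.
@b =
    EXTRACT user string,
            id   string
    FROM "/input/data.tsv"
    USING Extractors.Text(delimiter: '\t');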

Text Encoding

All the built-in extractors default to UTF-8 as the encoding.

All three allow you to control the encoding through the encoding parameter, as shown below.

@searchlog = 
    EXTRACT UserId          int, 
            Start           DateTime, 
            Region          string, 
            Query           string, 
            Duration        int, 
            Urls            string, 
            ClickedUrls     string
    FROM "/SearchLog.tsv"
    USING Extractors.Tsv( encoding: Encoding.[ASCII] );

These are the supported text encodings:

Encoding.[ASCII]
Encoding.BigEndianUnicode
Encoding.Unicode
Encoding.UTF7
Encoding.UTF8
Encoding.UTF32

FileSets

We’ve seen that you can explicitly list all the files in the EXTRACT statement. In some cases, there might be a large number of files, so you may not want to list all the files manually every time.

FileSets make it easy to define a pattern to identify a set of files to read.

In the simplest case, let's get all the files in a folder.

@rs =
    EXTRACT 
        user   string,
        id     string
    FROM 
        "/input/{*}"
    USING Extractors.Csv();

Specifying a list of files that have an extension

This is a simple modification of the pattern. The example below will extract all the files that end with “.csv”.

@rs =
    EXTRACT 
        user   string,
        id     string
    FROM 
        "/input/{*}.csv"
    USING Extractors.Csv();

Getting filenames as a column in the RowSet

Because we are reading rows from multiple files, it is convenient for each row to carry some information about the file it came from. We can adjust the query slightly to make this possible.

@rs =
    EXTRACT 
        user       string,
        id         string,
        __filename string
    FROM 
        "/input/{__filename}"
    USING Extractors.Csv();

You are probably wondering about the __ prefix in the column name __filename. It isn’t necessary at all; however, it is a useful way of marking that this information came from the process of extracting the file, not from the data in the file itself.

To emphasize that the name __filename and the use of the __ prefix are completely arbitrary, below is the same script with a different name (foo).

@rs =
    EXTRACT 
        user string,
        id   string,
        foo  string
    FROM 
        "/input/{foo}"
    USING Extractors.Csv();

Getting parts of a filename as a column in the RowSet

Instead of the full filename, we can also get part of the filename. The sample below shows how to get just the number part.

@rs =
    EXTRACT 
        user       string,
        id         string,
        __filenum  int
    FROM 
        "/input/data{__filenum}.csv"
    USING Extractors.Csv();

Notes

  • The schemas for all the files in the FileSet must match the schema specified in the extract.
  • The more files there are in the FileSet, the longer compilation will take.

Using WHERE to filter the files

FileSets also let us filter the inputs so that the EXTRACT only chooses some of those files.

For example, imagine we have 10 files as shown below.

/input/data1.csv
/input/data2.csv
/input/data3.csv
/input/data4.csv
/input/data5.csv
/input/data6.csv
/input/data7.csv
/input/data8.csv
/input/data9.csv
/input/data10.csv

The following EXTRACT will read all 10 files.

@rs =
    EXTRACT 
        user       string,
        id         string,
        __filenum  int
    FROM 
        "/input/data{__filenum}.csv"
    USING Extractors.Csv();

However, by adding a WHERE clause that uses __filenum, only the files matched by the WHERE clause are read. In the example below, only 3 files are read (7, 8, and 9).

@rs =
    EXTRACT 
        user       string,
        id         string,
        __filenum  int
    FROM 
        "/input/data{__filenum}.csv"
    USING Extractors.Csv();

@rs =
    SELECT *
    FROM @rs
    WHERE 
        ( __filenum >= 7  ) AND 
        ( __filenum <= 9 );

FileSets with dates

File names or paths often include information about dates and times. This is often the case for log files. FileSets make it easy to handle these cases.

Imagine we have files named in the pattern "data-YEAR-MONTH-DAY.csv". The following query reads all the files with that pattern.

@rs = 
  EXTRACT 
      user    string,
      id      string,
      __date  DateTime
  FROM 
    "/input/data-{__date:yyyy}-{__date:MM}-{__date:dd}.csv"
  USING Extractors.Csv();

Many times you’ll have to restrict the files to a specific time range. This can be done by refining the RowSet with a WHERE clause.

@rs = 
  EXTRACT 
      user    string,
      id      string,
      __date  DateTime
  FROM 
    "/input/data-{__date:yyyy}-{__date:MM}-{__date:dd}.csv"
  USING Extractors.Csv();

@rs = 
  SELECT * 
  FROM @rs
  WHERE 
    __date >= System.DateTime.Parse("2016/1/1") AND
    __date < System.DateTime.Parse("2016/2/1");

In the above example we used all three parts of the date in the file path. However, you can use any parts you need and don’t need to use them all. The following example shows a file path that only uses the year and month.

@rs = 
  EXTRACT 
      user    string,
      id      string,
      __date  DateTime
  FROM 
    "/input/data-{__date:yyyy}-{__date:MM}.csv"
  USING Extractors.Csv();

@rs = 
  SELECT * 
  FROM @rs
  WHERE
    __date >= System.DateTime.Parse("2016/1/1") AND
    __date < System.DateTime.Parse("2016/2/1");

Grouping and Aggregation

Grouping, in essence, collapses multiple rows into single rows based on some criteria. Hand-in-hand with a grouping operation, some fields in the output RowSet must be aggregated into some meaningful value (or discarded if no meaningful aggregation is possible).

We can witness this behavior by building up to it in stages.

// list all session durations.  
@output =  
  SELECT Duration  
  FROM @searchlog;

This creates a simple list of integers.

Duration
73
614
74
24
1213
241
502
60
1270
610
422
283
305
10
612
1220
691
63
30
119
732
183
630

Now, let’s add all the numbers together. This yields a RowSet with exactly one row and one column.

// Find the total duration for all sessions combined
@output =  
  SELECT  
    SUM(Duration) AS TotalDuration  
  FROM @searchlog;

TotalDuration
9981

Now let’s use the GROUP BY operator to break apart the totals by Region.

// find the total Duration by Region
@output =
  SELECT
    Region,
    SUM(Duration) AS TotalDuration
  FROM @searchlog
  GROUP BY Region;

This returns:

Region  TotalDuration
en_ca   24
en_ch   10
en_fr   241
en_gb   688
en_gr   305
en_mx   422
en_us   8291

This is a good opportunity to explore a common use of the HAVING operator. We can use HAVING to restrict the output RowSet to those rows that have aggregate values we are interested in. For example, perhaps we want to find all the Regions where total dwell time is above some value.

// find all the Regions where the total dwell time is > 200
@output =
  SELECT
    Region,
    SUM(Duration) AS TotalDuration
  FROM @searchlog
  GROUP BY Region
  HAVING SUM(Duration) > 200;

Region  TotalDuration
en_fr   241
en_gb   688
en_gr   305
en_mx   422
en_us   8291

// Count the number of total sessions.
@output =
  SELECT
    COUNT() AS NumSessions
  FROM @searchlog;

NumSessions
23

Count the number of total sessions by Region.

@output =
  SELECT
    COUNT() AS NumSessions,
    Region
  FROM @searchlog
  GROUP BY Region;

NumSessions  Region
1            en_ca
1            en_ch
1            en_fr
2            en_gb
1            en_gr
1            en_mx
16           en_us

Count the number of total sessions by Region, along with the total, average, maximum, and minimum Duration for each Region.

@output =
  SELECT
    COUNT() AS NumSessions,
    Region,
    SUM(Duration) AS TotalDuration,
    AVG(Duration) AS AvgDuration,
    MAX(Duration) AS MaxDuration,
    MIN(Duration) AS MinDuration
  FROM @searchlog
  GROUP BY Region;

NumSessions  Region  TotalDuration  AvgDuration  MaxDuration  MinDuration
1            en_ca   24             24           24           24
1            en_ch   10             10           10           10
1            en_fr   241            241          241          241
2            en_gb   688            344          614          74
1            en_gr   305            305          305          305
1            en_mx   422            422          422          422
16           en_us   8291           518.1875     1270         30

Data types coming from aggregate functions

You should be aware of how some aggregation operators deal with data types. Some aggregations will promote a numeric type to a “larger” type.

For example:

  • SUM(floatexpr) -> double
  • SUM(doubleexpr) -> double
  • SUM(intexpr) -> long
  • SUM(byteexpr) -> long
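
To see the promotion in action, here is a minimal sketch (assuming the @searchlog RowSet from the earlier examples, where Duration is declared int; @totals and the aliases are illustrative names):

@totals =
    SELECT
        SUM(Duration) AS TotalDuration,          // int input, but the result is long
        SUM((double) Duration) AS TotalAsDouble  // double input stays double
    FROM @searchlog;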

DISTINCT with Aggregates

Every aggregate function can take a DISTINCT qualifier.

For example

COUNT(DISTINCT x)
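
As a concrete sketch (again assuming the @searchlog RowSet from earlier), this counts the distinct Regions:

@output =
    SELECT COUNT(DISTINCT Region) AS NumRegions
    FROM @searchlog;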

Notes

Aggregates can ONLY appear in a SELECT clause or a HAVING clause; in particular, they cannot be used in a WHERE clause (see “Filtering Aggregated Rows” below).

Conditionally counting

Consider the following RowSet:

@a  = 
    SELECT * FROM 
        (VALUES
            ("Contoso",    1500.0),
            ("Woodgrove",  2700.0),
            ((string)null, 6700.0)
        ) AS D( customer, amount );

We can easily count the number of rows:

@b =
    SELECT 
       COUNT() AS Count1
       FROM @a;

But what if we wanted to count only the rows that meet a certain criterion? We can accomplish this by using the SUM operator with an expression that returns either a 1 or a 0.

@b =
    SELECT 
       COUNT() AS Count,
       // Guard against null before calling Contains.
       SUM(customer != null && customer.Contains("o") ? 1 : 0) AS CountContainsLowercaseO,
       SUM(customer == null ? 1 : 0) AS CountIsNull
    FROM @a;

Count  CountContainsLowercaseO  CountIsNull
3      2                        1

Filtering Aggregated Rows

We’ll start again with a simple GROUP BY.

@output =
  SELECT
    Region,
    SUM(Duration) AS TotalDuration
  FROM @searchlog
  GROUP BY Region;

Region  TotalDuration
en_ca   24
en_ch   10
en_fr   241
en_gb   688
en_gr   305
en_mx   422
en_us   8291

Filtering with WHERE

You might try WHERE here.

@output =
  SELECT  
    Region,  
    SUM(Duration) AS TotalDuration  
  FROM @searchlog  
  WHERE TotalDuration > 200  
  GROUP BY Region;

This will cause an error, because WHERE operates only on the input columns of the statement, not the output columns.

We could use multiple U-SQL statements to accomplish this:

@output =  
  SELECT  
    Region,  
    SUM(Duration) AS TotalDuration  
  FROM @searchlog  
  GROUP BY Region;  

@output =  
  SELECT *  
  FROM @output  
  WHERE TotalDuration > 200;

Filtering with HAVING

Alternatively, we can use the HAVING clause, which is designed to filter rows on aggregate values when a GROUP BY is used.

@output =  
  SELECT  
    Region,  
    SUM(Duration) AS TotalDuration  
  FROM @searchlog  
  GROUP BY Region  
  HAVING SUM(Duration) > 200;

You may have noticed that SUM(Duration) was repeated in the HAVING clause. That’s because HAVING (like WHERE) cannot use columns created in the SELECT clause.

Aggregate functions

U-SQL contains several common aggregation functions:

  • AVG
  • COUNT
  • ANY_VALUE
  • FIRST_VALUE
  • LAST_VALUE
  • MAX
  • MIN
  • SUM
  • VAR
  • STDEV
  • ARRAY_AGG
  • MAP_AGG

Basic Statistics with MAX, MIN, AVG, SUM, VAR, & STDEV

These do what you expect them to do.

@output =
    SELECT
        MAX(Duration) AS DurationMax,
        MIN(Duration) AS DurationMin,
        AVG(Duration) AS DurationAvg,
        SUM(Duration) AS DurationSum,
        VAR(Duration) AS DurationVariance,
        STDEV(Duration) AS DurationStDev,
        VARP(Duration) AS DurationVarianceP,
        STDEVP(Duration) AS DurationStDevP
    FROM @searchlog
    GROUP BY Region;

Notes for Statisticians

VAR & STDEV are the sample versions, computed with Bessel’s correction. VARP & STDEVP are the better-known population versions.
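
In symbols, where $\bar{x}$ is the mean of the $n$ input values:

$$s^2 = \frac{1}{n-1}\sum_{i=1}^{n}(x_i-\bar{x})^2 \qquad \sigma^2 = \frac{1}{n}\sum_{i=1}^{n}(x_i-\bar{x})^2$$

VAR returns $s^2$ and VARP returns $\sigma^2$; STDEV and STDEVP are the corresponding square roots.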

ANY_VALUE

ANY_VALUE returns a value for the column with no guarantee about where inside the rowset that value came from. It could be the first value, the last value, or any value in between. It is useful in scenarios (for example, when using window functions) where you don’t care which value you receive, as long as you get one.

@output =
  SELECT
    ANY_VALUE(Start) AS AnyStart,
    Region
  FROM @searchlog
  GROUP BY Region;

CROSS APPLY and ARRAY_AGG

CROSS APPLY and ARRAY_AGG support two very common scenarios in transforming text:

  • CROSS APPLY can break a row apart into multiple rows
  • ARRAY_AGG joins rows into a single row

What we will show is how to move back and forth between the following two RowSets.

The first RowSet has a column called Urls that we want to split:

Region  Urls
en-us   A;B;C
en-gb   D;E;F

The second RowSet has a column called Url that we want to merge:

Region  Url
en-us   A
en-us   B
en-us   C
en-gb   D
en-gb   E
en-gb   F

Breaking apart rows with CROSS APPLY

Let’s examine the searchlog again and extract the Region and Urls columns.

@a = 
  SELECT 
    Region, 
    Urls
  FROM @searchlog;

@a looks like this:

Region  Urls
en-us   A;B;C
en-gb   D;E;F

The Urls column contains strings, but each string is a semicolon-separated list of URLs. We will eventually use CROSS APPLY to break that column apart. But first we must transform the string into an array that CROSS APPLY can work with.

@b =  
  SELECT 
    Region, 
    SqlArray.Create(Urls.Split(';')) AS UrlTokens  
  FROM @a;

@b looks like this:

Region  UrlTokens
en-us   SqlArray{"A","B","C"}
en-gb   SqlArray{"D","E","F"}

Now we can use CROSS APPLY to break the rows apart.

@c =  
  SELECT 
    Region, 
    Token AS Url  
  FROM @b   
   CROSS APPLY EXPLODE (UrlTokens) AS r(Token);

@c looks like this:

Region  Url
en-us   A
en-us   B
en-us   C
en-gb   D
en-gb   E
en-gb   F

Merging rows with ARRAY_AGG

Now, let’s reverse the scenario and merge the Url values for each region.

First, we’ll collect the Urls into an array with ARRAY_AGG.

@d =  
  SELECT 
    Region, 
    ARRAY_AGG<string>(Url) AS UrlsArray  
  FROM @c
  GROUP BY Region;

@d looks like this:

Region  UrlsArray
en-us   SqlArray{"A","B","C"}
en-gb   SqlArray{"D","E","F"}

Now that we have arrays of strings, we will collapse each array into a single string using string.Join.

@e =  
  SELECT 
    Region, 
    string.Join(";", UrlsArray) AS Urls  
  FROM @d;

Finally, @e looks like this. We are back to where we started.

Region  Urls
en-us   A;B;C
en-gb   D;E;F

Writing a U-SQL Query to Process Multiple Files

Now that I’ve done quite a bit of processing on a single file, I’ll take things one step further by processing data in multiple files.

In the New U-SQL Job blade, I’ll type ‘Summarize Logs’ and enter the following script in the code window:

@log = EXTRACT date string,                
               time string,                
               client_ip string,                
               username string,                
               server_ip string,                
               port int,                
               method string,                
               stem string,                
               query string,                
               status string,                
               server_bytes int,                
               client_bytes int,                
               time_taken int,                
               user_agent string,                
               referrer string    
       FROM "/bigdata/{*}.txt"         
       USING Extractors.Text(delimiter: ' ', silent: true); 
 
@dailysummary = SELECT date,                        
             COUNT(*) AS hits,                        
             SUM(server_bytes) AS bytes_sent,
             SUM(client_bytes) AS bytes_received                 
             FROM @log                 
             GROUP BY date; 
 
OUTPUT @dailysummary 
TO "/output/six_month_summary.csv" 
ORDER BY date 
USING Outputters.Csv();			

Notice how the code uses the wildcard placeholder {*} to read all files with a .txt extension from the specified folder.