Reading and Writing Files: Built-in Extractors
U-SQL has three built-in extractors that handle text:

- Extractors.Csv() reads comma-separated value (CSV) files.
- Extractors.Tsv() reads tab-separated value (TSV) files.
- Extractors.Text() reads delimited text files.

Extractors.Csv and Extractors.Tsv are the same as Extractors.Text, but they default to a delimiter appropriate for the format they support.
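As a minimal sketch (the file path, columns, and pipe delimiter are all hypothetical), you can pass an explicit delimiter to Extractors.Text; with '\t' as the delimiter it would behave like Extractors.Tsv:

// Read a pipe-delimited file by giving Extractors.Text an explicit delimiter.
@rows =
    EXTRACT user string,
            id string
    FROM "/input/sample.txt"
    USING Extractors.Text(delimiter: '|');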
Text Encoding
By default, the extractors use UTF-8 as the encoding. All three built-in extractors let you control the encoding through the encoding parameter, as shown below.
@searchlog =
    EXTRACT UserId      int,
            Start       DateTime,
            Region      string,
            Query       string,
            Duration    int,
            Urls        string,
            ClickedUrls string
    FROM "/SearchLog.tsv"
    USING Extractors.Tsv(encoding: Encoding.[ASCII]);
These are the supported text encodings:
Encoding.[ASCII]
Encoding.BigEndianUnicode
Encoding.Unicode
Encoding.UTF7
Encoding.UTF8
Encoding.UTF32
FileSets
We’ve seen that you can explicitly list all the files in the EXTRACT statement. In some cases, there might be a large number of files, so you may not want to list all the files manually every time.
FileSets make it easy to define a pattern to identify a set of files to read.
In the simplest case, let’s get all the files in a folder.
@rs =
    EXTRACT
        user string,
        id string
    FROM "/input/{*}"
    USING Extractors.Csv();
Specifying a list of files that have an extension
This requires only a simple modification to the pattern. The example below will extract all the files that end with “.csv”.
@rs =
    EXTRACT
        user string,
        id string
    FROM "/input/{*}.csv"
    USING Extractors.Csv();
Getting filenames as a column in the RowSet
Because we are reading rows from multiple files, it is convenient for each row to carry some information about the file it came from. We can adjust the query slightly to make this possible.
@rs =
    EXTRACT
        user string,
        id string,
        __filename string
    FROM "/input/{__filename}"
    USING Extractors.Csv();
You are probably wondering about the __ in the column name __filename. It isn’t necessary at all; however, it is a useful way of marking that this information came from the process of extracting the file, not from the data in the file itself. To emphasize that the name __filename and the __ prefix are completely arbitrary, below is the same script with a different name (foo).
@rs =
    EXTRACT
        user string,
        id string,
        foo string
    FROM "/input/{foo}"
    USING Extractors.Csv();
Getting parts of a filename as a column in the RowSet
Instead of the full filename, we can also get part of the filename. The sample below shows how to get just the number part.
@rs =
    EXTRACT
        user string,
        id string,
        __filenum int
    FROM "/input/data{__filenum}.csv"
    USING Extractors.Csv();
Notes
- The schemas of all the files in the FileSet must match the schema specified in the EXTRACT.
- The more files there are in the FileSet, the longer compilation will take.
Using WHERE to filter the files
FileSets also let us filter the inputs so that the EXTRACT only chooses some of those files.
For example, imagine we have 10 files as shown below.
/input/data1.csv
/input/data2.csv
/input/data3.csv
/input/data4.csv
/input/data5.csv
/input/data6.csv
/input/data7.csv
/input/data8.csv
/input/data9.csv
/input/data10.csv
The following EXTRACT will read all 10 files:
@rs =
    EXTRACT
        user string,
        id string,
        __filenum int
    FROM "/input/data{__filenum}.csv"
    USING Extractors.Csv();
However, by adding a WHERE clause that uses __filenum, only the files matched by the WHERE clause are read. In the example below, only 3 files are read (7, 8, and 9).
@rs =
    EXTRACT
        user string,
        id string,
        __filenum int
    FROM "/input/data{__filenum}.csv"
    USING Extractors.Csv();

@rs =
    SELECT *
    FROM @rs
    WHERE
        (__filenum >= 7) AND
        (__filenum <= 9);
FileSets with dates
File names or paths often include information about dates and times. This is often the case for log files. FileSets make it easy to handle these cases.
Imagine we have files that are named with the pattern "data-YEAR-MONTH-DAY.csv". The following query reads all the files with that pattern.
@rs =
    EXTRACT
        user string,
        id string,
        __date DateTime
    FROM "/input/data-{__date:yyyy}-{__date:MM}-{__date:dd}.csv"
    USING Extractors.Csv();
Many times you’ll have to restrict the files to a specific time range. This can be done by refining the RowSet with a WHERE clause.
@rs =
    EXTRACT
        user string,
        id string,
        __date DateTime
    FROM "/input/data-{__date:yyyy}-{__date:MM}-{__date:dd}.csv"
    USING Extractors.Csv();

@rs =
    SELECT *
    FROM @rs
    WHERE
        __date >= System.DateTime.Parse("2016/1/1") AND
        __date < System.DateTime.Parse("2016/2/1");
In the above example we used all three parts of the date in the file path. However, you can use any parts you need; you don’t have to use them all. The following example shows a file path that only uses the year and month.
@rs =
    EXTRACT
        user string,
        id string,
        __date DateTime
    FROM "/input/data-{__date:yyyy}-{__date:MM}.csv"
    USING Extractors.Csv();

@rs =
    SELECT *
    FROM @rs
    WHERE
        __date >= System.DateTime.Parse("2016/1/1") AND
        __date < System.DateTime.Parse("2016/2/1");
Grouping and Aggregation
Grouping, in essence, collapses multiple rows into single rows based on some criteria. Hand-in-hand with a grouping operation, some fields in the output RowSet must be aggregated into some meaningful value (or discarded if no meaningful aggregation is possible).
We can witness this behavior by building up to it in stages.
// List all the session durations.
@output =
    SELECT Duration
    FROM @searchlog;
This creates a simple list of integers.
Duration |
---|
73 |
614 |
74 |
24 |
1213 |
241 |
502 |
60 |
1270 |
610 |
422 |
283 |
305 |
10 |
612 |
1220 |
691 |
63 |
30 |
119 |
732 |
183 |
630 |
Now, let’s add all the numbers together. This yields a RowSet with exactly one row and one column.
// Find the total duration for all sessions combined.
@output =
    SELECT SUM(Duration) AS TotalDuration
    FROM @searchlog;
TotalDuration |
---|
9981 |
Now let’s use the GROUP BY operator to break apart the totals by Region.
// Find the total Duration by Region.
@output =
    SELECT
        Region,
        SUM(Duration) AS TotalDuration
    FROM @searchlog
    GROUP BY Region;
This returns:
Region | TotalDuration |
---|---|
en_ca | 24 |
en_ch | 10 |
en_fr | 241 |
en_gb | 688 |
en_gr | 305 |
en_mx | 422 |
en_us | 8291 |
This is a good opportunity to explore a common use of the HAVING operator. We can use HAVING to restrict the output RowSet to those rows that have aggregate values we are interested in. For example, perhaps we want to find all the Regions where total dwell time is above some value.
// Find all the Regions where the total dwell time is > 200.
@output =
    SELECT
        Region,
        SUM(Duration) AS TotalDuration
    FROM @searchlog
    GROUP BY Region
    HAVING SUM(Duration) > 200;
Region | TotalDuration |
---|---|
en_fr | 241 |
en_gb | 688 |
en_gr | 305 |
en_mx | 422 |
en_us | 8291 |
// Count the total number of sessions.
@output =
    SELECT COUNT() AS NumSessions
    FROM @searchlog;
NumSessions |
---|
23 |
Count the total number of sessions by Region.
@output =
    SELECT
        COUNT() AS NumSessions,
        Region
    FROM @searchlog
    GROUP BY Region;
NumSessions | Region |
---|---|
1 | en_ca |
1 | en_ch |
1 | en_fr |
2 | en_gb |
1 | en_gr |
1 | en_mx |
16 | en_us |
Count the total number of sessions by Region and include the total, average, maximum, and minimum duration for each Region.
@output =
    SELECT
        COUNT() AS NumSessions,
        Region,
        SUM(Duration) AS TotalDuration,
        AVG(Duration) AS AvgDuration,
        MAX(Duration) AS MaxDuration,
        MIN(Duration) AS MinDuration
    FROM @searchlog
    GROUP BY Region;
NumSessions | Region | TotalDuration | AvgDuration | MaxDuration | MinDuration |
---|---|---|---|---|---|
1 | en_ca | 24 | 24 | 24 | 24 |
1 | en_ch | 10 | 10 | 10 | 10 |
1 | en_fr | 241 | 241 | 241 | 241 |
2 | en_gb | 688 | 344 | 614 | 74 |
1 | en_gr | 305 | 305 | 305 | 305 |
1 | en_mx | 422 | 422 | 422 | 422 |
16 | en_us | 8291 | 518.1875 | 1270 | 30 |
Data types coming from aggregate functions
You should be aware of how some aggregation operators deal with data types. Some aggregations will promote a numeric type to a “larger” type.
For example:
- SUM(floatexpr) -> double
- SUM(doubleexpr) -> double
- SUM(intexpr) -> long
- SUM(byteexpr) -> long
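As a minimal sketch of the int-to-long promotion, reusing the @searchlog RowSet from earlier (the alias is illustrative):

// Duration was extracted as int, so SUM(Duration) yields a long.
@output =
    SELECT SUM(Duration) AS TotalDuration
    FROM @searchlog;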
DISTINCT with Aggregates
Every aggregate function can take a DISTINCT qualifier.
For example:
COUNT(DISTINCT x)
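As a minimal sketch against the @searchlog RowSet used above (the NumRegions alias is illustrative):

// Count how many distinct Regions appear in the searchlog.
@output =
    SELECT COUNT(DISTINCT Region) AS NumRegions
    FROM @searchlog;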
Notes
Aggregates can ONLY appear in a SELECT clause.
Conditionally counting
Consider the following RowSet:
@a =
SELECT * FROM
(VALUES
("Contoso", 1500.0),
("Woodgrove", 2700.0),
((string)null, 6700.0)
) AS D( customer, amount );
We can easily count the number of rows:
@b =
    SELECT COUNT() AS Count1
    FROM @a;
But what if we want to count only the rows that meet a certain criterion? We can accomplish this by using the SUM operator with an expression that returns either a 1 or a 0.
@b =
    SELECT
        COUNT() AS Count,
        // Guard against the null customer before calling Contains.
        SUM(customer != null && customer.Contains("o") ? 1 : 0) AS CountContainsLowercaseO,
        SUM(customer == null ? 1 : 0) AS CountIsNull
    FROM @a;
Count | CountContainsLowercaseO | CountIsNull |
---|---|---|
3 | 2 | 1 |
Filtering Aggregated Rows
We’ll start again with a simple GROUP BY.
@output =
    SELECT
        Region,
        SUM(Duration) AS TotalDuration
    FROM @searchlog
    GROUP BY Region;
Region | TotalDuration |
---|---|
en_ca | 24 |
en_ch | 10 |
en_fr | 241 |
en_gb | 688 |
en_gr | 305 |
en_mx | 422 |
en_us | 8291 |
Filtering with WHERE
You might try using WHERE here.
@output =
    SELECT
        Region,
        SUM(Duration) AS TotalDuration
    FROM @searchlog
    WHERE TotalDuration > 200
    GROUP BY Region;
This will cause an error, because WHERE can only operate on the input columns of the statement, not the output columns.
We could use multiple U-SQL statements to accomplish this:
@output =
    SELECT
        Region,
        SUM(Duration) AS TotalDuration
    FROM @searchlog
    GROUP BY Region;

@output =
    SELECT *
    FROM @output
    WHERE TotalDuration > 200;
Filtering with HAVING
Alternatively, we can use the HAVING clause, which is designed to filter rows when a GROUP BY is used.
@output =
    SELECT
        Region,
        SUM(Duration) AS TotalDuration
    FROM @searchlog
    GROUP BY Region
    HAVING SUM(Duration) > 200;
You may have noticed that SUM(Duration) was repeated in the HAVING clause. That’s because HAVING (like WHERE) cannot use columns created in the SELECT clause.
Aggregate functions
U-SQL contains several common aggregation functions:
- AVG
- COUNT
- ANY_VALUE
- FIRST_VALUE
- LAST_VALUE
- MAX
- MIN
- SUM
- VAR
- STDEV
- ARRAY_AGG
- MAP_AGG (sketched below)
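Most of these are demonstrated in this section. MAP_AGG is not covered elsewhere, so here is a minimal sketch; the column choices and alias are illustrative, and it assumes each Query appears at most once per Region, since a SqlMap cannot hold duplicate keys.

// Build a SqlMap of Query -> Duration for each Region.
@output =
    SELECT
        Region,
        MAP_AGG<string, int>(Query, Duration) AS QueryDurations
    FROM @searchlog
    GROUP BY Region;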
Basic Statistics with MAX, MIN, AVG, STDEV, & SUM
These do what you expect them to do:
@output =
    SELECT
        MAX(Duration) AS DurationMax,
        MIN(Duration) AS DurationMin,
        AVG(Duration) AS DurationAvg,
        SUM(Duration) AS DurationSum,
        VAR(Duration) AS DurationVariance,
        STDEV(Duration) AS DurationStDev,
        VARP(Duration) AS DurationVarianceP,
        STDEVP(Duration) AS DurationStDevP
    FROM @searchlog
    GROUP BY Region;
Notes for Statisticians
VAR and STDEV are the sample versions with Bessel’s correction; VARP and STDEVP are the better-known population versions.
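For reference, the sample variance computed by VAR and the population variance computed by VARP are

$$s^2 = \frac{1}{n-1}\sum_{i=1}^{n}\left(x_i - \bar{x}\right)^2 \qquad \sigma^2 = \frac{1}{n}\sum_{i=1}^{n}\left(x_i - \bar{x}\right)^2$$

STDEV and STDEVP are the square roots of these, respectively.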
ANY_VALUE
ANY_VALUE returns a value for the column with no guarantee about where inside the rowset the value came from. It could be the first value, the last value, or any value in between. It is useful in scenarios (for example, when using window functions) where you don’t care which value you receive, as long as you get one.
@output =
    SELECT
        ANY_VALUE(Start) AS AnyStart,
        Region
    FROM @searchlog
    GROUP BY Region;
CROSS APPLY and ARRAY_AGG
CROSS APPLY and ARRAY_AGG support two very common scenarios in transforming text:

- CROSS APPLY can break a row apart into multiple rows.
- ARRAY_AGG merges rows into a single row.

What we will show is how to move back and forth between the following two RowSets.
The first RowSet has a column called Urls that we want to split.
Region | Urls |
---|---|
en-us | A;B;C |
en-gb | D;E;F |
The second RowSet has a column called Url that we want to merge.
Region | Url |
---|---|
en-us | A |
en-us | B |
en-us | C |
en-gb | D |
en-gb | E |
en-gb | F |
Breaking apart rows with CROSS APPLY
Let’s examine the searchlog again and extract the Region and Urls columns.
@a =
    SELECT
        Region,
        Urls
    FROM @searchlog;
@a looks like this:
Region | Urls |
---|---|
en-us | A;B;C |
en-gb | D;E;F |
The Urls column contains strings, but each string is a semicolon-separated list of URLs. We will eventually use CROSS APPLY to break that column apart. But first we must transform the string into an array that CROSS APPLY can work with.
@b =
    SELECT
        Region,
        SqlArray.Create(Urls.Split(';')) AS UrlTokens
    FROM @a;
@b looks like this:
Region | UrlTokens |
---|---|
en-us | SqlArray{"A","B","C"} |
en-gb | SqlArray{"D","E","F"} |
Now we can use CROSS APPLY to break the rows apart.
@c =
    SELECT
        Region,
        Token AS Url
    FROM @b
    CROSS APPLY EXPLODE(UrlTokens) AS r(Token);
@c looks like this:
Region | Url |
---|---|
en-us | A |
en-us | B |
en-us | C |
en-gb | D |
en-gb | E |
en-gb | F |
Merging rows with ARRAY_AGG
Now, let’s reverse the scenario and merge the Url values for each region. First, we’ll merge the Urls together into an array with ARRAY_AGG.
@d =
    SELECT
        Region,
        ARRAY_AGG<string>(Url) AS UrlsArray
    FROM @c
    GROUP BY Region;
@d looks like this:
Region | UrlsArray |
---|---|
en-us | SqlArray{"A","B","C"} |
en-gb | SqlArray{"D","E","F"} |
Now that we have arrays of strings, we will collapse each array into a string using string.Join.
@e =
    SELECT
        Region,
        string.Join(";", UrlsArray) AS Urls
    FROM @d;
Finally, @e looks like this. We are back to where we started.
Region | Urls |
---|---|
en-us | A;B;C |
en-gb | D;E;F |
Writing a U-SQL Query to Process Multiple Files
Now that I’ve done quite a bit of processing on a single file, I’ll take things one step further by processing data in multiple files.
In the New U-SQL Job blade, I’ll type ‘Summarize Logs’ and enter the following script in the code window:
@log =
    EXTRACT date         string,
            time         string,
            client_ip    string,
            username     string,
            server_ip    string,
            port         int,
            method       string,
            stem         string,
            query        string,
            status       string,
            server_bytes int,
            client_bytes int,
            time_taken   int,
            user_agent   string,
            referrer     string
    FROM "/bigdata/{*}.txt"
    USING Extractors.Text(delimiter: ' ', silent: true);

@dailysummary =
    SELECT
        date,
        COUNT(*) AS hits,
        SUM(server_bytes) AS bytes_sent,
        SUM(client_bytes) AS bytes_received
    FROM @log
    GROUP BY date;

OUTPUT @dailysummary
TO "/output/six_month_summary.csv"
ORDER BY date
USING Outputters.Csv();
Notice how the code uses the wildcard placeholder {*} to read all files with a .txt extension from the specified folder.