There are five kinds of user-defined entities in U-SQL:
- User-Defined Functions (UDFs)
- User-Defined Types (UDTs)
- User-Defined Aggregators (UDAggs)
- User-Defined Operators (UDOs)
- User-Defined Appliers
All of them are defined by .NET code. C# is not required. Any .NET language will work.
User-Defined Functions
User-defined functions are ordinary static methods on a .NET class. Below is the code for a simple class.
namespace OrdersLib
{
public static class Helpers
{
public static string Normalize(string s)
{ return s.ToLower(); }
}
}
Assuming you registered the assembly that contains this class using the name “MyDB.OrdersLibAsm”, you can use the assembly and the UDF as shown below.
REFERENCE ASSEMBLY MyDB.OrdersLibAsm;
@customers =
SELECT * FROM
(VALUES
("Contoso", 123 ),
("Woodgrove", 456 )
) AS D( Customer, Id );
@customers =
SELECT
OrdersLib.Helpers.Normalize(Customer) AS Customer,
Id
FROM @customers;
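Because string columns in U-SQL can be null, a UDF that calls a method on its argument will throw if it receives a null value. The sketch below is one possible null-safe variant of the function above. The namespace OrdersLibSketch and the class SafeHelpers are hypothetical names chosen for illustration and are not part of the original library.

```csharp
using System;

namespace OrdersLibSketch // hypothetical namespace, for illustration only
{
    public static class SafeHelpers // hypothetical class name
    {
        // Returns null for null input instead of throwing, and lowercases
        // with the invariant culture so the result does not depend on the
        // regional settings of the machine running the code.
        public static string Normalize(string s)
        {
            return s == null ? null : s.ToLowerInvariant();
        }
    }
}
```

Assuming this class were registered in an assembly and referenced by the script, it would be called the same way as before, for example OrdersLibSketch.SafeHelpers.Normalize(Customer) AS Customer.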
User-Defined Aggregators (UDAggs)
UDAggs allow you to define your own aggregate functions for U-SQL for use with GROUP BY.
Creating an aggregator
Here is a simple example of a UDAgg that manually implements a custom version of Sum:
using Microsoft.Analytics.Interfaces;
namespace MVA_UDAgg
{
public class MySum : IAggregate<int, long>
{
long total;
public override void Init() { total = 0; }
public override void Accumulate(int value) { total += value; }
public override long Terminate() { return total; }
}
}
Using the UDAgg
In the SELECT clause, use the AGG<T> operator, where T is the name of your UDAgg:
AGG<UDAgg>(InputCol) AS OutputCol
Here’s a full script:
REFERENCE ASSEMBLY AssemblyContainingUDagg;
@t =
SELECT * FROM
(VALUES
("2016/03/31","1:00","mrys","@saveenr great demo yesterday", 7 ),
("2016/03/31","7:00","saveenr","@mrys Thanks! U-SQL RuL3Z!", 4 )
) AS D( date, time, author, tweet , retweets);
@results =
SELECT
AGG<MVA_UDAgg.MySum>(retweets) AS totalretweets
FROM @t
GROUP BY date;
OUTPUT @results
TO "/output.csv"
USING Outputters.Csv();
Recursive Aggregators
If the operation in your UDAgg is associative (https://en.wikipedia.org/wiki/Associative_property), you should mark your aggregator with an attribute to indicate that the UDAgg is “recursive” (a.k.a. “associative”). This substantially improves performance when your U-SQL script has to aggregate a lot of data with your UDAgg, because the aggregation can then be parallelized.
The snippet below shows how to do this.
[SqlUserDefinedReducer(IsRecursive = true)]
public class MySum : IAggregate<int, long>
{
// your code here
}
Do not just blindly add the IsRecursive property to your UDAggs. Make sure the operation each UDAgg performs really is associative.
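To see why this check matters, it helps to compare an aggregation that can be safely combined from partial results with one that cannot. The sketch below is plain C# with no U-SQL dependencies. The class and method names are made up for illustration. It shows that a sum of partial sums equals the sum over all values, while an average of partial averages generally does not equal the overall average.

```csharp
using System;
using System.Linq;

public static class AssociativityCheck // hypothetical helper, for illustration only
{
    // Sum computed over all values in a single pass.
    public static long SumInOnePass(int[] values)
    {
        return values.Sum(v => (long)v);
    }

    // Sum computed by combining the partial sums of two partitions.
    public static long SumOfPartialSums(int[] left, int[] right)
    {
        return SumInOnePass(left) + SumInOnePass(right);
    }

    // Average computed over all values in a single pass.
    public static double AverageInOnePass(int[] values)
    {
        return values.Average();
    }

    // Naive combination: averaging the partial averages of two partitions.
    // This ignores how many values each partition contributed.
    public static double AverageOfPartialAverages(int[] left, int[] right)
    {
        return (left.Average() + right.Average()) / 2.0;
    }
}
```

For the partitions { 7, 4 } and { 10 }, both sum methods return 21. The single-pass average is 7, but the average of partial averages is 7.75. An aggregator like the second one would give wrong answers if it were marked as recursive.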
DEPLOY RESOURCE
Scenario
Suppose you are using a .NET library, for example one that maps IP addresses to locations. Then suppose that this .NET library loads its IP database from a file on disk, maybe a file called ip.data.
Now if we want to use this library in U-SQL we have to do two things: (1) make sure the .NET library is available to the script via a U-SQL assembly and (2) make sure the data file is available to the script with the DEPLOY RESOURCE statement.
A simple Hello World example
The simple example below shows how it would be done with a C# expression directly in the script.
DEPLOY RESOURCE "/helloworld.txt";
@departments =
SELECT *
FROM (VALUES
(31, "Sales"),
(33, "Engineering"),
(34, "Clerical"),
(35, "Marketing")
) AS D( DepID, DepName );
@departments =
SELECT DepID, DepName, System.[IO].File.ReadAllText("helloworld.txt") AS Message
FROM @departments;
OUTPUT @departments
TO "/departments.tsv"
USING Outputters.Tsv();
Using a processor
DEPLOY RESOURCE "/helloworld.txt";
@departments =
SELECT *
FROM (VALUES
(31, "Sales"),
(33, "Engineering"),
(34, "Clerical"),
(35, "Marketing")
) AS D( DepID, DepName );
@departments =
PROCESS @departments
PRODUCE DepID int,
DepName string,
HelloWorld string
USING new Demo.HelloWorldProcessor();
OUTPUT @departments
TO "/departments.tsv"
USING Outputters.Tsv();
The processor itself is implemented in C#. It reads the deployed file once in its constructor and then writes that text into every output row.
using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
namespace Demo
{
[SqlUserDefinedProcessor]
public class HelloWorldProcessor : IProcessor
{
private string hw;
public HelloWorldProcessor()
{
this.hw = System.IO.File.ReadAllText("helloworld.txt");
}
public override IRow Process(IRow input, IUpdatableRow output)
{
output.Set<int>("DepID", input.Get<int>("DepID"));
output.Set<string>("DepName", input.Get<string>("DepName"));
output.Set<string>("HelloWorld", hw);
return output.AsReadOnly();
}
}
}
User-Defined Types
Concepts
In order to create a UDT in U-SQL there are two things that have to be created:
- The UDT type itself
- A formatter that can convert the UDT into a string and can transform a string into an instance of the UDT
Scenario
We will define a UDT, and its corresponding formatter, for a “Bits” UDT. This UDT is a simple bit array with a textual representation that looks like “1100101010”.
UDT and formatter skeletons
First let’s look at the fundamental structure of the Bits type.
[SqlUserDefinedType(typeof(BitFormatter))]
public struct Bits
{
}
Notice that the only thing the Bits UDT requires is that it identifies its corresponding formatter.
And here is the skeleton of its corresponding formatter.
public class BitFormatter : Microsoft.Analytics.Interfaces.IFormatter<Bits>
{
public BitFormatter()
{ ... }
public void Serialize(
Bits instance,
IColumnWriter writer,
ISerializationContext context)
{ ... }
public Bits Deserialize(
IColumnReader reader,
ISerializationContext context)
{ ... }
}
Full code for the UDT
namespace MyUDTExamples
{
[SqlUserDefinedType(typeof(BitFormatter))]
public struct Bits
{
System.Collections.BitArray bitarray;
public Bits(string s)
{
this.bitarray = new System.Collections.BitArray(s.Length);
for (int i = 0; i<s.Length; i++)
{
// Index 0 holds the rightmost (least significant) character of the string.
this.bitarray[i] = s[s.Length - i - 1] == '1';
}
}
public int ToInteger()
{
int value = 0;
for (int i = 0; i < this.bitarray.Length; i++)
{ if (bitarray[i]) { value |= 1 << i; } } // set bit i using integer arithmetic
return value;
}
public override string ToString()
{
var sb = new System.Text.StringBuilder(this.bitarray.Length);
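// Emit the most significant bit first so the text matches the string the constructor accepts.
for (int i = this.bitarray.Length - 1; i >= 0; i--)
{ sb.Append(this.bitarray[i] ? "1" : "0"); }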
return sb.ToString();
}
}
}
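Because the formatter has to turn a value into text and back again, the bit-ordering convention must be the same in both directions. The following sketch is plain C# with no U-SQL dependencies. The class BitStringSketch and its methods are hypothetical and exist only to illustrate the convention: index 0 is the least significant (rightmost) bit, and formatting writes the most significant bit first so that parsing and formatting round-trip.

```csharp
using System;
using System.Text;

public static class BitStringSketch // hypothetical helper, for illustration only
{
    // Parses text such as "1100101010" into a bool array in which index 0
    // is the least significant (rightmost) bit.
    public static bool[] Parse(string s)
    {
        var bits = new bool[s.Length];
        for (int i = 0; i < s.Length; i++)
        {
            char c = s[s.Length - i - 1];
            if (c != '0' && c != '1')
            {
                throw new FormatException("Expected only '0' or '1' but found '" + c + "'.");
            }
            bits[i] = c == '1';
        }
        return bits;
    }

    // Writes the most significant bit first, so Format(Parse(s)) equals s.
    public static string Format(bool[] bits)
    {
        var sb = new StringBuilder(bits.Length);
        for (int i = bits.Length - 1; i >= 0; i--)
        {
            sb.Append(bits[i] ? '1' : '0');
        }
        return sb.ToString();
    }

    // Interprets the bits as a non-negative integer. Limited to 31 bits
    // so the result always fits in a signed 32-bit int.
    public static int ToInteger(bool[] bits)
    {
        if (bits.Length > 31)
        {
            throw new OverflowException("At most 31 bits are supported.");
        }
        int value = 0;
        for (int i = 0; i < bits.Length; i++)
        {
            if (bits[i]) { value |= 1 << i; }
        }
        return value;
    }
}
```

With this convention, Format(Parse("0110")) returns "0110" and ToInteger(Parse("1001")) returns 9.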
Full code for the UDT’s formatter
namespace MyUDTExamples
{
public class BitFormatter : Microsoft.Analytics.Interfaces.IFormatter<Bits>
{
public BitFormatter()
{
}
public void Serialize(
Bits instance,
IColumnWriter writer,
ISerializationContext context)
{
using (var w = new System.IO.StreamWriter(writer.BaseStream))
{
var bitstring = instance.ToString();
w.Write(bitstring);
w.Flush();
}
}
public Bits Deserialize(
IColumnReader reader,
ISerializationContext context)
{
using (var r = new System.IO.StreamReader(reader.BaseStream))
{
string bitstring = r.ReadToEnd();
var bits = new Bits(bitstring);
return bits;
}
}
}
}
Using the UDT
@products =
SELECT * FROM
(VALUES
("Apple", "0000"),
("Cherry", "0001"),
("Banana", "1001"),
("Orange", "0110")
) AS
D( ProductCode, BitString );
@products =
SELECT
ProductCode,
BitString,
new MyUDTExamples.Bits(BitString) AS Bits
FROM @products;
Persisting UDTs
Persisting UDTs into a file
UDTs cannot be persisted directly into a file using a default U-SQL outputter. You’ll have to manually convert your UDT’s value to a U-SQL-supported data type.
@products2 =
SELECT
ProductCode,
BitString,
Bits.ToInteger() AS BitInt
FROM @products;
Now it can be persisted into a text file:
OUTPUT @products2
TO "/output.csv"
USING Outputters.Csv();
Persisting UDTs into a U-SQL table
UDTs cannot be persisted directly into a U-SQL table. You’ll have to manually convert your UDT’s value to a U-SQL-supported data type.
CREATE TABLE MyDB.dbo.MyTable
(
INDEX idx
CLUSTERED(ProductCode ASC)
DISTRIBUTED BY HASH(ProductCode)
) AS SELECT * FROM @products2;
Tips
Generating ranges of numbers and dates
Many common scenarios for U-SQL developers require constructing a RowSet made up of a simple range of numbers or dates, for example the integers from 1 to 10. In this section we’ll take a look at options for doing this in U-SQL. In the process, we’ll get a chance to learn how to use some common U-SQL features:
- Creating RowSets from constant values
- Using CROSS JOIN
- Using SELECT to map integers to DateTimes
- Using CREATE TABLE to create a table directly from a RowSet. This is sometimes called “CREATE TABLE AS SELECT” and often abbreviated as “CTAS”.
First, we’ll begin by using the VALUES statement to create a simple RowSet of integers from 0 to 9:
@numbers_10 =
SELECT *
FROM (VALUES
(0),
(1),
(2),
(3),
(4),
(5),
(6),
(7),
(8),
(9)
) AS T(Value);
This technique is simple. However, its disadvantages are: (1) the script has to list all the numbers manually and (2) there is an upper limit on the number of items allowed in VALUES in a U-SQL script. Currently that limit is 10,000 items.
The CROSS JOIN statement helps us generate larger lists of numbers easily. In the following snippet, CROSS JOIN is used to generate 100 integers from 0 to 99.
@numbers_100 =
SELECT (a.Value*10 + b.Value) AS Value
FROM @numbers_10 AS a
CROSS JOIN @numbers_10 AS b;
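The expression a.Value*10 + b.Value treats a as the tens digit and b as the ones digit, so each pair of digits maps to exactly one number from 0 to 99. The same idea can be sketched in plain C# with LINQ, where two nested from clauses play the role of the CROSS JOIN. The class and method names are hypothetical and used only for illustration.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class RangeSketch // hypothetical helper, for illustration only
{
    // Pairs every digit with every digit (the equivalent of a CROSS JOIN)
    // and combines each pair into tens * 10 + ones.
    public static List<int> Numbers100()
    {
        IEnumerable<int> digits = Enumerable.Range(0, 10); // 0..9

        return (from a in digits      // tens digit
                from b in digits      // ones digit
                select a * 10 + b).ToList();
    }
}
```

The result contains 100 distinct values covering 0 through 99. Note that this C# sketch produces them in ascending order, whereas a U-SQL RowSet has no guaranteed order unless one is requested.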
We can apply CROSS JOIN again to increase this to generate 10,000 integers from 0 to 9,999 as shown in the following sample:
@numbers_10000 =
SELECT (a.Value*100 + b.Value) AS Value
FROM @numbers_100 AS a CROSS JOIN @numbers_100 AS b;
Finally, if you want a range of dates, use SELECT to map each integer to a DateTime value as shown below.
DECLARE @StartDate = DateTime.Parse("1979-03-31");
@numbers_10000 = ...;
@result =
SELECT
Value,
@StartDate.AddDays( Value ) AS Date
FROM @numbers_10000;
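The mapping from integers to dates is ordinary .NET DateTime arithmetic, so it can be tried outside U-SQL as well. The sketch below is plain C#. The class DateRangeSketch and its method are hypothetical names introduced for illustration.

```csharp
using System;
using System.Linq;

public static class DateRangeSketch // hypothetical helper, for illustration only
{
    // Maps the integers 0..count-1 to consecutive days beginning at start,
    // using the same AddDays call as the U-SQL expression above.
    public static DateTime[] Dates(DateTime start, int count)
    {
        return Enumerable.Range(0, count)
                         .Select(n => start.AddDays(n))
                         .ToArray();
    }
}
```

For example, Dates(new DateTime(1979, 3, 31), 3) yields March 31, April 1, and April 2 of 1979, since AddDays rolls over month boundaries automatically.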
Putting it all together, the full script looks like this:
DECLARE @StartDate = DateTime.Parse("1979-03-31");
@numbers_10 =
SELECT
*
FROM
(VALUES
(0),
(1),
(2),
(3),
(4),
(5),
(6),
(7),
(8),
(9)
) AS T(Value);
@numbers_100 =
SELECT (a.Value*10 + b.Value) AS Value
FROM @numbers_10 AS a CROSS JOIN @numbers_10 AS b;
@numbers_10000 =
SELECT (a.Value*100 + b.Value) AS Value
FROM @numbers_100 AS a CROSS JOIN @numbers_100 AS b;
@result =
SELECT
Value,
@StartDate.AddDays( Value ) AS Date
FROM @numbers_10000;
OUTPUT @result TO "/res.csv" USING Outputters.Csv(outputHeader:true);
Ultimately, it may be more convenient to store these ranges in a U-SQL table rather than regenerating them every time in a script. This is easy with CTAS. The snippet below shows how to create a table using CREATE TABLE from a RowSet that contains these numbers. Notice that in this case CREATE TABLE does not require the schema to be specified. The schema is inferred from the SELECT clause.
CREATE DATABASE IF NOT EXISTS MyDB;
DROP TABLE IF EXISTS MyDB.dbo.Numbers_10000;
CREATE TABLE MyDB.dbo.Numbers_10000
(
INDEX idx
CLUSTERED(Value ASC)
DISTRIBUTED BY RANGE(Value)
) AS SELECT * FROM @numbers_10000;
Now retrieving any desired range is simple by using SELECT on the table followed with a WHERE clause:
// get the range of numbers from 1 to 87
@a =
SELECT Value
FROM MyDB.dbo.Numbers_10000
WHERE Value >= 1 AND Value <= 87;