Part 9: Extending U-SQL

There are five kinds of user-defined entities in U-SQL:

  • User-Defined Functions (UDFs)
  • User-Defined Types (UDTs)
  • User-Defined Aggregators (UDAggs)
  • User-Defined Operators (UDOs)
  • User-Defined Appliers

All of them are defined by .NET code. C# is not required. Any .NET language will work.

User-Defined Functions

User-defined functions are ordinary static methods on a .NET class. Below is the code for a simple class.

namespace OrdersLib
{
  public static class Helpers
  {
    public static string Normalize(string s)
    { return s.ToLower(); }
  }
}

Assuming you registered the assembly that contains this class using the name “MyDB.OrdersLibAsm”, you can use the assembly and the UDF as shown below.

REFERENCE ASSEMBLY MyDB.OrdersLibAsm;

@customers =
  SELECT * FROM
    (VALUES
      ("Contoso", 123 ),
      ("Woodgrove", 456 )
    ) AS D( Customer, Id );

@customers =
  SELECT
    OrdersLib.Helpers.Normalize(Customer) AS Customer,
    Id
  FROM @customers;
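String columns in U-SQL can hold null, and calling s.ToLower() on a null value throws. The following is a minimal sketch of a null-safe variant, written as plain C# so it can be tried outside U-SQL. The names OrdersLibSketch and SafeHelpers are hypothetical and not part of the original library; using ToLowerInvariant instead of ToLower is also an assumption made here to keep results culture-independent.

```csharp
using System;

namespace OrdersLibSketch
{
    // Hypothetical variant of OrdersLib.Helpers, for illustration only.
    public static class SafeHelpers
    {
        // Returns null for null input instead of throwing;
        // otherwise lower-cases the text using the invariant culture.
        public static string Normalize(string s)
        {
            return s == null ? null : s.ToLowerInvariant();
        }
    }

    public static class Demo
    {
        public static void Main()
        {
            Console.WriteLine(SafeHelpers.Normalize("Contoso"));      // contoso
            Console.WriteLine(SafeHelpers.Normalize(null) == null);   // True
        }
    }
}
```

Once registered in an assembly, such a method would be called from a script the same way as above, by its fully qualified name.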

User-Defined Aggregators (UDAggs)

UDAggs allow you to define your own aggregate functions for U-SQL for use with GROUP BY.

Creating an aggregator

Here is a simple example of a UDAgg that manually implements a custom version of Sum:

using Microsoft.Analytics.Interfaces;

namespace MVA_UDAgg
{
    public class MySum : IAggregate<int, long>
    {
        long total;
        public override void Init() { total = 0; }
        public override void Accumulate(int value) { total += value; }
        public override long Terminate() { return total; }
    }
}

Using the UDAgg

In the SELECT clause, use the AGG<T> operator, where T is the name of your UDAgg:

AGG<UDAgg>(InputCol) AS OutputCol

Here’s a full script:

REFERENCE ASSEMBLY AssemblyContainingUDagg;

@t = 
  SELECT * FROM 
    (VALUES
      ("2016/03/31","1:00","mrys","@saveenr great demo yesterday", 7 ),
      ("2016/03/31","7:00","saveenr","@mrys Thanks! U-SQL RuL3Z!", 4 )
    ) AS D( date, time, author, tweet , retweets);

@results = 
    SELECT
        AGG<MVA_UDAgg.MySum>(retweets) AS totalretweets
    FROM @t
    GROUP BY date;

OUTPUT @results
    TO "/output.csv"
    USING Outputters.Csv();

Recursive Aggregators

If the operation in your aggregator is associative (https://en.wikipedia.org/wiki/Associative_property), you should mark the aggregator with an attribute to indicate that the UDAgg is “recursive” (a.k.a. “associative”). This substantially improves performance when your U-SQL script has to aggregate a lot of data with the UDAgg, because the aggregation can then be parallelized.

The snippet below shows how to do this.

[SqlUserDefinedReducer(IsRecursive = true)]
public class MySum : IAggregate<int, long>
{
// your code here
}

Do not just blindly add the IsRecursive property to your UDAggs. Make sure the UDAggs support the associative property.
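To make the warning concrete, here is a small plain C# sketch, independent of the U-SQL runtime, that compares combining partial results with aggregating everything at once. The class name AssociativityCheck and the sample partitions are made up for illustration. Summing partial sums gives the same answer as summing all values, while averaging partial averages does not, so an aggregator that naively averages is not safe to treat this way.

```csharp
using System;
using System.Linq;

// Hypothetical helper, for illustration only.
public static class AssociativityCheck
{
    // Sum each partition, then sum the partial results.
    public static long SumOfPartialSums(int[][] partitions)
    {
        return partitions.Select(p => p.Sum(v => (long)v)).Sum();
    }

    // Average each partition, then average the partial results.
    public static double AverageOfPartialAverages(int[][] partitions)
    {
        return partitions.Select(p => p.Average()).Average();
    }

    public static void Main()
    {
        int[][] partitions = { new[] { 1, 2, 3 }, new[] { 10 } };
        int[] all = partitions.SelectMany(p => p).ToArray();

        // Partial sums combine to the true total (16).
        Console.WriteLine(SumOfPartialSums(partitions) == all.Sum(v => (long)v));   // True

        // Partial averages (2 and 10) combine to 6, but the true average is 4.
        Console.WriteLine(AverageOfPartialAverages(partitions) == all.Average());   // False
    }
}
```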

DEPLOY RESOURCE

Scenario

Suppose you are using a .NET library – for example one that maps IP addresses to locations. Then suppose that this .NET library loads its IP database from a file on disk – maybe a file called ip.data.

Now if we want to use this library in U-SQL we have to do two things: (1) make sure the .NET library is available to the script via a U-SQL assembly and (2) make sure the data file is available to the script with the DEPLOY RESOURCE statement.

A simple Hello World example

The simple example below shows how it would be done with a C# expression directly in the script.

DEPLOY RESOURCE "/helloworld.txt";

@departments =
  SELECT * 
  FROM (VALUES
      (31, "Sales"),
      (33, "Engineering"),
      (34, "Clerical"),
      (35, "Marketing")
    ) AS D( DepID, DepName );

@departments =
    SELECT DepID, DepName, System.[IO].File.ReadAllText("helloworld.txt") AS Message
    FROM @departments;

OUTPUT @departments 
    TO "/departments.tsv"
    USING Outputters.Tsv();
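The expression above reads the file as part of the SELECT expression. If the file is large, one option is to move the read into a helper that caches the content. The sketch below is plain C# and only illustrates the caching idea: the class name ResourceCache is hypothetical, and the demo writes helloworld.txt itself to stand in for the file that DEPLOY RESOURCE would place in the working directory.

```csharp
using System;
using System.IO;

// Hypothetical helper, for illustration only.
public static class ResourceCache
{
    // Lazy<T> runs the factory at most once; later calls reuse the cached string.
    private static readonly Lazy<string> Message =
        new Lazy<string>(() => File.ReadAllText("helloworld.txt"));

    public static string GetMessage()
    {
        return Message.Value;
    }

    public static void Main()
    {
        // Stand-in for the deployed resource (assumption for this demo).
        File.WriteAllText("helloworld.txt", "Hello World");

        Console.WriteLine(GetMessage());                                         // Hello World
        Console.WriteLine(object.ReferenceEquals(GetMessage(), GetMessage()));   // True
    }
}
```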

Using a processor

DEPLOY RESOURCE "/helloworld.txt";

@departments =
  SELECT * 
  FROM (VALUES
      (31, "Sales"),
      (33, "Engineering"),
      (34, "Clerical"),
      (35, "Marketing")
    ) AS D( DepID, DepName );


@departments =
     PROCESS @departments
     PRODUCE DepID int,
             DepName string,
             HelloWorld string
     USING new Demo.HelloWorldProcessor();

OUTPUT @departments 
    TO "/departments.tsv"
    USING Outputters.Tsv();

The processor used by this script is implemented in C#:

using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;

namespace Demo
{
    [SqlUserDefinedProcessor]
    public class HelloWorldProcessor : IProcessor
    {
        private string hw;

        public HelloWorldProcessor()
        {
            this.hw = System.IO.File.ReadAllText("helloworld.txt");
        }

        public override IRow Process(IRow input, IUpdatableRow output)
        {
            output.Set<int>("DepID", input.Get<int>("DepID"));
            output.Set<string>("DepName", input.Get<string>("DepName"));
            output.Set<string>("HelloWorld", hw);
            return output.AsReadOnly();
        }
    }
}

User-Defined Types

Concepts

To create a UDT in U-SQL, two things have to be created:

  • The UDT type itself
  • A formatter that can convert the UDT into a string and can transform a string into an instance of the UDT

Scenario

We will define a UDT – and its corresponding formatter – for a “Bits” UDT. This UDT is a simple bit array with a textual representation that looks like “1100101010”.

UDT and formatter skeletons

First let’s look at the fundamental structure of the Bits type.

[SqlUserDefinedType(typeof(BitFormatter))]
public struct Bits
{
}

Notice that the only thing the Bits UDT requires is that it identifies its corresponding formatter.

And here is the skeleton of its corresponding formatter.

public class BitFormatter : Microsoft.Analytics.Interfaces.IFormatter<Bits>
{
    public BitFormatter()
    { ... }

    public void Serialize(
        Bits instance,
        IColumnWriter writer,
        ISerializationContext context)
    { ... }

    public Bits Deserialize(
         IColumnReader reader,
         ISerializationContext context)
    { ... }
}

Full code for the UDT


using Microsoft.Analytics.Interfaces;

namespace MyUDTExamples
{
    [SqlUserDefinedType(typeof(BitFormatter))]
    public struct Bits
    {
        System.Collections.BitArray bitarray;

        public Bits(string s)
        {
            this.bitarray = new System.Collections.BitArray(s.Length);
            for (int i = 0; i<s.Length; i++)
            {
                this.bitarray[i] = (s[s.Length-i-1] == '1' ? true : false);
            }
        }

        public int ToInteger()
        {
            int value = 0;
            for (int i = 0; i < this.bitarray.Length; i++)
                { if (bitarray[i]) { value += (int)System.Math.Pow(2, i); } }
            return value;
        }

        public override string ToString()
        {
            var sb = new System.Text.StringBuilder(this.bitarray.Length);
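            // The constructor stores the rightmost character at index 0,
            // so emit the highest index first to reproduce the original string.
            for (int i = this.bitarray.Length - 1; i >= 0; i--)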
            { sb.Append(this.bitarray[i] ? "1" : "0"); }
            return sb.ToString();
        }
    }

}

Full code for the UDT’s formatter

using Microsoft.Analytics.Interfaces;

namespace MyUDTExamples
{

    public class BitFormatter : Microsoft.Analytics.Interfaces.IFormatter<Bits>
    {
        public BitFormatter()
        {
        }

        public void Serialize(
            Bits instance,
            IColumnWriter writer,
            ISerializationContext context)
        {
            using (var w = new System.IO.StreamWriter(writer.BaseStream))
            {
                var bitstring = instance.ToString();
                w.Write(bitstring);
                w.Flush();
            }
        }

        public Bits Deserialize(
            IColumnReader reader,
            ISerializationContext context)
        {
            using (var r = new System.IO.StreamReader(reader.BaseStream))
            {
                string bitstring = r.ReadToEnd();
                var bits = new Bits(bitstring);
                return bits;
            }
        }
    }

}
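The formatter's job is to make sure a value survives a write followed by a read. The sketch below checks that idea in plain C#, using a MemoryStream in place of writer.BaseStream and reader.BaseStream. The class BitsRoundTrip and its methods are hypothetical stand-ins rather than the U-SQL interfaces, and ToInteger repeats the rule used by Bits.ToInteger (the rightmost character is bit 0).

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical stand-in for the formatter logic, for illustration only.
public static class BitsRoundTrip
{
    // Writes the textual form to the stream and leaves the stream open.
    public static void Write(string bitstring, Stream stream)
    {
        using (var w = new StreamWriter(stream, new UTF8Encoding(false), 1024, leaveOpen: true))
        {
            w.Write(bitstring);
        }
    }

    // Reads the whole stream back as text.
    public static string Read(Stream stream)
    {
        using (var r = new StreamReader(stream))
        {
            return r.ReadToEnd();
        }
    }

    // Rightmost character is the least significant bit.
    public static int ToInteger(string bitstring)
    {
        int value = 0;
        for (int i = 0; i < bitstring.Length; i++)
        {
            if (bitstring[bitstring.Length - 1 - i] == '1') { value |= 1 << i; }
        }
        return value;
    }

    public static void Main()
    {
        var ms = new MemoryStream();
        Write("0110", ms);
        ms.Position = 0;              // rewind before reading
        string back = Read(ms);

        Console.WriteLine(back);              // 0110
        Console.WriteLine(ToInteger(back));   // 6
    }
}
```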

Using the UDT

@products = 
    SELECT * FROM 
        (VALUES
            ("Apple", "0000"),
            ("Cherry", "0001"),
            ("Banana", "1001"),
            ("Orange", "0110")
        ) AS 
              D( ProductCode, BitString );

@products = 
    SELECT 
       ProductCode,
       BitString, 
       new MyUDTExamples.Bits(BitString) AS Bits
    FROM @products;

Persisting UDTs

Persisting UDTs into a file

UDTs cannot be persisted directly into a file using a default U-SQL outputter. You’ll have to manually convert your UDT’s value to a U-SQL-supported data type.

@products2 = 
    SELECT 
        ProductCode,
        BitString, 
        Bits.ToInteger() AS BitInt
    FROM @products;

Now it can be persisted into a text file:

OUTPUT @products2
    TO "/output.csv"
    USING Outputters.Csv();

Persisting UDTs into a U-SQL table

UDTs cannot be persisted directly into a U-SQL table. You’ll have to manually convert your UDT’s value to a U-SQL-supported data type.

CREATE TABLE MyDB.dbo.MyTable
( 
    INDEX idx  
    CLUSTERED(ProductCode ASC)
    DISTRIBUTED BY HASH(ProductCode) 
) AS SELECT * FROM @products2;

Tips

Generating ranges of numbers and dates

Many common scenarios for U-SQL developers require constructing a RowSet made up of a simple range of numbers or dates, for example the integers from 1 to 10. In this section we’ll take a look at options for doing this in U-SQL. In the process, we’ll get a chance to learn how to use some common U-SQL features:

  • Creating RowSets from constant values
  • Using CROSS JOIN
  • Using SELECT to map integers to DateTimes
  • Using CREATE TABLE to create a table directly from a RowSet. This is sometimes called “CREATE TABLE AS SELECT” and often abbreviated as “CTAS”.

First, we’ll begin by using the VALUES statement to create a simple RowSet of integers from 0 to 9.

@numbers_10 = 
    SELECT *
    FROM (VALUES
          (0),
          (1),
          (2),
          (3),
          (4),
          (5),
          (6),
          (7),
          (8),
          (9)
    ) AS T(Value);

This technique is simple. However, its disadvantages are: (1) the script has to list all the numbers manually, and (2) there is an upper limit on the number of items allowed in VALUES in a U-SQL script. Currently that limit is 10,000 items.

The CROSS JOIN statement helps us generate larger lists of numbers easily. In the following snippet, CROSS JOIN is used to generate 100 integers from 0 to 99.

@numbers_100 = 
    SELECT (a.Value*10 + b.Value) AS Value
    FROM @numbers_10 AS a 
        CROSS JOIN @numbers_10 AS b;

We can apply CROSS JOIN again to increase this to generate 10,000 integers from 0 to 9,999 as shown in the following sample:

@numbers_10000 = 
    SELECT (a.Value*100 + b.Value) AS Value
    FROM @numbers_100 AS a CROSS JOIN @numbers_100 AS b;

Finally, if you want a range of dates, use SELECT to map each integer to a DateTime value as shown below.

DECLARE @StartDate = DateTime.Parse("1979-03-31");

@numbers_10000 = ...;

@result = 
    SELECT 
        Value,
        @StartDate.AddDays( Value ) AS Date
    FROM @numbers_10000;
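The arithmetic behind the CROSS JOIN steps can be checked outside U-SQL. The sketch below is plain C# with LINQ, not U-SQL; the class name RangeCheck and its methods are hypothetical. It mirrors the a.Value*10 + b.Value and a.Value*100 + b.Value expressions and confirms that every integer from 0 to 9,999 appears exactly once, then maps an integer to a date with AddDays as the SELECT above does.

```csharp
using System;
using System.Globalization;
using System.Linq;

// Hypothetical helper, for illustration only.
public static class RangeCheck
{
    // Mirrors a CROSS JOIN: every pair (a, b) yields a * scale + b.
    public static int[] CrossJoin(int[] left, int[] right, int scale)
    {
        return (from a in left
                from b in right
                select a * scale + b).ToArray();
    }

    // Builds 0..9999 in the same two steps as the script.
    public static int[] Numbers10000()
    {
        int[] n10 = Enumerable.Range(0, 10).ToArray();
        int[] n100 = CrossJoin(n10, n10, 10);
        return CrossJoin(n100, n100, 100);
    }

    public static void Main()
    {
        int[] n = Numbers10000();
        Console.WriteLine(n.Length);                 // 10000
        Console.WriteLine(n.Distinct().Count());     // 10000
        Console.WriteLine(n.Min() + " " + n.Max());  // 0 9999

        // Map an integer offset to a date, as in the SELECT with AddDays.
        var start = new DateTime(1979, 3, 31);
        Console.WriteLine(start.AddDays(1).ToString("yyyy-MM-dd", CultureInfo.InvariantCulture)); // 1979-04-01
    }
}
```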

Putting it all together, the full script looks like this:

DECLARE @StartDate = DateTime.Parse("1979-03-31");

@numbers_10 = 
    SELECT
        *
    FROM 
    (VALUES
        (0),
        (1),
        (2),
        (3),
        (4),
        (5),
        (6),
        (7),
        (8),
        (9)
    ) AS T(Value);

@numbers_100 = 
    SELECT (a.Value*10 + b.Value) AS Value
    FROM @numbers_10 AS a CROSS JOIN @numbers_10 AS b;

@numbers_10000 = 
    SELECT (a.Value*100 + b.Value) AS Value
    FROM @numbers_100 AS a CROSS JOIN @numbers_100 AS b;

@result = 
    SELECT 
        Value,
        @StartDate.AddDays( Value ) AS Date
    FROM @numbers_10000;

OUTPUT @result TO "/res.csv" USING Outputters.Csv(outputHeader:true);

Ultimately, it may be more convenient to store these ranges in a U-SQL table rather than regenerating them every time in a script. This is easy with CTAS. The snippet below shows how to create a table using CREATE TABLE from a RowSet that contains these numbers. Notice that in this case CREATE TABLE does not require the schema to be specified. The schema is inferred from the SELECT clause.

CREATE DATABASE IF NOT EXISTS MyDB;
DROP TABLE IF EXISTS MyDB.dbo.Numbers_10000;

CREATE TABLE MyDB.dbo.Numbers_10000
( 
 INDEX idx 
 CLUSTERED(Value ASC)
 DISTRIBUTED BY RANGE(Value) 
) AS SELECT * FROM @numbers_10000;

Retrieving any desired range is now simple: use SELECT on the table with a WHERE clause.

// get the range of numbers from 1 to 87
@a = 
    SELECT Value 
    FROM MyDB.dbo.Numbers_10000
    WHERE Value >= 1 AND Value <= 87;