SQL: IntArray – pole hodnot typu int jako UDT + aggregate (CLR)

Na SQL serveru do verze 2000 mě štvalo, že nemá žádnou rozumnou práci s poli. Jakmile tedy vyšel SQL2005 s možností použití .NET Frameworku (CLR), napsal jsem si hned vlastní typ IntArray. Nyní, po půl roce jeho používání, mohu říct, že se výborně osvědčil.

Typ IntArray samozřejmě není primárně určen pro použití v tabulkách, kde by sloužil jako typ sloupce, tím bychom degradovali relační schéma. Co nám tedy takové pole prvků typu int usnadní?

  • můžeme snadno psát parametrické dotazy s podmínkou WHERE CiselnikID IN [1, 2, 3, …], typické pro vyhledávání – máme-li například tabulku osob, číselník jejich pracovních pozic a chceme-li vypsat několik pozic najednou – například všechny účetní, personalisty a uklízečky k tomu,
  • můžeme snadno „jedním vrzem“ (do jedné řádky) načítat objekty i s položkami – pokud používáme v Business Layer klasický přístup s ghost objekty inicializovanými svým ID a s lazyloadem dalších hodnot, pak můžeme krásně načítat nadřazený objekt i s jeho členskými kolekcemi. Například v objednavka.Load() načteme data objednávky a zároveň pole IDček všech řádek objednávky, kterým hned vytvoříme ghost objekty,
  • stejně jako načítat, můžeme snadno ukládat vybrané kolekce objektu, například můžeme snadno uložit jedním vrzem uživatele i se seznamem jeho rolí.

SqlInt32Array

První, co tedy potřebujeme, je samotný CLR user-defined type:

using System;
using System.Data;
using Microsoft.SqlServer.Server;
using System.Data.SqlTypes;
using System.Text;
using System.Collections;
using System.Collections.Generic;
  
namespace Havit.Data.SqlTypes
{
 [Serializable]
 [SqlUserDefinedType(
  Format.UserDefined,
  Name = "IntArray",
  IsByteOrdered = true,
  MaxByteSize = 8000)]
 public class SqlInt32Array : INullable, IBinarySerialize
 {
  #region private value holder
  private List<SqlInt32> values = null;
  #endregion 
  



  #region Constructors
  /// <summary>
  /// Vytvoří instanci s hodnotou NULL.
  /// </summary>
  public SqlInt32Array()
  {
   this.values = null;
  }
  

  /// <summary>
  /// Vytvoří instanci a naplní ji předanými hodnotami.
  /// </summary>
  /// <param name="values">hodnoty, které mají instance reprezentovat</param>
  public SqlInt32Array(int[] values)
  {
   if ((values == null) || (values.Length == 0))
   {
    this.values = null;
    return;
   }
   if (values.Length > 1999)
   {
    throw new ArgumentException(String.Format("Maximální velikost pole je 1999 hodnot, požadováno je však {0} hodnot.",
     values.Length));
   }

   this.values = new List<SqlInt32>();
   for (int i = 0; i < values.Length; i++)
   {
    this.values.Add(new SqlInt32(values[i]));
   }
  }
  #endregion
   

  #region Add
  /// <summary>
  /// Přidá prvek do pole.
  /// </summary>
  /// <param name="value"></param>
  public void Add(SqlInt32 value)
  {
   if (!value.IsNull && (value.Value == Int32.MinValue))
   {
    throw new ArgumentException("Prvek nesmí mít vyhrazenou hodnotu Int32.MinValue.");
   }

   if (this.values == null)
   {
    this.values = new List<SqlInt32>();
   }
   values.Add(value);
  }
  #endregion

  #region Count
  /// <summary>
  /// Počet prvků v seznamu.
  /// </summary>
  public int Count
  {
   get { return values.Count; }
  }
  #endregion
   

  #region Indexer
  /// <summary>
  /// Indexer pro přístup k prvkům podle jejich pořadí.
  /// </summary>
  /// <param name="index">index (pořadí) prvku</param>
  /// <returns>hodnota <see cref="SqlInt32"/></returns>
  public SqlInt32 this[int index]
  {
   get { return values[index]; }
  }
  #endregion
   

  #region Merge
  /// <summary>
  /// Spojí dvě pole v jedno.
  /// </summary>
  /// <param name="array">přidávané pole</param>
  public void Merge(SqlInt32Array array)
  {
   if (!array.IsNull)
   {
    for (int i = 0; i < array.values.Count; i++)
    {
     this.values.Add(array.values[i]);
    }
   }
  }
  #endregion
   

  #region Accessors
  /// <summary>
  /// Vrátí pole SqlInt32[] s hodnotami.
  /// </summary>
  /// <returns>Pole SqlInt32[] s hodnotami.</returns>
  public SqlInt32[] GetSqlInt32Array()
  {
   if (this.IsNull)
   {
      return null;
   }
   return (SqlInt32[])values.ToArray();
  }

  /// <summary>
  /// Vrací tabulku Int32 hodnot.
  /// Metoda určená pro mapování do T-SQL na table-valued function (TVF).
  /// </summary>
  /// <param name="values">Proměnná, která má být rozbalena do tabulky hodnot Int32.</param>
  /// <returns>tabulka Int32 hodnot (pomocí FillInt32Row)</returns>
  [SqlFunctionAttribute(
   Name= "IntArrayToTable",
   TableDefinition = "[Value] int",
   FillRowMethodName = "FillSqlInt32Row")]
  public static IEnumerable GetSqlInt32Values(SqlInt32Array values)
  {
   return values.GetSqlInt32Array();
  }

  /// <summary>
  /// Metoda zajišťující převod řádku v table-valued function (TVF).
  /// </summary>
  /// <param name="int32ArrayElement">vstupní hodnota řádku</param>
  /// <param name="value">výstupní hodnota řádku</param>
  public static void FillSqlInt32Row(object sqlInt32ArrayElement, out SqlInt32 value)
  {
   value = (SqlInt32)sqlInt32ArrayElement;
  }
  #endregion
   

  #region Parse
  /// <summary>
  /// Vytvoří z CSV textové reprezentace hodnotu pole.
  /// </summary>
  /// <param name="text">CSV text hodnot</param>
  /// <returns>pole s hodnotami dle CSV</returns>
  [SqlMethod(DataAccess = DataAccessKind.None, SystemDataAccess = SystemDataAccessKind.None)]
  public static SqlInt32Array Parse(SqlString text)
  {
   if (text.IsNull)
   {
    return Null;
   }
   string[] parts = text.Value.Split(',');
   int length = parts.Length;
   SqlInt32Array result = new SqlInt32Array();
   for (int i = 0; i < length; i++)
   {
    if (String.Compare(parts[i].Trim(), "NULL", true) == 0)
    {
     result.Add(SqlInt32.Null);
    }
    else
    {
     result.Add(new SqlInt32(Convert.ToInt32(parts[i])));
    }
   }
   return result;
  }
  #endregion
   

  #region ToString
  /// <summary>
  /// Převede hodnotu na CSV textovou reprezentaci string
  /// </summary>
  /// <returns>CSV seznam hodnot</returns>
  public override string ToString()
  {
   if (this.IsNull)
   {
    return null;
   }

   StringBuilder sb = new StringBuilder();
   for (int i = 0; i < this.values.Count; i++)
   {
    if (this.values[i].IsNull)
    {
     sb.Append("NULL");
    }
    else
    {
     sb.Append(this.values[i].Value);
    }
    if (i < this.values.Count - 1)
    {
     sb.Append(",");
    }
   }
   return sb.ToString();
  }
  #endregion
   

  #region Null (static)
  /// <summary>
  /// Hodnota NULL.
  /// </summary>
  public static SqlInt32Array Null
  {
   get
   {
    return new SqlInt32Array();
   }
  }
  #endregion
   

  #region INullable Members
  /// <summary>
  /// Indikuje, zda-li je hodnota NULL.
  /// </summary>
  public bool IsNull
  {
   get
   {
    return ((this.values == null) || (this.values.Count == 0));
   }
  }
  #endregion
   

  #region IBinarySerialize Members
  /// <summary>
  /// Načte hodnotu z binární reprezentace.
  /// </summary>
  /// <remarks>
  /// Binární serializace je takováto:
  /// byte 1-4 ~ Int32 Length (velikost pole, pokud je 0, pak je hodnota NULL)
  /// byte 5-(8000) ~ values (NULL hodnoty reprezentuje Int32.MinValue)
  /// </remarks>
  /// <param name="r"><see cref="System.IO.BinaryReader"/> s binární reprezentací hodnoty</param>
  public void Read(System.IO.BinaryReader r)
  {
   // byte 1 - počet hodnot
   Int32 length = r.ReadInt32();
   if (length == 0)
   {
    // NULL
    this.values = null;
   }
   else
   {
    // hodnoty
    this.values = new List<SqlInt32>();
    for (int i = 0; i < length; i++)
    {
     Int32 temp = r.ReadInt32();
     if (temp == Int32.MinValue)
     {
      this.values.Add(SqlInt32.Null);
     }
     else
     {
      this.values.Add(new SqlInt32(temp));
     }
    }
   }
  }
  

  /// <summary>
  /// Vytvoří binární reprezentaci hodnoty.
  /// </summary>
  /// <remarks>
  /// Binární serializace je takováto:
  /// byte 1-4 ~ Int32 Length (velikost pole, pokud je 0, pak je hodnota NULL)
  /// byte 5-(8000) ~ values (NULL hodnoty implementuje Int32.MinValue)
  /// </remarks>
  /// <param name="w"><see cref="System.IO.BinaryWriter"/> do kterého má být binární reprezentace zapsána</param>
  public void Write(System.IO.BinaryWriter w)
  {
   // byte 1 - počet hodnot
   if (this.IsNull)
   {
    w.Write(0);
   }
   else
   {
    w.Write(this.values.Count); 
    // hodnoty
    for (int i = 0; i < this.values.Count; i++)
    {
     if (this.values[i].IsNull)
     {
      w.Write(Int32.MinValue);
     }
     else
     {
      w.Write(this.values[i].Value);
     }
    }
   }
  }
  #endregion
 }
}

Jak jste si jistě všimli, mimo typu IntArray zavádíme hned i UDF funkci IntArrayToTable, kterou bude v T-SQL pole převádět na tabulku.

Náš UDT můžeme do SQL serveru zavést buď přímo z Visual Studia pomocí Deploy, pokud máme jako typ projektu SQL Server Project, nebo ručně pomocí T-SQL:

CREATE ASSEMBLY [Havit.Data.SqlServer]
FROM 'C:\Havit.Data.SqlServer.dll'
  
CREATE TYPE [dbo].IntArray
EXTERNAL NAME [Havit.Data.SqlServer].[Havit.Data.SqlTypes.SqlInt32Array]
  
CREATE FUNCTION IntArrayToTable
(
    @array dbo.IntArray
)
RETURNS TABLE
(
     [Value] int
)
AS EXTERNAL NAME [Havit.Data.SqlServer].[Havit.Data.SqlTypes.SqlInt32Array].[GetInt32Values]

Příklad vyhledávání:

CREATE PROCEDURE Filter
(
     @Vlastnosti dbo.IntArray = NULL
)
AS
    SELECT col FROM tab
        WHERE ((@Vlastnosti IS NULL) OR (VlastnostID IN (SELECT Value FROM dbo.IntArrayToTable(@Vlastnosti))))

Příklad využití pole IDček pro ukládání:

 

CREATE PROCEDURE dbo.Uzivatel_Update 
(
 @UzivatelID int,
 @Username varchar(30),
 @Password nvarchar(30),
 @DisplayAs nvarchar(50),
 @Email nvarchar(80),
 @Deleted bit = 0,
 @Role IntArray = NULL
)
AS
 SET NOCOUNT ON
   
 SET XACT_ABORT ON
 BEGIN TRANSACTION
   
  -- Update tabulky uživatelů
  UPDATE dbo.Uzivatel
   SET
    Username = @Username,
    Password = @Password,
    DisplayAs = @DisplayAs,
    Email = @Email,
    Deleted = @Deleted
   WHERE
    (UzivatelID = @UzivatelID)
      
  -- Update rolí
  DELETE FROM dbo.Uzivatel_Role
   WHERE
    (UzivatelID = @UzivatelID)
    
  IF (@Role IS NOT NULL)
   INSERT INTO dbo.Uzivatel_Role(UzivatelID, RoleID)
    SELECT @UzivatelID AS UzivatelID, Value AS RoleID FROM dbo.IntArrayToTable(@Role)
   
 COMMIT TRANSACTION
   
 RETURN

Použití UDT z aplikace pak vypadá nějak takto:

SqlParameter paramRole = new SqlParameter("@Role", SqlDbType.Udt);
   paramRole.UdtTypeName = "IntArray";
   paramRole.Value = new SqlInt32Array(this.Role.GetIDs());
   cmd.Parameters.Add(paramRole);

SqlInt32ArrayAggregate

Na to, abychom z tabulky se sloupcem int hodnot dostali pole, potřebujeme agregát. Tedy obdobu SUM, AVG, MAX, MIN, prostě něco, co udělá ze všech hodnot jednu.

using System;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
using Microsoft.SqlServer.Server;
using System.IO;
  
namespace Havit.Data.SqlTypes
{
 /// <summary>
 /// Aggregate k UDT SqlInt32Array, který zajišťuje převod tabulky hodnot na pole.
 /// </summary>
 [Serializable]
 [Microsoft.SqlServer.Server.SqlUserDefinedAggregate(
  Format.UserDefined,
  IsInvariantToDuplicates = false, IsInvariantToNulls = false, IsInvariantToOrder = true, IsNullIfEmpty = true,
  MaxByteSize = 8000, Name = "IntArrayAggregate")]
 public class SqlInt32ArrayAggregate : IBinarySerialize
 {
  #region private value holder
  /// <summary>
  /// Uchovává mezivýsledek.
  /// </summary>
  private SqlInt32Array array;
  #endregion
  
  #region Init
  /// <summary>
  /// Inicializace agregátoru.
  /// </summary>
  public void Init()
  {
   array = new SqlInt32Array();
  }
  #endregion
  
  #region Accumulate
  /// <summary>
  /// Přidá další hodnotu do agregace.
  /// </summary>
  /// <param name="value">přidávaná hodnota</param>
  public void Accumulate(SqlInt32 value)
  {
   array.Add(value);
  }
  #endregion
  
  #region Merge
  /// <summary>
  /// Spojí dva agregáty v jeden
  /// </summary>
  /// <param name="group">druhá agregace</param>
  public void Merge(SqlInt32ArrayAggregate group)
  {
   group.array.Merge(group.array);
  }
  #endregion
  
  #region Terminate
  /// <summary>
  /// Vrátí výsledek agregace.
  /// </summary>
  public SqlInt32Array Terminate()
  {
   return this.array;
  }
  #endregion
  
  #region IBinarySerialize Members
  /// <summary>
  /// De-serializuje agregaci.
  /// </summary>
  /// <param name="r">BinaryReader</param>
  public void Read(BinaryReader r)
  {
   this.array = new SqlInt32Array();
   this.array.Read(r);
  }
  
  /// <summary>
  /// Serializuje agregaci.
  /// </summary>
  /// <param name="w">BinaryWriter</param>
  public void Write(BinaryWriter w)
  {
   this.array.Write(w);
  }
  #endregion
 }
}

A příklad použití takového agregátu:

CREATE PROCEDURE dbo.Uzivatel_Load 
(
 @UzivatelID int
)
AS
 SET NOCOUNT ON
   
 SELECT
   Uzivatel.*,
   (SELECT dbo.IntArrayAggregate(RoleID) FROM dbo.Uzivatel_Role WHERE UzivatelID = @UzivatelID) AS Role
  FROM dbo.Uzivatel
  WHERE
   (UzivatelID = @UzivatelID)
   
 RETURN

… a jeho využití v aplikaci:

SqlInt32Array roleArray = (SqlInt32Array)reader["Role"];

…určitě se toho dá mnohé vylepšovat, uvítám samozřejmě jakékoliv komentáře.

Update 09/2013

CLR datový typ IntArray a související IntArrayAggregate jsme s drobnými doladěními používali jako klíčovou součást datové vrstvy na 99% našich aplikací od roku 2006 až do roku 2012. Nejednalo se tedy o žádný experimentální pokus, ale produkční prvek, který nám za tu dobu udělal velmi dobrou službu. Jedinou nevýhodou bylo občasné dohadování ohledně potřeby povolit CLR na SQL serverech zákazníků/hostingu.

Na konci roku 2012 jsme se rozhodli, že pro MS SQL 2008 a novější přejdeme na Table Value Parameters pro „směr tam“ a SELECT FOR XML pro „směr zpět“ (místo SqlIntArrayAggregate). Motivací bylo získání vyššího výkonu (byť dosavadní řešení s CLR poskytovalo výkon přinejmenší dostatečný) a zjednodušení situace.

IntArray + IntArrayAggregate jsme si ponechali jen pro pár zbývajících SQL 2005 serverů.

Napsat komentář

Vyplňte detaily níže nebo klikněte na ikonu pro přihlášení:

WordPress.com Logo

Komentujete pomocí vašeho WordPress.com účtu. Log Out / Změnit )

Twitter picture

Komentujete pomocí vašeho Twitter účtu. Log Out / Změnit )

Facebook photo

Komentujete pomocí vašeho Facebook účtu. Log Out / Změnit )

Google+ photo

Komentujete pomocí vašeho Google+ účtu. Log Out / Změnit )

Připojování k %s