When prototyping SisoDb I used datatables under the covers when consuming the SqlBulkCopy class to insert data. This lead to that I had the source entities in memory as well as the datatables. Since the SqlBulkCopy class can work with readers I created a very simple datareader implementation over my entities instead. I gained a little performance boost and got rid of unnecessary creation of in-memory datatables. At first the API of a datareader seems kind of tedious to implement, but in the context of using it in conjunction with the SqlBulkCopy-class, there really isn’t that many methods that need to be implemented:
- public override int GetOrdinal(string name)
- public override IEnumerator GetEnumerator()
- public override bool Read()
- public override void Close()
- public override bool HasRows
- public override int FieldCount
- public override int RecordsAffected
- public override bool IsClosed
My model contains two entities but three sets: Structure, StructureIndex; sets: Structures, Indexes, Uniques. The root is the Structure which then contains the other sets. For me the simplest solution was to create one reader for each entity. Each custom reader extended a custom base-class: SingleResultReaderBase<T>.
To keep track of tablename, columnnames, ordinals etc. I put together a verry simple StorageSchema base class and then one specific implementation for each entity (Structure, StructureIndex).
That’s pretty much it. I will show you the code for how I handled the root entity. The complete code for the other entities can be found in the trunk of http://sisodb.codeplex.com
SingleResultReaderBase<T>
internal abstract class SingleResultReaderBase<T> : DbDataReader where T : class
{
protected internal StorageSchemaBase StorageSchema { get; private set; }
protected IEnumerable<T> Items { get; private set; }
protected IEnumerator<T> Enumerator { get; private set; }
protected SingleResultReaderBase(StorageSchemaBase storageSchema, IEnumerable<T> items)
{
StorageSchema = storageSchema;
Items = items;
Enumerator = Items.GetEnumerator();
}
public override bool IsClosed
{
get { return Enumerator == null; }
}
public override int RecordsAffected
{
get { return Items.Count(); }
}
public override int FieldCount
{
get { return StorageSchema.FieldCount(); }
}
public override bool HasRows
{
get { return Items != null && Items.Count() > 0; }
}
public override void Close()
{
Enumerator = null;
Items = null;
}
public override bool Read()
{
return Enumerator.MoveNext();
}
public override IEnumerator GetEnumerator()
{
return Enumerator;
}
public override int GetOrdinal(string name)
{
return StorageSchema.FieldsByName[name].Index;
}
public override object this[int ordinal]
{
get { throw new NotSupportedException(); }
}
public override object this[string name]
{
get { throw new NotSupportedException(); }
}
public override int Depth
{
get { throw new NotSupportedException(); }
}
public override string GetName(int ordinal)
{
throw new NotSupportedException();
}
public override DataTable GetSchemaTable()
{
throw new NotSupportedException();
}
public override bool NextResult()
{
throw new NotSupportedException();
}
public override bool GetBoolean(int ordinal)
{
throw new NotSupportedException();
}
public override byte GetByte(int ordinal)
{
throw new NotSupportedException();
}
public override long GetBytes(int ordinal, long dataOffset, byte[] buffer, int bufferOffset, int length)
{
throw new NotSupportedException();
}
public override char GetChar(int ordinal)
{
throw new NotSupportedException();
}
public override long GetChars(int ordinal, long dataOffset, char[] buffer, int bufferOffset, int length)
{
throw new NotSupportedException();
}
public override Guid GetGuid(int ordinal)
{
throw new NotSupportedException();
}
public override short GetInt16(int ordinal)
{
throw new NotSupportedException();
}
public override int GetInt32(int ordinal)
{
throw new NotSupportedException();
}
public override long GetInt64(int ordinal)
{
throw new NotSupportedException();
}
public override DateTime GetDateTime(int ordinal)
{
throw new NotSupportedException();
}
public override string GetString(int ordinal)
{
throw new NotSupportedException();
}
public override int GetValues(object[] values)
{
throw new NotSupportedException();
}
public override bool IsDBNull(int ordinal)
{
throw new NotSupportedException();
}
public override decimal GetDecimal(int ordinal)
{
throw new NotSupportedException();
}
public override double GetDouble(int ordinal)
{
throw new NotSupportedException();
}
public override float GetFloat(int ordinal)
{
throw new NotSupportedException();
}
public override string GetDataTypeName(int ordinal)
{
throw new NotSupportedException();
}
public override Type GetFieldType(int ordinal)
{
throw new NotSupportedException();
}
}
StructuresReader
internal class StructuresReader : SingleResultReaderBase<IStructure>
{
internal StructuresReader(StructureStorageSchema storageSchema, IEnumerable<IStructure> items)
: base(storageSchema, items)
{
}
public override object GetValue(int ordinal)
{
var schemaField = StorageSchema.FieldsByIndex[ordinal];
if(schemaField.Name == StructureStorageSchema.Fields.Id.Name)
{
if (Enumerator.Current.Id.IdType == IdTypes.Identity)
return int.Parse(Enumerator.Current.Id.Value);
if (Enumerator.Current.Id.IdType == IdTypes.Guid)
return Guid.Parse(Enumerator.Current.Id.Value);
throw new NotSupportedException();
}
if(schemaField.Name == StructureStorageSchema.Fields.Json.Name)
return Enumerator.Current.Json;
throw new NotSupportedException();
}
}
StorageSchema
[Serializable]
internal abstract class StorageSchemaBase
{
internal readonly Dictionary<int, SchemaField> FieldsByIndex;
internal readonly Dictionary<string, SchemaField> FieldsByName;
internal string Name { get; private set; }
protected StorageSchemaBase(string name)
{
FieldsByIndex = new Dictionary<int, SchemaField>();
FieldsByName = new Dictionary<string, SchemaField>();
Name = name;
InitializeFields();
}
protected abstract void InitializeFields();
public int FieldCount()
{
return FieldsByIndex.Count;
}
}
[Serializable]
internal class StructureStorageSchema
: StorageSchemaBase
{
internal static class Fields
{
internal static readonly SchemaField Id = new SchemaField(0, "Id");
internal static readonly SchemaField Json = new SchemaField(1, "Json");
internal static SchemaField[] GetOrderedFields()
{
return new[] { Id, Json };
}
}
internal StructureStorageSchema(IStructureSchema structureSchema)
: base(structureSchema.GetStructureTableName())
{
}
protected override void InitializeFields()
{
foreach (var field in Fields.GetOrderedFields())
{
FieldsByIndex.Add(field.Index, field);
FieldsByName.Add(field.Name, field);
}
}
}
Then to consume the reader you would have code looking something like this:
using(var structuresReader = new StructuresReader(structureStorageSchema, structures))
{
using (var structuresBulkInserter = new SqlBulkCopy(_connection,
SqlBulkCopyOptions.Default | SqlBulkCopyOptions.KeepIdentity, _transatcion))
{
structuresBulkInserter.BatchSize = structuresReader.RecordsAffected;
structuresBulkInserter.DestinationTableName = structuresReader.StorageSchema.Name;
structuresBulkInserter.NotifyAfter = 0;
foreach (var field in structuresReader.StorageSchema.FieldsByIndex.Values)
structuresBulkInserter.ColumnMappings.Add(field.Name, field.Name);
structuresBulkInserter.WriteToServer(structuresReader);
structuresBulkInserter.Close();
}
}
Pingback: DotNetShoutout