C# – Custom datareader for SqlBulkCopy

When prototyping SisoDb I used datatables under the covers when consuming the SqlBulkCopy class to insert data. This lead to that I had the source entities in memory as well as the datatables. Since the SqlBulkCopy class can work with readers I created a very simple datareader implementation over my entities instead. I gained a little performance boost and got rid of unnecessary creation of in-memory datatables. At first the API of a datareader seems kind of tedious to implement, but in the context of using it in conjunction with the SqlBulkCopy-class, there really isn’t that many methods that need to be implemented:

  • public override int GetOrdinal(string name)
  • public override IEnumerator GetEnumerator()
  • public override bool Read()
  • public override void Close()
  • public override bool HasRows
  • public override int FieldCount
  • public override int RecordsAffected
  • public override bool IsClosed

My model contains two entities but three sets: Structure, StructureIndex; sets: Structures, Indexes, Uniques. The root is the Structure which then contains the other sets. For me the simplest solution was to create one reader for each entity. Each custom reader extended a custom base-class: SingleResultReaderBase<T>.

To keep track of tablename, columnnames, ordinals etc. I put together a verry simple StorageSchema base class and then one specific implementation for each entity (Structure, StructureIndex).

That’s pretty much it. I will show you the code for how I handled the root entity. The complete code for the other entities can be found in the trunk of http://sisodb.codeplex.com

SingleResultReaderBase<T>

internal abstract class SingleResultReaderBase<T> : DbDataReader where T : class 
{
	protected internal StorageSchemaBase StorageSchema { get; private set; }

	protected IEnumerable<T> Items { get; private set; }

	protected IEnumerator<T> Enumerator { get; private set; }

	protected SingleResultReaderBase(StorageSchemaBase storageSchema, IEnumerable<T> items)
	{
		StorageSchema = storageSchema;
		Items = items;
		Enumerator = Items.GetEnumerator();
	}

	public override bool IsClosed
	{
		get { return Enumerator == null; }
	}

	public override int RecordsAffected
	{
		get { return Items.Count(); }
	}

	public override int FieldCount
	{
		get { return StorageSchema.FieldCount(); }
	}

	public override bool HasRows
	{
		get { return Items != null && Items.Count() > 0; }
	}

	public override void Close()
	{
		Enumerator = null;
		Items = null;
	}
	
	public override bool Read()
	{
		return Enumerator.MoveNext();
	}

	public override IEnumerator GetEnumerator()
	{
		return Enumerator;
	}

	public override int GetOrdinal(string name)
	{
		return StorageSchema.FieldsByName[name].Index;
	}

	public override object this[int ordinal]
	{
		get { throw new NotSupportedException(); }
	}

	public override object this[string name]
	{
		get { throw new NotSupportedException(); }
	}

	public override int Depth
	{
		get { throw new NotSupportedException(); }
	}

	public override string GetName(int ordinal)
	{
		throw new NotSupportedException();
	}

	public override DataTable GetSchemaTable()
	{
		throw new NotSupportedException();
	}

	public override bool NextResult()
	{
		throw new NotSupportedException();
	}

	public override bool GetBoolean(int ordinal)
	{
		throw new NotSupportedException();
	}

	public override byte GetByte(int ordinal)
	{
		throw new NotSupportedException();
	}

	public override long GetBytes(int ordinal, long dataOffset, byte[] buffer, int bufferOffset, int length)
	{
		throw new NotSupportedException();
	}

	public override char GetChar(int ordinal)
	{
		throw new NotSupportedException();
	}

	public override long GetChars(int ordinal, long dataOffset, char[] buffer, int bufferOffset, int length)
	{
		throw new NotSupportedException();
	}

	public override Guid GetGuid(int ordinal)
	{
		throw new NotSupportedException();
	}

	public override short GetInt16(int ordinal)
	{
		throw new NotSupportedException();
	}

	public override int GetInt32(int ordinal)
	{
		throw new NotSupportedException();
	}

	public override long GetInt64(int ordinal)
	{
		throw new NotSupportedException();
	}

	public override DateTime GetDateTime(int ordinal)
	{
		throw new NotSupportedException();
	}

	public override string GetString(int ordinal)
	{
		throw new NotSupportedException();
	}

	public override int GetValues(object[] values)
	{
		throw new NotSupportedException();
	}

	public override bool IsDBNull(int ordinal)
	{
		throw new NotSupportedException();
	}

	public override decimal GetDecimal(int ordinal)
	{
		throw new NotSupportedException();
	}

	public override double GetDouble(int ordinal)
	{
		throw new NotSupportedException();
	}

	public override float GetFloat(int ordinal)
	{
		throw new NotSupportedException();
	}

	public override string GetDataTypeName(int ordinal)
	{
		throw new NotSupportedException();
	}

	public override Type GetFieldType(int ordinal)
	{
		throw new NotSupportedException();
	}
}

StructuresReader

internal class StructuresReader : SingleResultReaderBase<IStructure>
{
	internal StructuresReader(StructureStorageSchema storageSchema, IEnumerable<IStructure> items)
		: base(storageSchema, items)
	{
	}

	public override object GetValue(int ordinal)
	{
		var schemaField = StorageSchema.FieldsByIndex[ordinal];
		if(schemaField.Name == StructureStorageSchema.Fields.Id.Name)
		{
			if (Enumerator.Current.Id.IdType == IdTypes.Identity)
				return int.Parse(Enumerator.Current.Id.Value);

			if (Enumerator.Current.Id.IdType == IdTypes.Guid)
				return Guid.Parse(Enumerator.Current.Id.Value);

			throw new NotSupportedException();
		}

		if(schemaField.Name == StructureStorageSchema.Fields.Json.Name)
			return Enumerator.Current.Json;

		throw new NotSupportedException();
	}
}

StorageSchema

[Serializable]
internal abstract class StorageSchemaBase
{
	internal readonly Dictionary<int, SchemaField> FieldsByIndex;
	internal readonly Dictionary<string, SchemaField> FieldsByName;

	internal string Name { get; private set; }

	protected StorageSchemaBase(string name)
	{
		FieldsByIndex = new Dictionary<int, SchemaField>();
		FieldsByName = new Dictionary<string, SchemaField>();
		Name = name;
		
		InitializeFields();
	}

	protected abstract void InitializeFields();

	public int FieldCount()
	{
		return FieldsByIndex.Count;
	}
}

[Serializable]
internal class StructureStorageSchema
	: StorageSchemaBase
{
	internal static class Fields
	{
		internal static readonly SchemaField Id = new SchemaField(0, "Id");
		internal static readonly SchemaField Json = new SchemaField(1, "Json");

		internal static SchemaField[] GetOrderedFields()
		{
			return new[] { Id, Json };
		}
	}

	internal StructureStorageSchema(IStructureSchema structureSchema) 
		: base(structureSchema.GetStructureTableName())
	{
	}

	protected override void InitializeFields()
	{
		foreach (var field in Fields.GetOrderedFields())
		{
			FieldsByIndex.Add(field.Index, field);
			FieldsByName.Add(field.Name, field);
		}
	}
}

Then to consume the reader you would have code looking something like this:

using(var structuresReader = new StructuresReader(structureStorageSchema, structures))
{
	using (var structuresBulkInserter = new SqlBulkCopy(_connection,
		SqlBulkCopyOptions.Default | SqlBulkCopyOptions.KeepIdentity, _transatcion))
	{
		structuresBulkInserter.BatchSize = structuresReader.RecordsAffected;
		structuresBulkInserter.DestinationTableName = structuresReader.StorageSchema.Name;
		structuresBulkInserter.NotifyAfter = 0;

		foreach (var field in structuresReader.StorageSchema.FieldsByIndex.Values)
			structuresBulkInserter.ColumnMappings.Add(field.Name, field.Name);

		structuresBulkInserter.WriteToServer(structuresReader);
		structuresBulkInserter.Close();
	}
}
About these ads

One thought on “C# – Custom datareader for SqlBulkCopy

  1. Pingback: DotNetShoutout

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s