- A+
1.前言
hi,大家好,我是三合。我是怎么想起写一篇关于数据库快速批量插入的博客的呢?事情起源于我们工作中的一个需求,简单来说,就是有一个定时任务,从数据库里获取大量数据,在应用层面经过处理后再把结果批量插入回到数据库里。这个任务每十分钟执行一次,但是有的时候数据量太大,循环插入数据库的时候会超时,导致任务失败,所以这个时候我就开始研究怎么快速批量插入数据库,因为我们用的数据库是Oracle,所以我首先研究了Oracle的快速批量插入,后面我一想那其他类型的数据库肯定也有这样的需求,于是我在找了很多资料,并且反复实验后,终于完美解决了mysql,sqlServer以及Oracle的快速批量插入,sqlite自身不支持,所以没有sqlite,特地整理成这篇文章,分享给大家。
2.测试前准备
添加一个具有绝大多数类型属性的实体类,用来完整测验批量插入效果,该实体类用于mysql和sqlserver的测试。
public class NullableTable { [DatabaseGenerated(DatabaseGeneratedOption.Identity)] [Key] public int Id { get; set; } [Description("Int2")] public int? Int2 { get; set; } [Description("Long2")] public long? Long2 { get; set; } public float? Float2 { get; set; } public double? Double2 { get; set; } public decimal? Decimal2 { get; set; } [DecimalPrecision(20,4)] public decimal? Decimal3 { get; set; } public Guid? Guid2 { get; set; } public short? Short2 { get; set; } public DateTime? DateTime2 { get; set; } public bool? Bool2 { get; set; } public TimeSpan? TimeSpan2 { get; set; } public byte? Byte2 { get; set; } [StringLength(100)] public string String2 { get; set; } public string String3 { get; set; } public Enum2? Enum2 { get; set; } [Column("TestInt3")] [Description("Int2")] public int? Int3 { get; set; } } public enum Enum2 { x, y }
因为oracle数据库我们习惯于表名和字段名大写,所以oracle的测试实体类定义如下:
[Table("NULLABLETABLE")] [Description("NullableTable")] public class NullableTable { [DatabaseGenerated(DatabaseGeneratedOption.Identity)] [Key] [Column("ID")] public int Id { get; set; } [Description("Int2")] [Column("INT2")] public int? Int2 { get; set; } [Description("Long2")] [Column("LONG2")] public long? Long2 { get; set; } [Column("FLOAT2")] public float? Float2 { get; set; } [Column("DOUBLE2")] public double? Double2 { get; set; } [Column("DECIMAL2")] public decimal? Decimal2 { get; set; } [Column("DECIMAL3")] [DecimalPrecision(20,4)] public decimal? Decimal3 { get; set; } [Column("GUID2")] public Guid? Guid2 { get; set; } [Column("SHORT2")] public short? Short2 { get; set; } [Column("DATETIME2")] public DateTime? DateTime2 { get; set; } [Column("BOOL2")] public bool? Bool2 { get; set; } [Column("TIMESPAN2")] public TimeSpan? TimeSpan2 { get; set; } [Column("BYTE2")] public byte? Byte2 { get; set; } [Column("STRING2")] [StringLength(100)] public string String2 { get; set; } [Column("STRING3")] public string String3 { get; set; } [Column("ENUM2")] public Enum2? Enum2 { get; set; } [Column("TESTINT3")] [Description("Int2")] public int? Int3 { get; set; } }
实验我们采用的是code first,先利用SummerBoot框架的可用于依赖注入的,数据库表和c#实体类互相转换的接口实现功能从实体类生成相应的数据库表,本次实验批量插入2w条数据来对比时间,定义一个列表,用循环的方式给这个列表添加2w条数据。
var nullableTableList3 = new List<NullableTable>(); var now = DateTime.Now; for (int i = 0; i < 20000; i++) { var a = new NullableTable() { Int2 = 2, Bool2 = true, Byte2 = 1, DateTime2 = now, Decimal2 = 1m, Decimal3 = 1.1m, Double2 = 1.1, Float2 = (float)1.1, Guid2 = Guid.NewGuid(), Id = 0, Short2 = 1, TimeSpan2 = TimeSpan.FromHours(1), String2 = "sb", String3 = "sb", Long2 = 2, Enum2 = Model.Enum2.y, Int3 = 4 }; nullableTableList3.Add(a); }
数据库驱动上的选择是这样的,sqlserver采用微软官方驱动System.Data.SqlClient,oracle采用官方驱动Oracle.ManagedDataAccess.Core,mysql采用社区驱动MySqlConnector(为啥mysql不采用官方的驱动呢?因为官方的驱动封装的太差了,社区的驱动支持列名映射,同时项目里官方驱动和社区驱动可以共存)。
同时快速批量插入均支持异步同步,这里仅演示同步,异步的实现基本一样。
3.sqlserver快速批量插入
sqlserver官方提供的批量插入方式是SqlBulkCopy,参数为一个dataTable对象,原生的批量插入代码如下,采用StopWatch类进行计时,测试前都会用DELETE from NullableTable 语句清空表,测试里循环跑5次,获取总时间后除以5获取平均值,合计插入10w条数据。
var sw = new Stopwatch(); sw.Start(); for (int i = 0; i < 5; i++) { using (var dbConnection = new SqlConnection(connectionString)) { dbConnection.Open(); SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(dbConnection, SqlBulkCopyOptions.KeepIdentity, null); sqlBulkCopy.BatchSize = 20000; sqlBulkCopy.DestinationTableName = "NullableTable"; //针对列名做一下映射 sqlBulkCopy.ColumnMappings.Add("Int2", "Int2"); sqlBulkCopy.ColumnMappings.Add("Bool2", "Bool2"); sqlBulkCopy.ColumnMappings.Add("Byte2", "Byte2"); sqlBulkCopy.ColumnMappings.Add("DateTime2", "DateTime2"); sqlBulkCopy.ColumnMappings.Add("Decimal2", "Decimal2"); sqlBulkCopy.ColumnMappings.Add("Decimal3", "Decimal3"); sqlBulkCopy.ColumnMappings.Add("Double2", "Double2"); sqlBulkCopy.ColumnMappings.Add("Float2", "Float2"); sqlBulkCopy.ColumnMappings.Add("Guid2", "Guid2"); sqlBulkCopy.ColumnMappings.Add("Short2", "Short2"); sqlBulkCopy.ColumnMappings.Add("TimeSpan2", "TimeSpan2"); sqlBulkCopy.ColumnMappings.Add("String2", "String2"); sqlBulkCopy.ColumnMappings.Add("String3", "String3"); sqlBulkCopy.ColumnMappings.Add("Long2", "Long2"); sqlBulkCopy.ColumnMappings.Add("Enum2", "Enum2"); sqlBulkCopy.ColumnMappings.Add("Int3", "TestInt3"); //将实体类列表转换成dataTable var table = nullableTableList3.ToDataTable(); sqlBulkCopy.WriteToServer(table); } } sw.Stop(); var totalTime= sw.ElapsedMilliseconds; var avgValue = totalTime / 5;
实验结果如下,sql server中:
采用快速批量插入10w条数据,时间合计1858毫秒,平均插入2w条数据仅需371毫秒。
采用insert into语句,循环插入10w条数据,时间合计457606毫秒,平均插入2w条数据需91521毫秒。
4.实体类列表转dataTable的扩展方法
这里有一个实体类列表转dataTable的扩展方法,采用的是表达式树+构建委托的方式,性能不错,大家可以参考,代码实现如下。
public static ConcurrentDictionary<string, object> CacheDictionary = new ConcurrentDictionary<string, object>(); /// <summary> /// 构建一个object数据转换成一维数组数据的委托 /// </summary> /// <param name="objType"></param> /// <param name="propertyInfos"></param> /// <returns></returns> public static Func<T, object[]> BuildObjectGetValuesDelegate<T>(List<PropertyInfo> propertyInfos) where T : class { var objParameter = Expression.Parameter(typeof(T), "model"); var selectExpressions = propertyInfos.Select(it => BuildObjectGetValueExpression(objParameter, it)); var arrayExpression = Expression.NewArrayInit(typeof(object), selectExpressions); var result = Expression.Lambda<Func<T, object[]>>(arrayExpression, objParameter).Compile(); return result; } /// <summary> /// 构建对象获取单个值得 /// </summary> /// <param name="modelExpression"></param> /// <param name="propertyInfo"></param> /// <returns></returns> public static Expression BuildObjectGetValueExpression(ParameterExpression modelExpression, PropertyInfo propertyInfo) { var propertyExpression = Expression.Property(modelExpression, propertyInfo); var convertExpression = Expression.Convert(propertyExpression, typeof(object)); return convertExpression; } public static DataTable ToDataTable<T>(this IEnumerable<T> source, List<PropertyInfo> propertyInfos = null,bool useColumnAttribute=false) where T : class { var table = new DataTable("template"); if (propertyInfos == null || propertyInfos.Count == 0) { propertyInfos = typeof(T).GetProperties().Where(it => it.CanRead).ToList(); } foreach (var propertyInfo in propertyInfos) { var columnName=useColumnAttribute?(propertyInfo.GetCustomAttribute<ColumnAttribute>()?.Name?? propertyInfo.Name) : propertyInfo.Name; table.Columns.Add(columnName, ChangeType(propertyInfo.PropertyType)); } Func<T, object[]> func; var key = typeof(T).FullName + propertyInfos.Select(it => it.Name).ToList().StringJoin(); if (CacheDictionary.TryGetValue(key, out var cacheFunc)) { func = (Func<T, object[]>)cacheFunc; } else { func = BuildObjectGetValuesDelegate<T>(propertyInfos); CacheDictionary.TryAdd(key, func); } foreach (var model in source) { var rowData = func(model); table.Rows.Add(rowData); } return table; } private static Type ChangeType(Type type) { if (type.IsNullable()) { type = Nullable.GetUnderlyingType(type); } return type; }
5.oracle快速批量插入
oracle官方提供的批量插入方式是ArrayBindCount,即数组批量插入,原生的批量插入代码如下,计时方式与sqlserver相同
var total = 20000; var sw = new Stopwatch(); sw.Start(); for (int i = 0; i < 5; i++) { var connection = new OracleConnection(connectionString); connection.Open(); int?[] Int2 = new int?[total]; bool[] Bool2 = new bool[total]; byte[] Byte2 = new byte[total]; DateTime[] DateTime2 = new DateTime[total]; decimal?[] Decimal2 = new decimal?[total]; decimal[] Decimal3 = new decimal[total]; double[] Double2 = new double[total]; float[] Float2 = new float[total]; Guid?[] Guid2 = new Guid?[total]; short[] Short2 = new short[total]; TimeSpan[] TimeSpan2 = new TimeSpan[total]; string[] String2 = new string[total]; string[] String3 = new string[total]; long[] Long2 = new long[total]; Enum2[] Enum2 = new Enum2[total]; for (int j = 0; j < total; j++) { Int2[j] = 2; Bool2[j] = true; Byte2[j] = 1; DateTime2[j] = now; Decimal2[j] = 1m; Decimal3[j] = 1.1m; Double2[j] = 1.1; Float2[j] = (float) 1.1; Guid2[j] = Guid.NewGuid(); Short2[j] = 1; TimeSpan2[j] = TimeSpan.FromHours(1); String2[j] = "sb"; String3[j] = "sb"; Long2[j] = 2; Enum2[j] = Model.Enum2.y; } var c = (int) Model.Enum2.y; OracleParameter pInt2 = new OracleParameter(); pInt2.OracleDbType = OracleDbType.Int32; pInt2.Value = Int2; OracleParameter pBool2 = new OracleParameter(); pBool2.OracleDbType = OracleDbType.Byte; pBool2.Value = Bool2; OracleParameter pByte2 = new OracleParameter(); pByte2.OracleDbType = OracleDbType.Byte; pByte2.Value = Byte2; OracleParameter pDateTime2 = new OracleParameter(); pDateTime2.OracleDbType = OracleDbType.TimeStamp; pDateTime2.Value = DateTime2; OracleParameter pDecimal2 = new OracleParameter(); pDecimal2.OracleDbType = OracleDbType.Decimal; pDecimal2.Value = Decimal2; OracleParameter pDecimal3 = new OracleParameter(); pDecimal3.OracleDbType = OracleDbType.Decimal; pDecimal3.Value = Decimal3; OracleParameter pDouble2 = new OracleParameter(); pDouble2.OracleDbType = OracleDbType.Double; pDouble2.Value = Double2; OracleParameter pFloat2 = new OracleParameter(); pFloat2.OracleDbType = OracleDbType.BinaryFloat; pFloat2.Value = Float2; OracleParameter pGuid2 = new OracleParameter(); pGuid2.OracleDbType = OracleDbType.Raw; pGuid2.Value = Guid2; OracleParameter pShort2 = new OracleParameter(); pShort2.OracleDbType = OracleDbType.Int16; pShort2.Value = Short2; OracleParameter pTimeSpan2 = new OracleParameter(); pTimeSpan2.OracleDbType = OracleDbType.IntervalDS; pTimeSpan2.Value = TimeSpan2; OracleParameter pString2 = new OracleParameter(); pString2.OracleDbType = OracleDbType.Varchar2; pString2.Value = String2; OracleParameter pString3 = new OracleParameter(); pString3.OracleDbType = OracleDbType.Varchar2; pString3.Value = String3; OracleParameter pLong2 = new OracleParameter(); pLong2.OracleDbType = OracleDbType.Long; pLong2.Value = Long2; OracleParameter pEnum2 = new OracleParameter(); pEnum2.OracleDbType = OracleDbType.Byte; pEnum2.Value = Enum2; // create command and set properties OracleCommand cmd = connection.CreateCommand(); cmd.CommandText = "INSERT INTO NULLABLETABLE (INT2, LONG2, FLOAT2, DOUBLE2, DECIMAL2, DECIMAL3, GUID2, SHORT2, DATETIME2, BOOL2, TIMESPAN2, BYTE2, STRING2, STRING3,ENUM2) VALUES(:1,:2,:3,:4,:5,:6,:7,:8,:9,:10,:11,:12,:13,:14,:15)"; cmd.ArrayBindCount = total; cmd.Parameters.Add(pInt2); cmd.Parameters.Add(pLong2); cmd.Parameters.Add(pFloat2); cmd.Parameters.Add(pDouble2); cmd.Parameters.Add(pDecimal2); cmd.Parameters.Add(pDecimal3); cmd.Parameters.Add(pGuid2); cmd.Parameters.Add(pShort2); cmd.Parameters.Add(pDateTime2); cmd.Parameters.Add(pBool2); cmd.Parameters.Add(pTimeSpan2); cmd.Parameters.Add(pByte2); cmd.Parameters.Add(pString2); cmd.Parameters.Add(pString3); cmd.Parameters.Add(pEnum2); cmd.ExecuteNonQuery(); } sw.Stop(); var totalTime = sw.ElapsedMilliseconds; var avgValue = totalTime / 5;
实验结果如下,oracle中:
采用快速批量插入10w条数据,时间合计2323毫秒,平均插入2w条数据仅需464毫秒。
采用insert into语句,循环插入10w条数据,时间合计462837毫秒,平均插入2w条数据仅需92567毫秒。
6.mysql快速批量插入
mysql社区驱动MySqlConnector提供的批量插入方式是SqlBulkCopy,基于mysql自身的文件上传机制进行批量插入,参数为一个dataTable对象,原生的批量插入代码如下,计时方式与sqlserver相同,同时,mysql的连接字符串里要添加";AllowLoadLocalInfile=true",即连接字符串的形式应该是"Server=
var sw = new Stopwatch(); sw.Start(); for (int j = 0; j < 5; j++) { using (var dbConnection = new MySqlConnection(connectionString)) { dbConnection.Open(); MySqlBulkCopy sqlBulkCopy = new MySqlBulkCopy(dbConnection, null); sqlBulkCopy.DestinationTableName = "NullableTable"; var propertys = typeof(NullableTable).GetProperties() .Where(it => it.CanRead && it.GetCustomAttribute<NotMappedAttribute>() == null).ToList(); for (int i = 0; i < propertys.Count; i++) { var property = propertys[i]; var columnName = property.GetCustomAttribute<ColumnAttribute>()?.Name ?? property.Name; if (property.PropertyType.GetUnderlyingType() == typeof(Guid)) { sqlBulkCopy.ColumnMappings.Add(new MySqlBulkCopyColumnMapping(i, "@tmp", $"{columnName} =unhex(@tmp)")); } else { sqlBulkCopy.ColumnMappings.Add(new MySqlBulkCopyColumnMapping(i, columnName)); } } var table = nullableTableList3.ToDataTable(); SbUtil.ReplaceDataTableColumnType<Guid, byte[]>(table, guid1 => guid1.ToByteArray()); var c = sqlBulkCopy.WriteToServer(table); } } sw.Stop(); var totalTime = sw.ElapsedMilliseconds; var avgValue = totalTime / 5;
实验结果如下,mysql中:
采用快速批量插入10w条数据,时间合计2350毫秒,平均插入2w条数据仅需470毫秒。
采用insert into语句,循环插入10w条数据,时间合计414700毫秒,平均插入2w条数据需82940毫秒。
在mysql中c#的guid对应的mysql字段类型为varbinary(16),所以table里的guid要转换为字节数组,否则插入数据库后,guid的值就会变成乱码,字节数组传递到mysql服务端后利用unhex函数进行解析,即可正常保存guid类型。 将table里guid的值转为字节数组的方法-SbUtil.ReplaceDataTableColumnType的代码实现如下:
/// <summary> /// 替换dataTable里的列类型 /// </summary> /// <param name="dt"></param> public static void ReplaceDataTableColumnType<OldType,NewType>(DataTable dt,Func<OldType, NewType> replaceFunc) { var needUpdateColumnIndexList = new List<int>(); var needUpdateColumnNameList = new List<string>(); for (int i = 0; i < dt.Columns.Count; i++) { var column = dt.Columns[i]; if (column.DataType.GetUnderlyingType() == typeof(OldType)) { needUpdateColumnIndexList.Add(i); needUpdateColumnNameList.Add(column.ColumnName); } } if (needUpdateColumnIndexList.Count == 0) { return; } var nameMapping = new Dictionary<string, string>(); for (int i = 0; i < needUpdateColumnIndexList.Count; i++) { var oldColumnName = needUpdateColumnNameList[i]; var newColumnName = Guid.NewGuid().ToString("N"); nameMapping.Add(newColumnName, oldColumnName); dt.Columns.Add(newColumnName, typeof(byte[])).SetOrdinal(needUpdateColumnIndexList[i]); for (int j = 0; j < dt.Rows.Count; j++) { var c = (dt.Rows[j][oldColumnName]); dt.Rows[j][newColumnName] = replaceFunc((OldType)(dt.Rows[j][oldColumnName])); } dt.Columns.Remove(oldColumnName); } for (int i = 0; i < dt.Columns.Count; i++) { var columnName = dt.Columns[i].ColumnName; if (nameMapping.ContainsKey(columnName)) { dt.Columns[i].ColumnName = nameMapping[columnName]; } } }
7.SummerBoot对各数据库快速批量插入的封装
基于以上各种数据库对于快速批量插入的原生写法过于复杂难记,SummerBoot对其进行了封装,在声明式编程的理念下,封装后仅需3步即可快速批量插入,这里以sqlserver举例。
7.1在StartUp.cs中添加summerBoot的服务支持
services.AddSummerBoot(); services.AddSummerBootRepository(it => { it.DbConnectionType = typeof(SqlConnection); it.ConnectionString = connectionString; });
7.2添加仓储接口
[AutoRepository] public interface INullableTableRepository : IBaseRepository<NullableTable> { }
7.3注入仓储接口后直接调用FastBatchInsert方法
var sw = new Stopwatch(); sw.Start(); for (int i = 0; i < 5; i++) { nullableTableRepository.FastBatchInsert(nullableTableList3); } sw.Stop(); var totalTime= sw.ElapsedMilliseconds; var avgValue = totalTime / 5;
实验结果如下,sql server中:
采用SummerBoot统一封装后快速批量插入10w条数据,时间合计3926(原生快速批量写法1858)毫秒,平均插入2w条数据仅需785(原生快速批量写法371)毫秒。从对比可以看出,经过SummerBoot封装后,快速批量插入所花费的时间有所增加,但是对于这么大数据量而言,这点多消耗的时间和节省的开发量对比,不值一提。
写在最后
SummerBoot是一款声明式编程框架,专注于”做什么”而不是”如何去做”,更多用法,可参考SummerBoot文档,也可以加入QQ群:799648362反馈建议。同时各位看官,如果你觉得这篇文章还不错的话,请帮忙一键三连哦(推荐+关注+github star)