- A+
所属分类:.NET技术
dapper是C#程序员比较喜欢用的轻量级ORM,简单易学,只是没有批量新增以及修改(收费版有),写了如下扩展
/// <summary> /// dapper MySQL批量新增修改扩展 /// </summary> public static class DapperExtensions { /// <summary> /// 批量插入 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="connection"></param> /// <param name="tableName">表名</param> /// <param name="items">数据列表</param> /// <param name="dataFunc"></param> /// <param name="duplicateData">主键相同修改字段</param> /// <param name="dbTransaction">事务</param> /// <param name="insert">insert,replace,insert ignore</param> /// <returns></returns> public static async Task BulkInsert<T>( this IDbConnection connection, string tableName, IReadOnlyCollection<T> items, Dictionary<string, Func<T, object>> dataFunc, IEnumerable<string>? duplicateData = null, IDbTransaction? dbTransaction = null, string insert = "INSERT") { const int MaxBatchSize = 5000; const int MaxParameterSize = 10000; var batchSize = Math.Min((int)Math.Ceiling((double)MaxParameterSize / dataFunc.Keys.Count), MaxBatchSize); var numberOfBatches = (int)Math.Ceiling((double)items.Count / batchSize); var columnNames = dataFunc.Keys; var insertSql = $"{insert} INTO {tableName} ({string.Join(",", columnNames.Select(e => $"`{e}`"))}) VALUES"; var sqlToExecute = new List<Tuple<string, DynamicParameters>>(); for (var i = 0; i < numberOfBatches; i++) { var dataToInsert = items.Skip(i * batchSize) .Take(batchSize); var valueSql = GetQueries(dataToInsert, dataFunc); sqlToExecute.Add(Tuple.Create($"{insertSql}{string.Join(",", valueSql.Item1)}", valueSql.Item2)); } var duplicate = string.Empty; if (duplicateData != null) { duplicate = $" ON DUPLICATE KEY UPDATE {string.Join(',', duplicateData.Select(d => $"`{d}`=VALUES(`{d}`)"))}"; } foreach (var sql in sqlToExecute) { await connection.ExecuteAsync(sql.Item1 + duplicate, sql.Item2, commandTimeout: int.MaxValue, transaction: dbTransaction); } } private static Tuple<IEnumerable<string>, DynamicParameters> GetQueries<T>( IEnumerable<T> dataToInsert, Dictionary<string, Func<T, object>> dataFunc) { var parameters = new DynamicParameters(); return Tuple.Create( dataToInsert.Select(e => $"({string.Join(",", GenerateQueryAndParameters(e, parameters, dataFunc))})"), parameters); } private static IEnumerable<string> GenerateQueryAndParameters<T>( T entity, DynamicParameters parameters, Dictionary<string, Func<T, object>> dataFunc) { var paramTemplateFunc = new Func<Guid, string>(guid => $"@p{guid:N}"); var paramList = new List<string>(); foreach (var key in dataFunc) { var paramName = paramTemplateFunc(Guid.NewGuid()); parameters.Add(paramName, key.Value(entity)); paramList.Add(paramName); } return paramList; } /// <summary> /// 批量更新 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="connection"></param> /// <param name="tableName">表名</param> /// <param name="items">数据列表</param> /// <param name="dataFunc"></param> /// <param name="primaryFunc"></param> /// <param name="primaryKey">主键字段</param> /// <param name="isIntKey">主键是否是数字类型</param> /// <param name="dbTransaction">事务</param> /// <returns></returns> public static async Task BulkUpdate<T>( this IDbConnection connection, string tableName, IReadOnlyCollection<T> items, Dictionary<string, Func<T, object>> dataFunc, Func<T, object> primaryFunc, string primaryKey, bool isIntKey = true, IDbTransaction? dbTransaction = null) { const int MaxBatchSize = 5000; const int MaxParameterSize = 10000; var batchSize = Math.Min((int)Math.Ceiling((double)MaxParameterSize / dataFunc.Keys.Count), MaxBatchSize); var numberOfBatches = (int)Math.Ceiling((double)items.Count / batchSize); var columnNames = dataFunc.Keys; var updateSql = $"UPDATE {tableName} SET"; var sqlToExecute = new List<Tuple<string, DynamicParameters>>(); for (var i = 0; i < numberOfBatches; i++) { var dataToUpdate = items.Skip(i * batchSize) .Take(batchSize); var valueSql = GetUpdateQueries(dataToUpdate, dataFunc, primaryFunc, primaryKey, isIntKey); sqlToExecute.Add(Tuple.Create($"{updateSql}{valueSql.Item1}", valueSql.Item2)); } foreach (var sql in sqlToExecute) { await connection.ExecuteAsync(sql.Item1, sql.Item2, commandTimeout: int.MaxValue, transaction: dbTransaction); } } private static Tuple<string, DynamicParameters> GetUpdateQueries<T>( IEnumerable<T> dataToUpdate, Dictionary<string, Func<T, object>> dataFunc, Func<T, object> primaryFunc, string primaryKey, bool isIntKey) { var paramTemplateFunc = new Func<Guid, T, (string param, string sql)>((guid, entity) => { var keyValue = primaryFunc(entity); if (!isIntKey) { keyValue = $"'{keyValue}'"; } var param = $"@p{guid:N}"; var sql = $"WHEN {keyValue} THEN {param}"; return (param, sql); } ); var parameters = new DynamicParameters(); List<string> sqlList = new(); foreach (var key in dataFunc) { var paramList = new List<string>(); foreach (var e in dataToUpdate) { var (param, sql) = paramTemplateFunc(Guid.NewGuid(), e); parameters.Add(param, key.Value(e)); paramList.Add(sql); } sqlList.Add($"`{key.Key}`=CASE `{primaryKey}` {string.Join(" ", paramList)} END"); } object idFunc(T p) { return primaryFunc(p); } parameters.Add("@ids", dataToUpdate.Select(idFunc)); return Tuple.Create( $"{string.Join(",", sqlList)} WHERE `{primaryKey}` IN @ids", parameters); } }
使用方法:
新增:
await conn.BulkInsert( "userInfo", //表名 userinfoList, //列表 new Dictionary<string, Func<UserInfo, object>> { {"Name", u => u.Name }, {"Age", u => u.Age }, {"Sex", u => u.Sex }, });
修改:
await conn.BulkUpdate("userInfo", userInfoList, new Dictionary<string, Func<UserInfo, object>> { {"Name", u => u.Name } {"Age", u => u.Age }, {"Sex", u => u.Sex }, }, new Func<UserInfo, object>(u => u.ID), "ID");