????????在实际项目中,不可避免会遇到数据导入的需求,如果是大量数据导入,就必须引用批量处理的功能;efcore本身不提供批量处理功能,而abp框架虽然提供InsertMany和UpdateMany方法,但本质上仍然是分解成单条insert和update操作,在处理稍大量的数据时,耗时就会明显增加;
????????如果你使用的是SqlServer、PostgreSQL或SQLite,可以使用官方推荐的方法实现批处理,你只需要引用borisdj / EFCore.BulkExtensions并编写IEfCoreBulkOperationProvider的提供程序即可
public class BulkExtensionsEfCoreBulkOperationsProvider : IEfCoreBulkOperationProvider, ITransientDependency
{
public Task DeleteManyAsync<TDbContext, TEntity>(IEfCoreRepository<TEntity> repository, IEnumerable<TEntity> entities, bool autoSave, CancellationToken cancellationToken)
where TDbContext : IEfCoreDbContext
where TEntity : class, IEntity
{
return repository.DbContext.BulkDeleteAsync(entities.ToList());
}
public Task InsertManyAsync<TDbContext, TEntity>(IEfCoreRepository<TEntity> repository, IEnumerable<TEntity> entities, bool autoSave, CancellationToken cancellationToken)
where TDbContext : IEfCoreDbContext
where TEntity : class, IEntity
{
return repository.DbContext.BulkInsertAsync(entities.ToList());
}
public Task UpdateManyAsync<TDbContext, TEntity>(IEfCoreRepository<TEntity> repository, IEnumerable<TEntity> entities, bool autoSave, CancellationToken cancellationToken)
where TDbContext : IEfCoreDbContext
where TEntity : class, IEntity
{
return repository.DbContext.BulkUpdateAsync(entities.ToList());
}
}
这样常规调用InsertManyAsync或UpdateManyAsync时就可以实现批量操作。但是borisdj / EFCore.BulkExtensions不支持Mysql数据,因此不能使用这种方法,这里我使用的是另一个扩展包yang-er/efcore-ext,跟EFCore.BulkExtensions算是一脉相承,并且添加了对mysql的支持,理论上也可以跟上面一样编写提供程序来实现,但是由于实现原理有点不一样,这样写生成不了Sql语句,还是只能换一个思路来实现了。
下面开始实例演示如何在abp上使用efcore-ext扩展包:
一、搭建测试环境
????????1、如果未安装abp cli工具,执行以下命令安装全局abp cli工具
dotnet tool install -g Volo.Abp.Cli
? ? ? ? ?2、如果已安装旧版本,执行以下命令更新到最新版
dotnet tool update -g Volo.Abp.Cli
? ? ? ? 3、在任意目录执行以下命令生成测试项目
abp new MyComp.BulkTest -t app -u none -d ef -dbms MySQL -csf
? ? ? ? 注意这里我们仅生成后端api的代码,不包含前端界面?
? ? ? ? 4、生成测试模块
? ??????进入解决方案目录:
cd .\MyComp.BulkTest\aspnet-core\
? ??????在解决方案中添加测试模块
abp add-module MyComp.BulkModule --new --add-to-solution-file
二、添加第三方包引用 ? ? ? ?使用vs2022打开解决方案,在解决方案找到模块的ef层项目MyComp.BulkModule.EntityFrameworkCore,在包管理器中添加XiaoYang.EntityFrameworkCore.Bulk引用,或者使用命令行添加引用:
cd .\modules\MyComp.BulkModule\src\MyComp.BulkModule.EntityFrameworkCore\
dotnet add package XiaoYang.EntityFrameworkCore.Bulk
? ? ? ? ?接着找到主模块的ef项目MyComp.BulkTest.EntityFrameworkCore项目,在包管理器中添加XiaoYang.EntityFrameworkCore.Bulk.MySql引用,或者使用命令行添加引用:
cd ..\..\..\..\src\MyComp.BulkTest.EntityFrameworkCore\
dotnet add package XiaoYang.EntityFrameworkCore.Bulk.MySql
? ? ? ? ?在主模块引用Mysql包是为了不更改模块代码的情况下可以方便切换数据库实现,如果没有分模块则两个引用都在主模块ef层添加即可
三、添加基础设施实现
? ? ? ? 在测试模块的MyComp.BulkModule.Domain项目中添加实体类TableA
using System;
using System.Collections.Generic;
using System.Text;
using Volo.Abp.Domain.Entities;
namespace MyComp.BulkModule.Entities
{
public class TableA : Entity<Guid>
{
public TableA(){}
public TableA(Guid id, int code, string name, string desc = null)
{
Id = id;
Code = code;
Name = name;
Desc = desc;
}
/// <summary>
/// 批处理需要Id属性赋值,不能在构造中赋值,而基类中set方法是protected的,因此此处需要new关键字覆盖默认的Id属性
/// </summary>
public new Guid Id { get; set; }
public int Code { get; set; }
public string Name { get; set; }
public string Desc { get; set; }
}
}
在MyComp.BulkModule.EntityFrameworkCore项目中添加以下内容:
IBulkModuleDbContext
public interface IBulkModuleDbContext : IEfCoreDbContext
{
DbSet<TableA> TableA { get; }
}
?BulkModuleDbContext
public class BulkModuleDbContext : AbpDbContext<BulkModuleDbContext>, IBulkModuleDbContext
{
public DbSet<TableA> TableA { get; set; }
...
...
}
?BulkModuleDbContextModelCreatingExtensions
public static class BulkModuleDbContextModelCreatingExtensions
{
public static void ConfigureBulkModule(
this ModelBuilder builder)
{
Check.NotNull(builder, nameof(builder));
builder.Entity<TableA>(b =>
{
//Configure table & schema name
b.ToTable(BulkModuleDbProperties.DbTablePrefix + "TableA", BulkModuleDbProperties.DbSchema);
b.ConfigureByConvention();
//Properties
b.Property(q => q.Name).HasMaxLength(64);
b.Property(q => q.Desc).HasMaxLength(256);
});
}
}
四、生成数据库迁移
?进入主模块MyComp.BulkTest.EntityFrameworkCore项目,执行如下执行生成数据库迁移
dotnet ef migrations add -o Migrations CreateDb
找到MyComp.BulkTest.DbMigrator项目,右键调试项目,将数据库迁移和基础数据应用到数据库
五、配置abp以支持批处理
? ? ? ? 注意abp框架中的AbpDbContext扩展了Efcore中的DbContext实例,而使用AbpDbContext是无法使用efcore-ext扩展包的,我们自己实现的DbContext继承自AbpDbContext,也是无法直接使用批处理扩展包的;具体原因这里就不描述了,调试一下扩展包源码就可以大概清楚。
? ? ? ? 首先在MyComp.BulkModule.EntityFrameworkCore项目中添加DataBulkDbContext类
using Microsoft.EntityFrameworkCore;
using MyComp.BulkModule.Entities;
namespace MyComp.BulkModule.EntityFrameworkCore
{
public class DataBulkDbContext : DbContext
{
public DbSet<TableA> Members { get; set; }
public DataBulkDbContext(DbContextOptions<DataBulkDbContext> options)
: base(options)
{
}
protected override void OnModelCreating(ModelBuilder builder)
{
base.OnModelCreating(builder);
builder.Entity<TableA>(b =>
{
//Configure table & schema name
b.ToTable(BulkModuleDbProperties.DbTablePrefix + "TableA", BulkModuleDbProperties.DbSchema);
// 显示设置主键
b.HasKey(b => b.Id);
// 批处理扩展包中要求主键值类型为ValueGeneratedNever,默认为ValueGeneratedOnAdd,会报错
b.Property(t => t.Id).ValueGeneratedNever();
// 如果有扩展字段,忽略掉
//b.Ignore(b => b.ExtraProperties);
});
}
}
}
????????注意这里 基类是DbContext而不是AbpDbContext,可以跟默认的BulkModuleDbContext比较不同;DataBulkDbContext仅用作批处理操作,不影响到数据迁移,只需要设置表名和主键字段即可,有唯一索引也需要设置
? ? ? ? ?接着在主模块MyComp.BulkTest.EntityFrameworkCore项目的BulkTestEntityFrameworkCoreModule类中添加如下代码:
public class BulkTestEntityFrameworkCoreModule : AbpModule
{
...
public override void ConfigureServices(ServiceConfigurationContext context)
{
...
...
var configuration = context.Services.GetConfiguration();
context.Services.AddDbContext<DataBulkDbContext>(options =>
{
options.UseMySql(configuration["ConnectionStrings:Default"], MySqlServerVersion.LatestSupportedServerVersion, o => o.UseBulk());
});
}
}
六、添加批处理测试代码
? ? ? ? 1、在测试模块MyComp.BulkModule.Domain中添加数据仓库接口
? ? ? ? Abp常规数据仓库接口ITableARepository
using System;
using Volo.Abp.Domain.Repositories;
namespace MyComp.BulkModule.Entities
{
public interface ITableARepository : IBasicRepository<TableA, Guid>
{
}
}
? ? ? ? 批处理数据仓库接口ITableABulkRepository
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace MyComp.BulkModule.Entities
{
public interface ITableABulkRepository
{
Task UpsertManyAsync(IEnumerable<TableA> members, CancellationToken cancellationToken = default);
}
}
? ? ? ? 2、在MyComp.BulkModule.EntityFrameworkCore中添加数据仓库实现
????????Abp常规数据仓库实现
using MyComp.BulkModule.Entities;
using System;
using Volo.Abp.Domain.Repositories.EntityFrameworkCore;
using Volo.Abp.EntityFrameworkCore;
namespace MyComp.BulkModule.EntityFrameworkCore.Repositories
{
public class TableARepository : EfCoreRepository<IBulkModuleDbContext, TableA, Guid>, ITableARepository
{
public TableARepository(IDbContextProvider<IBulkModuleDbContext> dbContextProvider) : base(dbContextProvider)
{
}
}
}
????????批处理数据仓库实现
using Microsoft.EntityFrameworkCore;
using MyComp.BulkModule.Entities;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
namespace MyComp.BulkModule.EntityFrameworkCore.Repositories
{
public class TableABulkRepository : ITableABulkRepository, ITransientDependency
{
private readonly DataBulkDbContext _bulkContext;
public TableABulkRepository(DataBulkDbContext dbContext)
{
_bulkContext = dbContext;
}
/// <summary>
/// 批量插入或更新
/// </summary>
/// <param name="importData">数据源</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task UpsertManyAsync(IEnumerable<TableA> importData, CancellationToken cancellationToken = default)
{
var ms = importData.Select(t => new { t.Id, t.Code, t.Name, t.Desc });
await _bulkContext.TableA.UpsertAsync(ms,
insertExpression:
s => new TableA { Id = s.Id, Code = s.Code, Name = s.Name, Desc = s.Desc },
updateExpression:
(rc1, rc2) => new TableA { Code = rc2.Code, Name = rc2.Name, Desc = rc2.Desc }
);
}
}
}
? ? ? ? 3、Service服务实现
? ? ? ? 修改MyComp.BulkModule.Application.Contracts项目ISampleAppService接口如下?
public interface ISampleAppService : IApplicationService
{
...
/// <summary>
/// abp常规插入
/// </summary>
/// <returns></returns>
Task InsertManyAsync();
/// <summary>
/// 批量插入或更新
/// </summary>
/// <returns></returns>
Task BulkUpsertAsync();
}
? ? ? ? 修改MyComp.BulkModule.Application项目SampleAppService类如下:?
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Authorization;
using MyComp.BulkModule.Entities;
using Volo.Abp.Uow;
namespace MyComp.BulkModule.Samples;
public class SampleAppService : BulkModuleAppService, ISampleAppService
{
private ITableARepository _tableARepository;
private ITableABulkRepository _tableABulkRepository;
public SampleAppService(ITableARepository tableARepository, ITableABulkRepository tableABulkRepository)
{
_tableARepository = tableARepository;
_tableABulkRepository = tableABulkRepository;
}
public async Task InsertManyAsync()
{
var data = new List<TableA>();
for(int i = 0; i< 20000; i++)
{
data.Add(new TableA(GuidGenerator.Create(), i, $"Test{i}"));
}
await _tableARepository.InsertManyAsync(data);
}
public async Task BulkUpsertAsync()
{
var data = new List<TableA>();
for (int i = 0; i < 20000; i++)
{
data.Add(new TableA(GuidGenerator.Create(), i, $"Test{i}"));
}
await _tableABulkRepository.UpsertManyAsync(data);
}
。。。
}
? ? ? ?这里分别使用常规方法和批量方法插入20000条记录
????????修改MyComp.BulkModule.HttpApi项目的SampleController类如下:
using System.Threading.Tasks;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Volo.Abp;
namespace MyComp.BulkModule.Samples;
[Area(BulkModuleRemoteServiceConsts.ModuleName)]
[RemoteService(Name = BulkModuleRemoteServiceConsts.RemoteServiceName)]
[Route("api/BulkModule/sample")]
public class SampleController : BulkModuleController, ISampleAppService
{
private readonly ISampleAppService _sampleAppService;
public SampleController(ISampleAppService sampleAppService)
{
_sampleAppService = sampleAppService;
}
[HttpPost]
[Route("insert")]
public Task InsertManyAsync()
{
return _sampleAppService.InsertManyAsync();
}
[HttpPost]
[Route("bulk")]
public Task BulkUpsertAsync()
{
return _sampleAppService.BulkUpsertAsync();
}
}
七、执行批量插入
? ? ? ? 首先启动MyComp.BulkTest.HttpApi.Host项目,在浏览器打开https://localhost:44347/swagger/index.html,找到Sample控制器,执行insert和bulk方法,完成后查看审计日志
?可以看出常规方法,要100秒,而且随着记录数增加,时间增加更多
?同样插入20000条记录,很明显,批量方法速度快很多,才1秒左右
本章源码:AbpMysqlBulkSample: Abp 框架Mysql批量操作示例 (gitee.com)
|