IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 九、Abp 之Mysql数据库批量操作 -> 正文阅读

[大数据]九、Abp 之Mysql数据库批量操作

????????在实际项目中,不可避免会遇到数据导入的需求,如果是大量数据导入,就必须引用批量处理的功能;efcore本身不提供批量处理功能,而abp框架虽然提供InsertMany和UpdateMany方法,但本质上仍然是分解成单条insert和update操作,在处理稍大量的数据时,耗时就会明显增加;

????????如果你使用的是SqlServer、PostgreSQL或SQLite,可以使用官方推荐的方法实现批处理,你只需要引用borisdj / EFCore.BulkExtensions并编写IEfCoreBulkOperationProvider的提供程序即可

public class BulkExtensionsEfCoreBulkOperationsProvider : IEfCoreBulkOperationProvider, ITransientDependency
    {
        public Task DeleteManyAsync<TDbContext, TEntity>(IEfCoreRepository<TEntity> repository, IEnumerable<TEntity> entities, bool autoSave, CancellationToken cancellationToken)
            where TDbContext : IEfCoreDbContext
            where TEntity : class, IEntity
        {
            return repository.DbContext.BulkDeleteAsync(entities.ToList());
        }

        public Task InsertManyAsync<TDbContext, TEntity>(IEfCoreRepository<TEntity> repository, IEnumerable<TEntity> entities, bool autoSave, CancellationToken cancellationToken)
            where TDbContext : IEfCoreDbContext
            where TEntity : class, IEntity
        {
            return repository.DbContext.BulkInsertAsync(entities.ToList());
        }

        public Task UpdateManyAsync<TDbContext, TEntity>(IEfCoreRepository<TEntity> repository, IEnumerable<TEntity> entities, bool autoSave, CancellationToken cancellationToken)
            where TDbContext : IEfCoreDbContext
            where TEntity : class, IEntity
        {
            return repository.DbContext.BulkUpdateAsync(entities.ToList());
        }
    }

这样常规调用InsertManyAsync或UpdateManyAsync时就可以实现批量操作。但是borisdj / EFCore.BulkExtensions不支持Mysql数据,因此不能使用这种方法,这里我使用的是另一个扩展包yang-er/efcore-ext,跟EFCore.BulkExtensions算是一脉相承,并且添加了对mysql的支持,理论上也可以跟上面一样编写提供程序来实现,但是由于实现原理有点不一样,这样写生成不了Sql语句,还是只能换一个思路来实现了

下面开始实例演示如何在abp上使用efcore-ext扩展包:

一、搭建测试环境

????????1、如果未安装abp cli工具,执行以下命令安装全局abp cli工具

dotnet tool install -g Volo.Abp.Cli

? ? ? ? ?2、如果已安装旧版本,执行以下命令更新到最新版

dotnet tool update -g Volo.Abp.Cli

? ? ? ? 3、在任意目录执行以下命令生成测试项目

abp new MyComp.BulkTest -t app -u none -d ef -dbms MySQL -csf

? ? ? ? 注意这里我们仅生成后端api的代码,不包含前端界面?

? ? ? ? 4、生成测试模块

? ??????进入解决方案目录:

 cd .\MyComp.BulkTest\aspnet-core\

? ??????在解决方案中添加测试模块

abp add-module MyComp.BulkModule --new --add-to-solution-file

二、添加第三方包引用
? ? ? ?使用vs2022打开解决方案,在解决方案找到模块的ef层项目MyComp.BulkModule.EntityFrameworkCore,在包管理器中添加XiaoYang.EntityFrameworkCore.Bulk引用,或者使用命令行添加引用:

cd .\modules\MyComp.BulkModule\src\MyComp.BulkModule.EntityFrameworkCore\
 dotnet add package XiaoYang.EntityFrameworkCore.Bulk

? ? ? ? ?接着找到主模块的ef项目MyComp.BulkTest.EntityFrameworkCore项目,在包管理器中添加XiaoYang.EntityFrameworkCore.Bulk.MySql引用,或者使用命令行添加引用:

 cd ..\..\..\..\src\MyComp.BulkTest.EntityFrameworkCore\
dotnet add package XiaoYang.EntityFrameworkCore.Bulk.MySql

? ? ? ? ?在主模块引用Mysql包是为了不更改模块代码的情况下可以方便切换数据库实现,如果没有分模块则两个引用都在主模块ef层添加即可

三、添加基础设施实现

? ? ? ? 在测试模块的MyComp.BulkModule.Domain项目中添加实体类TableA

using System;
using System.Collections.Generic;
using System.Text;
using Volo.Abp.Domain.Entities;

namespace MyComp.BulkModule.Entities
{
    public class TableA : Entity<Guid>
    {

        public TableA(){}
        
        public TableA(Guid id, int code, string name, string desc = null)
        {
            Id = id;
            Code = code;
            Name = name;
            Desc = desc;
        }

        /// <summary>
        /// 批处理需要Id属性赋值,不能在构造中赋值,而基类中set方法是protected的,因此此处需要new关键字覆盖默认的Id属性
        /// </summary>
        public new Guid Id { get; set; }

        public int Code { get; set; }

        public string Name { get; set; }

        public string Desc { get; set; }
    }
}

在MyComp.BulkModule.EntityFrameworkCore项目中添加以下内容:

IBulkModuleDbContext

public interface IBulkModuleDbContext : IEfCoreDbContext
{
    DbSet<TableA> TableA { get; }
}

?BulkModuleDbContext

public class BulkModuleDbContext : AbpDbContext<BulkModuleDbContext>, IBulkModuleDbContext
{
    public DbSet<TableA> TableA { get; set; }
    ...
    ...
}

?BulkModuleDbContextModelCreatingExtensions

public static class BulkModuleDbContextModelCreatingExtensions
{
    public static void ConfigureBulkModule(
        this ModelBuilder builder)
    {
        Check.NotNull(builder, nameof(builder));

        builder.Entity<TableA>(b =>
        {
            //Configure table & schema name
            b.ToTable(BulkModuleDbProperties.DbTablePrefix + "TableA", BulkModuleDbProperties.DbSchema);

            b.ConfigureByConvention();

            //Properties
            b.Property(q => q.Name).HasMaxLength(64);
            b.Property(q => q.Desc).HasMaxLength(256);

        });
    }
}

四、生成数据库迁移

?进入主模块MyComp.BulkTest.EntityFrameworkCore项目,执行如下执行生成数据库迁移

 dotnet ef migrations add  -o Migrations CreateDb

找到MyComp.BulkTest.DbMigrator项目,右键调试项目,将数据库迁移和基础数据应用到数据库

五、配置abp以支持批处理

? ? ? ? 注意abp框架中的AbpDbContext扩展了Efcore中的DbContext实例,而使用AbpDbContext是无法使用efcore-ext扩展包的,我们自己实现的DbContext继承自AbpDbContext,也是无法直接使用批处理扩展包的;具体原因这里就不描述了,调试一下扩展包源码就可以大概清楚。

? ? ? ? 首先在MyComp.BulkModule.EntityFrameworkCore项目中添加DataBulkDbContext类

using Microsoft.EntityFrameworkCore;
using MyComp.BulkModule.Entities;

namespace MyComp.BulkModule.EntityFrameworkCore
{
    public class DataBulkDbContext : DbContext
    {
        public DbSet<TableA> Members { get; set; }

        public DataBulkDbContext(DbContextOptions<DataBulkDbContext> options)
            : base(options)
        {

        }

        protected override void OnModelCreating(ModelBuilder builder)
        {
            base.OnModelCreating(builder);

            builder.Entity<TableA>(b =>
            {
                //Configure table & schema name
                b.ToTable(BulkModuleDbProperties.DbTablePrefix + "TableA", BulkModuleDbProperties.DbSchema);

                // 显示设置主键
                b.HasKey(b => b.Id);
                // 批处理扩展包中要求主键值类型为ValueGeneratedNever,默认为ValueGeneratedOnAdd,会报错
                b.Property(t => t.Id).ValueGeneratedNever();

                // 如果有扩展字段,忽略掉
                //b.Ignore(b => b.ExtraProperties);

            });
        }
    }
}

????????注意这里 基类是DbContext而不是AbpDbContext,可以跟默认的BulkModuleDbContext比较不同;DataBulkDbContext仅用作批处理操作,不影响到数据迁移,只需要设置表名和主键字段即可,有唯一索引也需要设置

? ? ? ? ?接着在主模块MyComp.BulkTest.EntityFrameworkCore项目的BulkTestEntityFrameworkCoreModule类中添加如下代码:

public class BulkTestEntityFrameworkCoreModule : AbpModule
{
    ...

    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        ...

        ...

        var configuration = context.Services.GetConfiguration();

        context.Services.AddDbContext<DataBulkDbContext>(options =>
        {
            options.UseMySql(configuration["ConnectionStrings:Default"], MySqlServerVersion.LatestSupportedServerVersion, o => o.UseBulk());
        });
    }
}

六、添加批处理测试代码

? ? ? ? 1、在测试模块MyComp.BulkModule.Domain中添加数据仓库接口

? ? ? ? Abp常规数据仓库接口ITableARepository

using System;
using Volo.Abp.Domain.Repositories;

namespace MyComp.BulkModule.Entities
{
    public interface ITableARepository : IBasicRepository<TableA, Guid>
    {
    }
}

? ? ? ? 批处理数据仓库接口ITableABulkRepository

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace MyComp.BulkModule.Entities
{
    public interface ITableABulkRepository
    {
        Task UpsertManyAsync(IEnumerable<TableA> members, CancellationToken cancellationToken = default);
    }
}

? ? ? ? 2、在MyComp.BulkModule.EntityFrameworkCore中添加数据仓库实现

????????Abp常规数据仓库实现

using MyComp.BulkModule.Entities;
using System;
using Volo.Abp.Domain.Repositories.EntityFrameworkCore;
using Volo.Abp.EntityFrameworkCore;

namespace MyComp.BulkModule.EntityFrameworkCore.Repositories
{
    public class TableARepository : EfCoreRepository<IBulkModuleDbContext, TableA, Guid>, ITableARepository
    {
        public TableARepository(IDbContextProvider<IBulkModuleDbContext> dbContextProvider) : base(dbContextProvider)
        {
        }
    }
}

????????批处理数据仓库实现

using Microsoft.EntityFrameworkCore;
using MyComp.BulkModule.Entities;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;

namespace MyComp.BulkModule.EntityFrameworkCore.Repositories
{
    public class TableABulkRepository : ITableABulkRepository, ITransientDependency
    {
        private readonly DataBulkDbContext _bulkContext;
        public TableABulkRepository(DataBulkDbContext dbContext)
        {
            _bulkContext = dbContext;
        }
        /// <summary>
        /// 批量插入或更新
        /// </summary>
        /// <param name="importData">数据源</param>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public async Task UpsertManyAsync(IEnumerable<TableA> importData, CancellationToken cancellationToken = default)
        {
            var ms = importData.Select(t => new { t.Id, t.Code, t.Name, t.Desc });

            await _bulkContext.TableA.UpsertAsync(ms,
                insertExpression:
                    s => new TableA { Id = s.Id, Code = s.Code, Name = s.Name, Desc = s.Desc },
                updateExpression: 
                    (rc1, rc2) => new TableA { Code = rc2.Code, Name = rc2.Name, Desc = rc2.Desc }
                );
        }
    }
}

? ? ? ? 3、Service服务实现

? ? ? ? 修改MyComp.BulkModule.Application.Contracts项目ISampleAppService接口如下?

public interface ISampleAppService : IApplicationService
{
    ...

    /// <summary>
    /// abp常规插入
    /// </summary>
    /// <returns></returns>
    Task InsertManyAsync();
    /// <summary>
    /// 批量插入或更新
    /// </summary>
    /// <returns></returns>
    Task BulkUpsertAsync();
}

? ? ? ? 修改MyComp.BulkModule.Application项目SampleAppService类如下:?

using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Authorization;
using MyComp.BulkModule.Entities;
using Volo.Abp.Uow;

namespace MyComp.BulkModule.Samples;

public class SampleAppService : BulkModuleAppService, ISampleAppService
{

    private ITableARepository _tableARepository;
    private ITableABulkRepository _tableABulkRepository;
    public SampleAppService(ITableARepository tableARepository, ITableABulkRepository tableABulkRepository)
    {
        _tableARepository = tableARepository;
        _tableABulkRepository = tableABulkRepository;
    }

    public async Task InsertManyAsync()
    {
        var data = new List<TableA>();

        for(int i = 0; i< 20000; i++)
        {
            data.Add(new TableA(GuidGenerator.Create(), i, $"Test{i}"));
        }

        await _tableARepository.InsertManyAsync(data);

    }

    public async Task BulkUpsertAsync()
    {
        var data = new List<TableA>();

        for (int i = 0; i < 20000; i++)
        {
            data.Add(new TableA(GuidGenerator.Create(), i, $"Test{i}"));
        }

        await _tableABulkRepository.UpsertManyAsync(data);
    }

    。。。

}

? ? ? ?这里分别使用常规方法和批量方法插入20000条记录

????????修改MyComp.BulkModule.HttpApi项目的SampleController类如下:

using System.Threading.Tasks;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Volo.Abp;

namespace MyComp.BulkModule.Samples;

[Area(BulkModuleRemoteServiceConsts.ModuleName)]
[RemoteService(Name = BulkModuleRemoteServiceConsts.RemoteServiceName)]
[Route("api/BulkModule/sample")]
public class SampleController : BulkModuleController, ISampleAppService
{
    private readonly ISampleAppService _sampleAppService;

    public SampleController(ISampleAppService sampleAppService)
    {
        _sampleAppService = sampleAppService;
    }


    [HttpPost]
    [Route("insert")]
    public Task InsertManyAsync()
    {
        return _sampleAppService.InsertManyAsync();
    }
    [HttpPost]
    [Route("bulk")]
    public Task BulkUpsertAsync()
    {
        return _sampleAppService.BulkUpsertAsync();
    }
}

七、执行批量插入

? ? ? ? 首先启动MyComp.BulkTest.HttpApi.Host项目,在浏览器打开https://localhost:44347/swagger/index.html,找到Sample控制器,执行insert和bulk方法,完成后查看审计日志

?可以看出常规方法,要100秒,而且随着记录数增加,时间增加更多

?同样插入20000条记录,很明显,批量方法速度快很多,才1秒左右

本章源码:AbpMysqlBulkSample: Abp 框架Mysql批量操作示例 (gitee.com)

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-04 15:39:43  更:2022-03-04 15:40:55 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 21:14:36-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码