环境:
参考: 《博文:SqlBulkCopy使用注意事项》
1. 问题场景
在批量迁移或导入数据时,我们可能会遇到插入大量数据的问题,比如:100万、500万、甚至几千万。 这个时候,如果我们再使用普通的insert插入的话就太慢了。
在sqlserver中,我们可以使用 SqlBulkCopy 进行大数据量的插入。
2. 简单示例
先看个示例:
var connString = "Data Source=192.168.252.129;Initial Catalog=test;User ID=sa;Password=123456;Encrypt=True; TrustServerCertificate=True;";
var conn = new SqlConnection(connString);
conn.Open();
var cmd = conn.CreateCommand();
cmd.CommandText = @"create table test(
id int primary key,
name varchar(50)
)";
cmd.ExecuteNonQuery();
var dt = new DataTable("test");
dt.Columns.Add("id");
dt.Columns.Add("name");
for (var i = 0; i < 5; i++)
{
var row = dt.NewRow();
row["id"] = i;
row["name"] = "name" + i;
dt.Rows.Add(row);
}
var sbc = new SqlBulkCopy(conn, SqlBulkCopyOptions.Default, null);
sbc.DestinationTableName = dt.TableName;
for (int i = 0; i < dt.Columns.Count; i++)
{
sbc.ColumnMappings.Add(dt.Columns[i].ColumnName, dt.Columns[i].ColumnName);
}
sbc.WriteToServer(dt);
conn.Close();
插入的数据如下:
3. 注意点
上面是一个简单的示例,我们在使用的时候需要关注以下几种情况:
- 如果上面表的id是自增的话,那么批量插入时是数据库自增还是使用自己指定的?(
SqlBulkCopyOptions.KeepIdentity ); - 如果上面表的某列有定义默认值,那么批量插入数据时是使用定义的默认值还是自己指定数据的null值?(
SqlBulkCopyOptions.KeepNulls ); - 如果上面表的某列有检查约束,那么批量插入时,是否进行检查以保证数据正确?(
SqlBulkCopyOptions.CheckConstraints ); - 如果上面表定义了insert触发器,那么批量插入数据时,是否触发?(
SqlBulkCopyOptions.FireTrigger ); - 插入时是否内部开启一个事务,以保证数据完整性?(
SqlBulkCopyOptions.UseInternalTransaction ); - 插入时是否获取这个表的表锁(默认是行锁)?(
SqlBulkCopyOptions.TableLock );
上面的这些设置使用如下:
var sbc = new SqlBulkCopy(conn, SqlBulkCopyOptions.KeepIdentity | SqlBulkCopyOptions.KeepNulls null);
上面的配置项,建议取SqlBulkCopyOptions.KeepIdentity | SqlBulkCopyOptions.KeepNulls , 因为在批量插入数据时我们是希望将自己准备的树原封不动的插入进去!!!
另外,我们需要注意,当我们使用SqlBulkCopyOptions.UseInternalTransaction 选项的时候,我们不能再手动指定事务,否则会报错:
除了上面的参数,还有一个需要关注的,就是执行超时时间,默认是30秒,如果数据量太大,可以设置的长一点,如下:
var sbc = new SqlBulkCopy(conn, SqlBulkCopyOptions.Default, null);
sbc.DestinationTableName = dt.TableName;
sbc.BulkCopyTimeout = 60 * 30;
4. 性能测试
下面以测试插入500万条数据耗时结束本次实验:
public static void Main(string[] args)
{
var connString = "Data Source=192.168.252.129;Initial Catalog=test;User ID=sa;Password=123456;Encrypt=True; TrustServerCertificate=True;";
var conn = new SqlConnection(connString);
conn.Open();
var cmd = conn.CreateCommand();
cmd.CommandText = @"create table test(
id int primary key,
name varchar(50),
age int,
addr varchar(50),
birth datetime,
avatar varchar(500),
sex int,
uno varchar(50),
remark varchar(50)
)";
cmd.ExecuteNonQuery();
var st = new Stopwatch();
st.Start();
var dt = new DataTable("test");
dt.Columns.Add("id");
dt.Columns.Add("name");
dt.Columns.Add("addr");
dt.Columns.Add("birth");
dt.Columns.Add("avatar");
dt.Columns.Add("sex");
dt.Columns.Add("uno");
dt.Columns.Add("remark");
var count = 500 * 10000;
for (var i = 0; i < count; i++)
{
var row = dt.NewRow();
row["id"] = i;
row["name"] = "name" + i;
row["addr"] = "天明路" + i;
row["birth"] = DateTime.Now.AddMilliseconds(i);
row["avatar"] = $"http://www.qq.com/1234567878/{i + 1}.png";
row["sex"] = i % 2;
row["uno"] = Guid.NewGuid().ToString();
row["remark"] = $"这是一个关于{i} 的描述!";
dt.Rows.Add(row);
}
Console.WriteLine($"准备数据: 500万,花费: {st.ElapsedMilliseconds} 毫秒!");
st.Restart();
var sbc = new SqlBulkCopy(conn, SqlBulkCopyOptions.KeepIdentity | SqlBulkCopyOptions.KeepNulls, null);
sbc.DestinationTableName = dt.TableName;
sbc.BatchSize = 10000;
sbc.SqlRowsCopied += (object sender, SqlRowsCopiedEventArgs e) =>
{
Console.WriteLine($"已拷贝: {e.RowsCopied} 行,进度: {(e.RowsCopied / (count + 0.0) * 100).ToString("0.00")}%");
};
sbc.NotifyAfter = 10000;
sbc.BulkCopyTimeout = 5 * 60;
for (int i = 0; i < dt.Columns.Count; i++)
{
sbc.ColumnMappings.Add(dt.Columns[i].ColumnName, dt.Columns[i].ColumnName);
}
sbc.WriteToServer(dt);
conn.Close();
st.Stop();
Console.WriteLine($"插入数据: 500万,花费: {st.ElapsedMilliseconds} 毫秒!");
Console.ReadLine();
}
输出如下:
|