2023年6月21日发(作者:)

SQL⼤批量插⼊数据的⽅式(多表关联).前段时间,在⼯作中遇到这个需求,需要⼤批量插⼊⼏万条甚⾄⼏⼗万的数据。因为业务特殊,多张表的相互关联,通常做法是先往主表⾥⾯插⼊⼀条数据,然后获取主表的主键ID,再往其他关联的表⾥⾯插⼊ID的关联数据。刚开始做的时候,想到⽤事务,把⼏万条SQL拼装起来,在⼀个事务⾥⾯去执⾏,结果很壮烈,执⾏性能⾮常糟糕。⼏千条业务数据执⾏了⼏分钟。⽤代码分析⼯具Dottrace⼀查,发现单单操作数据库的时间占了99.9%。(Dottrace,代码性能分析⼯具,它分dottrace Performance和dottrace Memory两个⼯具,dottrace Performance⽤来分析代码性能,⽐如函数执⾏时间,调⽤次数,消耗时间⽐率等,dottrace Memory⼀般⽤来分析内存占⽤情况。⼤家如果有兴趣的同学可以去下载玩下,对代码优化⼯作很有帮助的。)⾔归正传,去⽹上搜了很多资料,原来2.0有⼀个新的特性:SqlBulkCopy,效率还是很⾼的。然后结合⾃⼰的业务需求,修改了下代码。现在跟⼤家⼀起来学习下。(因为也是参考前辈写的资料,所以下⾯的⽰例都⼤同⼩异)建⽴测试数据库(BulkTestDB)、主表(BulkTestMain)、从表(BulkTestDetail)--Create DataBase

create database BulkTestDB;

go

use BulkTestDB;

go

--Create Table

Create table BulkTestMain(

Id int primary key,

GuidId long,--辅助的唯⼀标识

Batch long,--导⼊的批次标识

Name nvarchar(32)

go

Create table BulkTestDetail(

Id int primary key,

PId int,

Lesson nvarchar(32)

go

数据库建⽴完毕,开始编写后台代码View Codepublic void TestMain() { using (SqlConnection connection = new SqlConnection("你的链接字符串")) { (); SqlTransaction transaction = ransaction("Transaction1"); DataTable dtTestMain= GetTableSchema("BulkTestMain");//构建BulkTestMain表结构 DataTable dtTestDetail = GetTableSchema("BulkTestDetail");//构建BulkTestDetail表结构 Guid Batch = d();//插⼊的批次,为后⾯查询dtTestMainTmp 做条件 for (int i = 0; i < 1000000; i++)//测试100w条数据 { DataRow dr= (); Guid newGuid = d(); dr["_GuidId"] = newGuid; dr["_Batch"] = Batch; dr["_UserName"] = "测试" + ng(); (dr); for(int j = 0;j<10;j++)//给从表每次插⼊10条数据 { DataRow dr1 = (); dr1["_GuidId"]= newGuid; dr1["_Lesson"]="课程"+ng(); (dr1); } //这样做的⽬的,让主表与从表可以临时通过GuidId关联起来 } BulkToDB(dtTestMain, "BulkTestMain", connection, transaction);//先让BulkTestMain插⼊了⼤量的数据,注意这些数据是临时的,在SqlTransaction提交之前,查询时要⽤with(nolock)

DataSet dtTestMainTmp = GetNewImportData(ng());//好吧,我们来查询下,刚才⼤量插⼊的10w条数据,这⾥只需要查询标识的2列字段 Dictionary dicGuidToID = new Dictionary(); foreach (DataRow dr in [0].Rows) { (dr[1].ToString(), 64(dr[0])); }//dicGuidToID:guid字段与插⼊的主表ID字段关联起来成字典,⽤字典是为了访问起来效率(为什么获取字典key的值效率很⾼,有兴趣的可以去研究“散列表”的概念) foreach (DataRow dr in )//现在给dtTestDetail的PId字段赋值(PId字段与主表Id外键关联) { dr["_PId"] = dicGuidToID[dr["_GuidId"].ToString()].ToString(); } ("_GuidId");//移除dtTestDetail的GuidId字段,使它与数据库列匹配 BulkToDB(dtTestDetail,"BulkTestDetail",connection, transaction);//给从表插⼊数据 ();

(); } } ///

/// 根据批次Batch获取导进来的临时数据 /// /// public static DataSet GetNewImportData(string batch) { StringBuilder strSql = new StringBuilder(); ("SELECT [Id],[GuidId]") .Append(" FROM ContactInfo WITH (NOLOCK) WHERE Batch=@batch"); SqlParameter[] parameters = { new SqlParameter("@Batch", ){Value = batch} }; DataSet ds = eDataset(ng(), parameters); return ds; } public static void BulkToDB(DataTable dtSource, string TableName,SqlConnection connection, SqlTransaction transaction) { using (SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(connection, entity, transaction)) { ationTableName = TableName;//要插⼊数据的表的名称 ize = ;//数据的⾏数 List mpList = getMapping(TableName);//获取表映射关系 foreach (SqlBulkCopyColumnMapping mp in mpList) { (mp); } if (dtSource != null && != 0) { oServer(dtSource);//插⼊数据 } } } public static List getMapping(string TableName) { List mpList = new List(); switch(TableName) { case "BulkTestMain":{ (new SqlBulkCopyColumnMapping("_Id", "Id")); (new SqlBulkCopyColumnMapping("_GuidId", "GuidId")); (new SqlBulkCopyColumnMapping("_Batch","Batch")); (new SqlBulkCopyColumnMapping("_UserName", "UserName")); }break; case "BulkTestDetail":{ (new SqlBulkCopyColumnMapping("_Id", "Id")); (new SqlBulkCopyColumnMapping("_PId", "PId")); (new SqlBulkCopyColumnMapping("_Lesson", "Lesson")); }break; } return mpList; } private static DataTable GetTableSchema(string TableName) { DataTable dataTable = new DataTable(); switch(TableName) { case "BulkTestMain" :{

ge(new DataColumn[] {

new DataColumn("_Id",typeof(Int32)), new DataColumn("_GuidId",typeof(Int64)), new DataColumn("_Batch",typeof(Int64)), new DataColumn("_UserName",typeof(String)) });}break; case "BulkTestDetail":{ ge(new DataColumn[] {

new DataColumn("_Id",typeof(Int32)), new DataColumn("_PId",typeof(Int32)), new DataColumn("_GuidId",typeof(Int64)), new DataColumn("_Lesson",typeof(String))}); }break; } return dataTable; }

总算把代码copy完了,任务完成。多表关联批量插⼊数据库的⽅法,欢迎⼤家批评指正。

2023年6月21日发(作者:)

SQL⼤批量插⼊数据的⽅式(多表关联).前段时间,在⼯作中遇到这个需求,需要⼤批量插⼊⼏万条甚⾄⼏⼗万的数据。因为业务特殊,多张表的相互关联,通常做法是先往主表⾥⾯插⼊⼀条数据,然后获取主表的主键ID,再往其他关联的表⾥⾯插⼊ID的关联数据。刚开始做的时候,想到⽤事务,把⼏万条SQL拼装起来,在⼀个事务⾥⾯去执⾏,结果很壮烈,执⾏性能⾮常糟糕。⼏千条业务数据执⾏了⼏分钟。⽤代码分析⼯具Dottrace⼀查,发现单单操作数据库的时间占了99.9%。(Dottrace,代码性能分析⼯具,它分dottrace Performance和dottrace Memory两个⼯具,dottrace Performance⽤来分析代码性能,⽐如函数执⾏时间,调⽤次数,消耗时间⽐率等,dottrace Memory⼀般⽤来分析内存占⽤情况。⼤家如果有兴趣的同学可以去下载玩下,对代码优化⼯作很有帮助的。)⾔归正传,去⽹上搜了很多资料,原来2.0有⼀个新的特性:SqlBulkCopy,效率还是很⾼的。然后结合⾃⼰的业务需求,修改了下代码。现在跟⼤家⼀起来学习下。(因为也是参考前辈写的资料,所以下⾯的⽰例都⼤同⼩异)建⽴测试数据库(BulkTestDB)、主表(BulkTestMain)、从表(BulkTestDetail)--Create DataBase

create database BulkTestDB;

go

use BulkTestDB;

go

--Create Table

Create table BulkTestMain(

Id int primary key,

GuidId long,--辅助的唯⼀标识

Batch long,--导⼊的批次标识

Name nvarchar(32)

go

Create table BulkTestDetail(

Id int primary key,

PId int,

Lesson nvarchar(32)

go

数据库建⽴完毕,开始编写后台代码View Codepublic void TestMain() { using (SqlConnection connection = new SqlConnection("你的链接字符串")) { (); SqlTransaction transaction = ransaction("Transaction1"); DataTable dtTestMain= GetTableSchema("BulkTestMain");//构建BulkTestMain表结构 DataTable dtTestDetail = GetTableSchema("BulkTestDetail");//构建BulkTestDetail表结构 Guid Batch = d();//插⼊的批次,为后⾯查询dtTestMainTmp 做条件 for (int i = 0; i < 1000000; i++)//测试100w条数据 { DataRow dr= (); Guid newGuid = d(); dr["_GuidId"] = newGuid; dr["_Batch"] = Batch; dr["_UserName"] = "测试" + ng(); (dr); for(int j = 0;j<10;j++)//给从表每次插⼊10条数据 { DataRow dr1 = (); dr1["_GuidId"]= newGuid; dr1["_Lesson"]="课程"+ng(); (dr1); } //这样做的⽬的,让主表与从表可以临时通过GuidId关联起来 } BulkToDB(dtTestMain, "BulkTestMain", connection, transaction);//先让BulkTestMain插⼊了⼤量的数据,注意这些数据是临时的,在SqlTransaction提交之前,查询时要⽤with(nolock)

DataSet dtTestMainTmp = GetNewImportData(ng());//好吧,我们来查询下,刚才⼤量插⼊的10w条数据,这⾥只需要查询标识的2列字段 Dictionary dicGuidToID = new Dictionary(); foreach (DataRow dr in [0].Rows) { (dr[1].ToString(), 64(dr[0])); }//dicGuidToID:guid字段与插⼊的主表ID字段关联起来成字典,⽤字典是为了访问起来效率(为什么获取字典key的值效率很⾼,有兴趣的可以去研究“散列表”的概念) foreach (DataRow dr in )//现在给dtTestDetail的PId字段赋值(PId字段与主表Id外键关联) { dr["_PId"] = dicGuidToID[dr["_GuidId"].ToString()].ToString(); } ("_GuidId");//移除dtTestDetail的GuidId字段,使它与数据库列匹配 BulkToDB(dtTestDetail,"BulkTestDetail",connection, transaction);//给从表插⼊数据 ();

(); } } ///

/// 根据批次Batch获取导进来的临时数据 /// /// public static DataSet GetNewImportData(string batch) { StringBuilder strSql = new StringBuilder(); ("SELECT [Id],[GuidId]") .Append(" FROM ContactInfo WITH (NOLOCK) WHERE Batch=@batch"); SqlParameter[] parameters = { new SqlParameter("@Batch", ){Value = batch} }; DataSet ds = eDataset(ng(), parameters); return ds; } public static void BulkToDB(DataTable dtSource, string TableName,SqlConnection connection, SqlTransaction transaction) { using (SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(connection, entity, transaction)) { ationTableName = TableName;//要插⼊数据的表的名称 ize = ;//数据的⾏数 List mpList = getMapping(TableName);//获取表映射关系 foreach (SqlBulkCopyColumnMapping mp in mpList) { (mp); } if (dtSource != null && != 0) { oServer(dtSource);//插⼊数据 } } } public static List getMapping(string TableName) { List mpList = new List(); switch(TableName) { case "BulkTestMain":{ (new SqlBulkCopyColumnMapping("_Id", "Id")); (new SqlBulkCopyColumnMapping("_GuidId", "GuidId")); (new SqlBulkCopyColumnMapping("_Batch","Batch")); (new SqlBulkCopyColumnMapping("_UserName", "UserName")); }break; case "BulkTestDetail":{ (new SqlBulkCopyColumnMapping("_Id", "Id")); (new SqlBulkCopyColumnMapping("_PId", "PId")); (new SqlBulkCopyColumnMapping("_Lesson", "Lesson")); }break; } return mpList; } private static DataTable GetTableSchema(string TableName) { DataTable dataTable = new DataTable(); switch(TableName) { case "BulkTestMain" :{

ge(new DataColumn[] {

new DataColumn("_Id",typeof(Int32)), new DataColumn("_GuidId",typeof(Int64)), new DataColumn("_Batch",typeof(Int64)), new DataColumn("_UserName",typeof(String)) });}break; case "BulkTestDetail":{ ge(new DataColumn[] {

new DataColumn("_Id",typeof(Int32)), new DataColumn("_PId",typeof(Int32)), new DataColumn("_GuidId",typeof(Int64)), new DataColumn("_Lesson",typeof(String))}); }break; } return dataTable; }

总算把代码copy完了,任务完成。多表关联批量插⼊数据库的⽅法,欢迎⼤家批评指正。