上一篇: JVM 基本结构

应用场景

批量入库适用于实时性要求不是很高,但一批入库数据量很大的情况。

有时候批量数据类型并不都是文件类型,比如:计费流程中从Kafka获取到的list类型,如果将list或map等类型的数据转换成文件之后再入库,这种方法效率大大降低。

mysql批量入库语法

LOAD DATA [LOW_PRIORITY | CONCURRENT] [LOCAL] INFILE 'file_name.txt'
 [REPLACE | IGNORE]
 INTO TABLE tbl_name
 [FIELDS
     [TERMINATED BY 'string']
     [[OPTIONALLY] ENCLOSED BY 'char']
     [ESCAPED BY 'char' ]
 ]
 [LINES
     [STARTING BY 'string']
     [TERMINATED BY 'string']
 ]
 [IGNORE number LINES]
 [(col_name_or_user_var,...)]
 [SET col_name = expr,...)]

该处主要讲解一下LOCAL关键字的含义,其他不做讲解。

如果指定了LOCAL,则被认为与连接的客户端有关:

1.如果指定了LOCAL,则文件会被客户主机上的客户端读取,并被发送到服务器。文件会被给予一个完整的路径名称,以指定确切的位置。如果给定的是一个相对的路径名称,则此名称会被理解为相对于启动客户端时所在的目录。

2.如果LOCAL没有被指定,则文件必须位于服务器主机上,并且被服务器直接读取。

当在服务器主机上为文件定位时,服务器使用以下规则:

1)如果给定了一个绝对的路径名称,则服务器使用此路径名称。

2)如果给定了带有一个或多个引导组件的相对路径名称,则服务器会搜索相对于服务器数据目录的文件。

3)如果给定了一个不带引导组件的文件名称,则服务器会在默认数据库的数据库目录中寻找文件。

1.IGNORE忽略 情形

使用IO流的方式直接将流数据导入mysql

批量入库源码

/**
 * 批量导入mysql
 * @param  conn 数据库连接
 * @param  tableName 数据库表名
 * @param  dataList 数据
 * @param  coverType 覆盖类型
 * @param  fieldSplit 字段分隔符
 * @param  fields 如:(field1, field3) 定义导入字段
 * @param  SQLException  设定文件 
 * @return void 返回类型 
 */
public static void loadFile(Connection conn, String tableName, List<String> dataList, String coverType, String fieldSplit, String fields) throws SQLException {
	StringBuffer loadDataSql = new StringBuffer();
	loadDataSql.append("LOAD DATA LOCAL INFILE 'sql.csv' ");
	loadDataSql.append(coverType);
	loadDataSql.append(" INTO TABLE ");
	loadDataSql.append(tableName);
	loadDataSql.append(" FIELDS TERMINATED BY '");
	loadDataSql.append(fieldSplit);
	loadDataSql.append("' LINES TERMINATED BY '");
	loadDataSql.append(lineSplit);
	loadDataSql.append("' ");
	loadDataSql.append(fields);
	System.out.println(loadDataSql.toString());
	conn.setAutoCommit(false);
	PreparedStatement pt = conn.prepareStatement(loadDataSql.toString());
	com.mysql.jdbc.PreparedStatement sqlStatement = null;
	if (pt.isWrapperFor(com.mysql.jdbc.Statement.class)) {
		sqlStatement = pt.unwrap(com.mysql.jdbc.PreparedStatement.class);
	}
	
	StringBuilder sb = new StringBuilder();
	for(String line : dataList){
		sb.append(line);
		sb.append(lineSplit);
	}
	byte[] bytes = sb.toString().getBytes();
	InputStream in = new ByteArrayInputStream(bytes);
	sqlStatement.setLocalInfileInputStream(in);
	int rows = sqlStatement.executeUpdate();
	try {
		sqlStatement.close();
		in.close();
	} catch (IOException e) {
		e.printStackTrace();
	}
	System.out.println("importing "+rows+" rows data into mysql ");
}

(1)【操作】 针对4个字段的批量入库,field1为tb_test表的主键

	Connection conn = CdrDBMgr.getConnection("xx");
	List<String> dataList = new ArrayList<String>();
	for(int i=0;i<5;i++){
		String s = "A"+i+",b,c,d";
		dataList.add(s);
	}
	String fieldSql = "(field1,field2,field3,field4)";
	System.out.println(DataStore2.storeCdr(conn, "tb_test", dataList, fieldSql));

【SQL】 LOAD DATA LOCAL INFILE ‘sql.csv’ IGNORE INTO TABLE tb_test FIELDS TERMINATED BY ‘,’ LINES TERMINATED BY ‘’ (field1,field2,field3,field4)

【结果】

1

(2)【操作】 修改 for(int i=0;i<7;i++)…String s = “A”+i+”,bb,cc,dd”; 预期结果是已有数据不会变化,新插入两条数据

【结果】

2

(3)【操作】 修改 String fieldSql = “(field1,@2,field3,field4,@5) set field2=concat(@2,10),field6=str_to_date(@5,’%Y-%m-%d %H:%i:%s’)”;

【SQL】 LOAD DATA LOCAL INFILE ‘sql.csv’ IGNORE INTO TABLE tb_test FIELDS TERMINATED BY ‘,’ LINES TERMINATED BY ‘ ‘ (field1,@2,field3,field4,@5) set field2=concat(@2,10),field6=str_to_date(@5,’%Y-%m-%d %H:%i:%s’)

【结果】

3

2.REPLACE覆盖 情形

使用IO流的方式直接将流数据导入mysql

(1)【操作】 针对4个字段的批量入库,field1为tb_test表的主键

	Connection conn = CdrDBMgr.getConnection("xx");
	List<String> dataList = new ArrayList<String>();
	for(int i=0;i<3;i++){
		String s = "A"+i+",b,c,d,2008-4-2 15:3:28,f,g";
		dataList.add(s);
	}
	String fieldSql = "(field1,field2,field3,field4)";
	System.out.println(DataStore2.replaceCdr(conn, "tb_test", dataList, fieldSql));

【SQL】 LOAD DATA LOCAL INFILE ‘sql.csv’ REPLACE INTO TABLE tb_test FIELDS TERMINATED BY ‘,’ LINES TERMINATED BY ‘’ (field1,field2,field3,field4)

【结果】 [ WARN] batch storage failure: Row 1 was truncated; it contained more data than there were input columns

说明:使用关键字时,入库数据字段必须与定义的数据库字段个数一致,否则报错

上面问题修改 String fieldSql = “(field1,@2,field3,field4,@5,@6,@7)”;

4

原因分析:

从打印的日志看 importing 6 rows data into mysql,入库了3条记录但是数据库执行了6次,实际上REPLACE情形下是通过先delete后再执行insert的方法,所以不导入的字段会变为空,这一点使用时需要特别注意。

3.UPDATE更新 情形

**批量入库源码 **

/**
 * 批量更新mysql
 * @param  conn 数据库连接
 * @param  tableName 数据库表名
 * @param  dataList 数据
 * @param  fields 如:(field1, field3) 定义更新字段
 * @param  SQLException  设定文件 
 * @return void 返回类型 
 */
public static void loadFile(Connection conn, String tableName, List<String> dataList, String fields) throws SQLException {
	StringBuffer loadDataSql = new StringBuffer();
	loadDataSql.append("insert into ");
	loadDataSql.append(tableName);
	loadDataSql.append(fields);
	loadDataSql.append(" values ");
	for(int i=0;i<dataList.size();i++){
		loadDataSql.append("(");
		loadDataSql.append(dataList.get(i));
		loadDataSql.append(")");
		if(i != dataList.size()-1){
			loadDataSql.append(",");
		}
	}
	loadDataSql.append(" on duplicate key update ");
	loadDataSql.append(" field2=values(field2), field3=values(field3), field4=values(field4)");
	System.out.println(loadDataSql.toString());
	conn.setAutoCommit(false);
	PreparedStatement pt = conn.prepareStatement(loadDataSql.toString());
	System.out.println(pt.execute());
}

(1)【操作】 针对4个字段的批量入库,field1为tb_test表的主键

	Connection conn = CdrDBMgr.getConnection("xx");
	List<String> dataList = new ArrayList<String>();
	for(int i=0;i<5;i++){
		String s = "'A"+i+"','bxx','cxx','dxx'";
		dataList.add(s);
	}
	String fieldSql = "(field1,field2,field3,field4)";
	System.out.println(DataStore2.updateCdr(conn, "tb_test", dataList, fieldSql));

【SQL】

insert into tb_test(field1,field2,field3,field4) values 
('A0','bxx','cxx','dxx'),
('A1','bxx','cxx','dxx'),
('A2','bxx','cxx','dxx'),
('A3','bxx','cxx','dxx'),
('A4','bxx','cxx','dxx') 
on duplicate key update  field2=values(field2), field3=values(field3), field4=values(field4)

【结果】

5

原有的字段field6中有两行值并没有被置为空,可以满足业务需要,但该方法效率比REPLACE略慢,需要结合情形使用。

另外:

-- 可以只更新部分字段
insert into tb_test(field1,field2,field3,field4) 
values ('A0','b1','c1','d1'),('A1','b1','c1','1')
on duplicate key update  field2=values(field2)

-- 同样支持函数表达式
insert into tb_test(field1,field2,field3,field4) 
values ('A0','b11','c11','2008-4-1 15:3:21'),('A1','b1','c1','2008-4-2 15:3:26')
on duplicate key update  field2=values(field2), field3=values(field3), field6=str_to_date(values(field4),'%Y-%m-%d %H:%i:%s')