摘要.
1. 需求
从 mongodb 数据库的 A 表(或者叫集合)读取数据
经过处理后导入MySQL的 B 表中
其中
A 表有近 2000 万条的数据
需要对每条数据进行分析处理,分析处理过程就不细说了
每条 A 表数据分析处理后可提取生成数条 B 表数据
B 表数据的字段中有 school、speciality 和 post 三个字段,和一个字段 number
导入 B 表中的数据需要通过这三个字段联合去重,并统计重复的条数 number
最终将生成的 B 表数据以及频率字段 number 存储到 MySQL 数据库中
需求不复杂
2. 解决过程
2.1 思路
逐步读取那 2000 万条数据,并进行处理
定义一个 Map,key 值为 school、speciality 和 post 三个字段的拼接值,value 为出现的次数
循环遍历 2000 万条数据
通过 Map 的 key 值进行判断
如果不存在,则将数据通过 insert 语句插入到 MySQL 数据库,同时 put 到 Map 中,key 值为 school、speciality 和 post 三个字段的拼接值,value 为 1
如果存在了,则从 Map 中取出对应的 value,value++,并将 value 值通过 update 语句更新到 MySQL 数据库中,同时更新 Map 中的 value
貌似没问题,运行,程序快速的运行起来了
2.2 遇到问题 1
通过控制台打印的日志,我发现一个问题
所有的 insert 语句都是都是哗哗哗的打印
update 却有明显的停顿感
很明显 update 语句影响了执行效率
特别是当 MySQL 表中数据量达到 200 万 + 之后
数据插入速度明显减慢了
这么搞下去可能要耽误事儿了
2.3 优化 1
update B 表 set number = ?where school = ? and speciality = ? and post = ?
明显这个语句是执行速度的短板,所以应该从分析这条 sql 语句开始
因为用到了 where 关键字,所以条件查询应该是时间瓶颈
最先应该想到的就是建立索引
所以建立了 school、speciality、post 三个字段的联合索引,索引类型是 normal
晚上执行的建立索引的语句,然后就去睡觉了,第二天早上起来建立好了
200 多万数据,忘了看执行时间了
继续运行,果然速度快了很多
2.4 遇到问题 2
当数据量逐渐变大,达到近千万级时
用来存储频率出现次数的 Map 大小也随之达到了千万级
从内存使用和效率方面,性能都有所下降
所以我就考虑不再通过 Map 记录数据的出现频率
想通过 update B 表 set number = number + 1 where school = ? and speciality = ? and post = ? 来解决
但是这个时候我发现,由于前期代码问题,在程序中断又重启过好多次, school、speciality、post 这三个字段的内容并不是唯一存在的
如果直接使用上边的语句,会导致我统计的 number 数字不准确
所以第一步需要解决的问题就是去重
可以通过以下语句对数据进行多字段判断的去重
DELETE
FROM
speciality\_post
WHERE
( school, speciality, post ) IN (
SELECT
school,
speciality,
post
FROM
( SELECT school, speciality, post FROM speciality\_post GROUP BY school, speciality, post HAVING COUNT( \* ) \> 1 ) s1
)
AND id NOT IN ( SELECT id FROM ( SELECT id FROM speciality\_post GROUP BY school, speciality, post HAVING COUNT( \* ) \> 1 ) s2 );
然后再根据那三个字段建立唯一索引
但是建立索引也是一个很漫长的过程
并且去重之后,如果不重新遍历 2000 万的原始数据
number 也是统计的不准确的
2.5 优化 2
最终决定
雄关漫道真如铁
而今迈步从头越
新建 B 表,并将 school、speciality、post 这三个字段建立联合唯一索引
同时改进 sql 语句,使用 insert ... on duplicate key update 语句,简化代码
然后修改代码,运行
有时候从头再来也是一种策略
可以很大程度的挽回损失或停止损失
3. 总结
3.1 为什么 insert 语句要比 update 语句要快?
update 语句需要先定位数据行位置,需要根据主键索引或条件索引逐行扫描,然后再更新特定字段
而 insert 语句是没有这些开销的
所以 insert 语句肯定要 update 语句快的多
3.2 为什么将 update 语句中的 where 条件字段建立索引后,更新速度就变快了?
因为如果 update 语句中的 where 条件字段没有建立索引,在执行 update 语句的时候是要进行全表扫描的,扫描的过程中对每一行数据进行加锁判断释放锁,这个过程耗时会随着数据量的增加直线上升
而如果加了索引,就能快速准确定位到目标数据行,而且 MySQL 底层使用的是 B + 树建立的索引,所以稳定性也特别好
3.3 解释一下 insert ... on duplicate key update 语句
如果你插入的记录导致一个 UNIQUE 索引或者 primary key(主键) 出现重复,那么就会认为该条记录存在,则执行 update 语句而不是 insert 语句,反之,则执行 insert 语句而不是更新语句
判断数据是否存在是通过判断是否导致了一个 UNIQUE 索引或者 primary key(主键) 出现重复,所以 update 语句是不能跟 where 条件判断的
4. 测试
接下来才是我想说的内容
针对这个过程中的出现的几种情况
我对 update 语句的效率做了一个测试和对比
目标表 B 表中的数据量有 2,013,035 条
执行更新语句 50 次 (本来想用 5000 条,测试的时候才发现,如果是 5000 条我得等一天)
分别测试无索引、normal 索引、unique 索引下 update 和 insert..on duplicate key update 语句的耗时
话不多说上代码
主函数方法
public static void main(String\[\] args) {
DruidUtils.init("localhost", "3306", "test", "root", "root");
String specialityPostNoIndex \= "speciality\_post\_no\_index";
String specialityPostUniqueIndex \= "speciality\_post\_unique\_index";
String specialityPostNormalIndex \= "speciality\_post\_normal\_index";
List<SpecialityPost\> list \= select();
long start \= System.currentTimeMillis();
for (SpecialityPost specialityPost : list) {
update(specialityPost, specialityPostNoIndex);
}
long end \= System.currentTimeMillis();
System.out.println("update语句在无索引情况下耗时:\\t" + (end \- start) + "毫秒\\n");
System.out.println("update语句在无索引情况下平均耗时:\\t" + ((end \- start) / 50) + "毫秒\\n\\n\\n\\n");
start \= System.currentTimeMillis();
for (SpecialityPost specialityPost : list) {
insertOrUpdate(specialityPost, specialityPostNoIndex);
}
end \= System.currentTimeMillis();
System.out.println("insert·· on duplicate key update语句在无索引情况下耗时:\\t" + (end \- start) + "毫秒\\n");
System.out.println("insert·· on duplicate key update语句在无索引情况下平均耗时:\\t" + ((end \- start) / 50) + "毫秒\\n\\n\\n\\n");
start \= System.currentTimeMillis();
for (SpecialityPost specialityPost : list) {
update(specialityPost, specialityPostNormalIndex);
}
end \= System.currentTimeMillis();
System.out.println("update语句在normal索引情况下耗时:\\t" + (end \- start) + "毫秒\\n");
System.out.println("update语句在normal索引情况下平均耗时:\\t" + ((end \- start) / 50) + "毫秒\\n\\n\\n\\n");
start \= System.currentTimeMillis();
for (SpecialityPost specialityPost : list) {
insertOrUpdate(specialityPost, specialityPostNormalIndex);
}
end \= System.currentTimeMillis();
System.out.println("insert·· on duplicate key update语句在normal索引情况下耗时:\\t" + (end \- start) + "毫秒\\n");
System.out.println("insert·· on duplicate key update语句在normal索引情况下平均耗时:\\t" + ((end \- start) / 50) + "毫秒\\n\\n\\n\\n");
start \= System.currentTimeMillis();
for (SpecialityPost specialityPost : list) {
update(specialityPost, specialityPostUniqueIndex);
}
end \= System.currentTimeMillis();
System.out.println("update语句在unique索引情况下耗时:\\t" + (end \- start) + "毫秒\\n");
System.out.println("update语句在unique索引情况下平均耗时:\\t" + ((end \- start) / 50) + "毫秒\\n\\n\\n\\n");
start \= System.currentTimeMillis();
for (SpecialityPost specialityPost : list) {
insertOrUpdate(specialityPost, specialityPostUniqueIndex);
}
end \= System.currentTimeMillis();
System.out.println("insert·· on duplicate key update语句在unique索引情况下耗时:\\t" + (end \- start) + "毫秒\\n");
System.out.println("insert·· on duplicate key update语句在unique索引情况下平均耗时:\\t" + ((end \- start) / 50) + "毫秒\\n\\n\\n\\n");
}
其他用到的方法
private static int update(SpecialityPost specialityPost, String table) {
String sql \= "update " + table + " set number = number+1 where school = ? and speciality = ? and post = ?";
Connection connection \= DruidUtils.getConnection();
PreparedStatement preparedStatement \= null;
try {
preparedStatement \= connection.prepareStatement(sql);
preparedStatement.setString(1, specialityPost.getSchool());
preparedStatement.setString(2, specialityPost.getSpeciality());
preparedStatement.setString(3, specialityPost.getPost());
System.out.println(preparedStatement.toString());
return preparedStatement.executeUpdate();
} catch (SQLException throwables) {
throwables.printStackTrace();
} finally {
DruidUtils.closeAll(connection, preparedStatement, null);
}
return 0;
}
private static int insertOrUpdate(SpecialityPost specialityPost, String table) {
String sql \= "INSERT INTO " + table + " (area,school,degree,original\_speciality,speciality,original\_post,post,number) " +
"VALUE" +
"(?,?,?,?,?,?,?,?) " +
"ON DUPLICATE KEY UPDATE " +
"number = number+1;";
Connection connection \= DruidUtils.getConnection();
PreparedStatement preparedStatement \= null;
try {
preparedStatement \= connection.prepareStatement(sql);
preparedStatement.setString(1, specialityPost.getArea());
preparedStatement.setString(2, specialityPost.getSchool());
preparedStatement.setString(3, specialityPost.getDegree());
preparedStatement.setString(4, specialityPost.getOriginalSpeciality());
preparedStatement.setString(5, specialityPost.getSpeciality());
preparedStatement.setString(6, specialityPost.getOriginalPost());
preparedStatement.setString(7, specialityPost.getPost());
preparedStatement.setInt(8, specialityPost.getNumber());
System.out.println(preparedStatement.toString());
return preparedStatement.executeUpdate();
} catch (SQLException throwables) {
throwables.printStackTrace();
} finally {
DruidUtils.closeAll(connection, preparedStatement, null);
}
return 0;
}
private static List<SpecialityPost\> select() {
String sql \= "select \* from speciality\_post\_unique\_index order by number limit 50";
Connection connection \= DruidUtils.getConnection();
List<SpecialityPost\> list \= new ArrayList<\>();
PreparedStatement preparedStatement \= null;
ResultSet resultSet \= null;
try {
preparedStatement \= connection.prepareStatement(sql);
System.out.println(preparedStatement.toString());
resultSet \= preparedStatement.executeQuery();
while (resultSet.next()) {
SpecialityPost specialityPost \= new SpecialityPost();
specialityPost.setId(resultSet.getInt("id"));
specialityPost.setNumber(resultSet.getInt("number"));
specialityPost.setSchool(resultSet.getString("school"));
specialityPost.setSpeciality(resultSet.getString("speciality"));
specialityPost.setPost(resultSet.getString("post"));
specialityPost.setArea(resultSet.getString("area"));
specialityPost.setOriginalSpeciality(resultSet.getString("original\_speciality"));
specialityPost.setOriginalPost(resultSet.getString("original\_post"));
list.add(specialityPost);
}
} catch (SQLException throwables) {
throwables.printStackTrace();
} finally {
DruidUtils.closeAll(connection, preparedStatement, resultSet);
}
return list;
}
4.1 无索引情况下的 update 语句
4.2 无索引情况下的 insert ... on duplicate key update 语句
注意: 这个之所以执行这么快,不是因为 sql 语句的优化的好,前边说到了,这个语句判断是否更新是判断是否与唯一索引冲突,在这里是没冲突的,所以其实执行的是插入操作
4.3normal 索引情况下的 update 语句
4.4normal 索引情况下的 insert ... on duplicate key update 语句
注意: 这个之所以执行这么快,不是因为 sql 语句的优化的好,前边说到了,这个语句判断是否更新是判断是否与唯一索引冲突,在这里是没冲突的,所以其实执行的是插入操作
4.5unique 索引情况下的 update 语句
4.6unique 索引情况下的 insert ... on duplicate key update 语句
可以看到使用索引和 insert ... on duplicate key update 语句的效率还是极高的
在做类似数据操作的时候,可以参考一下
你有其他更好的办法吗?
文 / 戴先生 @2020 年 6 月 20 日
---end--- https://cloud.tencent.com/developer/article/1691626