今天继续解决最后的两方法,不过掌柜觉得最后一个方法其实已经在上一篇的存储过程里面实现了👉事务+有序的多次插入。那么这篇主要就谈合并单条SQL语句为多条再导入这个方法。
PS:掌柜翻遍全网发现好多文章都只谈到批量合并数据可以提升数据插入的性能,但是很多都没有提到如何把这多行值批量进行合并为一行。。。很多都只说把单条SQL语句插入:
改成合并多条的值为一条,再插入:
但是这是百万或千万亿级的数据啊!!!总不能让我手动进行多行合并为一行的操作吧!于是就有了下面👇的用Python批量合并多行的操作🧐。
首先要合并这个100万行数据,到底多少行合并为一行?掌柜觉得每1000行合并为1行正好,只需要插入1000次。(思路就是这样的,但是看似简单的每1000行合并为1行,然后进行1000次插入的操作中间有一两个小坑😂。。。)
-
掌柜首先把这100万行数据划分为1000个小文件,每个里面都有1000行数据;然后对每个小文件依次进行读取,再进行合并操作的时候遇到第一个小坑👉:Python读取文件不是按照文件标号顺序读取的! 出现了下面这样的情况:
解决办法掌柜已经在这篇文章中给出来了👇:
Python读取文件夹下多个文件,却不是按数字标号从小到大的顺序读取的解决办法 -
接着解决了文件读取顺序的问题后,本以为可以顺利的导入新的文件,就突然报错如下:
后来一查发现原因是:进行多行数据合并的时候发生了只有第一个文件的末尾有分号,但是从第二个文件开始就没有。。。而是逗号结尾。。。那么解决办法就是加入一个IF判断,并从第二个文件开始去除合并后末尾的逗号,再添加个分号:
if file_name != 'E:/user_gender/user_0.sql':
with open(file_name, 'r',encoding='utf-8') as f:
for line in f:
#先用strip去掉每行的多余字符串和空格,然后用split按照分号和换行符对每行进行切分;
#再用join和逗号拼接每部分为一个整体,即得到我们要的前1000个数据的合并值
row_user += ','.join(line.strip('INSERT INTO `user_gender` VALUES').split(';\n'))
#print(row_user)
#最后把第一行的1000个合并值按照SQL语法进行拼接
new_row = 'INSERT INTO user_gender VALUES ' + row_user
#print(new_row)
sql_file.write('\n'+new_row.strip(',')+';') #注意这里从第二个文件开始就要去掉合并后产生的末尾逗号,再添加分号来划分插入的句子
sql_file.close()
else:
with open(file_name, 'r',encoding='utf-8') as f:
for line in f:
row_user += ','.join(line.strip('INSERT INTO `user_gender` VALUES').split(';\n'))
new_row = 'INSERT INTO user_gender VALUES ' + row_user
sql_file.write('\n'+new_row) #第一个文件不需要修改末尾,合并后本身就是分号结尾。。。
sql_file.close()
最后就成功生成新的sql插入文件:合并了每1000行为1行,总共就只有1000行数据。
下面先给出切分100万行大文件为1000个小文件,每个含1000行数据的代码部分:
#首先批量划分这100万数据为1000个小文件
f = open('user_gender_value.sql', 'r',encoding='utf-8') #打开文件
i = 0 #设置计数器
while i <= 1000000 : #这里1000000表示文件行数
with open('E:/user_gender/user_'+str(i)+'.sql','w', encoding='utf-8') as f1:
for j in range(1,1001) : #这里设置每个子文件的大小
if i <= 1000000: #这里判断是否已结束,否则最后可能报错
f1.writelines(f.readline())
i = i + 1
else:
break
接着是批量合并多行为一行的全部代码部分:
# -*- coding:utf-8 -*-
import os
#定义一个可以重复写入新生成的每1000行合并为1行数据的函数,参数就是刚才分割的那1000个小文件
def insert_data_1000(file_name):
sql_file = open('E:/1000000_data.sql','a', encoding = 'utf-8')
row_user = ''
#这里需要进行一个文件的判断
if file_name == 'E:/user_gender/user_0.sql':
with open(file_name, 'r',encoding='utf-8') as f:
for line in f:
#先用strip去掉每行的多余字符串和空格,然后用split按照分号和换行符对每行进行切分;
#再用join和逗号拼接每部分为一个整体,即得到我们要的前1000个数据的合并值
row_user += ','.join(line.strip('INSERT INTO `user_gender` VALUES').split(';\n'))
new_row = 'INSERT INTO user_gender VALUES ' + row_user #最后把第一行的1000个合并值按照SQL语法进行拼接
sql_file.write(new_row) #第一个文件不需要修改末尾,合并后本身就是分号结尾。。。
sql_file.close()
else:
with open(file_name, 'r',encoding='utf-8') as f:
for line in f:
row_user += ','.join(line.strip('INSERT INTO `user_gender` VALUES').split(';\n'))
new_row = 'INSERT INTO user_gender VALUES ' + row_user
sql_file.write('\n'+new_row.strip(',')+';') #注意:从第二个文件开始就要去掉合并后产生的末尾逗号,再添加分号来划分插入的句子
sql_file.close()
if __name__ == '__main__':
path = 'E:/user_gender'
fileslist = os.listdir(path)
#先定义一个排序的空列表
sort_num_list = []
for file in fileslist:
sort_num_list.append(int(file.split('user_')[1].split('.sql')[0])) #去掉前面的字符串和下划线以及后缀,只留下数字并转换为整数方便后面排序
sort_num_list.sort() #然后再重新排序
#print(sort_num_list)
#接着再重新排序读取文件
sorted_files = []
for sort_num in sort_num_list:
for file in fileslist:
if str(sort_num) == file.split('user_')[1].split('.sql')[0]:
sorted_files.append(file)
#print(sorted_file)
for sorted_file in sorted_files:
file_name = path + '/' + sorted_file #再依次读取这切割好的1000个小文件
insert_data_1000(file_name) #最后进行每1000行就合并为1行的操作
(总觉得上面的批量合并数据方法还可以再优化,但是暂时没有想到更好的解法,以后有了再更。)
最后的最后就是导入新合并生成的文件到MySQL数据库里面:
经过对比时间,每插入1000行数据,进行1000次插入的操作这种方法用来进行100万数据导入,只花了2分钟左右,效率还是不错的!😁
最后的最后的最后说一下这六种方法的一个比较:
第一种source直接暴力导入是最不可取的!!!相对来说第二、三、五、六方法更优(目前的数据来看,最优导入百万数据的方法是第二种Load Data Infile!!! 不过这里要注意一下导入的时候数据列的格式问题);至于第四种方法–更换引擎,虽然导入数据是相对快了一些但是对后面的查询等操作就影响有点大了,不是很建议更换引擎这种方法!
总算完结了这部分,下面继续之前的回炉重造😄部分。