使用python实现多(K)路归并外部排序,解决小内存排序大文件问题
上一篇中,我们实现了一般的归并排序 归并排序递归与非递归-Python实现
在实际工作中,多个有序数列合并成一个,大文件或多个大文件合并成一个并排序的需求常见并不少见,首先,先来看一下多个有序数列情况
# 合并多个有序数组
比如现在有四路:
- a0: [1, 3, 6, 7]
- a1: []
- a2: [3, 5, 7, 19]
- a3: [9, 12, 87, 98]
# 保存每路最小值
第一步需要知道每一路的最小值,如果每一路用数组表示的话需要保存对应的下标,并保存为min_map
- 第0路: 1
- 第1路: 没有值
- 第2路: 3
- 第3路: 9
初始的 min_map
:
{0: (1, 0), 2: (3, 0), 3: (9, 0)}
# 获取最小值中的最小值
第二部需要将最小值取出来然,后检查被取出值的那一路是否还剩下。
其他元素如果存在,则修改min_map里面对应的值,如果不存在,则删除掉min_map里面对应的记录,以表示该路已经没有元素需要遍历了
代码:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 多路归并: 将已经排序好的多个数组合并起来
def nw_merge(arrs):
"""
需要知道每一路的最小值
第0路: 1
第1路: 没有值
第2路: 3
第3路: 9
"""
result = []
min_map = {} # 用min_map 保存每一路的当前最小值
for inx, arr in enumerate(arrs):
if arr:
min_map[inx] = (arr[0], 0)
print("初始化的每一路最小值min_map", min_map)
while min_map:
"""
需要知道每一路的最小值里面哪一路的最小值, 以及最小值所在的那一路的index
"""
min_ = min(min_map.items(), key = lambda m: m[1][0])
way_num, (way_min_v, way_inx) = min_
result.append(way_min_v)
"""
检查被取出值的那一路是否还剩下其他元素, 如果存在, 则修改min_map里面对应的值, 如果不存在,
则删除掉min_map里面对应的记录, 以表示该路已经没有元素需要遍历了
"""
way_inx += 1
if way_inx < len(arrs[way_num]):
min_map[way_num] = (arrs[way_num][way_inx], way_inx)
else:
del min_map[way_num]
return result
a0 = [1, 3, 6, 7]
a1 = []
a2 = [3, 5, 7, 19]
a3 = [9, 12, 87, 98]
arrs = [a0, a1, a2, a3]
print("a0:", a0)
print("a1:", a1)
print("a2:", a2)
print("a3:", a3)
result = nw_merge(arrs)
print("最终合并的:", result)
# 输出
"""
a0: [1, 3, 6, 7]
a1: []
a2: [3, 5, 7, 19]
a3: [9, 12, 87, 98]
初始化的每一路最小值min_map {0: (1, 0), 2: (3, 0), 3: (9, 0)}
"""
# 最终合并的:
[1, 3, 3, 5, 6, 7, 7, 9, 12, 19, 87, 98]
# 对超大文件排序(10G的日志,512M的内存)
绕不开归并核心思想,分治,先拆成小文件,再排序,最后合并所有碎片文件成一个大文件
# 拆文件
首先第一步,大文件拆分成 x 个 block_size 大的小文件,每个小文件排好序
def save_file(l, fileno):
filepath = f"/home/xxx/{fileno}"
f = open(filepath, 'a')
for i in l:
f.write(f"{i}\n")
f.close()
return filepath
def split_file(file_path, block_size):
f = open(file_path, 'r')
fileno = 1
files = []
while True:
lines = f.readlines(block_size)
if not lines:
break
lines = [int(i.strip()) for i in lines]
lines.sort()
files.append(save_file(lines, fileno))
fileno += 1
return files
# 合并
将拆分成的小文件合并起来,然后将归并的东西写到大文件里面去,这里用到的是上面的多路归并的方法
def nw_merge(files):
fs = [open(file_) for file_ in files]
min_map = {}
out = open("/home/xxx/out", "a")
for f in fs:
read = f.readline()
if read:
min_map[f] = int(read.strip())
while min_map:
min_ = min(min_map.items(), key = lambda x: x[1])
min_f, min_v = min_
out.write("{}".format(min_v))
out.write("\n")
nextline = min_f.readline()
if nextline:
min_map[min_f] = int(nextline.strip())
else:
del min_map[min_f]
# 全部代码
import os
from pathlib import Path
def nw_merge(files):
fs = [open(file_) for file_ in files]
min_map = {} # 用来记录每一路当前最小值
out = open(Path(".") / "out/integration.txt", "a+")
for f in fs:
read = f.readline()
if read:
min_map[f] = int(read.strip())
while min_map: # 将最小值取出, 并将该最小值所在的那一路做对应的更新
min_ = min(min_map.items(), key=lambda x: x[1])
min_f, min_v = min_
out.write(f"{min_v}\n")
nextline = min_f.readline()
if nextline:
min_map[min_f] = int(nextline.strip())
else:
del min_map[min_f]
for f in fs:
f.close()
out.close()
def save_file(l, fileno):
path = Path(".") / "split"
filepath = path / f"{fileno}"
info = '\n'.join(map(str, l))
with open(filepath, "a+") as f:
f.write(f"{info}")
return filepath
def split_file(file_path, block_size):
fileno = 1 # 文件数
files = [] # 小文件目录
with open(file_path, 'r') as f:
while True:
lines = f.readlines(block_size)
if not lines:
break
lines = [int(i.strip()) for i in lines] # 生成一个列表
lines.sort() # 排序
files.append(save_file(lines, fileno))
fileno += 1
return files
if __name__ == "__main__":
# 每行单个数字
file_path = Path(".") / "tests.txt"
block_size = 500 * 1024 * 1024 # 500K
num_blocks = os.stat(file_path).st_size / block_size
files = split_file(file_path, block_size)
nw_merge(files)