数据结构与算法 python

多路归并排序-Python实现

技术分享

systemime
2021-05-28
6 min

使用python实现多(K)路归并外部排序,解决小内存排序大文件问题

上一篇中,我们实现了一般的归并排序 归并排序递归与非递归-Python实现

在实际工作中,多个有序数列合并成一个,大文件或多个大文件合并成一个并排序的需求常见并不少见,首先,先来看一下多个有序数列情况

# 合并多个有序数组

比如现在有四路:

  • a0: [1, 3, 6, 7]
  • a1: []
  • a2: [3, 5, 7, 19]
  • a3: [9, 12, 87, 98]

# 保存每路最小值

第一步需要知道每一路的最小值,如果每一路用数组表示的话需要保存对应的下标,并保存为min_map

  • 第0路: 1
  • 第1路: 没有值
  • 第2路: 3
  • 第3路: 9

初始的 min_map:

{0: (1, 0), 2: (3, 0), 3: (9, 0)}

# 获取最小值中的最小值

第二部需要将最小值取出来然,后检查被取出值的那一路是否还剩下。

其他元素如果存在,则修改min_map里面对应的值,如果不存在,则删除掉min_map里面对应的记录,以表示该路已经没有元素需要遍历了

代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# 多路归并: 将已经排序好的多个数组合并起来


def nw_merge(arrs):
    """
    需要知道每一路的最小值
    第0路: 1
    第1路: 没有值
    第2路: 3
    第3路: 9
    """

    result = []
    min_map = {} # 用min_map 保存每一路的当前最小值
    for inx, arr in enumerate(arrs):
        if arr:
            min_map[inx] = (arr[0], 0)

    print("初始化的每一路最小值min_map", min_map)

    while min_map:
        """
        需要知道每一路的最小值里面哪一路的最小值, 以及最小值所在的那一路的index
        """

        min_ = min(min_map.items(), key = lambda m: m[1][0])
        way_num, (way_min_v, way_inx) = min_
        result.append(way_min_v)

        """
        检查被取出值的那一路是否还剩下其他元素, 如果存在, 则修改min_map里面对应的值, 如果不存在,
        则删除掉min_map里面对应的记录, 以表示该路已经没有元素需要遍历了
        """
        way_inx += 1
        if way_inx < len(arrs[way_num]):
            min_map[way_num] = (arrs[way_num][way_inx], way_inx)
        else:
            del min_map[way_num]
    return result

a0 = [1, 3, 6, 7]
a1 = []
a2 = [3, 5, 7, 19]
a3 = [9, 12, 87, 98]
arrs = [a0, a1, a2, a3]

print("a0:", a0)
print("a1:", a1)
print("a2:", a2)
print("a3:", a3)

result = nw_merge(arrs)

print("最终合并的:", result)

# 输出

"""
a0: [1, 3, 6, 7]
a1: []
a2: [3, 5, 7, 19]
a3: [9, 12, 87, 98]

初始化的每一路最小值min_map {0: (1, 0), 2: (3, 0), 3: (9, 0)}
"""

# 最终合并的: 
[1, 3, 3, 5, 6, 7, 7, 9, 12, 19, 87, 98]

# 对超大文件排序(10G的日志,512M的内存)

绕不开归并核心思想,分治,先拆成小文件,再排序,最后合并所有碎片文件成一个大文件

# 拆文件

首先第一步,大文件拆分成 x 个 block_size 大的小文件,每个小文件排好序


def save_file(l, fileno):
    filepath = f"/home/xxx/{fileno}"

    f = open(filepath, 'a')
    for i in l:
        f.write(f"{i}\n")
    f.close()
    return filepath

def split_file(file_path, block_size):
    f = open(file_path, 'r')
    fileno = 1 
    files = []
    while True:
        lines = f.readlines(block_size)
        if not lines:
            break
        lines = [int(i.strip()) for i in lines]
        lines.sort()
        files.append(save_file(lines, fileno))
        fileno += 1
    return files

# 合并

将拆分成的小文件合并起来,然后将归并的东西写到大文件里面去,这里用到的是上面的多路归并的方法

def nw_merge(files):
    fs = [open(file_) for file_ in files]
    min_map = {}
    out = open("/home/xxx/out", "a")
    for f in fs:
        read = f.readline()
        if read:
            min_map[f] = int(read.strip())
    
    while min_map:
        min_ = min(min_map.items(), key = lambda x: x[1])
        min_f, min_v = min_
        out.write("{}".format(min_v))
        out.write("\n")
        nextline = min_f.readline()
        if nextline:
            min_map[min_f] = int(nextline.strip())
        else:
            del min_map[min_f]

# 全部代码

import os
from pathlib import Path


def nw_merge(files):
    fs = [open(file_) for file_ in files]
    min_map = {}  # 用来记录每一路当前最小值
    out = open(Path(".") / "out/integration.txt", "a+")
    for f in fs:
        read = f.readline()
        if read:
            min_map[f] = int(read.strip())

    while min_map:  # 将最小值取出, 并将该最小值所在的那一路做对应的更新
        min_ = min(min_map.items(), key=lambda x: x[1])
        min_f, min_v = min_
        out.write(f"{min_v}\n")
        nextline = min_f.readline()
        if nextline:
            min_map[min_f] = int(nextline.strip())
        else:
            del min_map[min_f]
    for f in fs:
        f.close()
    out.close()


def save_file(l, fileno):
    path = Path(".") / "split"
    filepath = path / f"{fileno}"
    info = '\n'.join(map(str, l))
    with open(filepath, "a+") as f:
        f.write(f"{info}")

    return filepath


def split_file(file_path, block_size):
    fileno = 1  # 文件数
    files = []  # 小文件目录
    with open(file_path, 'r') as f:
        while True:
            lines = f.readlines(block_size)
            if not lines:
                break
            lines = [int(i.strip()) for i in lines]  # 生成一个列表
            lines.sort()  # 排序
            files.append(save_file(lines, fileno))
            fileno += 1
        return files


if __name__ == "__main__":
    # 每行单个数字
    file_path = Path(".") / "tests.txt"
    block_size = 500 * 1024 * 1024 # 500K
    num_blocks = os.stat(file_path).st_size / block_size
    files = split_file(file_path, block_size)
    nw_merge(files)
上次编辑于: 6/1/2021, 2:48:43 AM