摘要.
0.0792017.04.25 07:47:24 字数 74 阅读 10,100
# 支持 python2.7 3.5 3.6, 运用 multiprocessing 模块的 Pool 异步进程池,分段读取文件(文件编码由 chardet 自动判断,需pip install chardet
),并统计词频,代码如下:
from __future__ import print_function, division, unicode_literals
import sys, re, time, os
import operator
from collections import Counter
from functools import reduce
from multiprocessing import Pool, cpu_count
from datetime import datetime
from utils import humansize, humantime, processbar
def wrap(wcounter, fn, p1, p2, f_size):
return wcounter.count_multi(fn, p1, p2, f_size)
class WordCounter(object):
def __init__(self, from_file, to_file=None, workers=None, coding=None,
max_direct_read_size=10000000):
'''根据设定的进程数,把文件from_file分割成大小基本相同,数量等同与进程数的文件段,
来读取并统计词频,然后把结果写入to_file中,当其为None时直接打印在终端或命令行上。
Args:
@from_file 要读取的文件
@to_file 结果要写入的文件
@workers 进程数,为0时直接把文件一次性读入内存;为1时按for line in open(xxx)
读取;>=2时为多进程分段读取;默认为根据文件大小选择0或cpu数量的64倍
@coding 文件的编码方式,默认为采用chardet模块读取前1万个字符才自动判断
@max_direct_read_size 直接读取的最大值,默认为10000000(约10M)
How to use:
w = WordCounter('a.txt', 'b.txt')
w.run()
'''
if not os.path.isfile(from_file):
raise Exception('No such file: 文件不存在')
self.f1 = from_file
self.filesize = os.path.getsize(from_file)
self.f2 = to_file
if workers is None:
if self.filesize < int(max_direct_read_size):
self.workers = 0
else:
self.workers = cpu_count() * 64
else:
self.workers = int(workers)
if coding is None:
try:
import chardet
except ImportError:
os.system('pip install chardet')
print('-'*70)
import chardet
with open(from_file, 'rb') as f:
coding = chardet.detect(f.read(10000))['encoding']
self.coding = coding
self._c = Counter()
def run(self):
start = time.time()
if self.workers == 0:
self.count_direct(self.f1)
elif self.workers == 1:
self.count_single(self.f1, self.filesize)
else:
pool = Pool(self.workers)
res_list = []
for i in range(self.workers):
p1 = self.filesize * i // self.workers
p2 = self.filesize * (i+1) // self.workers
args = [self, self.f1, p1, p2, self.filesize]
res = pool.apply_async(func=wrap, args=args)
res_list.append(res)
pool.close()
pool.join()
self._c.update(reduce(operator.add, [r.get() for r in res_list]))
if self.f2:
with open(self.f2, 'wb') as f:
f.write(self.result.encode(self.coding))
else:
print(self.result)
cost = '{:.1f}'.format(time.time()-start)
size = humansize(self.filesize)
tip = '\nFile size: {}. Workers: {}. Cost time: {} seconds'
print(tip.format(size, self.workers, cost))
self.cost = cost + 's'
def count_single(self, from_file, f_size):
'''单进程读取文件并统计词频'''
start = time.time()
with open(from_file, 'rb') as f:
for line in f:
self._c.update(self.parse(line))
processbar(f.tell(), f_size, from_file, f_size, start)
def count_direct(self, from_file):
'''直接把文件内容全部读进内存并统计词频'''
start = time.time()
with open(from_file, 'rb') as f:
line = f.read()
self._c.update(self.parse(line))
def count_multi(self, fn, p1, p2, f_size):
c = Counter()
with open(fn, 'rb') as f:
if p1:
f.seek(p1-1)
while b'\n' not in f.read(1):
pass
start = time.time()
while 1:
line = f.readline()
c.update(self.parse(line))
pos = f.tell()
if p1 == 0:
processbar(pos, p2, fn, f_size, start)
if pos >= p2:
return c
def parse(self, line):
return Counter(re.sub(r'\s+','',line.decode(self.coding)))
def flush(self):
self._c = Counter()
@property
def counter(self):
return self._c
@property
def result(self):
ss = ['{}: {}'.format(i, j) for i, j in self._c.most_common()]
return '\n'.join(ss)
def main():
if len(sys.argv) < 2:
print('Usage: python wordcounter.py from_file to_file')
exit(1)
from_file, to_file = sys.argv[1:3]
args = {'coding' : None, 'workers': None, 'max_direct_read_size':10000000}
for i in sys.argv:
for k in args:
if re.search(r'{}=(.+)'.format(k), i):
args[k] = re.findall(r'{}=(.+)'.format(k), i)[0]
w = WordCounter(from_file, to_file, **args)
w.run()
if __name__ == '__main__':
main()
from __future__ import print_function, division, unicode_literals
import os
import time
def humansize(size):
"""将文件的大小转成带单位的形式
>>> humansize(1024) == '1 KB'
True
>>> humansize(1000) == '1000 B'
True
>>> humansize(1024*1024) == '1 M'
True
>>> humansize(1024*1024*1024*2) == '2 G'
True
"""
units = ['B', 'KB', 'M', 'G', 'T']
for unit in units:
if size < 1024:
break
size = size // 1024
return '{} {}'.format(size, unit)
def humantime(seconds):
"""将秒数转成00:00:00的形式
>>> humantime(3600) == '01:00:00'
True
>>> humantime(360) == '06:00'
True
>>> humantime(6) == '00:06'
True
"""
h = m = ''
seconds = int(seconds)
if seconds >= 3600:
h = '{:02}:'.format(seconds // 3600)
seconds = seconds % 3600
if 1 or seconds >= 60:
m = '{:02}:'.format(seconds // 60)
seconds = seconds % 60
return '{}{}{:02}'.format(h, m, seconds)
def processbar(pos, p2, fn, f_size, start):
'''打印进度条
just like:
a.txt, 50.00% [===== ] 1/2 [00:01<00:01]
'''
percent = min(pos * 10000 // p2, 10000)
done = '=' * (percent//1000)
half = '-' if percent // 100 % 10 > 5 else ''
tobe = ' ' * (10 - percent//1000 - len(half))
tip = '{}{}, '.format('\33[?25l', os.path.basename(fn))
past = time.time()-start
remain = past/(percent+0.01)*(10000-percent)
print('\r{}{:.1f}% [{}{}{}] {:,}/{:,} [{}<{}]'.format(tip,
percent/100, done, half, tobe,
min(pos*int(f_size//p2+0.5), f_size), f_size,
humantime(past), humantime(remain)),
end='')
if percent == 10000:
print('\33[?25h', end='')
if __name__ == '__main__':
import doctest
doctest.testmod()
github 地址:https://github.com/waketzheng/wordcounter
可以直接:
运行结果:
[willie@localhost linuxtools]$ python wordcounter.py test/var/20000thousandlines.txt tmp2.txt 20000thousandlines.txt, 100.0% [==========] 115,000,000/115,000,000 [06:57<00:00] File size: 109 M. Workers: 128. Cost time: 417.8 seconds
更多精彩内容下载简书 APP
"小礼物走一走,来简书关注我"
还没有人赞赏,支持一下
总资产 1 (约 0.10 元) 共写了 2486 字获得 12 个赞共 10 个粉丝
# 推荐阅读更多精彩内容
这次的任务是综合练习:直接上题目和代码~ 结果 结果 结果: 结果: 解题思路是:利用 case when 判断 r...
一、关于时间 1、获得当前时间 2、获得当前时间 3、获得时间差 二、文件夹是否存在 如果文件夹存在,则忽视;如果...
骆旺达阅读 222 评论 1 赞 0
本文的文字及图片来源于网络, 仅供学习、交流使用, 不具有任何商业用途, 如有问题请及时联系我们以作处理。 以下文章来源...
当数据量比较大的时候,我们就需要考虑读写分离了,也就是动态切换数据库连接, 对指定的数据库进行操作。在 spring 中...
Spark SQL 读取 MySQL 的方式 Spark SQL 还包括一个可以使用 JDBC 从其他数据库读取数据的数据源。... https://www.jianshu.com/p/7665545c970b https://www.jianshu.com/p/7665545c970b