• 4.13 创建数据处理管道
    • 问题
    • 解决方案
    • 讨论

    4.13 创建数据处理管道

    问题

    你想以数据管道(类似Unix管道)的方式迭代处理数据。比如,你有个大量的数据需要处理,但是不能将它们一次性放入内存中。

    解决方案

    生成器函数是一个实现管道机制的好办法。为了演示,假定你要处理一个非常大的日志文件目录:

    1. foo/
    2. access-log-012007.gz
    3. access-log-022007.gz
    4. access-log-032007.gz
    5. ...
    6. access-log-012008
    7. bar/
    8. access-log-092007.bz2
    9. ...
    10. access-log-022008

    假设每个日志文件包含这样的数据:

    1. 124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
    2. 210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
    3. 210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
    4. 61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
    5. ...

    为了处理这些文件,你可以定义一个由多个执行特定任务独立任务的简单生成器函数组成的容器。就像这样:

    1. import os
    2. import fnmatch
    3. import gzip
    4. import bz2
    5. import re
    6.  
    7. def gen_find(filepat, top):
    8. '''
    9. Find all filenames in a directory tree that match a shell wildcard pattern
    10. '''
    11. for path, dirlist, filelist in os.walk(top):
    12. for name in fnmatch.filter(filelist, filepat):
    13. yield os.path.join(path,name)
    14.  
    15. def gen_opener(filenames):
    16. '''
    17. Open a sequence of filenames one at a time producing a file object.
    18. The file is closed immediately when proceeding to the next iteration.
    19. '''
    20. for filename in filenames:
    21. if filename.endswith('.gz'):
    22. f = gzip.open(filename, 'rt')
    23. elif filename.endswith('.bz2'):
    24. f = bz2.open(filename, 'rt')
    25. else:
    26. f = open(filename, 'rt')
    27. yield f
    28. f.close()
    29.  
    30. def gen_concatenate(iterators):
    31. '''
    32. Chain a sequence of iterators together into a single sequence.
    33. '''
    34. for it in iterators:
    35. yield from it
    36.  
    37. def gen_grep(pattern, lines):
    38. '''
    39. Look for a regex pattern in a sequence of lines
    40. '''
    41. pat = re.compile(pattern)
    42. for line in lines:
    43. if pat.search(line):
    44. yield line

    现在你可以很容易的将这些函数连起来创建一个处理管道。比如,为了查找包含单词python的所有日志行,你可以这样做:

    1. lognames = gen_find('access-log*', 'www')
    2. files = gen_opener(lognames)
    3. lines = gen_concatenate(files)
    4. pylines = gen_grep('(?i)python', lines)
    5. for line in pylines:
    6. print(line)

    如果将来的时候你想扩展管道,你甚至可以在生成器表达式中包装数据。比如,下面这个版本计算出传输的字节数并计算其总和。

    1. lognames = gen_find('access-log*', 'www')
    2. files = gen_opener(lognames)
    3. lines = gen_concatenate(files)
    4. pylines = gen_grep('(?i)python', lines)
    5. bytecolumn = (line.rsplit(None,1)[1] for line in pylines)
    6. bytes = (int(x) for x in bytecolumn if x != '-')
    7. print('Total', sum(bytes))

    讨论

    以管道方式处理数据可以用来解决各类其他问题,包括解析,读取实时数据,定时轮询等。

    为了理解上述代码,重点是要明白 yield 语句作为数据的生产者而 for 循环语句作为数据的消费者。当这些生成器被连在一起后,每个 yield 会将一个单独的数据元素传递给迭代处理管道的下一阶段。在例子最后部分, sum() 函数是最终的程序驱动者,每次从生成器管道中提取出一个元素。

    这种方式一个非常好的特点是每个生成器函数很小并且都是独立的。这样的话就很容易编写和维护它们了。很多时候,这些函数如果比较通用的话可以在其他场景重复使用。并且最终将这些组件组合起来的代码看上去非常简单,也很容易理解。

    使用这种方式的内存效率也不得不提。上述代码即便是在一个超大型文件目录中也能工作的很好。事实上,由于使用了迭代方式处理,代码运行过程中只需要很小很小的内存。

    在调用 gen_concatenate() 函数的时候你可能会有些不太明白。这个函数的目的是将输入序列拼接成一个很长的行序列。itertools.chain() 函数同样有类似的功能,但是它需要将所有可迭代对象最为参数传入。在上面这个例子中,你可能会写类似这样的语句 lines = itertools.chain(*files) ,这将导致 gen_opener() 生成器被提前全部消费掉。但由于 gen_opener() 生成器每次生成一个打开过的文件,等到下一个迭代步骤时文件就关闭了,因此 chain() 在这里不能这样使用。上面的方案可以避免这种情况。

    gen_concatenate() 函数中出现过 yield from 语句,它将 yield 操作代理到父生成器上去。语句 yield from it 简单的返回生成器 it 所产生的所有值。关于这个我们在4.14小节会有更进一步的描述。

    最后还有一点需要注意的是,管道方式并不是万能的。有时候你想立即处理所有数据。然而,即便是这种情况,使用生成器管道也可以将这类问题从逻辑上变为工作流的处理方式。

    David Beazley 在他的Generator Tricks for Systems Programmers教程中对于这种技术有非常深入的讲解。可以参考这个教程获取更多的信息。

    原文:

    http://python3-cookbook.readthedocs.io/zh_CN/latest/c04/p13_create_data_processing_pipelines.html