yhw-miracle

分布式进程

痛点就是起点 writed in

本文为痛点就是起点原创文章,可以随意转载,但需注明出处。

分布式进程是指将 Process 进程分布到多台机器上,充分利用多台机器的性能完成复杂的任务。python 中实现分布式进程要用到 multiprocessing 模块,该模块不仅支持多进程,其中 managers 子模块还支持把多进程分布到多台机器上。

分布式进程可以用在做爬虫程序中,利用一个服务进程作为调度者,将不同任务分布到其他机器上的进程上,利用网络通信进行管理,提高爬虫程序的效率。下面举例说明分布式进程在爬虫程序中应用。

需求:以爬取某个网站上所以图片为例

如果使用多进程,一般是用一个进程负责抓取图片的链接地址,将链接地址放到本地队列中(Queue),另一个进程负责从队列中读取链接地址并进行下载和存储到本地。

如果使用分布式进程来完成这一需求,需要一台机器负责抓取链接,将链接地址放到队列中(Queue),这时候需要将队列共享到网络中,便于其他机器中的进程可以访问到,其他机器上的进程负责读取链接并下载存储。分布式进程就是将这一过程进行了封装,相当于本地队列的网络化。

要实现上述需求,可以编写服务进程,负责抓取链接地址,共享到网络队列上;编写任务进程,负责读取网络队列上的链接地址,并下载存储。

服务进程编写过程:

  1. 建立队列 Queue,用来进行进程间的通信。服务进程创建任务队列 taskQueue,用来作为传递任务给任务进程的通道;服务进程创建结果队列 resultQueue,作为任务进程完成任务后回复服务进程的通道。在分布式多进程中,需要通过 QueueManager 获得的 Queue 接口来添加任务。
  2. 把第一步创建的队列在网络上注册,共享给其他机器上的进程,注册后获得网络队列,相当于本地队列的映像。
  3. 建立一个对象 (QueueManager(BaseManager)) 实例 manager,绑定端口和验证口令。然后,启动该实例,监管信息通道。
  4. 通过管理实例的方法获得网络访问的 Queue 对象,即把网络队列实体化可以使用的本地队列。
  5. 创建任务到本地队列中,自动上传任务到网络队列中,分配给任务进程进行处理。
# coding:utf-8

import random, time, Queue
from multiprocessing.managers import BaseManager

# 第一步: 建立 taskQueue 和 resultQueue ,用了存放任务和结果
taskQueue = Queue.Queue()
resultQueue = Queue.Queue()


class QueueManager(BaseManager):
    pass


if __name__ == '__main__':
    # 第二步: 把创建的两个队列注册在网络上,利用 register 方法,callable 参数关联了 Queue 对象
    # 将 Queue 对象在网络上暴露
    QueueManager.register('getTaskQueue', callable=lambda:taskQueue)
    QueueManager.register('getResultQueue', callable=lambda:resultQueue)

    # 第三步: 绑定端口 8001,设置验证口令'痛点就是起点'(这个相当于对象的初始化)
    manager = QueueManager(address=('', 8001), authkey='痛点就是起点')

    # 启动管理,监听信息通道
    manager.start()

    # 第四步: 通过管理实例的方法获得通过网络访问的 Queue 对象
    task = manager.getTaskQueue()
    result = manager.getResultQueue()

    # 第五步: 添加任务
    for url in ['ImageUrl_' + str(i) for i in range(10)]:
        print('put task %s ...' % url)
        task.put(url)

    # 获取返回的结果
    print('try get result ...')
    for i in range(10):
        print('result is %s' % result.get(timeout=10))

    # 关闭管理
    manager.shutdown()

任务进程编写过程:

  1. 使用 QueueManager 注册用于获取 Queue 的方法名称,任务进程只能通过名称来获取网络上的 Queue。
  2. 连接服务器,保持端口和验证口令与服务进程一致。
  3. 获取网络上的 Queue,进行本地化。
  4. 从 taskQueue 获取任务,并把结果写入 resultQueue。
# coding:utf-8

import time
from multiprocessing.managers import BaseManager


# 创建类似的 QueueManager
class QueueManager(BaseManager):
    pass


if __name__ == '__main__':
    # 第一步: 使用 QueueManager 注册用于获取 Queue 的方法名称
    QueueManager.register('getTaskQueue')
    QueueManager.register('getResultQueue')

    # 第二步: 连接到服务器
    serverAddr = '127.0.0.1'
    print('Connect to server %s ...' % serverAddr)
    # 端口和验证口令注意保持与服务进程完全一致
    m = QueueManager(address=(serverAddr, 8001), authkey='痛点就是起点')
    m.connect()

    # 第三步: 获取 Queue 对象
    task = m.getTaskQueue()
    result = m.getResultQueue()

    # 第四步: 从 task 队列获取任务,并把结果写入到 result 队列
    while(not task.empty()):
        imageUrl = task.get(True, timeout=5)
        print('run task download %s ...' % imageUrl)
        time.sleep(1)
        result.put('%s ---> success' % imageUrl)

    # 处理结束
    print('worker exit.')

小结:其实这是一个简单但真正的分布式进程,把代码稍加改造,启动多个任务进程,就可以把任务分不到多台机器上,实现大规模的分布式爬虫。

python 分布式进程
知识总结

欢迎关注,我们一起进行认知迭代!


痛点就是起点

© 2016 - 2020 基于 jekyll | Github Pags | iconfont By yhw-miracle