Pythonでマルチプロセスを駆使した高速並行処理を実現!「multiprocessing.connection.Connection」の使い方を徹底解説

2024-06-19

Pythonにおけるマルチプロセスでの「同時実行」と「multiprocessing.connection.Connection」

Python で並行処理を行うには、主に2つの方法があります。

  1. マルチスレッド
  2. マルチプロセス

マルチスレッドは、1つのプロセス内で複数のスレッドを実行することで並行処理を実現します。一方、マルチプロセスは、複数のプロセスを別々に実行することで並行処理を実現します。

CPythonの場合、グローバルインタープリターロック (GIL) の制約により、マルチスレッドであっても実際には1つのスレッドしか実行できません。そのため、CPU バウンド型タスクの処理速度向上にはマルチプロセスの方が適しています。

マルチプロセス間でデータをやり取りするには、いくつかの方法があります。その中でもよく使われるのが、multiprocessing.connection.Connection オブジェクトです。

multiprocessing.connection.Connection オブジェクトは、2つのプロセス間で双方向の通信を可能にするパイプのようなものです。このオブジェクトを使用して、データを送受信したり、関数を呼び出したりすることができます。

multiprocessing.connection.Connection オブジェクトは、以下のようなメソッドを提供します。

  • send(): データを送信します。
  • poll(): データ受信可能かどうかをチェックします。
  • close(): 接続を閉じます。

簡単な例

import multiprocessing

def producer(pipe):
    pipe.send('Hello, World!')
    pipe.close()

def consumer(pipe):
    data = pipe.recv()
    print(data)
    pipe.close()

if __name__ == '__main__':
    parent_pipe, child_pipe = multiprocessing.Pipe()
    producer_process = multiprocessing.Process(target=producer, args=(parent_pipe,))
    consumer_process = multiprocessing.Process(target=consumer, args=(child_pipe,))

    producer_process.start()
    consumer_process.start()

    producer_process.join()
    consumer_process.join()

この例では、producer() 関数は "Hello, World!" という文字列を pipe に送信し、consumer() 関数は pipe からデータを受信して印字します。

注意点

  • 送信するデータは、シリアライズ可能である必要があります。つまり、pickle モジュールなどでシリアライズできる形式である必要があります。
  • 接続は双方向であり、どちらのプロセスからも閉じることができます。
  • 接続が閉じられた後は、その接続を使用してデータを送受信することはできません。

multiprocessing.connection.Connection オブジェクトは、Python でマルチプロセス間でデータをやり取りする簡単な方法を提供します。しかし、シリアライズや接続の管理などの点に注意する必要があります。

より複雑なデータ構造や通信プロトコルを扱う場合は、multiprocessing.Queuemultiprocessing.Manager などの他のツールを使用することもできます。



    Pythonにおけるマルチプロセスでの「同時実行」と「multiprocessing.connection.Connection」のサンプルコード

    キューを使って複数のタスクを実行

    この例では、multiprocessing.Pool クラスを使用して、複数のタスクを非同期に実行する方法を示します。各タスクは、multiprocessing.connection.Connection オブジェクトを使用して、キューからジョブを受け取り、結果をキューに返します。

    import multiprocessing
    import time
    
    def worker(queue, pipe):
        while True:
            try:
                job = queue.get(block=True)
            except queue.Empty:
                break
            
            result = job(pipe)
            pipe.send(result)
    
    def task(pipe):
        time.sleep(2)
        pipe.send('Hello from task!')
        return 'Task finished'
    
    if __name__ == '__main__':
        queue = multiprocessing.Queue()
        pool = multiprocessing.Pool(processes=4)
    
        # 4つのワーカープロセスを作成
        for i in range(4):
            worker_pipe, main_pipe = multiprocessing.Pipe()
            worker_process = pool.apply_async(worker, args=(queue, worker_pipe,))
            
        # キューにタスクを追加
        for i in range(10):
            queue.put(task, args=(main_pipe,))
    
        # 結果の収集
        results = []
        for i in range(10):
            result = main_pipe.recv()
            results.append(result)
    
        # ワーカープロセスの終了
        pool.close()
        pool.join()
    
        print(results)
    

    このコードでは、以下の処理が行われます。

    1. worker() 関数は、キューからジョブを取得し、そのジョブを実行して結果をパイプに送信します。
    2. task() 関数は、2秒後に "Hello from task!" という文字列をパイプに送信し、"Task finished" という文字列を返します。
    3. メインプロセスは、4つのワーカープロセスを作成し、それぞれに worker() 関数を実行させます。
    4. メインプロセスは、10個のタスクをキューに追加します。
    5. メインプロセスは、パイプから結果を受信し、リストに保存します。
    6. メインプロセスは、ワーカープロセスを終了させます。
    7. メインプロセスは、結果を印字します。

    イベントを使ってタスクの完了を通知

    この例では、multiprocessing.Event オブジェクトを使用して、タスクが完了したことを他のプロセスに通知する方法を示します。

    import multiprocessing
    import time
    
    def worker(event):
        time.sleep(2)
        event.set()
    
    if __name__ == '__main__':
        event = multiprocessing.Event()
        worker_process = multiprocessing.Process(target=worker, args=(event,))
    
        worker_process.start()
    
        event.wait()
        print('Task finished!')
    
        worker_process.join()
    
    1. worker() 関数は、2秒後に event オブジェクトをセットします。
    2. メインプロセスは、worker() 関数を実行するワーカープロセスを作成します。
    3. メインプロセスは、event オブジェクトがセットされるまで待機します。
    4. メインプロセスは "Task finished!" という文字列を印字します。

    共有メモリを使ってデータを共有

    この例では、multiprocessing.Manager クラスを使用して、複数のプロセス間でメモリを共有する方法を示します。

    import multiprocessing
    import array
    
    def worker(shared_array):
        for i in range(len(shared_array)):
            shared_array[i] += 1
    
    if __name__ == '__main__':
        manager = multiprocessing.Manager()
        shared_array = manager.array('i', range(10))
    
        worker_process = multiprocessing.Process(target=worker, args=(shared_array,))
    
        worker_process.start()
    
        worker_process.join()
    
        print(shared_array)
    
    1. worker() 関数は、共有配列の各要素に 1 を加算します。
    2. メインプロセスは、manager オブジェクトを使用して共有配列を作成します。
    3. メインプロセスは、worker() 関数を実行


    multiprocessing.connection.Connection の代替方法

    • シリアライズの必要性: 送信するデータはシリアライズ可能である必要があり、複雑なデータ構造の場合は煩雑になる可能性があります。
    • 双方向通信: 接続は双方向であり、常に両方のプロセスからデータを送受信する必要があります。一方方向の通信のみが必要な場合は、非効率となります。
    • 限られた機能: キューやイベントなどの高度な同期機能はありません。

    これらの制限を克服するために、multiprocessing.connection.Connection の代替となるいくつかの方法があります。

    キュー

    multiprocessing.Queue は、プロセス間でデータをやり取りするための同期キューを提供します。シリアライズの必要がなく、一方方向の通信にも対応できます。また、複数のプロセス間でデータを共有したり、タスクをスケジューリングしたりするのに役立ちます。

    例:

    import multiprocessing
    import time
    
    def producer(queue):
        for i in range(10):
            queue.put(i)
            time.sleep(1)
    
    def consumer(queue):
        while True:
            try:
                item = queue.get(block=True)
                print(f'Received: {item}')
            except queue.Empty:
                break
    
    if __name__ == '__main__':
        queue = multiprocessing.Queue()
    
        producer_process = multiprocessing.Process(target=producer, args=(queue,))
        consumer_process = multiprocessing.Process(target=consumer, args=(queue,))
    
        producer_process.start()
        consumer_process.start()
    
        producer_process.join()
        consumer_process.join()
    

    共有メモリ

    multiprocessing.Manager クラスは、複数のプロセス間でメモリを共有するためのオブジェクトを提供します。これは、大きなデータ構造を共有したり、状態を同期したりするのに役立ちます。

    import multiprocessing
    import array
    
    def worker(shared_array):
        for i in range(len(shared_array)):
            shared_array[i] += 1
    
    if __name__ == '__main__':
        manager = multiprocessing.Manager()
        shared_array = manager.array('i', range(10))
    
        worker_process = multiprocessing.Process(target=worker, args=(shared_array,))
    
        worker_process.start()
        worker_process.join()
    
        print(shared_array)
    

    メッセージパッシングシステム

    ZeroMQ や Apache Kafka などのメッセージパッシングシステムは、スケーラブルで高性能なマルチプロセス間通信を提供します。これらのシステムは、複雑な分散アプリケーションを構築するのに役立ちます。

    import zmq
    
    context = zmq.Context()
    
    # Publisher
    publisher = context.socket(zmq.PUB)
    publisher.bind("tcp://*:5555")
    
    # Subscriber
    subscriber = context.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:5555")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"")
    
    while True:
        message = subscriber.recv()
        print(f'Received: {message}')
    
        # Send a reply
        publisher.send(b"Hello from subscriber!")
    

    RPC フレームワーク

    RPyC や Pyro などの RPC フレームワークは、プロセス間で関数を呼び出すための高レベルな抽象化を提供します。これらは、複雑な分散オブジェクトを構築するのに役立ちます。

    import rpyc
    
    # Server
    class MyService(rpyc.Service):
        def double(self, x):
            return 2 * x
    
    with rpyc.ThreadedServer(MyService, port=12345) as server:
        server.start()
    
    # Client
    with rpyc.connect("localhost", 12345) as conn:
        service = conn.root.MyService
        result = service.double(10)
        print(f'Result: {result}')
    

    これらの代替手段はそれぞれ長所と短所があり、要件に応じて適切なものを選択する必要があります。

    • シンプルさ: multiprocessing.connection.Connection は最もシンプルですが、機能も限定されています。
    • 汎用性: キューと共有メモリは、幅広いユースケースに対応できます。
    • スケーラビリティ: メッセージパッシングシステムと RPC フレームワークは