Passing Messages to Process

The example codes are follow:

import multiprocessing

class MyFancyClass:

    def __init__(self, name):
        self.name = name


    def do_something(self):
        proc_name = multiprocessing.current_process().name

        print('Doing something fancy in {} for {}'.format(proc_name, self.name))


def worker(q):
    obj = q.get()
    obj.do_something()

if __name__ == '__main__':

    queue = multiprocessing.Queue()
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(queue,))
        p.start()


        queue.put(MyFancyClass('Fancy Dan %s' % i))

    queue.close()
    queue.join_thread()
    p.join()

This short example passes just a single message to a single worker, then the main process waits for the workers to finish.

D:\Python39\python.exe C:/Users/admin/Desktop/multiprocessing_queue.py
Doing something fancy in Process-1 for Fancy Dan 0
Doing something fancy in Process-3 for Fancy Dan 1
Doing something fancy in Process-2 for Fancy Dan 2

Process finished with exit code 0
import multiprocessing
import time


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue


    def run(self):
        proc_name = self.name

        while 1:
            next_task = self.task_queue.get()
            if next_task is None:
                # Posion pill means shutdown
                print('{}: Exiting'.format(proc_name))
                self.task_queue.task_done()
                break
            print('{}: {}'.format(proc_name, next_task))
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)


class Task:

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # Pretend to take time to do the work
        return '{} * {} = {}'.format(
            self.a, self.b,  self.a * self.b
        )

    def __str__(self):
        return '{} * {}'.format(self.a, self.b)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating {} consumers'.format(num_consumers))
    consumers = [
        Consumer(tasks, results)
        for i in range(num_consumers)
    ]

    for w in consumers:
        w.start()

    # Enqueue jobs.
    num_jobs = 40
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print('Result:', result)
        num_jobs -= 1
D:\Python39\python.exe C:/Users/admin/Desktop/multiprocessing_producer_consumer.py
Creating 40 consumers
Consumer-2: 0 * 0
Consumer-3: 1 * 1
Consumer-8: 2 * 2
Consumer-13: 3 * 3
Consumer-1: 4 * 4
Consumer-7: 5 * 5
Consumer-11: 6 * 6
Consumer-4: 7 * 7
Consumer-10: 8 * 8
Consumer-17: 9 * 9
Consumer-6: 10 * 10
Consumer-12: 11 * 11
Consumer-9: 12 * 12
Consumer-5: 13 * 13
Consumer-18: 14 * 14
Consumer-22: 15 * 15
Consumer-16: 16 * 16
Consumer-19: 17 * 17
Consumer-26: 18 * 18
Consumer-21: 19 * 19
Consumer-14: 20 * 20
Consumer-15: 21 * 21
Consumer-23: 22 * 22
Consumer-20: 23 * 23
Consumer-31: 24 * 24
Consumer-27: 25 * 25
Consumer-30: 26 * 26
Consumer-25: 27 * 27
Consumer-29: 28 * 28
Consumer-24: 29 * 29
Consumer-33: 30 * 30
Consumer-32: 31 * 31
Consumer-39: 32 * 32
Consumer-40: 33 * 33
Consumer-35: 34 * 34
Consumer-37: 35 * 35
Consumer-34: 36 * 36
Consumer-36: 37 * 37
Consumer-28: 38 * 38
Consumer-38: 39 * 39
Consumer-2: Exiting
Consumer-3: Exiting
Consumer-7: ExitingConsumer-1: Exiting
Consumer-11: Exiting
Consumer-13: Exiting

Consumer-4: Exiting
Consumer-8: Exiting
Consumer-9: Exiting
Consumer-6: Exiting
Consumer-5: Exiting
Consumer-10: Exiting
Consumer-17: Exiting
Consumer-12: Exiting
Consumer-14: Exiting
Consumer-22: Exiting
Consumer-19: Exiting
Consumer-18: Exiting
Consumer-26: Exiting
Consumer-16: Exiting
Consumer-21: Exiting
Consumer-30: Exiting
Consumer-25: Exiting
Consumer-20: Exiting
Consumer-27: Exiting
Consumer-29: Exiting
Consumer-15: Exiting
Consumer-31: Exiting
Consumer-23: Exiting
Consumer-40: Exiting
Consumer-39: Exiting
Consumer-38: Exiting
Consumer-32: Exiting
Consumer-37: Exiting
Consumer-35: Exiting
Consumer-28: Exiting
Consumer-36: Exiting
Consumer-33: Exiting
Consumer-34: Exiting
Consumer-24: Exiting
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 4 * 4 = 16
Result: 5 * 5 = 25
Result: 6 * 6 = 36
Result: 3 * 3 = 9
Result: 7 * 7 = 49
Result: 2 * 2 = 4
Result: 12 * 12 = 144
Result: 10 * 10 = 100
Result: 13 * 13 = 169
Result: 8 * 8 = 64
Result: 11 * 11 = 121
Result: 9 * 9 = 81
Result: 15 * 15 = 225
Result: 20 * 20 = 400
Result: 17 * 17 = 289
Result: 14 * 14 = 196
Result: 18 * 18 = 324
Result: 19 * 19 = 361
Result: 16 * 16 = 256
Result: 26 * 26 = 676
Result: 27 * 27 = 729
Result: 23 * 23 = 529
Result: 25 * 25 = 625
Result: 28 * 28 = 784
Result: 21 * 21 = 441
Result: 22 * 22 = 484
Result: 24 * 24 = 576
Result: 33 * 33 = 1089
Result: 32 * 32 = 1024
Result: 39 * 39 = 1521
Result: 31 * 31 = 961
Result: 35 * 35 = 1225
Result: 38 * 38 = 1444
Result: 34 * 34 = 1156
Result: 37 * 37 = 1369
Result: 36 * 36 = 1296
Result: 30 * 30 = 900
Result: 29 * 29 = 841

Process finished with exit code 0
04-14 06:39