Python: KeyboardInterrupt with a multiprocessing Pool

Working solution

A graceful way to handle it:

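import os
import signal
import time
from multiprocessing import Pool

def sum_numbers(num):
    result = 0
    for i in range(1, num + 1):
        result += i
        # Simulate a long-running computation
        time.sleep(1)
    return result

def init_worker():
    # Workers ignore SIGINT, so only the main process reacts to CTRL+C
    signal.signal(signal.SIGINT, signal.SIG_IGN)

if __name__ == '__main__':
    numbers = [10, 20, 30, 10, 15]
    pool = Pool(initializer=init_worker)
    # map_async() returns immediately, leaving the main process free to catch KeyboardInterrupt.
    # chunksize=1 makes the number of pending chunks equal the number of pending tasks.
    results = pool.map_async(sum_numbers, numbers, chunksize=1)

    try:
        # Poll until all tasks are done; time.sleep() can be interrupted by CTRL+C
        while not results.ready():
            # _number_left is a private attribute of the result object, used here only for progress output
            print(f'Completed tasks: {len(numbers) - results._number_left}')
            time.sleep(1)

        print("All tasks completed")
        # get() returns all results (and re-raises any exception raised in a worker)
        for result in results.get():
            print(f'Task result: {result}')
        pool.close()
        pool.join()
    except KeyboardInterrupt:
        print("Tasks interrupted manually")
        pool.terminate()
        pool.join()
        # Re-send SIGINT to this process so it exits the way an unhandled CTRL+C would
        os.kill(os.getpid(), signal.SIGINT)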
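If you would rather not rely on the private _number_left attribute, one alternative (a swapped-in technique, not the approach used above) is imap_unordered(), whose iterator's next() accepts a timeout. The sketch below assumes 2 worker processes and drops the per-step time.sleep() so it finishes quickly; run_with_progress and indexed_sum are hypothetical helper names. The short timeout plays the same role as the time.sleep(1) polling loop above: control returns to the interpreter about once a second, so CTRL+C gets noticed.

```python
import multiprocessing
import signal
from multiprocessing import Pool


def sum_numbers(num):
    # Same result as the example above, minus the time.sleep() so the sketch runs quickly
    return sum(range(1, num + 1))


def init_worker():
    # Workers ignore SIGINT; only the parent process reacts to CTRL+C
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def indexed_sum(item):
    # Hypothetical helper: carry the input position along, since imap_unordered does not keep order
    index, num = item
    return index, sum_numbers(num)


def run_with_progress(numbers, processes=2):
    """Hypothetical helper: run all tasks, print progress, stop every worker on CTRL+C."""
    ordered = [None] * len(numbers)
    pool = Pool(processes=processes, initializer=init_worker)
    try:
        tasks = pool.imap_unordered(indexed_sum, enumerate(numbers))
        done = 0
        while done < len(numbers):
            try:
                # The timeout hands control back to the interpreter about once a second,
                # so CTRL+C is noticed even where the wait itself is not interruptible
                index, value = tasks.next(timeout=1)
            except multiprocessing.TimeoutError:
                continue
            ordered[index] = value
            done += 1
            print(f"Completed tasks: {done}/{len(numbers)}")
    except BaseException as exc:
        # CTRL+C or an exception re-raised from a worker: kill all workers right away
        if isinstance(exc, KeyboardInterrupt):
            print("Tasks interrupted manually")
        pool.terminate()
        raise
    else:
        pool.close()
    finally:
        pool.join()
    return ordered


if __name__ == "__main__":
    print(run_with_progress([10, 20, 30, 10, 15]))
```

Because results arrive in completion order, each task returns its input position so the final list matches the order of numbers.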

Reproducing the problem

Endless KeyboardInterrupt

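When using a Python multiprocessing Pool, I wanted to stop all processes by pressing CTRL+C (KeyboardInterrupt) in the terminal. In practice, however, the program never exited cleanly after CTRL+C and instead fell into an endless stream of KeyboardInterrupt errors. Part of the reason is that CTRL+C is delivered to every process attached to the terminal (on POSIX, the whole foreground process group), so each worker process receives it too and raises its own KeyboardInterrupt.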

from multiprocessing import Pool
import time
import sys

def worker_function(seconds):
    print(f"Worker sleeping for {seconds} seconds")
    time.sleep(seconds)
    print("Worker finished sleeping")

def main():
    pool = Pool(processes=2)
    try:
        pool.map(worker_function, range(40))

    except KeyboardInterrupt:
        print("KeyboardInterrupt received, terminating all processes")
        pool.terminate()
        sys.exit(1)

if __name__ == "__main__":
    main()

(Screenshot of the endless KeyboardInterrupt output)

Ignoring KeyboardInterrupt

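Building on that, I came across an answer suggesting that CTRL+C be ignored (SIG_IGN) through the Pool initializer, as below. On its own, though, this did not solve the problem: the program still did not respond to CTRL+C. Note that the initializer runs only in the worker processes, so the main process's SIGINT handling is not actually changed; what keeps the main process from responding is most likely that it sits blocked inside the synchronous pool.map() call, which is why the solution above switches to map_async() plus a time.sleep() polling loop.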

from multiprocessing import Pool
import time
import sys
import signal

def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def worker_function(seconds):
    print(f"Worker sleeping for {seconds} seconds")
    time.sleep(seconds)
    print("Worker finished sleeping")

def main():
    pool = Pool(processes=2, initializer=initializer)
    try:
        pool.map(worker_function, range(40))

    except KeyboardInterrupt:
        print("KeyboardInterrupt received, terminating all processes")
        pool.terminate()
        sys.exit(1)

if __name__ == "__main__":
    main()
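The initializer callback runs once in each worker process as it starts, never in the parent. A quick sketch to check this, assuming your platform's default start method and 2 workers; worker_ignores_sigint and check_sigint_dispositions are hypothetical helper names:

```python
import signal
from multiprocessing import Pool


def init_worker():
    # Runs once in each worker process when it starts; the parent never calls it
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def worker_ignores_sigint(_):
    # Report the SIGINT disposition as seen from inside a worker
    return signal.getsignal(signal.SIGINT) == signal.SIG_IGN


def check_sigint_dispositions():
    before = signal.getsignal(signal.SIGINT)
    with Pool(processes=2, initializer=init_worker) as pool:
        in_workers = pool.map(worker_ignores_sigint, range(4))
    after = signal.getsignal(signal.SIGINT)
    # Workers ignore SIGINT, while the parent's handler (normally signal.default_int_handler) is untouched
    return in_workers, before == after


if __name__ == "__main__":
    print(check_sigint_dispositions())
```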

Notes

map(func, iterable[, chunksize])
A parallel equivalent of the built-in map() function (it supports only one iterable argument; for multiple iterables, see starmap()). It blocks until the result is ready.
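A minimal sketch of map() versus starmap(), assuming 2 worker processes; square, power and demo are hypothetical helper names:

```python
from multiprocessing import Pool


def square(x):
    return x * x


def power(base, exponent):
    return base ** exponent


def demo():
    with Pool(processes=2) as pool:
        # map: one iterable, each item is passed as the single argument; blocks until all results are ready
        squares = pool.map(square, range(5))
        # starmap: each item is a tuple that gets unpacked into positional arguments
        powers = pool.starmap(power, [(2, 3), (3, 2)])
    return squares, powers


if __name__ == "__main__":
    print(demo())  # ([0, 1, 4, 9, 16], [8, 9])
```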

map_async() is a variant of map() that returns an AsyncResult object instead of blocking.
AsyncResult objects:

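  • ready() returns whether the call has completed.
  • get([timeout]) returns the result when it arrives. If timeout is not None and the result does not arrive within timeout seconds, multiprocessing.TimeoutError is raised. If the remote call raised an exception, that exception is re-raised by get().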
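The AsyncResult behaviour listed above can be sketched as follows, assuming 2 worker processes; slow_double, fail and demo are hypothetical helpers, and the 1-second sleep only exists to keep the tasks unfinished while ready() and get(timeout=...) are checked:

```python
import multiprocessing
import time
from multiprocessing import Pool


def slow_double(x):
    time.sleep(1)
    return 2 * x


def fail(_):
    raise ValueError("boom")


def demo():
    with Pool(processes=2) as pool:
        result = pool.map_async(slow_double, [1, 2, 3])
        # map_async() returns at once, while the tasks are still running
        ready_at_start = result.ready()
        try:
            result.get(timeout=0.01)
            timed_out = False
        except multiprocessing.TimeoutError:
            # The results did not arrive within the timeout
            timed_out = True
        values = result.get()  # no timeout: blocks until every task has finished

        failing = pool.map_async(fail, [0])
        try:
            failing.get(timeout=5)
            error = None
        except ValueError as exc:
            # The exception raised inside the worker is re-raised by get()
            error = str(exc)
    return ready_at_start, timed_out, values, result.ready(), error


if __name__ == "__main__":
    print(demo())
```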
