from multiprocessing import Pool

from tqdm import tqdm


class SelfMultiple:
    """Run ``func`` over a list of argument tuples in a process pool."""

    def __init__(self, func, process: int, params: list, custom_callback=False, callback=None):
        """Store the task configuration; no worker process is started here.

        Args:
            func: Callable executed in the worker processes. It is sent to the
                workers, so it must be picklable (e.g. a module-level function).
            process: Number of worker processes in the pool.
            params: List of positional-argument tuples, one per task.
            custom_callback: When falsy, ``run`` shows a tqdm progress bar and
                ignores ``callback``; when truthy, ``callback`` is used instead.
            callback: Callable invoked in the parent process with the result of
                each successfully finished task (only when ``custom_callback``
                is truthy).
        """
        print("==>init customized multiple class")
        self.func = func
        self.params = params
        self.process = process
        self.custom_callback = custom_callback
        self.callback = callback

    def run(self):
        """Execute every task in the pool and return the results.

        All tasks are submitted before any result is awaited, so up to
        ``process`` tasks run concurrently.

        Returns:
            list: The return value of ``func`` for each entry of ``params``,
            in the same order as ``params``.

        Raises:
            Exception: Whatever a task raised is re-raised here; the pool is
            terminated before the exception propagates.
        """
        self.pool = Pool(processes=self.process)
        pbar = None
        try:
            if not self.custom_callback:
                print("==>undefined self callback")
                pbar = tqdm(total=len(self.params))

                def on_done(*_):
                    # Advance the bar by one for every finished task.
                    pbar.update()
            else:
                print("==>defined self callback")
                print(f"==>executing || {self.func}")
                on_done = self.callback

            # Submit everything first: waiting on each task right after
            # submitting it would execute the tasks one at a time.
            pending = [
                self.pool.apply_async(self.func, param, callback=on_done)
                for param in self.params
            ]
            results = [task.get() for task in pending]
            self.pool.close()
        except BaseException:
            # Do not leave workers running when a task (or the caller) fails.
            self.pool.terminate()
            raise
        finally:
            self.pool.join()
            if pbar is not None:
                pbar.close()
        return results

如何调用呢?

def add(x, y):
    """Log the two operands, then return the result of ``x + y``."""
    print(f"adding || {x} + {y}")
    total = x + y
    return total


if __name__ == "__main__":
    # Demo: each tuple holds the positional arguments for one add() call.
    params = [(1, 2), (3, 4), (5, 6), (7, 8)]

    # custom_callback=False selects the built-in progress-bar callback.
    runner = SelfMultiple(add, process=10, params=params, custom_callback=False)
    runner.run()

在这里插入图片描述
个人测试了一下超大型任务，CPU 是可以跑满的：有一个大型任务的耗时从 400 小时优化到了 26 小时。

另外，也可以用多线程的方式运行：


from multiprocessing import Pool

from tqdm import tqdm
import threading
from datetime import datetime
import logging
import time


class SelfMultipleThreads:
    """Run ``func`` over a list of argument tuples using batches of threads.

    One ``threading.Thread`` is created per entry of ``params``. ``run`` starts
    them ``process`` at a time and joins each batch before starting the next.
    """

    def __init__(self, func, process: int, params: list, custom_callback=False, callback=None,args=None):
        """Store the configuration and pre-build one thread per parameter tuple.

        Args:
            func: Callable used as the target of every thread.
            process: Number of threads started together in one batch.
            params: List of positional-argument tuples, one per thread.
            custom_callback: Stored on the instance; not used by this class.
            callback: Stored on the instance; not used by this class.
            args: Optional object; when it has a truthy ``debug`` attribute,
                ``run`` stops after the first batch.
        """
        print("==>init customized multiple class")
        self.func = func
        self.params = params
        self.process = process
        self.custom_callback = custom_callback
        self.callback = callback
        self.args = args
        self.threads = [
            threading.Thread(target=self.func, args=param) for param in params
        ]

    def _batches(self):
        """Return the threads split into consecutive groups of ``process``."""
        if self.process < 1:
            raise ValueError("process must be a positive integer")
        return [
            self.threads[start:start + self.process]
            for start in range(0, len(self.threads), self.process)
        ]

    def _debug_enabled(self):
        """Return True when ``args`` was supplied and has a truthy ``debug``."""
        return bool(getattr(self.args, "debug", False))

    def run(self):
        """Start the threads batch by batch, waiting for each batch to finish.

        The progress bar advances once per batch. Because the threads are
        created in ``__init__`` and a thread can only be started once, ``run``
        can be called only once per instance.

        Raises:
            ValueError: If ``process`` is smaller than 1.
        """
        batches = self._batches()
        # One tick per batch, including a final partial batch.
        pbar = tqdm(total=len(batches))
        try:
            for batch in batches:
                for thread in batch:
                    thread.start()
                # Wait for every thread in the batch to finish.
                for thread in batch:
                    thread.join()
                pbar.update()
                if self._debug_enabled():
                    # Debug mode: one batch is enough.
                    break
        finally:
            pbar.close()

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐