You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

105 lines
2.7 KiB

import pathlib as pl
import json
import importlib.machinery as impmach
import multiprocessing
import threading
import concurrent.futures as concfut
import os
import time
from . import batch
from . import plan
def execute(exp_file):
dispatcher = load(exp_file)
dispatcher.start()
dispatcher.join()
def load(exp_file):
exp_plan = plan.Plan(exp_file, multiprocessing.Lock())
with open(exp_file) as efile:
exp_obj = json.loads(efile.read())
exp_obj["load"] = pl.Path(exp_obj["load"])
exp_mod = impmach.SourceFileLoader(exp_obj["load"].stem,
str(exp_obj["load"])).load_module()
num_workers = 1
if "workers" in exp_obj:
if exp_obj["workers"] == "all":
num_workers = os.cpu_count()
else:
num_workers = int(exp_obj["workers"])
return Dispatcher(exp_mod, exp_plan, num_workers)
class Dispatcher (threading.Thread):
def __init__(self, exp_mod, exp_plan, num_workers):
threading.Thread.__init__(self)
self.__num_workers = num_workers
self.__workers = []
self.__stop_called = threading.Event()
for i in range(self.__num_workers):
self.__workers.append(Worker(exp_mod, exp_plan))
def run(self):
for worker in self.__workers:
worker.start()
def wait_to_continue(workers, stop_called):
any_worker_alive = lambda: any(map(lambda w: w.is_alive(), workers))
while any_worker_alive() and not stop_called.is_set():
time.sleep(0)
waiter = threading.Thread(target=wait_to_continue,
args=(self.__workers,
self.__stop_called))
waiter.start()
waiter.join()
if self.__stop_called.is_set():
for worker in self.__workers:
worker.terminate()
for worker in self.__workers:
worker.join()
def stop(self):
self.__stop_called.set()
def num_active_workers(self):
count = 0
for worker in self.__workers:
count += 1 if worker.is_alive() else 0
return count
class Worker (multiprocessing.Process):
def __init__(self, exp_mod, exp_plan):
multiprocessing.Process.__init__(self)
self.__exp_mod = exp_mod
self.__exp_plan = exp_plan
def run(self):
instance = self.__exp_plan.next()
while instance != None:
self.__exp_mod.run(instance)
self.__exp_plan.done_with(instance)
instance = self.__exp_plan.next()
def terminate(self):
self.__exp_plan.delete()
multiprocessing.Process.terminate(self)