You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

96 lines
2.6 KiB

  1. import pathlib as pl
  2. import json
  3. import importlib.machinery as impmach
  4. import multiprocessing
  5. import threading
  6. import concurrent.futures as concfut
  7. import os
  8. import time
  9. from . import batch
  10. from . import plan
  11. def execute(exp_file):
  12. dispatcher = load(exp_file)
  13. dispatcher.start()
  14. dispatcher.join()
  15. def load(exp_file):
  16. exp_plan = plan.Plan(exp_file, multiprocessing.Lock())
  17. with open(exp_file) as efile:
  18. exp_obj = json.loads(efile.read())
  19. exp_obj["load"] = pl.Path(exp_obj["load"])
  20. exp_mod = impmach.SourceFileLoader(exp_obj["load"].stem,
  21. str(exp_obj["load"])).load_module()
  22. num_workers = 1
  23. if "workers" in exp_obj:
  24. if exp_obj["workers"] == "all":
  25. num_workers = os.cpu_count()
  26. else:
  27. num_workers = int(exp_obj["workers"])
  28. return Dispatcher(exp_mod, exp_plan, num_workers)
  29. class Dispatcher (threading.Thread):
  30. def __init__(self, exp_mod, exp_plan, num_workers):
  31. threading.Thread.__init__(self)
  32. self.__num_workers = num_workers
  33. self.__workers = []
  34. self.__stop_called = threading.Event()
  35. for i in range(self.__num_workers):
  36. self.__workers.append(Worker(exp_mod, exp_plan))
  37. def run(self):
  38. for worker in self.__workers:
  39. worker.start()
  40. def wait_to_continue(workers, stop_called):
  41. any_worker_alive = any(map(lambda w: w.is_alive(), workers))
  42. while any_worker_alive and not stop_called.is_set():
  43. time.sleep(0)
  44. waiter = threading.Thread(target=wait_to_continue,
  45. args=(self.__workers,
  46. self.__stop_called))
  47. waiter.start()
  48. waiter.join()
  49. if self.__stop_called.is_set():
  50. for worker in self.__workers:
  51. worker.terminate()
  52. for worker in self.__workers:
  53. worker.join()
  54. def stop(self):
  55. self.__stop_called.set()
  56. class Worker (multiprocessing.Process):
  57. def __init__(self, exp_mod, exp_plan):
  58. multiprocessing.Process.__init__(self)
  59. self.__exp_mod = exp_mod
  60. self.__exp_plan = exp_plan
  61. def run(self):
  62. instance = self.__exp_plan.next()
  63. while instance != None:
  64. self.__exp_mod.run(instance)
  65. self.__exp_plan.done_with(instance)
  66. instance = self.__exp_plan.next()
  67. def terminate(self):
  68. self.__exp_plan.delete()
  69. multiprocessing.Process.terminate(self)