You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

114 lines
2.9 KiB

  1. import pathlib as pl
  2. import json
  3. import importlib.machinery as impmach
  4. import multiprocessing
  5. import threading
  6. import concurrent.futures as concfut
  7. import os
  8. import time
  9. from . import batch
  10. from . import plan
  11. def execute(exp_file):
  12. dispatcher = load(exp_file)
  13. dispatcher.start()
  14. dispatcher.join()
  15. def load(exp_file):
  16. exp_plan = plan.Plan(exp_file, multiprocessing.Lock())
  17. with open(exp_file) as efile:
  18. exp_obj = json.loads(efile.read())
  19. exp_obj["load"] = pl.Path(exp_obj["load"])
  20. exp_mod = impmach.SourceFileLoader(exp_obj["load"].stem,
  21. str(exp_obj["load"])).load_module()
  22. num_workers = 1
  23. if "workers" in exp_obj:
  24. if exp_obj["workers"] == "all":
  25. num_workers = os.cpu_count()
  26. else:
  27. num_workers = int(exp_obj["workers"])
  28. return Dispatcher(exp_mod, exp_plan, num_workers)
  29. class Dispatcher (threading.Thread):
  30. def __init__(self, exp_mod, exp_plan, num_workers):
  31. threading.Thread.__init__(self)
  32. self.__num_workers = num_workers
  33. self.__workers = []
  34. self.__stop_called = threading.Event()
  35. self.__exp_mod = exp_mod
  36. for i in range(self.__num_workers):
  37. self.__workers.append(Worker(exp_mod, exp_plan))
  38. def run(self):
  39. for worker in self.__workers:
  40. worker.start()
  41. def wait_to_continue(workers, stop_called):
  42. any_worker_alive = lambda: any(map(lambda w: w.is_alive(), workers))
  43. while any_worker_alive() and not stop_called.is_set():
  44. time.sleep(0)
  45. waiter = threading.Thread(target=wait_to_continue,
  46. args=(self.__workers,
  47. self.__stop_called))
  48. waiter.start()
  49. waiter.join()
  50. if self.__stop_called.is_set():
  51. for worker in self.__workers:
  52. worker.terminate()
  53. for worker in self.__workers:
  54. worker.join()
  55. self.__done()
  56. def stop(self):
  57. self.__stop_called.set()
  58. def num_active_workers(self):
  59. count = 0
  60. for worker in self.__workers:
  61. count += 1 if worker.is_alive() else 0
  62. return count
  63. def __done(self):
  64. if hasattr(self.__exp_mod, "done"):
  65. self.__exp_mod.done()
  66. class Worker (multiprocessing.Process):
  67. def __init__(self, exp_mod, exp_plan):
  68. multiprocessing.Process.__init__(self)
  69. self.__exp_mod = exp_mod
  70. self.__exp_plan = exp_plan
  71. def run(self):
  72. instance = self.__exp_plan.next()
  73. while instance != None:
  74. self.__exp_mod.run(instance)
  75. self.__exp_plan.done_with(instance)
  76. instance = self.__exp_plan.next()
  77. def terminate(self):
  78. self.__exp_plan.delete()
  79. multiprocessing.Process.terminate(self)