"""
================================================================================
Bulk Run
================================================================================
| This module stores the BulkRun class, used for running multiple concurrent simulations
| Written By: Matthew Stadelman
| Date Written: 2016/06/16
| Last Modified: 2016/06/16
"""
from collections import defaultdict
from itertools import product
import string
from time import sleep
from .. import _get_logger, set_main_logger_level
from .run_model import estimate_req_RAM, run_model
# module globals
# module-level logger, named after this module, obtained from the package's
# ``_get_logger`` helper
logger = _get_logger(__name__)
class BulkRun(dict):
    r"""
    Handles generating a collection of input files from the provided parameters
    and then running multiple instances of the LCL model concurrently to
    produce simulation data for each simulation parameter combination. A
    comprehensive example of this class and the associated script is available
    under the Usage Examples page.

    Parameters
    ----------
    init_input_file : apmapflow.run_model.InputFile
        An initial InputFile instance to define the static parameters of the
        bulk run. A clone of it is stored, so the caller's instance is not
        modified.
    num_CPUs : int, optional
        The maximum number of simulations allowed to run at the same time.
    sys_RAM : float, optional
        The maximum amount of RAM available for use, in gigabytes. Only 90%
        of this value is budgeted for simulations.
    **kwargs : multiple
        Stored as dictionary entries of the instance and forwarded as keyword
        arguments to the helper methods and to ``estimate_req_RAM``.

        * delim : string
            The expected delimiter in the aperture map files
        * spawn_delay : float
            The minimum time between spawning of new LCL instances in seconds
            (default 5)
        * retest_delay : float
            The time to wait between checking for completed processes in
            seconds (default 5)

    Examples
    --------
    >>> from apmapflow import BulkRun, InputFile
    >>> inp_file = InputFile('./input-file-path.inp')
    >>> blk_run = BulkRun(inp_file, num_CPUs=16, sys_RAM=32.0, spawn_delay=10.0)

    Notes
    -----
    ``spawn_delay`` is useful to help ensure shared resources are not accessed
    at the same time.
    """

    def __init__(self, init_input_file, num_CPUs=2, sys_RAM=4.0, **kwargs):
        r"""
        Stores the run configuration; extra keyword arguments become
        dictionary entries of the instance.
        """
        super().__init__()
        self.init_input_file = init_input_file.clone()
        self.num_CPUs = num_CPUs
        self.sys_RAM = sys_RAM
        # only 90% of the stated system RAM is budgeted for simulations
        self.avail_RAM = sys_RAM * 0.90
        self.input_file_list = []
        #
        # extra options live in the dict itself so they can be forwarded to
        # the helper methods with ``**self``
        self.update(kwargs)
        #
        msg = 'Utilizing a maximum of {:d} cores, and {:f} gigabytes of RAM'
        logger.debug(msg.format(int(self.num_CPUs), self.sys_RAM))

    def dry_run(self):
        r"""
        Steps through the run preparation without starting any simulations:
        the RAM requirement of each input file is estimated and every input
        file is written to disk. This allows the LCL input files to be
        inspected before actually starting the run. Logging is temporarily
        raised to the debug level and restored afterwards, even on error.

        Examples
        --------
        >>> from apmapflow import BulkRun, InputFile
        >>> inp_file = InputFile('./input-file-path.inp')
        >>> blk_run = BulkRun(inp_file, num_CPUs=16, sys_RAM=32.0, spawn_delay=10.0)
        >>> blk_run.dry_run()

        See Also
        --------
        start
        """
        orig_level = logger.getEffectiveLevel()
        set_main_logger_level('debug')
        try:
            self._initialize_run()
            fmt = '{:d} simulations would be performed'
            logger.info(fmt.format(len(self.input_file_list)))
            #
            logger.info('Writing model input files to disk for inspection')
            for inp_file in self.input_file_list:
                inp_file.write_inp_file()
            #
            logger.info('Dry run has been completed.')
            logger.info('Use the start() method to begin simulations')
        finally:
            # restore the caller's verbosity even if writing a file fails
            set_main_logger_level(orig_level)

    def start(self):
        r"""
        Starts the bulk run. It first estimates the RAM requirement of each
        input file and then manages the simulation processes until every
        input file has been launched. The input file list must have already
        been generated prior to calling this method.

        Input files whose estimated RAM requirement exceeds the RAM budget
        (``avail_RAM``) can never be started. They are logged as errors and
        skipped instead of stalling the run.

        Notes
        -----
        This method returns as soon as the last simulation has been launched;
        it does not wait for the final running processes to finish.

        Raises
        ------
        RuntimeError
            If simulations remain but nothing is running and none of them can
            be started (e.g. ``num_CPUs`` is less than 1).

        See Also
        --------
        generate_input_files
        dry_run
        """
        logger.info('Beginning bulk run of simulations')
        self._initialize_run()
        #
        # a file needing more than the whole RAM budget can never be started;
        # leaving it queued would make the scheduling loop wait forever
        runnable = []
        for inp_file in self.input_file_list:
            if inp_file.RAM_req > self.avail_RAM:
                msg = ('Skipping simulation using map {}: estimated {:f}gb '
                       'RAM required, but only {:f}gb is available')
                logger.error(msg.format(inp_file['APER-MAP'].value,
                                        inp_file.RAM_req, self.avail_RAM))
            else:
                runnable.append(inp_file)
        self.input_file_list[:] = runnable
        #
        # initializing processes list and starting loop
        processes = []
        RAM_in_use = []
        self._start_simulations(processes, RAM_in_use, **self)
        while self.input_file_list:
            if not processes:
                # nothing running means no resources will ever be freed
                msg = ('{:d} simulations remain but none could be started, '
                       'check the num_CPUs and sys_RAM settings')
                raise RuntimeError(msg.format(len(self.input_file_list)))
            self._check_processes(processes, RAM_in_use, **self)
            self._start_simulations(processes, RAM_in_use, **self)

    @staticmethod
    def _combine_run_params(run_params):
        r"""
        Generates every combination (Cartesian product) of the values in a
        set of parameter lists.

        Parameters
        ----------
        run_params : dictionary
            A dictionary of parameter lists to combine together. Entries with
            a falsy value (e.g. an empty list or None) are ignored.

        Returns
        -------
        parameter combinations : list of dictionaries
            A list of dictionaries where each parameter only has a single
            value. If every entry is ignored, a list holding one empty
            dictionary is returned.
        """
        # dropping falsy values, i.e. empty arrays or None
        run_params = {key: val for key, val in run_params.items() if val}
        keys = list(run_params)
        return [dict(zip(keys, comb)) for comb in product(*run_params.values())]

    def _initialize_run(self):
        r"""
        Assesses the RAM requirements of each aperture map in use and stores
        the value on every InputFile instance as ``RAM_req``. This RAM
        estimate is later used to decide whether there is enough space
        available to begin a simulation.
        """
        logger.info('Assessing RAM requirements of each aperture map')
        #
        # group InputFile instances by aperture map so each map is only
        # estimated once
        maps = defaultdict(list)
        for inp_file in self.input_file_list:
            maps[inp_file['APER-MAP'].value].append(inp_file)
        #
        if not maps:
            logger.warning('No input files are queued, nothing to assess')
            return
        #
        # estimating the RAM requirement for each aperture map
        keys = list(maps)
        RAM_per_map = estimate_req_RAM(keys, self.avail_RAM, **self)
        #
        # updating the InputFile instances with their RAM requirement
        for key, value in zip(keys, RAM_per_map):
            msg = 'Estimated {:f}gb RAM required for map: {}'
            logger.info(msg.format(value, key))
            for inp_file in maps[key]:
                inp_file.RAM_req = value

    @staticmethod
    def _check_processes(processes, RAM_in_use, retest_delay=5, **kwargs):
        r"""
        Waits until at least one running process has completed. It then
        removes every completed process, and its entry in ``RAM_in_use``,
        from the lists in place. If no process has completed yet, the routine
        sleeps for ``retest_delay`` seconds before checking again. It returns
        immediately when ``processes`` is empty, since there is nothing to
        wait for.

        Parameters
        ----------
        processes : list of Popen instances
            The list of processes to curate.
        RAM_in_use : list of floats
            The estimated maximum RAM of each process, index-aligned with
            ``processes``.
        retest_delay : float
            The time delay between testing for completed processes.
        **kwargs
            Accepted and ignored so the instance's settings can be passed
            through wholesale.
        """
        while processes:
            finished = [i for i, proc in enumerate(processes)
                        if proc.poll() is not None]
            if finished:
                # delete from the back so the remaining indices stay valid
                for i in reversed(finished):
                    del processes[i]
                    del RAM_in_use[i]
                return
            #
            sleep(retest_delay)

    def _start_simulations(self, processes, RAM_in_use, spawn_delay=5, **kwargs):
        r"""
        Starts additional simulations while there is enough free RAM and
        fewer than ``num_CPUs`` processes are running. Input files are
        scanned in queue order and the first one that fits is launched.

        Parameters
        ----------
        processes : list of Popen instances
            The list of processes to add any new simulations to.
        RAM_in_use : list of floats
            The estimated RAM of each running process; the requirement of
            each newly started simulation is appended here.
        spawn_delay : float
            The time delay, in seconds, applied after each spawned process.
        **kwargs
            Accepted and ignored so the instance's settings can be passed
            through wholesale.
        """
        while len(processes) < self.num_CPUs:
            free_RAM = self.avail_RAM - sum(RAM_in_use)
            # index of the first queued input file that fits in free RAM
            idx = next((i for i, inp in enumerate(self.input_file_list)
                        if inp.RAM_req <= free_RAM), None)
            if idx is None:
                break
            #
            inp_file = self.input_file_list.pop(idx)
            processes.append(run_model(inp_file))
            RAM_in_use.append(inp_file.RAM_req)
            sleep(spawn_delay)