Source code for apmapflow.run_model.run_model
"""
================================================================================
Run Model Core
================================================================================
| This stores the basic classes and functions needed to the run model
| Written By: Matthew Stadelman
| Date Written: 2016/06/16
| Last Modifed: 2017/04/05
"""
from collections import OrderedDict
import os
import re
from shlex import split as shlex_split
from subprocess import PIPE, Popen
from threading import Thread
from time import time
from scipy import inf as sp_inf
from .. import _get_logger, DataField
# module globals
logger = _get_logger(__name__)
DEFAULT_MODEL_PATH = os.path.split(os.path.split(__file__)[0])[0]
DEFAULT_MODEL_NAME = 'apm-lcl-model.exe'
[docs]class ArgInput(object):
r"""
Stores the value of a single input line of a LCL model input file. Instances
of this class are stored in an InputFile instance's dict to manage each
parameter.
Parameters
----------
line : string
input line to parse.
"""
[docs] def __init__(self, line):
r"""
Parses the line for the input key string and value.
"""
# inital values
self._line_arr = []
self._value_index = -1
self.comment_msg = ''
self.commented_out = False
#
self._parse_line(line)
[docs] def __str__(self):
r"""
Allows direct printing of an ArgInput object in output format.
Examples
--------
>>> from apmapflow.run_model.run_model import ArgInput
>>> param = ArgInput('test-param: 123 ft')
>>> print(param)
test-param: 123 ft
See Also
--------
line
"""
return self.line
[docs] def _parse_line(self, line):
r"""
Parses a line to set attributes of the class instance.
Parameters
----------
line : string
input line to parse.
"""
#
# removing semi-colon if whole line was commented out
line = line.strip()
mat = re.match(r'^;(.*)', line)
if mat:
line = mat.group(1)
self.commented_out = True
#
# removing any comments after occurring the value
mat = re.search(r';.*', line)
if mat:
self.comment_msg = line[mat.start():].strip()
line = line[:mat.start()].strip()
#
# split the line into an array
try:
# posix=False to preserve double quotes and windows paths
self._line_arr = shlex_split(line, posix=False)
except ValueError as err:
msg = 'shlex split failed on line {} with ValueError: {}'
logger.debug(msg.format(line, err))
self._line_arr = re.split(r'\s+', line)
self._line_arr = [v.strip() for v in self._line_arr if v.strip()]
#
# ensure even a blank line produces a value array
if not self._line_arr:
self._line_arr = ['']
#
# if line has a colon the field after it will be used as the value
# otherwise the whole line is considered the value
if not re.search(r':(:?\s|$)', line):
return
#
for ifld, field in enumerate(self._line_arr):
if not re.search(r':$', field):
continue
#
self._value_index = ifld + 1
if len(self._line_arr) <= self._value_index:
self._line_arr.append('') # add index for value
if len(self._line_arr) <= self._value_index + 1:
self._line_arr.append('') # add index for unit
break
@property
def keyword(self):
r"""
Returns the keyword used to register this instance to an
InputFile instance.
"""
return re.sub(r':$', '', self._line_arr[0])
@property
def value(self):
r"""
Returns the value of the parameter stored by the class instance or sets
the value of the instance. When setting the value if a tuple is passed
instead of a scalar the first entry is used to set the value and
the second's truth value sets the .commented out attribute of the instance.
Parameters
----------
value : scalar or tuple, optional
When value is supplied the property is set
Examples
--------
>>> from apmapflow.run_model.run_model import ArgInput
>>> param = ArgInput('test-param: value-123')
>>> param.value
'value-123'
>>> param.commented_out
False
>>> param.value = 'new-value'
>>> param.value
'new-value'
>>> param.value = ('commented-out', True)
>>> param.value
'commented-out'
>>> param.commented_out
True
"""
if self._value_index > -1:
return self._line_arr[self._value_index]
else:
return ' '.join(self._line_arr)
@value.setter
def value(self, value):
r"""
Sets the value of an arg input, if a tuple is passed in the second
value is used to determine if the value should be commented out or not
"""
comment = False
if (isinstance(value, (list, tuple))):
comment = True if value[1] else False
value = value[0]
self.commented_out = comment
#
if self._value_index == -1:
self._parse_line(str(value))
else:
self._line_arr[self._value_index] = str(value)
@property
def unit(self):
r"""
Returns the given units a value is in or None, and can also be used
to set the units of a value.
Parameters
----------
value : string, optional
If supplied with a value then the units of a instance are set to it.
Examples
--------
>>> from apmapflow.run_model.run_model import ArgInput
>>> param = ArgInput('test-param: 123 ft')
>>> param.unit
'ft'
>>> param.unit = 'mm'
>>> param.unit
'mm'
"""
if self._value_index > -1:
return self._line_arr[self._value_index + 1]
else:
return None
@unit.setter
def unit(self, unit):
self._line_arr[self._value_index + 1] = str(unit)
@property
def line(self):
r"""
Return a formatted line meant for use when writing an InputFile isntance
to disk. The line is prefixed by ``;`` if the parameter is supposed to
be commented out.
Examples
--------
>>> from apmapflow.run_model.run_model import ArgInput
>>> param = ArgInput('test-param: 123 ft')
>>> param.line
'test-param: 123 ft'
>>> param.commented_out = True
>>> param.line
';test-param: 123 ft'
See Also
--------
__str__
"""
cmt = ';' if self.commented_out else ''
return cmt + ' '.join(self._line_arr) + self.comment_msg
[docs]class AsyncCommunicate(Thread):
r"""
Allows an executable to be run in a separate thread so it does not block the
main Python process.
Parameters
----------
popen_obj : Popen instance
An instance containing a process that is currently executing.
"""
def __init__(self, popen_obj):
self.popen_obj = popen_obj
super().__init__()
[docs] def run(self):
r"""
Calls the communicate method on the Popen object registered to the class
which blocks the thread until the process terminates. The total execution
time, stdout and stderr of the process are passed back to the Popen object.
"""
out, err = self.popen_obj.communicate()
self.popen_obj.stdout_content, self.popen_obj.stderr_content = out, err
self.popen_obj.end_time = time()
#
msg = '\n\t'.join([
'Completed Simulation:',
'input file: {}',
'Time Required: {:0.3f} minutes',
'Exit Code: {}'
])
treq = (self.popen_obj.end_time - self.popen_obj.start_time)/60.0
logger.info(msg.format(
self.popen_obj.input_file.outfile_name,
treq,
self.popen_obj.returncode))
[docs]class InputFile(OrderedDict):
r"""
Used to read and write and manipulate LCL model input files. Each key-value
pair stored on an instance of this class is actually an instance of the
ArgInput class.
Parameters
----------
infile : string or InputFile instance
Either is a filepath to read an input file from or an existing instance
to copy.
filename_formats : dictionary, optional
A dictionary of filename formats which use Python format strings to
dynamically generate names for the LCL model input and output files.
Examples
--------
>>> from apmapflow.run_model import InputFile
>>> inp_file = InputFile('input-file-path.inp')
>>> fmts = {'VTK-FILE': '{apmap}-data.vtk'}
>>> inp_file2 = InputFile(inp_file, filename_formats=fmts)
Notes
-----
Any ``filename_formats`` defined will overwrite a parameter that was
manually defined by directly setting the value.
The ``__setitem__`` method of
has been subclassed to transparently update the value attribute of the
ArgInput instance for the corresponding key instead of the value itself.
"""
def __init__(self, infile, filename_formats=None):
#
super().__init__()
self.filename_format_args = {}
self.RAM_req = None
self.outfile_name = 'lcl_model_param_file.inp'
#
if filename_formats is None:
filename_formats = {}
self.filename_formats = dict(filename_formats)
#
if isinstance(infile, InputFile):
self.outfile_name = infile.outfile_name
else:
self.outfile_name = infile
#
self.parse_input_file(infile)
#
if 'input_file' not in filename_formats:
self.filename_formats['input_file'] = self.outfile_name
[docs] def __str__(self):
r"""
Writes out the input file as if it was being written to disk.
"""
#
# updating filenames to match current args
self._construct_file_names()
#
# building content from ArgInput class line attribute
content = ''
for arg_input in self.values():
content += arg_input.line + '\n'
#
return content
[docs] def __setitem__(self, key, value, new_param=False):
r"""
Subclassed to pass the value directly to the value attribute of the
ArgInput instance stored on the provided key unless the ``new_param``
argument evaluates to ``True``.
Parameters
----------
key : string
The key on the dictionary to update
value : string or ArgInput instance
The value to set the given key to
new_param : boolean, optional
If ``True`` then the value is not passed on to the ArgInput instance
and the actual value on the InputFile instance is changed.
Examples
--------
>>> from apmapflow.run_model import InputFile
>>> inp_file = InputFile('input-file-path.inp')
>>> inp_file['parameter'] = '123'
>>> inp_file['parameter']
<apmapflow.run_model.run_model.ArgInput object at #########>
>>> inp_file['parameter'].value
'123'
>>> inp_file.__setitem__('parameter', 'param-value', new_param=True)
>>> inp_file['parameter']
'param-value'
Notes
-----
If ``new_param`` is falsy and the key does not already exist a KeyError
exeception is raised. The ``add_parameter`` method is the standard way to
add new ArgInput instances to the InputFile instance
See Also
--------
add_parameter
"""
if new_param:
super().__setitem__(key, value)
else:
try:
self[key].value = value
except KeyError:
msg = "'{}' is not set, use .add_parameter method to set param"
raise KeyError(msg.format(key))
#
if key == 'EXE-FILE':
self.set_executable()
[docs] def parse_input_file(self, infile):
r"""
Populates the InputFile instance with data from a file or copies
an existing instance passed in.
Parameters
----------
infile : string or InputFile instance
Either is a filepath to read an input file from or an existing InputFile
instance to copy.
See Also
--------
InputFile
clone
"""
#
if isinstance(infile, self.__class__):
content = str(infile)
file_path = os.path.realpath(infile.outfile_name)
else:
file_path = os.path.realpath(infile)
with open(infile, 'r') as fname:
content = fname.read()
self.infile = file_path
#
# parsing contents into input_file object
content_arr = content.split('\n')
for line in content_arr:
self.add_parameter(line)
#
self.set_executable()
[docs] def add_parameter(self, line):
r"""
Adds a parameter to the input file by parsing a line into an ArgInput
class instance. The line supplied needs to be the same as if it were being
manually typed into the actual input file.
Parameters
----------
line: string
The provided line to parse and append to the InputFile instance.
Examples
--------
>>> from apmapflow.run_model import InputFile
>>> inp_file = InputFile('input-file-path.inp')
>>> inp_file.add_parameter('NEW-PARAM: 1337 ;elite param')
>>> inp_file['NEW-PARAM'].value
'1337'
Notes
-----
The InputFile inheirits from an OrderedDict so new parameters get added
to the bottom of the file.
"""
line = re.sub(r'^(;+)\s+', r'\1', line)
arg = ArgInput(line)
self.__setitem__(arg.keyword, arg, new_param=True)
[docs] def set_executable(self):
r"""
Sets the path to an LCL model executable based on the ``EXE-FILE``
parameter for the current InputFile instance. The path is checked relative
to the InputFile instance's ``infile`` attribute. If no file is found
a warning is issued and the executable path defaults to the version
packaged with the module.
Examples
--------
>>> from apmapflow.run_model import InputFile
>>> inp_file = InputFile('./input-file-path.inp')
>>> inp_file['EXE-FILE'] = 'my-locally-compiled-model.exe'
>>> inp_file.set_executable()
>>> inp_file.executable
'./my-locally-compiled-model.exe'
Notes
-----
This method needs to be called if the ``EXE-FILE`` parameter is added,
changed or removed.
"""
self.executable = None
#
if self.__contains__('EXE-FILE'):
self['EXE-FILE'].commented_out = True
exec_file = self['EXE-FILE'].value
#
if not os.path.isabs(exec_file):
exec_file = os.path.join(os.path.dirname(self.infile), exec_file)
exec_file = os.path.realpath(exec_file)
if os.path.exists(exec_file):
self.executable = exec_file
else:
logger.warning('The exe file specified does not exist: ' + exec_file)
#
if not self.executable:
self.executable = os.path.join(DEFAULT_MODEL_PATH, DEFAULT_MODEL_NAME)
[docs] def clone(self, file_formats=None):
r"""
Creates a new InputFile instance populated with the current instance
data. New ArgInput instances are created to prevent mutation.
Parameters
----------
filename_formats : dictionary, optional
A dictionary of filename formats which use Python format strings to
dynamically generate names for the LCL model input and output files.
Returns
-------
input_file : apmapflow.run_model.InputFile
The cloned input file instance
Examples
--------
>>> from apmapflow.run_model import InputFile
>>> fmts = {'VTK-FILE': '{apmap}-data.vtk'}
>>> inp_file = InputFile('input-file-path.inp', filename_formats=fmts)
>>> inp_file_copy = inp_file.clone()
>>> inp_file_copy.filename_formats
{'VTK-FILE': '{apmap}-data.vtk'}
>>> inp_file_copy = inp_file.clone(file_formats={})
>>> inp_file_copy.filename_formats
{}
Notes
-----
If the ``filename_formats`` parameter is omitted then the formats from
the current instance are copied over.
"""
if file_formats is None:
file_formats = self.filename_formats
#
input_file = InputFile(self, filename_formats=file_formats)
#
return input_file
[docs] def update(self, *args, **kwargs):
r"""
Updates the InputFile instance, passing any unknown keys to the
filename_format_args dictionary instead of raising a KeyError like
in __setitem__
Parameters
----------
\*args, \*\*kwargs : any valid dictionary initializer value set
The resulting dictionary formed internally is used to update the
instance
Examples
--------
>>> from apmapflow.run_model import InputFile
>>> fmts = {'VTK-FILE': '{apmap}-data.vtk'}
>>> inp_file = InputFile('input-file-path.inp', filename_formats=fmts)
>>> new_vals = {'OUTLET-SIDE': 'LEFT', 'apmap': 'fracture-1'}
>>> inp_file.update(new_vals)
>>> inp_file['OUTLET-SIDE'].value
'LEFT'
>>> inp_file.filename_format_args
{'apmap': 'fracture-1'}
"""
if len(args) > 1:
msg = 'update expected at most 1 arguments, got {:d}'
raise TypeError(msg.format(len(args)))
other = dict(*args, **kwargs)
for key in other:
try:
self[key] = other[key]
except KeyError:
self.filename_format_args[key] = other[key]
#
# setting up new filenames
self._construct_file_names()
[docs] def get_uncommented_values(self):
r"""
Generate and return all uncommented parameters as an OrderedDict.
Returns
-------
uncommented_values : OrderedDict
An OrderedDict containing all ArgInput instances that were not
commented out
"""
#
args = [(key, arg) for key, arg in self.items() if not arg.commented_out]
#
return OrderedDict(args)
[docs] def _construct_file_names(self, make_dirs=False):
r"""
This updates the instance's outfile names to match current arguments.
Parameters
----------
make_dirs : boolean, optional
If ``make_dirs`` evaluates to True then all parent directories for each
output file are created as well.
"""
#
outfiles = {key: value for key, value in self.filename_formats.items()}
format_args = {key: arg.value for key, arg in self.items()}
format_args.update(self.filename_format_args)
#
for keyword in outfiles.keys():
outfiles[keyword] = outfiles[keyword].format(**format_args)
#
# checking existance of directories and updating dict
for fname in outfiles.keys():
try:
self[fname] = outfiles[fname]
except KeyError:
if fname == 'input_file':
pass
else:
msg = 'Outfile: {} not defined in initialization file'
logger.error(msg.format(fname))
raise KeyError(fname)
#
if not make_dirs:
continue
#
# using path split to prevent creating directories out of filenames
dir_arr = list(os.path.split(outfiles[fname]))
dir_arr[0] = '.' if not dir_arr[0] else dir_arr[0]
path = os.path.join(*dir_arr[:-1])
if not os.path.isdir(path):
os.makedirs(path)
#
self.outfile_name = outfiles['input_file']
[docs] def write_inp_file(self, alt_path=None):
r"""
Writes an input file to the ``outfile_name`` attribute applying any
formats defined based the current parameter values.
Parameters
----------
alt_path : string,
An alternate path to preappend to the generated filename
>>> from apmapflow.run_model import InputFile
>>> inp_file = InputFile('input-file-path.inp')
>>> inp_file.write_inp_file(alt_path='.')
"""
#
# creating file directories and generating input file
self._construct_file_names(make_dirs=True)
content = str(self)
#
file_name = self.outfile_name
if alt_path:
file_name = os.path.join(alt_path, file_name)
#
with open(file_name, 'w') as fname:
fname.write(content)
#
logger.info('Input file saved as: ' + file_name)
[docs]def estimate_req_RAM(input_maps, avail_RAM=sp_inf, suppress=False, **kwargs):
r"""
Reads in the input maps to estimate the RAM requirement of each map
and to make sure the user has alloted enough space. The RAM estimation is a
rough estimate based on a linear regression of several simulations of varying
aperture map size.
Parameters
----------
input_maps : list
A list of filepaths to read in
avail_RAM : float, optional
The maximum amount of RAM avilable on the system to be used. When exceeded
an EnvironmentError is raised and an error message is generated.
suppress : boolan, optional
If it evaluates out to True and a map exceeds the ``avail_RAM`` the
EnvironmentError is suppressed.
**kwargs : optional
Additional keyword args to pass on to the DataField initialization.
Returns
-------
ram_values : list
A list of floats with the corresponding RAM estimate for each of the maps
passed in.
Examples
--------
>>> from apmapflow.run_model import estimate_req_RAM
>>> maps = ['fracture-1.txt', 'fracture-2.txt', 'fracture-3.txt']
>>> estimate_req_RAM(maps, avail_RAM=8.0, suppress=True)
[6.7342, 8.1023, 5.7833]
"""
RAM_per_map = []
error = False
for fname in input_maps:
#
field = DataField(fname, **kwargs)
tot_coef = (field.nx * field.nz)**2
RAM = 0.00505193 * tot_coef**(0.72578813)
RAM = RAM * 2**(-20) # KB -> GB
RAM_per_map.append(RAM)
if RAM > avail_RAM:
error = True
fmt = 'Map {} requires {} GBs of RAM only {} GBs was alloted.'
logger.fatal(fmt.format(fname, RAM, avail_RAM))
if error and not suppress:
raise EnvironmentError
#
return RAM_per_map
[docs]def run_model(input_file_obj, synchronous=False, show_stdout=False):
r"""
Runs an instance of the LCL model defined by the InputFile instance passed in.
Parameters
----------
input_file_obj : apmapflow.run_model.InputFile
An InputFile instance with the desired simulation parameters to run.
synchronous : boolean, optional
If True then run_model will block the main Python execution thread until
the simulation is complete.
show_stdout : boolean, optional
If True then the stdout and stderr produced during the simulation run are
printed to the screen instead of being stored on the Popen instance
Returns
-------
model_popen_obj : Popen
The Popen instance that contains the LCL model process, which may or man
not be finished executing at upon return.
Examples
--------
>>> from apmapflow.run_model import InputFile, run_model
>>> inp_file = InputFile('input-file-path.inp')
>>> proc = run_model(inp_file) # asynchronous run process isn't completed yet
>>> proc.returncode
None
>>> # process is finished upon return when using synchronous=True
>>> proc2 = run_model(inp_file, synchronous=True)
>>> proc2.returncode
0
Notes
-----
This writes out the inputfile at the perscribed path, a pre-existing file
will be overwritten.
"""
input_file_obj.write_inp_file()
exe_file = os.path.abspath(input_file_obj.executable)
logger.debug('Using executable located at: ' + exe_file)
cmd = (exe_file, input_file_obj.outfile_name)
#
out = PIPE
if show_stdout:
out = None
#
# beginning simulation
proc = Popen(cmd, stdout=out, stderr=out, universal_newlines=True)
proc.input_file = input_file_obj
proc.start_time = time()
#
msg = 'Beginning Simulation:\n\tInput File: {} \n\tProcess ID: {}'
logger.info(msg.format(input_file_obj.outfile_name, proc.pid))
#
async_comm = AsyncCommunicate(proc)
async_comm.start()
#
if synchronous:
async_comm.join()
#
return proc