Source code for apmapflow.openfoam.parallel_mesh_gen

"""
================================================================================
Parallel Mesh Generation
================================================================================
| A class built to handle generation of a blockMeshDict in parallel using the
| mergeMeshes and stitchMesh OpenFoam utilities.

| Written By: Matthew Stadelman
| Date Written: 2016/08/09
| Last Modified: 2016/08/15

"""
import os
import re
from queue import Queue, Empty
from shutil import rmtree
from subprocess import Popen, PIPE
from threading import Event, Thread
import scipy as sp
from scipy.sparse import csgraph
from .. import DataField, _get_logger
from .openfoam import OpenFoamFile, OpenFoamDict, OpenFoamList
from .block_mesh_dict import BlockMeshDict


# module globals
_blockMesh_error = Event()
_mergeMesh_error = Event()
_stitchMesh_error = Event()
logger = _get_logger(__name__)


class DataFieldRegion(DataField):
    r"""
    Used to manipulate a specific data region of a DataField. In order to
    maintain data integrity point data is not able to be recalculated here.
    """
    def __init__(self, data, point_data):
        #
        # setting up the region
        if data.shape != point_data.shape[0:2]:
            msg = 'data and point_data have different dimensions: {} != {}'
            raise ValueError(msg.format(data.shape, point_data.shape[:2]))
        #
        super().__init__(data)
        self.point_data = sp.copy(point_data)

    def create_point_data(self):
        msg = 'DataFieldRegions cannot have point_data recalculated'
        raise NotImplementedError(msg)


class BlockMeshRegion(BlockMeshDict):
    r"""
    Used to handle a sub-set of field point data for parallel mesh generation
    """
    def __init__(self, region, avg_fact, x_shift, z_shift, mesh_params,
                 offset_reg):
        r"""
        Takes a field object and a set of mesh params to set the properties
        of the blockMeshDict. The x_shift and z_shift values are multiplied
        by avg_fact.

        data_region : DataFieldRegion object
        avg_fact : float, number of voxels along the X and Z axes.
        x_shift : int, number of voxels shifted from origin
        z_shift : int, number of voxels shifted from origin
        mesh_params : dict, a dictionary containing parameters to use
            instead of the defaults
        offsets : DataFieldRegion object
        """
        self.x_shift = x_shift
        self.z_shift = z_shift
        super().__init__(region, avg_fact, mesh_params=mesh_params,
                         offset_field=offset_reg)

    def _create_blocks(self, cell_mask=None):
        r"""
        Generates blocks and vertices the same as the parent class and then
        applies the x and z shift
        """
        #
        super()._create_blocks(cell_mask)
        self._vertices[:, 0] += self.avg_fact * self.x_shift
        self._vertices[:, 2] += self.avg_fact * self.z_shift

    def run_block_mesh(self, mesh_type, path, system_dir, t_name, overwrite):
        r"""
        Writes the blockMeshDict and runs blockMesh on a region
        """
        #
        # writing files based on mesh type
        if re.search('symmetry', mesh_type):
            self.write_symmetry_plane(path=path, overwrite=overwrite)
        else:
            self.write_foam_file(path=path, overwrite=overwrite)
        #
        # copying system directory to region directory
        new_sys_path = os.path.join(path, 'system')
        os.system('cp -r {0} {1}'.format(system_dir, new_sys_path))
        #
        # running blockMesh
        cmd = ('blockMesh', '-case', path)
        msg = 'Running: %s in thread: %s'
        logger.info(msg, ' '.join(cmd), t_name)
        proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
        out, err = proc.communicate()
        if proc.poll() != 0:
            print(out)
            print(err)
            raise OSError('Mesh generation failure')


class MergeGroup(object):
    r"""
    Handles merging of meshes and stitching any patches that become internal
    """
    def __init__(self, region_id, external_patches, path):
        r"""
        Sets up the initial merge group as a single region. External patches
        is a dictionary with an entry for each side, {side: 'patch_name'}.
        The attribute 'external_patches' has the same format except its
        entries for each side are lists.
        """
        #
        super().__init__()
        self.region_id = region_id
        self.regions = sp.array([region_id], ndmin=2, dtype=int)
        self.region_dir = os.path.join(path, 'mesh-region{}'.format(region_id))
        #
        self.external_patches = {}
        for side, patch_name in external_patches.items():
            self.external_patches[side] = [patch_name]

    def merge_regions(self, region_in, direction, t_name):
        r"""
        Merges two regions, updating the external_patches dict and stitching
        any patches that become internal. It is assumed region_in has a
        higher 'id' than this region.
        """
        #
        # setting direction specific data
        if direction == 'right':
            axis = 1
            external_keys = ['bottom', 'top']
            swap = ['right', 'left']
        elif direction == 'top':
            axis = 0
            external_keys = ['left', 'right']
            swap = ['top', 'bottom']
        #
        # updating regions array
        self.regions = sp.append(self.regions, region_in.regions, axis=axis)
        #
        # appending to external_patches
        for key in external_keys:
            self.external_patches[key] += region_in.external_patches[key]
        #
        # swapping patches in merge direction and setting internal patches
        internal_patches = zip(self.external_patches[swap[0]],
                               region_in.external_patches[swap[1]])
        #
        self.external_patches[swap[0]] = region_in.external_patches[swap[0]]
        #
        # merging regions and then stitching internal patches
        master_path = self.region_dir
        slave_path = region_in.region_dir
        cmd = ('mergeMeshes', '-overwrite', master_path, slave_path)
        msg = 'Running: %s in thread: %s'
        logger.info(msg, ' '.join(cmd), t_name)
        proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
        out, err = proc.communicate()
        if proc.poll() != 0:
            print(out)
            print(err)
            raise OSError('Mesh merging failure')
        #
        # removing slave directory and cleaning the polyMesh of merge files
        rmtree(slave_path)
        self._clean_polymesh(master_path)
        #
        self.stitch_patches(internal_patches, t_name)

    def stitch_patches(self, internal_patches, t_name):
        r"""
        Stitches all internal patches in the region together
        """
        #
        # stitching faces together to couple them
        path = self.region_dir
        args = ['stitchMesh', '-case', path, '-overwrite', '-perfect', '', '']
        for master, slave in internal_patches:
            #
            args[5] = master
            args[6] = slave
            cmd = tuple(args)
            msg = 'Running: %s in thread: %s'
            logger.info(msg, ' '.join(cmd), t_name)
            proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
            out, err = proc.communicate()
            if proc.poll() != 0:
                print(out)
                print(err)
                _stitchMesh_error.set()
            self._clean_polymesh(path)

    @staticmethod
    def _clean_polymesh(path):
        r"""
        Removes files left over from merging and stitching meshes together
        """
        polymesh_path = os.path.join(path, 'constant', 'polyMesh')
        os.system('rm -f ' + os.path.join(polymesh_path, '*Zones'))
        os.system('rm -f ' + os.path.join(polymesh_path, 'meshMod*'))


class ParallelMeshGen(object):
    r"""
    Handles creation of a large mesh in parallel utilizing the OpenFoam
    utilities mergeMeshes and stitchMesh.
    """
    def __init__(self, field, system_dir, nprocs=4, **kwargs):
        #
        super().__init__()
        #
        # field attributes that are copied over
        field.create_point_data()
        self.nx = field.nx
        self.nz = field.nz
        self.data_vector = field.data_vector
        self.data_map = field.data_map
        self.point_data = field.point_data
        self._field = DataField(field.data_map)
        self._field.point_data = sp.copy(field.point_data)
        self._mask = sp.ones(self.data_map.shape, dtype=bool)
        #
        self.offset_map = sp.zeros(self.data_map.shape)
        self.offset_points = sp.zeros(self.point_data.shape)
        if kwargs.get('offset_field', None):
            kwargs['offset_field'].create_point_data()
            self.offset_map = sp.copy(kwargs['offset_field'].data_map)
            self.offset_points = sp.copy(kwargs['offset_field'].point_data)
        #
        self.system_dir = system_dir
        self.nprocs = nprocs
        self.avg_fact = kwargs.get('avg_fact', 1.0)
        self.mesh_params = kwargs.get('mesh_params', {})
        self.merge_groups = []

    def generate_mesh(self, mesh_type='simple', path='.', ndivs=8, **kwargs):
        r"""
        Generates multiple mesh types and outputs them to a specific path.
        Valid mesh_types are: simple, threshold and symmetry. Additional
        kwargs need to be supplied if the given mesh type requires them.
        """
        #
        # resetting error Events
        _blockMesh_error.clear()
        _mergeMesh_error.clear()
        _stitchMesh_error.clear()
        #
        grid = sp.arange(0, ndivs*ndivs, dtype=int)
        grid = sp.reshape(grid, (ndivs, ndivs))
        self.merge_groups = []
        #
        # updating mask if threshold
        if re.search('threshold', mesh_type):
            min_value = kwargs.get('min_value', 0.0)
            max_value = kwargs.get('max_value', 1e9)
            self._field.threshold_data(min_value, max_value, repl=0.0)
            self._field.copy_data(self)
            #
            adj_matrix = self._field.create_adjacency_matrix()
            num_cs, cs_ids = csgraph.connected_components(csgraph=adj_matrix,
                                                          directed=False)
            # only saving the largest cluster
            if num_cs > 1:
                cs_count = sp.zeros(num_cs, dtype=int)
                for cs_num in cs_ids:
                    cs_count[cs_num] += 1
                self.data_vector[sp.where(cs_ids != sp.argmax(cs_count))[0]] = 0.0
                self.data_map = sp.reshape(self.data_vector, self.data_map.shape)
            #
            self._mask = self.data_map > 0.0
        #
        self._create_subregion_meshes(ndivs, mesh_type=mesh_type, path=path,
                                      **kwargs)
        #
        self._merge_submeshes(grid)
        #
        if not _stitchMesh_error.is_set():
            self._remove_leftover_patches(path)

    def _create_subregion_meshes(self, ndivs, **kwargs):
        r"""
        Divides the data map into smaller regions and creates a
        BlockMeshRegion object for each one.
        """
        #
        # calculating region sizes; r0 is the first col and row 0,
        # rn is the rest of the region sizes
        r0_nx = int(self.nx/ndivs) + self.nx % ndivs
        r0_nz = int(self.nz/ndivs) + self.nz % ndivs
        rn_nx = int((self.nx - r0_nx)/(ndivs - 1))
        rn_nz = int((self.nz - r0_nz)/(ndivs - 1))
        #
        # setting up data slices
        slice_list = [(slice(0, r0_nz), slice(0, r0_nx))]
        x_st = slice_list[-1][1].stop
        for i in range(1, ndivs):
            slice_list.append((slice(0, r0_nz), slice(x_st, x_st+rn_nx)))
            x_st = slice_list[-1][1].stop
        #
        z_st = slice_list[-1][0].stop
        for i in range(1, ndivs):
            slice_list.append((slice(z_st, z_st+rn_nz), slice(0, r0_nx)))
            x_st = slice_list[-1][1].stop
            #
            for i in range(1, ndivs):
                x_en, z_en = x_st+rn_nx, z_st+rn_nz
                slice_list.append((slice(z_st, z_en), slice(x_st, x_en)))
                x_st = slice_list[-1][1].stop
            #
            z_st = slice_list[-1][0].stop
        #
        # creating queue from slice list
        region_queue = Queue()
        for i, (z_slice, x_slice) in enumerate(slice_list):
            region_queue.put((i, (z_slice, x_slice)))
        #
        # creating worker threads
        n_max = region_queue.qsize()
        for i in range(min(self.nprocs, n_max)):
            t_name = 'BlockMesh Worker {}'.format(i)
            thread = Thread(name=t_name,
                            target=self._create_regions_thread,
                            args=(region_queue, t_name, kwargs))
            logger.debug('Thread: %s is running.', thread.name)
            thread.start()
        #
        region_queue.join()
        if _blockMesh_error.is_set():
            raise OSError('Mesh Generation Failure')
        #
        self.merge_groups.sort(key=lambda grp: grp.region_id)

    def _create_regions_thread(self, region_queue, t_name, kwargs):
        r"""
        Handles processing of the queue in its own thread
        """
        #
        # processing kwargs
        mesh_type = kwargs.get('mesh_type', 'simple')
        overwrite = kwargs.get('overwrite', False)
        path = kwargs.get('path', '.')
        sys_dir = self.system_dir
        try:
            while True:
                reg_id, (z_slice, x_slice) = region_queue.get(False)
                # short circuiting loop if there was an error
                if _blockMesh_error.is_set():
                    msg = '{} - There has been an error, skipping region {}'
                    logger.debug(msg.format(t_name, reg_id))
                    region_queue.task_done()
                    continue
                #
                args = [reg_id, z_slice, x_slice]
                reg_mesh = self._setup_region(*args, **kwargs)
                filename = os.path.join(path, 'mesh-region{}'.format(reg_id))
                #
                # writing mesh and calling blockMesh
                args = [mesh_type, filename, sys_dir, t_name, overwrite]
                reg_mesh.run_block_mesh(*args)
                region_queue.task_done()
        except Empty:
            logger.debug(t_name + ' region_queue is empty.')
        except Exception as err:
            region_queue.task_done()
            _blockMesh_error.set()
            raise err

    def _setup_region(self, region_id, z_slice, x_slice, **kwargs):
        r"""
        Sets up an individual mesh region
        """
        #
        sides = ['left', 'right', 'bottom', 'top']
        patches = ['mergeLR{}', 'mergeRL{}', 'mergeBT{}', 'mergeTB{}']
        labels = ['boundary.'+patch for patch in patches]
        external_patches = {side: side for side in sides}
        #
        # setting offset values and region
        x_offset = x_slice.start
        z_offset = z_slice.start
        region = DataFieldRegion(self.data_map[z_slice, x_slice],
                                 self.point_data[z_slice, x_slice, :])
        offset_reg = DataFieldRegion(self.offset_map[z_slice, x_slice],
                                     self.offset_points[z_slice, x_slice, :])
        #
        # creating regional mesh
        args = [region, self.avg_fact, x_offset, z_offset,
                self.mesh_params, offset_reg]
        region_mesh = BlockMeshRegion(*args)
        region_mesh._generate_masked_mesh(cell_mask=self._mask[z_slice, x_slice])
        #
        # need to test for holes on merge boundaries and change patch to internal
        # creating map indexed 1:_blocks.size but with shape of (nz, nx)
        mesh_map = sp.ones(region_mesh.data_vector.size, dtype=int)
        mesh_map *= -sp.iinfo(int).max
        inds = sp.where(region_mesh.data_vector > 0)[0]
        mesh_map[inds] = sp.arange(inds.size)
        mesh_map = sp.reshape(mesh_map, region_mesh.data_map.shape)
        boundary_dict = {
            'internal': {'bottom': [], 'top': [], 'left': [], 'right': []}
        }
        #
        # updating patches
        sides = {}
        if x_slice.start != 0:
            sides['left'] = 0
            for iz in range(region_mesh.nz):
                IZ = iz + z_offset
                IX = x_offset
                if self._mask[IZ, IX] and not self._mask[IZ, IX-1]:
                    boundary_dict['internal']['left'].append(mesh_map[iz, 0])
        #
        if x_slice.stop != self.nx:
            sides['right'] = 1
            for iz in range(region_mesh.nz):
                IZ = iz + z_offset
                IX = x_offset + region_mesh.nx - 1
                if self._mask[IZ, IX] and not self._mask[IZ, IX+1]:
                    boundary_dict['internal']['right'].append(mesh_map[iz, -1])
        #
        if z_slice.start != 0:
            sides['bottom'] = 2
            for ix in range(region_mesh.nx):
                IZ = z_offset
                IX = ix + x_offset
                if self._mask[IZ, IX] and not self._mask[IZ-1, IX]:
                    boundary_dict['internal']['bottom'].append(mesh_map[0, ix])
        #
        if z_slice.stop != self.nz:
            sides['top'] = 3
            for ix in range(region_mesh.nx):
                IZ = z_offset + region_mesh.nz - 1
                IX = ix + x_offset
                if self._mask[IZ, IX] and not self._mask[IZ+1, IX]:
                    boundary_dict['internal']['top'].append(mesh_map[-1, ix])
        #
        face_labels = region_mesh.face_labels
        for side, index in sides.items():
            label = labels[index].format(region_id)
            external_patches[side] = patches[index].format(region_id)
            region_mesh.mesh_params[label+'.type'] = 'empty'
            region_mesh.face_labels[label] = face_labels.pop('boundary.'+side)
        region_mesh.set_boundary_patches(boundary_dict)
        #
        # setting up initial MergeGroup as an individual region
        group = MergeGroup(region_id, external_patches, kwargs.get('path', '.'))
        self.merge_groups.append(group)
        #
        return region_mesh

    def _merge_submeshes(self, grid):
        r"""
        Handles merging and stitching of meshes based on alternating rounds
        of horizontal pairing and then vertical pairing.
        """
        #
        def merge_worker(merge_queue, t_name):
            try:
                while True:
                    master, slave = merge_queue.get(False)
                    # short circuiting loop if there was an error
                    if _mergeMesh_error.is_set():
                        msg = '{} - There has been an error not merging {} -> {}'
                        logger.debug(msg.format(t_name, slave, master))
                        merge_queue.task_done()
                        continue
                    #
                    master = self.merge_groups[master]
                    slave = self.merge_groups[slave]
                    master.merge_regions(slave, direction, t_name)
                    merge_queue.task_done()
            except Empty:
                logger.debug(t_name + ' merge_queue is empty.')
            except Exception as err:
                merge_queue.task_done()
                _mergeMesh_error.set()
                raise err
        #
        direction = 'right'
        while sp.size(grid) > 1:
            #
            # getting merge queue and updating grid
            merge_queue, grid = self._create_merge_queue(grid, direction)
            #
            # merging and stitching the regions together
            n_max = merge_queue.qsize()
            for i in range(min(self.nprocs, n_max)):
                t_name = 'MergeMesh Worker {}'.format(i)
                args = (merge_queue, t_name)
                thread = Thread(name=t_name, target=merge_worker, args=args)
                logger.debug('Thread: %s is running.', thread.name)
                thread.start()
            #
            merge_queue.join()
            if _mergeMesh_error.is_set():
                raise OSError('Mesh Merging failure')
            direction = 'top' if direction == 'right' else 'right'

    @staticmethod
    def _create_merge_queue(grid, direction):
        r"""
        Determines the region merge queue based on the grid supplied
        """
        #
        merge_queue = Queue()
        new_grid = sp.copy(grid)
        if direction == 'right':
            # Horizontally pairing up regions
            new_nx = int(grid.shape[1]/2) + grid.shape[1] % 2
            new_grid = -sp.ones(grid.shape[0]*new_nx, dtype=int)
            #
            for iz in range(grid.shape[0]):
                i = iz*new_nx
                ix = 0
                for ix in range(0, grid.shape[1]-1, 2):
                    merge_queue.put((grid[iz, ix], grid[iz, ix+1]))
                    new_grid[i] = grid[iz, ix]
                    i += 1
                if ix != grid.shape[1]-2:
                    new_grid[i] = grid[iz, -1]
            #
            new_grid = sp.reshape(new_grid, (grid.shape[0], new_nx))
        #
        elif direction == 'top':
            # Vertically pairing up regions
            new_nz = int(grid.shape[0]/2) + grid.shape[0] % 2
            new_grid = -sp.ones(new_nz*grid.shape[1], dtype=int)
            #
            for ix in range(grid.shape[1]):
                i = ix
                iz = 0
                for iz in range(0, grid.shape[0]-1, 2):
                    merge_queue.put((grid[iz, ix], grid[iz+1, ix]))
                    new_grid[i] = grid[iz, ix]
                    i += grid.shape[1]
                if iz != grid.shape[0]-2:
                    new_grid[i] = grid[-1, ix]
            #
            new_grid = sp.reshape(new_grid, (new_nz, grid.shape[1]))
        #
        return merge_queue, new_grid

    @staticmethod
    def _remove_leftover_patches(path):
        r"""
        Removes all leftover merge patches from the boundary file
        """
        file_name = os.path.join(path, 'mesh-region0', 'constant', 'polyMesh')
        file_name = os.path.join(file_name, 'boundary')
        #
        boundary_file = OpenFoamFile(file_name)
        list_key = list(boundary_file.keys())[0]
        #
        # only keeping non-merge patches
        face_list = []
        for patch in boundary_file[list_key]:
            if not isinstance(patch, OpenFoamDict):
                continue
            if not re.match('merge', patch.name):
                face_list.append(patch)
        #
        # writing out new file
        del boundary_file[list_key]
        list_key = str(len(face_list))
        boundary_file[list_key] = OpenFoamList(list_key, face_list)
        path = os.path.join(path, 'mesh-region0')
        boundary_file.write_foam_file(path=path, overwrite=True)