diff --git a/field.py b/field.py index 36aa591..f10d13b 100644 --- a/field.py +++ b/field.py @@ -1,27 +1,39 @@ import numpy class Field3d: - def __init__(self,data,origin,spacing,bounds,period): + def __init__(self,data,origin,spacing): assert len(origin)==3, "'origin' must be of length 3" assert len(spacing)==3, "'spacing' must be of length 3" - assert len(period)==3, "'period' must be of length 3" self.data = numpy.array(data) - self.data.setflags(write=False) self.numpoints = self.data.shape self.origin = tuple([float(x) for x in origin]) self.spacing = tuple([float(x) for x in spacing]) - self.period = tuple(period) + self.eps_collapse = 1e-12 return def __str__(self): str = 'Field3d with\n' str+= ' dimension: {}, {}, {}\n'.format(*self.numpoints) str+= ' origin: {:G}, {:G}, {:G}\n'.format(*self.origin) - str+= ' spacing: {:G}, {:G}, {:G}\n'.format(*self.spacing) - str+= ' period: {}, {}, {}'.format(*self.period) + str+= ' spacing: {:G}, {:G}, {:G}'.format(*self.spacing) return str +# TBD: this should return another Field3d object +# def __getitem__(self,val): +# assert isinstance(val,tuple) and len(val)==3, "Field3d must be indexed by [ii,jj,kk]." +# sl = [] +# for x in val: +# if isinstance(x,int): +# lo,hi = x,x+1 +# hi = hi if hi!=0 else None +# sl.append(slice(lo,hi)) +# elif isinstance(x,slice): +# sl.append(x) +# else: +# raise TypeError("Trajectories can only be sliced by slice objects or integers.") +# return self.data[sl[0],sl[1],sl[2]] + @classmethod - def from_chunk(cls,chunk,gridg,period=(None,None,None)): + def from_chunk(cls,chunk,gridg): '''Initialize Field3d from chunk data and global grid.''' xg,yg,zg = gridg ib,jb,kb = chunk['ibeg']-1, chunk['jbeg']-1, chunk['kbeg']-1 @@ -31,79 +43,174 @@ class Field3d: assert (chunk['nxl']+2*chunk['ighost'])==nx, "Invalid chunk data: nxl != chunk['data'].shape[0]" assert (chunk['nyl']+2*chunk['ighost'])==ny, "Invalid chunk data: nyl != chunk['data'].shape[1]" assert (chunk['nzl']+2*chunk['ighost'])==nz, "Invalid chunk data: nzl != chunk['data'].shape[2]" - return cls(chunk['data'],origin=(xo,yo,zo),spacing=(dx,dy,dz),period=period) + return cls(chunk['data'],origin=(xo,yo,zo),spacing=(dx,dy,dz)) + + def insert_subfield(self,subfield): + assert all([abs(subfield.spacing[ii]-self.spacing[ii])=0 and idx_origin[ii]0.5*self.spacing[axis]: + val = self.spacing[axis]-val + return val + + def is_within_bounds(self,coord,axis=None): + if axis is None: + assert len(coord)==3, "If 'axis' is None, 'coord' must be a tuple/list of length 3." + return tuple(self.is_within_bounds(coord[ii],axis=ii) for ii in range(3)) + assert axis<3, "'axis' must be one of 0,1,2." + idx_nearest = self.nearest_gridpoint(coord,axis=axis) + if idx_nearest>0 and idx_nearest=self.origin[ii] for ii in range(0,3)]), "New origin is out of bounds." endpoint = [origin[ii]+(numpoints[ii]-1)*spacing[ii] for ii in range(0,3)] assert all([endpoint[ii]<=self.endpoint(ii) for ii in range(0,3)]), "New end point is out of bounds." data = numpy.zeros(numpoints) if numpy.allclose(spacing,self.spacing): # spacing is the same: we can construct universal weights for the stencil - i0,j0,k0 = [numpy.floor((origin[ii]-self.origin[ii])/self.spacing[ii]).astype('int') for ii in range(0,3)] - cx,cy,cz = [numpy.remainder(origin[ii]-self.origin[ii],self.spacing[ii]) for ii in range(0,3)] - c = weights_trilinear((cx,cy,cz)) + i0,j0,k0 = self.nearest_gridpoint(origin,axis=None,lower=True) + cx,cy,cz = [self.distance_to_nearest_gridpoint(origin[ii],axis=ii,lower=True)/self.spacing[ii] + for ii in range(3)] + c = self.weights_trilinear((cx,cy,cz)) for ii in range(0,2): for jj in range(0,2): for kk in range(0,2): - if c[ii,jj,kk]>EPS_COLLAPSE: + if c[ii,jj,kk]>self.eps_collapse: data += c[ii,jj,kk]*self.data[ i0+ii:i0+ii+numpoints[0], j0+jj:j0+jj+numpoints[1], @@ -112,50 +219,75 @@ class Field3d: for ii in range(0,numpoints[0]): for jj in range(0,numpoints[1]): for kk in range(0,numpoints[2]): - coordinate = ( + coord = ( origin[0]+ii*spacing[0], origin[1]+jj*spacing[1], origin[2]+kk*spacing[2]) - data[ii,jj,kk] = self.interpolate(coordinate) - return Field3d(data,origin,spacing,self.period) + data[ii,jj,kk] = self.interpolate(coord) + return Field3d(data,origin,spacing) - def interpolate(self,coordinate,EPS_COLLAPSE=1e-12): - assert all([coordinate[ii]>=self.origin[ii] for ii in range(0,3)]), "'coordinate' is out of bounds." - assert all([coordinate[ii]<=self.endpoint(ii) for ii in range(0,3)]), "'coordinate' is out of bounds." - i0,j0,k0 = [numpy.floor((coordinate[ii]-self.origin[ii])/self.spacing[ii]).astype('int') for ii in range(0,3)] - cx,cy,cz = [numpy.remainder(coordinate[ii]-self.origin[ii],self.spacing[ii]) for ii in range(0,3)] - c = weights_trilinear((cx,cy,cz)) + def interpolate(self,coord): + assert all([coord[ii]>=self.origin[ii] for ii in range(0,3)]), "'coord' is out of bounds." + assert all([coord[ii]<=self.endpoint(ii) for ii in range(0,3)]), "'coord' is out of bounds." + i0,j0,k0 = self.nearest_gridpoint(coord,axis=None,lower=True) + cx,cy,cz = [self.distance_to_nearest_gridpoint(coord[ii],axis=ii,lower=True)/self.spacing[ii] + for ii in range(3)] + c = self.weights_trilinear((cx,cy,cz)) val = 0.0 for ii in range(0,2): for jj in range(0,2): for kk in range(0,2): - if c[ii,jj,kk]>EPS_COLLAPSE: + if c[ii,jj,kk]>self.eps_collapse: val += c[ii,jj,kk]*self.data[i0+ii,j0+jj,k0+kk] return val - def move_origin_periodic(self,shift,axis): - '''Moves origin of grid in periodic direction by shift*spacing.''' - assert self.period[axis], "Origin can only be moved in periodic directions." - self.data = numpy.roll(self.data,shift,axis=axis) - self.data.setflags(write=False) - origin = list(self.origin) - origin[axis] += shift*self.spacing[axis] - self.origin = tuple(origin) - return - @staticmethod - def __update_ghostcells_periodic(data,axis): - slsrc = [slice(None),slice(None),slice(None)] - sldst = [slice(None),slice(None),slice(None)] - slsrc[axis] = slice(1,2) - sldst[axis] = slice(-1,None) - data[sldst] = data[slsrc] - slsrc = [slice(None),slice(None),slice(None)] - sldst = [slice(None),slice(None),slice(None)] - slsrc[axis] = slice(-2,-1) - sldst[axis] = slice(0,1) - data[sldst] = data[slsrc] + def weights_trilinear(self,rel_dist): + assert len(rel_dist)==3, "len(rel_dist) must be 3." + cx,cy,cz = rel_dist + if cx<0.0 and cx>-self.eps_collapse: cx=0.0 + if cy<0.0 and cy>-self.eps_collapse: cy=0.0 + if cz<0.0 and cz>-self.eps_collapse: cz=0.0 + if cx>1.0 and cx<1.0+self.eps_collapse: cx=1.0 + if cy>1.0 and cy<1.0+self.eps_collapse: cy=1.0 + if cz>1.0 and cz<1.0+self.eps_collapse: cz=1.0 + assert cx>=0.0 and cy>=0.0 and cz>=0.0, "'rel_dist' must be >=0" + assert cx<=1.0 and cy<=1.0 and cz<=1.0, "'rel_dist' must be <=1" + c = numpy.zeros((2,2,2)) + c[0,0,0] = 1.0-(cx+cy+cz)+(cx*cy+cx*cz+cy*cz)-(cx*cy*cz) + c[1,0,0] = cx-(cx*cy+cx*cz)+(cx*cy*cz) + c[0,1,0] = cy-(cx*cy+cy*cz)+(cx*cy*cz) + c[0,0,1] = cz-(cx*cz+cy*cz)+(cx*cy*cz) + c[1,1,0] = (cx*cy)-(cx*cy*cz) + c[1,0,1] = (cx*cz)-(cx*cy*cz) + c[0,1,1] = (cy*cz)-(cx*cy*cz) + c[1,1,1] = (cx*cy*cz) + return c + + def set_writable(self,flag): + self.data.setflags(write=flag) return + def to_vtk(self): + import pyvista as pv + mesh = pv.UniformGrid() + mesh.dimensions = self.dim(axis=None) + mesh.origin = self.origin + mesh.spacing = self.spacing + mesh.point_arrays['data'] = self.data.flatten(order='F') + return mesh + + def vtk_contour(self,val): + if not isinstance(val,(tuple,list)): + val = [val] + return self.to_vtk().contour(val) + + def vtk_slice(self,normal,origin): + assert (normal in ('x','y','z') or (isinstance(normal,(tuple,list)) + and len(normal)==3)), "'normal' must be 'x','y','z' or tuple of length 3." + assert isinstance(origin,(tuple,list)) and len(origin)==3,\ + "'origin' must be tuple of length 3." + return self.to_vtk().slice(normal=normal,origin=origin) + class ChunkIterator: '''Iterates through all chunks. 'snapshot' must be an instance of a class which returns a Field3d from the method call @@ -177,19 +309,3 @@ class ChunkIterator: return field else: raise StopIteration - -def weights_trilinear(rel_dist): - assert len(rel_dist)==3, "len(rel_dist) must be 3." - assert all([rel_dist[ii]>=0.0 for ii in range(0,3)]), "'rel_dist' must be >=0" - assert all([rel_dist[ii]<=1.0 for ii in range(0,3)]), "'rel_dist' must be <=1" - cx,cy,cz = rel_dist - c = numpy.zeros((2,2,2)) - c[0,0,0] = 1.0-(cx+cy+cz)+(cx*cy+cx*cz+cy*cz)-(cx*cy*cz) - c[1,0,0] = cx-(cx*cy+cx*cz)+(cx*cy*cz) - c[0,1,0] = cy-(cx*cy+cy*cz)+(cx*cy*cz) - c[0,0,1] = cz-(cx*cz+cy*cz)+(cx*cy*cz) - c[1,1,0] = (cx*cy)-(cx*cy*cz) - c[1,0,1] = (cx*cz)-(cx*cy*cz) - c[0,1,1] = (cy*cz)-(cx*cy*cz) - c[1,1,1] = (cx*cy*cz) - return c diff --git a/parallel.py b/parallel.py new file mode 100644 index 0000000..bfe6f0a --- /dev/null +++ b/parallel.py @@ -0,0 +1,624 @@ +class PPP: + """Parallel Python Postprocessor for suspend""" + def __init__(self,comm,func_load,num_ghost,precision,origin,spacing,periodicity,bounds,proc,chunks_per_proc): + '''Constructor: except for comm, only rank 0 needs initialized data.''' + self.comm = comm + self.rank = comm.Get_rank() + self.func_load = func_load + self.init_settings(num_ghost,precision) + self.init_domain(origin,spacing,periodicity,bounds) + self.init_procgrid(proc,chunks_per_proc) + self.field = {} + self.symmetries = {} + return + + @classmethod + def from_snapshot(cls,snap,chunks_per_proc=(1,1,1),num_ghost=(1,1,1),precision='float64'): + from mpi4py import MPI + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + if rank==0: + proc = snap.procgrid() + origin = snap.origin() + spacing = snap.spacing() + periodicity = snap.periodicity() + bounds = snap.bounds() + else: + proc = None + origin = None + spacing = None + periodicity = None + bounds = None + func_load = snap.field_chunk + return cls(comm,func_load,num_ghost,precision,origin,spacing,periodicity,bounds,proc,chunks_per_proc) + + def init_settings(self,num_ghost,precision): + '''Initializes PPP settings for all processors.''' + # TBD: some assertions + self.num_ghost = self.comm.bcast(num_ghost,root=0) + self.precision = self.comm.bcast(precision,root=0) + # Some shortcuts + self.nghx,self.nghy,self.nghz = self.num_ghost + + def init_symmetries(self,key,mirror=(True,True)): + '''Sets the symmetries for ghost cells behind the wall''' + # Example: wall in y + # No-slip boundary (no mirror) free-slip boundary (mirror) + # u -> -u u -> u + # v -> -v v -> -v + # w -> -w w -> w + # p -> p p -> p + import numpy as np + self.symmetries[key] = np.zeros((3,3,3),dtype='i') + if not self.xperiodic: + if key=='u': + self.symmetries[key][0,1,1] = -1 + self.symmetries[key][2,1,1] = -1 + else: + self.symmetries[key][0,1,1] = 1 if mirror[0] else -1 + self.symmetries[key][2,1,1] = 1 if mirror[1] else -1 + if not self.yperiodic: + if key=='v': + self.symmetries[key][1,0,1] = -1 + self.symmetries[key][1,2,1] = -1 + else: + self.symmetries[key][1,0,1] = 1 if mirror[0] else -1 + self.symmetries[key][1,2,1] = 1 if mirror[1] else -1 + if not self.zperiodic: + if key=='w': + self.symmetries[key][1,1,0] = -1 + self.symmetries[key][1,1,2] = -1 + else: + self.symmetries[key][1,1,0] = 1 if mirror[0] else -1 + self.symmetries[key][1,1,2] = 1 if mirror[1] else -1 + + def init_domain(self,origin,spacing,periodicity,bounds): + '''Sets up domain information for all processors''' + # TBD: some assertions + self.origin = self.comm.bcast(origin,root=0) + self.spacing = self.comm.bcast(spacing,root=0) + self.periodicity = self.comm.bcast(periodicity,root=0) + self.bounds = self.comm.bcast(bounds,root=0) + # Some shortcuts + self.xperiodic,self.yperiodic,self.zperiodic = self.periodicity + return + + def init_procgrid(self,proc,chunks_per_proc): + # Note: requires nghx, xperiodic to be set + '''Read input processor grid, compute processor grid for workers''' + import numpy as np + self.chunks_per_proc = self.comm.bcast(chunks_per_proc,root=0) + self.nxcpp = chunks_per_proc[0] + self.nycpp = chunks_per_proc[1] + self.nzcpp = chunks_per_proc[2] + if self.rank==0: + # Assert proc and add it to class + assert all(k in proc for k in ('u','v','w','p','s')), "'proc' must be a dictionary with "\ + "keys 'u','v','w','p','s'" + for key in proc: + assert len(proc[key])==6, "Entries of 'proc' must have length of 6." + proc_grid_ext = proc + # Initialize chunks per processor + # Verify that this processor grid can be redistributed onto the requested processor layout + nxp_ext = len(proc_grid_ext['u'][0]) + nyp_ext = len(proc_grid_ext['u'][2]) + nzp_ext = len(proc_grid_ext['u'][4]) + nproc_ext = nxp_ext*nyp_ext*nzp_ext + # + assert nxp_ext%self.nxcpp==0, "Number of processors must be divisible by the number "\ + "of processors per process. (nxp_ext={}, nxcpp={})".format( + nxp_ext,self.nxcpp) + assert nyp_ext%self.nycpp==0, "Number of processors must be divisible by the number "\ + "of processors per process. (nyp_ext={}, nycpp={})".format( + nyp_ext,self.nycpp) + assert nzp_ext%self.nzcpp==0, "Number of processors must be divisible by the number "\ + "of processors per process. (nzp_ext={}, nzcpp={})".format( + nzp_ext,self.nzcpp) + # Determine the new processor layout and verify total number of MPI processes + nxp = nxp_ext//self.nxcpp + nyp = nyp_ext//self.nycpp + nzp = nzp_ext//self.nzcpp + nproc = nxp*nyp*nzp + assert nproc==self.comm.Get_size(), "Number of MPI processes does not match the requested "\ + "processor layout. (MPI procs: {}, required procs: {})".format( + self.comm.Get_size(),nproc) + # Construct internal processor grid + proc_grid = {} + for key in proc_grid_ext: + proc_grid[key] = [None]*6 + proc_grid[key][0] = proc_grid_ext[key][0][0::self.nxcpp] + proc_grid[key][1] = proc_grid_ext[key][1][self.nxcpp-1::self.nxcpp] + proc_grid[key][2] = proc_grid_ext[key][2][0::self.nycpp] + proc_grid[key][3] = proc_grid_ext[key][3][self.nycpp-1::self.nycpp] + proc_grid[key][4] = proc_grid_ext[key][4][0::self.nzcpp] + proc_grid[key][5] = proc_grid_ext[key][5][self.nzcpp-1::self.nzcpp] + else: + proc_grid_ext = None + proc_grid = None + nxp_ext,nyp_ext,nzp_ext,nproc_ext = None,None,None,None + nxp,nyp,nzp,nproc = None,None,None,None + # Broadcast the data + self.proc_grid_ext = self.comm.bcast(proc_grid_ext,root=0) + self.proc_grid = self.comm.bcast(proc_grid,root=0) + self.nxp_ext,self.nyp_ext,\ + self.nzp_ext,self.nproc_ext = self.comm.bcast((nxp_ext,nyp_ext,nzp_ext,nproc_ext),root=0) + self.nxp,self.nyp,\ + self.nzp,self.nproc = self.comm.bcast((nxp,nyp,nzp,nproc),root=0) + # Get position in processor grid + self.ip,self.jp,self.kp = self.position_from_rank(self.rank,external=False) + # Determine local grid indices and size + self.chunk_bounds = {} + self.chunk_size = {} + for key in self.proc_grid: + self.chunk_bounds[key] = [None]*6 + self.chunk_bounds[key][0] = self.proc_grid[key][0][self.ip] + self.chunk_bounds[key][1] = self.proc_grid[key][1][self.ip] + self.chunk_bounds[key][2] = self.proc_grid[key][2][self.jp] + self.chunk_bounds[key][3] = self.proc_grid[key][3][self.jp] + self.chunk_bounds[key][4] = self.proc_grid[key][4][self.kp] + self.chunk_bounds[key][5] = self.proc_grid[key][5][self.kp] + self.chunk_size[key] = [None]*3 + self.chunk_size[key][0] = self.chunk_bounds[key][1]-self.chunk_bounds[key][0]+1 + self.chunk_size[key][1] = self.chunk_bounds[key][3]-self.chunk_bounds[key][2]+1 + self.chunk_size[key][2] = self.chunk_bounds[key][5]-self.chunk_bounds[key][4]+1 + # Verify that local grid size is not smaller than ghost cell size + assert (self.chunk_size[key][0]>=self.nghx and + self.chunk_size[key][1]>=self.nghy and + self.chunk_size[key][2]>=self.nghz), "Local grid size must be greater than number "\ + "of ghost cells in each direction!" + # Initialize neighbor array + nghbr = np.empty((3,3,3),dtype='int') + # wrap-around x + ipl = self.ip-1 + if ipl<0: + if self.xperiodic: ipl = self.nxp-1 + else: ipl = -1 + ipr = self.ip+1 + if ipr>self.nxp-1: + if self.xperiodic: ipr = 0 + else: ipr = -1 + inghbr = (ipl,self.ip,ipr) + # wrap-around y + jpl = self.jp-1 + if jpl<0: + if self.yperiodic: jpl = self.nyp-1 + else: jpl = -1 + jpr = self.jp+1 + if jpr>self.nyp-1: + if self.yperiodic: jpr = 0 + else: jpr = -1 + jnghbr = (jpl,self.jp,jpr) + # wrap-around z + kpl = self.kp-1 + if kpl<0: + if self.zperiodic: kpl = self.nzp-1 + else: kpl = -1 + kpr = self.kp+1 + if kpr>self.nzp-1: + if self.zperiodic: kpr = 0 + else: kpr = -1 + knghbr = (kpl,self.kp,kpr) + # Construct array of neighbors + for ip in range(3): + for jp in range(3): + for kp in range(3): + # Assign rank to neighbor array + if inghbr[ip]<0 or jnghbr[jp]<0 or knghbr[kp]<0: + nghbr[ip,jp,kp] = -1 + else: + nghbr[ip,jp,kp] = self.rank_from_position(inghbr[ip],jnghbr[jp],knghbr[kp],external=False) + # Save neighbors as class variable + self.nghbr = nghbr + + def load_field(self,key,io_limit=None,barrier=False): + '''Loads the required chunks from file''' + from .field import Field3d + import numpy as np + # Block execution of some processors if IO is limited + self._baton_wait(io_limit) + # Determine which chunks are to be loaded by the current processor + ip_beg_ext = self.ip*self.chunks_per_proc[0] + jp_beg_ext = self.jp*self.chunks_per_proc[1] + kp_beg_ext = self.kp*self.chunks_per_proc[2] + ip_end_ext = ip_beg_ext+self.chunks_per_proc[0]-1 + jp_end_ext = jp_beg_ext+self.chunks_per_proc[1]-1 + kp_end_ext = kp_beg_ext+self.chunks_per_proc[2]-1 + # Get the total size of the field to be loaded + ib = self.proc_grid_ext[key][0][ip_beg_ext] + ie = self.proc_grid_ext[key][1][ip_end_ext] + jb = self.proc_grid_ext[key][2][jp_beg_ext] + je = self.proc_grid_ext[key][3][jp_end_ext] + kb = self.proc_grid_ext[key][4][kp_beg_ext] + ke = self.proc_grid_ext[key][5][kp_end_ext] + nxl = ie-ib+1 + nyl = je-jb+1 + nzl = ke-kb+1 + # Allocate an array to hold the entire field + data = np.empty( + (nxl+2*self.nghx, + nyl+2*self.nghy, + nzl+2*self.nghz),dtype=self.precision) + # Compute origin of subfield + origin = (self.origin[key][0]+(ib-1-self.nghx)*self.spacing[0], + self.origin[key][1]+(jb-1-self.nghy)*self.spacing[1], + self.origin[key][2]+(kb-1-self.nghz)*self.spacing[2]) + # Create a Field3d + self.field[key] = Field3d(data,origin,self.spacing) + # Go through each chunk to be read and construct the field + for ip_ext in range(ip_beg_ext,ip_end_ext+1): + for jp_ext in range(jp_beg_ext,jp_end_ext+1): + for kp_ext in range(kp_beg_ext,kp_end_ext+1): + # Determine rank of the chunk to be read + rank_ext = self.rank_from_position(ip_ext,jp_ext,kp_ext,external=True) + # Compute bounds of this chunk + ib_ext = self.proc_grid_ext[key][0][ip_ext] + ie_ext = self.proc_grid_ext[key][1][ip_ext] + jb_ext = self.proc_grid_ext[key][2][jp_ext] + je_ext = self.proc_grid_ext[key][3][jp_ext] + kb_ext = self.proc_grid_ext[key][4][kp_ext] + ke_ext = self.proc_grid_ext[key][5][kp_ext] + nxl_ext = ie_ext-ib_ext+1 + nyl_ext = je_ext-jb_ext+1 + nzl_ext = ke_ext-kb_ext+1 + # Read data and insert it + subfield = self.func_load(rank_ext,key) + self.field[key].insert_subfield(subfield) + # Continue execution of waiting processors if IO was limited + self._baton_pass(io_limit) + # Exchange ghost cells + self.exchange_ghost_cells(key) + # Initialize symmetries and impose BC + self.init_symmetries(key) + self.impose_boundary_conditions(key) + # Syncronize processes if requested + if barrier: self.comm.Barrier() + + def differentiate(self,key,axis,key_out=None): + assert axis<3, "'axis' must be one of 0,1,2." + if key_out is None: + key_out = key+('x','y','z')[axis] + origin = list(self.origin) + shifting_state = self.shifting_state(key,axis=axis) + if shifting_state==-1: + padding = 'after' + origin[axis] += 0.5*self.spacing[axis] + elif shifting_state==0: + padding = 'before' + origin[axis] -= 0.5*self.spacing[axis] + elif shifting_state==1: + padding = 'before' + origin[axis] -= 0.5*self.spacing[axis] + else: + raise ValueError("Invalid shifting state.") + self.field[key_out] = self.field[key].derivative(axis,padding=padding) + self.origin[key_out] = tuple(origin) + self.spacing[key_out] = self.spacing[key].copy() + self.symmetries[key_out] = self.symmetries[key].copy() + # TBD: copy everything field specific + # TBD: make sure processor distribution is fine + if axis==0: + self.symmetries[key_out][0,1,1] = -self.symmetries[key_out][0,1,1] + self.symmetries[key_out][2,1,1] = -self.symmetries[key_out][2,1,1] + elif axis==1: + self.symmetries[key_out][1,0,1] = -self.symmetries[key_out][1,0,1] + self.symmetries[key_out][1,2,1] = -self.symmetries[key_out][1,2,1] + elif axis==2: + self.symmetries[key_out][1,1,0] = -self.symmetries[key_out][1,1,0] + self.symmetries[key_out][1,1,2] = -self.symmetries[key_out][1,1,2] + # Exchange ghost cells and set boundary conditions + self.exchange_ghost_cells(key_out) + self.impose_boundary_conditions(key_out) + + def gaussian_filter(self,key,sigma,truncate=4.0,key_out=None,iterate=False): + '''Applies a gaussian filter to a field as in-place operation. Sigma is the std of the filter in terms of grid width.''' + import numpy as np + if key_out is None: + key_out = key + else: + self.origin[key_out] = self.origin[key].copy() + self.spacing[key_out] = self.spacing[key].copy() + # Compute radius of Gaussian filter + radius = self.field[key].gaussian_filter_radius(sigma,truncate=truncate) + if not iterate: + # Assert that we have sufficient amount of ghost cells + assert all([self.num_ghost[ii]>=radius[ii] for ii in range(3)]),\ + "Too few ghost cells for stencil: {}, {}".format(self.num_ghost,radius) + niter = 1 + else: + # Determine number of iterations required for current ghost cell setup + sigma = np.array(sigma) + niter = 1 + while not all([self.num_ghost[ii]>=radius[ii] for ii in range(3)]): + sigma /= np.sqrt(2) + niter *= 2 + radius = self.field[key].gaussian_filter_radius(sigma,truncate=truncate) + assert all([radius[ii]>0 if sigma[ii]>0.0 else True for ii in range(3)]),\ + "Iterative procedure leads to invalid stencil radius: "\ + "increase number of ghost cells. {}".format(radius) + print('Iterations: {}, stencil radius: {}'.format(niter,radius)) + for iiter in range(niter): + # Filter field: if key_out is None, perform operation inplace + self.field[key_out] = self.field[key].gaussian_filter(sigma, + truncate=truncate,border='constant',const_val=0.0) + # Exchange ghost cells and set boundary conditions + self.exchange_ghost_cells(key_out) + self.impose_boundary_conditions(key_out) + # Iterate inplace from now on + key = key_out + + def vtk_contour(self,key,val): + '''Compute isocontour for chunks.''' + if any([self.num_ghost[ii]>1 for ii in range(3)]): + idx_origin = tuple(self.num_ghost[ii]-1 for ii in range(3)) + numpoints = tuple(self.field[key].numpoints[ii]-2*(self.num_ghost[ii]-1) + for ii in range(3)) + return self.field[key].extract_subfield( + idx_origin,numpoints).vtk_contour(val) + else: + return self.field[key].vtk_contour(val) + return + + def vtk_slice(self,key,normal,origin): + '''Extracts a plane from field.''' + if any([self.num_ghost[ii]>1 for ii in range(3)]): + idx_origin = tuple(self.num_ghost[ii]-1 for ii in range(3)) + numpoints = tuple(self.field[key].numpoints[ii]-2*(self.num_ghost[ii]-1) + for ii in range(3)) + return self.field[key].extract_subfield( + idx_origin,numpoints).vtk_slice(normal,origin) + else: + return self.field[key].vtk_slice(normal,origin) + return + + def rank_from_position(self,ip,jp,kp,external=False): + if external: + nyp,nzp = self.nyp_ext,self.nzp_ext + else: + nyp,nzp = self.nyp,self.nzp + return ip*nyp*nzp+jp*nzp+kp + + def position_from_rank(self,rank,external=False): + if external: + nyp,nzp = self.nyp_ext,self.nzp_ext + else: + nyp,nzp = self.nyp,self.nzp + ip = rank//(nyp*nzp) + jp = (rank//nzp)%nyp + kp = rank%nzp + return (ip,jp,kp) + + def shifting_state(self,key,axis=None): + if axis is None: + return tuple(self.shifting_state(key,axis=ii) for axis in range(3)) + assert axis<3, "'axis' must be one of 0,1,2." + return int(round((self.origin[key][axis]-self.bounds[2*axis])/(0.5*self.spacing[axis]))) + + def exchange_ghost_cells(self,key): + '''Communicates all ghost cells of specified field''' + # Trigger non-blocking communication: + # Communicate faces (6 faces) + self._communicate_ghost_cells(key,(-1,0,0)) # left + self._communicate_ghost_cells(key,(+1,0,0)) # right + self._communicate_ghost_cells(key,(0,-1,0)) # down + self._communicate_ghost_cells(key,(0,+1,0)) # up + self._communicate_ghost_cells(key,(0,0,-1)) # front + self._communicate_ghost_cells(key,(0,0,+1)) # back + # Communicate edges (12 edges) + self._communicate_ghost_cells(key,(-1,-1,0)) # left,down + self._communicate_ghost_cells(key,(-1,0,-1)) # left,front + self._communicate_ghost_cells(key,(-1,+1,0)) # left,up + self._communicate_ghost_cells(key,(-1,0,+1)) # left,back + self._communicate_ghost_cells(key,(+1,-1,0)) # right,down + self._communicate_ghost_cells(key,(+1,0,-1)) # right,front + self._communicate_ghost_cells(key,(+1,+1,0)) # right,up + self._communicate_ghost_cells(key,(+1,0,+1)) # right,back + self._communicate_ghost_cells(key,(0,-1,-1)) # down,front + self._communicate_ghost_cells(key,(0,-1,+1)) # down,back + self._communicate_ghost_cells(key,(0,+1,-1)) # up,front + self._communicate_ghost_cells(key,(0,+1,+1)) # up,back + # Communicate corners (8 corners) + self._communicate_ghost_cells(key,(-1,-1,-1)) # left,down,front + self._communicate_ghost_cells(key,(-1,-1,+1)) # left,down,back + self._communicate_ghost_cells(key,(-1,+1,-1)) # left,up,front + self._communicate_ghost_cells(key,(-1,+1,+1)) # left,up,back + self._communicate_ghost_cells(key,(+1,-1,-1)) # right,down,front + self._communicate_ghost_cells(key,(+1,-1,+1)) # right,down,back + self._communicate_ghost_cells(key,(+1,+1,-1)) # right,up,front + self._communicate_ghost_cells(key,(+1,+1,+1)) # right,up,back + + def impose_boundary_conditions(self,key): + '''Imposes symmetry boundary conditions on each non-periodic wall''' + self._symmetrize_wall(key,(-1,0,0)) + self._symmetrize_wall(key,(+1,0,0)) + self._symmetrize_wall(key,(0,-1,0)) + self._symmetrize_wall(key,(0,+1,0)) + self._symmetrize_wall(key,(0,0,-1)) + self._symmetrize_wall(key,(0,0,+1)) + + def _communicate_ghost_cells(self,key,positionDst): + '''Triggers communication of ghost cells''' + import numpy as np + assert np.max(positionDst)<=1 and np.min(positionDst)>=-1, "communicate_ghost_cells: "\ + "invalid neighbor position {}".format(positionDst) + # [send/recv] get the rank of the neighbor where data is to be sent to + # The position is passed as values -1,0,+1, but need to be converted to array indices + ip_dst = positionDst[0]+1 + jp_dst = positionDst[1]+1 + kp_dst = positionDst[2]+1 + ip_src = -positionDst[0]+1 + jp_src = -positionDst[1]+1 + kp_src = -positionDst[2]+1 + rank_dst = self.nghbr[ip_dst,jp_dst,kp_dst] + rank_src = self.nghbr[ip_src,jp_src,kp_src] + # [send/recv] create a tag + tag = ip_dst*100+jp_dst*10+kp_dst + # [send/recv] get the indices of data to be sent/received + nxl,nyl,nzl = self.chunk_size[key] + if positionDst[0]==-1: + ii_src = slice(self.nghx,2*self.nghx) + ii_dst = slice(self.nghx+nxl,2*self.nghx+nxl) + elif positionDst[0]==0: + ii_src = slice(self.nghx,self.nghx+nxl) + ii_dst = slice(self.nghx,self.nghx+nxl) + elif positionDst[0]==1: + ii_src = slice(nxl,nxl+self.nghx) + ii_dst = slice(0,self.nghx) + else: + raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[0])) + if positionDst[1]==-1: + jj_src = slice(self.nghy,2*self.nghy) + jj_dst = slice(self.nghy+nyl,2*self.nghy+nyl) + elif positionDst[1]==0: + jj_src = slice(self.nghy,self.nghy+nyl) + jj_dst = slice(self.nghy,self.nghy+nyl) + elif positionDst[1]==1: + jj_src = slice(nyl,nyl+self.nghy) + jj_dst = slice(0,self.nghy) + else: + raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[1])) + if positionDst[2]==-1: + kk_src = slice(self.nghz,2*self.nghz) + kk_dst = slice(self.nghz+nzl,2*self.nghz+nzl) + elif positionDst[2]==0: + kk_src = slice(self.nghz,self.nghz+nzl) + kk_dst = slice(self.nghz,self.nghz+nzl) + elif positionDst[2]==1: + kk_src = slice(nzl,nzl+self.nghz) + kk_dst = slice(0,self.nghz) + else: + raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[2])) + # [send/recv] Create a list for requests + reqsend = None + reqrecv = None + # [send] now send the data to the neighbor, but only if there is one! + # Communication must be done non-blocking, and we can use upper-case routines since this is a numpy array + if rank_dst>=0: + sendbuf = np.ascontiguousarray(self.field[key].data[ii_src,jj_src,kk_src]) + reqsend = self.comm.Isend(sendbuf,dest=rank_dst,tag=tag) + # [recv] the corresponding receive: results are stored in a buffer which will be assigned to the parent array later + if rank_src>=0: + recvbuf = np.zeros( + (ii_dst.stop-ii_dst.start, + jj_dst.stop-jj_dst.start, + kk_dst.stop-kk_dst.start),dtype=self.precision) + reqrecv = self.comm.Irecv(recvbuf,source=rank_src,tag=tag) + # [recv] wait for data to be received + if reqrecv is not None: + reqrecv.wait() + self.field[key].data[ii_dst,jj_dst,kk_dst] = recvbuf + # [send] wait for data to be sent + if reqsend is not None: + reqsend.wait() + + def _symmetrize_wall(self,key,positionBd): + import numpy as np + assert np.max(positionBd)<=1 and np.min(positionBd)>=-1, "symmetrize_wall: invalid neighbor "\ + "position {}".format(positionBd) + assert np.count_nonzero(positionBd)==1, "symmetrize_wall: only face direction is accepted "\ + "(no edges or corners)" + inghbr = positionBd[0]+1 + jnghbr = positionBd[1]+1 + knghbr = positionBd[2]+1 + if self.nghbr[inghbr,jnghbr,knghbr]<0: + sl_dst = [slice(None),slice(None),slice(None)] + sl_src = [slice(None),slice(None),slice(None)] + for axis in range(3): + if positionBd[axis]==-1: # lower boundary + # index of first point within the domain + idx = self.field[key].nearest_gridpoint(self.bounds[2*axis],axis=axis,lower=True)+1 + # distance first point to wall: should be either 0 or dx/2 + dist = np.abs(self.field[key].coordinate(idx,axis=axis)-self.bounds[2*axis]) + if dist>=0.25*self.spacing[axis]: # no point on boundary + sl_dst[axis] = slice(0,idx,1) + sl_src[axis] = slice(2*idx-1,idx-1,-1) + else: # point on boundary + sl_dst[axis] = slice(0,idx,1) + sl_src[axis] = slice(2*idx,idx,-1) + break + elif positionBd[axis]==1: # upper boundary + # index of last point within the domain + idx = self.field[key].nearest_gridpoint(self.bounds[2*axis+1],axis=axis,lower=True) + # distance last point to wall: should be either 0 or -dx/2 + dist = np.abs(self.field[key].coordinate(idx,axis=axis)-self.bounds[2*axis+1]) + if dist>=0.25*self.spacing[axis]: # no point on boundary + sl_dst[axis] = slice(idx+1,self.field[key].numpoints[axis],1) + sl_src[axis] = slice(idx,2*idx+1-self.field[key].numpoints[axis],-1) + else: # point on boundary + sl_dst[axis] = slice(idx+1,self.field[key].numpoints[axis],1) + sl_src[axis] = slice(idx-1,2*idx-self.field[key].numpoints[axis],-1) + break + self.field[key].data[tuple(sl_dst)] = self.symmetries[key][inghbr,jnghbr,knghbr]*\ + self.field[key].data[tuple(sl_src)] + + def _baton_wait(self,batch_size,tag=420): + '''Block execution until an empty message from rank-batch_wait + is received (issued by _baton_pass)''' + from mpi4py import MPI + if batch_size is not None: + if self.rank>=batch_size: + source = self.rank-batch_size + self.comm.recv(source=source,tag=tag) + + def _baton_pass(self,batch_size,tag=420): + '''Sends an empty message to rank+batch_wait to unblock its + execution (issued by _baton_wait)''' + from mpi4py import MPI + if batch_size is not None: + dest = self.rank+batch_size + if dest=self.size: + if self.barrier: self.comm.Barrier() + raise StopIteration + if self.rank==self.root: + if self.rank==self.iter: + r = self.data + else: + r = self.comm.recv(source=self.iter,tag=0) + else: + if self.rank==self.iter: + self.comm.send(self.data,dest=self.root,tag=0) + r = None + self.iter += 1 + return r + +def parprint(*args, **kwargs): + from mpi4py import MPI + rank = kwargs.pop('rank') if 'rank' in kwargs else 0 + comm = kwargs.pop('comm') if 'comm' in kwargs else MPI.COMM_WORLD + if get_rank(comm=comm)==rank: + print(*args, **kwargs) + +def get_rank(comm=None): + from mpi4py import MPI + comm = MPI.COMM_WORLD if comm is None else comm + return comm.Get_rank() + +def is_root(comm=None,root=0): + from mpi4py import MPI + comm = MPI.COMM_WORLD if comm is None else comm + return comm.Get_rank()==root + +def gather(data,comm=None): + from mpi4py import MPI + comm = MPI.COMM_WORLD if comm is None else comm + return comm.gather(data,root=0) \ No newline at end of file diff --git a/ucf.py b/ucf.py index 6fcf8e2..868cbf7 100644 --- a/ucf.py +++ b/ucf.py @@ -460,6 +460,7 @@ class UCF: self.__currentSetParams = 0 self.__currentSetNumElements = 0 +# TBD: refactor -> UCFCollection, UCFTarFile constructors etc class UCFTarFile: '''Superclass for instances of ucf.tar files.''' def __init__(self,file_tar,file_index=None): @@ -511,6 +512,33 @@ class UCFTarFile: period = params.getfloat('geometry',key_bound[axis]) isPeriodic = params.getboolean('geometry',key_period[axis]) return period if isPeriodic else None + def procgrid(self): + self._initialize_buffer_proc() + proc = {} + offset = 0 + for key in ('u','v','w','p','s'): + proc[key] = tuple(self._buffer_proc[offset]) + offset += 1 + return proc + def origin(self): + self._initialize_buffer_grid() + origin = {} + offset = 0 + for key in ('u','v','w','p','s'): + origin[key] = ( + self._buffer_grid[offset][0][0], + self._buffer_grid[offset][1][0], + self._buffer_grid[offset][2][0]) + offset +=1 + return origin + def spacing(self): + self._initialize_buffer_grid() + spacing = ( + self._buffer_grid[0][0][1]-self._buffer_grid[0][0][0], + self._buffer_grid[0][1][1]-self._buffer_grid[0][1][0], + self._buffer_grid[0][2][1]-self._buffer_grid[0][2][0], + ) + return spacing def gidx_from_key(self,key): if key[0]=='u': return 0 elif key[0]=='v': return 1 @@ -569,18 +597,10 @@ class UCFSnapshot(UCFTarFile): dx,dy,dz = (grid[0][1]-grid[0][0], grid[1][1]-grid[1][0], grid[2][1]-grid[2][0]) - xo,yo,zo = (grid[0][chunk['ibeg']]-chunk['ighost']*dx, - grid[1][chunk['jbeg']]-chunk['ighost']*dy, - grid[2][chunk['kbeg']]-chunk['ighost']*dz) - # Per convention periodicity requires ghost cells for Field3d - if keep_ghost: - period = tuple(self.period(ii) if self.nproc(axis=ii)==1 - else None for ii in range(0,3)) - else: - period = (None,None,None) - return Field3d(chunk['data'],(xo,yo,zo),(dx,dy,dz),period) - def field_complete(self): - return + xo,yo,zo = (grid[0][chunk['ibeg']-1]-(chunk['ighost'])*dx, + grid[1][chunk['jbeg']-1]-(chunk['ighost'])*dy, + grid[2][chunk['kbeg']-1]-(chunk['ighost'])*dz) + return Field3d(chunk['data'],(xo,yo,zo),(dx,dy,dz)) def particles(self,select_col=None): '''Returns particle data as Particles object.''' from .particle import Particles @@ -607,10 +627,10 @@ class UCFSnapshot(UCFTarFile): return ijk[0]*self.nproc(axis=1)*self.nproc(axis=2)+ijk[1]*self.nproc(axis=2)+ijk[2] def dset_from_key(self,key): assert self.has_key(key), "Snapshot does not contain requested key." - if key[0]=='u': return 0 - elif key[0]=='v': return 1 - elif key[0]=='w': return 2 - elif key[0]=='p': return 3 + if key[0]=='u': return 1 + elif key[0]=='v': return 2 + elif key[0]=='w': return 3 + elif key[0]=='p': return 4 elif key[0]=='s': return int(key[1:]) else: raise ValueError("Invalid value of 'key'.") def has_key(self,key):