import numpy as np class Particles: def __init__(self,num,time,attr,period): '''Initialize a Particles object. num number of particles time time value attr dictionary of np arrays with length equal to num period the domain periods ''' assert 'id' in attr, "Need attribute 'id' to initialize Particles" assert 'x' in attr, "Need attribute 'x' to initialize Particles" assert 'y' in attr, "Need attribute 'y' to initialize Particles" assert 'z' in attr, "Need attribute 'z' to initialize Particles" assert isinstance(period,(tuple,list)) and len(period)==3, "'period' must be tuple/list with length 3" self.num = num if isinstance(time,(tuple,list,np.ndarray)): assert len(time)==1, "'time' must be a scalar value." time = time[0] self.time = time self.attr = {} sortidx = attr['id'].argsort() for key in attr: assert attr[key].ndim==1, "Attributes must be a one-dimensional np array." assert attr[key].shape[0]==num, "Attributes must be of length 'num'." if key=='id': self.add_attribute(key,attr[key][sortidx].astype('int')) else: self.add_attribute(key,attr[key][sortidx]) self.period = tuple(period) self.frame_velocity = np.zeros(3) return @classmethod def from_array(cls,pp,col,time,period,select_col=None): assert 'id' in col, "Need column 'id' to initialize Particles" assert 'x' in col, "Need column 'x' to initialize Particles" assert 'y' in col, "Need column 'y' to initialize Particles" assert 'z' in col, "Need column 'z' to initialize Particles" assert pp.ndim==2 or pp.shape[2]==1, "'pp' cannot be a time series." num = pp.shape[1] if pp.ndim==2: pp = np.expand_dims(pp,axis=2) if select_col is not None: for scol in select_col: assert scol in col, "Selected column '{}' not found in 'col'.".format(scol) attr = {} for key in col: if (select_col is None or key in select_col or key in ('id','x','y','z')): attr[key] = pp[col[key],:,0].squeeze() return cls(num,time,attr,period) @classmethod def from_mat(cls,file,select_col=None): from .helper import load_mat pp = load_mat(file,'pp',cast_integer=False) # int casting is expensive and useless on potentially large array col,time,ccinfo = load_mat(file,['colpy','time','ccinfo'],cast_integer=True) period = [None,None,None] if bool(ccinfo['xperiodic']): period[0]=ccinfo['b'] if bool(ccinfo['yperiodic']): period[1]=ccinfo['d'] if bool(ccinfo['zperiodic']): period[2]=ccinfo['f'] for key in col: col[key]-=1 return cls.from_array(pp,col,time,period,select_col=select_col) @classmethod def from_ucf(cls,file,period,select_col=None): from .ucf import read_particles (pp,col,time) = read_particles(file,step=1,verbosity=False,debug=False) return cls.from_array(pp,col,time,period,select_col=select_col) @classmethod def from_position(cls,x,y,z,time,period): assert x.ndim==1 and y.ndim==1 and z.ndim==1,\ "'x','y','z' must be one dimensional arrays of length Np." assert len(x)==len(y) and len(y)==len(z) num = len(x) attr = {'id': np.arange(1,num+1),'x': np.asarray(x),'y': np.asarray(y),'z': np.asarray(z)} return cls(num,time,attr,period) def __getitem__(self,val): if isinstance(val,int): lo,hi = val,val+1 hi = hi if hi!=0 else None val = slice(lo,hi) return self._slice(val) def __str__(self): str = '{:d} particles with\n'.format(self.num) str+= ' time: {}\n'.format(self.time) str += ' attributes:' for key in self.attr.keys(): str += ' {}'.format(key) str += '\n' str+= ' period: {}\n'.format(self.period) str+= ' frame velocity: {}'.format(self.frame_velocity) return str def _slice(self,slice_part=None): assert slice_part is None or \ isinstance(slice_part,slice) or \ (isinstance(slice_part,np.ndarray) and slice_part.dtype==bool and slice_part.shape[0]==self.num),\ "Slicing requires int, slice or logical array." attr = {} for key in self.attr: attr[key] = self.attr[key][slice_part] num = attr['id'].shape[0] return Particles(num,self.time,attr,self.period) def copy(self): attr = {} for key in self.attr: attr[key] = self.attr[key].copy() return Particles(self.num,self.time,attr,self.period) def add_attribute(self,key,val): if isinstance(val,(tuple,list,np.ndarray)): assert len(val)==self.num and val.ndim==1, "Invalid 'val'." self.attr[key] = np.array(val) else: self.attr[key] = np.full(self.num,val) return def del_attribute(self,key): del self.attr[key] def get_attribute(self,key): return self.attr[key] def get_position(self,axis=None): if axis is None: return np.vstack([self.attr[key].copy() for key in ('x','y','z')]) else: assert axis<3, "'axis' must be smaller than 3." key = ('x','y','z')[axis] return self.attr[key].copy() def has_attribute(self,key): return key in self.attr def translate(self,translation,axis=0): '''Translates particles. Periodicity must be enforced manually.''' assert axis<3, "'axis' must be smaller than 3." key = ('x','y','z')[axis] self.attr[key] += translation return def set_frame_velocity(self,val,axis=0): '''Adjust frame of reference by translating particles by time*frame_velocity and subtracting frame_velocity from particle velocities.''' assert axis<3, "'axis' must be smaller than 3." valdiff = val-self.frame_velocity[axis] self.translate(-self.time*valdiff,axis=axis) key = ('u','v','w')[axis] if self.has_attribute(key): self.attr[key] -= valdiff self.frame_velocity[axis] = val return def enforce_periodicity(self,axis=None): if axis is None: for ii in range(3): self.enforce_periodicity(axis=ii) else: if self.period[axis] is not None: key = ('x','y','z')[axis] self.attr[key] %= self.period[axis] return def extend_periodic(self,rep=1,axis=0): '''Duplicates particle along a periodic direction.''' assert axis<3, "'axis' must be smaller than 3." assert self.period[axis] is not None, "Cannot duplicate along non-periodic direction." npart = self.num*(rep+1) period = list(self.period) period[axis] *= (rep+1) attr = {} for key in self.attr: attr[key] = np.empty(npart,dtype=self.attr[key].dtype) offset = 0 for irep in range(rep+1): attr['id'][offset:offset+self.num] = self.attr['id']+offset for key in self.attr: if key=='id': continue attr[key][offset:offset+self.num] = self.attr[key] attr[('x','y','z')[axis]][offset:offset+self.num] += irep*self.period[axis] offset += self.num return Particles(npart,self.time,attr,period) def position_with_duplicates(self,ipart,padding=0.0): pos = np.array( (self.attr['x'][ipart], self.attr['y'][ipart], self.attr['z'][ipart])) rp = self.attr['r'][ipart]+padding posd = [pos.copy()] for axis in range(3): if self.period[axis] is not None: nposd = len(posd) if pos[axis]-rp<0.0: for ii in range(nposd): tmp = posd[ii].copy() tmp[axis] = tmp[axis]+self.period[axis] posd.append(tmp) if pos[axis]+rp>self.period[axis]: for ii in range(nposd): tmp = posd[ii].copy() tmp[axis] = tmp[axis]-self.period[axis] posd.append(tmp) return posd def mask_field(self,fld,cval=np.nan,padding=0.0): '''Fills grid points which lie inside of solid phase with values.''' assert self.has_attribute('r'), "Attribute 'r' required." from numba import jit # Define functions for efficient duplication and masking def __duplicate(xyzr,period,axis): li = (xyzr[:,axis]-xyzr[:,3])<0.0 dupl_lo = xyzr[li,:].copy() dupl_lo[:,axis] += period li = (xyzr[:,axis]+xyzr[:,3])>period dupl_hi = xyzr[li,:].copy() dupl_hi[:,axis] -= period return np.concatenate((xyzr,dupl_lo,dupl_hi),axis=0) @jit(nopython=True) def __mask(origin,spacing,data,cval,xyzr): npart = xyzr.shape[0] nx,ny,nz = data.shape for ipart in range(npart): xp = xyzr[ipart,0] yp = xyzr[ipart,1] zp = xyzr[ipart,2] rp = xyzr[ipart,3] ib = int(np.ceil((xp-rp-origin[0])/spacing[0])) jb = int(np.ceil((yp-rp-origin[1])/spacing[1])) kb = int(np.ceil((zp-rp-origin[2])/spacing[2])) ie = min(nx,ib+int(np.ceil(2*rp/spacing[0]))) je = min(ny,jb+int(np.ceil(2*rp/spacing[1]))) ke = min(nz,kb+int(np.ceil(2*rp/spacing[2]))) ib = max(ib,0) jb = max(jb,0) kb = max(kb,0) for ii in range(ib,ie): x_ = origin[0]+ii*spacing[0] for jj in range(jb,je): y_ = origin[1]+jj*spacing[1] for kk in range(kb,ke): z_ = origin[2]+kk*spacing[2] dsq = (x_-xp)**2+(y_-yp)**2+(z_-zp)**2 if dsq<=rp**2: data[ii,jj,kk] = cval return # Duplicate particles in all periodic directions xyzr = np.stack((self.attr['x'],self.attr['y'],self.attr['z'],self.attr['r']+padding),axis=1) for axis in range(3): if self.period[axis] is None: continue xyzr = __duplicate(xyzr,self.period[axis],axis) # Mask values __mask(fld.origin,fld.spacing,fld.data,cval,xyzr) return def clip(self,position,axis,invert=False): '''Clips particles by a plane with normal pointing direction specified by 'axis'.''' return self.threshold(('x','y','z')[axis],position,invert=(not invert)) def clip_box(self,bounds,invert=False): '''Extracts particls within a bounding box.''' li = np.logical_and(np.logical_and( np.logical_and(self.attr['x']>=bounds[0],self.attr['x']<=bounds[1]), np.logical_and(self.attr['y']>=bounds[2],self.attr['y']<=bounds[3])), np.logical_and(self.attr['z']>=bounds[4],self.attr['z']<=bounds[5])) if invert: np.logical_not(li,li) return self._slice(li) def threshold(self,key,val,invert=False): '''Returns particles for which specified attribute is above, or in case invert=True below, the specified threshold.''' assert key in self.attr, "'key' not found in attr." if invert: li = self.attr[key]=val return self._slice(li) def to_vtk(self,deep=False): import pyvista as pv position = np.vstack([self.attr[key] for key in ('x','y','z')]).transpose() mesh = pv.PolyData(position,deep=deep) for key in self.attr: mesh[key] = self.attr[key] return mesh def glyph(self,theta_resolution=30,phi_resolution=30,deep=False): import pyvista as pv assert self.has_attribute('r'), "Attribute 'r' required." geom = pv.Sphere(radius=1,theta_resolution=theta_resolution, phi_resolution=phi_resolution) return self.to_vtk(deep=deep).glyph(scale='r',factor=1,geom=geom) class Trajectories: def __init__(self,part,unraveled=False,copy_particles=False): self.numtime = 1 self.numpart = part.num self.copy_particles = copy_particles # Attributes are first stored as a list of view to instances of # Particles. Later they will be concatenated into np array # and the attributes of Particles will be views. self.attr = {} self.part = [] self._is_data_list = True if copy_particles: self.part.append(part.copy()) else: self.part.append(part) for key in self.part[0].attr: self.attr[key] = [self.part[0].attr[key].view()] self.period = self.part[0].period self.unraveled = unraveled self.frame_velocity = np.zeros(3) def __str__(self): str = 'Trajectories with\n' str+= ' time steps: {:d}\n'.format(self.numtime) str+= ' particles: {:d}\n'.format(self.numpart) str += ' attributes:' for key in self.part[0].attr.keys(): str += ' {}'.format(key) str+= '\n' str+= ' frame velocity: {}\n'.format(self.frame_velocity) str+= ' unraveled: {}'.format(self.unraveled) return str def __iadd__(self,other): if isinstance(other,Trajectories): self.add_trajectories(other) elif isinstance(other,Particles): self.add_particles(other) else: raise TypeError('Cannot add type {}'.format(type(other))) return self def __getitem__(self,val): assert isinstance(val,tuple) and len(val)==2, "Trajectories must be indexed by [part,time]." sl = [] for x in val: if isinstance(x,int): lo,hi = x,x+1 hi = hi if hi!=0 else None sl.append(slice(lo,hi)) elif isinstance(x,slice): sl.append(x) else: raise TypeError("Trajectories can only be sliced by slice objects or integers.") return self._slice(slice_part=sl[0],slice_time=sl[1]) @classmethod def from_mat(cls,file,unraveled=False,select_col=None,varpp='pp'): from .helper import load_mat pp = load_mat(file,varpp,cast_integer=False) # int casting is expensive and useless on potentially large array col,time,ccinfo = load_mat(file,['colpy','time','ccinfo'],cast_integer=True) period = [None,None,None] if bool(ccinfo['xperiodic']): period[0]=ccinfo['b'] if bool(ccinfo['yperiodic']): period[1]=ccinfo['d'] if bool(ccinfo['zperiodic']): period[2]=ccinfo['f'] for key in col: col[key]-=1 return cls.from_array(pp,col,time,period,unraveled=unraveled,select_col=select_col) @classmethod def from_array(cls,pp,col,time,period,unraveled=False,select_col=None): ntime = len(time) traj = cls(Particles.from_array(pp[:,:,0],col,time[0],period,select_col=select_col),unraveled=unraveled) for ii in range(1,ntime): traj += Particles.from_array(pp[:,:,ii],col,time[ii],period,select_col=select_col) return traj def _slice(self,slice_part=slice(None),slice_time=slice(None)): assert isinstance(slice_part,slice), "'slice_part' must be a slice object." assert isinstance(slice_time,slice), "'slice_time' must be a slice object." parts = self.part[slice_time] assert len(parts)>0, "Slicing yielded zero time steps." for ii,part in enumerate(parts): if ii==0: traj = Trajectories(part[slice_part],unraveled=self.unraveled) else: traj += Trajectories(part[slice_part]) traj.frame_velocity = self.frame_velocity return traj def add_particles(self,part): # Verify part assert part.time>self.part[-1].time, "Time steps must be added in monotonically increasing order." assert part.num==self.numpart, "Number of particles is different from previous time steps." assert all(part.attr['id']==self.part[0].attr['id']), "Particle IDs differ or a not in the same order." assert all([part.period[ii]==self.period[ii] for ii in range(0,3)]), "Period differs!" for key in self.attr: assert key in part.attr, "Particles to be added are missing attribute '{}'".format(key) for key in part.attr: assert key in self.attr, "Particles to be added have additional attribute '{}'".format(key) # Add new data self._make_data_list() if self.copy_particles: self.part.append(part.copy()) else: self.part.append(part) for key in self.attr: self.attr[key].append(self.part[-1].attr[key].view()) self.numtime += 1 return def add_trajectories(self,traj): # Verify traj assert traj.part[0].time>self.part[-1].time, "Time steps must be added in monotonically increasing order." assert traj.numpart==self.numpart, "Number of particles is different from previous time steps." assert all([traj.period[ii]==self.period[ii] for ii in range(0,3)]), "Period differs!" for key in self.attr: assert key in traj.attr, "Trajectories to be added are missing attribute '{}'".format(key) for key in traj.attr: assert key in self.attr, "Trajectories to be added have additional attribute '{}'".format(key) # Add new data self._make_data_list() traj._make_data_list() if self.copy_particles: self.part += deepcopy(traj.part) else: self.part += traj.part for ii in range(0,traj.numtime): for key in self.attr: self.attr[key].append(self.part[-traj.numtime+ii].attr[key].view()) self.numtime += traj.numtime return def get_time(self): time = np.zeros(self.numtime) for ii,part in enumerate(self.part): time[ii] = part.time return time def get_trajectories(self,slice_part=slice(None),slice_time=slice(None)): '''Get (x,y,z) trajectories in np array of dimension (3,npart,ntime).''' assert isinstance(slice_part,slice), "'slice_part' must be a slice." assert isinstance(slice_time,slice), "'slice_time' must be a slice." self._make_data_array() return np.stack([ self.attr['x'][slice_part,slice_time], self.attr['y'][slice_part,slice_time], self.attr['z'][slice_part,slice_time]],axis=0) def get_trajectories_segmented(self,slice_part=slice(None),slice_time=slice(None),restore_ravel_state=True): '''Get (x,y,z) segments of trajectories as a tuple (len: npart) of lists (len: nsegments) of np arrays (shape: 3 X ntime of the segment)''' assert isinstance(slice_part,slice), "'slice_part' must be a slice." assert isinstance(slice_time,slice), "'slice_time' must be a slice." was_unraveled = self.unraveled self.unravel() xyzpath = self.get_trajectories(slice_part=slice_part,slice_time=slice_time) npart = xyzpath.shape[1] ntime = xyzpath.shape[2] # Initialize output and helper arrays out = tuple([] for ii in range(npart)) lastJump = np.zeros(npart,dtype=np.uint) lastPeriod = np.zeros((3,npart),dtype=np.int) for axis in range(3): if self.period[axis] is not None: lastPeriod[axis,:] = np.floor_divide(xyzpath[axis,:,0],self.period[axis]) # Construct output tuple for itime in range(1,ntime+1): thisPeriod = np.zeros((3,npart),dtype=int) if itime==ntime: hasJumped = np.ones(npart,dtype=bool) else: for axis in range(3): if self.period[axis] is not None: thisPeriod[axis,:] = np.floor_divide(xyzpath[axis,:,itime],self.period[axis]) hasJumped = np.any(thisPeriod!=lastPeriod,axis=0) for ipart in range(npart): if hasJumped[ipart]: sl = slice(lastJump[ipart],itime) segment = xyzpath[:,ipart,sl].copy() for axis in range(3): if self.period[axis] is not None: segment[axis,:] -= lastPeriod[axis,ipart]*self.period[axis] out[ipart].append(segment) lastJump[ipart] = itime lastPeriod = thisPeriod # Restore previous ravel state if restore_ravel_state and not was_unraveled: self.enforce_periodicity() return out def slice_time(self,time_beg=None,time_end=None): time = self.get_time() if time_beg is not None: idx_beg = np.searchsorted(time,time_beg,side='left') else: idx_beg = None if time_end is not None: idx_end = np.searchsorted(time,time_end,side='left') else: idx_end = None sl = slice(idx_beg,idx_end+1) return self._slice(slice_time=sl) def set_frame_velocity(self,val,axis=0): assert axis<3, "'axis' must be smaller than 3." for part in self.part: part.set_frame_velocity(val,axis=axis) self.frame_velocity[axis] = val if not self.unraveled: self.enforce_periodicity() return def unravel(self): if self.unraveled: return #raise NotImplementedError('Implemented but untested!') self._make_data_array() for axis in range(0,3): if self.period[axis] is not None: key = ('x','y','z')[axis] #posdiff = (self.part[ii].attr[key]-self.part[ii-1].attr[key]).squeeze() posdiff = self.attr[key][:,1:]-self.attr[key][:,0:-1] coeff = -np.sign(posdiff)*(np.abs(posdiff)>0.5*self.period[axis]) coeff = np.cumsum(coeff,axis=1) self.attr[key][:,1:] += coeff*self.period[axis] self.unraveled = True return def enforce_periodicity(self): for part in self.part: part.enforce_periodicity(axis=None) self.unraveled = False return def to_vtk(self,slice_part=slice(None),slice_time=slice(None),force_ravel=False): import pyvista as pv mesh = pv.PolyData() if force_ravel or not self.unraveled: for part in self.get_trajectories_segmented(slice_part=slice_part,slice_time=slice_time): for seg in part: mesh += pv.helpers.lines_from_points(seg.transpose()) else: tmp = self.get_trajectories(slice_part=slice_part,slice_time=slice_time) for ipart in range(self.numpart): mesh += pv.helpers.lines_from_points(np.array([tmp[0][ipart],tmp[1][ipart],tmp[2][ipart]]).transpose()) return mesh def _make_data_array(self): '''Transforms self.attr[key] to a np array of dimension npart X ntime and updates self.part[itime].attr[key] to views on self.attr[key][:,itime].''' #print('DEBUG: _make_data_array') if not self._is_data_list: return #print([x.shape for x in self.attr['x']]) for key in self.attr: self.attr[key] = np.stack(self.attr[key],axis=1) for ii in range(0,self.numtime): self.part[ii].attr[key] = self.attr[key][:,ii] #print(self.attr['x'].shape) self._is_data_list = False return def _make_data_list(self): '''Transforms self.attr[key] to a list of length ntime of np arrays of dimension npart and updates self.part[itime].attr[key] to views on self.attr[key][itime].''' #print('DEBUG: _make_data_list') if self._is_data_list: return #print(self.attr['x'].shape) for key in self.attr: self.attr[key] = [np.squeeze(x) for x in np.split(self.attr[key],self.numtime,axis=1)] for ii in range(0,self.numtime): self.part[ii].attr[key] = self.attr[key][ii].view() #print([x.shape for x in self.attr['x']]) self._is_data_list = True return def sort(pp,col): ncol,npart,ntime = pp.shape assert('id' in col) for itime in range(0,ntime): idx = pp[col['id'],:,itime].squeeze().argsort() pp[:,:,itime] = pp[:,idx,itime] return pp # def mask(pp,col,field,reconstruct=False,cval=np.nan,padding=0.0): # from .field import Field3d # '''Fills grid points which lie inside of solid phase with values.''' # # Loop through local particles # for ipart in range(0,self.__npartl): # # Slice a box from the field around the particle # xp,yp,zp,rp = (self.particles[self.col['x'],ipart], # self.particles[self.col['y'],ipart], # self.particles[self.col['z'],ipart], # self.particles[self.col['r'],ipart]) # rp += padding*self.__dx[key] # # Get coefficients for reconstruction # if reconstruct: # coeff_lin = self.particles[self.col[key],ipart] # if key=='u': # coeff_rotx = 0.0 # coeff_roty = -self.particles[self.col['oz'],ipart] # coeff_rotz = self.particles[self.col['oy'],ipart] # elif key=='v': # coeff_rotx = self.particles[self.col['oz'],ipart] # coeff_roty = 0.0 # coeff_rotz = -self.particles[self.col['ox'],ipart] # elif key=='w': # coeff_rotx = -self.particles[self.col['oy'],ipart] # coeff_roty = self.particles[self.col['ox'],ipart] # coeff_rotz = 0.0 # else: # coeff_rotx = 0.0 # coeff_roty = 0.0 # coeff_rotz = 0.0 # # Get bounding box of particle # idx_x = np.nonzero((xg>=xp-rp) & (xg<=xp+rp))[0] # idx_y = np.nonzero((yg>=yp-rp) & (yg<=yp+rp))[0] # idx_z = np.nonzero((zg>=zp-rp) & (zg<=zp+rp))[0] # # Triple for loop # for ii in range(idx_x[0],idx_x[-1]+1): # Dx = xg[ii]-xp # for jj in range(idx_y[0],idx_y[-1]+1): # Dy = yg[jj]-yp # for kk in range(idx_z[0],idx_z[-1]+1): # Dz = zg[kk]-zp # isInside = Dx*Dx+Dy*Dy+Dz*Dz <= rp*rp # if isInside: # if reconstruct: # self.field[key][ii,jj,kk] = coeff_lin + coeff_rotx*Dx + coeff_roty*Dy + coeff_rotz*Dz # else: # self.field[key][ii,jj,kk] = cval def slice_columns(pp,col,keys): idx_col = [] col_new = {} ii = 0 for key in keys: idx_col.append(col[key]) col_new[key] = ii ii+=1 return pp[idx_col,:,:], col_new def translate_circular(pp,col,translation,bounds,axis=0): '''Translates particles while taking into account the bounding box''' assert(axis<3) L = bounds[2*axis+1] keys = ('x','y','z') pp[col[keys[axis]],:,:] = (pp[col[keys[axis]],:,:]+translation)%L return pp