class Particles: def __init__(self,pp,col,time,period=(None,None,None),select_col=None): import numpy assert 'id' in col, "Need column 'id' to initialize Particles" assert 'x' in col, "Need column 'x' to initialize Particles" assert 'y' in col, "Need column 'y' to initialize Particles" assert 'z' in col, "Need column 'z' to initialize Particles" assert pp.ndim==2 or pp.shape[2]==1, "'pp' cannot be a time series." self.num = pp.shape[1] if pp.ndim==2: pp = numpy.expand_dims(pp,axis=2) if select_col is not None: for scol in select_col: assert scol in col, "Selected column '{}' not found in 'col'.".format(scol) # Copy attributes and sort by ID self.attr = {} sortidx = pp[col['id'],:,0].argsort() idsorted = pp[col['id'],sortidx,0] for key in col: if key=='id': self.add_attribute(key,pp[col[key],sortidx,0].astype('int')) elif (select_col is None or key in select_col or key in ('x','y','z')): self.add_attribute(key,pp[col[key],sortidx,0]) self.period = period self.time = time def __getitem__(self,key): return self.get_attribute(key) def __str__(self): str = '{:d} particles with\n'.format(self.num) str += ' attributes:' for key in self.attr.keys(): str += ' {}'.format(key) str += '\n' str+= ' period: {}, {}, {}'.format(*self.period) return str def add_attribute(self,key,val): import numpy if isinstance(val,(tuple,list,numpy.ndarray)): assert len(val)==self.num and val.ndim==1, "Invalid 'val'." self.attr[key] = numpy.array(val) else: self.attr[key] = numpy.full(self.num,val) return def del_attribute(self,key): del self.attr[key] def get_attribute(self,key,id=None): import numpy if id is not None: assert id>=1, "Particle IDs start with 1." assert id<=self.num, "Requested ID exceeds number of particles." if id is None: return self.attr[key] else: return self.attr[key][id-1] def get_position(self,axis=None,id=None): import numpy if axis is None: return numpy.vstack([self.get_attribute(key,id=id) for key in ('x','y','z')]) else: assert axis<3, "'axis' must be smaller than 3." key = ('x','y','z')[axis] return self.get_attribute(key) def has_attribute(self,key): return key in self.attr def translate(self,translation,axis=0): '''Translates particles while taking into account periodicity.''' assert axis<3, "'axis' must be smaller than 3." key = ('x','y','z')[axis] self.attr[key] += translation if self.period[axis] is not None: self.attr[key] %= self.period[axis] return class Trajectories: def __init__(self,part,copy=False): import numpy self.numtime = 1 self.numpart = part.num self.copy = copy # Attributes are first stored as a list of view to instances of # Particles. Later they will be concatenated into numpy array # and the attributes of Particles will be views. self.attr = {} self.part = [] self._is_data_list = True if copy: self.part.append(deepcopy(part)) else: self.part.append(part) for key in self.part[0].attr: self.attr[key] = [self.part[0].attr[key].view()] self.period = self.part[0].period self.unraveled = False def __str__(self): str = 'Trajectory with\n' str+= ' time steps: {:d}\n'.format(self.numtime) str+= ' particles: {:d}\n'.format(self.numpart) str+= ' unraveled: {}'.format(self.unraveled) return str def __iadd__(self,other): if isinstance(other,Trajectories): self.add_trajectories(other) elif isinstance(other,Particles): self.add_particles(other) else: raise TypeError('Cannot add type {}'.format(type(other))) return self def __getitem__(self,val): assert isinstance(val,tuple) and len(val)==2, "Trajectories must be indexed by [part,time]." sl = [] for x in val: if isinstance(x,int): lo,hi = x,x+1 hi = hi if hi!=0 else None sl.append(slice(lo,hi)) elif isinstance(x,slice): sl.append(x) else: raise TypeError("Trajectories can only be sliced by slice objects or integers.") return self.get_trajectories(slice_part=sl[0],slice_time=sl[1]) @classmethod def from_mat(cls,file,unraveled=False): from .helper import load_mat pp,col,time,ccinfo = load_mat(file,['pp','colpy','time','ccinfo']) period = [None,None,None] if bool(ccinfo['xperiodic']): period[0]=ccinfo['b'] if bool(ccinfo['yperiodic']): period[1]=ccinfo['d'] if bool(ccinfo['zperiodic']): period[2]=ccinfo['f'] for key in col: col[key]-=1 ntime = len(time) traj = cls(Particles(pp[:,:,0],col,time[0],period)) for ii in range(1,ntime): traj += Particles(pp[:,:,ii],col,time[ii],period) traj.unraveled = unraveled return traj def add_particles(self,part): import numpy # Verify part assert part.time>self.part[-1].time, "Time steps must be added in monotonically increasing order." assert part.num==self.numpart, "Number of particles is different from previous time steps." assert all(part['id']==self.part[0].attr['id']), "Particle IDs differ or a not in the same order." assert all([part.period[ii]==self.period[ii] for ii in range(0,3)]), "Period differs!" for key in self.attr: assert key in part.attr, "Particles to be added are missing attribute '{}'".format(key) for key in part.attr: assert key in self.attr, "Particles to be added have additional attribute '{}'".format(key) # Add new data self._make_data_list() if self.copy: self.part.append(deepcopy(part)) else: self.part.append(part) for key in self.attr: self.attr[key].append(self.part[-1].attr[key].view()) self.numtime += 1 return def add_trajectories(self,traj): # Verify traj assert traj.part[0].time>self.part[-1].time, "Time steps must be added in monotonically increasing order." assert traj.numpart==self.numpart, "Number of particles is different from previous time steps." assert all([traj.period[ii]==self.period[ii] for ii in range(0,3)]), "Period differs!" for key in self.attr: assert key in traj.attr, "Trajectories to be added are missing attribute '{}'".format(key) for key in traj.attr: assert key in self.attr, "Trajectories to be added have additional attribute '{}'".format(key) # Add new data self._make_data_list() traj._make_data_list() if self.copy: self.part += deepcopy(traj.part) else: self.part += traj.part for ii in range(0,traj.numtime): for key in self.attr: self.attr[key].append(self.part[-traj.numtime+ii].attr[key].view()) self.numtime += traj.numtime return def get_time(self): import numpy time = numpy.zeros(self.numtime) for ii,part in enumerate(self.part): time[ii] = part.time return time def get_trajectories(self,slice_part=slice(None),slice_time=slice(None)): '''Get (x,y,z) trajectories in numpy array of dimension (3,npart,ntime).''' import numpy assert isinstance(slice_part,slice), "'slice_part' must be a slice." assert isinstance(slice_time,slice), "'slice_time' must be a slice." self._make_data_array() return numpy.stack([ self.attr['x'][slice_part,slice_time], self.attr['y'][slice_part,slice_time], self.attr['z'][slice_part,slice_time]],axis=0) def get_trajectories_segmented(self,slice_part=slice(None),slice_time=slice(None),restore_ravel_state=True): '''Get (x,y,z) segments of trajectories as a tuple (len: npart) of lists (len: nsegments) of numpy arrays (shape: 3 X ntime of the segment)''' import numpy assert isinstance(slice_part,slice), "'slice_part' must be a slice." assert isinstance(slice_time,slice), "'slice_time' must be a slice." was_unraveled = self.unraveled self.unravel() xyzpath = self.get_trajectories(slice_part=slice_part,slice_time=slice_time) npart = xyzpath.shape[1] ntime = xyzpath.shape[2] # Initialize output and helper arrays out = tuple([] for ii in range(npart)) lastJump = numpy.zeros(npart,dtype=numpy.uint) lastPeriod = numpy.zeros((3,npart),dtype=numpy.int) for axis in range(3): if self.period[axis] is not None: lastPeriod[axis,:] = numpy.floor_divide(xyzpath[axis,:,0],self.period[axis]) # Construct output tuple for itime in range(1,ntime+1): thisPeriod = numpy.zeros((3,npart),dtype=int) if itime==ntime: hasJumped = numpy.ones(npart,dtype=bool) else: for axis in range(3): if self.period[axis] is not None: thisPeriod[axis,:] = numpy.floor_divide(xyzpath[axis,:,itime],self.period[axis]) hasJumped = numpy.any(thisPeriod!=lastPeriod,axis=0) for ipart in range(npart): if hasJumped[ipart]: sl = slice(lastJump[ipart],itime) segment = xyzpath[:,ipart,sl].copy() for axis in range(3): if self.period[axis] is not None: segment[axis,:] -= lastPeriod[axis,ipart]*self.period[axis] out[ipart].append(segment) lastJump[ipart] = itime lastPeriod = thisPeriod # Restore previous ravel state if restore_ravel_state and not was_unraveled: self.ravel() return out def unravel(self): import numpy if self.unraveled: return raise NotImplementedError('Implemented but untested!') self._make_data_array() for axis in range(0,3): if self.period[axis] is not None: key = ('x','y','z')[axis] #posdiff = (self.part[ii].attr[key]-self.part[ii-1].attr[key]).squeeze() posdiff = self.attr[key][:,1:]-self.attr[key][:,0:-1] coeff = -numpy.sign(posdiff)*(numpy.abs(posdiff)>0.5*self.period[axis]) coeff = numpy.cumsum(coeff,axis=1) self.attr[key][:,1:] += coeff*self.period[axis] self.unraveled = True return def ravel(self): if not self.unraveled: return raise NotImplementedError('Implemented but untested!') for axis in range(0,3): if self.period[axis] is not None: key = ('x','y','z')[axis] self.part[ii].attr[key] %= self.period[axis] self.unraveled = False return def _make_data_array(self): '''Transforms self.attr[key] to a numpy array of dimension npart X ntime and updates self.part[itime].attr[key] to views on self.attr[key][:,itime].''' import numpy #print('DEBUG: _make_data_array') if not self._is_data_list: return #print([x.shape for x in self.attr['x']]) for key in self.attr: self.attr[key] = numpy.stack(self.attr[key],axis=1) for ii in range(0,self.numtime): self.part[ii].attr[key] = self.attr[key][:,ii] #print(self.attr['x'].shape) self._is_data_list = False return def _make_data_list(self): '''Transforms self.attr[key] to a list of length ntime of numpy arrays of dimension npart and updates self.part[itime].attr[key] to views on self.attr[key][itime].''' import numpy #print('DEBUG: _make_data_list') if self._is_data_list: return #print(self.attr['x'].shape) for key in self.attr: self.attr[key] = [numpy.squeeze(x) for x in numpy.split(self.attr[key],self.numtime,axis=1)] for ii in range(0,self.numtime): self.part[ii].attr[key] = self.attr[key][ii].view() #print([x.shape for x in self.attr['x']]) self._is_data_list = True return def sort(pp,col): ncol,npart,ntime = pp.shape assert('id' in col) for itime in range(0,ntime): idx = pp[col['id'],:,itime].squeeze().argsort() pp[:,:,itime] = pp[:,idx,itime] return pp # def mask(pp,col,field,reconstruct=False,cval=np.nan,padding=0.0): # from .field import Field3d # '''Fills grid points which lie inside of solid phase with values.''' # # Loop through local particles # for ipart in range(0,self.__npartl): # # Slice a box from the field around the particle # xp,yp,zp,rp = (self.particles[self.col['x'],ipart], # self.particles[self.col['y'],ipart], # self.particles[self.col['z'],ipart], # self.particles[self.col['r'],ipart]) # rp += padding*self.__dx[key] # # Get coefficients for reconstruction # if reconstruct: # coeff_lin = self.particles[self.col[key],ipart] # if key=='u': # coeff_rotx = 0.0 # coeff_roty = -self.particles[self.col['oz'],ipart] # coeff_rotz = self.particles[self.col['oy'],ipart] # elif key=='v': # coeff_rotx = self.particles[self.col['oz'],ipart] # coeff_roty = 0.0 # coeff_rotz = -self.particles[self.col['ox'],ipart] # elif key=='w': # coeff_rotx = -self.particles[self.col['oy'],ipart] # coeff_roty = self.particles[self.col['ox'],ipart] # coeff_rotz = 0.0 # else: # coeff_rotx = 0.0 # coeff_roty = 0.0 # coeff_rotz = 0.0 # # Get bounding box of particle # idx_x = np.nonzero((xg>=xp-rp) & (xg<=xp+rp))[0] # idx_y = np.nonzero((yg>=yp-rp) & (yg<=yp+rp))[0] # idx_z = np.nonzero((zg>=zp-rp) & (zg<=zp+rp))[0] # # Triple for loop # for ii in range(idx_x[0],idx_x[-1]+1): # Dx = xg[ii]-xp # for jj in range(idx_y[0],idx_y[-1]+1): # Dy = yg[jj]-yp # for kk in range(idx_z[0],idx_z[-1]+1): # Dz = zg[kk]-zp # isInside = Dx*Dx+Dy*Dy+Dz*Dz <= rp*rp # if isInside: # if reconstruct: # self.field[key][ii,jj,kk] = coeff_lin + coeff_rotx*Dx + coeff_roty*Dy + coeff_rotz*Dz # else: # self.field[key][ii,jj,kk] = cval def slice_columns(pp,col,keys): idx_col = [] col_new = {} ii = 0 for key in keys: idx_col.append(col[key]) col_new[key] = ii ii+=1 return pp[idx_col,:,:], col_new def translate_circular(pp,col,translation,bounds,axis=0): '''Translates particles while taking into account the bounding box''' assert(axis<3) L = bounds[2*axis+1] keys = ('x','y','z') pp[col[keys[axis]],:,:] = (pp[col[keys[axis]],:,:]+translation)%L return pp