suspendtools/field.py

312 lines
15 KiB
Python

import numpy
class Field3d:
def __init__(self,data,origin,spacing):
assert len(origin)==3, "'origin' must be of length 3"
assert len(spacing)==3, "'spacing' must be of length 3"
self.data = numpy.array(data)
self.numpoints = self.data.shape
self.origin = tuple([float(x) for x in origin])
self.spacing = tuple([float(x) for x in spacing])
self.eps_collapse = 1e-12
return
def __str__(self):
str = 'Field3d with\n'
str+= ' dimension: {}, {}, {}\n'.format(*self.numpoints)
str+= ' origin: {:G}, {:G}, {:G}\n'.format(*self.origin)
str+= ' spacing: {:G}, {:G}, {:G}'.format(*self.spacing)
return str
# TBD: this should return another Field3d object
# def __getitem__(self,val):
# assert isinstance(val,tuple) and len(val)==3, "Field3d must be indexed by [ii,jj,kk]."
# sl = []
# for x in val:
# if isinstance(x,int):
# lo,hi = x,x+1
# hi = hi if hi!=0 else None
# sl.append(slice(lo,hi))
# elif isinstance(x,slice):
# sl.append(x)
# else:
# raise TypeError("Trajectories can only be sliced by slice objects or integers.")
# return self.data[sl[0],sl[1],sl[2]]
@classmethod
def from_chunk(cls,chunk,gridg):
'''Initialize Field3d from chunk data and global grid.'''
xg,yg,zg = gridg
ib,jb,kb = chunk['ibeg']-1, chunk['jbeg']-1, chunk['kbeg']-1
dx,dy,dz = xg[1]-xg[0], yg[1]-yg[0], zg[1]-zg[0]
xo,yo,zo = xg[ib]-chunk['ighost']*dx, yg[jb]-chunk['ighost']*dy, zg[kb]-chunk['ighost']*dz
nx,ny,nz = chunk['data'].shape
assert (chunk['nxl']+2*chunk['ighost'])==nx, "Invalid chunk data: nxl != chunk['data'].shape[0]"
assert (chunk['nyl']+2*chunk['ighost'])==ny, "Invalid chunk data: nyl != chunk['data'].shape[1]"
assert (chunk['nzl']+2*chunk['ighost'])==nz, "Invalid chunk data: nzl != chunk['data'].shape[2]"
return cls(chunk['data'],origin=(xo,yo,zo),spacing=(dx,dy,dz))
def insert_subfield(self,subfield):
assert all([abs(subfield.spacing[ii]-self.spacing[ii])<self.eps_collapse
for ii in range(3)]), "spacing differs."
assert all([self.distance_to_nearest_gridpoint(subfield.origin[ii],axis=ii)<self.eps_collapse
for ii in range(3)]), "subfield has shifted origin."
assert all(self.is_within_bounds(subfield.origin,axis=None)), "subfield origin is out-of-bounds."
assert all(self.is_within_bounds(subfield.endpoint(),axis=None)), "subfield endpoint is out-of-bounds."
#ib,jb,kb = [int(round((subfield.origin[ii]-self.origin[ii])/self.spacing[ii])) for ii in range(3)]
ib,jb,kb = self.nearest_gridpoint(subfield.origin,axis=None)
nx,ny,nz = subfield.numpoints
self.data[ib:ib+nx,jb:jb+ny,kb:kb+nz] = subfield.data[:,:,:]
return
def extract_subfield(self,idx_origin,numpoints,stride=(1,1,1)):
assert all(idx_origin[ii]>=0 and idx_origin[ii]<self.numpoints[ii] for ii in range(3)),\
"'origin' is out-of-bounds."
assert all(idx_origin[ii]+stride[ii]*(numpoints[ii]-1)<self.numpoints[ii] for ii in range(3)),\
"endpoint is out-of-bounds."
sl = tuple(slice(idx_origin[ii],idx_origin[ii]+stride[ii]*numpoints[ii],stride[ii]) for ii in range(3))
origin = self.coordinate(idx_origin)
spacing = tuple(self.spacing[ii]*stride[ii] for ii in range(3))
data = self.data[sl].copy()
return Field3d(data,origin,spacing)
def coordinate(self,idx,axis=None):
if axis is None:
assert len(idx)==3, "If 'axis' is None, 'idx' must be a tuple/list of length 3."
return tuple(self.coordinate(idx[ii],axis=ii) for ii in range(3))
assert axis<3, "'axis' must be one of 0,1,2."
assert idx<self.numpoints[axis], "'idx' is out-of-bounds."
return self.origin[axis]+idx*self.spacing[axis]
def nearest_gridpoint(self,coord,axis=None,lower=False):
if axis is None:
assert len(coord)==3, "If 'axis' is None, 'coord' must be a tuple/list of length 3."
return tuple(self.nearest_gridpoint(coord[ii],axis=ii,lower=lower) for ii in range(3))
assert axis<3, "'axis' must be one of 0,1,2."
if lower:
return numpy.floor((coord+self.eps_collapse-self.origin[axis])/self.spacing[axis]).astype('int')
else:
return numpy.round((coord-self.origin[axis])/self.spacing[axis]).astype('int')
def distance_to_nearest_gridpoint(self,coord,axis=None,lower=False):
if axis is None:
assert len(coord)==3, "If 'axis' is None, 'coord' must be a tuple/list of length 3."
return tuple(self.distance_to_nearest_gridpoint(coord[ii],axis=ii,lower=lower) for ii in range(3))
assert axis<3, "'axis' must be one of 0,1,2."
val = numpy.remainder(coord+self.eps_collapse-self.origin[axis],self.spacing[axis])-self.eps_collapse
if not lower and val>0.5*self.spacing[axis]:
val = self.spacing[axis]-val
return val
def is_within_bounds(self,coord,axis=None):
if axis is None:
assert len(coord)==3, "If 'axis' is None, 'coord' must be a tuple/list of length 3."
return tuple(self.is_within_bounds(coord[ii],axis=ii) for ii in range(3))
assert axis<3, "'axis' must be one of 0,1,2."
idx_nearest = self.nearest_gridpoint(coord,axis=axis)
if idx_nearest>0 and idx_nearest<self.numpoints[axis]-1:
return True
dist_nearest = self.distance_to_nearest_gridpoint(coord,axis=axis)
if (idx_nearest==0 or idx_nearest==self.numpoints[axis]-1) and abs(dist_nearest)<self.eps_collapse:
return True
else:
return False
def dim(self,axis=None):
if axis is None:
return tuple(self.dim(axis=ii) for ii in range(3))
assert axis<3, "'axis' must be one of 0,1,2."
return self.numpoints[axis]
def endpoint(self,axis=None):
if axis is None:
return tuple(self.endpoint(axis=ii) for ii in range(3))
assert axis<3, "'axis' must be one of 0,1,2."
return self.origin[axis]+(self.numpoints[axis]-1)*self.spacing[axis]
def derivative(self,axis,preserve_grid=False,padding=None):
if not preserve_grid:
origin = list(self.origin)
if padding is None:
data = numpy.zeros(self.data.shape,dtype=self.data.dtype)
slout = [slice(None),slice(None),slice(None)]
origin[axis] += 0.5*self.spacing[axis]
elif padding=='before':
shape = numpy.array(self.data.shape)
shape[axis] += 1
data = numpy.zeros(tuple(shape),dtype=self.data.dtype)
slout = [slice(None),slice(None),slice(None)]
slout[axis] = slice(1,None)
origin[axis] -= 0.5*self.spacing[axis]
elif padding=='after':
shape = numpy.array(self.data.shape)
shape[axis] += 1
data = numpy.zeros(tuple(shape),dtype=self.data.dtype)
slout = [slice(None),slice(None),slice(None)]
slout[axis] = slice(0,-1)
origin[axis] += 0.5*self.spacing[axis]
elif padding=='both':
shape = numpy.array(self.data.shape)
shape[axis] += 2
data = numpy.zeros(tuple(shape),dtype=self.data.dtype)
slout = [slice(None),slice(None),slice(None)]
slout[axis] = slice(1,-1)
origin[axis] -= 0.5*self.spacing[axis]
else:
raise ValueError("'padding' must be one of 'none','before','after'.")
slout = tuple(slout)
# 2nd order FD with h = dx/2
slhi = [slice(None),slice(None),slice(None)]
sllo = [slice(None),slice(None),slice(None)]
slhi[axis] = slice(1,None)
sllo[axis] = slice(0,-1)
slhi = tuple(slhi)
sllo = tuple(sllo)
data[slout] = (1./self.spacing[axis])*(self.data[slhi]-self.data[sllo])
else:
# 2nd order FD with h = dx, one-sided at boundaries
# https://numpy.org/doc/stable/reference/generated/numpy.gradient.html
origin = self.origin
data = numpy.gradient(self.data,self.spacing[axis],axis=axis,edge_order=2)
return Field3d(data,origin,self.spacing)
def gradient(self,axis,preserve_origin=False,padding=None):
return [self.derivative(axis,preserve_origin=preserve_origin,padding=padding) for axis in range(0,3)]
def gaussian_filter(self,sigma,truncate=4.0,border='constant',const_val=numpy.nan):
'''Applies a gaussian filter: sigma is standard deviation for Gaussian kernel for each axis.'''
from scipy import ndimage
assert isinstance(sigma,(tuple,list,numpy.ndarray)) and len(sigma)==3,\
"'sigma' must be a tuple/list of length 3"
# Convert sigma from simulation length scales to grid points as required by ndimage
sigma_img = tuple(sigma[ii]/self.spacing[ii] for ii in range(3))
data = ndimage.gaussian_filter(self.data,sigma_img,truncate=truncate,mode=border,cval=const_val)
print(data.shape,self.data.shape)
print(numpy.linalg.norm(data-self.data))
print(numpy.max(data))
return Field3d(data,self.origin,self.spacing)
def gaussian_filter_radius(self,sigma,truncate=4.0):
'''Radius of Gaussian filter. Stencil width is 2*radius+1.'''
assert isinstance(sigma,(tuple,list,numpy.ndarray)) and len(sigma)==3,\
"'sigma' must be a tuple/list of length 3"
# Convert sigma from simulation length scales to grid points as required by ndimage
sigma_img = tuple(sigma[ii]/self.spacing[ii] for ii in range(3))
radius = []
for ii in range(3):
radius.append(int(truncate*sigma_img[ii]+0.5))
return tuple(radius)
def change_grid(self,origin,spacing,numpoints):
assert all([origin[ii]>=self.origin[ii] for ii in range(0,3)]), "New origin is out of bounds."
endpoint = [origin[ii]+(numpoints[ii]-1)*spacing[ii] for ii in range(0,3)]
assert all([endpoint[ii]<=self.endpoint(ii) for ii in range(0,3)]), "New end point is out of bounds."
data = numpy.zeros(numpoints)
if numpy.allclose(spacing,self.spacing):
# spacing is the same: we can construct universal weights for the stencil
i0,j0,k0 = self.nearest_gridpoint(origin,axis=None,lower=True)
cx,cy,cz = [self.distance_to_nearest_gridpoint(origin[ii],axis=ii,lower=True)/self.spacing[ii]
for ii in range(3)]
c = self.weights_trilinear((cx,cy,cz))
for ii in range(0,2):
for jj in range(0,2):
for kk in range(0,2):
if c[ii,jj,kk]>self.eps_collapse:
data += c[ii,jj,kk]*self.data[
i0+ii:i0+ii+numpoints[0],
j0+jj:j0+jj+numpoints[1],
k0+kk:k0+kk+numpoints[2]]
else:
for ii in range(0,numpoints[0]):
for jj in range(0,numpoints[1]):
for kk in range(0,numpoints[2]):
coord = (
origin[0]+ii*spacing[0],
origin[1]+jj*spacing[1],
origin[2]+kk*spacing[2])
data[ii,jj,kk] = self.interpolate(coord)
return Field3d(data,origin,spacing)
def interpolate(self,coord):
assert all([coord[ii]>=self.origin[ii] for ii in range(0,3)]), "'coord' is out of bounds."
assert all([coord[ii]<=self.endpoint(ii) for ii in range(0,3)]), "'coord' is out of bounds."
i0,j0,k0 = self.nearest_gridpoint(coord,axis=None,lower=True)
cx,cy,cz = [self.distance_to_nearest_gridpoint(coord[ii],axis=ii,lower=True)/self.spacing[ii]
for ii in range(3)]
c = self.weights_trilinear((cx,cy,cz))
val = 0.0
for ii in range(0,2):
for jj in range(0,2):
for kk in range(0,2):
if c[ii,jj,kk]>self.eps_collapse:
val += c[ii,jj,kk]*self.data[i0+ii,j0+jj,k0+kk]
return val
def weights_trilinear(self,rel_dist):
assert len(rel_dist)==3, "len(rel_dist) must be 3."
cx,cy,cz = rel_dist
if cx<0.0 and cx>-self.eps_collapse: cx=0.0
if cy<0.0 and cy>-self.eps_collapse: cy=0.0
if cz<0.0 and cz>-self.eps_collapse: cz=0.0
if cx>1.0 and cx<1.0+self.eps_collapse: cx=1.0
if cy>1.0 and cy<1.0+self.eps_collapse: cy=1.0
if cz>1.0 and cz<1.0+self.eps_collapse: cz=1.0
assert cx>=0.0 and cy>=0.0 and cz>=0.0, "'rel_dist' must be >=0"
assert cx<=1.0 and cy<=1.0 and cz<=1.0, "'rel_dist' must be <=1"
c = numpy.zeros((2,2,2))
c[0,0,0] = 1.0-(cx+cy+cz)+(cx*cy+cx*cz+cy*cz)-(cx*cy*cz)
c[1,0,0] = cx-(cx*cy+cx*cz)+(cx*cy*cz)
c[0,1,0] = cy-(cx*cy+cy*cz)+(cx*cy*cz)
c[0,0,1] = cz-(cx*cz+cy*cz)+(cx*cy*cz)
c[1,1,0] = (cx*cy)-(cx*cy*cz)
c[1,0,1] = (cx*cz)-(cx*cy*cz)
c[0,1,1] = (cy*cz)-(cx*cy*cz)
c[1,1,1] = (cx*cy*cz)
return c
def set_writable(self,flag):
self.data.setflags(write=flag)
return
def to_vtk(self):
import pyvista as pv
mesh = pv.UniformGrid()
mesh.dimensions = self.dim(axis=None)
mesh.origin = self.origin
mesh.spacing = self.spacing
mesh.point_arrays['data'] = self.data.flatten(order='F')
return mesh
def vtk_contour(self,val):
if not isinstance(val,(tuple,list)):
val = [val]
return self.to_vtk().contour(val)
def vtk_slice(self,normal,origin):
assert (normal in ('x','y','z') or (isinstance(normal,(tuple,list))
and len(normal)==3)), "'normal' must be 'x','y','z' or tuple of length 3."
assert isinstance(origin,(tuple,list)) and len(origin)==3,\
"'origin' must be tuple of length 3."
return self.to_vtk().slice(normal=normal,origin=origin)
class ChunkIterator:
'''Iterates through all chunks. 'snapshot' must be an instance
of a class which returns a Field3d from the method call
snapshot.field_chunk(rank,key,keep_ghost=keep_ghost).
One example implementation is UCFSnapshot from suspendtools.ucf.'''
def __init__(self,snapshot,key,keep_ghost=True):
self.snapshot = snapshot
self.key = key
self.keep_ghost = keep_ghost
self.iter_rank = 0
def __iter__(self):
self.iter_rank = 0
return self
def __next__(self):
if self.iter_rank<self.snapshot.nproc():
field = self.snapshot.field_chunk(
self.iter_rank,self.key,keep_ghost=self.keep_ghost)
self.iter_rank += 1
return field
else:
raise StopIteration