suspendtools/field.py

1441 lines
68 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
class Field3d:
def __init__(self,data,origin,spacing,deep=False):
assert len(origin)==3, "'origin' must be of length 3"
assert len(spacing)==3, "'spacing' must be of length 3"
assert isinstance(data,np.ndarray), "'data' must be numpy.ndarray."
if deep:
self.data = data.copy()
else:
self.data = data
self.origin = tuple([float(x) for x in origin])
self.spacing = tuple([float(x) for x in spacing])
self.eps_collapse = 1e-4*np.min(spacing)
self._dim = None
return
def __str__(self):
str = 'Field3d with\n'
str+= ' dimension: {}, {}, {}\n'.format(*self.dim())
str+= ' origin: {:G}, {:G}, {:G}\n'.format(*self.origin)
str+= ' spacing: {:G}, {:G}, {:G}\n'.format(*self.spacing)
str+= ' datatype: {}'.format(self.dtype())
return str
def __add__(self,other):
if isinstance(other,Field3d):
assert self.has_same_grid(other), "Grid mismatch."
return Field3d(self.data+other.data,self.origin,self.spacing)
else:
return Field3d(self.data+other,self.origin,self.spacing)
def __sub__(self,other):
if isinstance(other,Field3d):
assert self.has_same_grid(other), "Grid mismatch."
return Field3d(self.data-other.data,self.origin,self.spacing)
else:
return Field3d(self.data-other,self.origin,self.spacing)
def __mul__(self,other):
if isinstance(other,Field3d):
assert self.has_same_grid(other), "Grid mismatch."
return Field3d(self.data*other.data,self.origin,self.spacing)
else:
return Field3d(self.data*other,self.origin,self.spacing)
def __truediv__(self,other):
if isinstance(other,Field3d):
assert self.has_same_grid(other), "Grid mismatch."
return Field3d(self.data/other.data,self.origin,self.spacing)
else:
return Field3d(self.data/other,self.origin,self.spacing)
def __radd__(self,other):
return Field3d(other+self.data,self.origin,self.spacing)
def __rmul__(self,other):
return Field3d(other*self.data,self.origin,self.spacing)
def __pow__(self,other):
return Field3d(self.data**other,self.origin,self.spacing)
def __iadd__(self,other):
if isinstance(other,Field3d):
assert self.has_same_grid(other), "Grid mismatch."
self.data += other.data
else:
self.data += other
return self
def __isub__(self,other):
if isinstance(other,Field3d):
assert self.has_same_grid(other), "Grid mismatch."
self.data -= other.data
else:
self.data -= other
return self
def __imul__(self,other):
if isinstance(other,Field3d):
assert self.has_same_grid(other), "Grid mismatch."
self.data *= other.data
else:
self.data *= other
return self
def __itruediv__(self,other):
if isinstance(other,Field3d):
assert self.has_same_grid(other), "Grid mismatch."
self.data /= other.data
else:
self.data /= other
return self
# TBD: this should return another Field3d object
# def __getitem__(self,val):
# assert isinstance(val,tuple) and len(val)==3, "Field3d must be indexed by [ii,jj,kk]."
# sl = []
# for x in val:
# if isinstance(x,int):
# lo,hi = x,x+1
# hi = hi if hi!=0 else None
# sl.append(slice(lo,hi))
# elif isinstance(x,slice):
# sl.append(x)
# else:
# raise TypeError("Trajectories can only be sliced by slice objects or integers.")
# return self.data[sl[0],sl[1],sl[2]]
@classmethod
def from_chunk(cls,chunk,gridg):
'''Initialize Field3d from chunk data and global grid.'''
xg,yg,zg = gridg
ib,jb,kb = chunk['ibeg']-1, chunk['jbeg']-1, chunk['kbeg']-1
dx,dy,dz = xg[1]-xg[0], yg[1]-yg[0], zg[1]-zg[0]
xo,yo,zo = xg[ib]-chunk['ighost']*dx, yg[jb]-chunk['ighost']*dy, zg[kb]-chunk['ighost']*dz
nx,ny,nz = chunk['data'].shape
assert (chunk['nxl']+2*chunk['ighost'])==nx, "Invalid chunk data: nxl != chunk['data'].shape[0]"
assert (chunk['nyl']+2*chunk['ighost'])==ny, "Invalid chunk data: nyl != chunk['data'].shape[1]"
assert (chunk['nzl']+2*chunk['ighost'])==nz, "Invalid chunk data: nzl != chunk['data'].shape[2]"
return cls(chunk['data'],origin=(xo,yo,zo),spacing=(dx,dy,dz))
@classmethod
def from_file(cls,file,name='Field3d'):
import h5py
is_open = isinstance(file,(h5py.File,h5py.Group))
f = file if is_open else h5py.File(file,'r')
g = f[name]
origin = tuple(g['origin'])
spacing = tuple(g['spacing'])
data = g['data'][:]
if not is_open: f.close()
return cls(data,origin,spacing)
@classmethod
def from_snapshot(cls,snap,key,procslice=None,dtype=np.float64):
'''
Construct a Field3d from a field saved in an UCF snapshot.
Use procslice = (ib,ie,jb,je,kb,je) to load only a block
of the full data.
'''
from .helper import eval_procslice, key_grid, rank_from_position
proc = snap.procgrid()
origin = snap.origin()
spacing = snap.spacing()
periodicity = snap.periodicity()
bounds = snap.bounds()
keyg = key_grid(key)
nxp,nyp,nzp = [len(proc[keyg][2*ii]) for ii in range(3)]
# Get field dimensions
(ipb,ipe,jpb,jpe,kpb,kpe),ori,(nxl,nyl,nzl) = eval_procslice(
proc,origin,spacing,keyg,procslice)
# Create a Field3d: ghost cells will be included!
data = np.empty((nxl+2,nyl+2,nzl+2),dtype=dtype)
field_ = cls(data,[ori[ii]-spacing[ii] for ii in range(3)],spacing)
# Go through each chunk to be read and construct the field
for ip_ in range(ipb,ipe+1):
for jp_ in range(jpb,jpe+1):
for kp_ in range(kpb,kpe+1):
# Determine rank of the chunk to be read
rank_ = rank_from_position(ip_,jp_,kp_,nyp,nzp)
# Compute bounds of this chunk
ib_ = proc[keyg][0][ip_]
ie_ = proc[keyg][1][ip_]
jb_ = proc[keyg][2][jp_]
je_ = proc[keyg][3][jp_]
kb_ = proc[keyg][4][kp_]
ke_ = proc[keyg][5][kp_]
nxl_ = ie_-ib_+1
nyl_ = je_-jb_+1
nzl_ = ke_-kb_+1
# Read data and insert it
subfield = snap.field_chunk(rank_,key)
field_.insert_subfield(subfield)
return field_
@classmethod
def allocate(cls,dim,origin,spacing,fill=None,dtype=np.float64):
'''Allocates an empty field, or a field filled with 'fill'.'''
assert isinstance(dim,(tuple,list,np.ndarray)) and len(dim)==3,\
"'dim' must be a tuple/list of length 3."
assert isinstance(origin,(tuple,list,np.ndarray)) and len(origin)==3,\
"'origin' must be a tuple/list of length 3."
assert isinstance(spacing,(tuple,list,np.ndarray)) and len(spacing)==3,\
"'spacing' must be a tuple/list of length 3."
if fill is None:
data = np.empty(dim,dtype=dtype)
else:
data = np.full(dim,fill,dtype=dtype)
return cls(data,origin,spacing)
@classmethod
def pseudo_field(cls,dim,origin,spacing):
'''Creates a Field3d instance without allocating any memory.'''
assert isinstance(dim,(tuple,list,np.ndarray)) and len(dim)==3,\
"'dim' must be a tuple/list of length 3."
assert isinstance(origin,(tuple,list,np.ndarray)) and len(origin)==3,\
"'origin' must be a tuple/list of length 3."
assert isinstance(spacing,(tuple,list,np.ndarray)) and len(spacing)==3,\
"'spacing' must be a tuple/list of length 3."
data = np.empty((0,0,0))
r = cls(data,origin,spacing)
r._dim = dim
return r
def save(self,file,name='Field3d',truncate=False,single_precision=False):
import h5py
is_open = isinstance(file,(h5py.File,h5py.Group))
flag = 'w' if truncate else 'a'
f = file if is_open else h5py.File(file,flag)
g = f.create_group(name)
g.create_dataset('origin',data=self.origin)
g.create_dataset('spacing',data=self.spacing)
if single_precision:
g.create_dataset('data',data=self.data,dtype='f4')
else:
g.create_dataset('data',data=self.data)
if not is_open: f.close()
return
def copy(self):
return Field3d(self.data.copy(),self.origin,self.spacing)
def pseudo_copy(self):
return self.pseudo_field(self.dim(),self.origin,self.spacing)
def insert_subfield(self,subfield):
assert all([abs(subfield.spacing[ii]-self.spacing[ii])<self.eps_collapse
for ii in range(3)]), "spacing differs. Got {}, have {}".format(subfield.spacing,self.spacing)
assert all([self.distance_to_nearest_gridpoint(subfield.origin[ii],axis=ii)<self.eps_collapse
for ii in range(3)]), "subfield has shifted origin."
assert all(self.is_within_bounds(subfield.origin,axis=None)), "subfield origin is out-of-bounds."
assert all(self.is_within_bounds(subfield.endpoint(),axis=None)), "subfield endpoint is out-of-bounds."
#ib,jb,kb = [int(round((subfield.origin[ii]-self.origin[ii])/self.spacing[ii])) for ii in range(3)]
ib,jb,kb = self.nearest_gridpoint(subfield.origin,axis=None)
nx,ny,nz = subfield.dim()
self.data[ib:ib+nx,jb:jb+ny,kb:kb+nz] = subfield.data[:,:,:]
return
def extract_subfield(self,idx_origin,dim,stride=(1,1,1),deep=False,strict_bounds=True):
if strict_bounds:
assert all(idx_origin[ii]>=0 and idx_origin[ii]<self.dim(axis=ii) for ii in range(3)),\
"'origin' is out-of-bounds."
assert all(idx_origin[ii]+stride[ii]*(dim[ii]-1)<self.dim(axis=ii) for ii in range(3)),\
"endpoint is out-of-bounds."
else:
for ii in range(3):
if idx_origin[ii]<0:
dim[ii] += idx_origin[ii]
idx_origin[ii] = 0
if idx_origin[ii]+stride[ii]*(dim[ii]-1)>=self.dim(axis=ii):
dim[ii] = (self.dim(axis=ii)-idx_origin[ii])//stride[ii]
if dim[ii]<=0:
return None
sl = tuple(slice(idx_origin[ii],
idx_origin[ii]+stride[ii]*dim[ii],
stride[ii]) for ii in range(3))
origin = self.coordinate(idx_origin)
spacing = tuple(self.spacing[ii]*stride[ii] for ii in range(3))
data = self.data[sl]
return Field3d(data,origin,spacing,deep=deep)
def clip(self,position,axis,invert=False,deep=False,is_relative=False):
'''Extracts a subfield by clipping with a plane with normal pointing
direction specified by 'axis'.'''
if is_relative:
coord = self.origin[axis] + coord*self.dim(axis=axis)*self.spacing[axis]
idx_clip = self.nearest_gridpoint(coord,axis=axis,lower=True)
sl = 3*[slice(None)]
origin_ = self.origin
spacing_ = self.spacing
if invert:
sl[axis] = slice(idx_clip+1,None)
origin_[axis] = self.origin[axis]+idx_clip*self.spacing[axis]
else:
sl[axis] = slice(0,idx_clip+1)
data_ = self.data[tuple(sl)]
return Field3d(data_,origin_,spacing_,deep=deep)
def clip_box(self,bounds,deep=False,is_relative=False):
'''Extracts a subfield by clipping with a box.'''
if is_relative:
bounds = tuple(self.origin[ii//2] + bounds[ii]*self.dim(ii//2)*self.spacing[ii//2] for ii in range(6))
idx_lo = self.nearest_gridpoint((bounds[0],bounds[2],bounds[4]),lower=True)
idx_hi = self.nearest_gridpoint((bounds[1],bounds[3],bounds[5]),lower=True)
idx_lo = tuple(0 if idx_lo[axis]<0 else idx_lo[axis] for axis in range(3))
idx_hi = tuple(0 if idx_hi[axis]<0 else idx_hi[axis] for axis in range(3))
sl_ = tuple([slice(idx_lo[0],idx_hi[0]+1),
slice(idx_lo[1],idx_hi[1]+1),
slice(idx_lo[2],idx_hi[2]+1)])
origin_ = tuple(self.origin[axis]+idx_lo[axis]*self.spacing[axis] for axis in range(3))
spacing_ = self.spacing
data_ = self.data[sl_]
return Field3d(data_,origin_,spacing_,deep=deep)
def threshold(self,val,invert=False):
'''Returns a binary array indicating which grid points are above,
or in case of invert=True below, a given threshold.'''
if invert:
return self.data<val
else:
return self.data>=val
def coordinate(self,idx,axis=None):
if axis is None:
assert len(idx)==3, "If 'axis' is None, 'idx' must be a tuple/list of length 3."
return tuple(self.coordinate(idx[ii],axis=ii) for ii in range(3))
assert axis<3, "'axis' must be one of 0,1,2."
assert idx<self.dim(axis=axis), "'idx' is out-of-bounds."
return self.origin[axis]+idx*self.spacing[axis]
def grid(self,axis=None):
if axis is None:
return tuple(self.grid(axis=ii) for ii in range(3))
assert axis<3, "'axis' must be one of 0,1,2."
return self.origin[axis]+np.arange(0,self.dim(axis=axis))*self.spacing[axis]
def x(self): return self.grid(axis=0)
def y(self): return self.grid(axis=1)
def z(self): return self.grid(axis=2)
def nearest_gridpoint(self,coord,axis=None,lower=False):
if axis is None:
assert len(coord)==3, "If 'axis' is None, 'coord' must be a tuple/list of length 3."
return tuple(self.nearest_gridpoint(coord[ii],axis=ii,lower=lower) for ii in range(3))
assert axis<3, "'axis' must be one of 0,1,2."
if lower:
return np.floor((coord+self.eps_collapse-self.origin[axis])/self.spacing[axis]).astype('int')
else:
return np.round((coord-self.origin[axis])/self.spacing[axis]).astype('int')
def distance_to_nearest_gridpoint(self,coord,axis=None,lower=False):
if axis is None:
assert len(coord)==3, "If 'axis' is None, 'coord' must be a tuple/list of length 3."
return tuple(self.distance_to_nearest_gridpoint(coord[ii],axis=ii,lower=lower) for ii in range(3))
assert axis<3, "'axis' must be one of 0,1,2."
val = np.remainder(coord+self.eps_collapse-self.origin[axis],self.spacing[axis])-self.eps_collapse
if not lower and val>0.5*self.spacing[axis]:
val = self.spacing[axis]-val
return val
def is_within_bounds(self,coord,axis=None):
if axis is None:
assert len(coord)==3, "If 'axis' is None, 'coord' must be a tuple/list of length 3."
return tuple(self.is_within_bounds(coord[ii],axis=ii) for ii in range(3))
assert axis<3, "'axis' must be one of 0,1,2."
idx_nearest = self.nearest_gridpoint(coord,axis=axis)
if idx_nearest>0 and idx_nearest<self.dim(axis=axis)-1:
return True
dist_nearest = self.distance_to_nearest_gridpoint(coord,axis=axis)
if (idx_nearest==0 or idx_nearest==self.dim(axis=axis)-1) and abs(dist_nearest)<self.eps_collapse:
return True
else:
return False
def has_same_grid(self,other):
if not self.has_same_origin(other): return False
if not self.has_same_spacing(other): return False
if not self.has_same_origin(other): return False
return True
def has_same_origin(self,other,axis=None):
if axis is None:
return all([self.has_same_origin(other,axis=ii) for ii in range(3)])
origin = other.origin if isinstance(other,Field3d) else other
return abs(origin[axis]-self.origin[axis])<self.eps_collapse
def has_same_spacing(self,other,axis=None):
if axis is None:
return all([self.has_same_spacing(other,axis=ii) for ii in range(3)])
spacing = other.spacing if isinstance(other,Field3d) else other
return abs(spacing[axis]-self.spacing[axis])<self.eps_collapse
def has_same_dim(self,other,axis=None):
if axis is None:
return all([self.has_same_dim(other,axis=ii) for ii in range(3)])
dim = other.dim(axis=axis) if isinstance(other,Field3d) else other
return dim==self.dim(axis=axis)
def dim(self,axis=None):
if axis is None:
return tuple(self.dim(axis=ii) for ii in range(3))
assert axis<3, "'axis' must be one of 0,1,2."
if self._dim is not None:
return self._dim[axis]
else:
return self.data.shape[axis]
def endpoint(self,axis=None):
if axis is None:
return tuple(self.endpoint(axis=ii) for ii in range(3))
assert axis<3, "'axis' must be one of 0,1,2."
return self.origin[axis]+(self.dim(axis=axis)-1)*self.spacing[axis]
def dtype(self):
return self.data.dtype
def convert_dtype(self,dtype):
self.data = self.data.astype(dtype,copy=False)
return
def derivative(self,axis,only_keep_interior=False,shift_origin='before'):
'''Computes derivative wrt to direction 'axis' with 2nd order finite differences
centered between the origin grid points'''
from scipy import ndimage
assert axis<3, "'axis' must be one of 0,1,2."
origin = list(self.origin)
assert shift_origin in ('before','after'), "'shift_origin' must be one of {'before','after'}."
if shift_origin=='before':
corr_orig = 0
origin[axis] -= 0.5*self.spacing[axis]
else:
corr_orig = -1
origin[axis] += 0.5*self.spacing[axis]
data = ndimage.correlate1d(self.data,np.array([-1.,1.])/self.spacing[axis],axis=axis,
mode='constant',cval=np.nan,origin=corr_orig)
if only_keep_interior:
sl = 3*[slice(None)]
if shift_origin=='before':
sl[axis] = slice(-1,None)
origin[axis] += self.spacing[axis]
else:
sl[axis] = slice(0,-1)
data = data[tuple(sl)]
return Field3d(data,origin,self.spacing)
def gradient(self,axis,preserve_origin=False,only_keep_interior=False,add_border='before'):
return [self.derivative(axis,preserve_origin=preserve_origin,
only_keep_interior=only_keep_interior,add_border=add_border) for axis in range(0,3)]
def laplacian(self,only_keep_interior=False):
'''Computes the Laplacian of a field.'''
from scipy import ndimage
data = ndimage.correlate1d(self.data,np.array([1.,-2.,1.])/self.spacing[0]**2,axis=0,mode='constant',cval=np.nan,origin=0)
data += ndimage.correlate1d(self.data,np.array([1.,-2.,1.])/self.spacing[1]**2,axis=1,mode='constant',cval=np.nan,origin=0)
data += ndimage.correlate1d(self.data,np.array([1.,-2.,1.])/self.spacing[2]**2,axis=2,mode='constant',cval=np.nan,origin=0)
origin = list(self.origin)
if only_keep_interior:
data = data[1:-1,1:-1,1:-1]
for axis in range(3):
origin[axis] = origin[axis]+self.spacing[axis]
return Field3d(data,origin,self.spacing)
def integral(self,integrate_axis,average=False,ignore_nan=False,return_weights=False,ufunc=None):
'''Computes the integral or average along a given axis applying the
function 'ufunc' to each node.'''
assert isinstance(integrate_axis,(list,tuple,np.ndarray)) and len(integrate_axis)==3,\
"'integrate_axis' must be a tuple/list of length 3."
assert all([isinstance(integrate_axis[ii],(bool,int)) for ii in range(3)]),\
"'integrate_axis' requires bool values."
assert any(integrate_axis), "'integrate_axis' must contain at least one True."
axes = []
weight = 1.0
for axis in range(3):
if integrate_axis[axis]:
axes.append(axis)
if average:
weight *= self.dim(axis=axis)
else:
weight *= self.spacing[axis]
axes = tuple(axes)
if ignore_nan:
if average:
weight = np.sum(~np.isnan(self.data),axis=axes,keepdims=True)
func_sum = np.nansum
else:
func_sum = np.sum
if ufunc is None:
out = func_sum(self.data,axis=axes,keepdims=True)
else:
assert isinstance(ufunc,np.ufunc), "'ufunc' needs to be a numpy ufunc. "\
"Check out https://numpy.org/doc/stable/reference/ufuncs.html for reference."
assert ufunc.nin==1, "Only ufunc with single input argument are supported for now."
out = func_sum(ufunc(self.data),axis=axes,keepdims=True)
if return_weights:
return (out,weight)
else:
return out/weight
def gaussian_filter(self,sigma,truncate=4.0,only_keep_interior=False):
'''Applies a gaussian filter: sigma is standard deviation for Gaussian kernel for each axis.'''
from scipy import ndimage
assert isinstance(sigma,(tuple,list,np.ndarray)) and len(sigma)==3,\
"'sigma' must be a tuple/list of length 3"
# Convert sigma from simulation length scales to grid points as required by ndimage
sigma_img = tuple(sigma[ii]/self.spacing[ii] for ii in range(3))
data = ndimage.gaussian_filter(self.data,sigma_img,truncate=truncate,mode='constant',cval=np.nan)
origin = list(self.origin)
if only_keep_interior:
r = self.gaussian_filter_radius(sigma,truncate=truncate)
data = data[r[0]:-r[0],r[1]:-r[1],r[2]:-r[2]]
for axis in range(3):
origin[axis] = origin[axis]+r[axis]*self.spacing[axis]
return Field3d(data,origin,self.spacing)
def gaussian_filter_radius(self,sigma,truncate=4.0):
'''Radius of Gaussian filter. Stencil width is 2*radius+1.'''
assert isinstance(sigma,(tuple,list,np.ndarray)) and len(sigma)==3,\
"'sigma' must be a tuple/list of length 3"
# Convert sigma from simulation length scales to grid points as required by ndimage
sigma_img = tuple(sigma[ii]/self.spacing[ii] for ii in range(3))
radius = []
for ii in range(3):
radius.append(int(truncate*sigma_img[ii]+0.5))
return tuple(radius)
def shift_origin(self,rel_shift,only_keep_interior=False):
'''Shifts the origin of a field by multiple of spacing.'''
from scipy import ndimage
assert isinstance(rel_shift,(tuple,list,np.ndarray)) and len(rel_shift)==3,\
"'shift' must be tuple/list with length 3."
assert all([rel_shift[ii]>=-1.0-self.eps_collapse and rel_shift[ii]<=1.0+self.eps_collapse for ii in range(3)]),\
"'shift' must be in (-1.0,1.0). {}".format(rel_shift)
data = self.data.copy()
origin = list(self.origin)
sl = 3*[slice(None)]
for axis in range(3):
if abs(rel_shift[axis])<self.eps_collapse:
continue
elif rel_shift[axis]>0:
w = rel_shift[axis] if rel_shift[axis]<=1.0 else 1.0
weights = (1.0-w,w)
data = ndimage.correlate1d(data,weights,axis=axis,mode='constant',cval=np.nan,origin=-1)
origin[axis] += w*self.spacing[axis]
if only_keep_interior:
sl[axis] = slice(0,-1)
else:
w = rel_shift[axis] if rel_shift[axis]>=-1.0 else -1.0
weights = (-w,1.0+w)
data = ndimage.correlate1d(data,weights,axis=axis,mode='constant',cval=np.nan,origin=0)
origin[axis] += w*self.spacing[axis]
if only_keep_interior:
sl[axis] = slice(1,None)
origin[axis] += self.spacing[axis]
if only_keep_interior:
data = data[sl]
return Field3d(data,origin,self.spacing)
def relative_shift(self,field):
'''Compute the relative shift (in terms of spacing) to shift self onto field.'''
assert self.has_same_spacing(field), "spacing differs."
rel_shift = [0.0,0.0,0.0]
for axis in range(3):
dist = field.origin[axis]-self.origin[axis]
if abs(dist)>self.eps_collapse:
rel_shift[axis] = dist/self.spacing[axis]
return tuple(rel_shift)
def change_grid(self,origin,spacing,dim,padding=None,numpad=1):
assert all([origin[ii]>=self.origin[ii] for ii in range(0,3)]), "New origin is out of bounds."
endpoint = [origin[ii]+(dim[ii]-1)*spacing[ii] for ii in range(0,3)]
assert all([endpoint[ii]<=self.endpoint(ii) for ii in range(0,3)]), "New end point is out of bounds."
# Allocate (possibly padded array)
origin_pad,dim_pad,sl_out = padding(origin,spacing,dim,padding,numpad)
data = np.zeros(dim_pad,dtype=self.data.dtype)
# Trilinear interpolation
if np.allclose(spacing,self.spacing):
# spacing is the same: we can construct universal weights for the stencil
i0,j0,k0 = self.nearest_gridpoint(origin,axis=None,lower=True)
cx,cy,cz = [self.distance_to_nearest_gridpoint(origin[ii],axis=ii,lower=True)/self.spacing[ii]
for ii in range(3)]
c = self.weights_trilinear((cx,cy,cz))
for ii in range(0,2):
for jj in range(0,2):
for kk in range(0,2):
if c[ii,jj,kk]>self.eps_collapse:
data[sl_out] += c[ii,jj,kk]*self.data[
i0+ii:i0+ii+dim[0],
j0+jj:j0+jj+dim[1],
k0+kk:k0+kk+dim[2]]
else:
data_ref = data[sl_out]
for ii in range(0,dim[0]):
for jj in range(0,dim[1]):
for kk in range(0,dim[2]):
coord = (
origin[0]+ii*spacing[0],
origin[1]+jj*spacing[1],
origin[2]+kk*spacing[2])
data_ref[ii,jj,kk] = self.interpolate(coord)
return Field3d(data,origin_pad,spacing)
def interpolate(self,coord):
assert all([coord[ii]>=self.origin[ii] for ii in range(0,3)]), "'coord' is out of bounds."
assert all([coord[ii]<=self.endpoint(ii) for ii in range(0,3)]), "'coord' is out of bounds."
i0,j0,k0 = self.nearest_gridpoint(coord,axis=None,lower=True)
cx,cy,cz = [self.distance_to_nearest_gridpoint(coord[ii],axis=ii,lower=True)/self.spacing[ii]
for ii in range(3)]
c = self.weights_trilinear((cx,cy,cz))
val = 0.0
for ii in range(0,2):
for jj in range(0,2):
for kk in range(0,2):
if c[ii,jj,kk]>self.eps_collapse:
val += c[ii,jj,kk]*self.data[i0+ii,j0+jj,k0+kk]
return val
def scipy_interpolate(self,pts,oob_fill=None):
from scipy.interpolate import RegularGridInterpolator
if oob_fill is None:
bounds_error = True
fill_value = None
else:
bounds_error = False
fill_value = oob_fill
rgi = RegularGridInterpolator(self.grid(),self.data,method='linear',
bounds_error=bounds_error,fill_value=fill_value)
return rgi(pts)
@staticmethod
def padding(origin,spacing,dim,padding,numpad):
if isinstance(numpad,int):
numpad = np.fill(3,numpad,dtype=int)
else:
numpad = np.array(numpad,dtype=int)
assert len(numpad)==3, "'numpad' must be either an integer or tuple/list of length 3."
origin_pad = np.array(origin)
dim_pad = np.array(dim)
sl_out = [slice(None),slice(None),slice(None)]
if padding is not None:
if padding=='before':
dim_pad += numpad
origin_pad -= numpad*spacing
for axis in range(3):
sl_out[axis] = slice(numpad[axis],None)
elif padding=='after':
dim_pad += numpad
for axis in range(3):
sl_out[axis] = slice(0,-numpad[axis])
elif padding=='both':
dim_pad += 2*numpad
origin_pad -= numpad*spacing
for axis in range(3):
sl_out[axis] = slice(numpad[axis],-numpad[axis])
else:
raise ValueError("'padding' must either be None or one of {'before','after','both'}.")
sl_out = tuple(sl_out)
origin_pad = tuple(origin_pad)
dim_pad = tuple(dim_pad)
return (origin_pad,dim_pad,sl_out)
def weights_trilinear(self,rel_dist):
assert len(rel_dist)==3, "len(rel_dist) must be 3."
cx,cy,cz = rel_dist
if cx<0.0 and cx>-self.eps_collapse: cx=0.0
if cy<0.0 and cy>-self.eps_collapse: cy=0.0
if cz<0.0 and cz>-self.eps_collapse: cz=0.0
if cx>1.0 and cx<1.0+self.eps_collapse: cx=1.0
if cy>1.0 and cy<1.0+self.eps_collapse: cy=1.0
if cz>1.0 and cz<1.0+self.eps_collapse: cz=1.0
assert cx>=0.0 and cy>=0.0 and cz>=0.0, "'rel_dist' must be >=0"
assert cx<=1.0 and cy<=1.0 and cz<=1.0, "'rel_dist' must be <=1"
c = np.zeros((2,2,2))
c[0,0,0] = 1.0-(cx+cy+cz)+(cx*cy+cx*cz+cy*cz)-(cx*cy*cz)
c[1,0,0] = cx-(cx*cy+cx*cz)+(cx*cy*cz)
c[0,1,0] = cy-(cx*cy+cy*cz)+(cx*cy*cz)
c[0,0,1] = cz-(cx*cz+cy*cz)+(cx*cy*cz)
c[1,1,0] = (cx*cy)-(cx*cy*cz)
c[1,0,1] = (cx*cz)-(cx*cy*cz)
c[0,1,1] = (cy*cz)-(cx*cy*cz)
c[1,1,1] = (cx*cy*cz)
return c
def set_writable(self,flag):
self.data.setflags(write=flag)
return
def to_vtk(self,deep=False):
import pyvista as pv
mesh = pv.UniformGrid()
mesh.dimensions = self.dim(axis=None)
mesh.origin = self.origin
mesh.spacing = self.spacing
# order needs to be F no matter how array is stored in memory
if deep:
mesh.point_arrays['data'] = self.data.flatten(order='F')
else:
if not self.data.flags['F_CONTIGUOUS']:
self.data = np.asfortranarray(self.data)
mesh.point_arrays['data'] = self.data.ravel(order='F')
return mesh
def vtk_contour(self,val,deep=False,method='contour'):
if not isinstance(val,(tuple,list)):
val = [val]
return self.to_vtk(deep=deep).contour(val,method=method)
def vtk_slice(self,normal,origin,deep=False):
assert (normal in ('x','y','z') or (isinstance(normal,(tuple,list))
and len(normal)==3)), "'normal' must be 'x','y','z' or tuple of length 3."
assert isinstance(origin,(tuple,list)) and len(origin)==3,\
"'origin' must be tuple of length 3."
return self.to_vtk(deep=deep).slice(normal=normal,origin=origin)
def gaussian_filter_umean_channel(array,spacing,sigma,truncate=4.0):
'''Applies a Gaussian filter to a numpy array of dimension (1,ny,1) which
contains the mean streamwise velocity of the channel (or possibly sth else).
Since yu[0] = c and yu[-1] = d, we can use scipy's mirror settings and don't
need ghost cells.'''
from scipy import ndimage
assert array.ndim==3, "Expected an array with three dimensions/axes."
assert array.shape[0]==1 and array.shape[1]>1 and array.shape[2]==1,\
"Expected an array with shape (1,ny,1)."
sigma_img = sigma/spacing
array = ndimage.gaussian_filter1d(array,sigma_img,axis=1,truncate=truncate,mode='mirror')
return array
class Features3d:
def __init__(self,input,threshold,origin,spacing,periodicity,
invert=False,has_ghost=False,keep_input=False,
contour_method='flying_edges',
report=False):
assert len(origin)==3, "'origin' must be of length 3"
assert len(spacing)==3, "'spacing' must be of length 3"
assert len(periodicity)==3, "'periodicity' must be of length 3"
assert isinstance(input,np.ndarray), "'input' must be numpy.ndarray."
# Assign basic properties to class variables
self.spacing = np.array(spacing,dtype=np.float)
self.origin = np.array(origin,dtype=np.float)-self.spacing
self.periodicity = tuple(bool(x) for x in periodicity)
# If regions are supposed to be inverted, i.e. the interior consists of values
# smaller than the threshold instead of larger, change the sign of the array.
sign_invert = -1 if invert else +1
self._threshold = sign_invert*threshold
# '_input' is the array which is to be triangulated. Here one trailing ghost cell is
# required to get closed surfaces. The data will always be copied since it probably
# needs to be copied for vtk anyway (needs FORTRAN contiguous array) and this way
# there will never be side effects on the input data. The array is padded with
# a very high value (cannot use NaN or Inf for vtk) in order to close the contour at
# the boundaries.
if has_ghost:
self.dimensions = input.shape+np.array((2,2,2))
self._input = np.full(self.dimensions,-1e30,dtype=input.dtype,order='F')
self._input[1:-1,1:-1,1:-1] = sign_invert*input
else:
pw = tuple((0,1) if x else (0,0) for x in periodicity)
self.dimensions = input.shape+np.array((2,2,2))+np.array([sum(x) for x in pw])
self._input = np.full(self.dimensions,-1e30,dtype=input.dtype,order='F')
self._input[1:-1,1:-1,1:-1] = np.pad(sign_invert*input,pw,mode='wrap')
# Triangulate
self.triangulate(contour_method=contour_method,
report=report)
# Set some state variable
self._kdtree = None
self._kdaxis = None
self._kdradius = None
# Drop input if requested: this may free memory in case it was copied/temporary.
if not keep_input: self._input = None
return
@classmethod
def from_field(cls,fld,threshold,periodicity,invert=False,has_ghost=False,
keep_input=False,contour_method='flying_edges',
correct_normals=True,report=False):
return cls(fld.data,threshold,fld.origin,fld.spacing,periodicity,
invert=invert,has_ghost=has_ghost,keep_input=keep_input,
contour_method=contour_method,
report=report)
def triangulate(self,contour_method='flying_edges',report=False):
import pyvista as pv
import vtk
from scipy import ndimage, spatial
from scipy.spatial import KDTree
from numba import jit
from time import time
# Check if '_input' is available: might have been dropped after initialization
assert self._input is not None, "'_input' not available. Initialize object with keep_input=True flag."
# Wrap data for VTK using pyvista
datavtk = pv.UniformGrid()
datavtk.dimensions = self.dimensions
datavtk.origin = self.origin
datavtk.spacing = self.spacing
datavtk.point_arrays['data'] = self._input.ravel('F')
# Compute full contour: due to the padding, the contour will result in closed object.
# This is necessary for proper inside/outside checks and volume computation. However,
# for surface area computation, the boundary faces have to be removed. Therefore, we
# will isolate them and store them in an extra array.
if report: print('[Features3d.triangulate] computing isocontour using {}...'.format(contour_method))
contour = datavtk.contour([self._threshold],method=contour_method,compute_scalars=False,compute_gradients=True)
assert contour.is_all_triangles(), "Contouring produced non-triangle cells."
points = contour.points
faces = contour.faces.reshape(-1,4)
gradients = contour.point_arrays['Gradients'][faces[:,1],:]
# Python for loops are horribly slow, but for loops are the easiest way to perform
# some of the necessary check. Therefore let's define some functions to be jit compiled
# by numba to speed things up.
@jit(nopython=True)
def __get_boundary_faces(faces,points,bounds,tolerance):
assert faces.shape[1]==4
assert points.shape[1]==3
assert len(bounds)==6
assert len(tolerance)==3
nfaces = faces.shape[0]
output = np.full((nfaces,),-1,dtype=np.int8)
for ii in range(nfaces):
for jj in range(3):
if (points[faces[ii,1],jj]<bounds[2*jj]+tolerance[jj] and
points[faces[ii,2],jj]<bounds[2*jj]+tolerance[jj] and
points[faces[ii,3],jj]<bounds[2*jj]+tolerance[jj]):
output[ii] = 2*jj
elif (points[faces[ii,1],jj]>bounds[2*jj+1]-tolerance[jj] and
points[faces[ii,2],jj]>bounds[2*jj+1]-tolerance[jj] and
points[faces[ii,3],jj]>bounds[2*jj+1]-tolerance[jj]):
output[ii] = 2*jj+1
return output
@jit(nopython=True)
def __connect_faces_periodic(face_uni_lo,face_uni_hi,points,dist,idx,search_dist):
N = len(face_uni_hi)
output = np.zeros((N,4),dtype=face_uni_lo.dtype)
jj = 0
for ii in range(N):
if dist[ii]<search_dist:
output[jj,:] = (3,face_uni_hi[ii],face_uni_lo[idx[ii]],face_uni_hi[ii])
jj += 1
return output[:jj,:]
# In order to treat periodicity when labeling the various regions, we connect the
# overlapping regions with degenerate triangles. Then vtkPolyDataConnectivityFilter
# returns the correct result right away, and we will just ignore the trailing
# virtual faces afterwards.
tol_overlap = 1e-5*self.spacing
bd_face_tag = __get_boundary_faces(faces,points,tuple(contour.bounds),tol_overlap)
faces_connect = [faces]
for axis in range(3):
if not self.periodicity[axis]: continue
sl_pln = [0,1,2]
del sl_pln[axis]
face_uni_lo = np.unique(faces[bd_face_tag==2*axis,1:].ravel())
face_uni_hi = np.unique(faces[bd_face_tag==2*axis+1,1:].ravel())
kdt = KDTree(points[np.ix_(face_uni_lo,sl_pln)],leafsize=10,compact_nodes=False,
copy_data=False,balanced_tree=False)
dist,idx = kdt.query(points[np.ix_(face_uni_hi,sl_pln)],k=1,
distance_upper_bound=tol_overlap[axis])
faces_connect += [__connect_faces_periodic(face_uni_lo,face_uni_hi,points,
dist,idx,tol_overlap[axis])]
# Save original number of faces and add the virtual connecting faces to the polydata.
nfaces = faces.shape[0]
contour.faces = np.concatenate(faces_connect,axis=0)
# Compute the connectivity using pure VTK (pyvista only supports vtkConnectivityFilter
# which takes around twice as long).
alg = vtk.vtkPolyDataConnectivityFilter()
alg.SetInputData(contour)
alg.SetExtractionModeToAllRegions()
alg.SetColorRegions(True)
alg.Update()
contour = pv.filters._get_output(alg)
# RegionIds are now stored as point data. To efficiently convert it to cell data, the first
# point of each cell determines the value for this cell.
regionid = contour.point_arrays['RegionId'][contour.faces.reshape(-1,4)[:nfaces,1]].ravel()
nfeatures = np.amax(regionid)+1
# Store the face/vertex data in class arrays. Sort the faces by RegionId for quick access
# later on.
if report: print('[Features3d.triangulate] sorting faces by labels...')
self._points = np.array(points)
self._nfeatures = nfeatures
idx = np.flatnonzero(bd_face_tag<0)
idxbd = np.flatnonzero(bd_face_tag>=0)
self._offset = np.zeros((2*nfeatures+1,),dtype=np.int64)
self._offset[1:nfeatures+1] = np.cumsum(np.bincount(regionid[idx],minlength=nfeatures))
self._offset[nfeatures+1:] = np.cumsum(np.bincount(regionid[idxbd],minlength=nfeatures))
self._offset[nfeatures+1:] += self._offset[nfeatures]
self._faces = np.concatenate((
faces[idx,:][np.argsort(regionid[idx]),:],
faces[idxbd,:][np.argsort(regionid[idxbd]),:]),axis=0)
gradients = np.concatenate((
gradients[idx,:][np.argsort(regionid[idx]),:],
gradients[idxbd,:][np.argsort(regionid[idxbd]),:]),axis=0)
# Compute the volume and area per cell.
if report: print('[Features3d.triangulate] calculating surface area and volume...')
A = self._points[self._faces[:,1],:]
B = self._points[self._faces[:,2],:]
C = self._points[self._faces[:,3],:]
cn = np.cross(B-A,C-A)
# Check if cell normal points in direction of gradient. If not, switch vertex order.
idx = (gradients*cn).sum(axis=-1)>0
self._faces[np.ix_(idx,[2,3])] = self._faces[np.ix_(idx,[3,2])]
cn[idx,:] = -cn[idx,:]
# Compute area and signed volume per cell
cc = (A+B+C)/3
cell_areas = 0.5*np.sqrt(np.square(cn).sum(axis=1))
cell_volumes = 0.5*np.mean(cn*cc,axis=1)
@jit(nopython=True)
def __sum_up_cell_data(cell_data,offset,incl_boundary):
nfeat = (len(offset)-1)//2
niter = 2*nfeat if incl_boundary else nfeat
output = np.zeros(nfeat,dtype=cell_data.dtype)
for ifeat in range(niter):
for ii in range(offset[ifeat],offset[ifeat+1]):
output[ifeat%nfeat] += cell_data[ii]
return output
self._areas = __sum_up_cell_data(cell_areas, self._offset,False)
self._volumes = __sum_up_cell_data(cell_volumes,self._offset,True )
return
@property
def threshold(self): return self._threshold
@property
def nfeatures(self): return self._nfeatures
@property
def areas(self): return self._areas
@property
def volumes(self): return self._volumes
def reduce_noise(self,threshold=1e-5,is_relative=True,report=False):
'''Discards all objects with smaller volume than a threshold.'''
if is_relative:
vol_domain = np.prod(self.spacing*self.dimensions)
threshold = threshold*vol_domain
features = np.flatnonzero(np.abs(self._volumes)<threshold)
if len(features)>0:
self.discard_features(features,report=report)
return
def discard_features(self,features,clean_points=False,report=False):
'''Removes features from triangulated data.'''
features = self.list_of_features(features)
# Get index ranges which are to be deleted and also create an array
# which determines the size of the block to be deleted
print('Discarding',features)
idx = []
gapsize = np.zeros((self._nfeatures,),dtype=np.int)
for feature in features:
rng_ = np.arange(self._offset[feature],self._offset[feature+1])
idx.append(rng_)
gapsize[feature] = len(rng_)
idx = np.concatenate(idx,axis=0)
# Save former number of faces for report
nfaces = self._faces.shape[0]
# Delete indexed elements from arrays
self._faces = np.delete(self._faces,idx,axis=0)
self._areas = np.delete(self._areas,features,axis=0)
self._volumes = np.delete(self._volumes,features,axis=0)
# Correct offset
self._offset[1:] = self._offset[1:]-np.cumsum(gapsize)
self._offset = np.delete(self._offset,features,axis=0)
self._nfeatures = self._offset.shape[0]-1
if report:
print('[Features3d.discard_features]',end=' ')
print('discarded {:} features with {:} faces.'.format(
len(features),nfaces-self._faces.shape[0]))
if clean_points:
self.clean_points(report=report)
return
def clean_points(self,report=False):
'''Removes points which are not referenced by any face.'''
nfaces = self._faces.shape[0]
ind,inv = np.unique(self._faces[:,1:],return_inverse=True)
if report:
print('[Features3d.clean_points]',end=' ')
print('removed {:} orphan points.'.format(self._points.shape[0]-len(ind)))
self._faces[:,1:4] = inv.reshape(nfaces,3)
self._points = self._points[ind,:]
return
def area(self,feature):
'''Returns the surface area of feature. If feature is None, total surface
area of all features is returned.'''
if feature is None:
return np.sum(self.areas)
else:
return self.areas[feature]
def volume(self,feature):
'''Returns volume enclosed by feature. If feature isNone, total volume of
all features is returned.'''
if feature is None:
return np.sum(self.volumes)
else:
return self.volumes[feature]
def build_kdtree(self,kdaxis=0,leafsize=100,compact_nodes=False,balanced_tree=False,report=False):
'''Builds a KD-tree for feature search.'''
from scipy.spatial import KDTree
from numba import jit
@jit(nopython=True,cache=True)
def __get_center_and_search_radius(points,faces,kdaxis):
nfaces = faces.shape[0]
center = np.zeros((nfaces,2),dtype=points.dtype)
radius = 0.0
if kdaxis==0: i1,i2 = 1,2
elif kdaxis==1: i1,i2 = 0,2
elif kdaxis==2: i1,i2 = 0,1
for iface in range(nfaces):
min1 = min(min(points[faces[iface,1],i1],points[faces[iface,2],i1]),points[faces[iface,3],i1])
max1 = max(max(points[faces[iface,1],i1],points[faces[iface,2],i1]),points[faces[iface,3],i1])
min2 = min(min(points[faces[iface,1],i2],points[faces[iface,2],i2]),points[faces[iface,3],i2])
max2 = max(max(points[faces[iface,1],i2],points[faces[iface,2],i2]),points[faces[iface,3],i2])
center[iface,0] = 0.5*(max1+min1)
center[iface,1] = 0.5*(max2+min2)
radius = max(radius,(max1-min1)**2+(max2-min2)**2)
radius = 0.5*np.sqrt(radius)
return center,radius
center,radius = __get_center_and_search_radius(self._points,self._faces,kdaxis)
self._kdtree = KDTree(center,leafsize=leafsize,compact_nodes=compact_nodes,
copy_data=False,balanced_tree=balanced_tree)
self._kdaxis = kdaxis
self._kdradius = radius
if report:
print('[Features3d.build_kdtree]',end=' ')
print('KD-Tree built for axis {}.'.format(kdaxis))
def inside_feature(self,coords,report=True):
'''Find nearest triangle from point R in direction ±dR and its orientation w.r.t dR.
The algorithm determines the closest intersection of a ray cast from the origin of the
point in the specified direction (with positiv and negative sign). The ray is supposed to
be sent in a direction perpendicular to any boundary. Periodicity does not matter here.
The ray-triangle intersection is implemented by the MöllerTrumbore algorithm (see method
'ray_triangle_intersection()' for details). Whether a hit occured from the interior
or the exterior is determined by the direction of the surface normal of the triangle
and the direction of the ray.
This method is very similar to 'ray_triangle_intersection()', but takes advantage of
the fact that only the nearest intersection is to be returned.
Input:
R, dR: the point to be checked and the direction of the ray as (3,) numpy arrays
A,B,C: vertices of N triangles as (N,3) numpy arrays
Returns:
is_inside: is point inside? [bool]
t: parameter to determine intersection point (x = R+t*dR) [float]
hit_dir: direction from which the triangle was hit, from inward/outward = +1,-1 [int]
'''
from numba import jit
#
coords = np.array(coords)
assert coords.ndim==2 and coords.shape[1]==3, "'coords' need to be provided as Nx3 array."
#
if not self._kdtree:
self.build_kdtree()
if self._kdaxis==0: query_axis = [1,2]
elif self._kdaxis==1: query_axis = [0,2]
elif self._kdaxis==2: query_axis = [0,1]
#
candidates = self._kdtree.query_ball_point(coords[:,query_axis],self._kdradius).tolist()
cand_num = np.asarray(tuple(map(len,candidates)))
cand_arr = np.concatenate(candidates).astype(np.int)
#
raydir = np.zeros((3,),dtype=self._points.dtype)
raydir[self._kdaxis] = 1.0
#
@jit(nopython=True,error_model='numpy',cache=True)
def __ray_triangle_intersect(R,dR,A,B,C):
'''Implements the MöllerTrumbore ray-triangle intersection algorithm. Taken from
https://stackoverflow.com/questions/42740765/intersection-between-line-and-triangle-in-3d'''
E1 = B-A
E2 = C-A
N = np.cross(E1,E2)
norm = np.sqrt(np.sum(N*N))
det = -np.sum(dR*N)
invdet = 1.0/det
AO = R-A
DAO = np.cross(AO,dR)
u = np.sum(E2*DAO)*invdet
v = -np.sum(E1*DAO)*invdet
t = np.sum(AO*N) *invdet
return (np.abs(det)>=1e-7*norm and t>=0.0 and u>=0 and v>=0 and (u+v)-1.0<=0), t
@jit(nopython=True,cache=True)
def __get_nearest_face(coords,raydir,cand_arr,cand_num,faces,points):
N = coords.shape[0]
nearface = np.zeros(N,dtype=np.int64)
isinside = np.zeros(N,dtype=np.bool_)
jj = 0
for ii in range(N):
t = np.inf
f = -1
n = 0
for kk in range(cand_num[ii]):
idx = cand_arr[jj]
hit_,t_ = __ray_triangle_intersect(coords[ii],raydir,
points[faces[idx,1],:],
points[faces[idx,2],:],
points[faces[idx,3],:])
if hit_:
n += 1
if t_<t:
t = t_
f = idx
jj += 1
nearface[ii] = f
isinside[ii] = (n%2)!=0
return nearface,isinside
nf,isinside = __get_nearest_face(coords,raydir,cand_arr,cand_num,self._faces,self._points)
output = self.feature_from_face(nf)
output[np.logical_not(isinside)] = -1
if report:
print('[Features3d.inside_feature]',end=' ')
print('{} of {} points are located inside of features.'.format(sum(isinside),coords.shape[0]))
return output
def feature_from_face(self,idx_cell):
'''Gets feature ID for a given cell.'''
# This is an optimized solution for monotonically increasing arrays:
# as long as offset is smaller than the cell index, the boolean operation
# returns False. As soon as the first True is returned, i.e.
# as soon as idx_cell is equal or larger than offset, the argmax function
# short circuits and returns the index of the first occurence without
# checking any value afterwards.
return (np.searchsorted(self._offset,idx_cell,side='right')-1)%self._nfeatures
def faces_from_feature(self,features,incl_boundary=False):
'''Returns indices of cells which belong to given features.'''
features = self.list_of_features(features)
faces = []
for feature in features:
idx = np.arange(self._offset[feature],self._offset[feature+1])
faces.append(self._faces[idx,:])
if incl_boundary:
for feature in features:
idx = np.arange(self._offset[self._nfeatures+feature],
self._offset[self._nfeatures+feature+1])
faces.append(self._faces[idx,:])
return np.concatenate(faces,axis=0)
def regionid_from_feature(self,features,incl_boundary=False):
'''Returns regionid cell array to be added to PolyData.'''
features = self.list_of_features(features)
regionid = []
for feature in features:
nfaces = self._offset[feature+1]-self._offset[feature]
regionid.append(np.full(nfaces,feature,dtype=np.int64))
if incl_boundary:
for feature in features:
nfaces = (self._offset[self._nfeatures+feature+1]-
self._offset[self._nfeatures+feature])
regionid.append(np.full(nfaces,feature,dtype=np.int64))
return np.concatenate(regionid,axis=0)
def nfaces_of_feature(self,features,incl_boundary=False):
'''Returns number of cells which belong to given features. Can be used
to construct new offsets.'''
features = self.list_of_features(features)
nfaces = []
for feature in features:
nfaces.append(self._offset[feature+1]-self._offset[feature])
if incl_boundary:
for feature in features:
nfaces.append(self._offset[self._nfeatures+feature+1]-
self._offset[self._nfeatures+feature])
return np.concatenate(nfaces,axis=0)
def list_of_features(self,features):
'''Ensures that 'features' is a list.'''
from collections.abc import Iterable
if features is None:
return np.arange(0,self._nfeatures)
if not isinstance(features,Iterable):
return [features]
else:
return list(features)
@staticmethod
def cmap_features(nfeatures,name='tab10'):
from matplotlib.colors import ListedColormap
if name=='tab10': # seaborn/matplotlib tab10
cmap = [(0.12156862745098039, 0.4666666666666667, 0.7058823529411765),
(1.0, 0.4980392156862745, 0.054901960784313725),
(0.17254901960784313, 0.6274509803921569, 0.17254901960784313),
(0.8392156862745098, 0.15294117647058825, 0.1568627450980392),
(0.5803921568627451, 0.403921568627451, 0.7411764705882353),
(0.5490196078431373, 0.33725490196078434, 0.29411764705882354),
(0.8901960784313725, 0.4666666666666667, 0.7607843137254902),
(0.4980392156862745, 0.4980392156862745, 0.4980392156862745),
(0.7372549019607844, 0.7411764705882353, 0.13333333333333333),
(0.09019607843137255, 0.7450980392156863, 0.8117647058823529)]
else:
raise NotImplementedError('Colormap not implemented.')
ncolor = len(cmap)
icolor = 0
output = np.empty((nfeatures,3))
for ii in range(nfeatures):
output[ii] = cmap[icolor]
icolor = (icolor+1)%ncolor
return ListedColormap(output)
def to_vtk(self,features,incl_boundary=False,incl_regionid=False,remap_regionid=True):
import pyvista as pv
faces = self.faces_from_feature(features,incl_boundary=incl_boundary)
output = pv.PolyData(self._points,faces)
if incl_regionid:
output.cell_arrays['RegionId'] = self.regionid_from_feature(features,incl_boundary=incl_boundary)
if remap_regionid:
output.cell_arrays['RegionId'] = np.unique(output.cell_arrays['RegionId'],return_inverse=True)[1]
return output
# def to_vtk2(self,features):
# import pyvista as pv
# idx = self.faces_from_feature(features)
# faces_ = self._faces[idx,:]
# A = self._points[faces_[:,1],:]
# B = self._points[faces_[:,2],:]
# C = self._points[faces_[:,3],:]
# cn = np.cross(B-A,C-A)
# cn = cn/np.sqrt(np.square(cn).sum(axis=-1,keepdims=True))
# cc = (A+B+C)/3
# # print(A[0,:],B[0,:],C[0,:],cn[0,:])
# # print(A.shape,B.shape,C.shape,cn.shape,faces_.shape,self._faces.shape)
# output = pv.PolyData(cc)
# output['cellnormal'] = cn
# return output
# def vtk_faces(self,face_idx):
# import pyvista as pv
# return pv.PolyData(self._points,self._faces[face_idx,:])
class BinaryFieldNd:
def __init__(self,input,periodicity,connect_diagonals=False,deep=False,has_ghost=False):
assert isinstance(input,np.ndarray) and input.dtype==bool,\
"'input' must be a numpy array of type bool."
assert all([isinstance(x,(bool,int)) for x in periodicity]),\
"'periodicity' requires bool values."
assert len(periodicity)==input.ndim,\
"Number of entries in 'periodicity' must match dimension of binary field."
if has_ghost and deep:
self._data = input.copy()
elif has_ghost:
self._data = input
else:
pw = tuple((0,1) if x else (0,0) for x in periodicity)
self._data = np.pad(input,pw,mode='wrap')
self._sldata = tuple(slice(0,-1) if x else None for x in periodicity)
self._dim = self._data.shape
self._ndim = self._data.ndim
self._labels = None
self.nlabels = None
self.wrap = tuple(self._ndim*[None])
self.periodicity = tuple(bool(x) for x in periodicity)
self.connect_diagonals = connect_diagonals
@property
def data(self):
return self._data[self._sldata]
@property
def labels(self):
if self._labels is None:
return None
return self._labels[self._sldata]
def label(self,use_cc3d=False):
'''Labels connected regions in binary fields.'''
if use_cc3d:
import cc3d
if self._ndim==2: connectivity = 8 if self.connect_diagonals else 4
elif self._ndim==3: connectivity = 18 if self.connect_diagonals else 6
else: raise RuntimeError("'use_cc3d' can only be used with 2D or 3D data.")
else:
from scipy import ndimage
if self.connect_diagonals: structure = ndimage.generate_binary_structure(self._ndim,self._ndim)
else: structure = ndimage.generate_binary_structure(self._ndim,1)
#
if use_cc3d:
self._labels,self.nlabels = cc3d.connected_components(self._data,connectivity=connectivity,return_N=True)
else:
self._labels,self.nlabels = ndimage.label(self._data,structure=structure)
if not any(self.periodicity):
return
# Get a mapping of labels which differ at periodic overlap
map_ = np.array(range(self.nlabels+1),dtype=self._labels.dtype)
wrap_ = self._ndim*[None]
for axis in range(self._ndim):
if not self.periodicity[axis]: continue
sl_lo = tuple(slice(0,1) if ii==axis else slice(None) for ii in range(self._ndim))
sl_hi = tuple(slice(-1,None) if ii==axis else slice(None) for ii in range(self._ndim))
sl_pre = tuple(slice(-2,-1) if ii==axis else slice(None) for ii in range(self._ndim))
lab_lo = self._labels[sl_lo]
lab_hi = self._labels[sl_hi]
lab_pre = np.unique(self._labels[sl_pre]) # all labels in last (unwrapped) slice
# Initialize array to keep track of wrapping
wrap_[axis] = np.zeros(self.nlabels+1,dtype=bool)
# Determine new label and map
lab_new = np.minimum(lab_lo,lab_hi)
for lab_ in [lab_lo,lab_hi]:
li = (lab_!=lab_new) #if not map_to_zero else (lab_!=0)
lab_li = lab_[li]
lab_new_li = lab_new[li]
for idx_ in np.unique(lab_li,return_index=True)[1]:
source_ = lab_li[idx_] # the label to be changed
target_ = lab_new_li[idx_] # the label which will be newly assigned
while target_ != map_[target_]: # map it recursively
target_ = map_[target_]
map_[source_] = target_
if source_ in lab_pre: # check if source is not a ghost
wrap_[axis][target_] = True
# Remove gaps from target mapping
idx_,map_ = np.unique(map_,return_index=True,return_inverse=True)[1:3]
# Relabel
self._labels = map_[self._labels]
self.nlabels = np.max(map_)
self.wrap = tuple(None if x is None else x[idx_] for x in wrap_)
return
def fill_holes(self,keep_wall_attached=True,use_cc3d=False,return_mask=False):
'''Fill the holes in binary objects while taking into account periodicity.
In the non-periodic sense, a hole is a region of zeros which does not connect
to a boundary. When keep_wall_attached==False, only regions are kept which
fully connect from top to bottom wall.
In the periodic sense, a hole is a region of zeros which is not
connected to itself accross the opposite periodic boundary.'''
if return_mask: mask = self._data.copy()
np.logical_not(self._data,self._data)
if use_cc3d:
import cc3d
if self._ndim==2: connectivity = 8 if self.connect_diagonals else 4
elif self._ndim==3: connectivity = 18 if self.connect_diagonals else 6
else: raise RuntimeError("'use_cc3d' can only be used with 2D or 3D data.")
labels_,nlabels_ = cc3d.connected_components(self._data,connectivity=connectivity,return_N=True)
else:
from scipy import ndimage
if self.connect_diagonals: structure = ndimage.generate_binary_structure(self._ndim,self._ndim)
else: structure = ndimage.generate_binary_structure(self._ndim,1)
labels_,nlabels_ = ndimage.label(self._data,structure=structure)
labels_keep = set()
for axis in range(3):
sl_lo = tuple(slice(None) if ii!=axis else 0 for ii in range(3))
sl_hi = tuple(slice(None) if ii!=axis else -1 for ii in range(3))
if self.periodicity[axis]:
labels_keep |= set(np.unique(labels_[sl_lo])) & set(np.unique(labels_[sl_hi]))
elif keep_wall_attached:
labels_keep |= set(np.unique(labels_[sl_lo])) | set(np.unique(labels_[sl_hi]))
labels_keep.discard(0)
self._data = np.isin(labels_,tuple(labels_keep),invert=True)
# If labels have been computed already, set them to None because they are now invalid
if self._labels is not None:
self._labels = None
self.nlabels = None
self.wrap = tuple(self._ndim*[None])
# Compute mask if it is to be returned, otherwise we are done
if return_mask:
np.logical_xor(mask,self._data,out=mask)
return mask
return
def probe(self,idx,probe_label=False):
'''Returns whether or not a point at idx is True or False.'''
if probe_label:
return self._labels[tuple(idx)]
else:
return self._data[tuple(idx)]
def volume(self,label):
'''Returns the sum of True values.'''
if label is None:
return np.sum(self.data)
else:
if self._labels is None: self.label()
return np.sum(self.labels==label)
def volumes(self):
'''Returns volume of all features, i.e. connected regions which have been
labeled using the label() method including the volume of the background region.
The array is sorted by labels, i.e. vol[0] is the volume of the background
region, vol[1] the volume of label 1, etc. If 'label' is an integer value,
only the volume of the corresponding region is returned.
Note: it is more efficient to retrieve all volumes at once than querying
single labels.'''
if self._labels is None: self.label()
return np.bincount(self.labels.ravel())
def volume_domain(self):
'''Returns volume of entire domain. Should be equal to sum(volume_feature()).'''
dim = tuple(self._dim[axis]-1 if self.periodicity[axis] else self._dim[axis] for axis in range(self._ndim))
return np.prod(dim)
def feature_labels_by_volume(self,descending=True,return_volumes=False):
'''Returns labels of connected regions sorted by volume.'''
vol = self.volumes()[1:]
labels = np.argsort(vol)+1
if descending:
labels = labels[::-1]
vol = vol[::-1]
if return_volumes:
return labels,vol
else:
return labels
def discard_features(self,selection,return_mask=False):
'''Removes features from data. Optionally returns a boolean array 'mask'
which marks position of features which have been removed.'''
if self._labels is None:
self.label()
selection = self._select_features(selection)
# Map tagged regions to zero in order to discard them
map_ = np.array(range(self.nlabels+1),dtype=self._labels.dtype)
map_[selection] = 0
# Remove gaps from target mapping
idx_,map_ = np.unique(map_,return_index=True,return_inverse=True)[1:3]
# Discard regions
self._labels = map_[self._labels]
self.nlabels = np.max(map_)
print(self.nlabels)
self.wrap = tuple(None if x is None else x[idx_] for x in self.wrap)
# for axis in range(self._ndim):
# if self.wrap[axis] is not None:
# self.wrap[axis] = self.wrap[axis][idx_]
if return_mask:
mask = np.logical_and(self._labels==0,self._data)
self._data = self._labels>0
if return_mask: return mask
def _select_features(self,selection):
dtype = self._labels.dtype
if selection is None:
return np.array(range(1,self.nlabels+1))
elif np.issubdtype(type(selection),np.integer):
return np.array(selection,dtype=dtype,ndmin=1)
elif isinstance(selection,(list,tuple,np.ndarray)):
selection = np.array(selection)
if selection.dtype==np.dtype('bool'):
assert selection.ndim==1 and selection.shape[0]==self.nlabels+1,\
"Boolean indexing must provide count+1 values."
else:
selection = selection.astype(dtype)
assert np.max(selection)<=self.nlabels and np.min(selection)>=0,\
"Entry in selection is out-of-bounds."
return selection
else:
raise ValueError('Invalid input. Accepting int,list,tuple,ndarray.')
class ChunkIterator:
'''Iterates through all chunks. 'snapshot' must be an instance
of a class which returns a Field3d from the method call
snapshot.field_chunk(rank,key,keep_ghost=keep_ghost).
One example implementation is UCFSnapshot from suspendtools.ucf.'''
def __init__(self,snapshot,key,keep_ghost=True):
self.snapshot = snapshot
self.key = key
self.keep_ghost = keep_ghost
self.iter_rank = 0
def __iter__(self):
self.iter_rank = 0
return self
def __next__(self):
if self.iter_rank<self.snapshot.nproc():
field = self.snapshot.field_chunk(
self.iter_rank,self.key,keep_ghost=self.keep_ghost)
self.iter_rank += 1
return field
else:
raise StopIteration