Python h5py 模块,Group() 实例源码
我们从Python开源项目中,提取了以下42个代码示例,用于说明如何使用h5py.Group()。
def dump(self, target):
"""Serializes MPArray to :code:`h5py.Group`. Recover using
:func:`~load`.
:param target: :code:`h5py.Group` the instance should be saved to or
path to h5 file (it's then serialized to /)
"""
if isinstance(target, str):
import h5py
with h5py.File(target, 'w') as outfile:
return self.dump(outfile)
for prop in ('ranks', 'shape'):
# these are only saved for convenience
target.attrs[prop] = str(getattr(self, prop))
# these are actually used in MPArray.load
target.attrs['len'] = len(self)
target.attrs['canonical_form'] = self.canonical_form
for site, lten in enumerate(self._lt):
target[str(site)] = lten
def group_sites(self, sites_per_group):
"""Group several MPA sites into one site.
The resulting MPA has length ``len(self) // sites_per_group`` and
``sites_per_group * self.ndims[i]`` physical legs on site ``i``. The
physical legs on each sites are in local form.
:param int sites_per_group: Number of sites to be grouped into one
:returns: An MPA with ``sites_per_group`` fewer sites and more ndims
"""
if (len(self) % sites_per_group) != 0:
raise ValueError('Cannot group: {} not a multiple of {}'
.format(len(self), sites_per_group))
if sites_per_group == 1:
return self
ltens = [_ltens_to_array(self._lt[i:i + sites_per_group])
for i in range(0, len(self), sites_per_group)]
return MPArray(ltens)
def __getitem__(self, key):
if type(key) is int:
raise NotImplementedError('Iteration not supported.')
with h5py.File(self.hdf5_dbase_root, 'r') as hf:
grp = hf[self.top_level_path]
if key not in grp:
raise IndexError('{} not found in {} filetype'.format(key, self.top_level_path))
ds = grp[key]
if isinstance(ds, h5py.Group):
data = DataIndexer(self.hdf5_dbase_root, '/'.join([self.top_level_path, key]))
else:
if ds.attrs['unit'] == 'SKIP' or ds.dtype == 'object':
data = np.array(ds, dtype=ds.dtype)
else:
data = u.Quantity(np.array(ds), ds.attrs['unit'], dtype=ds.dtype)
if '|S' in data.dtype.str:
data = data.astype(str)
return data
def save_hdf5(filename, obj, compression=4):
"""Saves an object to the file in HDF5 format.
This is a short-cut function to save only one object into an HDF5 file. If
you want to save multiple objects to one HDF5 file,use
:class:`HDF5Serializer` directly by passing appropriate :class:`h5py.Group`
objects.
Args:
filename (str): Target file name.
obj: Object to be serialized. It must support serialization protocol.
compression (int): Gzip compression level.
"""
_check_available()
with h5py.File(filename, 'w') as f:
s = HDF5Serializer(f, compression=compression)
s.save(obj)
def load_hdf5(filename, obj):
"""Loads an object from the file in HDF5 format.
This is a short-cut function to load from an HDF5 file that contains only
one object. If you want to load multiple objects from one HDF5 file,use
:class:`HDF5Deserializer` directly by passing appropriate
:class:`h5py.Group` objects.
Args:
filename (str): Name of the file to be loaded.
obj: Object to be deserialized. It must support serialization protocol.
"""
_check_available()
with h5py.File(filename, 'r') as f:
d = HDF5Deserializer(f)
d.load(obj)
def _visitfunc(self, name, node):
level = len(name.split('/'))
indent = ' '*4*(level-1)
#indent = '<span style="color:blue;">'.format(level*4)
localname = name.split('/')[-1]
#search_text = self.settings['search_text'].lower()
search_text = self.search_text
if search_text and (search_text in localname.lower()):
localname = """<span style="color: red;">{}</span>""".format(localname)
if isinstance(node, h5py.Group):
self.tree_str += indent +"|> <b>{}/</b><br/>".format(localname)
elif isinstance(node, h5py.Dataset):
self.tree_str += indent +"|D <b>{}</b>: {} {}<br/>".format(localname, node.shape, node.dtype)
for key, val in node.attrs.items():
if search_text:
if search_text in str(key).lower():
key = """<span style="color: red;">{}</span>""".format(key)
if search_text in str(val).lower():
val = """<span style="color: red;">{}</span>""".format(val)
self.tree_str += indent+" |- <i>{}</i> = {}<br/>".format(key, val)
def setup_openpmd_species_record( self, grp, quantity ) :
"""
Set the attributes that are specific to a species record
Parameter
---------
grp : an h5py.Group object or h5py.Dataset
The group that correspond to `quantity`
(in particular,its path must end with "/<quantity>")
quantity : string
The name of the record being setup
e.g. "position","momentum"
"""
# Generic setup
self.setup_openpmd_record( grp, quantity )
# weighting information
grp.attrs["macroWeighted"] = macro_weighted_dict[quantity]
grp.attrs["weightingPower"] = weighting_power_dict[quantity]
def h5py_dataset_iterator(self,g, prefix=''):
for key in g.keys():
item = g[key]
path = '{}/{}'.format(prefix, key)
keys = [i for i in item.keys()]
if isinstance(item[keys[0]], h5py.Dataset): # test for dataset
data = {'path':path}
for k in keys:
if not isinstance(item[k], h5py.Group):
dataset = np.array(item[k].value)
if type(dataset) is np.ndarray:
if dataset.size != 0:
if type(dataset[0]) is np.bytes_:
dataset = [a.decode('ascii') for a in dataset]
data.update({k:dataset})
yield data
else: # test for group (go down)
yield from self.h5py_dataset_iterator(item, path)
def get_data(self, path, prefix=''):
item = self.store[path]
path = '{}/{}'.format(prefix, path)
keys = [i for i in item.keys()]
data = {'path': path}
# print(path)
for k in keys:
if not isinstance(item[k], h5py.Group):
dataset = np.array(item[k].value)
if type(dataset) is np.ndarray:
if dataset.size != 0:
if type(dataset[0]) is np.bytes_:
dataset = [a.decode('ascii') for a in dataset]
data.update({k: dataset})
return data
def test_cache(self):
# create main test file
filename = self.getFileName("create_group_cache")
print("filename:", filename)
f = h5py.File(filename, 'w', use_cache=True)
self.assertTrue('/' in f)
r = f['/']
self.assertEqual(len(r), 0)
self.assertTrue(isinstance(r, h5py.Group))
self.assertTrue(r.name, '/')
self.assertEqual(len(r.attrs.keys()), 0)
self.assertFalse('g1' in r)
g1 = r.create_group('g1')
self.assertEqual(len(r), 1)
file = g1.file
f.close()
def __cal_firing_rate(self,data,channel,bin_size,overlap,pre_time,post_time):
bins_left = [pre_time]
while bins_left[-1] < post_time:
bins_left.append(bins_left[-1]+bin_size-overlap)
bins_left = np.array(bins_left)
bins_right = bins_left+bin_size
bins_mean = (bins_left+bins_right)/2.0
zero_offset = bins_mean[bins_mean>0][0]
bins_left = bins_left - zero_offset
bins_right = bins_right - zero_offset
bins_mean = bins_mean - zero_offset
bins_left = bins_left[bins_right<=post_time]
bins_mean = bins_mean[bins_right<=post_time]
bins_right = bins_right[bins_right<=post_time]
bins_mean = bins_mean[bins_left>=pre_time]
bins_right = bins_right[bins_left>=pre_time]
bins_left = bins_left[bins_left>=pre_time]
def cal_fr(ite_spike):
ite_fr = list()
for i in range(bins_left.shape[0]):
ite_fr_i = ite_spike[(ite_spike>=bins_left[i])&(ite_spike<bins_right[i])].shape[0]
ite_fr.append(ite_fr_i)
ite_fr = np.array(ite_fr)
ite_fr = ite_fr*1000.0/bin_size
return ite_fr
firing_rate = data[channel].apply(cal_fr)
return firing_rate, bins_mean
# Group data by experimental conditions and plot PSTH and raster of each condition
def reclaim_space(self,file_name):
'''
Args
file_name (string):
the name of the work space
Return
-
'''
f = hp.File(file_name,'r')
f2 = hp.File(file_name.split('.h5')[0]+'_reclaim.h5','w')
used_keys = list()
def valid_key(name):
if isinstance(f[name],hp.Group):
pass
else:
used_keys.append(name)
f.visit(valid_key)
for key in used_keys:
f2[key] = f[key].value
f.flush()
f2.flush()
f.close()
f2.close()
os.remove(file_name)
os.rename(file_name.split('.h5')[0]+'_reclaim.h5',file_name)
print('Space is reclaimed Now')
def __cal_firing_rate(self, bins_mean
# Group data by experimental conditions and plot PSTH and raster of each condition
def load(cls, source):
"""Deserializes MPArray from :code:`h5py.Group`. Serialize using
:func:`~dump`.
:param target: :code:`h5py.Group` containing serialized MPArray or
path to a single h5 File containing serialized MPArray under /
"""
if isinstance(source, str):
import h5py
with h5py.File(source, 'r') as infile:
return cls.load(infile)
ltens = [source[str(i)].value for i in range(source.attrs['len'])]
return cls(LocalTensors(ltens, cform=source.attrs['canonical_form']))
def _populate_data(self, ret_dict, name):
"""Read data recursively from an HDF5 value and add it to `ret_dict`.
If `obj` is a dataset,it is added to `ret_dict`. If `obj` is a group,
a sub-dictionary is created in `ret_dict` for `obj` and populated
recursively by calling this function on all of the items in the `obj`
group.
Parameters
----------
ret_dict : OrderedDict
Dictionary to which Metadata will be added.
obj : h5py.Dataset | h5py.Group
HDF5 value from which to read Metadata.
name : valid dictionary key
Dictionary key in `ret_dict` under which to store the data from
`obj`.
"""
if isinstance(obj, h5py.Dataset):
# [()] casts a Dataset as a numpy array
ret_dict[name] = obj[()]
else:
# create a dictionary for this group
ret_dict[name] = {}
for key, value in obj.items():
self._populate_data(ret_dict[name], value, key)
def to_hdf5(self, file_or_path):
"""
Write data to an HDF5 file.
Parameters
----------
file_or_path : str,`h5py.File`,`h5py.Group`
"""
import h5py
if isinstance(file_or_path, str):
f = h5py.File(file_or_path, 'w')
close = True
else:
f = file_or_path
close = False
d = f.create_dataset('mjd', data=self.t.tcb.mjd)
d.attrs['format'] = 'mjd'
d.attrs['scale'] = 'tcb'
d = f.create_dataset('rv', data=self.rv.value)
d.attrs['unit'] = str(self.rv.unit)
d = f.create_dataset('rv_err', data=self.stddev.value)
d.attrs['unit'] = str(self.stddev.unit)
if close:
f.close()
def from_hdf5(cls, file_or_path):
"""
Read data to an HDF5 file.
Parameters
----------
file_or_path : str, 'r')
close = True
else:
f = file_or_path
close = False
t = f['mjd']
rv = f['rv'][:] * u.Unit(f['rv'].attrs['unit'])
stddev = f['rv_err'][:] * u.Unit(f['rv_err'].attrs['unit'])
if close:
f.close()
return cls(t=t, rv=rv, stddev=stddev)
def Group(self):
if self._err:
raise self._err
if self._Group is None:
try:
from h5py import Group
except ImportError:
Group = NotAModule(self._name)
self._Group = Group
return self._Group
def __traverse_add(self, item, filename):
if isinstance(item, h5py.Dataset):
self.add_dataset(item, filename + item.name)
elif isinstance(item, h5py.Group):
for k in item:
self.__traverse_add(item[k], filename)
else:
print("Skipping " + item.name)
def _ls(item, recursive=False, groups=False, level=0):
keys = []
if isinstance(item, h5.Group):
if groups and level > 0:
keys.append(item.name)
if level == 0 or recursive:
for key in list(item.keys()):
keys.extend(_ls(item[key], recursive, groups, level + 1))
elif not groups:
keys.append(item.name)
return keys
def loadDataHDF5(data):
if isinstance(data,h5py.File) or isinstance(data,h5py.Group):
return {k:loadDataHDF5(v) for k,v in data.iteritems()}
elif isinstance(data,h5py.Dataset):
return data.value
else:
print 'unhandled datatype: %s' % type(data)
def _visitfunc(self, node):
level = len(name.split('/'))
indent = ' '*level
localname = name.split('/')[-1]
if isinstance(node, h5py.Group):
self.tree_str += indent +"|> {}\n".format(localname)
elif isinstance(node, h5py.Dataset):
self.tree_str += indent +"|D {}: {} {}\n".format(localname, val in node.attrs.items():
self.tree_str += indent+" |- {} = {}\n".format(key, val)
def _get_dset_array(self, dspath):
"""returns a pickle-safe array for the branch specified by dspath"""
branch = self._my_ds_from_path(dspath)
if isinstance(branch, h5py.Group):
return 'group'
else:
return (H5Array(branch), dict(branch.attrs))
def setup_openpmd_species_component( self, quantity ) :
"""
Set the attributes that are specific to a species component
Parameter
---------
grp : an h5py.Group object or h5py.Dataset
quantity : string
The name of the component
"""
self.setup_openpmd_component( grp )
def setup_openpmd_record( self, dset, quantity ) :
"""
Sets the attributes of a record,that comply with OpenPMD
Parameter
---------
dset : an h5py.Dataset or h5py.Group object
quantity : string
The name of the record considered
"""
dset.attrs["unitDimension"] = unit_dimension_dict[quantity]
# No time offset (approximation)
dset.attrs["timeOffset"] = 0.
def __getitem__(self, key):
h5py_item = self.h5py_group[key]
if isinstance(h5py_item, h5py.Group):
if 'h5sparse_format' in h5py_item.attrs:
# detect the sparse matrix
return Dataset(h5py_item)
else:
return Group(h5py_item)
elif isinstance(h5py_item, h5py.Dataset):
return h5py_item
else:
raise ValueError("Unexpected item type.")
def groups(self, path='/'):
"""Return the list of groups under a given node."""
return [key for key in self.children(path)
if isinstance(self._h5py_file[path + '/' + key],
h5py.Group)]
def _print_node_info(self, node):
"""Print node information."""
info = ('/' + name).ljust(50)
if isinstance(node, h5py.Group):
pass
elif isinstance(node, h5py.Dataset):
info += str(node.shape).ljust(20)
info += str(node.dtype).ljust(8)
print(info)
def __init__(self, filename, mode=0):
self.filename = filename
h5file = h5py.File(self.filename, 'r')
var_list = []
for var, g in h5file.items():
if not isinstance(g, h5py.Group): continue
uids = g.get('uids')[()].tolist()
var_list.append((var, uids))
super(FileInputProcessor, self).__init__(var_list, mode)
h5file.close()
def pre_run(self):
self.h5file = h5py.File(self.filename, 'r')
self.dsets = {}
for var, g in self.h5file.items():
if not isinstance(g, h5py.Group): continue
self.dsets[var] = g.get('data')
self.pointer = 0
self.end_of_file = False
def __getitem__(self, h5py.Group):
if 'h5sparse_format' in h5py_item.attrs:
# detect the sparse matrix
return SparseDataset(h5py_item)
else:
return Group(h5py_item)
elif isinstance(h5py_item, h5py.Dataset):
return h5py_item
else:
raise ValueError("Unexpected item type.")
def compare_hdf5(fh1, fh2, compare=None, compare_groups=True,
**kwargs):
"""
Compare all datasets between two hdf5 files.
"""
if compare is None:
compare = assert_array_equal
if not isinstance(fh1, h5py.Group):
fh1 = h5py.File(fh1, "r")
if not isinstance(fh2, h5py.Group):
fh2 = h5py.File(fh2, "r")
if compare_groups:
assert sorted(list(fh1.keys())) == sorted(list(fh2.keys())), \
"%s and %s have different datasets in group %s." % \
(fh1.file.filename, fh2.file.filename, fh1.name)
for key in fh1.keys():
if isinstance(fh1[key], h5py.Group):
compare_hdf5(fh1[key], fh2[key],
compare_groups=compare_groups,
compare=compare, **kwargs)
else:
err_msg = "%s field not equal for %s and %s" % \
(key, fh1.file.filename, fh2.file.filename)
if fh1[key].dtype == "int":
assert_array_equal(fh1[key].value, fh2[key].value,
err_msg=err_msg)
else:
compare(fh1[key].value,
err_msg=err_msg, **kwargs)
def arf2bark(arf_file, root_parent, timezone, verbose):
with arf.open_file(arf_file, 'r') as af:
# root
root_dirname = os.path.splitext(arf_file)[0]
root_path = os.path.join(os.path.abspath(root_parent), root_dirname)
os.mkdir(root_path)
root = bark.Root(root_path)
if verbose:
print('Created Root: ' + root_path)
tle = None
found_trigin = False
for ename, entry in af.items(): # entries and top-level datasets
if isinstance(entry, h5py.Group): # entries
entry_path = os.path.join(root_path, ename)
entry_attrs = copy_attrs(entry.attrs)
timestamp = entry_attrs.pop('timestamp')
if timezone:
timestamp = bark.convert_timestamp(timestamp, timezone)
else:
timestamp = bark.convert_timestamp(timestamp)
bark_entry = bark.create_entry(entry_path,
timestamp,
parents=False,
**entry_attrs)
if verbose:
print('Created Entry: ' + entry_path)
for ds_name, dataset in entry.items(): # entry-level datasets
if ds_name == 'trig_in': # accessing trig_in -> segfault
found_trigin = True # and skip the dataset
else:
transfer_dset(ds_name, dataset, entry_path, verbose)
elif isinstance(entry, h5py.Dataset): # top-level datasets
if tle is None:
path = os.path.join(root_path, 'top_level')
tle = bark.create_entry(path, 0, parents=False).path
transfer_dset(ename, entry, tle, verbose)
if found_trigin:
print('Warning: found datasets named "trig_in". Jill-created ' +
'"trig_in" datasets segfault when read,so these datasets' +
' were skipped. If you kNow the datasets are good,rename' +
' them and try again.')
return bark.Root(root_path)
def save_prior_samples(f, samples, rv_unit, ln_prior_probs=None):
"""
Save a dictionary of Astropy Quantity prior samples to
an HDF5 file in a format expected and used by
`thejoker.sampler.TheJoker`. The prior samples dictionary
must contain keys for:
- ``P``,period
- ``phi0``,phase at pericenter
- ``ecc``,eccentricity
- ``omega``,argument of periastron
- ``jitter``,veLocity jitter (optional)
Parameters
----------
f : str,:class:`h5py.File`,:class:`h5py.Group`,:class:`h5py.DataSet`
A string filename,or an instantiated `h5py` class.
samples : dict
A dictionary of prior samples as `~astropy.units.Quantity`
objects.
rv_unit : `~astropy.units.UnitBase`
The radial veLocity data unit.
Returns
-------
units : list
A list of `~astropy.units.UnitBase` objects specifying the
units for each column.
"""
packed_samples, units = pack_prior_samples(samples, rv_unit)
if isinstance(f, str):
import h5py
with h5py.File(f, 'a') as g:
g.attrs['units'] = np.array([str(x) for x in units]).astype('|S6')
g['samples'] = packed_samples
if ln_prior_probs is not None:
g['ln_prior_probs'] = ln_prior_probs
else:
f.attrs['units'] = np.array([str(x) for x in units]).astype('|S6')
f['samples'] = packed_samples
if ln_prior_probs is not None:
f['ln_prior_probs'] = ln_prior_probs
return units
def get_mat_test_Metadata():
test_f = h5py.File(test_mat_Metadata_file, 'w')
f = h5py.File(train_mat_Metadata_file)
refs, ds = f['#refs#'], f['digitStruct']
t_ds = test_f.create_group('digitStruct')
ref_dtype = h5py.special_dtype(ref=h5py.Reference)
t_refs = test_f.create_group('#refs#')
data_idx = 0
def create_t_real_data(ref):
nonlocal data_idx
real = refs[ref]
if isinstance(real, h5py.Group):
created_group = t_refs.create_group('data_%s' % data_idx)
data_idx += 1
attrs = 'label top left width height'.split()
for attr in attrs:
reshaped = real[attr].value.reshape(-1)
data_count = reshaped.shape[0]
if isinstance(reshaped[0], h5py.Reference):
t_real_attr = created_group.create_dataset(attr, shape=(data_count, 1), dtype=ref_dtype)
for i in range(data_count):
t_real_attr[i, 0] = create_t_real_data(reshaped[i])
else:
created_group.create_dataset(attr, data=real[attr].value)
data_idx += 1
return created_group.ref
else:
t_real = t_refs.create_dataset('data_%s' % data_idx, data=real.value)
data_idx += 1
return t_real.ref
def create_t_element(t_group, ref_group, data_count):
reshaped = ref_group[name].value.reshape(-1)
data_count = reshaped.shape[0] if data_count is None else data_count
created_dataset = t_group.create_dataset(name, (data_count, dtype=ref_dtype)
for i in range(data_count):
created_dataset[i, 0] = create_t_real_data(reshaped[i])
create_t_element(t_ds, 'name', ds, test_data_count)
create_t_element(t_ds, 'bBox', test_data_count)
test_f.close()
return test_mat_Metadata_file
def read_generic_hdf5(fname):
"""Reads hdf5 files according to their structure
In contrast to other file readers under :meth:`wradlib.io`,this function
will *not* return a two item tuple with (data,Metadata). Instead,this
function returns ONE dictionary that contains all the file contents - both
data and Metadata. The keys of the output dictionary conform to the
Group/Subgroup directory branches of the original file.
Parameters
----------
fname : string
a hdf5 file path
Returns
-------
output : dict
a dictionary that contains both data and Metadata according to the
original hdf5 file structure
Examples
--------
See :ref:`notebooks/fileio/wradlib_radar_formats.ipynb#Generic-HDF5`.
"""
f = h5py.File(fname, "r")
fcontent = {}
def filldict(x, y):
# create a new container
tmp = {}
# add attributes if present
if len(y.attrs) > 0:
tmp['attrs'] = dict(y.attrs)
# add data if it is a dataset
if isinstance(y, h5py.Dataset):
tmp['data'] = np.array(y)
# only add to the dictionary,if we have something meaningful to add
if tmp != {}:
fcontent[x] = tmp
f.visititems(filldict)
f.close()
return fcontent
def read_OPERA_hdf5(fname):
"""Reads hdf5 files according to OPERA conventions
Please refer to the OPERA data model documentation :cite:`OPERA-data-model`
in order to understand how an hdf5 file is organized that conforms to the
OPERA ODIM_H5 conventions.
In contrast to other file readers under :meth:`wradlib.io`,this
function returns ONE dictionary that contains all the file contents - both
data and Metadata. The keys of the output dictionary conform to the
Group/Subgroup directory branches of the original file.
If the end member of a branch (or path) is "data",then the corresponding
item of output dictionary is a numpy array with actual data.
Any other end member (either *how*,*where*,
and *what*) will contain the Meta information applying to the corresponding
level of the file hierarchy.
Parameters
----------
fname : string
a hdf5 file path
Returns
-------
output : dict
a dictionary that contains both data and Metadata according to the
original hdf5 file structure
"""
f = h5py.File(fname, "r")
# Now we browse through all Groups and Datasets and store the info in one
# dictionary
fcontent = {}
def filldict(x, y):
if isinstance(y, h5py.Group):
if len(y.attrs) > 0:
fcontent[x] = dict(y.attrs)
elif isinstance(y, h5py.Dataset):
fcontent[x] = np.array(y)
f.visititems(filldict)
f.close()
return fcontent
def setup_openpmd_species_group( self, species, constant_quantities ) :
"""
Set the attributes that are specific to the particle group
Parameter
---------
grp : an h5py.Group object
Contains all the species
species : a fbpic Particle object
constant_quantities: list of strings
The scalar quantities to be written for this particle
"""
# Generic attributes
grp.attrs["particleShape"] = 1.
grp.attrs["currentDeposition"] = np.string_("directMorseNielson")
grp.attrs["particleSmoothing"] = np.string_("none")
grp.attrs["particlePush"] = np.string_("Vay")
grp.attrs["particleInterpolation"] = np.string_("uniform")
# Setup constant datasets (e.g. charge,mass)
for quantity in constant_quantities:
grp.require_group( quantity )
self.setup_openpmd_species_record( grp[quantity], quantity )
self.setup_openpmd_species_component( grp[quantity], quantity )
grp[quantity].attrs["shape"] = np.array([1], dtype=np.uint64)
# Set the corresponding values
grp["mass"].attrs["value"] = species.m
if "charge" in constant_quantities:
grp["charge"].attrs["value"] = species.q
# Set the position records (required in openPMD)
quantity = "positionOffset"
grp.require_group(quantity)
self.setup_openpmd_species_record( grp[quantity], quantity )
for quantity in [ "positionOffset/x", "positionOffset/y",
"positionOffset/z"] :
grp.require_group(quantity)
self.setup_openpmd_species_component( grp[quantity], dtype=np.uint64)
# Set the corresponding values
grp["positionOffset/x"].attrs["value"] = 0.
grp["positionOffset/y"].attrs["value"] = 0.
grp["positionOffset/z"].attrs["value"] = 0.
def write_dataset( self, species_grp, quantity,
n_rank, Ntot, select_array ) :
"""
Write a given dataset
Parameters
----------
species_grp : an h5py.Group
The group where to write the species considered
species : a warp Species object
The species object to get the particle data from
path : string
The relative path where to write the dataset,
inside the species_grp
quantity : string
Describes which quantity is written
x,y,z,ux,uy,uz,w,id
n_rank : list of ints
A list containing the number of particles to send on each proc
Ntot : int
Contains the global number of particles
select_array : 1darray of bool
An array of the same shape as that particle array
containing True for the particles that satify all
the rules of self.select
"""
# Create the dataset and setup its attributes
if self.rank==0:
datashape = (Ntot, )
if quantity == "id":
dtype = 'uint64'
else:
dtype = 'f8'
# If the dataset already exists,remove it.
# (This avoids errors with diags from prevIoUs simulations,
# in case the number of particles is not exactly the same.)
if path in species_grp:
del species_grp[path]
dset = species_grp.create_dataset(path, datashape, dtype=dtype )
self.setup_openpmd_species_component( dset, quantity )
# Fill the dataset with the quantity
quantity_array = self.get_dataset( species, select_array,
n_rank, Ntot )
if self.rank==0:
dset[:] = quantity_array
def from_hdf5(self, handle, index):
""" Loads results object from HDF5.
Parameters
----------
handle : h5py.File or h5py.Group
An hdf5 file or group type to load from.
index : int
What step is this?
"""
# Grab handles
number_dset = handle["/number"]
eigenvalues_dset = handle["/eigenvalues"]
seeds_dset = handle["/seeds"]
time_dset = handle["/time"]
self.data = number_dset[index, :, :]
self.k = eigenvalues_dset[index, :]
self.seeds = seeds_dset[index, :]
self.time = time_dset[index, :]
# Reconstruct dictionaries
self.volume = OrderedDict()
self.mat_to_ind = OrderedDict()
self.nuc_to_ind = OrderedDict()
rxn_nuc_to_ind = OrderedDict()
rxn_to_ind = OrderedDict()
for mat in handle["/cells"]:
mat_handle = handle["/cells/" + mat]
vol = mat_handle.attrs["volume"]
ind = mat_handle.attrs["index"]
self.volume[mat] = vol
self.mat_to_ind[mat] = ind
for nuc in handle["/nuclides"]:
nuc_handle = handle["/nuclides/" + nuc]
ind_atom = nuc_handle.attrs["atom number index"]
self.nuc_to_ind[nuc] = ind_atom
if "reaction rate index" in nuc_handle.attrs:
rxn_nuc_to_ind[nuc] = nuc_handle.attrs["reaction rate index"]
for rxn in handle["/reactions"]:
rxn_handle = handle["/reactions/" + rxn]
rxn_to_ind[rxn] = rxn_handle.attrs["index"]
self.rates = []
# Reconstruct reactions
for i in range(self.n_stages):
rate = ReactionRates(self.mat_to_ind, rxn_nuc_to_ind, rxn_to_ind)
rate.rates = handle["/reaction rates"][index, i, :]
self.rates.append(rate)
def _triage_read(node):
h5py = _check_h5py()
type_str = node.attrs['TITLE']
if isinstance(type_str, bytes):
type_str = type_str.decode()
if isinstance(node, h5py.Group):
if type_str == 'dict':
data = dict()
for key, subnode in node.items():
data[key[4:]] = _triage_read(subnode)
elif type_str in ['list', 'tuple']:
data = list()
ii = 0
while True:
subnode = node.get('idx_{0}'.format(ii), None)
if subnode is None:
break
data.append(_triage_read(subnode))
ii += 1
assert len(data) == ii
data = tuple(data) if type_str == 'tuple' else data
return data
elif type_str == 'csc_matrix':
if sparse is None:
raise RuntimeError('scipy must be installed to read this data')
data = sparse.csc_matrix((_triage_read(node['data']),
_triage_read(node['indices']),
_triage_read(node['indptr'])))
else:
raise NotImplementedError('UnkNown group type: {0}'
''.format(type_str))
elif type_str == 'ndarray':
data = np.array(node)
elif type_str in ('int', 'float'):
cast = int if type_str == 'int' else float
data = cast(np.array(node)[0])
elif type_str in ('unicode', 'ascii', 'str'): # 'str' for backward compat
decoder = 'utf-8' if type_str == 'unicode' else 'ASCII'
cast = text_type if type_str == 'unicode' else str
data = cast(np.array(node).tostring().decode(decoder))
elif type_str == 'None':
data = None
else:
raise TypeError('UnkNown node type: {0}'.format(type_str))
return data
# ############################################################################
# UTILITIES
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。