import os
import time
import tracemalloc
import numpy as np
import h5py
[docs]def merge_h5_sets(path_h5_a, path_h5_b, path_h5_merged,
groups_to_merge=['events', 'testpulses', 'noise', 'controlpulses', 'stream'],
sets_to_merge=['event', 'mainpar', 'true_ph', 'true_onset', 'of_ph', 'sev_fit_par', 'sev_fit_rms',
'hours', 'labels', 'testpulseamplitude', 'time_s', 'time_mus', 'pulse_height',
'pca_error', 'pca_projection', 'tp_hours', 'tp_time_mus', 'tp_time_s', 'tpa',
'trigger_hours', 'trigger_time_mus', 'trigger_time_s'],
concatenate_axis=[1, 1, 1, 0, 1, 1, 1, 0, 1, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0],
groups_from_a=[],
groups_from_b=[],
a_name='keep',
b_name='keep',
continue_hours=True,
second_file_start=None,
keep_original_files=True,
verb=False,
trace=False,
):
"""
Merges two HDF5 files.
:param path_h5_a: Path to the first file to merge.
:type path_h5_a: string
:param path_h5_b: Path to the other file to merge.
:type path_h5_b: string
:param path_h5_merged: Path where the merged file is saved.
:type path_h5_merged: string
:param groups_to_merge: The groups that hold the arrays that we want to concatenate.
:type groups_to_merge: list of strings
:param sets_to_merge: The sets that hold the arrays we want to concatenate, same sets for all groups.
:type sets_to_merge: list of strings
:param concatenate_axis: The axis along which the arrays are concatenated. Each n'th index in this list corresponds
to the n'th string in the sets_to_merge list. If -1, the set is originally a scalar and gets reshaped to a 1D
array after merge.
:type concatenate_axis: list of ints
:param groups_from_a: Which groups are copied from the first HDF5 set.
:type groups_from_a: list of strings
:param groups_from_b: Which groups are copied from the second HDF5 set.
:type groups_from_b: list of strings
:param a_name: Type a name for the first HDF5 set to identify the data later on with the original data set. This
name is stored in the origin data set in the corresponding group. If 'keep', the content of the origin data set
from the HDF5 set is copied.
:type a_name: string
:param b_name: Type a name for the second HDF5 set to identify the data later on with the original data set. This
name is stored in the origin data set in the corresponding group. If 'keep', the content of the origin data set
from the HDF5 set is copied.
:type b_name: string
:param continue_hours: If True, the value of the last hours in a is added to the hours in b.
:type continue_hours: bool
:param second_file_start: The hours value at which the second file starts. If this is not handed and continue_hours
is activated, the value is extracted from the test pulses.
:type second_file_start: float or None
:param keep_original_files: If False, the original files are deleted after the merge.
:type keep_original_files: bool
:param verb: If True, verbal feedback about the process of the merge is given.
:type verb: bool
:param trace: Traces the memory and runtime consumption.
:type trace: bool
"""
if trace:
tracemalloc.start()
start_time = time.time()
for v in concatenate_axis:
if v not in [-1, 0, 1]:
raise KeyError('The concatenation axis must all be either 0, 1 or -1!')
with h5py.File(path_h5_a, 'r') as a, h5py.File(path_h5_b, 'r') as b, h5py.File(path_h5_merged, 'w') as m:
# define hours gap
if continue_hours:
if second_file_start is None:
if 'testpulses' in a and 'testpulses' in b:
second_file_start = a['testpulses']['hours'][-1] + (
b['testpulses']['time_s'][0] + 10e-6 * b['testpulses']['time_mus'][0] -
a['testpulses']['time_s'][-1] - 10e-6 * a['testpulses']['time_mus'][
-1]) / 3600
else:
raise KeyError('continue_hours argument requires testpulses group in both files, or needs the start of'
'the second file (second_file_hours) handed seperately!')
# merge the groups
for group in groups_to_merge:
if verb:
print('--> MERGE GROUP: {}.'.format(group))
if group in a.keys() and group in b.keys():
# create group in hdf5
g = m.create_group(group)
for i, set in enumerate(sets_to_merge):
if verb:
print('SET: {}.'.format(set))
if set in list(a[group].keys()) and set in list(b[group].keys()):
if verb:
print('creating ...')
# get shape
shape_a = a[group][set].shape
shape_b = b[group][set].shape
shape_m = np.copy(shape_a)
if concatenate_axis[i] == -1:
if len(shape_a) == 0:
shape_m = [2, ]
else:
shape_m[0] += 1
else:
shape_m[concatenate_axis[i]] += shape_b[concatenate_axis[i]]
# get dtype
data_type = a[group][set].dtype
# create set in new hdf 5 file
g.create_dataset(set,
shape=shape_m,
dtype=data_type)
# write data to new file
if continue_hours and set in ['hours', 'trigger_hours', 'tp_hours']:
g[set][:shape_a[0]] = a[group][set][:]
g[set][shape_a[0]:] = b[group][set][:] + second_file_start
elif concatenate_axis[i] == 0:
g[set][:shape_a[0]] = a[group][set][:]
g[set][shape_a[0]:] = b[group][set][:]
elif concatenate_axis[i] == 1:
for c in range(shape_a[0]):
g[set][c, :shape_a[1]] = a[group][set][c, :]
g[set][c, shape_a[1]:] = b[group][set][c, :]
elif concatenate_axis[i] == -1:
if len(shape_a) == 0:
g[set][0] = a[group][set][()]
else:
g[set][:-1] = a[group][set][:]
g[set][-1] = b[group][set][()]
# add the labels list
if set == 'labels':
g[set].attrs.create(name='unlabeled', data=0)
g[set].attrs.create(name='Event_Pulse', data=1)
g[set].attrs.create(name='Test/Control_Pulse', data=2)
g[set].attrs.create(name='Noise', data=3)
g[set].attrs.create(name='Squid_Jump', data=4)
g[set].attrs.create(name='Spike', data=5)
g[set].attrs.create(name='Early_or_late_Trigger', data=6)
g[set].attrs.create(name='Pile_Up', data=7)
g[set].attrs.create(name='Carrier_Event', data=8)
g[set].attrs.create(name='Strongly_Saturated_Event_Pulse', data=9)
g[set].attrs.create(name='Strongly_Saturated_Test/Control_Pulse', data=10)
g[set].attrs.create(name='Decaying_Baseline', data=11)
g[set].attrs.create(name='Temperature_Rise', data=12)
g[set].attrs.create(name='Stick_Event', data=13)
g[set].attrs.create(name='Square_Waves', data=14)
g[set].attrs.create(name='Human_Disturbance', data=15)
g[set].attrs.create(name='Large_Sawtooth', data=16)
g[set].attrs.create(name='Cosinus_Tail', data=17)
g[set].attrs.create(name='Light_only_Event', data=18)
g[set].attrs.create(name='Ring_Light_Event', data=19)
g[set].attrs.create(name='Sharp_Light_Event', data=20)
g[set].attrs.create(name='unknown/other', data=99)
# add the mainpar labels
if len(set) >= 7:
if set[:7] == 'mainpar':
g[set].attrs.create(name='pulse_height', data=0)
g[set].attrs.create(name='t_zero', data=1)
g[set].attrs.create(name='t_rise', data=2)
g[set].attrs.create(name='t_max', data=3)
g[set].attrs.create(name='t_decaystart', data=4)
g[set].attrs.create(name='t_half', data=5)
g[set].attrs.create(name='t_end', data=6)
g[set].attrs.create(name='offset', data=7)
g[set].attrs.create(name='linear_drift', data=8)
g[set].attrs.create(name='quadratic_drift', data=9)
# add the additional mainpar labels
if len(set) >= 11:
if set[:11] == 'add_mainpar':
g[set].attrs.create(name='array_max', data=0)
g[set].attrs.create(name='array_min', data=1)
g[set].attrs.create(name='var_first_eight', data=2)
g[set].attrs.create(name='mean_first_eight', data=3)
g[set].attrs.create(name='var_last_eight', data=4)
g[set].attrs.create(name='mean_last_eight', data=5)
g[set].attrs.create(name='var', data=6)
g[set].attrs.create(name='mean', data=7)
g[set].attrs.create(name='skewness', data=8)
g[set].attrs.create(name='max_derivative', data=9)
g[set].attrs.create(name='ind_max_derivative', data=10)
g[set].attrs.create(name='min_derivative', data=11)
g[set].attrs.create(name='ind_min_derivative', data=12)
g[set].attrs.create(name='max_filtered', data=13)
g[set].attrs.create(name='ind_max_filtered', data=14)
g[set].attrs.create(name='skewness_filtered_peak', data=15)
if set[:11] == 'sev_fit_par':
g[set].attrs.create(name='pulse_height', data=0)
g[set].attrs.create(name='onset', data=1)
g[set].attrs.create(name='constant_coefficient', data=2)
g[set].attrs.create(name='linear_coefficient', data=3)
g[set].attrs.create(name='quadratic_coefficient', data=4)
g[set].attrs.create(name='cubic_coefficient', data=5)
if 'event' in a[group] and 'event' in b[group]:
nmbr_a = len(a[group]['event'][0])
nmbr_b = len(b[group]['event'][0])
nmbr_m = nmbr_a + nmbr_b
# write the original file names to dataset
string_dt = h5py.special_dtype(vlen=str)
orig = g.require_dataset('origin',
shape=(nmbr_m,),
dtype=string_dt)
if a_name == 'keep':
if 'origin' in a[group]:
orig[:nmbr_a] = a[group]['origin']
else:
orig[:nmbr_a] = 'a'
else:
orig[:nmbr_a] = a_name
if b_name == 'keep':
if 'origin' in b[group]:
orig[nmbr_a:] = b[group]['origin']
else:
orig[nmbr_a:] = 'b'
else:
orig[nmbr_a:] = b_name
# take the groups from a and b
for f, g_lists in zip([a, b], [groups_from_a, groups_from_b]):
for group in g_lists:
if group in list(f.keys()):
if verb:
print('COPY GROUP: {}.'.format(group))
g = m.create_group(group)
for set in f[group].keys():
if verb:
print('SET: {}.'.format(set))
print('creating ...')
g.create_dataset(set, data=f[group][set])
if not keep_original_files:
for p in [path_h5_a, path_h5_b]:
if verb:
print('Deleting {}.'.format(p))
os.remove(p)
print('Merge done.')
if trace:
current, peak = tracemalloc.get_traced_memory()
print(
f"Current memory usage is {current / 10 ** 6}MB; Peak was {peak / 10 ** 6}MB; Runtime was {time.time() - start_time};")
tracemalloc.stop()