Source code for eegunity.utils.split_hdf5_file
import os
import h5py
import shutil
def _get_group_size(group):
    """
    Return the combined storage size of every dataset below ``group``.

    The group is traversed with ``visititems``; each visited object that is
    an ``h5py.Dataset`` contributes its storage size, and every other kind
    of object is ignored.

    Parameters
    ----------
    group : h5py.Group
        The HDF5 group whose datasets should be measured.

    Returns
    -------
    int
        Sum, in bytes, of the storage sizes of all datasets found under
        ``group`` (0 when no dataset is visited).
    """
    dataset_sizes = []

    def _record_dataset(_path, node):
        # Only datasets carry storage; skip groups and anything else.
        if not isinstance(node, h5py.Dataset):
            return
        dataset_sizes.append(node.id.get_storage_size())

    group.visititems(_record_dataset)
    return sum(dataset_sizes)
[docs]
def split_hdf5_file(input_path, max_file_size=10 * 1024**3, output_dir="."):
    """
    Split an HDF5 file into multiple parts if its total size exceeds the given limit.

    The minimal splitting unit is a top-level item (normally a group). If the
    summed dataset storage size surpasses ``max_file_size``, this function
    creates multiple output HDF5 files and distributes the top-level items
    among them, in iteration order, without splitting any single item.
    Output files are named after the input file's base name, with suffixes
    like ``_s1.hdf5``, ``_s2.hdf5``, etc. When no split is needed the input
    file is copied unchanged to ``<base>_s1.hdf5``.

    Parameters
    ----------
    input_path : str
        Path to the input HDF5 file.
    max_file_size : int, optional
        Maximum size in bytes for each output HDF5 file (default is 10GB).
    output_dir : str, optional
        Directory where output files will be saved. It is created if it does
        not exist. Defaults to the current directory.

    Returns
    -------
    list of str
        A list of paths to the generated HDF5 files.

    Raises
    ------
    FileNotFoundError
        If ``input_path`` does not point to an existing file.

    Notes
    -----
    * Sizes are the storage sizes reported for the datasets, not the size of
      the file on disk.
    * A single top-level item larger than ``max_file_size`` is still written
      whole, to a part of its own, so that part can exceed the limit.
    * When a split occurs only the top-level members are copied into the
      parts; attributes attached to the root of the input file are not
      transferred.
    """
    # Check if input file exists (before touching the filesystem or h5py)
    if not os.path.isfile(input_path):
        raise FileNotFoundError(f"Input file not found: {input_path}")

    # Create output directory if it does not exist
    os.makedirs(output_dir, exist_ok=True)

    # Extract the base name of the input file (without extension)
    base_name = os.path.splitext(os.path.basename(input_path))[0]

    def part_path(index):
        # Path of the index-th output part (1-based).
        return os.path.join(output_dir, f"{base_name}_s{index}.hdf5")

    # Collect top-level items and their sizes
    groups_info = []
    with h5py.File(input_path, 'r') as f:
        for name in f.keys():
            obj = f[name]
            if isinstance(obj, h5py.Group):
                item_size = _get_group_size(obj)
            elif isinstance(obj, h5py.Dataset):
                # A top-level dataset is treated as a "group-like" unit.
                item_size = obj.id.get_storage_size()
            else:
                # Any other top-level object (e.g. a committed datatype) has
                # no dataset storage; count it as 0 bytes but still copy it.
                item_size = 0
            groups_info.append((name, item_size))

    total_size = sum(size for _, size in groups_info)
    if total_size <= max_file_size:
        # Simply copy the original file as is, naming it with _s1 suffix
        output_path = part_path(1)
        shutil.copyfile(input_path, output_path)
        return [output_path]

    # Otherwise, we need to split into multiple files
    output_files = []
    current_file_index = 1
    current_output_path = part_path(current_file_index)
    current_out_file = h5py.File(current_output_path, 'w')
    current_used_space = 0
    current_members = 0  # number of items already copied into the open part
    try:
        # Re-open the input in read mode for copying the items
        with h5py.File(input_path, 'r') as in_f:
            for grp_name, grp_size in groups_info:
                # Start a new part only if the current one already holds
                # something; otherwise an oversized first item would leave
                # an empty part file behind.
                if current_members and current_used_space + grp_size > max_file_size:
                    current_out_file.close()
                    output_files.append(current_output_path)
                    current_file_index += 1
                    current_output_path = part_path(current_file_index)
                    current_out_file = h5py.File(current_output_path, 'w')
                    current_used_space = 0
                    current_members = 0

                # Copy the item to the current output file
                in_f.copy(grp_name, current_out_file)
                current_used_space += grp_size
                current_members += 1
    finally:
        # Always release the open output handle, even if a copy failed.
        current_out_file.close()

    output_files.append(current_output_path)
    return output_files