eegunity.utils package
Submodules
eegunity.utils.channel_align_raw module
- eegunity.utils.channel_align_raw.channel_align_raw(mne_raw, channel_order, min_matched_channel=1)
Aligns and orders the channels of an MNE Raw object according to a specified channel order.
This function ensures that the channels in the raw MNE object are aligned and ordered according to the specified channel_order. If some channels from channel_order are missing in the raw data, they will be added with zero values and later interpolated.
misc label channels and stim trigger channels are temporarily removed before alignment so they do not interfere with EEG-specific operations (montage fitting, bad-channel interpolation). They are re-appended at the end of the channel list after alignment is complete.
- Parameters:
mne_raw (mne.io.Raw) – The raw EEG/MEG data in an MNE Raw object.
channel_order (list of str) – The desired order of channels. Should contain only channels to align (typically EEG).
misc and stim channels are handled separately and must not be listed here.
min_matched_channel (int, optional) – The minimum required number of matched channels, by default 1.
- Returns:
The modified raw object with channels aligned, missing channels interpolated, and preserved misc/stim channels appended at the end.
- Return type:
mne.io.Raw
- Raises:
ValueError – If the number of matched channels is less than min_matched_channel.
Notes
The function picks and reorders the matched channels to match channel_order.
If some channels from channel_order are missing in mne_raw, they are added as zero data channels and interpolated.
The missing channels are first marked as ‘bad’ before interpolation.
misc/stim channels are not interpolated and are not included in the alignment order.
Examples
>>> import mne
>>> from eegunity.utils.channel_align_raw import channel_align_raw
>>> raw = mne.io.read_raw_fif('sample_raw.fif', preload=True)
>>> desired_order = ['Fp1', 'Fp2', 'F3', 'F4', 'C3', 'C4', 'P3', 'P4', 'O1', 'O2']
>>> aligned_raw = channel_align_raw(raw, desired_order, min_matched_channel=5)
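The matching and bookkeeping steps described above can be sketched without MNE. The helper below, plan_channel_alignment, is hypothetical and not part of EEGUnity: it takes plain lists of channel names and MNE-style type strings (standing in for raw.info), assumes misc/stim channels are identified by their MNE type, and only computes which channels would be picked, which would be zero-filled and interpolated, and the final channel order. The actual picking, zero-filling, bad marking and interpolation happen on the MNE object and are not reproduced here.

```python
from typing import List, Tuple

# Assumption: misc/stim channels are identified by their MNE channel type.
SET_ASIDE_TYPES = ("misc", "stim")


def plan_channel_alignment(
    ch_names: List[str],
    ch_types: List[str],
    channel_order: List[str],
    min_matched_channel: int = 1,
) -> Tuple[List[str], List[str], List[str], List[str]]:
    """Hypothetical sketch of the bookkeeping behind channel_align_raw.

    Returns (matched, missing, preserved, final_order).
    """
    # misc/stim channels are set aside and never take part in alignment.
    preserved = [n for n, t in zip(ch_names, ch_types) if t in SET_ASIDE_TYPES]
    alignable = {n for n, t in zip(ch_names, ch_types) if t not in SET_ASIDE_TYPES}

    # Channels already present in the recording, listed in the requested order.
    matched = [ch for ch in channel_order if ch in alignable]
    # Channels that would be added as zero data, marked bad and interpolated.
    missing = [ch for ch in channel_order if ch not in alignable]

    if len(matched) < min_matched_channel:
        raise ValueError(
            f"only {len(matched)} channel(s) matched, "
            f"at least {min_matched_channel} required"
        )

    # Aligned channels follow channel_order; preserved ones are re-appended last.
    # Other channels not listed in channel_order are dropped by the pick.
    final_order = list(channel_order) + preserved
    return matched, missing, preserved, final_order
```

For example, a recording with channels Fp2, Fp1, C3, T7 plus a misc label and a stim channel, aligned to ['Fp1', 'Fp2', 'Cz', 'C3'], would pick three channels, interpolate Cz, drop T7, and end with the misc and stim channels.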
eegunity.utils.con_udatasets module
- eegunity.utils.con_udatasets.con_udatasets(datasets)
Concatenates the locator DataFrames of the given UnifiedDataset objects, and returns a new UnifiedDataset with the concatenated locator.
The function checks if all elements in the input list are instances of the ‘UnifiedDataset’ class without directly importing it. It then calls the get_locator() method of each dataset, concatenates them, and sets the new locator in a copied version of the first dataset using set_locator().
- Parameters:
datasets (list) – A list of UnifiedDataset instances whose locators are concatenated.
- Returns:
A new UnifiedDataset with the concatenated locator.
- Return type:
UnifiedDataset
- Raises:
ValueError – If any element in the list is not an instance of ‘UnifiedDataset’.
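The check, concatenate, copy and set sequence described above can be sketched with the standard library alone. In this hypothetical sketch, UnifiedDataset is a minimal stand-in whose locator is a list of row dicts rather than the pandas DataFrame the real class uses, and concat_locators_sketch is an illustrative name, not the EEGUnity function. The class-name comparison mirrors the documented "without directly importing it" check; the empty-list guard is an added assumption.

```python
import copy
from typing import Any, Dict, List

Row = Dict[str, Any]


class UnifiedDataset:
    """Hypothetical stand-in; the real locator is a pandas DataFrame."""

    def __init__(self, locator: List[Row]):
        self._locator = locator

    def get_locator(self) -> List[Row]:
        return self._locator

    def set_locator(self, locator: List[Row]) -> None:
        self._locator = locator


def concat_locators_sketch(datasets: list) -> "UnifiedDataset":
    if not datasets:
        raise ValueError("at least one dataset is required")
    # Check by class name so this module never has to import UnifiedDataset.
    for ds in datasets:
        if type(ds).__name__ != "UnifiedDataset":
            raise ValueError(f"expected UnifiedDataset, got {type(ds).__name__}")
    # Concatenate locator rows in input order, copying each row.
    rows = [dict(row) for ds in datasets for row in ds.get_locator()]
    # Deep-copy the first dataset so none of the inputs is modified.
    result = copy.deepcopy(datasets[0])
    result.set_locator(rows)
    return result
```

Copying the first dataset, rather than mutating it, keeps the inputs reusable after the merge.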
eegunity.utils.h5 module
- class eegunity.utils.h5.h5Dataset(path, name)
Bases: object
Handle HDF5 file operations in a format compatible with h5py.
This class is adapted from: 935963004/LaBraM.
Data is written to <name>.hdf5.tmp and only renamed to <name>.hdf5 when save() is called successfully. If the final .hdf5 file already exists, FileExistsError is raised at construction time so the user can clean up explicitly.
- Parameters:
path (Path)
name (str)
- addGroup(grpName)
Add a new group to the HDF5 file.
- Parameters:
grpName (str) – The name of the group to create.
- Returns:
The created group object.
- Return type:
h5py.Group
- addDataset(grp, dsName, arr, chunks=None, **kwargs)
Add a dataset to a specified group.
- Parameters:
grp (h5py.Group) – The group to which the dataset will be added.
dsName (str) – The name of the dataset.
arr (np.ndarray) – The data to store in the dataset.
chunks (tuple, optional) – The chunk shape to use when storing the dataset.
**kwargs – Additional keyword arguments passed to create_dataset.
- Returns:
The created dataset object.
- Return type:
h5py.Dataset
- addAttributes(src, attrName, attrValue)
Add an attribute to a dataset or group.
- Parameters:
src (h5py.Dataset or h5py.Group) – The target object to which the attribute will be added.
attrName (str) – The name of the attribute.
attrValue (any) – The value of the attribute.
- property name
Get the name of the HDF5 file.
- Returns:
The name of the HDF5 file.
- Return type:
str
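The write-to-temporary-then-rename behaviour described for h5Dataset can be illustrated with the standard library. This hypothetical AtomicFileWriter writes raw bytes instead of opening an h5py.File, so it only demonstrates the file-lifecycle pattern (refuse to overwrite, write to <name>.hdf5.tmp, rename on save), not the HDF5 group/dataset API.

```python
import os
from pathlib import Path


class AtomicFileWriter:
    """Hypothetical sketch of the write-to-.tmp-then-rename pattern."""

    def __init__(self, path: Path, name: str):
        self.final_path = Path(path) / f"{name}.hdf5"
        self.tmp_path = Path(path) / f"{name}.hdf5.tmp"
        # Refuse to overwrite an existing result; the user must clean up explicitly.
        if self.final_path.exists():
            raise FileExistsError(self.final_path)
        self._fh = open(self.tmp_path, "wb")

    def write(self, payload: bytes) -> None:
        self._fh.write(payload)

    def save(self) -> Path:
        # Only a successful save() makes the final file visible.
        self._fh.close()
        os.replace(self.tmp_path, self.final_path)
        return self.final_path
```

If the process dies before save(), only the .tmp file remains, so a half-written file is never mistaken for a finished one.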
- class eegunity.utils.h5.h5EpochDatasetV2(path, name)
Bases: object
EEGUnity v2 epoch HDF5 format writer (v2.1 schema).
Flat-array layout optimised for PyTorch random access:
Structure
/ (root)
├── attrs: version ("2.1"), sfreq, ch_names (JSON), n_channels, n_times,
│          label_map (JSON: code->event_name), n_epochs_total,
│          info_fields (JSON list[str]; v2.1), created_by, created_at
├── data              (N, n_ch, n_times) float32
│                     chunk=(1, n_ch, n_times), gzip level-1
├── epoch_meta/
│   ├── source_group  (N,) variable-length UTF-8 string
│   └── event_code    (N,) int16
├── misc_meta/        (v2.1, optional)
│   ├── {misc_channel_name}  (N,) float32
│   └── attrs.names: JSON list[str]
└── source_meta/
    └── {group_name}/
        ├── attrs: file_path, n_epochs_in_source, sfreq, ch_names (JSON),
        │          age, gender, amplifier, cap, handedness,
        │          <any additional participants.tsv columns>
        └── info  (uint8) pickled mne.Info bytes
Backward compatibility
Readers of v2.0 can still open v2.1 files if they ignore /misc_meta/ and the extra source attrs. The dataset_api in eeg_kernel_agent accepts both “2.0” and “2.1” as valid version strings.
Usage
writer = h5EpochDatasetV2(output_dir, "MyDataset")
writer.add_epochs(group_name, event_name, epoch_array_float32,
                  info_bytes, source_attrs, sfreq, ch_names,
                  misc_values={"accuracy": np.array([1, 0, 1])})  # v2.1
writer.save()
Reading in PyTorch
Use eeg_kernel_agent.dataset_api.{EventDataset, MISCDataset, InfoDataset}.
- add_epochs(group_name, event_name, epoch_data, info_bytes, source_attrs, sfreq, ch_names, misc_values=None)
Append epochs for one (source_file, event) pair.
- Parameters:
group_name (str) – Unique identifier for the source file (e.g. basename without extension).
event_name (str) – Human-readable event / class label.
epoch_data (np.ndarray, shape (n_epochs, n_ch, n_times)) – Epoch array; will be cast to float32.
info_bytes (bytes) – pickle.dumps(raw.info) from the source file.
source_attrs (dict) – Scalar metadata stored as HDF5 attrs on the source_meta group. Canonical keys (optional): file_path, age, gender, amplifier, cap, handedness. Any additional keys (e.g. participants.tsv columns like p_factor) are stored verbatim and their names recorded in the root info_fields attr.
sfreq (float) – Sampling frequency (used only during lazy initialisation).
ch_names (list[str]) – Channel names (used only during lazy initialisation).
misc_values (dict, optional) – Mapping {misc_channel_name: array-like of length n_epochs} for v2.1 per-epoch misc values (e.g. reaction_time, accuracy). Stored under /misc_meta/<name> as float32.
- Return type:
None
- Parameters:
path (Path)
name (str)
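Why the flat layout suits random access can be seen in a small in-memory model. FlatEpochStore below is hypothetical and touches no HDF5 file: it keeps one entry per epoch in data, parallel source_group and event_code lists, and a label_map from code to event name, so epoch i is fetched with a single index. Assigning codes in first-seen order and padding misc columns with NaN for calls that omit them are assumptions made for the sketch, not documented EEGUnity behaviour.

```python
import math
from typing import Dict, List, Optional, Sequence

# Canonical source_attrs keys listed in the add_epochs docs.
CANONICAL_KEYS = {"file_path", "age", "gender", "amplifier", "cap", "handedness"}


class FlatEpochStore:
    """Hypothetical in-memory sketch of the v2.1 flat layout (no HDF5 involved)."""

    def __init__(self) -> None:
        self.data: List = []                 # stands in for /data, one entry per epoch
        self.source_group: List[str] = []    # /epoch_meta/source_group
        self.event_code: List[int] = []      # /epoch_meta/event_code
        self.label_map: Dict[int, str] = {}  # root attr label_map: code -> event_name
        self._code_of: Dict[str, int] = {}   # reverse lookup: event_name -> code
        self.misc_meta: Dict[str, List[float]] = {}
        self.source_meta: Dict[str, dict] = {}
        self.info_fields: List[str] = []

    def add_epochs(self, group_name: str, event_name: str, epochs: Sequence,
                   source_attrs: dict,
                   misc_values: Optional[Dict[str, Sequence[float]]] = None) -> None:
        n = len(epochs)
        misc_values = misc_values or {}
        # Validate before mutating anything, so a bad call leaves the store intact.
        for name, values in misc_values.items():
            if len(values) != n:
                raise ValueError(f"misc {name!r}: {len(values)} values for {n} epochs")

        # Assumption: event codes are assigned in first-seen order.
        if event_name not in self._code_of:
            self._code_of[event_name] = len(self._code_of)
            self.label_map[self._code_of[event_name]] = event_name
        code = self._code_of[event_name]

        start = len(self.data)
        # Assumption: misc columns stay aligned with /data by padding with NaN.
        for name in sorted(set(self.misc_meta) | set(misc_values)):
            column = self.misc_meta.setdefault(name, [math.nan] * start)
            column.extend(float(v) for v in misc_values.get(name, [math.nan] * n))

        self.data.extend(epochs)
        self.source_group.extend([group_name] * n)
        self.event_code.extend([code] * n)

        meta = self.source_meta.setdefault(group_name, {"n_epochs_in_source": 0})
        meta["n_epochs_in_source"] += n
        meta.update(source_attrs)
        # Non-canonical keys are recorded once in the root info_fields list.
        for key in source_attrs:
            if key not in CANONICAL_KEYS and key not in self.info_fields:
                self.info_fields.append(key)

    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, i: int):
        # Random access by flat epoch index, as a PyTorch Dataset would use it.
        return self.data[i], self.label_map[self.event_code[i]], self.source_group[i]
```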
eegunity.utils.handle_errors module
- eegunity.utils.handle_errors.handle_errors(miss_bad_data, error_list=None)
Decorator to handle errors in function execution based on the miss_bad_data flag.
- Parameters:
miss_bad_data (bool) – If True, errors are caught and logged instead of raising exceptions.
error_list (list, optional) – If provided, errors will be added to this list. Default is None (do not store errors).
- Returns:
A decorator that wraps a function so that its errors are handled as specified.
- Return type:
callable
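A decorator factory with the documented parameters can be sketched as follows. handle_errors_sketch is a hypothetical name; the log message, the string stored in error_list, and returning None from a failed call are assumptions, since the docs only say errors are caught, logged, and optionally collected when miss_bad_data is True.

```python
import functools
import logging
from typing import Callable, List, Optional

logger = logging.getLogger(__name__)


def handle_errors_sketch(miss_bad_data: bool, error_list: Optional[List[str]] = None) -> Callable:
    """Hypothetical sketch of a handle_errors-style decorator factory."""

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                if not miss_bad_data:
                    raise  # strict mode: propagate the original exception
                logger.warning("Error in %s: %s", func.__name__, exc)
                if error_list is not None:
                    error_list.append(f"{func.__name__}: {exc}")
                return None  # assumption: a failed call yields None
        return wrapper

    return decorator
```

Passing a shared list as error_list lets a batch job keep running over bad files and report all failures at the end.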
eegunity.utils.label_channel module
Utilities for EEGUnity misc (continuous-label) channels.
EEGUnity uses the misc: channel name prefix together with MNE’s built-in
misc channel type to attach per-sample continuous signals to a
BaseRaw object alongside the EEG data. Typical use cases
include regression labels such as reaction time, inter-event gap, or any
other scalar value produced by a dataset-specific kernel for each epoch.
Channel naming convention
All EEGUnity label channels follow the misc:{task_name} pattern, for
example misc:reaction_time or misc:inter_event_gap. The MNE channel
type is always misc.
This convention is consistent with EEGUnity’s locator channel prefix system
(eeg:, eog:, emg:, ecg:, stim:; legacy uppercase forms are
also accepted) and is distinct from stim: channels in the following ways:
- Value type: stim channels carry integer trigger codes, while misc label channels carry continuous float values.
- Typical use: stim is for event onsets or TTL-like pulses; misc is for per-sample regression targets.
- Resampling: MNE handles stim with nearest-neighbour logic, but applies its filtering (band-limited) resampler to misc channels unless EEGUnity’s wrapper is used.
- filter() / ICA.fit(): both channel types are excluded by default.
- events_from_annotations() does not directly consume either channel type.
Because MNE’s resample() applies band-limited (filtering) resampling to all non-stim channels (misc included), label
channels must be resampled with nearest-neighbour interpolation to preserve
their original float values. Always call resample_raw_with_labels()
instead of raw.resample() directly in any EEGUnity code path where
label channels may be present.
See also
- resample_raw_with_labels(): Drop-in replacement for raw.resample().
- is_misc_channel(): Predicate for identifying label channels.
- misc_task_name(): Extract the task name from a label channel name.
- eegunity.utils.label_channel.MISC_CH_PREFIX: str = 'misc:'
Prefix string that identifies all EEGUnity misc (label) channels.
Every channel whose name starts with this prefix is treated as a continuous label channel with MNE type
misc.
- eegunity.utils.label_channel.is_misc_channel(ch_name)
Return True if ch_name is an EEGUnity misc (label) channel.
The check is case-sensitive and matches the 'misc:' prefix exactly.
- Parameters:
ch_name (str) – Channel name to test.
- Returns:
True when ch_name starts with 'misc:', False otherwise.
- Return type:
bool
Examples
>>> is_misc_channel('misc:reaction_time')
True
>>> is_misc_channel('eeg:Fz')
False
- eegunity.utils.label_channel.is_misc_channel_in_raw(raw, ch_idx)
Return True if the channel index points to a misc label channel.
The check primarily uses MNE channel type metadata and falls back to the EEGUnity misc: prefix for backward compatibility.
- Parameters:
raw (mne.io.BaseRaw) – Raw object.
ch_idx (int) – Channel index.
- Returns:
True when the channel type is misc or the name starts with misc:.
- Return type:
bool
Examples
>>> # is_misc_channel_in_raw(raw, 0)
- eegunity.utils.label_channel.is_stim_channel_in_raw(raw, ch_idx)
Return True if the channel index points to a stim channel.
- Parameters:
raw (mne.io.BaseRaw) – Raw object.
ch_idx (int) – Channel index.
- Returns:
True when the channel type is stim.
- Return type:
bool
Examples
>>> # is_stim_channel_in_raw(raw, 0)
- eegunity.utils.label_channel.misc_channel_indices(raw)
Return indices of misc channels in a raw object.
- Parameters:
raw (mne.io.BaseRaw) – Raw object.
- Returns:
Indices of channels treated as misc labels.
- Return type:
list of int
Examples
>>> # idx = misc_channel_indices(raw)
- eegunity.utils.label_channel.stim_channel_indices(raw)
Return indices of stim channels in a raw object.
- Parameters:
raw (mne.io.BaseRaw) – Raw object.
- Returns:
Indices of channels with type stim.
- Return type:
list of int
Examples
>>> # idx = stim_channel_indices(raw)
- eegunity.utils.label_channel.misc_task_name(ch_name)
Extract the task name from a misc label channel name.
- Parameters:
ch_name (str) – A channel name of the form 'misc:{task_name}'.
- Returns:
The task name portion after the 'misc:' prefix.
- Return type:
str
- Raises:
ValueError – If ch_name does not start with 'misc:'.
Examples
>>> misc_task_name('misc:reaction_time')
'reaction_time'
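The predicates and index helpers above can be mirrored on plain lists. In this hypothetical sketch, ch_names and ch_types stand in for raw.info['ch_names'] and raw.get_channel_types(); the function names are illustrative, not the EEGUnity API. It assumes misc_channel_indices uses the same type-first, prefix-fallback rule documented for is_misc_channel_in_raw.

```python
from typing import List

MISC_CH_PREFIX = "misc:"


def is_misc_name(ch_name: str) -> bool:
    # Case-sensitive prefix match, as documented for is_misc_channel().
    return ch_name.startswith(MISC_CH_PREFIX)


def task_name(ch_name: str) -> str:
    # Mirrors misc_task_name(): strip the prefix or raise ValueError.
    if not is_misc_name(ch_name):
        raise ValueError(f"{ch_name!r} does not start with {MISC_CH_PREFIX!r}")
    return ch_name[len(MISC_CH_PREFIX):]


def misc_indices(ch_names: List[str], ch_types: List[str]) -> List[int]:
    # MNE type metadata first; the name prefix is a backward-compatible fallback.
    return [
        i for i, (name, ch_type) in enumerate(zip(ch_names, ch_types))
        if ch_type == "misc" or is_misc_name(name)
    ]


def stim_indices(ch_types: List[str]) -> List[int]:
    # stim channels are identified by MNE type only.
    return [i for i, ch_type in enumerate(ch_types) if ch_type == "stim"]
```

The fallback means a legacy file whose label channel was saved with type eeg but named misc:gap is still treated as a label channel.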
- eegunity.utils.label_channel.resample_raw_with_labels(raw, sfreq, **kwargs)
Resample raw, applying nearest-neighbour interpolation to misc channels.
MNE’s resample() applies band-limited (filtering) resampling to every non-stim channel, which introduces low-pass filtering artefacts (ringing) on the step-function signals typically stored in misc: label channels. This function wraps the standard resample call and overwrites the resampled misc channel data with values obtained by nearest-neighbour interpolation, preserving the original float values exactly for samples that fall in the interior of constant regions.
EEG, EOG, MEG and all other non-misc channels are resampled with the standard MNE pipeline and are not affected by this wrapper.
- Parameters:
raw (mne.io.BaseRaw) – The raw object to resample. Will be loaded into memory if not already preloaded.
sfreq (float) – New sampling frequency in Hz.
**kwargs – Additional keyword arguments forwarded to resample() (e.g. npad, window).
- Returns:
The resampled raw object (modified in-place).
- Return type:
mne.io.BaseRaw
Notes
If no misc: channels are present, this function is equivalent to calling raw.resample(sfreq, **kwargs) directly.
The nearest-neighbour mapping is computed as:
new_index[i] = round(i * old_n_times / new_n_times)
which guarantees that samples in the interior of a constant label region are reproduced exactly regardless of the resampling ratio.
Examples
>>> raw = resample_raw_with_labels(raw, sfreq=256)
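The nearest-neighbour mapping in the Notes can be checked on a single label channel held in a plain list. This is a sketch, not the EEGUnity implementation: here new_n_times is passed explicitly, whereas the real wrapper presumably takes it from the MNE-resampled data. The clip to old_n_times - 1 is an added safeguard, because when upsampling by more than 2x the rounded position of the last output samples can land one past the final input sample. Python's round() rounds halves to even.

```python
from typing import List, Sequence


def nearest_neighbour_indices(old_n_times: int, new_n_times: int) -> List[int]:
    """Source index for each output sample: round(i * old / new), clipped to range."""
    return [
        # The clip guards the last samples when upsampling by more than 2x.
        min(round(i * old_n_times / new_n_times), old_n_times - 1)
        for i in range(new_n_times)
    ]


def resample_label_channel(values: Sequence[float], new_n_times: int) -> List[float]:
    # Copy existing values only, so no new (filtered) values are ever created.
    idx = nearest_neighbour_indices(len(values), new_n_times)
    return [values[j] for j in idx]
```

Halving an 8-sample step label keeps both levels exactly, where a filtering resampler would smear values around the step.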
eegunity.utils.log_processing module
- eegunity.utils.log_processing.log_processing(func)
Decorator that logs the processing of a data row.
This decorator prints a message indicating which row is being processed before calling the original function.
- Parameters:
func (callable) – The function to decorate. It must accept a ‘row’ as its first argument.
- Returns:
The wrapped function with added logging behavior.
- Return type:
callable
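A decorator with the documented behaviour (announce the row, then call the original function) can be sketched as below. log_processing_sketch is a hypothetical name and the printed message format is an assumption; the real decorator's wording may differ. functools.wraps keeps the wrapped function's name and docstring.

```python
import functools
from typing import Callable


def log_processing_sketch(func: Callable) -> Callable:
    """Hypothetical sketch: announce the row before delegating to func."""

    @functools.wraps(func)
    def wrapper(row, *args, **kwargs):
        # Message format is an assumption; the real decorator's wording may differ.
        print(f"Processing row: {row}")
        return func(row, *args, **kwargs)

    return wrapper
```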
eegunity.utils.normalize module
- eegunity.utils.normalize.normalize_mne(mne_raw)
Normalize each non-misc/non-stim channel to zero mean and unit variance.
This function processes data from an mne.io.Raw object and normalizes each eligible channel independently.
Channels with MNE type misc and stim are excluded from normalization:
- misc channels may carry continuous labels that should remain in original units.
- stim channels contain integer trigger codes that must not be standardized.
- Parameters:
mne_raw (mne.io.Raw) – Raw object containing EEG/MEG data.
- Returns:
The same raw object after in-place normalization.
- Return type:
mne.io.Raw
Notes
Normalization is performed in place.
Examples
>>> import mne
>>> from eegunity.utils.normalize import normalize_mne
>>> raw = mne.io.read_raw_fif('sample_data.fif', preload=True)
>>> raw_normalized = normalize_mne(raw)
>>> print(raw_normalized.get_data())
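The per-channel standardization and the misc/stim exclusion can be sketched on a list-of-rows array (one row per channel) with matching type strings, standing in for the Raw data and its channel types. normalize_channels is a hypothetical helper; it uses the population standard deviation, and only centring a flat (zero-variance) channel is an assumption to avoid division by zero, not documented behaviour.

```python
import statistics
from typing import List


def normalize_channels(data: List[List[float]], ch_types: List[str]) -> List[List[float]]:
    """Hypothetical sketch: z-score each channel row unless its type is misc or stim."""
    for row, ch_type in zip(data, ch_types):
        if ch_type in ("misc", "stim"):
            continue  # labels and trigger codes keep their original values
        mean = statistics.fmean(row)
        std = statistics.pstdev(row)
        # Assumption: a flat channel is only centred, to avoid dividing by zero.
        scale = std if std > 0 else 1.0
        row[:] = [(v - mean) / scale for v in row]  # in place, like normalize_mne
    return data
```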
eegunity.utils.parallel module
Parallel execution utilities for EEGUnity batch processing.
This module provides the central dispatcher used by
batch_process().
Two execution backends are available in addition to the default
sequential mode:
- Thread mode ('thread'): Uses ThreadPoolExecutor. The CPython GIL is released during blocking I/O, so this mode is well-suited to network- or disk-bound workloads (e.g. reading files from NFS or issuing os.stat() calls over a network filesystem).
- Process mode ('process'): Uses ProcessPoolExecutor. Each worker runs in a separate OS process with its own GIL, enabling genuine CPU parallelism. The callable is serialised with cloudpickle, which supports closures and locally-defined functions that the built-in pickle cannot handle. This mode is suited to CPU-intensive signal-processing operations such as filtering, ICA, or resampling.
- eegunity.utils.parallel.parallel_execute(tasks, app_func, is_patch, result_type, execution_mode, num_workers)
Apply app_func to each eligible task row and return an ordered result list.
This is the central dispatcher for batch_process(). It selects the execution backend based on execution_mode and num_workers, then runs app_func on every task whose should_apply flag is True.
- Parameters:
tasks (list of tuple) – Sequence of (index, row, should_apply) triples where index is the locator row index, row is a pandas.Series, and should_apply is the bool result of con_func(row).
app_func (callable) – Function to apply to each row when should_apply is True. Must accept a single positional argument (the pandas.Series row). May be a closure or a locally-defined function.
is_patch (bool) – When True, rows for which should_apply is False contribute a placeholder value to the output so that the result list length matches the number of tasks. When False those rows are represented by None in the returned list (callers that pass is_patch=False typically filter None values afterwards).
result_type ({'series', 'value', None}) – Controls which placeholder is inserted for skipped rows. When 'series' and is_patch is True, the original row is used as placeholder, preserving the full pandas.Series; in all other cases None is used.
execution_mode ({'thread', 'process', None}) – Selects the concurrency backend:
  'thread': Concurrent execution via ThreadPoolExecutor. The Python GIL is released during blocking I/O, making this mode effective for network- or disk-bound workloads. num_workers controls the thread-pool size.
  'process': Concurrent execution via ProcessPoolExecutor. Each worker runs in a separate OS process, bypassing the GIL for true CPU parallelism. app_func is serialised with cloudpickle so closures and locally-defined functions are fully supported. num_workers controls the process-pool size.
  None: Sequential execution on the calling thread, regardless of num_workers. Use this mode for lightweight operations or whenever concurrent access to shared state (e.g. an open HDF5 file handle) must be avoided.
num_workers (int) – Maximum number of concurrent workers for thread or process pools. When <= 0, execution always falls back to the sequential path even if execution_mode is 'thread' or 'process'.
- Returns:
Ordered result list with one entry per task. Entries for skipped rows (should_apply=False) contain the placeholder value described above.
- Return type:
list
- Raises:
ValueError – If execution_mode is not one of 'thread', 'process', or None.
Examples
>>> from eegunity.utils.parallel import parallel_execute
>>> tasks = [(0, {'x': 1}, True), (1, {'x': 2}, False)]
>>> parallel_execute(
...     tasks=tasks,
...     app_func=lambda row: row['x'] * 2,
...     is_patch=False,
...     result_type='value',
...     execution_mode='thread',
...     num_workers=2,
... )
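The documented dispatch rules (validation, placeholders, sequential fallback, ordered results) can be condensed into a short sketch. parallel_execute_sketch is hypothetical, not the EEGUnity implementation. Unlike the documented 'process' backend it uses the standard pickle rather than cloudpickle, so in process mode app_func must be a module-level function; thread and sequential modes accept lambdas and closures.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from typing import Any, Callable, List, Optional, Sequence, Tuple


def parallel_execute_sketch(
    tasks: Sequence[Tuple[int, Any, bool]],
    app_func: Callable[[Any], Any],
    is_patch: bool,
    result_type: Optional[str],
    execution_mode: Optional[str],
    num_workers: int,
) -> List[Any]:
    """Hypothetical sketch of the documented dispatch semantics."""
    if execution_mode not in ("thread", "process", None):
        raise ValueError(f"unknown execution_mode: {execution_mode!r}")

    # Skipped rows keep the row itself only for series patches, else None.
    keep_row = is_patch and result_type == "series"
    results: List[Any] = [row if keep_row else None for _, row, _ in tasks]
    todo = [(pos, row) for pos, (_, row, apply) in enumerate(tasks) if apply]

    # Sequential path: no backend requested, or no workers available.
    if execution_mode is None or num_workers <= 0:
        for pos, row in todo:
            results[pos] = app_func(row)
        return results

    # Standard pickle here (not cloudpickle): process mode needs a top-level app_func.
    pool_cls = ThreadPoolExecutor if execution_mode == "thread" else ProcessPoolExecutor
    with pool_cls(max_workers=num_workers) as pool:
        # Executor.map yields results in input order, so positions stay aligned.
        outputs = pool.map(app_func, [row for _, row in todo])
        for (pos, _), out in zip(todo, outputs):
            results[pos] = out
    return results
```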
eegunity.utils.pipeline module
- class eegunity.utils.pipeline.Pipeline(functions)
Bases: object
Apply a list of functions sequentially to an input.
The Pipeline class enables users to define and apply a sequence of transformations (functions) to input data.
- functions
A list of functions to apply in order.
- Type:
list of callable
Examples
EEG processing pipeline using MNE:
>>> import mne
>>> from eegunity.utils.pipeline import Pipeline
>>> raw = mne.io.read_raw_fif('sample_raw.fif', preload=True)
>>> def bandpass_filter(raw, l_freq, h_freq):
...     return raw.filter(l_freq=l_freq, h_freq=h_freq)
>>> def notch_filter(raw, freqs):
...     return raw.notch_filter(freqs=freqs)
>>> def resample(raw, sfreq):
...     return raw.resample(sfreq=sfreq)
>>> # Define processing functions
>>> functions = [
...     lambda raw: bandpass_filter(raw, 0.1, 75),
...     lambda raw: notch_filter(raw, freqs=50),
...     lambda raw: resample(raw, sfreq=200),
... ]
>>> # Initialize and apply the pipeline
>>> pipeline = Pipeline(functions)
>>> processed_raw = pipeline.forward(raw)
>>> print(processed_raw.info['sfreq'])
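The sequential application can be expressed as a left fold over the function list. PipelineSketch is a hypothetical minimal class; the forward method name follows the example above, but this is an illustration, not the EEGUnity source. Each function receives the previous function's output, so order matters.

```python
from functools import reduce
from typing import Any, Callable, Iterable


class PipelineSketch:
    """Hypothetical sketch: apply functions left to right, feeding each output forward."""

    def __init__(self, functions: Iterable[Callable[[Any], Any]]):
        self.functions = list(functions)

    def forward(self, data: Any) -> Any:
        # reduce threads the running value through each function in order.
        return reduce(lambda acc, fn: fn(acc), self.functions, data)
```

With [lambda x: x + 1, lambda x: x * 10], forward(2) gives 30, while the reversed order would give 21.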
eegunity.utils.split_hdf5_file module
- eegunity.utils.split_hdf5_file.split_hdf5_file(input_path, max_file_size=10737418240, output_dir='.')
Split an HDF5 file into multiple parts if its total size exceeds the given limit.
Both v1 (file-per-group) and v2 (flat-array) EEGUnity HDF5 formats are supported. The format is detected automatically from the root version attribute.
v1 behaviour: the minimal splitting unit is one top-level group (one source file). Groups are accumulated greedily and a new output file is started whenever adding the next group would exceed max_file_size.
v2 behaviour: the minimal splitting unit is all epochs from one source file (epochs from the same source are never split across files). Size is estimated from uncompressed epoch data (n_epochs × n_channels × n_times × 4 bytes). Each output file is a fully self-contained v2 HDF5 file that shares the global label_map of the source file and contains only the source_meta entries relevant to that split.
If the file fits within max_file_size, a single _s1 copy is returned without modification.
- Parameters:
input_path (str) – Path to the input HDF5 file.
max_file_size (int, optional) – Maximum size in bytes for each output HDF5 file (default is 10737418240 bytes, i.e. 10 GiB). For v2 files this is compared against the uncompressed epoch data size, which is a conservative upper bound.
output_dir (str, optional) – Directory where output files will be saved. Defaults to the current directory.
- Returns:
A list of paths to the generated HDF5 files.
- Return type:
list of str
- Raises:
FileNotFoundError – If input_path does not exist.
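The greedy grouping rule and the v2 size estimate can be sketched independently of HDF5. Both helpers are hypothetical: plan_splits receives (source_name, size_in_bytes) pairs in file order and returns the names assigned to each output part, never splitting a source. Placing a single source that alone exceeds max_file_size in its own oversized part is an assumption; the docs do not say how that case is handled.

```python
from typing import List, Tuple


def estimate_v2_bytes(n_epochs: int, n_channels: int, n_times: int) -> int:
    # Uncompressed float32 size: the documented conservative estimate.
    return n_epochs * n_channels * n_times * 4


def plan_splits(groups: List[Tuple[str, int]], max_file_size: int) -> List[List[str]]:
    """Greedily pack whole groups into parts; a group is never split."""
    parts: List[List[str]] = [[]]
    current = 0
    for name, size in groups:
        # Start a new part when this group would overflow a non-empty part.
        if parts[-1] and current + size > max_file_size:
            parts.append([])
            current = 0
        parts[-1].append(name)
        current += size
    return parts
```

For example, sources of 4, 4, 3 and 12 units with a limit of 10 yield the parts [a, b], [c] and [d], the last one oversized under the stated assumption.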