eegunity.utils package#

Submodules#

eegunity.utils.channel_align_raw module#

eegunity.utils.channel_align_raw.channel_align_raw(mne_raw, channel_order, min_matched_channel=1)[source]#

Aligns and orders the channels of an MNE Raw object according to a specified channel order.

This function ensures that the channels in the raw MNE object are aligned and ordered according to the specified channel_order. If some channels from channel_order are missing in the raw data, they will be added with zero values and later interpolated.

misc label channels and stim trigger channels are temporarily removed before alignment so they do not interfere with EEG-specific operations (montage fitting, bad-channel interpolation). They are re-appended at the end of the channel list after alignment is complete.

Parameters:
  • mne_raw (mne.io.Raw) – The raw EEG/MEG data in an MNE Raw object.

  • channel_order (list of str) – The desired order of channels. Should contain only channels to align (typically EEG). misc and stim channels are handled separately and must not be listed here.

  • min_matched_channel (int, optional) – The minimum required number of matched channels, by default 1.

Returns:

The modified raw object with channels aligned, missing channels interpolated, and preserved misc/stim channels appended at the end.

Return type:

mne.io.Raw

Raises:

ValueError – If the number of matched channels is less than min_matched_channel.

Notes

  • The function picks and reorders the matched channels to match channel_order.

  • If some channels from channel_order are missing in mne_raw, they are added as zero data channels and interpolated.

  • The missing channels are first marked as ‘bad’ before interpolation.

  • misc/stim channels are not interpolated and are not included in the alignment order.

Examples

>>> import mne
>>> from eegunity.utils.channel_align_raw import channel_align_raw
>>> raw = mne.io.read_raw_fif('sample_raw.fif', preload=True)
>>> desired_order = ['Fp1', 'Fp2', 'F3', 'F4', 'C3', 'C4', 'P3', 'P4', 'O1', 'O2']
>>> aligned_raw = channel_align_raw(raw, desired_order, min_matched_channel=5)
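The matching step described in the notes can be illustrated without MNE. The following is a minimal sketch, not the library's implementation: plan_channel_alignment is a hypothetical helper, it assumes exact, case-sensitive name matching, and it assumes misc/stim channels have already been set aside.

```python
def plan_channel_alignment(raw_ch_names, channel_order, min_matched_channel=1):
    """Split channel_order into channels present in the recording and channels to add.

    Hypothetical helper: it only plans the alignment and never touches MNE objects.
    """
    present = set(raw_ch_names)
    # Matched channels keep the order given by channel_order, not the recording order.
    matched = [ch for ch in channel_order if ch in present]
    # Missing channels would be added as zero data, marked bad, then interpolated.
    missing = [ch for ch in channel_order if ch not in present]
    if len(matched) < min_matched_channel:
        raise ValueError(
            f"Only {len(matched)} channel(s) matched; "
            f"at least {min_matched_channel} required."
        )
    return matched, missing


recorded = ["Fp2", "Fp1", "C3", "O1"]
desired = ["Fp1", "Fp2", "F3", "C3", "O1", "O2"]
matched, missing = plan_channel_alignment(recorded, desired, min_matched_channel=3)
print(matched)  # ['Fp1', 'Fp2', 'C3', 'O1']
print(missing)  # ['F3', 'O2']
```

Raising min_matched_channel above the number of matched channels (4 in this example) triggers the ValueError path.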

eegunity.utils.con_udatasets module#

eegunity.utils.con_udatasets.con_udatasets(datasets)[source]#

Concatenates the locator DataFrames of the given UnifiedDataset objects, and returns a new UnifiedDataset with the concatenated locator.

The function checks if all elements in the input list are instances of the ‘UnifiedDataset’ class without directly importing it. It then calls the get_locator() method of each dataset, concatenates them, and sets the new locator in a copied version of the first dataset using set_locator().

Parameters:

datasets (list) – A list of UnifiedDataset instances whose locators will be concatenated.

Returns:

A new UnifiedDataset with the concatenated locator.

Return type:

UnifiedDataset

Raises:

ValueError – If any element in the list is not an instance of ‘UnifiedDataset’.
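The flow described above (type check by class name, concatenate locators, copy the first dataset) can be sketched as follows. This is an illustration under stated assumptions: the UnifiedDataset class here is a stand-in, the locator is a plain list of row dicts rather than a DataFrame, and the "path" key is hypothetical.

```python
import copy


class UnifiedDataset:
    """Stand-in for the real class; the locator is a plain list of row dicts here."""

    def __init__(self, locator):
        self._locator = list(locator)

    def get_locator(self):
        return self._locator

    def set_locator(self, locator):
        self._locator = locator


def concat_datasets_sketch(datasets):
    # Check the class by name so the real class does not have to be imported.
    for ds in datasets:
        if type(ds).__name__ != "UnifiedDataset":
            raise ValueError("All elements must be UnifiedDataset instances.")
    combined = []
    for ds in datasets:
        combined.extend(ds.get_locator())
    merged = copy.deepcopy(datasets[0])  # leave the input datasets untouched
    merged.set_locator(combined)
    return merged


a = UnifiedDataset([{"path": "a.edf"}])
b = UnifiedDataset([{"path": "b.edf"}, {"path": "c.edf"}])
merged = concat_datasets_sketch([a, b])
print(len(merged.get_locator()))  # 3
print(len(a.get_locator()))       # 1
```

Because the result is built on a copy of the first dataset, any non-locator settings would be inherited from that first element.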

eegunity.utils.h5 module#

class eegunity.utils.h5.h5Dataset(path, name)[source]#

Bases: object

Handle HDF5 file operations in a format compatible with h5py.

This class is adapted from: 935963004/LaBraM.

Parameters:
  • path (Path)

  • name (str)

addGroup(grpName)[source]#

Add a new group to the HDF5 file.

Parameters:

grpName (str) – The name of the group to create.

Returns:

The created group object.

Return type:

h5py.Group

addDataset(grp, dsName, arr, chunks=None, **kwargs)[source]#

Add a dataset to a specified group.

Parameters:
  • grp (h5py.Group) – The group to which the dataset will be added.

  • dsName (str) – The name of the dataset.

  • arr (np.array) – The data to store in the dataset.

  • chunks (tuple, optional) – The chunk shape to use when storing the dataset.

  • **kwargs – Additional keyword arguments passed to create_dataset.

Returns:

The created dataset object.

Return type:

h5py.Dataset

addAttributes(src, attrName, attrValue)[source]#

Add an attribute to a dataset or group.

Parameters:
  • src (h5py.Dataset or h5py.Group) – The target object to which the attribute will be added.

  • attrName (str) – The name of the attribute.

  • attrValue (any) – The value of the attribute.

save()[source]#

Close the HDF5 file.

property name#

Get the name of the HDF5 dataset.

Returns:

The name of the HDF5 file.

Return type:

str

eegunity.utils.handle_errors module#

eegunity.utils.handle_errors.handle_errors(miss_bad_data, error_list=None)[source]#

Decorator to handle errors in function execution based on the miss_bad_data flag.

Parameters:
  • miss_bad_data (bool) – If True, errors are caught and logged instead of raising exceptions.

  • error_list (list, optional) – If provided, errors will be added to this list. Default is None (do not store errors).

Returns:

Decorated function that handles errors as specified.

Return type:

callable
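A decorator with this signature could look roughly like the sketch below. The logging format, the stored error representation, and the None fallback value are assumptions; the real implementation may differ.

```python
import functools


def handle_errors_sketch(miss_bad_data, error_list=None):
    """Decorator factory: swallow exceptions when miss_bad_data is True."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                if not miss_bad_data:
                    raise  # strict mode: propagate the original exception
                print(f"Error in {func.__name__}: {exc}")
                if error_list is not None:
                    error_list.append(str(exc))
                return None  # assumed fallback value for failed calls

        return wrapper

    return decorator


errors = []


@handle_errors_sketch(miss_bad_data=True, error_list=errors)
def parse_sfreq(text):
    return float(text)


print(parse_sfreq("250"))  # 250.0
print(parse_sfreq("n/a"))  # logs the error, then prints None
print(len(errors))         # 1
```

Passing a shared list as error_list lets a batch job collect every failure and report them together after the run.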

eegunity.utils.label_channel module#

Utilities for EEGUnity misc (continuous-label) channels.

EEGUnity uses the misc: channel name prefix together with MNE’s built-in misc channel type to attach per-sample continuous signals to a BaseRaw object alongside the EEG data. Typical use cases include regression labels such as reaction time, inter-event gap, or any other scalar value produced by a dataset-specific kernel for each epoch.

Channel naming convention#

All EEGUnity label channels follow the misc:{task_name} pattern, for example misc:reaction_time or misc:inter_event_gap. The MNE channel type is always misc.

This convention is consistent with EEGUnity’s locator channel prefix system (eeg:, eog:, emg:, ecg:, stim:; legacy uppercase forms are also accepted) and is distinct from stim: channels in the following ways:

  • Value type: stim channels carry integer trigger codes, while misc label channels carry continuous float values.

  • Typical use: stim is for event onset or TTL-like pulses; misc is for per-sample regression targets.

  • Resampling: MNE handles stim with nearest-neighbour logic, but applies resample_poly to misc channels unless EEGUnity’s wrapper is used.

  • filter() / ICA.fit(): both channel types are excluded by default.

  • events_from_annotations() does not directly consume either channel type.

Because MNE’s resample() applies scipy.signal.resample_poly to all channels (misc included), label channels must be resampled with nearest-neighbour interpolation to preserve their original float values. Always call resample_raw_with_labels() instead of raw.resample() directly in any EEGUnity code path where label channels may be present.

See also

resample_raw_with_labels()

Drop-in replacement for raw.resample().

is_misc_channel()

Predicate for identifying label channels.

misc_task_name()

Extract the task name from a label channel name.

eegunity.utils.label_channel.MISC_CH_PREFIX: str = 'misc:'#

Prefix string that identifies all EEGUnity misc (label) channels.

Every channel whose name starts with this prefix is treated as a continuous label channel with MNE type misc.
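The prefix convention can be illustrated with two small helpers. This is a sketch that mirrors the documented behaviour of is_misc_channel() and misc_task_name() (case-sensitive prefix match, ValueError for other names); the function names with the _sketch suffix are hypothetical.

```python
MISC_CH_PREFIX = "misc:"


def is_misc_channel_sketch(ch_name):
    # Case-sensitive, exact prefix match.
    return ch_name.startswith(MISC_CH_PREFIX)


def misc_task_name_sketch(ch_name):
    if not is_misc_channel_sketch(ch_name):
        raise ValueError(f"{ch_name!r} is not a misc label channel")
    # Everything after the prefix is the task name.
    return ch_name[len(MISC_CH_PREFIX):]


ch_names = ["eeg:Fz", "misc:reaction_time", "stim:trigger", "Misc:gap"]
labels = [misc_task_name_sketch(ch) for ch in ch_names if is_misc_channel_sketch(ch)]
print(labels)  # ['reaction_time']
```

Note that "Misc:gap" is not picked up, because the name-based check does not normalise case.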

eegunity.utils.label_channel.is_misc_channel(ch_name)[source]#

Return True if ch_name is an EEGUnity misc (label) channel.

The check is case-sensitive and matches the 'misc:' prefix exactly.

Parameters:

ch_name (str) – Channel name to test.

Returns:

True when ch_name starts with 'misc:', False otherwise.

Return type:

bool

Examples

>>> is_misc_channel('misc:reaction_time')
True
>>> is_misc_channel('eeg:Fz')
False

eegunity.utils.label_channel.is_misc_channel_in_raw(raw, ch_idx)[source]#

Return True if channel index points to a misc label channel.

The check primarily uses MNE channel type metadata and falls back to the EEGUnity misc: prefix for backward compatibility.

Parameters:
  • raw (mne.io.BaseRaw) – Raw object.

  • ch_idx (int) – Channel index.

Returns:

True when channel type is misc or name starts with misc:.

Return type:

bool

Examples

>>> # is_misc_channel_in_raw(raw, 0)

eegunity.utils.label_channel.is_stim_channel_in_raw(raw, ch_idx)[source]#

Return True if channel index points to a stim channel.

Parameters:
  • raw (mne.io.BaseRaw) – Raw object.

  • ch_idx (int) – Channel index.

Returns:

True when channel type is stim.

Return type:

bool

Examples

>>> # is_stim_channel_in_raw(raw, 0)

eegunity.utils.label_channel.misc_channel_indices(raw)[source]#

Return indices of misc channels in a raw object.

Parameters:

raw (mne.io.BaseRaw) – Raw object.

Returns:

Indices of channels treated as misc labels.

Return type:

list of int

Examples

>>> # idx = misc_channel_indices(raw)

eegunity.utils.label_channel.stim_channel_indices(raw)[source]#

Return indices of stim channels in a raw object.

Parameters:

raw (mne.io.BaseRaw) – Raw object.

Returns:

Indices of channels with type stim.

Return type:

list of int

Examples

>>> # idx = stim_channel_indices(raw)

eegunity.utils.label_channel.misc_task_name(ch_name)[source]#

Extract the task name from a misc label channel name.

Parameters:

ch_name (str) – A channel name of the form 'misc:{task_name}'.

Returns:

The task name portion after the 'misc:' prefix.

Return type:

str

Raises:

ValueError – If ch_name does not start with 'misc:'.

Examples

>>> misc_task_name('misc:reaction_time')
'reaction_time'

eegunity.utils.label_channel.resample_raw_with_labels(raw, sfreq, **kwargs)[source]#

Resample raw, applying nearest-neighbour interpolation to misc channels.

MNE’s resample() uses scipy.signal.resample_poly for all channels, which introduces low-pass filtering artefacts on the step-function signals typically stored in misc: label channels. This function wraps the standard resample call and overwrites the resampled misc channel data with values obtained by nearest-neighbour interpolation, preserving the original float values exactly for samples that fall in the interior of constant regions.

EEG, EOG, MEG and all other non-misc channels are resampled with the standard MNE pipeline and are not affected by this wrapper.

Parameters:
  • raw (mne.io.BaseRaw) – The raw object to resample. Will be loaded into memory if not already preloaded.

  • sfreq (float) – New sampling frequency in Hz.

  • **kwargs – Additional keyword arguments forwarded to resample() (e.g. npad, window).

Returns:

The resampled raw object (modified in-place).

Return type:

mne.io.BaseRaw

Notes

If no misc: channels are present, this function is equivalent to calling raw.resample(sfreq, **kwargs) directly.

The nearest-neighbour mapping is computed as:

new_index[i] = round(i * old_n_times / new_n_times)

which guarantees that samples in the interior of a constant label region are reproduced exactly regardless of the resampling ratio.

Examples

>>> raw = resample_raw_with_labels(raw, sfreq=256)
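The index mapping from the notes can be tried on a plain Python list. This is a sketch of the formula only, not of the wrapper: nearest_neighbour_resample is a hypothetical helper, and the clamp to the last valid index is an added assumption, since rounding can otherwise step one past the end when upsampling.

```python
def nearest_neighbour_resample(values, new_n_times):
    """Resample a 1-D label sequence by nearest-neighbour index lookup."""
    old_n_times = len(values)
    out = []
    for i in range(new_n_times):
        src = round(i * old_n_times / new_n_times)
        src = min(src, old_n_times - 1)  # clamp: rounding can step past the end
        out.append(values[src])
    return out


# Downsampling a step-shaped label from 8 to 4 samples keeps the exact values.
labels = [0.25, 0.25, 0.25, 0.25, 1.5, 1.5, 1.5, 1.5]
print(nearest_neighbour_resample(labels, 4))      # [0.25, 0.25, 1.5, 1.5]

# Upsampling from 2 to 6 samples only ever repeats existing values.
print(nearest_neighbour_resample([0.5, 2.5], 6))  # [0.5, 0.5, 2.5, 2.5, 2.5, 2.5]
```

Every output value is copied from the input, so no intermediate values appear at label transitions, unlike with a filtering resampler.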

eegunity.utils.log_processing module#

eegunity.utils.log_processing.log_processing(func)[source]#

Decorator that logs the processing of a data row.

This decorator prints a message indicating which row is being processed before calling the original function.

Parameters:

func (callable) – The function to decorate. It must accept a ‘row’ as its first argument.

Returns:

The wrapped function with added logging behavior.

Return type:

callable
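A logging decorator of this kind could be written as in the sketch below. The exact message format of the real decorator is not documented here, so the printed text and the "path" key are assumptions.

```python
import functools


def log_processing_sketch(func):
    @functools.wraps(func)
    def wrapper(row, *args, **kwargs):
        # Announce the row first, then delegate to the wrapped function.
        print(f"Processing row: {row}")
        return func(row, *args, **kwargs)

    return wrapper


@log_processing_sketch
def file_stem(row):
    return row["path"].rsplit(".", 1)[0]


result = file_stem({"path": "sub-01.edf"})
print(result)  # sub-01
```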

eegunity.utils.normalize module#

eegunity.utils.normalize.normalize_mne(mne_raw)[source]#

Normalize each non-misc/non-stim channel to zero mean and unit variance.

This function processes data from an mne.io.Raw object and normalizes each eligible channel independently.

Channels with MNE type misc and stim are excluded from normalization:

  • misc channels may carry continuous labels that should remain in original units.

  • stim channels contain integer trigger codes that must not be standardized.

Parameters:

mne_raw (mne.io.Raw) – Raw object containing EEG/MEG data.

Returns:

The same raw object after in-place normalization.

Return type:

mne.io.Raw

Notes

Normalization is performed in place.

Examples

>>> import mne
>>> from eegunity.utils.normalize import normalize_mne
>>> raw = mne.io.read_raw_fif('sample_data.fif', preload=True)
>>> raw_normalized = normalize_mne(raw)
>>> print(raw_normalized.get_data())
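The per-channel rule can be shown on plain lists. This is a sketch under several assumptions: zscore_channels is a hypothetical helper, it returns a copy instead of working in place, it uses the population standard deviation, and it maps constant channels to zeros to avoid dividing by zero.

```python
from statistics import fmean, pstdev


def zscore_channels(data, ch_types, skip_types=("misc", "stim")):
    """Return a z-scored copy of `data` (one list of samples per channel)."""
    out = []
    for samples, ch_type in zip(data, ch_types):
        if ch_type in skip_types:
            out.append(list(samples))  # labels and triggers pass through unchanged
            continue
        mean = fmean(samples)
        std = pstdev(samples)
        if std == 0:
            out.append([0.0 for _ in samples])  # assumed handling of flat channels
        else:
            out.append([(x - mean) / std for x in samples])
    return out


data = [[1.0, 2.0, 3.0], [0.0, 5.0, 0.0], [0.4, 0.4, 0.9]]
ch_types = ["eeg", "stim", "misc"]
normalized = zscore_channels(data, ch_types)
print(normalized[0])  # zero mean, unit variance
print(normalized[1])  # [0.0, 5.0, 0.0]
print(normalized[2])  # [0.4, 0.4, 0.9]
```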

eegunity.utils.parallel module#

Parallel execution utilities for EEGUnity batch processing.

This module provides the central dispatcher used by batch_process(). Two execution backends are available in addition to the default sequential mode:

Thread mode ('thread')

Uses ThreadPoolExecutor. The CPython GIL is released during blocking I/O, so this mode is well-suited to network- or disk-bound workloads (e.g. reading files from NFS or issuing os.stat() calls over a network filesystem).

Process mode ('process')

Uses ProcessPoolExecutor. Each worker runs in a separate OS process with its own GIL, enabling genuine CPU parallelism. The callable is serialised with cloudpickle, which supports closures and locally-defined functions that the built-in pickle cannot handle. This mode is suited to CPU-intensive signal-processing operations such as filtering, ICA, or resampling.

eegunity.utils.parallel.parallel_execute(tasks, app_func, is_patch, result_type, execution_mode, num_workers)[source]#

Apply app_func to each eligible task row and return an ordered result list.

This is the central dispatcher for batch_process(). It selects the execution backend based on execution_mode and num_workers, then runs app_func on every task whose should_apply flag is True.

Parameters:
  • tasks (list of tuple) – Sequence of (index, row, should_apply) triples where index is the locator row index, row is a pandas.Series, and should_apply is the bool result of con_func(row).

  • app_func (callable) – Function to apply to each row when should_apply is True. Must accept a single positional argument (the pandas.Series row). May be a closure or a locally-defined function.

  • is_patch (bool) – When True, rows for which should_apply is False contribute a placeholder value to the output so that the result list length matches the number of tasks. When False those rows are represented by None in the returned list (callers that pass is_patch=False typically filter None values afterwards).

  • result_type ({'series', 'value', None}) – Controls which placeholder is inserted for skipped rows. When 'series' and is_patch is True, the original row is used as placeholder, preserving the full pandas.Series; in all other cases None is used.

  • execution_mode ({'thread', 'process', None}) –

    Selects the concurrency backend:

    'thread'

    Concurrent execution via ThreadPoolExecutor. The Python GIL is released during blocking I/O, making this mode effective for network- or disk-bound workloads. num_workers controls the thread-pool size.

    'process'

    Concurrent execution via ProcessPoolExecutor. Each worker runs in a separate OS process, bypassing the GIL for true CPU parallelism. app_func is serialised with cloudpickle so closures and locally-defined functions are fully supported. num_workers controls the process-pool size.

    None

    Sequential execution on the calling thread, regardless of num_workers. Use this mode for lightweight operations or whenever concurrent access to shared state (e.g. an open HDF5 file handle) must be avoided.

  • num_workers (int) – Maximum number of concurrent workers for thread or process pools. When <= 0, execution always falls back to the sequential path even if execution_mode is 'thread' or 'process'.

Returns:

Ordered result list with one entry per task. Entries for skipped rows (should_apply=False) contain the placeholder value described above.

Return type:

list

Raises:

ValueError – If execution_mode is not one of 'thread', 'process', or None.

Examples

>>> tasks = [(0, {'x': 1}, True), (1, {'x': 2}, False)]
>>> parallel_execute(
...     tasks=tasks,
...     app_func=lambda row: row['x'] * 2,
...     is_patch=False,
...     result_type='value',
...     execution_mode='thread',
...     num_workers=2,
... )
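The dispatch logic for the thread and sequential paths can be sketched with the standard library alone. This is an illustration, not the library's code: parallel_execute_sketch is a hypothetical name, and the 'process' backend is left out because it depends on cloudpickle, so the sketch rejects it.

```python
from concurrent.futures import ThreadPoolExecutor


def parallel_execute_sketch(tasks, app_func, is_patch, result_type,
                            execution_mode, num_workers):
    if execution_mode not in ("thread", None):
        raise ValueError(f"Unsupported execution_mode in this sketch: {execution_mode!r}")

    def placeholder(row):
        # Skipped rows keep the original row only for patched 'series' results.
        return row if (is_patch and result_type == "series") else None

    results = [placeholder(row) for _, row, _ in tasks]
    todo = [(pos, row) for pos, (_, row, should_apply) in enumerate(tasks) if should_apply]

    if execution_mode == "thread" and num_workers > 0:
        with ThreadPoolExecutor(max_workers=num_workers) as pool:
            # map() yields results in submission order, so positions stay aligned.
            outputs = list(pool.map(app_func, [row for _, row in todo]))
    else:
        # Sequential fallback: mode is None or num_workers <= 0.
        outputs = [app_func(row) for _, row in todo]

    for (pos, _), value in zip(todo, outputs):
        results[pos] = value
    return results


tasks = [(0, {"x": 1}, True), (1, {"x": 2}, False), (2, {"x": 3}, True)]
doubled = parallel_execute_sketch(tasks, lambda row: row["x"] * 2,
                                  is_patch=False, result_type="value",
                                  execution_mode="thread", num_workers=2)
print(doubled)  # [2, None, 6]
```

Writing results back by position keeps the output aligned with the input tasks even though worker threads may finish in any order.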

eegunity.utils.pipeline module#

class eegunity.utils.pipeline.Pipeline(functions)[source]#

Bases: object

Apply a list of functions sequentially to an input.

The Pipeline class enables users to define and apply a sequence of transformations (functions) to input data.

functions#

A list of functions to apply in order.

Type:

list of callable

Examples

EEG processing pipeline using MNE:

>>> import mne
>>> def bandpass_filter(raw, l_freq, h_freq):
...     return raw.filter(l_freq=l_freq, h_freq=h_freq)
>>> def notch_filter(raw, freqs):
...     return raw.notch_filter(freqs=freqs)
>>> def resample(raw, sfreq):
...     return raw.resample(sfreq=sfreq)
>>> # Define processing functions
>>> functions = [
...     lambda raw: bandpass_filter(raw, 0.1, 75),
...     lambda raw: notch_filter(raw, freqs=50),
...     lambda raw: resample(raw, sfreq=200)
... ]
>>> # Initialize and apply the pipeline
>>> pipeline = Pipeline(functions)
>>> processed_raw = pipeline.forward(raw)
>>> print(processed_raw.info['sfreq'])

forward(X)[source]#

Apply all functions in the pipeline to the input data.

Parameters:

X (any) – The input data to be transformed.

Returns:

The transformed data after applying all functions.

Return type:

any
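The sequential application performed by forward() amounts to a simple fold over the function list. The sketch below uses a hypothetical PipelineSketch class and plain lists in place of MNE objects, so it runs without any EEG data.

```python
class PipelineSketch:
    def __init__(self, functions):
        self.functions = list(functions)

    def forward(self, X):
        # Feed the output of each function into the next one.
        for func in self.functions:
            X = func(X)
        return X


pipeline = PipelineSketch([
    lambda xs: [x - 1.0 for x in xs],  # remove an offset
    lambda xs: [x * 2.0 for x in xs],  # rescale
    lambda xs: xs[::2],                # keep every second sample
])
print(pipeline.forward([1.0, 2.0, 3.0, 4.0]))  # [0.0, 4.0]
```

Order matters: each function receives whatever the previous one returned, so every step must return its result rather than only modifying it in place.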

eegunity.utils.split_hdf5_file module#

eegunity.utils.split_hdf5_file.split_hdf5_file(input_path, max_file_size=10737418240, output_dir='.')[source]#

Split an HDF5 file into multiple parts if its total size exceeds the given limit.

The minimal splitting unit is a top-level group. If the file size surpasses the specified max_file_size, this function creates multiple output HDF5 files and distributes the top-level groups among them without splitting any single group. Output files are named based on the input file’s base name, with suffixes like _s1.hdf5, _s2.hdf5, etc.

Parameters:
  • input_path (str) – Path to the input HDF5 file.

  • max_file_size (int, optional) – Maximum size in bytes for each output HDF5 file (default is 10737418240 bytes, i.e. 10 GiB).

  • output_dir (str, optional) – Directory where output files will be saved. Defaults to the current directory.

Returns:

A list of paths to the generated HDF5 files.

Return type:

list of str
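How top-level groups might be distributed across output files can be sketched without touching HDF5 at all. The sketch assumes a greedy, in-order assignment and a precomputed mapping of group names to sizes; plan_split and its inputs are hypothetical, and the real function may measure sizes and choose parts differently.

```python
import os


def plan_split(group_sizes, max_file_size, input_path, output_dir="."):
    """Assign top-level groups to output files without splitting any group.

    group_sizes maps group name -> size in bytes (hypothetical input).
    """
    base = os.path.splitext(os.path.basename(input_path))[0]
    parts, current, current_size = [], [], 0
    for name, size in group_sizes.items():
        # Start a new part when adding this group would exceed the limit.
        if current and current_size + size > max_file_size:
            parts.append(current)
            current, current_size = [], 0
        current.append(name)
        current_size += size
    if current:
        parts.append(current)
    return {
        os.path.join(output_dir, f"{base}_s{i}.hdf5"): names
        for i, names in enumerate(parts, start=1)
    }


sizes = {"sub01": 4, "sub02": 5, "sub03": 3, "sub04": 9}
plan = plan_split(sizes, max_file_size=10, input_path="data/all.hdf5", output_dir="out")
for path, groups in plan.items():
    print(path, groups)
```

In this sketch a single group larger than max_file_size still gets a file of its own, since a group is the smallest unit that can be moved.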

Package exports#