import os
import re
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from numpy import ndarray
from scipy.io import loadmat
def _is_numeric(s):
    """Tell whether ``s`` is written as a plain decimal number.

    The accepted form is an optional leading minus sign, one or more digits
    and, optionally, a dot followed by one or more digits (``"12"``,
    ``"-3.5"``). Exponents, a leading ``+`` and a dangling dot are rejected.

    Parameters
    ----------
    s : str
        Text to check.

    Returns
    -------
    bool
        True when ``s`` has the numeric form described above.
    """
    # ``re.match`` already anchors at the start of the string, so only the
    # end anchor needs to be spelled out in the pattern.
    return re.match(r'-?\d+(?:\.\d+)?$', s) is not None
def _load_mat_h5py(file_path):
    """Load a MATLAB v7.3 (HDF5-based) ``.mat`` file through ``h5py``.

    HDF5 groups are converted to nested dictionaries and datasets are read
    in full. The top-level keys ``__header__``, ``__version__`` and
    ``__globals__`` are left out of the result.

    Parameters
    ----------
    file_path : str
        Path to the MATLAB v7.3 HDF5 file.

    Returns
    -------
    dict or None
        Nested dictionary with the file contents, or ``None`` when ``h5py``
        is not installed or the file cannot be read.

    Examples
    --------
    >>> _load_mat_h5py("example_v73.mat")  # doctest: +SKIP
    """
    try:
        import h5py
    except ImportError:
        # h5py is an optional dependency; without it the file is unreadable.
        return None

    ignored = {"__header__", "__version__", "__globals__"}

    def _to_python(node):
        # Datasets are materialised, groups recurse, anything else passes through.
        if isinstance(node, h5py.Dataset):
            return node[()]
        if isinstance(node, h5py.Group):
            return {name: _to_python(child) for name, child in node.items()}
        return node

    try:
        with h5py.File(file_path, "r") as handle:
            parsed = {}
            for name, child in handle.items():
                if name in ignored:
                    continue
                parsed[name] = _to_python(child)
            return parsed
    except Exception:
        # Any read/parse failure is reported to the caller as "not parsed".
        return None
def _load_mat_file(file_path):
    """Load a MATLAB ``.mat`` file, falling back to HDF5 parsing when needed.

    ``scipy.io.loadmat`` is tried first with ``simplify_cells=True``. When it
    raises ``NotImplementedError`` the file is handed to ``_load_mat_h5py``
    instead; every other failure yields ``None``.

    Parameters
    ----------
    file_path : str
        Path to a ``.mat`` file.

    Returns
    -------
    dict or None
        Parsed data dictionary on success, otherwise ``None``.

    Examples
    --------
    >>> _load_mat_file("example.mat")  # doctest: +SKIP
    """
    try:
        parsed = loadmat(file_path, simplify_cells=True)
    except NotImplementedError:
        # scipy signals formats it does not implement this way; try the HDF5 route.
        parsed = _load_mat_h5py(file_path)
    except Exception:
        parsed = None
    return parsed
def _process_single_mat_file(file_path):
    """Extract locator metadata from a single MAT file.

    Files of 5 MiB or less are skipped without being opened. Larger files are
    loaded and searched for a channel-name array, a sampling-rate variable and
    a large 2-D (continuous) or 3-D (epoched) array. A 2-D array takes
    precedence over a 3-D one when both are present.

    Parameters
    ----------
    file_path : str
        Path to the MAT file.

    Returns
    -------
    dict or None
        Metadata with the keys ``'Sampling Rate'``, ``'Channel Names'`` (only
        when a channel-name array was found), ``'Number of Channels'``,
        ``'Data Shape'``, ``'Duration'`` and ``'File Type'``; ``None`` when
        the file is too small, cannot be parsed, or holds no large 2-D/3-D
        array.

    Raises
    ------
    OSError
        Propagated from ``os.path.getsize`` when the file cannot be stat'ed.
    """
    # Small files are assumed not to hold a recording and are skipped unopened.
    if os.path.getsize(file_path) <= 5 * 1024 * 1024:
        return None

    data = _load_mat_file(file_path)
    if data is None:
        print(f" [eeg_parser_mat] Skipping {file_path}: cannot parse MAT file.")
        return None

    def _locate(condition):
        return _find_variables_by_condition(data, condition, max_depth=10, max_width=50)

    channel_name = _locate(_condition_sampling_channel_name)
    sampling_rate = _locate(_condition_sampling_rate)
    source_data = _locate(_condition_source_data)
    source_data_3d = _locate(_condition_source_data_3d)

    if isinstance(source_data[1], ndarray):
        return _build_mat_metadata(source_data, sampling_rate, channel_name,
                                   "matRawData:", count_from_names=False)
    if isinstance(source_data_3d[1], ndarray):
        return _build_mat_metadata(source_data_3d, sampling_rate, channel_name,
                                   "matEpochData:", count_from_names=True)
    return None


def _build_mat_metadata(data_hit, rate_hit, names_hit, type_prefix, count_from_names):
    """Assemble the metadata dict for one located signal array.

    Parameters
    ----------
    data_hit, rate_hit, names_hit : tuple
        ``(path, value)`` pairs as returned by ``_find_variables_by_condition``
        for the signal array, the sampling rate and the channel names.
    type_prefix : str
        Prefix placed before the variable path in ``'File Type'``.
    count_from_names : bool
        If True the channel count is the number of channel names and is only
        reported when names were found; if False it is the smallest dimension
        of the signal array.

    Returns
    -------
    dict
        Metadata dictionary with string values.
    """
    data_path, array = data_hit
    names = names_hit[1]
    has_names = isinstance(names, ndarray)

    # Drop a trailing unit such as "Hz" and any whitespace around it, so that
    # "256 Hz" is recognised as the number 256.
    rate_text = str(rate_hit[1]).strip().strip("HhZz").strip()

    meta = {'Sampling Rate': rate_text}
    if has_names:
        meta['Channel Names'] = ','.join(str(x) for x in names)

    if not count_from_names:
        meta['Number of Channels'] = str(min(array.shape))
    elif has_names:
        # Without channel names there is nothing reliable to count, so the
        # key is left unset rather than reporting zero channels.
        meta['Number of Channels'] = str(len(names))

    meta['Data Shape'] = str(array.shape)

    duration = ''
    if _is_numeric(rate_text):
        rate_value = float(rate_text)
        # A zero or negative rate cannot yield a meaningful duration.
        if rate_value > 0:
            duration = str(max(array.shape) / rate_value)
    meta['Duration'] = duration

    meta['File Type'] = type_prefix + str(data_path)
    return meta
[docs]
def process_hdf5_set_files(files_locator, num_workers=0):
    """Fill in metadata for EEGLAB ``.set`` files stored as HDF5 (MATLAB v7.3).

    A row is processed when its ``'File Path'`` ends with ``.set``, its
    ``'File Type'`` is ``'unknown'`` and its ``'Error'`` text contains
    ``"HDF reader"``. Each such file is passed to
    ``_process_single_hdf5_set_file`` and the returned values are written
    back into the row as strings.

    Parameters
    ----------
    files_locator : pandas.DataFrame
        Locator DataFrame. When it has no ``'Error'`` column it is returned
        untouched.
    num_workers : int, optional
        Number of parallel worker threads (0 = sequential).

    Returns
    -------
    pandas.DataFrame
        The same DataFrame object, updated in place for readable files.

    Examples
    --------
    >>> process_hdf5_set_files(locator_df, num_workers=2)  # doctest: +SKIP
    """
    if 'Error' not in files_locator.columns:
        return files_locator

    # Collect (row label, path) pairs for rows that need the HDF5 pass.
    targets = []
    for row_label, record in files_locator.iterrows():
        candidate = record['File Path']
        message = str(record.get('Error', ''))
        if not candidate.endswith('.set'):
            continue
        if str(record.get('File Type', 'unknown')) != 'unknown':
            continue
        if 'HDF reader' not in message:
            continue
        targets.append((row_label, candidate))

    if not targets:
        return files_locator

    row_labels = [label for label, _ in targets]
    set_paths = [location for _, location in targets]

    if num_workers > 0:
        with ThreadPoolExecutor(max_workers=num_workers) as pool:
            outcomes = list(pool.map(_process_single_hdf5_set_file, set_paths))
    else:
        outcomes = list(map(_process_single_hdf5_set_file, set_paths))

    for row_label, meta in zip(row_labels, outcomes):
        if meta is None:
            continue
        for column, value in meta.items():
            files_locator.at[row_label, column] = '' if value is None else str(value)
    return files_locator
def _process_single_hdf5_set_file(file_path):
    """Read header metadata from one HDF5-format EEGLAB ``.set`` file.

    Only the ``srate``, ``nbchan`` and ``pnts`` datasets and the channel
    labels under ``chanlocs/labels`` are read. When the labels cannot be
    decoded, or their count differs from ``nbchan``, generic names
    ``Ch1``, ``Ch2``, ... are used instead.

    Parameters
    ----------
    file_path : str
        Path to the .set file.

    Returns
    -------
    dict or None
        Metadata dict with keys compatible with the locator DataFrame, or
        ``None`` if ``h5py`` is unavailable or the file cannot be parsed.

    Examples
    --------
    >>> _process_single_hdf5_set_file("sample.set")  # doctest: +SKIP
    """
    try:
        import h5py
    except ImportError:
        print(" [eeg_parser_mat] h5py not installed; cannot read HDF5 .set files.")
        return None

    try:
        with h5py.File(file_path, 'r') as hf:
            srate = float(np.squeeze(hf['srate'][()]))
            nbchan = int(np.squeeze(hf['nbchan'][()]))
            pnts = int(np.squeeze(hf['pnts'][()]))

            # Each entry of chanlocs/labels is dereferenced through the file
            # and its values are turned into characters via chr().
            names = []
            try:
                label_refs = hf['chanlocs']['labels']
                for row in range(label_refs.shape[0]):
                    codes = hf[label_refs[row, 0]][:]
                    decoded = ''.join(chr(int(code)) for code in codes.flatten())
                    names.append(decoded.strip())
            except Exception:
                # Best effort: fall through to the generic names below.
                pass

            if len(names) != nbchan:
                names = [f'Ch{number}' for number in range(1, nbchan + 1)]

            meta = {
                'File Type': 'eeglab_hdf5',
                'Sampling Rate': str(srate),
                'Number of Channels': str(nbchan),
                'Channel Names': ','.join(names),
                'Data Shape': f'({nbchan}, {pnts})',
                'Duration': str(pnts / srate),
            }
            return meta
    except Exception as e:
        print(f" [eeg_parser_mat] HDF5 .set skip {file_path}: {e}")
        return None
[docs]
def read_eeglab_hdf5(filepath, preload=True, verbose='CRITICAL'):
    """Read a HDF5-format EEGLAB .set file into an MNE RawArray.

    Used by handle_nonstandard_data() when file_type == 'eeglab_hdf5'.

    The sampling rate (``srate``), channel count (``nbchan``) and channel
    labels (``chanlocs/labels``) are read from the file; all channels are
    declared as type ``'eeg'``. When the labels cannot be decoded, or their
    count differs from ``nbchan``, generic names ``Ch1``, ``Ch2``, ... are
    used. Events under ``event`` (fields ``latency`` and ``type``) are turned
    into zero-duration annotations; events that fail to decode are skipped,
    and any failure of the annotation step as a whole is ignored.

    When preload=False a zero-filled array of shape ``(nbchan, 1)`` is used
    in place of the signal (intended for kernels that only need sidecar files
    and raw.annotations).

    Parameters
    ----------
    filepath : str
        Path to the HDF5 .set file.
    preload : bool, optional
        If True, load the full EEG signal from the ``data`` dataset. If
        False, build a stub RawArray from a single zero sample per channel.
    verbose : str, optional
        MNE verbosity level, forwarded to ``mne.create_info`` and
        ``mne.io.RawArray``.

    Returns
    -------
    mne.io.RawArray
        MNE Raw object with channel info and (when available) annotations.

    Raises
    ------
    ImportError
        If ``h5py`` or ``mne`` is not installed (both are imported here
        without a guard).

    Examples
    --------
    >>> raw = read_eeglab_hdf5("sample.set", preload=False)  # doctest: +SKIP
    """
    import h5py
    import mne
    with h5py.File(filepath, 'r') as hf:
        srate = float(np.squeeze(hf['srate'][()]))
        nbchan = int(np.squeeze(hf['nbchan'][()]))
        # pnts is read here but not used again in this function.
        pnts = int(np.squeeze(hf['pnts'][()]))
        # Channel labels: stored as (nchan, 1) object-reference array
        labels = []
        try:
            lab = hf['chanlocs']['labels']  # shape (nchan, 1)
            for i in range(lab.shape[0]):
                ref = lab[i, 0]
                # Dereference the entry and decode its values as character codes.
                chars = hf[ref][:]
                label = ''.join(chr(int(c)) for c in chars.flatten())
                labels.append(label.strip())
        except Exception:
            # Best effort: a partial or empty list triggers the fallback below.
            pass
        if len(labels) != nbchan:
            labels = [f'Ch{i + 1}' for i in range(nbchan)]
        info = mne.create_info(ch_names=labels, sfreq=srate,
                               ch_types='eeg', verbose=verbose)
        if preload:
            # Data shape in HDF5 is (pnts, nchan), transpose to (nchan, pnts).
            data = np.array(hf['data'], dtype=np.float64).T
        else:
            # Stub signal: one zero sample per channel.
            data = np.zeros((nbchan, 1), dtype=np.float64)
        raw = mne.io.RawArray(data, info, verbose=verbose)
        # Extract event annotations from the EEGLAB event structure
        # NOTE(review): with preload=False the Raw holds a single sample;
        # MNE may drop annotations whose onset lies outside the data range
        # when set_annotations() is called — confirm the stub keeps them.
        try:
            onsets, durations, descriptions = [], [], []
            evt = hf.get('event')
            if evt is not None and 'latency' in evt and 'type' in evt:
                lats = evt['latency']
                typs = evt['type']
                for i in range(len(lats)):
                    try:
                        # Object-dtype entries are dereferenced through the
                        # file; other dtypes are used directly.
                        if lats.dtype.kind == 'O':
                            lat_val = float(np.squeeze(hf[lats[i]][()]))
                        else:
                            lat_val = float(lats[i])
                        # Latency is treated as a sample index and converted
                        # to seconds.
                        # NOTE(review): EEGLAB latencies are presumably
                        # 1-based; no offset is applied here — confirm.
                        onset = lat_val / srate
                        if typs.dtype.kind == 'O':
                            typ_chars = hf[typs[i]][:]
                            desc = ''.join(chr(int(c)) for c in typ_chars.flatten())
                        else:
                            desc = str(typs[i])
                        onsets.append(onset)
                        durations.append(0.0)
                        descriptions.append(desc.strip())
                    except Exception:
                        # Skip events that cannot be decoded.
                        continue
            if onsets:
                raw.set_annotations(
                    mne.Annotations(onsets, durations, descriptions))
        except Exception:
            # Annotations are optional; the Raw object is returned without them.
            pass
    return raw
[docs]
def process_mat_files(files_locator, num_workers=0):
    """
    Fill in locator metadata for MAT files that are still unclassified.

    A row is processed when its ``'File Path'`` ends with ``.mat`` and its
    ``'File Type'`` equals ``'unknown'``. Each such file is passed to
    ``_process_single_mat_file`` and every key of the returned dictionary is
    written into the row as a string.

    Parameters
    ----------
    files_locator : pandas.DataFrame
        Locator DataFrame with at least the columns ``'File Path'`` and
        ``'File Type'``.
    num_workers : int, optional
        Number of worker threads for parallel processing (default is 0, sequential).

    Returns
    -------
    pandas.DataFrame
        The same DataFrame object, updated in place. Rows whose file yields
        no metadata are left unchanged.

    Raises
    ------
    Exception
        Errors raised by the per-file worker are not caught here and
        propagate to the caller.

    Examples
    --------
    >>> process_mat_files(locator_df, num_workers=0)  # doctest: +SKIP
    """
    # Gather (row label, path) pairs for the rows that qualify.
    pending = []
    for row_label, record in files_locator.iterrows():
        location = record['File Path']
        kind = record['File Type']
        if not location.endswith('.mat'):
            continue
        if kind == 'unknown':
            pending.append((row_label, location))

    if not pending:
        return files_locator

    row_labels = [label for label, _ in pending]
    mat_paths = [location for _, location in pending]

    if num_workers > 0:
        with ThreadPoolExecutor(max_workers=num_workers) as pool:
            outcomes = list(pool.map(_process_single_mat_file, mat_paths))
    else:
        outcomes = list(map(_process_single_mat_file, mat_paths))

    for row_label, meta in zip(row_labels, outcomes):
        if meta is None:
            continue
        for column, value in meta.items():
            files_locator.at[row_label, column] = '' if value is None else str(value)
    return files_locator
def _find_variables_by_condition(data, condition_func, max_depth=5, max_width=5, debug=False):
    """
    Return the first variable in a nested structure that meets a condition.

    The structure is walked with ``_search_data`` starting from an empty
    path; the first match it records is returned.

    Parameters
    ----------
    data : dict
        The data structure to search through, typically loaded from a .mat file.
    condition_func : function
        Callable taking a variable's path and value and returning a boolean.
    max_depth : int, optional
        The maximum depth to search within the nested structure. Defaults to 5.
    max_width : int, optional
        The maximum number of items to check at each level. Defaults to 5.
    debug : bool, optional
        If True, the search prints diagnostic messages. Defaults to False.

    Returns
    -------
    tuple
        ``(path, value)`` of the first recorded match, or ``("unknown", '')``
        when nothing satisfies the condition.
    """
    matches = []
    _search_data(data, '', condition_func, matches, 0, max_depth, max_width, debug=debug)
    return matches[0] if matches else ("unknown", '')
def _condition_source_data(var_path, var_value):
    """
    Accept large two-dimensional arrays.

    Parameters
    ----------
    var_path : str
        The path of the variable within the data structure (unused).
    var_value : any
        The value of the variable.

    Returns
    -------
    bool
        True if `var_value` is a 2D ndarray occupying more than 5 MiB,
        otherwise False.
    """
    if not isinstance(var_value, ndarray):
        return False
    size_limit = 5 * 1024 * 1024
    return var_value.ndim == 2 and var_value.nbytes > size_limit
def _condition_source_data_3d(var_path, var_value):
    """
    Accept large three-dimensional arrays.

    Parameters
    ----------
    var_path : str
        The path of the variable within the data structure (unused).
    var_value : any
        The value of the variable.

    Returns
    -------
    bool
        True if `var_value` is a 3D ndarray occupying more than 5 MiB,
        otherwise False.
    """
    if not isinstance(var_value, ndarray):
        return False
    size_limit = 5 * 1024 * 1024
    return var_value.ndim == 3 and var_value.nbytes > size_limit
def _condition_sampling_rate(var_path, var_value):
    """
    Accept variables whose path looks like a sampling-rate field.

    The comparison is case-insensitive and purely textual: the lower-cased
    path only has to contain one of the fragments somewhere.

    Parameters
    ----------
    var_path : str
        The path of the variable.
    var_value : any
        The value of the variable (unused in this condition).

    Returns
    -------
    bool
        True if ``'fs'``, ``'fre'``, or ``'rate'`` is in the variable path.
    """
    lowered = var_path.lower()
    return any(fragment in lowered for fragment in ('fs', 'fre', 'rate'))
def _condition_sampling_channel_name(var_path, var_value):
    """
    Accept arrays whose path looks like a channel-name field.

    The lower-cased path must contain ``'chan'``, ``'chname'`` or ``'clab'``,
    and the value must be an ndarray with more than two entries along its
    first axis. Zero-dimensional arrays (for example a scalar channel count
    stored under a ``...chan...`` name) are rejected instead of raising.

    Parameters
    ----------
    var_path : str
        The path of the variable.
    var_value : any
        The value of the variable.

    Returns
    -------
    bool
        True if the path matches and the value is an ndarray whose first
        dimension is larger than 2, False otherwise.
    """
    lowered = var_path.lower()
    if not any(tag in lowered for tag in ('chan', 'chname', 'clab')):
        return False
    if not isinstance(var_value, ndarray):
        return False
    # A 0-d array has an empty shape tuple, so shape[0] must not be touched.
    return var_value.ndim >= 1 and var_value.shape[0] > 2
def _search_data(data, path, condition_func, satisfying_variables, current_depth=0, max_depth=5, max_width=5,
                 ignore_keys=None, debug=False):
    """
    Recursively collect variables in a nested structure that satisfy a condition.

    Dicts, lists, structured ndarrays and regular ndarrays are descended
    into; at every level only the first ``max_width`` entries are visited.
    Children are visited before the container itself is tested, so matches
    are recorded in post-order. Nodes at ``max_depth`` or deeper are neither
    descended into nor tested.

    Parameters
    ----------
    data : any
        The node to examine (dict, list, ndarray or a leaf value).
    path : str
        Path of ``data`` within the root structure; ``''`` for the root.
    condition_func : callable
        Called as ``condition_func(path, data)``; a truthy result records
        the node.
    satisfying_variables : list
        Output list; ``(path, data)`` tuples are appended to it.
    current_depth : int, optional
        The current depth of recursion, default is 0.
    max_depth : int, optional
        The maximum depth to search, default is 5.
    max_width : int, optional
        The maximum number of items to process at each level, default is 5.
    ignore_keys : list, optional
        Dict keys to skip. Defaults to ``__header__``, ``__version__`` and
        ``__globals__``.
    debug : bool, optional
        If True, prints debugging information during the search.

    Returns
    -------
    None
        The function modifies the satisfying_variables list in place.
    """
    if ignore_keys is None:
        ignore_keys = ['__header__', '__version__', '__globals__']
    if debug:
        print(f"Searching path: {path}, Current depth: {current_depth}")
    if current_depth >= max_depth:
        if debug:
            print(f"Reached maximum depth, stopping search. Current path: {path}")
        return  # Stop search if maximum depth is reached

    def _descend(child, child_path):
        _search_data(child, child_path, condition_func, satisfying_variables, current_depth + 1,
                     max_depth, max_width, ignore_keys, debug)

    if isinstance(data, dict):
        # The width limit is applied before ignored keys are filtered out.
        for key, value in list(data.items())[:max_width]:
            if key in ignore_keys:
                continue
            _descend(value, f"{path}.{key}" if path else key)
    elif isinstance(data, list):
        for i, item in enumerate(data[:max_width]):
            _descend(item, f"{path}[{i}]" if path else f"[{i}]")
    elif isinstance(data, ndarray):
        if data.dtype.names is not None:  # Structured array
            for name in list(data.dtype.names)[:max_width]:
                field = data[name]
                try:
                    # Unwrap single-element fields so their content is searched directly.
                    nested_value = field[0] if field.size == 1 else field
                except (AttributeError, IndexError):
                    nested_value = field  # e.g. a 0-d field that cannot be indexed
                _descend(nested_value, f"{path}.{name}" if path else name)
        elif data.ndim > 0:  # Regular ndarray
            # 0-d arrays cannot be sliced; they are treated as leaves and only
            # tested against the condition below.
            for i, item in enumerate(data[:max_width]):
                try:
                    item = item.item() if np.isscalar(item) else item
                except AttributeError:
                    pass  # plain Python scalars in object arrays have no .item()
                _descend(item, f"{path}[{i}]" if path else f"[{i}]")

    # The node itself is tested after its children, whatever its type.
    if condition_func(path, data):
        if debug:
            print(f"Found a variable satisfying the condition: {path}")
        satisfying_variables.append((path, data))
    elif not isinstance(data, (ndarray, dict)) and debug:
        print(f"Non-target type (not ndarray or dict), stopping search. Current path: {path}")