Source code for eegunity.modules.parser.eeg_parser_wfdb

import os
from concurrent.futures import ThreadPoolExecutor
import pandas as pd


def _process_single_wfdb_file(file_path):
    """
    Process a single WFDB header file and return metadata dict or None.

    Parameters
    ----------
    file_path : str
        Path to the WFDB header file (.hea).

    Returns
    -------
    dict or None
        A dictionary containing extracted metadata, or None if the file cannot
        be processed or has no companion .dat data file.

    Examples
    --------
    >>> _process_single_wfdb_file("record.hea")  # doctest: +SKIP
    """
    import wfdb

    dat_path = os.path.splitext(file_path)[0] + '.dat'
    if not os.path.isfile(dat_path):
        return None

    try:
        record_name = os.path.splitext(file_path)[0]
        record = wfdb.rdheader(record_name)

        channel_names = list(record.sig_name)
        sampling_rate = record.fs
        n_samples = record.sig_len
        n_channels = record.n_sig

        # Deduplicate channel names: if duplicates exist, append _{n} suffix
        # (e.g. 256-channel SSVEP recordings that use generic 'EEG' for most
        # channels). Without dedup the completeness check always fails.
        seen = {}
        for i, name in enumerate(channel_names):
            if name in seen:
                seen[name] += 1
                channel_names[i] = f"{name}_{seen[name]}"
            else:
                seen[name] = 0

        result = {
            'File Type': 'wfdbData',
            'Sampling Rate': sampling_rate,
            'Channel Names': ','.join(channel_names),
            'Number of Channels': n_channels,
            'Data Shape': f'({n_channels}, {n_samples})',
            'Duration': n_samples / sampling_rate if sampling_rate else '',
        }
        return result
    except Exception as e:
        print(f"Error processing WFDB file {file_path}: {e}")
        return None


[docs] def process_wfdb_files(files_locator, num_workers=0): """ Process WFDB header files and update a DataFrame with file details. Parameters ---------- files_locator : pandas.DataFrame A DataFrame containing the metadata of files, including their file paths and other details. The column 'File Path' is expected to contain paths to the files. Only rows with 'File Type' equal to 'unknown' are processed. num_workers : int, optional Number of worker threads for parallel processing (default is 0, sequential). Returns ------- pandas.DataFrame Updated DataFrame with additional columns 'File Type', 'Sampling Rate', 'Channel Names', 'Number of Channels', 'Data Shape', and 'Duration' for each eligible WFDB file. Files without a companion .dat file or that cannot be parsed are left unchanged. Examples -------- >>> process_wfdb_files(locator_df, num_workers=2) # doctest: +SKIP """ eligible = [] for index, row in files_locator.iterrows(): file_path = row['File Path'] file_type = row['File Type'] if file_path.endswith('.hea') and file_type == 'unknown': eligible.append((index, file_path)) if not eligible: return files_locator indices, file_paths = zip(*eligible) if num_workers > 0: with ThreadPoolExecutor(max_workers=num_workers) as executor: results = list(executor.map(_process_single_wfdb_file, file_paths)) else: results = [_process_single_wfdb_file(fp) for fp in file_paths] for idx, result in zip(indices, results): if result is not None: for key, value in result.items(): files_locator.at[idx, key] = pd.NA if pd.isna(value) else str(value) return files_locator