# Source code for eegunity.unifieddataset

import copy
from eegunity.modules.batch.eeg_batch import EEGBatch
from eegunity.modules.correction.eeg_correction import EEGCorrection
from eegunity.modules.parser.eeg_parser import EEGParser
from eegunity.modules.llm_booster.eeg_llm_booster import EEGLLMBooster
from eegunity._share_attributes import _UDatasetSharedAttributes
from eegunity.modules.kernel.kernel_loader import load_kernel_object


class UnifiedDataset(_UDatasetSharedAttributes):
    """
    This is the kernel class to manage multiple EEG datasets and associated
    processing tools.

    The constructor arguments are stored as shared attributes (via
    ``set_shared_attr``) so that the associated modules can read them.

    Attributes:
    -----------
    dataset_path : str, optional
        Path to the dataset (folder). Should not be provided alongside
        locator_path.
    locator_path : str, optional
        Path to the locator. Should not be provided alongside dataset_path.
    is_unzip : bool, optional
        If set to True, any Zip files in the specified dataset will be
        unzipped. Be aware that unzipping may modify the dataset.
    domain_tag : str, optional
        The domain tag identifies the dataset name. When a dataset path is
        given without a domain tag, ``"not_specified"`` is used.
    verbose : str, optional
        Level of verbosity for logging (default is 'CRITICAL').
    num_workers : int, optional
        Number of worker threads for parallel processing (default is 0).
    kernel_spec : str, optional
        Specification string of the bound processing kernel, if any.
    min_file_size : int, optional
        Minimum file size in bytes for CSV/TXT files scanned during dataset
        parsing.
    eeg_parser : EEGParser
        EEGParser module
    eeg_batch : EEGBatch
        EEGBatch module
    eeg_correction : EEGCorrection
        EEGCorrection module
    module_eeg_llm_booster : EEGLLMBooster
        EEGLLMBooster module
    llm_booster : EEGLLMBooster
        Read-only alias of ``module_eeg_llm_booster``.
    """

    def __init__(self, dataset_path: str = None, locator_path: str = None,
                 domain_tag: str = None, is_unzip: bool = True,
                 verbose: str = 'CRITICAL', num_workers: int = 0,
                 kernel_spec: str = None,
                 min_file_size: int = 5 * 1024 * 1024):
        """
        Initialize the class with either dataset_path or locator_path.

        Exactly one of these two parameters must be provided.

        Parameters:
        -----------
        dataset_path : str, optional
            Path to the dataset (folder).
            Note: Do not provide dataset_path if you are using locator_path.
        locator_path : str, optional
            The file path to the locator (a CSV-like file) that stores all
            metadata for the UnifiedDataset in EEGUnity.
            Note: Do not provide locator_path if you are using dataset_path.
        domain_tag : str, optional
            The domain tag identifies the dataset name. If dataset_path is
            given and domain_tag is empty, ``"not_specified"`` is used.
            Note: Do not provide domain_tag if you are using locator_path.
        is_unzip : bool, optional
            A flag indicating whether the dataset should be unzipped
            (default is True).
        verbose : str, optional
            The verbosity level for logging (default is 'CRITICAL').
        num_workers : int, optional
            Number of worker threads for parallel processing (default is 0).
            0 means sequential execution in the main thread.
            >0 uses a ThreadPoolExecutor with the specified number of workers.
        kernel_spec : str, optional
            Specification string for the processing kernel. Defaults to
            ``None``. When given, it is passed to :meth:`load_kernel`.
        min_file_size : int, optional
            Minimum file size in bytes for CSV/TXT files scanned during
            dataset parsing. Defaults to ``5 * 1024 * 1024``.

        Raises:
        -------
        ValueError
            If num_workers is not a non-negative integer, if both
            dataset_path and locator_path are provided, or if neither is
            provided. Also raised by :meth:`load_kernel` when kernel_spec
            resolves to an object without a callable ``apply``.

        Examples
        --------
        >>> unified_dataset = UnifiedDataset("path/to/your/dataset")
        >>> unified_dataset_locator = UnifiedDataset(locator_path="path/to/your/locator.csv")
        >>> unified_dataset_small = UnifiedDataset("path/to/your/dataset", min_file_size=0)
        """
        super().__init__()

        # Validate num_workers
        if not isinstance(num_workers, int) or num_workers < 0:
            raise ValueError("'num_workers' must be a non-negative integer.")

        # Ensure only one of dataset_path or locator_path is provided
        if dataset_path and locator_path:
            raise ValueError("Only one of 'dataset_path' or 'locator_path' can be provided, not both.")
        if not dataset_path and not locator_path:
            raise ValueError("One of 'dataset_path' or 'locator_path' must be provided.")

        # A dataset path without a domain tag falls back to a placeholder tag
        # instead of raising, so the tag is effectively optional.
        if dataset_path and not domain_tag:
            domain_tag = "not_specified"

        # Set attributes
        self.set_shared_attr({'dataset_path': dataset_path})
        self.set_shared_attr({'locator_path': locator_path})
        self.set_shared_attr({'is_unzip': is_unzip})
        self.set_shared_attr({'domain_tag': domain_tag})
        self.set_shared_attr({'verbose': verbose})
        self.set_shared_attr({'num_workers': num_workers})
        self.set_shared_attr({'min_file_size': min_file_size})

        # --- anchor: after set_shared_attr(...) in __init__ ---
        # Register the kernel slots first so they exist even when no kernel
        # is requested; load_kernel overwrites them with the resolved values.
        self.set_shared_attr({'kernel_spec': kernel_spec})
        self.set_shared_attr({'kernel': None})
        if kernel_spec:
            self.load_kernel(kernel_spec)

        # Initialize associated modules
        self.eeg_parser = EEGParser(self)
        self.eeg_batch = EEGBatch(self)
        self.eeg_correction = EEGCorrection(self)
        self.module_eeg_llm_booster = EEGLLMBooster(self)

    @property
    def llm_booster(self):
        """
        Return the EEGLLMBooster module.

        Read-only alias of ``module_eeg_llm_booster`` so that the attribute
        name documented on the class resolves.

        Returns:
        --------
        EEGLLMBooster
            The booster module created in ``__init__``.
        """
        return self.module_eeg_llm_booster

    def copy(self):
        """
        Create a deep copy of the UnifiedDataset instance.

        Returns:
        --------
        UnifiedDataset
            A deep copy of the current UnifiedDataset instance.
        """
        return copy.deepcopy(self)

    def save_locator(self, path):
        """
        Save the locator of this UnifiedDataset to a CSV file at the
        specified path.

        This file is helpful for checking the current status and metadata
        after data processing. You can also reload the UnifiedDataset later
        by using this locator file, for example:
        unified_dataset = UnifiedDataset(locator_path="your_locator_path")

        Parameters:
        -----------
        path : str
            The file path where the locator should be saved.
        """
        # The index is not written to the CSV.
        self.get_shared_attr()['locator'].to_csv(path, index=False)

    # --- anchor: UnifiedDataset kernel methods ---
    def load_kernel(self, kernel_spec: str):
        """
        Load an external kernel and bind it to this dataset.

        This can be called at construction time or any time later.

        Parameters:
        -----------
        kernel_spec : str
            Spec string in the form ``"<path_or_module>:<object_name>"``.

        Returns:
        --------
        Any
            The loaded kernel object.

        Raises:
        -------
        ValueError
            If the loaded object does not implement
            ``apply(udataset, raw, row)``.
        """
        # --- anchor: UnifiedDataset.load_kernel new loader contract ---
        kernel, normalized_spec = load_kernel_object(kernel_spec)

        # Validate before binding so a rejected kernel leaves the previously
        # bound kernel (if any) untouched.
        if not callable(getattr(kernel, "apply", None)):
            raise ValueError(
                "Invalid kernel object. The module-level variable `KERNEL` must provide "
                "a callable `apply(udataset, raw, row)` method."
            )

        self.set_shared_attr({'kernel_spec': normalized_spec})
        self.set_shared_attr({'kernel': kernel})
        return kernel

    def clear_kernel(self):
        """Unbind the current kernel from this dataset."""
        self.set_shared_attr({'kernel_spec': None})
        self.set_shared_attr({'kernel': None})

    def get_kernel(self):
        """Return the currently bound kernel object or None."""
        return self.get_shared_attr().get('kernel')

    def get_locator(self):
        """
        Return the locator in DataFrame.

        Returns:
        --------
        pandas.DataFrame
            The locator DataFrame associated with the dataset.
        """
        return self.get_shared_attr()['locator']

    def set_locator(self, new_locator):
        """
        Set a new locator for this UnifiedDataset instance.

        This allows you to update the metadata for the entire dataset
        without altering the original raw file.

        Parameters:
        -----------
        new_locator : pandas.DataFrame
            The new locator DataFrame to associate with the dataset.
        """
        self.get_shared_attr()['locator'] = new_locator

    def group_by_domain(self):
        """
        Group the locator by its 'Domain Tag' column and return one
        UnifiedDataset instance per group.

        Each returned instance is a deep copy of this dataset whose locator
        has been replaced by the rows of a single domain.

        Returns:
        --------
        List[UnifiedDataset]
            A list of UnifiedDataset instances, each grouped by domain.

        Raises:
        -------
        ValueError
            If the locator has no 'Domain Tag' column.
        """
        locator = self.get_shared_attr()['locator']

        if 'Domain Tag' not in locator.columns:
            raise ValueError("Locator dataframe must have a 'Domain Tag' column.")

        datasets = []
        # Only the per-group rows are needed; the group key itself is unused.
        for _, grouped_locator in locator.groupby('Domain Tag'):
            new_dataset = self.copy()
            new_dataset.set_locator(grouped_locator)
            datasets.append(new_dataset)

        return datasets