# Source code for eegunity.unifieddataset
import copy
from eegunity.modules.batch.eeg_batch import EEGBatch
from eegunity.modules.correction.eeg_correction import EEGCorrection
from eegunity.modules.parser.eeg_parser import EEGParser
from eegunity.modules.llm_booster.eeg_llm_booster import EEGLLMBooster
from eegunity._share_attributes import _UDatasetSharedAttributes
from eegunity.modules.kernel.kernel_loader import load_kernel_object
# [docs]  -- leftover link marker, apparently from a rendered documentation page; not part of the code
class UnifiedDataset(_UDatasetSharedAttributes):
    """
    This is the kernel class to manage multiple EEG datasets and associated processing tools.

    The constructor arguments listed below are not stored as plain instance
    attributes; they are written into the shared-attribute dictionary through
    ``set_shared_attr`` and read back through ``get_shared_attr``.

    Attributes:
    -----------
    dataset_path : str, optional
        Path to the dataset (folder). Should not be provided alongside locator_path.
    locator_path : str, optional
        Path to the locator. Should not be provided alongside dataset_path.
    is_unzip : bool, optional
        If set to True, any Zip files in the specified dataset will be unzipped. Be aware that unzipping may modify the dataset.
    domain_tag : str, optional
        The domain tag identifies the dataset name. When a dataset path is given
        without a tag, the placeholder ``"not_specified"`` is stored instead.
    verbose : str, optional
        Level of verbosity for logging (default is 'CRITICAL').
    num_workers : int, optional
        Number of worker threads for parallel processing (default is 0).
    min_file_size : int, optional
        Minimum file size in bytes for CSV/TXT files scanned during dataset
        parsing (default is ``5 * 1024 * 1024``).
    kernel_spec : str or None
        Spec string of the currently bound kernel, or None when no kernel is bound.
    kernel : object or None
        The currently bound kernel object, or None.
    eeg_parser : EEGParser
        EEGParser module
    eeg_batch : EEGBatch
        EEGBatch module
    eeg_correction : EEGCorrection
        EEGCorrection module
    module_eeg_llm_booster : EEGLLMBooster
        EEGLLMBooster module
    """

    def __init__(self, dataset_path: str = None, locator_path: str = None, domain_tag: str = None,
                 is_unzip: bool = True, verbose: str = 'CRITICAL', num_workers: int = 0,
                 kernel_spec: str = None, min_file_size: int = 5 * 1024 * 1024):
        """
        Initialize the class with either dataset_path or locator_path. Only one of
        these parameters should be provided. If dataset_path is provided without a
        domain_tag, the tag defaults to ``"not_specified"``.

        Parameters:
        -----------
        dataset_path : str, optional
            Path to the dataset (folder). Note: Do not provide dataset_path if you are using locator_path.
        locator_path : str, optional
            The file path to the locator (a CSV-like file) that stores all metadata for the UnifiedDataset in EEGUnity. Note: Do not provide locator_path if you are using dataset_path.
        domain_tag : str, optional
            The domain tag identifies the dataset name. Note: Do not provide domain_tag if you are using locator_path.
        is_unzip : bool, optional
            A flag indicating whether the dataset should be unzipped (default is True).
        verbose : str, optional
            The verbosity level for logging (default is 'CRITICAL').
        num_workers : int, optional
            Number of worker threads for parallel processing (default is 0).
            0 means sequential execution in the main thread.
            >0 uses a ThreadPoolExecutor with the specified number of workers.
        kernel_spec : str, optional
            Specification string for the processing kernel. Defaults to ``None``.
            When given, the kernel is loaded immediately via ``load_kernel``.
        min_file_size : int, optional
            Minimum file size in bytes for CSV/TXT files scanned during dataset
            parsing. Defaults to ``5 * 1024 * 1024``.

        Raises:
        -------
        ValueError
            If num_workers is not a non-negative integer, if both dataset_path and
            locator_path are provided, or if neither is provided. ``load_kernel``
            may also raise ValueError when kernel_spec is given and the loaded
            object has no callable ``apply``.

        Examples
        --------
        >>> unified_dataset = UnifiedDataset("path/to/your/dataset")
        >>> unified_dataset_locator = UnifiedDataset(locator_path="path/to/your/locator.csv")
        >>> unified_dataset_small = UnifiedDataset("path/to/your/dataset", min_file_size=0)
        """
        super().__init__()

        # Validate num_workers
        if not isinstance(num_workers, int) or num_workers < 0:
            raise ValueError("'num_workers' must be a non-negative integer.")

        # Ensure exactly one of dataset_path or locator_path is provided
        if dataset_path and locator_path:
            raise ValueError("Only one of 'dataset_path' or 'locator_path' can be provided, not both.")
        if not dataset_path and not locator_path:
            raise ValueError("One of 'dataset_path' or 'locator_path' must be provided.")

        # A dataset path without a tag is accepted; it gets a placeholder tag
        # rather than raising.
        if dataset_path and not domain_tag:
            domain_tag = "not_specified"

        # Set attributes
        self.set_shared_attr({'dataset_path': dataset_path})
        self.set_shared_attr({'locator_path': locator_path})
        self.set_shared_attr({'is_unzip': is_unzip})
        self.set_shared_attr({'domain_tag': domain_tag})
        self.set_shared_attr({'verbose': verbose})
        self.set_shared_attr({'num_workers': num_workers})
        self.set_shared_attr({'min_file_size': min_file_size})

        # --- anchor: after set_shared_attr(...) in __init__ ---
        # Register both kernel keys first so they exist even when no kernel is
        # requested; load_kernel overwrites them on success.
        self.set_shared_attr({'kernel_spec': kernel_spec})
        self.set_shared_attr({'kernel': None})
        if kernel_spec:
            self.load_kernel(kernel_spec)

        # Initialize associated modules; each one receives this dataset instance.
        self.eeg_parser = EEGParser(self)
        self.eeg_batch = EEGBatch(self)
        self.eeg_correction = EEGCorrection(self)
        self.module_eeg_llm_booster = EEGLLMBooster(self)

    def copy(self):
        """
        Create a deep copy of the UnifiedDataset instance.

        Returns:
        --------
        UnifiedDataset
            A deep copy of the current UnifiedDataset instance.
        """
        return copy.deepcopy(self)

    def save_locator(self, path):
        """
        Save the locator of this UnifiedDataset to a CSV file at the specified path. This file is helpful for checking the current status and metadata after data processing.
        You can also reload the UnifiedDataset later by using this locator file, for example:
        unified_dataset = UnifiedDataset(locator_path="your_locator_path")

        Parameters:
        -----------
        path : str
            The file path where the locator should be saved.
        """
        # The row index is not written (index=False).
        self.get_shared_attr()['locator'].to_csv(path, index=False)

    # --- anchor: UnifiedDataset kernel methods ---
    def load_kernel(self, kernel_spec: str):
        """Load an external kernel and bind it to this dataset.

        Parameters
        ----------
        kernel_spec
            Spec string in the form ``"<path_or_module>:<object_name>"``.
            This can be called at construction time or any time later.

        Returns
        -------
        Any
            The loaded kernel object.

        Raises
        ------
        ValueError
            If the loaded object does not implement ``apply(udataset, raw, row)``.
        """
        # --- anchor: UnifiedDataset.load_kernel new loader contract ---
        kernel, normalized_spec = load_kernel_object(kernel_spec)
        # Validate before binding so a rejected kernel leaves the previously
        # stored kernel/kernel_spec untouched.
        if not callable(getattr(kernel, "apply", None)):
            raise ValueError(
                "Invalid kernel object. The module-level variable `KERNEL` must provide "
                "a callable `apply(udataset, raw, row)` method."
            )
        # Store the spec as normalized by the loader, not the raw argument.
        self.set_shared_attr({'kernel_spec': normalized_spec})
        self.set_shared_attr({'kernel': kernel})
        return kernel

    def clear_kernel(self):
        """Unbind the current kernel from this dataset."""
        self.set_shared_attr({'kernel_spec': None})
        self.set_shared_attr({'kernel': None})

    def get_kernel(self):
        """Return the currently bound kernel object or None."""
        return self.get_shared_attr().get('kernel')

    def get_locator(self):
        """
        Return the locator in DataFrame.

        Returns:
        --------
        pandas.DataFrame
            The locator DataFrame associated with the dataset.
        """
        return self.get_shared_attr()['locator']

    def set_locator(self, new_locator):
        """
        Set a new locator for this UnifiedDataset instance.
        This allows you to update the metadata for the entire dataset without altering the original raw file.

        Parameters:
        -----------
        new_locator : pandas.DataFrame
            The new locator DataFrame to associate with the dataset.
        """
        self.get_shared_attr()['locator'] = new_locator

    def group_by_domain(self):
        """
        Groups the locator data by its 'Domain Tag' column and returns multiple UnifiedDataset instances.

        Each returned instance is a deep copy of this dataset whose locator has
        been replaced by the rows of one group.

        Returns:
        --------
        List[UnifiedDataset] : A list of UnifiedDataset instances, each grouped by domain.

        Raises:
        -------
        ValueError
            If the locator has no 'Domain Tag' column.
        """
        locator = self.get_shared_attr()['locator']
        if 'Domain Tag' not in locator.columns:
            raise ValueError("Locator dataframe must have a 'Domain Tag' column.")

        datasets = []
        # Only the per-group rows are needed; the group key itself is unused.
        for _, grouped_locator in locator.groupby('Domain Tag'):
            # Copy first, then narrow the copy's locator to this group.
            new_dataset = self.copy()
            new_dataset.set_locator(grouped_locator)
            datasets.append(new_dataset)
        return datasets