import os
import re
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from numpy import ndarray
from scipy.io import loadmat
def _is_numeric(s):
"""Match integer or floating-point numbers."""
pattern = r'^-?\d+(\.\d+)?$'
return bool(re.match(pattern, s))
def _process_single_mat_file(file_path):
"""
Process a single MAT file and return metadata dict or None.
Parameters
----------
file_path : str
Path to the MAT file.
Returns
-------
dict or None
A dictionary containing extracted metadata, or None if the file cannot be processed.
"""
file_size = os.path.getsize(file_path)
if file_size <= 5 * 1024 * 1024:
return None
data = loadmat(file_path, simplify_cells=True)
channel_name = _find_variables_by_condition(data, _condition_sampling_channel_name,
max_depth=5, max_width=20)
sampling_rate = _find_variables_by_condition(data, _condition_sampling_rate,
max_depth=5, max_width=20)
source_data = _find_variables_by_condition(data, _condition_source_data,
max_depth=5, max_width=20)
source_data_3d = _find_variables_by_condition(data, _condition_source_data_3d,
max_depth=5, max_width=20)
result = {}
if isinstance(source_data[1], ndarray):
result['Sampling Rate'] = str(sampling_rate[1]).strip("HhZz")
if isinstance(channel_name[1], ndarray):
result['Channel Names'] = ','.join(str(x) for x in channel_name[1])
result['Number of Channels'] = str(min(source_data[1].shape))
result['Data Shape'] = str(source_data[1].shape)
if _is_numeric(result['Sampling Rate']):
result['Duration'] = str(max(source_data[1].shape) / float(result['Sampling Rate']))
else:
result['Duration'] = ''
result['File Type'] = "matRawData:" + str(source_data[0])
return result
elif isinstance(source_data_3d[1], ndarray):
result['Sampling Rate'] = str(sampling_rate[1]).strip("HhZz")
if isinstance(channel_name[1], ndarray):
print(','.join(str(x) for x in channel_name[1]))
result['Channel Names'] = ','.join(str(x) for x in channel_name[1])
result['Number of Channels'] = str(len(channel_name[1]))
result['Data Shape'] = str(source_data_3d[1].shape)
if _is_numeric(result['Sampling Rate']):
result['Duration'] = str(max(source_data_3d[1].shape) / float(result['Sampling Rate']))
else:
result['Duration'] = ''
result['File Type'] = "matEpochData:" + str(source_data_3d[0])
return result
else:
return None
[docs]
def process_mat_files(files_locator, num_workers=0):
"""
Process MAT files and update a DataFrame with file details.
Parameters
----------
files_locator : pandas.DataFrame
A DataFrame containing the metadata of files, including their file paths and other details.
The column 'File Path' is expected to contain paths to the MAT files.
num_workers : int, optional
Number of worker threads for parallel processing (default is 0, sequential).
Returns
-------
pandas.DataFrame
Updated DataFrame with additional columns 'File Type', 'Sampling Rate', 'Channel Names', 'Number of Channels', and 'Duration' for each file.
If a file cannot be processed, appropriate messages are printed.
Raises
------
FileNotFoundError
If the MAT file cannot be located.
Exception
General exception for unexpected errors during file processing.
"""
# Collect indices of eligible files
eligible = []
for index, row in files_locator.iterrows():
file_path = row['File Path']
file_type = row['File Type']
if file_path.endswith('.mat') and file_type == 'unknown':
eligible.append((index, file_path))
if not eligible:
return files_locator
indices, file_paths = zip(*eligible)
if num_workers > 0:
with ThreadPoolExecutor(max_workers=num_workers) as executor:
results = list(executor.map(_process_single_mat_file, file_paths))
else:
results = [_process_single_mat_file(fp) for fp in file_paths]
for idx, result in zip(indices, results):
if result is not None:
for key, value in result.items():
files_locator.at[idx, key] = value
return files_locator
def _find_variables_by_condition(data, condition_func, max_depth=5, max_width=5, debug=False):
"""
Search for variables in a nested data structure that satisfy a given condition.
Parameters
----------
data : dict
The data structure to search through, typically loaded from a .mat file.
condition_func : function
A function that takes in a variable's path and value, and returns a boolean indicating whether the variable meets the specified condition.
max_depth : int, optional
The maximum depth to search within the nested structure. Defaults to 5.
max_width : int, optional
The maximum number of items to check at each depth level. Defaults to 5.
debug : bool, optional
If True, enables additional logging for debugging purposes. Defaults to False.
Returns
-------
tuple
A tuple containing the first variable's name and its value that satisfies the condition.
If no variable satisfies the condition, returns ("unknown", '').
"""
satisfying_variables = []
_search_data(data, '', condition_func, satisfying_variables, 0, max_depth, max_width, debug=debug)
if len(satisfying_variables) == 0:
return "unknown", ''
else:
return satisfying_variables[0]
def _condition_source_data(var_path, var_value):
"""
Condition function to check if a variable is a 2D ndarray of significant size.
Parameters
----------
var_path : str
The path of the variable within the data structure.
var_value : any
The value of the variable, typically an array or other data structure.
Returns
-------
bool
True if `var_value` is a 2D ndarray larger than 5MB, otherwise False.
"""
if isinstance(var_value, ndarray) and var_value.ndim == 2 and var_value.nbytes > 5 * 1024 * 1024:
return True
return False
def _condition_source_data_3d(var_path, var_value):
"""
Condition function to check if a variable is a 3D ndarray of significant size.
Parameters
----------
var_path : str
The path of the variable within the data structure.
var_value : any
The value of the variable, typically an array or other data structure.
Returns
-------
bool
True if `var_value` is a 3D ndarray larger than 5MB, otherwise False.
"""
if isinstance(var_value, ndarray) and var_value.ndim == 3 and var_value.nbytes > 5 * 1024 * 1024:
return True
return False
def _condition_sampling_rate(var_path, var_value):
"""
Condition function that checks if the variable path contains 'fs' or 'fre'.
Parameters
----------
var_path : str
The path of the variable.
var_value : any
The value of the variable (unused in this condition).
Returns
-------
bool
True if 'fs' or 'fre' is in the variable path, False otherwise.
"""
var_path = var_path.lower()
return 'fs' in var_path or 'fre' in var_path
def _condition_sampling_channel_name(var_path, var_value):
"""
Condition function that checks if the variable path contains 'chan'.
Parameters
----------
var_path : str
The path of the variable.
var_value: any
The value of the variable (unused in this condition).
Returns
-------
bool
True if 'chan' is in the variable path, False otherwise.
"""
var_path = var_path.lower()
return ('chan' in var_path or 'chname' in var_path or 'clab' in var_path) and (
isinstance(var_value, ndarray)) and var_value.shape[0] > 2
def _search_data(data, path, condition_func, satisfying_variables, current_depth=0, max_depth=5, max_width=5,
ignore_keys=None, debug=False):
"""
Recursively search for variables in nested data structures that satisfy a given condition.
Parameters
----------
data : dict or ndarray
The data structure to search through, which can be a dictionary or a NumPy ndarray.
path : str
The current path in the data structure, used for tracking the location of found variables.
condition_func : callable
A function that takes a path and data item and returns True if the item satisfies the search condition.
satisfying_variables : list
A list that will be populated with tuples of paths and their corresponding data that satisfy the condition.
current_depth : int, optional
The current depth of recursion, default is 0.
max_depth : int, optional
The maximum depth to search, default is 5.
max_width : int, optional
The maximum number of items to process at each level, default is 5.
ignore_keys : list, optional
A list of keys to ignore during the search, default excludes certain internal keys.
debug : bool, optional
If True, prints debugging information during the search.
Returns
-------
None
The function modifies the satisfying_variables list in place.
"""
if ignore_keys is None:
ignore_keys = ['__header__', '__version__', '__globals__']
if debug:
print(f"Searching path: {path}, Current depth: {current_depth}")
if current_depth >= max_depth:
if debug:
print(f"Reached maximum depth, stopping search. Current path: {path}")
return # Stop search if maximum depth is reached
if isinstance(data, dict):
for key, value in list(data.items())[:max_width]:
if key in ignore_keys:
continue
new_path = f"{path}.{key}" if path else key
_search_data(value, new_path, condition_func, satisfying_variables, current_depth + 1, max_depth,
max_width, ignore_keys, debug)
elif isinstance(data, ndarray):
if data.dtype.names is not None: # Structured array
for name in list(data.dtype.names)[:max_width]:
try:
nested_value = data[name][0] if data[name].size == 1 else data[name]
except (AttributeError, IndexError):
nested_value = data[name] # Handle potential access to non-ndarray types
new_path = f"{path}.{name}" if path else name
_search_data(nested_value, new_path, condition_func, satisfying_variables, current_depth + 1,
max_depth, max_width, ignore_keys, debug)
else: # Regular ndarray
if current_depth < max_depth:
for i, item in enumerate(data[:max_width]):
try:
item = item.item() if np.isscalar(item) else item
except AttributeError:
pass # item may not be an ndarray
new_path = f"{path}[{i}]" if path else f"[{i}]"
_search_data(item, new_path, condition_func, satisfying_variables, current_depth + 1,
max_depth, max_width, ignore_keys, debug)
if condition_func(path, data):
if debug:
print(f"Found a variable satisfying the condition: {path}") # Print the path of the satisfying variable
satisfying_variables.append((path, data))
elif not isinstance(data, (ndarray, dict)) and debug:
print(f"Non-target type (not ndarray or dict), stopping search. Current path: {path}")