arduino_data
This module defines the ArduinoData class, which stores Arduino controller data that persists across sessions, primarily valve open durations and experimental schedule information.
It handles loading and saving valve timing profiles (including archiving) to a
TOML configuration file at the current user's Documents/Photologic-Experiment-Rig-Files
directory.
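The archiving behaviour can be pictured as a three-slot queue in which slot 1 always holds the newest profile. The following is a minimal sketch of that idea, assuming an in-memory dict that uses the same `archive_1` to `archive_3` keys and `filled` flag as the module source. The helper names `rotate_archives` and `empty_slot` are hypothetical, the shape of an unused slot is an assumption, plain lists stand in for NumPy arrays, and no file is read or written. Unlike `save_durations`, the sketch always shifts every slot, which gives the same result as long as slots fill up in order.

```python
import copy
import datetime


def rotate_archives(profiles: dict, side_one: list[int], side_two: list[int]) -> dict:
    """Shift archives down one slot and store the new profile in slot 1.

    Hypothetical helper: mirrors the 1->2, 2->3 shifting described for
    save_durations, but works on an in-memory dict only.
    """
    updated = copy.deepcopy(profiles)

    # Shift oldest-first so nothing is overwritten before it has been copied.
    # Whatever was stored in archive_3 is dropped.
    updated["archive_3"] = copy.deepcopy(updated["archive_2"])
    updated["archive_2"] = copy.deepcopy(updated["archive_1"])

    updated["archive_1"] = {
        "filled": True,
        "date_used": datetime.datetime.now(),
        "side_one_durations": list(side_one),
        "side_two_durations": list(side_two),
    }
    return updated


def empty_slot() -> dict:
    # Assumed shape of an unused archive slot.
    return {"filled": False, "side_one_durations": [], "side_two_durations": []}


profiles = {f"archive_{i}": empty_slot() for i in range(1, 4)}
for n in range(1, 5):
    profiles = rotate_archives(profiles, [n, n], [n * 10, n * 10])

# After four saves the newest profile sits in slot 1 and the first save is gone.
print([profiles[f"archive_{i}"]["side_one_durations"] for i in range(1, 4)])
```

Returning a modified copy rather than mutating the input keeps the sketch easy to test; the module itself edits the loaded TOML dict in place and writes it back to disk.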
It also processes incoming data strings from the Arduino
during an experiment, parsing lick events and motor movements, and recording
them into the main experiment data structure (ExperimentProcessData). It relies on
system_config to locate configuration files and interacts with an ExperimentProcessData
instance passed during initialization.
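To make the message handling concrete, here is a small sketch of how the pipe-delimited strings might be split into typed fields. It assumes the layouts given in the comments of the module source: `MOTOR|DOWN|2338|7340|7340` for motor events, and `side|lick_duration|valve_duration|stamp_rel_to_start|stamp_rel_to_trial` for licks in the `SAMPLE` state, with the valve duration field absent in `TTC`. The `ParsedEvent` dataclass and `parse_message` function are hypothetical names that are not part of the module, and plain `int` stands in for the NumPy integer types.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ParsedEvent:
    """Hypothetical container for one parsed Arduino message."""

    label: str
    side: Optional[int]
    duration_ms: int
    time_rel_to_start_s: float
    time_rel_to_trial_s: float
    valve_duration_us: Optional[int] = None


def parse_message(data: str, state: str) -> Optional[ParsedEvent]:
    """Split a '|'-separated message into typed fields.

    Returns None when the message matches none of the assumed layouts.
    Non-numeric fields raise ValueError.
    """
    fields = data.strip().split("|")

    # MOTOR|MOVEMENT|DURATION|END_TIME_REL_TO_START|END_TIME_REL_TO_TRIAL
    if fields[0] == "MOTOR" and len(fields) >= 5:
        return ParsedEvent(
            label=f"MOTOR {fields[1]}",
            side=None,
            duration_ms=int(fields[2]),
            time_rel_to_start_s=int(fields[3]) / 1000,
            time_rel_to_trial_s=int(fields[4]) / 1000,
        )

    if fields[0] in ("0", "1"):
        side = int(fields[0]) + 1  # report sides as 1 and 2

        # TTC: side|lick_duration|stamp_rel_to_start|stamp_rel_to_trial
        if state == "TTC" and len(fields) >= 4:
            return ParsedEvent(
                label=state,
                side=side,
                duration_ms=int(fields[1]),
                time_rel_to_start_s=int(fields[2]) / 1000,
                time_rel_to_trial_s=int(fields[3]) / 1000,
            )

        # SAMPLE: side|lick_duration|valve_duration|stamp_rel_to_start|stamp_rel_to_trial
        if state == "SAMPLE" and len(fields) >= 5:
            return ParsedEvent(
                label=state,
                side=side,
                duration_ms=int(fields[1]),
                time_rel_to_start_s=int(fields[3]) / 1000,
                time_rel_to_trial_s=int(fields[4]) / 1000,
                valve_duration_us=int(fields[2]),
            )

    return None


motor = parse_message("MOTOR|DOWN|2338|7340|7340", "SAMPLE")
sample_lick = parse_message("0|87|26064|8327|8327", "SAMPLE")
ttc_lick = parse_message("0|67|6541|6541", "TTC")
print(motor, sample_lick, ttc_lick, sep="\n")
```

Returning a value instead of writing straight into a DataFrame separates parsing from recording, so malformed messages can be tested without an experiment running.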
"""
This module defines the ArduinoData class, responsible for storing
data related to the Arduino controller that persists over different sessions,
primarily valve open durations and experimental schedule information.

It handles loading and saving valve timing profiles (including archiving) to a
TOML configuration file at the current user's `Documents/Photologic-Experiment-Rig-Files` directory.

It also processes incoming data strings from the Arduino
during an experiment, parsing lick events and motor movements, and recording
them into the main experiment data structure (`ExperimentProcessData`). Relies on
`system_config` to locate configuration files and interacts with an `ExperimentProcessData`
instance passed during initialization.
"""

import logging
import datetime
import copy
from typing import Callable
import toml
import numpy as np
import numpy.typing as npt
from models.event_data import EventData
import system_config

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    ###TYPE HINTING###
    from models.experiment_process_data import ExperimentProcessData
    ###TYPE HINTING###

# Get the logger in use for the app
logger = logging.getLogger()

rig_config = system_config.get_rig_config()

with open(rig_config, "r") as f:
    VALVE_CONFIG = toml.load(f)["valve_config"]

# pull total valves constant from toml config
TOTAL_POSSIBLE_VALVES = VALVE_CONFIG["TOTAL_POSSIBLE_VALVES"]
VALVES_PER_SIDE = TOTAL_POSSIBLE_VALVES // 2


class ArduinoData:
    """
    Manages Arduino-related data, focusing on valve duration storage persistence and
    processing incoming serial data during experiments.

    This class acts as an interface for saving and retrieving valve open durations
    from the TOML configuration file, implementing a simple archival and 'profile' system. It also
    contains methods to interpret data strings sent from the Arduino (like lick
    events and motor status) and record relevant information into an associated
    ExperimentProcessData pandas dataframe.

    Attributes
    ----------
    - **`exp_data`** (*ExperimentProcessData*): A reference to the main experiment data object, used for accessing schedule information and recording processed events.

    Methods
    -------
    - `find_first_not_filled`(...)
        Helper method to find the first available archive slot in the durations TOML file.
    - `save_durations`(...)
        Saves provided valve durations (side_one, side_two) either as the 'selected' profile or into an archive slot.
    - `load_durations`(...)
        Loads a specified valve duration profile (defaulting to 'selected') from the TOML file.
    - `load_schedule_indices`()
        Generates 0-indexed numpy arrays representing the valve schedule for an experiment based on `ExperimentProcessData` program_schedule_df.
    - `increment_licks`(...)
        Increments the appropriate lick counter in the `ExperimentProcessData`.
    - `handle_licks`(...)
        Parses and processes data strings specifically identified as lick events.
    - `record_event`(...)
        Records a processed event (lick or motor movement) into the `ExperimentProcessData` DataFrame.
    - `process_data`(...)
        Main entry point for parsing incoming data strings from the Arduino, routing to specific handlers (like `handle_licks`) based on content.
    """

    def __init__(self, exp_data):
        """
        Initializes the ArduinoData manager.

        Parameters
        ----------
        - **exp_data** (*ExperimentProcessData*): The instance of the ExperimentProcessData class holding all current experiment configuration and trial data.
        """
        self.exp_data = exp_data

    def find_first_not_filled(self, toml_file: dict) -> int:
        """
        Searches the loaded TOML file structure for the first available archive slot.

        Checks `archive_1`, `archive_2`, then `archive_3` for attribute `filled = false`.

        Parameters
        ----------
        - **toml_file** (*dict*): The dictionary representation of the loaded valve durations TOML file.

        Returns
        -------
        - *int*: The index (1, 2, or 3) of the first unfilled archive slot. Returns 0 if all slots (1, 2, 3) are marked as filled.
        """
        for i in range(1, 4):
            # stop at the first unfilled slot instead of scanning on to later ones,
            # so the return value matches the documented "first unfilled" behavior
            if not toml_file[f"archive_{i}"]["filled"]:
                return i
        return 0

    def save_durations(
        self,
        side_one: npt.NDArray[np.int32],
        side_two: npt.NDArray[np.int32],
        type_durations: str,
    ) -> None:
        """
        Saves valve open durations to the `valve_durations.toml` configuration file.

        This function handles saving durations either as the primary 'selected' profile
        or archiving them. When archiving, it implements a FIFO (First-In, First-Out)
        logic using three archive slots:
            1. If slot 1 is free, save there.
            2. If slot 1 is full but 2 is free, move 1->2, save new to 1.
            3. If slots 1 and 2 are full (or all slots are full), move 2->3 (overwriting 3 if necessary), move 1->2, save new to 1.

        Converts NumPy arrays to Python lists before saving to TOML for compatibility reasons. Records the
        current datetime for the saved profile to mark when the profile was created.

        Parameters
        ----------
        - **side_one** (*npt.NDArray[np.int32]*): Numpy array of durations (microseconds) for side one valves.
        - **side_two** (*npt.NDArray[np.int32]*): Numpy array of durations (microseconds) for side two valves.
        - **type_durations** (*str*): Specifies the save location. Must be either `"selected"` (to update the main profile) or `"archive"`
        (to save to the next available archive slot).

        Raises
        ------
        - *FileNotFoundError*: If the `valve_durations.toml` file cannot be found at the path specified by `system_config`.
        - *toml.TomlDecodeError*: If the TOML file is malformed.
        - *IOError*: If there are issues reading or writing the TOML file.
        - *KeyError*: If the expected keys (`selected_durations`, `archive_1`, etc.) are missing in the TOML structure.
        """

        dur_path = system_config.get_valve_durations()

        with open(dur_path, "r") as f:
            toml_file = toml.load(f)

        if type_durations == "selected":
            selected_durations = toml_file["selected_durations"]

            # toml only knows how to deal with python native types (list, int, etc)
            # so we convert np arrays to lists
            selected_durations["date_used"] = datetime.datetime.now()
            selected_durations["side_one_durations"] = side_one.tolist()
            selected_durations["side_two_durations"] = side_two.tolist()

            # save the file with the updated content
            with open(dur_path, "w") as f:
                toml.dump(toml_file, f)
        elif type_durations == "archive":
            # find first archive not filled
            first_avail = self.find_first_not_filled(toml_file)

            match first_avail:
                case 1:
                    # 1 is available just insert
                    archive = toml_file["archive_1"]

                    archive["filled"] = True
                    archive["date_used"] = datetime.datetime.now()
                    archive["side_one_durations"] = side_one.tolist()
                    archive["side_two_durations"] = side_two.tolist()

                    with open(dur_path, "w") as f:
                        toml.dump(toml_file, f)
                case 2:
                    # move 1-> 2, insert 1
                    toml_file["archive_2"] = copy.deepcopy(toml_file["archive_1"])

                    archive_1 = toml_file["archive_1"]

                    archive_1["filled"] = True
                    archive_1["date_used"] = datetime.datetime.now()
                    archive_1["side_one_durations"] = side_one.tolist()
                    archive_1["side_two_durations"] = side_two.tolist()

                    with open(dur_path, "w") as f:
                        toml.dump(toml_file, f)

                case 3 | 0:
                    # case 3) 2 -> 3, 1-> 2, insert 1
                    # case 0 (no available archives) ) del / overwrite 3 (oldest), 2 -> 3, 1-> 2, insert 1
                    toml_file["archive_3"] = copy.deepcopy(toml_file["archive_2"])
                    toml_file["archive_2"] = copy.deepcopy(toml_file["archive_1"])

                    archive_1 = toml_file["archive_1"]

                    archive_1["filled"] = True
                    archive_1["date_used"] = datetime.datetime.now()
                    archive_1["side_one_durations"] = side_one.tolist()
                    archive_1["side_two_durations"] = side_two.tolist()

                    with open(dur_path, "w") as f:
                        toml.dump(toml_file, f)

    def load_durations(
        self, type_durations="selected_durations"
    ) -> tuple[npt.NDArray[np.int32], npt.NDArray[np.int32], datetime.datetime]:
        """
        Loads a specific valve duration profile from the `valve_durations.toml` file.

        Retrieves the durations for side one and side two, along with the timestamp
        when that profile was created. Converts the lists from the TOML file back
        into NumPy `np.int32` arrays for efficient operations.

        Parameters
        ----------
        - **type_durations** (*str, optional*): The key corresponding to the desired profile in the TOML file
        (e.g., `"selected_durations"`, `"default_durations"`, `"archive_1"`). Defaults to `"selected_durations"`.

        Returns
        -------
        - *tuple[npt.NDArray[np.int32], npt.NDArray[np.int32], datetime.datetime]*: A tuple containing:
            - Numpy array of durations for side one.
            - Numpy array of durations for side two.
            - The datetime object indicating when the loaded profile was saved.

        Raises
        ------
        - *FileNotFoundError*: If the `valve_durations.toml` file cannot be found.
        - *toml.TomlDecodeError*: If the TOML file is malformed.
        - *IOError*: If there are issues reading the TOML file.
        - *KeyError*: If the specified `type_durations` key or expected sub-keys (`side_one_durations`, `date_used`, etc.) are missing.
        """
        valve_durations_toml = system_config.get_valve_durations()

        with open(valve_durations_toml, "r") as f:
            toml_file = toml.load(f)

        file_durations = toml_file[type_durations]

        # create two np.int32 arrays, each VALVES_PER_SIDE long
        dur_side_one = np.full(VALVES_PER_SIDE, np.int32(0))
        dur_side_two = np.full(VALVES_PER_SIDE, np.int32(0))

        # keys here are names of durations i.e side_one_dur or side_two_dur, values
        # are lists of durations, VALVES_PER_SIDE long. Will produce two lists
        # VALVES_PER_SIDE long so that each valve in the rig is assigned a duration.
        for key, value in file_durations.items():
            if key == "side_one_durations":
                for i, duration in enumerate(value):
                    dur_side_one[i] = duration
            elif key == "side_two_durations":
                for i, duration in enumerate(value):
                    dur_side_two[i] = duration

        date_used = file_durations["date_used"]

        return dur_side_one, dur_side_two, date_used

    def load_schedule_indices(
        self,
    ) -> tuple[npt.NDArray[np.int8], npt.NDArray[np.int8]]:
        """
        Generates 0-indexed valve schedule arrays based on the experiment schedule.

        Uses the stimuli names and the trial-by-trial schedule defined in the
        associated `ExperimentProcessData` object (`self.exp_data`) to create two NumPy arrays.
        The index for side one is first found, then the associated valve is found via `exp_data.get_paired_index`.

        These arrays represent the 0-indexed valve number to be activated for each trial
        on side one and side two, respectively. This format is suitable for sending
        to the Arduino.

        Returns
        -------
        - *tuple[npt.NDArray[np.int8], npt.NDArray[np.int8]]*: A tuple containing:
            - Numpy array of 0-indexed valve numbers for side one schedule.
            - Numpy array of 0-indexed valve numbers for side two schedule.

        Raises
        ------
        - *AttributeError*: If `self.exp_data` or its required attributes/methods (like `stimuli_data`, `program_schedule_df`, `get_paired_index`) are missing or invalid.
        - *KeyError*: If expected keys (like 'Num Stimuli', 'Port 1') are missing in `exp_data`.
        - *ValueError*: If a stimulus name from the schedule is not found in the unique stimuli list.
        """
        stimuli_data = self.exp_data.stimuli_data
        schedule_df = self.exp_data.program_schedule_df

        num_stimuli = self.exp_data.exp_var_entries["Num Stimuli"]
        num_trials = self.exp_data.exp_var_entries["Num Trials"]

        sched_side_one = np.full(num_trials, np.int8(0))
        sched_side_two = np.full(num_trials, np.int8(0))

        # get unique stim names. this will give us all names on side one.
        # finding the index of a given stimuli appearing in a trial on side
        # one will easily allow us to find the paired index. We've already done
        # this before in exp_process_data
        names = list(stimuli_data.stimuli_vars.values())
        unique_names = names[: num_stimuli // 2]

        side_one_trial_stimuli = schedule_df["Port 1"].to_numpy()

        for i, trial_stim in enumerate(side_one_trial_stimuli):
            index = unique_names.index(trial_stim)
            sd_two_index = self.exp_data.get_paired_index(index, num_stimuli)
            sched_side_one[i] = index
            sched_side_two[i] = sd_two_index

        return sched_side_one, sched_side_two

    def increment_licks(self, side: int, event_data: EventData):
        """
        Increments the lick counter for the specified side within the EventData object.

        Parameters
        ----------
        - **side** (*int*): The side where the lick occurred (0 for side one, 1 for side two).
        - **event_data** (*EventData*): The EventData instance associated with `self.exp_data` where lick counts are stored.
        """
        match side:
            case 0:
                event_data.side_one_licks += 1
            case 1:
                event_data.side_two_licks += 1
            case _:
                logger.error("invalid side")

    def handle_licks(
        self,
        split_data: list[str],
        event_data: EventData,
        state: str,
        trigger: Callable,
    ) -> None:
        """
        Parses and records data specifically identified as a lick event.

        Extracts side, lick duration, timestamps, and valve duration (if in "SAMPLE" state)
        from the `split_data` list based on the current experiment `state` ("TTC" or "SAMPLE").
        Calls `increment_licks` and `record_event`. For "TTC" state, it checks if
        the lick count threshold is met to trigger a state change to "SAMPLE".

        Parameters
        ----------
        - **split_data** (*list[str]*): The list of strings resulting from splitting the incoming Arduino data by content separator '|'.
        - **event_data** (*EventData*): The EventData instance for accessing lick counts.
        - **state** (*str*): The current state of the experiment FSM (e.g., "TTC", "SAMPLE").
        - **trigger** (*Callable*): The state machine's trigger function, used here to transition state to "SAMPLE" state based on lick counts.

        Raises
        ------
        - *IndexError*: If `split_data` does not contain the expected number of elements for the given state.
        - *ValueError*: If elements in `split_data` cannot be converted to the expected types (int, np.int8, np.int32).
        """
        side = None
        valve_duration = None
        match state:
            case "TTC":
                # split_data will have the below form for ttc
                # 0(side)|67(duration)|6541(stamp_rel_to_start)|6541(stamp_rel_to_trial)
                # 1|16|0|496570|41 from arduino
                side = np.int8(split_data[0])
                self.increment_licks(side, event_data)

                lick_duration = np.int32(split_data[1])
                time_rel_to_start = np.int32(split_data[2]) / 1000
                time_rel_to_trial = np.int32(split_data[3]) / 1000

                # insert the values held in licks for respective sides in a shorter variable name
                side_one = self.exp_data.event_data.side_one_licks
                side_two = self.exp_data.event_data.side_two_licks
                # if 3 or more licks in a ttc time, jump straight to sample
                if side_one > 2 or side_two > 2:
                    trigger("SAMPLE")
                self.record_event(
                    side, lick_duration, time_rel_to_start, time_rel_to_trial, state
                )

            case "SAMPLE":
                try:
                    # 0|87|26064|8327|8327
                    side = np.int8(split_data[0])
                    self.increment_licks(side, event_data)

                    lick_duration = np.int32(split_data[1])
                    valve_duration = np.int32(split_data[2])
                    time_rel_to_start = np.int32(split_data[3]) / 1000
                    time_rel_to_trial = np.int32(split_data[4]) / 1000

                    self.record_event(
                        side,
                        lick_duration,
                        time_rel_to_start,
                        time_rel_to_trial,
                        state,
                        valve_duration,
                    )

                except Exception as e:
                    logger.error(f"IMPROPER DATA.... IGNORING.....{e}")

    def record_event(
        self,
        side: int,
        duration: np.int32,
        time_rel_to_start: float,
        time_rel_to_trial: float,
        state: str,
        valve_dur: np.int32 | None = None,
    ):
        """
        Records a processed event (lick or motor movement) into the `ExperimentProcessData` `EventData` DataFrame.

        Uses the `insert_row_into_df` method of the `EventData` object (`self.exp_data.event_data`)
        to add a new row containing the details of the event.

        Parameters
        ----------
        - **side** (*int*): The side associated with the event (0 for side one, 1 for side two).
        - **duration** (*np.int32*): The duration of the event (e.g., lick contact time) in milliseconds.
        - **time_rel_to_start** (*float*): The timestamp of the event relative to the start of the entire program, in seconds.
        - **time_rel_to_trial** (*float*): The timestamp of the event relative to the start of the current trial, in seconds.
        - **state** (*str*): A string describing the state during the event (e.g., "TTC", "SAMPLE").
        - **valve_dur** (*np.int32 | None, optional*): The duration the valve was open for this event (microseconds),
        if applicable (e.g., during "SAMPLE" state licks). Defaults to None.

        Raises
        ------
        - Propagates exceptions from `event_data.insert_row_into_df`.
        """
        try:
            event_data = self.exp_data.event_data
            current_trial = self.exp_data.current_trial_number
            # insert the lick record into the dataframe
            event_data.insert_row_into_df(
                current_trial,
                side + 1,
                duration,
                time_rel_to_start,
                time_rel_to_trial,
                state,
                valve_duration=(valve_dur if valve_dur else None),
            )

            logger.info(f"Lick data recorded for side: {side + 1}")
        except Exception as e:
            logger.error(f"Error recording lick data: {e}")
            raise

    def process_data(
        self, source: str, data: str, state: str, trigger: Callable
    ) -> None:
        """
        Processes incoming data strings received from the Arduino via the serial connection.

        Splits the raw data string by the '|' content separator. Determines the type of
        data based on the first element (`split_data[0]`). Directly handles "MOTOR" events,
        and routes lick data (`split_data[0]` is '0' or '1')
        to `handle_licks` for further processing.

        Parameters
        ----------
        - **source** (*str*): Identifier for the source of the data. Included for logging/debugging.
        - **data** (*str*): The raw data string received from the Arduino.
        - **state** (*str*): The current state of the experiment FSM, passed to handlers like `handle_licks`.
        - **trigger** (*Callable*): The state machine's trigger function, passed to handlers like `handle_licks`.

        Raises
        ------
        - *IndexError*: If `data.split('|')` results in an empty list or accessing `split_data[0]` fails.
        - Propagates exceptions from `handle_licks` or `record_event`.
        """
        event_data = self.exp_data.event_data
        split_data = data.split("|")
        try:
            if split_data[0] == "MOTOR":
                # data will arrive in the following format
                # MOTOR|DOWN|2338|7340|7340
                # MOTOR|MOVEMENT|DURATION|END_TIME_REL_TO_PROG_START|END_TIME_REL_TO_PROG_TRIAL_START
                event_data = self.exp_data.event_data

                current_trial = self.exp_data.current_trial_number

                event_data.insert_row_into_df(
                    current_trial,
                    None,
                    np.int32(split_data[2]),
                    (np.int32(split_data[3]) / 1000),
                    (np.int32(split_data[4]) / 1000),
                    f"MOTOR {split_data[1]}",
                )
            elif split_data[0] == "0" or split_data[0] == "1":
                self.handle_licks(split_data, event_data, state, trigger)

        except Exception as e:
            logger.error(f"Error processing data from {source}: {e}")
            raise
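The `TTC` branch of `handle_licks` above moves the experiment on to `SAMPLE` once either side has more than two licks. A minimal sketch of that rule in isolation follows. It assumes a stand-in `LickCounts` dataclass in place of `EventData` and a plain callback in place of the state machine's trigger; both, along with `register_ttc_lick` and `LICK_THRESHOLD`, are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import Callable

LICK_THRESHOLD = 3  # three or more licks on one side ends the TTC period


@dataclass
class LickCounts:
    """Hypothetical stand-in for the lick counters kept on EventData."""

    side_one_licks: int = 0
    side_two_licks: int = 0


def register_ttc_lick(
    side: int, counts: LickCounts, trigger: Callable[[str], None]
) -> None:
    """Count one lick and request the SAMPLE state once the threshold is met."""
    if side == 0:
        counts.side_one_licks += 1
    elif side == 1:
        counts.side_two_licks += 1
    else:
        # The module logs an error here; the sketch raises to keep failures visible.
        raise ValueError(f"invalid side: {side}")

    if (
        counts.side_one_licks >= LICK_THRESHOLD
        or counts.side_two_licks >= LICK_THRESHOLD
    ):
        trigger("SAMPLE")


transitions: list[str] = []
counts = LickCounts()
for side in (0, 1, 0, 0):
    register_ttc_lick(side, counts, transitions.append)

# Only the fourth lick (the third on side one) requests the transition.
print(counts, transitions)
```

Passing the trigger as a callback, as the module does, keeps the lick-counting logic independent of whichever state machine implementation drives the experiment.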
425 426 Parameters 427 ---------- 428 - **side** (*int*): The side associated with the event (0 for side one, 1 for side two). 429 - **duration** (*np.int32*): The duration of the event (e.g., lick contact time) in milliseconds. 430 - **time_rel_to_start** (*float*): The timestamp of the event relative to the start of the entire program, in seconds. 431 - **time_rel_to_trial** (*float*): The timestamp of the event relative to the start of the current trial, in seconds. 432 - **state** (*str*): A string describing the state during the event (e.g., "TTC", "SAMPLE"). 433 - **valve_dur** (*np.int32 | None, optional*): The duration the valve was open for this event (microseconds), 434 if applicable (e.g., during "SAMPLE" state licks). Defaults to None. 435 436 Raises 437 ------ 438 - Propagates exceptions from `event_data.insert_row_into_df`. 439 """ 440 try: 441 event_data = self.exp_data.event_data 442 current_trial = self.exp_data.current_trial_number 443 # insert the lick record into the dataframe 444 event_data.insert_row_into_df( 445 current_trial, 446 side + 1, 447 duration, 448 time_rel_to_start, 449 time_rel_to_trial, 450 state, 451 valve_duration=(valve_dur if valve_dur else None), 452 ) 453 454 logging.info(f"Lick data recorded for side: {side + 1}") 455 except Exception as e: 456 logging.error(f"Error recording lick data: {e}") 457 raise 458 459 def process_data( 460 self, source: str, data: str, state: str, trigger: Callable 461 ) -> None: 462 """ 463 Processes incoming data strings received from the Arduino via the serial connection. 464 465 Splits the raw data string by the '|' content separator. Determines the type of 466 data based on the first element (`split_data[0]`). Directly handles "MOTOR" events, 467 and routes lick data (`split_data[0]` is '0' or '1') 468 to `handle_licks` for further processing. 469 470 Parameters 471 ---------- 472 - **source** (*str*): Identifier for the source of the data. Included for logging/debugging. 
473 - **data** (*str*): The raw data string received from the Arduino. 474 - **state** (*str*): The current state of the experiment FSM, passed to handlers like `handle_licks`. 475 - **trigger** (*Callable*): The state machine's trigger function, passed to handlers like `handle_licks`. 476 477 Raises 478 ------ 479 - *IndexError*: If `data.split('|')` results in an empty list or accessing `split_data[0]` fails. 480 - Propagates exceptions from `handle_licks` or `record_event`. 481 """ 482 event_data = self.exp_data.event_data 483 split_data = data.split("|") 484 try: 485 if split_data[0] == "MOTOR": 486 # data will arrive in the following format 487 # MOTOR|DOWN|2338|7340|7340 488 # MOTOR|MOVEMENT|DURATION|END_TIME_REL_TO_PROG_START|END_TIME_REL_TO_PROG_TRIAL_START 489 event_data = self.exp_data.event_data 490 491 current_trial = self.exp_data.current_trial_number 492 493 event_data.insert_row_into_df( 494 current_trial, 495 None, 496 np.int32(split_data[2]), 497 (np.int32(split_data[3]) / 1000), 498 (np.int32(split_data[4]) / 1000), 499 f"MOTOR {split_data[1]}", 500 ) 501 elif split_data[0] == "0" or split_data[0] == "1": 502 self.handle_licks(split_data, event_data, state, trigger) 503 504 except Exception as e: 505 logging.error(f"Error processing data from {source}: {e}") 506 raise
Manages Arduino-related data, focusing on valve duration storage persistence and processing incoming serial data during experiments.
This class acts as an interface for saving and retrieving valve open durations from the TOML configuration file, implementing a simple archival and 'profile' system. It also contains methods to interpret data strings sent from the Arduino (like lick events and motor status) and record relevant information into an associated ExperimentProcessData pandas dataframe.
Attributes
- exp_data (ExperimentProcessData): A reference to the main experiment data object, used for accessing schedule information and recording processed events.
Methods
- find_first_not_filled(...): Helper method to find the first available archive slot in the durations TOML file.
- save_durations(...): Saves provided valve durations (side_one, side_two) either as the 'selected' profile or into an archive slot.
- load_durations(...): Loads a specified valve duration profile (defaulting to 'selected') from the TOML file.
- load_schedule_indices(): Generates 0-indexed numpy arrays representing the valve schedule for an experiment based on the ExperimentProcessData program_schedule_df.
- increment_licks(...): Increments the appropriate lick counter in the ExperimentProcessData.
- handle_licks(...): Parses and processes data strings specifically identified as lick events.
- record_event(...): Records a processed event (lick or motor movement) into the ExperimentProcessData DataFrame.
- process_data(...): Main entry point for parsing incoming data strings from the Arduino, routing to specific handlers (like handle_licks) based on content.
    def __init__(self, exp_data):
        """
        Initializes the ArduinoData manager.

        Parameters
        ----------
        - **exp_data** (*ExperimentProcessData*): The instance of the ExperimentProcessData class holding all current experiment configuration and trial data.
        """
        self.exp_data = exp_data
Initializes the ArduinoData manager.
Parameters
- exp_data (ExperimentProcessData): The instance of the ExperimentProcessData class holding all current experiment configuration and trial data.
    def find_first_not_filled(self, toml_file: dict) -> int:
        """
        Searches the loaded TOML file structure for the first available archive slot.

        Checks `archive_1`, `archive_2`, then `archive_3` for attribute `filled = false`.

        Parameters
        ----------
        - **toml_file** (*dict*): The dictionary representation of the loaded valve durations TOML file.

        Returns
        -------
        - *int*: The index (1, 2, or 3) of the first unfilled archive slot. Returns 0 if all slots (1, 2, 3) are marked as filled.
        """
        first_available = 0

        for i in range(1, 4):
            archive = toml_file[f"archive_{i}"]
            if not archive["filled"]:
                # stop at the first unfilled slot; without the break the loop
                # would keep going and return the last unfilled slot instead
                first_available = i
                break
        return first_available
Searches the loaded TOML file structure for the first available archive slot.
Checks archive_1, archive_2, then archive_3 for the attribute filled = false.
Parameters
- toml_file (dict): The dictionary representation of the loaded valve durations TOML file.
Returns
- int: The index (1, 2, or 3) of the first unfilled archive slot. Returns 0 if all slots (1, 2, 3) are marked as filled.
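As a rough illustration of the documented return values, the lookup can be sketched on an in-memory dictionary. The function name `first_unfilled_slot` and the sample dictionary are hypothetical stand-ins for the method and the loaded TOML file; only the `archive_N` and `filled` keys come from the description above.

```python
def first_unfilled_slot(toml_file: dict) -> int:
    """Return 1, 2, or 3 for the first archive whose 'filled' flag is false, else 0."""
    for i in range(1, 4):
        if not toml_file[f"archive_{i}"]["filled"]:
            return i
    return 0


# Hypothetical stand-in for the dictionary produced by loading the TOML file.
example = {
    "archive_1": {"filled": True},
    "archive_2": {"filled": False},
    "archive_3": {"filled": False},
}
all_filled = {f"archive_{i}": {"filled": True} for i in (1, 2, 3)}

print(first_unfilled_slot(example))     # 2
print(first_unfilled_slot(all_filled))  # 0
```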
    def save_durations(
        self,
        side_one: npt.NDArray[np.int32],
        side_two: npt.NDArray[np.int32],
        type_durations: str,
    ) -> None:
        """
        Saves valve open durations to the `valve_durations.toml` configuration file.

        This function handles saving durations either as the primary 'selected' profile
        or archiving them. When archiving, it implements a FIFO (First-In, First-Out)
        logic using three archive slots:
        1. If slot 1 is free, save there.
        2. If slot 1 is full but 2 is free, move 1->2, save new to 1.
        3. If slots 1 and 2 are full (or all slots are full), move 2->3 (overwriting 3 if necessary), move 1->2, save new to 1.

        Converts NumPy arrays to Python lists before saving to TOML for compatibility reasons. Records the
        current datetime for the saved profile to mark when the profile was created.

        Parameters
        ----------
        - **side_one** (*npt.NDArray[np.int32]*): Numpy array of durations (microseconds) for side one valves.
        - **side_two** (*npt.NDArray[np.int32]*): Numpy array of durations (microseconds) for side two valves.
        - **type_durations** (*str*): Specifies the save location. Must be either `"selected"` (to update the main profile) or `"archive"`
        (to save to the next available archive slot).

        Raises
        ------
        - *FileNotFoundError*: If the `valve_durations.toml` file cannot be found at the path specified by `system_config`.
        - *toml.TomlDecodeError*: If the TOML file is malformed.
        - *IOError*: If there are issues reading or writing the TOML file.
        - *KeyError*: If the expected keys (`selected_durations`, `archive_1`, etc.) are missing in the TOML structure.
        """

        dur_path = system_config.get_valve_durations()

        with open(dur_path, "r") as f:
            toml_file = toml.load(f)

        if type_durations == "selected":
            selected_durations = toml_file["selected_durations"]

            # toml only knows how to deal with python native types (list, int, etc)
            # so we convert np arrays to lists
            selected_durations["date_used"] = datetime.datetime.now()
            selected_durations["side_one_durations"] = side_one.tolist()
            selected_durations["side_two_durations"] = side_two.tolist()

            # save the file with the updated content
            with open(dur_path, "w") as f:
                toml.dump(toml_file, f)
        elif type_durations == "archive":
            # find first archive not filled
            first_avail = self.find_first_not_filled(toml_file)

            match first_avail:
                case 1:
                    # 1 is available just insert
                    archive = toml_file["archive_1"]

                    archive["filled"] = True
                    archive["date_used"] = datetime.datetime.now()
                    archive["side_one_durations"] = side_one.tolist()
                    archive["side_two_durations"] = side_two.tolist()

                    with open(dur_path, "w") as f:
                        toml.dump(toml_file, f)
                case 2:
                    # move 1-> 2, insert 1
                    toml_file["archive_2"] = copy.deepcopy(toml_file["archive_1"])

                    archive_1 = toml_file["archive_1"]

                    archive_1["filled"] = True
                    archive_1["date_used"] = datetime.datetime.now()
                    archive_1["side_one_durations"] = side_one.tolist()
                    archive_1["side_two_durations"] = side_two.tolist()

                    with open(dur_path, "w") as f:
                        toml.dump(toml_file, f)

                case 3 | 0:
                    # case 3) 2 -> 3, 1-> 2, insert 1
                    # case 0 (no available archives) ) del / overwrite 3 (oldest), 2 -> 3, 1-> 2, insert 1
                    toml_file["archive_3"] = copy.deepcopy(toml_file["archive_2"])
                    toml_file["archive_2"] = copy.deepcopy(toml_file["archive_1"])

                    archive_1 = toml_file["archive_1"]

                    archive_1["filled"] = True
                    archive_1["date_used"] = datetime.datetime.now()
                    archive_1["side_one_durations"] = side_one.tolist()
                    archive_1["side_two_durations"] = side_two.tolist()

                    with open(dur_path, "w") as f:
                        toml.dump(toml_file, f)
Saves valve open durations to the valve_durations.toml configuration file.
This function handles saving durations either as the primary 'selected' profile or archiving them. When archiving, it implements a FIFO (First-In, First-Out) logic using three archive slots:
- If slot 1 is free, save there.
- If slot 1 is full but 2 is free, move 1->2, save new to 1.
- If slots 1 and 2 are full (or all slots are full), move 2->3 (overwriting 3 if necessary), move 1->2, save new to 1.
Converts NumPy arrays to Python lists before saving to TOML for compatibility reasons. Records the current datetime for the saved profile to mark when the profile was created.
Parameters
- side_one (npt.NDArray[np.int32]): Numpy array of durations (microseconds) for side one valves.
- side_two (npt.NDArray[np.int32]): Numpy array of durations (microseconds) for side two valves.
- type_durations (str): Specifies the save location. Must be either "selected" (to update the main profile) or "archive" (to save to the next available archive slot).
Raises
- FileNotFoundError: If the valve_durations.toml file cannot be found at the path specified by system_config.
- toml.TomlDecodeError: If the TOML file is malformed.
- IOError: If there are issues reading or writing the TOML file.
- KeyError: If the expected keys (selected_durations, archive_1, etc.) are missing in the TOML structure.
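The archive rotation described above can be sketched on a plain dictionary, without touching any file. This is a simplified illustration, not the method itself: it always shifts 2->3 and 1->2 before writing slot 1, which gives the same result as the three documented cases under the assumption that slots fill in order (1, then 2, then 3). The names `empty_slot` and `archive_profile` are hypothetical.

```python
import copy
import datetime


def empty_slot() -> dict:
    """Hypothetical placeholder for an unfilled archive entry."""
    return {"filled": False, "side_one_durations": [], "side_two_durations": []}


def archive_profile(toml_file: dict, side_one: list[int], side_two: list[int]) -> None:
    """Shift older profiles down one slot, then store the new profile in slot 1."""
    toml_file["archive_3"] = copy.deepcopy(toml_file["archive_2"])  # oldest is overwritten
    toml_file["archive_2"] = copy.deepcopy(toml_file["archive_1"])
    toml_file["archive_1"] = {
        "filled": True,
        "date_used": datetime.datetime.now(),
        "side_one_durations": list(side_one),
        "side_two_durations": list(side_two),
    }


store = {f"archive_{i}": empty_slot() for i in (1, 2, 3)}
for value in (100, 200, 300, 400):
    archive_profile(store, [value], [value])

newest_to_oldest = [store[f"archive_{i}"]["side_one_durations"] for i in (1, 2, 3)]
print(newest_to_oldest)  # [[400], [300], [200]]
```

After four saves the first profile (100) has been dropped, which is the first-in, first-out behaviour the description refers to.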
    def load_durations(
        self, type_durations="selected_durations"
    ) -> tuple[npt.NDArray[np.int32], npt.NDArray[np.int32], datetime.datetime]:
        """
        Loads a specific valve duration profile from the `valve_durations.toml` file.

        Retrieves the durations for side one and side two, along with the timestamp
        when that profile was created. Converts the lists from the TOML file back
        into NumPy `np.int32` arrays for efficient operations.

        Parameters
        ----------
        - **type_durations** (*str, optional*): The key corresponding to the desired profile in the TOML file
        (e.g., `"selected_durations"`, `"default_durations"`, `"archive_1"`). Defaults to `"selected_durations"`.

        Returns
        -------
        - *tuple[npt.NDArray[np.int32], npt.NDArray[np.int32], datetime.datetime]*: A tuple containing:
            - Numpy array of durations for side one.
            - Numpy array of durations for side two.
            - The datetime object indicating when the loaded profile was saved.

        Raises
        ------
        - *FileNotFoundError*: If the `valve_durations.toml` file cannot be found.
        - *toml.TomlDecodeError*: If the TOML file is malformed.
        - *IOError*: If there are issues reading the TOML file.
        - *KeyError*: If the specified `type_durations` key or expected sub-keys (`side_one_durations`, `date_used`, etc.) are missing.
        """
        valve_durations_toml = system_config.get_valve_durations()

        with open(valve_durations_toml, "r") as f:
            toml_file = toml.load(f)

        file_durations = toml_file[type_durations]

        # create two np.int32 arrays, each VALVES_PER_SIDE long
        dur_side_one = np.full(VALVES_PER_SIDE, np.int32(0))
        dur_side_two = np.full(VALVES_PER_SIDE, np.int32(0))

        # keys here are names of durations, i.e. side_one_durations or side_two_durations;
        # values are lists of durations, VALVES_PER_SIDE long. Will produce two arrays
        # VALVES_PER_SIDE long so that each valve in the rig is assigned a duration.
        for key, value in file_durations.items():
            if key == "side_one_durations":
                for i, duration in enumerate(value):
                    dur_side_one[i] = duration
            elif key == "side_two_durations":
                for i, duration in enumerate(value):
                    dur_side_two[i] = duration

        date_used = file_durations["date_used"]

        return dur_side_one, dur_side_two, date_used
Loads a specific valve duration profile from the valve_durations.toml file.
Retrieves the durations for side one and side two, along with the timestamp when that profile was created. Converts the lists from the TOML file back into NumPy np.int32 arrays for efficient operations.
Parameters
- type_durations (str, optional): The key corresponding to the desired profile in the TOML file (e.g., "selected_durations", "default_durations", "archive_1"). Defaults to "selected_durations".
Returns
- tuple[npt.NDArray[np.int32], npt.NDArray[np.int32], datetime.datetime]: A tuple containing:
- Numpy array of durations for side one.
- Numpy array of durations for side two.
- The datetime object indicating when the loaded profile was saved.
Raises
- FileNotFoundError: If the valve_durations.toml file cannot be found.
- toml.TomlDecodeError: If the TOML file is malformed.
- IOError: If there are issues reading the TOML file.
- KeyError: If the specified type_durations key or expected sub-keys (side_one_durations, date_used, etc.) are missing.
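The list-to-array conversion mentioned above can be illustrated in isolation. This sketch assumes NumPy is installed and uses made-up values: `VALVES_PER_SIDE = 4`, the sample `profile` dictionary, and the helper name `durations_to_array` are all hypothetical, and the real valve count comes from the rig configuration.

```python
import numpy as np

VALVES_PER_SIDE = 4  # hypothetical; the real value is read from the rig config


def durations_to_array(values: list[int], valves_per_side: int) -> np.ndarray:
    """Copy a TOML list into a zero-initialised np.int32 array of fixed length."""
    arr = np.zeros(valves_per_side, dtype=np.int32)
    arr[: len(values)] = values  # valves without a stored value stay at 0
    return arr


# Hypothetical stand-in for one profile table from the durations TOML file.
profile = {
    "side_one_durations": [24000, 24500, 25000, 25500],
    "side_two_durations": [23000, 23500],
}

side_one = durations_to_array(profile["side_one_durations"], VALVES_PER_SIDE)
side_two = durations_to_array(profile["side_two_durations"], VALVES_PER_SIDE)
print(side_one.dtype, side_two.tolist())  # int32 [23000, 23500, 0, 0]
```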
    def load_schedule_indices(
        self,
    ) -> tuple[npt.NDArray[np.int8], npt.NDArray[np.int8]]:
        """
        Generates 0-indexed valve schedule arrays based on the experiment schedule.

        Uses the stimuli names and the trial-by-trial schedule defined in the
        associated `ExperimentProcessData` object (`self.exp_data`) to create two NumPy arrays.
        The index for side one is first found, then the associated valve is found via `exp_data.get_paired_index`.

        These arrays represent the 0-indexed valve number to be activated for each trial
        on side one and side two, respectively. This format is suitable for sending
        to the Arduino.

        Returns
        -------
        - *tuple[npt.NDArray[np.int8], npt.NDArray[np.int8]]*: A tuple containing:
            - Numpy array of 0-indexed valve numbers for side one schedule.
            - Numpy array of 0-indexed valve numbers for side two schedule.

        Raises
        ------
        - *AttributeError*: If `self.exp_data` or its required attributes/methods (like `stimuli_data`, `program_schedule_df`, `get_paired_index`) are missing or invalid.
        - *KeyError*: If expected keys (like 'Num Stimuli', 'Port 1') are missing in `exp_data`.
        - *ValueError*: If a stimulus name from the schedule is not found in the unique stimuli list.
        """
        stimuli_data = self.exp_data.stimuli_data
        schedule_df = self.exp_data.program_schedule_df

        num_stimuli = self.exp_data.exp_var_entries["Num Stimuli"]
        num_trials = self.exp_data.exp_var_entries["Num Trials"]

        sched_side_one = np.full(num_trials, np.int8(0))
        sched_side_two = np.full(num_trials, np.int8(0))

        # get unique stim names. this will give us all names on side one.
        # finding the index of a given stimuli appearing in a trial on side
        # one will easily allow us to find the paired index. We've already done
        # this before in exp_process_data
        names = list(stimuli_data.stimuli_vars.values())
        unique_names = names[: num_stimuli // 2]

        side_one_trial_stimuli = schedule_df["Port 1"].to_numpy()

        for i, trial_stim in enumerate(side_one_trial_stimuli):
            index = unique_names.index(trial_stim)
            sd_two_index = self.exp_data.get_paired_index(index, num_stimuli)
            sched_side_one[i] = index
            sched_side_two[i] = sd_two_index

        return sched_side_one, sched_side_two
Generates 0-indexed valve schedule arrays based on the experiment schedule.
Uses the stimuli names and the trial-by-trial schedule defined in the associated ExperimentProcessData object (self.exp_data) to create two NumPy arrays. The index for side one is first found, then the associated valve is found via exp_data.get_paired_index.
These arrays represent the 0-indexed valve number to be activated for each trial on side one and side two, respectively. This format is suitable for sending to the Arduino.
Returns
- tuple[npt.NDArray[np.int8], npt.NDArray[np.int8]]: A tuple containing:
- Numpy array of 0-indexed valve numbers for side one schedule.
- Numpy array of 0-indexed valve numbers for side two schedule.
Raises
- AttributeError: If self.exp_data or its required attributes/methods (like stimuli_data, program_schedule_df, get_paired_index) are missing or invalid.
- KeyError: If expected keys (like 'Num Stimuli', 'Port 1') are missing in exp_data.
- ValueError: If a stimulus name from the schedule is not found in the unique stimuli list.
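The name-to-index lookup can be sketched with plain lists. The behaviour of `get_paired_index` is not shown in this section, so the pairing rule below (`demo_pairing`, which swaps neighbouring indices) is purely a made-up placeholder, as are the stimulus names and the function name `schedule_indices`.

```python
from typing import Callable


def schedule_indices(
    port_one_stimuli: list[str],
    unique_names: list[str],
    paired_index: Callable[[int], int],
) -> tuple[list[int], list[int]]:
    """Map each trial's side-one stimulus name to a 0-indexed valve on each side."""
    side_one, side_two = [], []
    for name in port_one_stimuli:
        index = unique_names.index(name)  # raises ValueError for an unknown name
        side_one.append(index)
        side_two.append(paired_index(index))
    return side_one, side_two


def demo_pairing(index: int) -> int:
    """Hypothetical pairing rule: 0<->1, 2<->3. Not the rig's actual rule."""
    return index + 1 if index % 2 == 0 else index - 1


unique = ["water", "sucrose", "quinine", "saline"]  # made-up stimulus names
trials = ["sucrose", "water", "saline"]             # made-up 'Port 1' column

s1, s2 = schedule_indices(trials, unique, demo_pairing)
print(s1, s2)  # [1, 0, 3] [0, 1, 2]
```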
    def increment_licks(self, side: int, event_data: EventData):
        """
        Increments the lick counter for the specified side within the EventData object.

        Parameters
        ----------
        - **side** (*int*): The side where the lick occurred (0 for side one, 1 for side two).
        - **event_data** (*EventData*): The EventData instance associated with `self.exp_data` where lick counts are stored.
        """
        match side:
            case 0:
                event_data.side_one_licks += 1
            case 1:
                event_data.side_two_licks += 1
            case _:
                logger.error("invalid side")
Increments the lick counter for the specified side within the EventData object.
Parameters
- side (int): The side where the lick occurred (0 for side one, 1 for side two).
- event_data (EventData): The EventData instance associated with self.exp_data where lick counts are stored.
    def handle_licks(
        self,
        split_data: list[str],
        event_data: EventData,
        state: str,
        trigger: Callable,
    ) -> None:
        """
        Parses and records data specifically identified as a lick event.

        Extracts side, lick duration, timestamps, and valve duration (if in "SAMPLE" state)
        from the `split_data` list based on the current experiment `state` ("TTC" or "SAMPLE").
        Calls `increment_licks` and `record_event`. For "TTC" state, it checks if
        the lick count threshold is met to trigger a state change to "SAMPLE".

        Parameters
        ----------
        - **split_data** (*list[str]*): The list of strings resulting from splitting the incoming Arduino data by content separator '|'.
        - **event_data** (*EventData*): The EventData instance for accessing lick counts.
        - **state** (*str*): The current state of the experiment FSM (e.g., "TTC", "SAMPLE").
        - **trigger** (*Callable*): The state machine's trigger function, used here to transition state to "SAMPLE" state based on lick counts.

        Raises
        ------
        - *IndexError*: If `split_data` does not contain the expected number of elements for the given state.
        - *ValueError*: If elements in `split_data` cannot be converted to the expected types (int, np.int8, np.int32).
        """
        side = None
        valve_duration = None
        match state:
            case "TTC":
                # split_data will have the below form for ttc
                # 0(side)|67(duration)|6541(stamp_rel_to_start)|6541(stamp_rel_to_trial)
                # 1|16|0|496570|41 from arduino
                side = np.int8(split_data[0])
                self.increment_licks(side, event_data)

                lick_duration = np.int32(split_data[1])
                time_rel_to_start = np.int32(split_data[2]) / 1000
                time_rel_to_trial = np.int32(split_data[3]) / 1000

                # insert the values held in licks for respective sides in a shorter variable name
                side_one = self.exp_data.event_data.side_one_licks
                side_two = self.exp_data.event_data.side_two_licks
                # if 3 or more licks in a ttc time, jump straight to sample
                if side_one > 2 or side_two > 2:
                    trigger("SAMPLE")
                self.record_event(
                    side, lick_duration, time_rel_to_start, time_rel_to_trial, state
                )

            case "SAMPLE":
                try:
                    # 0|87|26064|8327|8327
                    side = np.int8(split_data[0])
                    self.increment_licks(side, event_data)

                    lick_duration = np.int32(split_data[1])
                    valve_duration = np.int32(split_data[2])
                    time_rel_to_start = np.int32(split_data[3]) / 1000
                    time_rel_to_trial = np.int32(split_data[4]) / 1000

                    self.record_event(
                        side,
                        lick_duration,
                        time_rel_to_start,
                        time_rel_to_trial,
                        state,
                        valve_duration,
                    )

                except Exception as e:
                    logging.error(f"IMPROPER DATA.... IGNORING.....{e}")
Parses and records data specifically identified as a lick event.
Extracts side, lick duration, timestamps, and valve duration (if in "SAMPLE" state) from the split_data list based on the current experiment state ("TTC" or "SAMPLE"). Calls increment_licks and record_event. For "TTC" state, it checks if the lick count threshold is met to trigger a state change to "SAMPLE".
Parameters
- split_data (list[str]): The list of strings resulting from splitting the incoming Arduino data by content separator '|'.
- event_data (EventData): The EventData instance for accessing lick counts.
- state (str): The current state of the experiment FSM (e.g., "TTC", "SAMPLE").
- trigger (Callable): The state machine's trigger function, used here to transition to the "SAMPLE" state based on lick counts.
Raises
- IndexError: If split_data does not contain the expected number of elements for the given state.
- ValueError: If elements in split_data cannot be converted to the expected types (int, np.int8, np.int32).
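The two message layouts can be compared side by side with a small standalone parser. This sketch uses plain Python integers instead of NumPy types and returns a dictionary rather than recording anything; the function name `parse_lick` and the dictionary keys are hypothetical. The sample strings are the ones quoted in the source comments, and the timestamps are divided by 1000 as in the source.

```python
def parse_lick(split_data: list[str], state: str) -> dict:
    """Extract lick fields; SAMPLE messages carry an extra valve-duration field."""
    side = int(split_data[0])
    lick_duration = int(split_data[1])
    if state == "TTC":
        # side | lick duration | time since program start | time since trial start
        valve_duration = None
        start_raw, trial_raw = split_data[2], split_data[3]
    elif state == "SAMPLE":
        # side | lick duration | valve duration | time since start | time since trial
        valve_duration = int(split_data[2])
        start_raw, trial_raw = split_data[3], split_data[4]
    else:
        raise ValueError(f"unexpected state: {state}")
    return {
        "side": side,
        "lick_duration": lick_duration,
        "valve_duration": valve_duration,
        "time_rel_to_start": int(start_raw) / 1000,
        "time_rel_to_trial": int(trial_raw) / 1000,
    }


ttc = parse_lick("0|67|6541|6541".split("|"), "TTC")
sample = parse_lick("0|87|26064|8327|8327".split("|"), "SAMPLE")
print(ttc["time_rel_to_start"], sample["valve_duration"])  # 6.541 26064
```

A message with too few fields raises IndexError here, and a non-numeric field raises ValueError, matching the exceptions listed above.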
    def record_event(
        self,
        side: int,
        duration: np.int32,
        time_rel_to_start: float,
        time_rel_to_trial: float,
        state: str,
        valve_dur: np.int32 | None = None,
    ):
        """
        Records a processed event (lick or motor movement) into the `ExperimentProcessData` `EventData` DataFrame.

        Uses the `insert_row_into_df` method of the `EventData` object (`self.exp_data.event_data`)
        to add a new row containing the details of the event.

        Parameters
        ----------
        - **side** (*int*): The side associated with the event (0 for side one, 1 for side two).
        - **duration** (*np.int32*): The duration of the event (e.g., lick contact time) in milliseconds.
        - **time_rel_to_start** (*float*): The timestamp of the event relative to the start of the entire program, in seconds.
        - **time_rel_to_trial** (*float*): The timestamp of the event relative to the start of the current trial, in seconds.
        - **state** (*str*): A string describing the state during the event (e.g., "TTC", "SAMPLE").
        - **valve_dur** (*np.int32 | None, optional*): The duration the valve was open for this event (microseconds),
        if applicable (e.g., during "SAMPLE" state licks). Defaults to None.

        Raises
        ------
        - Propagates exceptions from `event_data.insert_row_into_df`.
        """
        try:
            event_data = self.exp_data.event_data
            current_trial = self.exp_data.current_trial_number
            # insert the lick record into the dataframe
            event_data.insert_row_into_df(
                current_trial,
                side + 1,
                duration,
                time_rel_to_start,
                time_rel_to_trial,
                state,
                valve_duration=(valve_dur if valve_dur else None),
            )

            logging.info(f"Lick data recorded for side: {side + 1}")
        except Exception as e:
            logging.error(f"Error recording lick data: {e}")
            raise
Records a processed event (lick or motor movement) into the ExperimentProcessData EventData DataFrame.
Uses the insert_row_into_df method of the EventData object (self.exp_data.event_data) to add a new row containing the details of the event.
Parameters
- side (int): The side associated with the event (0 for side one, 1 for side two).
- duration (np.int32): The duration of the event (e.g., lick contact time) in milliseconds.
- time_rel_to_start (float): The timestamp of the event relative to the start of the entire program, in seconds.
- time_rel_to_trial (float): The timestamp of the event relative to the start of the current trial, in seconds.
- state (str): A string describing the state during the event (e.g., "TTC", "SAMPLE").
- valve_dur (np.int32 | None, optional): The duration the valve was open for this event (microseconds), if applicable (e.g., during "SAMPLE" state licks). Defaults to None.
Raises
- Propagates exceptions from event_data.insert_row_into_df.
    def process_data(
        self, source: str, data: str, state: str, trigger: Callable
    ) -> None:
        """
        Processes incoming data strings received from the Arduino via the serial connection.

        Splits the raw data string by the '|' content separator. Determines the type of
        data based on the first element (`split_data[0]`). Directly handles "MOTOR" events,
        and routes lick data (`split_data[0]` is '0' or '1')
        to `handle_licks` for further processing.

        Parameters
        ----------
        - **source** (*str*): Identifier for the source of the data. Included for logging/debugging.
        - **data** (*str*): The raw data string received from the Arduino.
        - **state** (*str*): The current state of the experiment FSM, passed to handlers like `handle_licks`.
        - **trigger** (*Callable*): The state machine's trigger function, passed to handlers like `handle_licks`.

        Raises
        ------
        - *IndexError*: If `data.split('|')` results in an empty list or accessing `split_data[0]` fails.
        - Propagates exceptions from `handle_licks` or `record_event`.
        """
        event_data = self.exp_data.event_data
        split_data = data.split("|")
        try:
            if split_data[0] == "MOTOR":
                # data will arrive in the following format
                # MOTOR|DOWN|2338|7340|7340
                # MOTOR|MOVEMENT|DURATION|END_TIME_REL_TO_PROG_START|END_TIME_REL_TO_PROG_TRIAL_START
                event_data = self.exp_data.event_data

                current_trial = self.exp_data.current_trial_number

                event_data.insert_row_into_df(
                    current_trial,
                    None,
                    np.int32(split_data[2]),
                    (np.int32(split_data[3]) / 1000),
                    (np.int32(split_data[4]) / 1000),
                    f"MOTOR {split_data[1]}",
                )
            elif split_data[0] == "0" or split_data[0] == "1":
                self.handle_licks(split_data, event_data, state, trigger)

        except Exception as e:
            logging.error(f"Error processing data from {source}: {e}")
            raise
Processes incoming data strings received from the Arduino via the serial connection.
Splits the raw data string by the '|' content separator. Determines the type of data based on the first element (split_data[0]). Directly handles "MOTOR" events, and routes lick data (split_data[0] is '0' or '1') to handle_licks for further processing.
Parameters
- source (str): Identifier for the source of the data. Included for logging/debugging.
- data (str): The raw data string received from the Arduino.
- state (str): The current state of the experiment FSM, passed to handlers like handle_licks.
- trigger (Callable): The state machine's trigger function, passed to handlers like handle_licks.
Raises
- IndexError: If data.split('|') results in an empty list or accessing split_data[0] fails.
- Propagates exceptions from handle_licks or record_event.
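The routing on the first field can be sketched without the surrounding experiment objects. The function names `route` and `parse_motor` and the returned labels are hypothetical; the motor string is the example quoted in the source comments. Note that in Python, `str.split("|")` always returns at least one element, so an empty string simply falls through to the unhandled branch in this sketch.

```python
def route(data: str) -> str:
    """Classify a raw Arduino string by its first '|'-separated field."""
    kind = data.split("|")[0]
    if kind == "MOTOR":
        return "motor"
    if kind in ("0", "1"):
        return "lick"
    return "ignored"


def parse_motor(data: str) -> dict:
    """Unpack MOTOR|MOVEMENT|DURATION|END_TIME_REL_TO_START|END_TIME_REL_TO_TRIAL."""
    _, movement, duration, t_start, t_trial = data.split("|")
    return {
        "state": f"MOTOR {movement}",
        "duration": int(duration),
        "time_rel_to_start": int(t_start) / 1000,
        "time_rel_to_trial": int(t_trial) / 1000,
    }


motor_line = "MOTOR|DOWN|2338|7340|7340"
motor_event = parse_motor(motor_line)
print(route(motor_line), motor_event["state"], motor_event["time_rel_to_start"])
# motor MOTOR DOWN 7.34
print(route("0|87|26064|8327|8327"))  # lick
print(route(""))                      # ignored
```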