arduino_data

This module defines the ArduinoData class, responsible for storing data related to the Arduino controller that persists over different sesssions, primarily valve open durations and experimental schedule information.

It handles loading and saving valve timing profiles (including archiving) to a TOML configuration file at the current user's Documents/Photologic-Experiment-Rig-Files directory.

It also processes incoming data strings from the Arduino during an experiment, parsing lick events and motor movements, and recording them into the main experiment data structure (ExperimentProcessData). Relies on system_config to locate configuration files and interacts with an ExperimentProcessData instance passed during initialization.

  1"""
  2This module defines the ArduinoData class, responsible for storing
  3data related to the Arduino controller that persists over different sesssions,
  4primarily valve open durations and experimental schedule information.
  5
  6It handles loading and saving valve timing profiles (including archiving) to a
  7TOML configuration file at the current user's `Documents/Photologic-Experiment-Rig-Files` directory.
  8
  9It also processes incoming data strings from the Arduino
 10during an experiment, parsing lick events and motor movements, and recording
 11them into the main experiment data structure (`ExperimentProcessData`). Relies on
 12`system_config` to locate configuration files and interacts with an `ExperimentProcessData`
 13instance passed during initialization.
 14"""
 15
 16import logging
 17import datetime
 18import copy
 19from typing import Callable
 20import toml
 21import numpy as np
 22import numpy.typing as npt
 23from models.event_data import EventData
 24import system_config
 25
 26from typing import TYPE_CHECKING
 27
 28if TYPE_CHECKING:
 29    ###TYPE HINTING###
 30    from models.experiment_process_data import ExperimentProcessData
 31    ###TYPE HINTING###
 32
 33# Get the logger in use for the app
 34logger = logging.getLogger()
 35
 36rig_config = system_config.get_rig_config()
 37
 38with open(rig_config, "r") as f:
 39    VALVE_CONFIG = toml.load(f)["valve_config"]
 40
 41# pull total valves constant from toml config
 42TOTAL_POSSIBLE_VALVES = VALVE_CONFIG["TOTAL_POSSIBLE_VALVES"]
 43VALVES_PER_SIDE = TOTAL_POSSIBLE_VALVES // 2
 44
 45
 46class ArduinoData:
 47    """
 48    Manages Arduino-related data, focusing on valve duration storage persistence and
 49    processing incoming serial data during experiments.
 50
 51    This class acts as an interface for saving and retrieving valve open durations
 52    from the TOML configuration file, implementing a simple archival and 'profile' system. It also
 53    contains methods to interpret data strings sent from the Arduino (like lick
 54    events and motor status) and record relevant information into an associated
 55    ExperimentProcessData pandas dataframe.
 56
 57    Attributes
 58    ----------
 59    - **`exp_data`** (*ExperimentProcessData*): A reference to the main experiment data object, used for accessing schedule information and recording processed events.
 60
 61    Methods
 62    -------
 63    - `find_first_not_filled`(...)
 64        Helper method to find the first available archive slot in the durations TOML file.
 65    - `save_durations`(...)
 66        Saves provided valve durations (side_one, side_two) either as the 'selected' profile or into an archive slot.
 67    - `load_durations`(...)
 68        Loads a specified valve duration profile (defaulting to 'selected') from the TOML file.
 69    - `load_schedule_indices`()
 70        Generates 0-indexed numpy arrays representing the valve schedule for an experiment based on `ExperimentProcessData` program_schedule_df.
 71    - `increment_licks`(...)
 72        Increments the appropriate lick counter in the `ExperimentProcessData`.
 73    - `handle_licks`(...)
 74        Parses and processes data strings specifically identified as lick events.
 75    - `record_event`(...)
 76        Records a processed event (lick or motor movement) into the `ExperimentProcessData` DataFrame.
 77    - `process_data`(...)
 78        Main entry point for parsing incoming data strings from the Arduino, routing to specific handlers (like `handle_licks`) based on content.
 79    """
 80
 81    def __init__(self, exp_data):
 82        """
 83        Initializes the ArduinoData manager.
 84
 85        Parameters
 86        ----------
 87        - **exp_data** (*ExperimentProcessData*): The instance of the ExperimentProcessData class holding all current experiment configuration and trial data.
 88        """
 89        self.exp_data = exp_data
 90
 91    def find_first_not_filled(self, toml_file: dict) -> int:
 92        """
 93        Searches the loaded TOML file structure for the first available archive slot.
 94
 95        Checks `archive_1`, `archive_2`, then `archive_3` for attribute `filled = false`.
 96
 97        Parameters
 98        ----------
 99        - **toml_file** (*dict*): The dictionary representation of the loaded valve durations TOML file.
100
101        Returns
102        -------
103        - *int*: The index (1, 2, or 3) of the first unfilled archive slot. Returns 0 if all slots (1, 2, 3) are marked as filled.
104        """
105        first_available = 0
106
107        for i in range(1, 4):
108            archive = toml_file[f"archive_{i}"]
109            if archive["filled"]:
110                continue
111            else:
112                first_available = i
113        return first_available
114
115    def save_durations(
116        self,
117        side_one: npt.NDArray[np.int32],
118        side_two: npt.NDArray[np.int32],
119        type_durations: str,
120    ) -> None:
121        """
122        Saves valve open durations to the `valve_durations.toml` configuration file.
123
124        This function handles saving durations either as the primary 'selected' profile
125        or archiving them. When archiving, it implements a FIFO (First-In, First-Out)
126        logic using three archive slots:
127        1. If slot 1 is free, save there.
128        2. If slot 1 is full but 2 is free, move 1->2, save new to 1.
129        3. If slots 1 and 2 are full (or all slots are full), move 2->3 (overwriting 3 if necessary), move 1->2, save new to 1.
130
131        Converts NumPy arrays to Python lists before saving to TOML for compatibility reasons. Records the
132        current datetime for the saved profile to mark when the profile was created.
133
134        Parameters
135        ----------
136        - **side_one** (*npt.NDArray[np.int32]*): Numpy array of durations (microseconds) for side one valves.
137        - **side_two** (*npt.NDArray[np.int32]*): Numpy array of durations (microseconds) for side two valves.
138        - **type_durations** (*str*): Specifies the save location. Must be either `"selected"` (to update the main profile) or `"archive"`
139        (to save to the next available archive slot).
140
141        Raises
142        ------
143        - *FileNotFoundError*: If the `valve_durations.toml` file cannot be found at the path specified by `system_config`.
144        - *toml.TomlDecodeError*: If the TOML file is malformed.
145        - *IOError*: If there are issues reading or writing the TOML file.
146        - *KeyError*: If the expected keys (`selected_durations`, `archive_1`, etc.) are missing in the TOML structure.
147        """
148
149        dur_path = system_config.get_valve_durations()
150
151        with open(dur_path, "r") as f:
152            toml_file = toml.load(f)
153
154        if type_durations == "selected":
155            selected_durations = toml_file["selected_durations"]
156
157            # toml only knows how to deal with python native types (list, int, etc)
158            # so we convert np arrays to lists
159            selected_durations["date_used"] = datetime.datetime.now()
160            selected_durations["side_one_durations"] = side_one.tolist()
161            selected_durations["side_two_durations"] = side_two.tolist()
162
163            # save the file with the updated content
164            with open(dur_path, "w") as f:
165                toml.dump(toml_file, f)
166        elif type_durations == "archive":
167            # find first archive not filled
168            first_avail = self.find_first_not_filled(toml_file)
169
170            match first_avail:
171                case 1:
172                    # 1 is available just insert
173                    archive = toml_file["archive_1"]
174
175                    archive["filled"] = True
176                    archive["date_used"] = datetime.datetime.now()
177                    archive["side_one_durations"] = side_one.tolist()
178                    archive["side_two_durations"] = side_two.tolist()
179
180                    with open(dur_path, "w") as f:
181                        toml.dump(toml_file, f)
182                case 2:
183                    # move 1-> 2, insert 1
184                    toml_file["archive_2"] = copy.deepcopy(toml_file["archive_1"])
185
186                    archive_1 = toml_file["archive_1"]
187
188                    archive_1["filled"] = True
189                    archive_1["date_used"] = datetime.datetime.now()
190                    archive_1["side_one_durations"] = side_one.tolist()
191                    archive_1["side_two_durations"] = side_two.tolist()
192
193                    with open(dur_path, "w") as f:
194                        toml.dump(toml_file, f)
195
196                case 3 | 0:
197                    # case 3) 2 -> 3, 1-> 2, insert 1
198                    # case 0 (no available archives) ) del / overwrite 3 (oldest), 2 -> 3, 1-> 2, insert 1
199                    toml_file["archive_3"] = copy.deepcopy(toml_file["archive_2"])
200                    toml_file["archive_2"] = copy.deepcopy(toml_file["archive_1"])
201
202                    archive_1 = toml_file["archive_1"]
203
204                    archive_1["filled"] = True
205                    archive_1["date_used"] = datetime.datetime.now()
206                    archive_1["side_one_durations"] = side_one.tolist()
207                    archive_1["side_two_durations"] = side_two.tolist()
208
209                    with open(dur_path, "w") as f:
210                        toml.dump(toml_file, f)
211
212    def load_durations(
213        self, type_durations="selected_durations"
214    ) -> tuple[npt.NDArray[np.int32], npt.NDArray[np.int32], datetime.datetime]:
215        """
216        Loads a specific valve duration profile from the `valve_durations.toml` file.
217
218        Retrieves the durations for side one and side two, along with the timestamp
219        when that profile was created. Converts the lists from the TOML file back
220        into NumPy `np.int32` arrays for efficient operations.
221
222        Parameters
223        ----------
224        - **type_durations** (*str, optional*): The key corresponding to the desired profile in the TOML file
225        (e.g., `"selected_durations"`, `"default_durations"`, `"archive_1"`). Defaults to `"selected_durations"`.
226
227        Returns
228        -------
229        - *tuple[npt.NDArray[np.int32], npt.NDArray[np.int32], datetime.datetime]*: A tuple containing:
230            - Numpy array of durations for side one.
231            - Numpy array of durations for side two.
232            - The datetime object indicating when the loaded profile was saved.
233
234        Raises
235        ------
236        - *FileNotFoundError*: If the `valve_durations.toml` file cannot be found.
237        - *toml.TomlDecodeError*: If the TOML file is malformed.
238        - *IOError*: If there are issues reading the TOML file.
239        - *KeyError*: If the specified `type_durations` key or expected sub-keys (`side_one_durations`, `date_used`, etc.) are missing.
240        """
241        valve_durations_toml = system_config.get_valve_durations()
242
243        with open(valve_durations_toml, "r") as f:
244            toml_file = toml.load(f)
245
246        file_durations = toml_file[type_durations]
247
248        # create 2 np arrays 8 n long with np.int32s
249        dur_side_one = np.full(VALVES_PER_SIDE, np.int32(0))
250        dur_side_two = np.full(VALVES_PER_SIDE, np.int32(0))
251
252        # keys here are names of durations i.e side_one_dur or side_two_dur, values
253        # are lists of durations, VALVES_PER_SIDE long. Will produce two lists
254        # VALVES_PER_SIDE long so that each valve in the rig is assigned a duration.
255        for key, value in file_durations.items():
256            if key == "side_one_durations":
257                for i, duration in enumerate(value):
258                    dur_side_one[i] = duration
259            elif key == "side_two_durations":
260                for i, duration in enumerate(value):
261                    dur_side_two[i] = duration
262
263        date_used = file_durations["date_used"]
264
265        return dur_side_one, dur_side_two, date_used
266
267    def load_schedule_indices(
268        self,
269    ) -> tuple[npt.NDArray[np.int8], npt.NDArray[np.int8]]:
270        """
271        Generates 0-indexed valve schedule arrays based on the experiment schedule.
272
273        Uses the stimuli names and the trial-by-trial schedule defined in the
274        associated `ExperimentProcessData` object (`self.exp_data`) to create two NumPy arrays.
275        The index for side one is first found, then the associated valve is found via `exp_data.get_paired_index`.
276
277        These arrays represent the 0-indexed valve number to be activated for each trial
278        on side one and side two, respectively. This format is suitable for sending
279        to the Arduino.
280
281        Returns
282        -------
283        - *tuple[npt.NDArray[np.int8], npt.NDArray[np.int8]]*: A tuple containing:
284            - Numpy array of 0-indexed valve numbers for side one schedule.
285            - Numpy array of 0-indexed valve numbers for side two schedule.
286
287        Raises
288        ------
289        - *AttributeError*: If `self.exp_data` or its required attributes/methods (like `stimuli_data`, `program_schedule_df`, `get_paired_index`) are missing or invalid.
290        - *KeyError*: If expected keys (like 'Num Stimuli', 'Port 1') are missing in `exp_data`.
291        - *ValueError*: If a stimulus name from the schedule is not found in the unique stimuli list.
292        """
293        stimuli_data = self.exp_data.stimuli_data
294        schedule_df = self.exp_data.program_schedule_df
295
296        num_stimuli = self.exp_data.exp_var_entries["Num Stimuli"]
297        num_trials = self.exp_data.exp_var_entries["Num Trials"]
298
299        sched_side_one = np.full(num_trials, np.int8(0))
300        sched_side_two = np.full(num_trials, np.int8(0))
301
302        # get unique stim names. this will give us all names on side one.
303        # finding the index of a given stimuli appearing in a trial on side
304        # one will easily allow us to find the paired index. We've already done
305        # this before in exp_process_data
306        names = list(stimuli_data.stimuli_vars.values())
307        unique_names = names[: num_stimuli // 2]
308
309        side_one_trial_stimuli = schedule_df["Port 1"].to_numpy()
310
311        for i, trial_stim in enumerate(side_one_trial_stimuli):
312            index = unique_names.index(trial_stim)
313            sd_two_index = self.exp_data.get_paired_index(index, num_stimuli)
314            sched_side_one[i] = index
315            sched_side_two[i] = sd_two_index
316
317        return sched_side_one, sched_side_two
318
319    def increment_licks(self, side: int, event_data: EventData):
320        """
321        Increments the lick counter for the specified side within the EventData object.
322
323        Parameters
324        ----------
325        - **side** (*int*): The side where the lick occurred (0 for side one, 1 for side two).
326        - **event_data** (*EventData*): The EventData instance associated with `self.exp_data` where lick counts are stored.
327        """
328        match side:
329            case 0:
330                event_data.side_one_licks += 1
331            case 1:
332                event_data.side_two_licks += 1
333            case _:
334                logger.error("invalid side")
335
336    def handle_licks(
337        self,
338        split_data: list[str],
339        event_data: EventData,
340        state: str,
341        trigger: Callable,
342    ) -> None:
343        """
344        Parses and records data specifically identified as a lick event.
345
346        Extracts side, lick duration, timestamps, and valve duration (if in "SAMPLE" state)
347        from the `split_data` list based on the current experiment `state` ("TTC" or "SAMPLE").
348        Calls `increment_licks` and `record_event`. For "TTC" state, it checks if
349        the lick count threshold is met to trigger a state change to "SAMPLE".
350
351        Parameters
352        ----------
353        - **split_data** (*list[str]*): The list of strings resulting from splitting the incoming Arduino data by content separator '|'.
354        - **event_data** (*EventData*): The EventData instance for accessing lick counts.
355        - **state** (*str*): The current state of the experiment FSM (e.g., "TTC", "SAMPLE").
356        - **trigger** (*Callable*): The state machine's trigger function, used here to transition state to "SAMPLE" state based on lick counts.
357
358        Raises
359        ------
360        - *IndexError*: If `split_data` does not contain the expected number of elements for the given state.
361        - *ValueError*: If elements in `split_data` cannot be converted to the expected types (int, np.int8, np.int32).
362        """
363        side = None
364        valve_duration = None
365        match state:
366            case "TTC":
367                # split_data will have the below form for ttc
368                # 0(side)|67(duration)|6541(stamp_rel_to_start)|6541(stamp_rel_to_trial)
369                # 1|16|0|496570|41 from arduino
370                side = np.int8(split_data[0])
371                self.increment_licks(side, event_data)
372
373                lick_duration = np.int32(split_data[1])
374                time_rel_to_start = np.int32(split_data[2]) / 1000
375                time_rel_to_trial = np.int32(split_data[3]) / 1000
376
377                # insert the values held in licks for respective sides in a shorter variable name
378                side_one = self.exp_data.event_data.side_one_licks
379                side_two = self.exp_data.event_data.side_two_licks
380                # if 3 or more licks in a ttc time, jump straight to sample
381                if side_one > 2 or side_two > 2:
382                    trigger("SAMPLE")
383                self.record_event(
384                    side, lick_duration, time_rel_to_start, time_rel_to_trial, state
385                )
386
387            case "SAMPLE":
388                try:
389                    # 0|87|26064|8327|8327
390                    side = np.int8(split_data[0])
391                    self.increment_licks(side, event_data)
392
393                    lick_duration = np.int32(split_data[1])
394                    valve_duration = np.int32(split_data[2])
395                    time_rel_to_start = np.int32(split_data[3]) / 1000
396                    time_rel_to_trial = np.int32(split_data[4]) / 1000
397
398                    self.record_event(
399                        side,
400                        lick_duration,
401                        time_rel_to_start,
402                        time_rel_to_trial,
403                        state,
404                        valve_duration,
405                    )
406
407                except Exception as e:
408                    logging.error(f"IMPROPER DATA.... IGNORING.....{e}")
409
410    def record_event(
411        self,
412        side: int,
413        duration: np.int32,
414        time_rel_to_start: float,
415        time_rel_to_trial: float,
416        state: str,
417        valve_dur: np.int32 | None = None,
418    ):
419        """
420        Records a processed event (lick or motor movement) into the `ExperimentProcessData` `EventData` DataFrame.
421
422        Uses the `insert_row_into_df` method of the `EventData` object (`self.exp_data.event_data`)
423        to add a new row containing the details of the event.
424
425        Parameters
426        ----------
427        - **side** (*int*): The side associated with the event (0 for side one, 1 for side two).
428        - **duration** (*np.int32*): The duration of the event (e.g., lick contact time) in milliseconds.
429        - **time_rel_to_start** (*float*): The timestamp of the event relative to the start of the entire program, in seconds.
430        - **time_rel_to_trial** (*float*): The timestamp of the event relative to the start of the current trial, in seconds.
431        - **state** (*str*): A string describing the state during the event (e.g., "TTC", "SAMPLE").
432        - **valve_dur** (*np.int32 | None, optional*): The duration the valve was open for this event (microseconds),
433        if applicable (e.g., during "SAMPLE" state licks). Defaults to None.
434
435        Raises
436        ------
437        - Propagates exceptions from `event_data.insert_row_into_df`.
438        """
439        try:
440            event_data = self.exp_data.event_data
441            current_trial = self.exp_data.current_trial_number
442            # insert the lick record into the dataframe
443            event_data.insert_row_into_df(
444                current_trial,
445                side + 1,
446                duration,
447                time_rel_to_start,
448                time_rel_to_trial,
449                state,
450                valve_duration=(valve_dur if valve_dur else None),
451            )
452
453            logging.info(f"Lick data recorded for side: {side + 1}")
454        except Exception as e:
455            logging.error(f"Error recording lick data: {e}")
456            raise
457
458    def process_data(
459        self, source: str, data: str, state: str, trigger: Callable
460    ) -> None:
461        """
462        Processes incoming data strings received from the Arduino via the serial connection.
463
464        Splits the raw data string by the '|' content separator. Determines the type of
465        data based on the first element (`split_data[0]`). Directly handles "MOTOR" events,
466        and routes lick data (`split_data[0]` is '0' or '1')
467        to `handle_licks` for further processing.
468
469        Parameters
470        ----------
471        - **source** (*str*): Identifier for the source of the data. Included for logging/debugging.
472        - **data** (*str*): The raw data string received from the Arduino.
473        - **state** (*str*): The current state of the experiment FSM, passed to handlers like `handle_licks`.
474        - **trigger** (*Callable*): The state machine's trigger function, passed to handlers like `handle_licks`.
475
476        Raises
477        ------
478        - *IndexError*: If `data.split('|')` results in an empty list or accessing `split_data[0]` fails.
479        - Propagates exceptions from `handle_licks` or `record_event`.
480        """
481        event_data = self.exp_data.event_data
482        split_data = data.split("|")
483        try:
484            if split_data[0] == "MOTOR":
485                # data will arrive in the following format
486                # MOTOR|DOWN|2338|7340|7340
487                # MOTOR|MOVEMENT|DURATION|END_TIME_REL_TO_PROG_START|END_TIME_REL_TO_PROG_TRIAL_START
488                event_data = self.exp_data.event_data
489
490                current_trial = self.exp_data.current_trial_number
491
492                event_data.insert_row_into_df(
493                    current_trial,
494                    None,
495                    np.int32(split_data[2]),
496                    (np.int32(split_data[3]) / 1000),
497                    (np.int32(split_data[4]) / 1000),
498                    f"MOTOR {split_data[1]}",
499                )
500            elif split_data[0] == "0" or split_data[0] == "1":
501                self.handle_licks(split_data, event_data, state, trigger)
502
503        except Exception as e:
504            logging.error(f"Error processing data from {source}: {e}")
505            raise
logger = <RootLogger root (INFO)>
rig_config = '/home/blake/Documents/Photologic-Experiment-Rig-Files/assets/rig_config.toml'
TOTAL_POSSIBLE_VALVES = 16
VALVES_PER_SIDE = 8
class ArduinoData:
 47class ArduinoData:
 48    """
 49    Manages Arduino-related data, focusing on valve duration storage persistence and
 50    processing incoming serial data during experiments.
 51
 52    This class acts as an interface for saving and retrieving valve open durations
 53    from the TOML configuration file, implementing a simple archival and 'profile' system. It also
 54    contains methods to interpret data strings sent from the Arduino (like lick
 55    events and motor status) and record relevant information into an associated
 56    ExperimentProcessData pandas dataframe.
 57
 58    Attributes
 59    ----------
 60    - **`exp_data`** (*ExperimentProcessData*): A reference to the main experiment data object, used for accessing schedule information and recording processed events.
 61
 62    Methods
 63    -------
 64    - `find_first_not_filled`(...)
 65        Helper method to find the first available archive slot in the durations TOML file.
 66    - `save_durations`(...)
 67        Saves provided valve durations (side_one, side_two) either as the 'selected' profile or into an archive slot.
 68    - `load_durations`(...)
 69        Loads a specified valve duration profile (defaulting to 'selected') from the TOML file.
 70    - `load_schedule_indices`()
 71        Generates 0-indexed numpy arrays representing the valve schedule for an experiment based on `ExperimentProcessData` program_schedule_df.
 72    - `increment_licks`(...)
 73        Increments the appropriate lick counter in the `ExperimentProcessData`.
 74    - `handle_licks`(...)
 75        Parses and processes data strings specifically identified as lick events.
 76    - `record_event`(...)
 77        Records a processed event (lick or motor movement) into the `ExperimentProcessData` DataFrame.
 78    - `process_data`(...)
 79        Main entry point for parsing incoming data strings from the Arduino, routing to specific handlers (like `handle_licks`) based on content.
 80    """
 81
 82    def __init__(self, exp_data):
 83        """
 84        Initializes the ArduinoData manager.
 85
 86        Parameters
 87        ----------
 88        - **exp_data** (*ExperimentProcessData*): The instance of the ExperimentProcessData class holding all current experiment configuration and trial data.
 89        """
 90        self.exp_data = exp_data
 91
 92    def find_first_not_filled(self, toml_file: dict) -> int:
 93        """
 94        Searches the loaded TOML file structure for the first available archive slot.
 95
 96        Checks `archive_1`, `archive_2`, then `archive_3` for attribute `filled = false`.
 97
 98        Parameters
 99        ----------
100        - **toml_file** (*dict*): The dictionary representation of the loaded valve durations TOML file.
101
102        Returns
103        -------
104        - *int*: The index (1, 2, or 3) of the first unfilled archive slot. Returns 0 if all slots (1, 2, 3) are marked as filled.
105        """
106        first_available = 0
107
108        for i in range(1, 4):
109            archive = toml_file[f"archive_{i}"]
110            if archive["filled"]:
111                continue
112            else:
113                first_available = i
114        return first_available
115
116    def save_durations(
117        self,
118        side_one: npt.NDArray[np.int32],
119        side_two: npt.NDArray[np.int32],
120        type_durations: str,
121    ) -> None:
122        """
123        Saves valve open durations to the `valve_durations.toml` configuration file.
124
125        This function handles saving durations either as the primary 'selected' profile
126        or archiving them. When archiving, it implements a FIFO (First-In, First-Out)
127        logic using three archive slots:
128        1. If slot 1 is free, save there.
129        2. If slot 1 is full but 2 is free, move 1->2, save new to 1.
130        3. If slots 1 and 2 are full (or all slots are full), move 2->3 (overwriting 3 if necessary), move 1->2, save new to 1.
131
132        Converts NumPy arrays to Python lists before saving to TOML for compatibility reasons. Records the
133        current datetime for the saved profile to mark when the profile was created.
134
135        Parameters
136        ----------
137        - **side_one** (*npt.NDArray[np.int32]*): Numpy array of durations (microseconds) for side one valves.
138        - **side_two** (*npt.NDArray[np.int32]*): Numpy array of durations (microseconds) for side two valves.
139        - **type_durations** (*str*): Specifies the save location. Must be either `"selected"` (to update the main profile) or `"archive"`
140        (to save to the next available archive slot).
141
142        Raises
143        ------
144        - *FileNotFoundError*: If the `valve_durations.toml` file cannot be found at the path specified by `system_config`.
145        - *toml.TomlDecodeError*: If the TOML file is malformed.
146        - *IOError*: If there are issues reading or writing the TOML file.
147        - *KeyError*: If the expected keys (`selected_durations`, `archive_1`, etc.) are missing in the TOML structure.
148        """
149
150        dur_path = system_config.get_valve_durations()
151
152        with open(dur_path, "r") as f:
153            toml_file = toml.load(f)
154
155        if type_durations == "selected":
156            selected_durations = toml_file["selected_durations"]
157
158            # toml only knows how to deal with python native types (list, int, etc)
159            # so we convert np arrays to lists
160            selected_durations["date_used"] = datetime.datetime.now()
161            selected_durations["side_one_durations"] = side_one.tolist()
162            selected_durations["side_two_durations"] = side_two.tolist()
163
164            # save the file with the updated content
165            with open(dur_path, "w") as f:
166                toml.dump(toml_file, f)
167        elif type_durations == "archive":
168            # find first archive not filled
169            first_avail = self.find_first_not_filled(toml_file)
170
171            match first_avail:
172                case 1:
173                    # 1 is available just insert
174                    archive = toml_file["archive_1"]
175
176                    archive["filled"] = True
177                    archive["date_used"] = datetime.datetime.now()
178                    archive["side_one_durations"] = side_one.tolist()
179                    archive["side_two_durations"] = side_two.tolist()
180
181                    with open(dur_path, "w") as f:
182                        toml.dump(toml_file, f)
183                case 2:
184                    # move 1-> 2, insert 1
185                    toml_file["archive_2"] = copy.deepcopy(toml_file["archive_1"])
186
187                    archive_1 = toml_file["archive_1"]
188
189                    archive_1["filled"] = True
190                    archive_1["date_used"] = datetime.datetime.now()
191                    archive_1["side_one_durations"] = side_one.tolist()
192                    archive_1["side_two_durations"] = side_two.tolist()
193
194                    with open(dur_path, "w") as f:
195                        toml.dump(toml_file, f)
196
197                case 3 | 0:
198                    # case 3) 2 -> 3, 1-> 2, insert 1
199                    # case 0 (no available archives) ) del / overwrite 3 (oldest), 2 -> 3, 1-> 2, insert 1
200                    toml_file["archive_3"] = copy.deepcopy(toml_file["archive_2"])
201                    toml_file["archive_2"] = copy.deepcopy(toml_file["archive_1"])
202
203                    archive_1 = toml_file["archive_1"]
204
205                    archive_1["filled"] = True
206                    archive_1["date_used"] = datetime.datetime.now()
207                    archive_1["side_one_durations"] = side_one.tolist()
208                    archive_1["side_two_durations"] = side_two.tolist()
209
210                    with open(dur_path, "w") as f:
211                        toml.dump(toml_file, f)
212
213    def load_durations(
214        self, type_durations="selected_durations"
215    ) -> tuple[npt.NDArray[np.int32], npt.NDArray[np.int32], datetime.datetime]:
216        """
217        Loads a specific valve duration profile from the `valve_durations.toml` file.
218
219        Retrieves the durations for side one and side two, along with the timestamp
220        when that profile was created. Converts the lists from the TOML file back
221        into NumPy `np.int32` arrays for efficient operations.
222
223        Parameters
224        ----------
225        - **type_durations** (*str, optional*): The key corresponding to the desired profile in the TOML file
226        (e.g., `"selected_durations"`, `"default_durations"`, `"archive_1"`). Defaults to `"selected_durations"`.
227
228        Returns
229        -------
230        - *tuple[npt.NDArray[np.int32], npt.NDArray[np.int32], datetime.datetime]*: A tuple containing:
231            - Numpy array of durations for side one.
232            - Numpy array of durations for side two.
233            - The datetime object indicating when the loaded profile was saved.
234
235        Raises
236        ------
237        - *FileNotFoundError*: If the `valve_durations.toml` file cannot be found.
238        - *toml.TomlDecodeError*: If the TOML file is malformed.
239        - *IOError*: If there are issues reading the TOML file.
240        - *KeyError*: If the specified `type_durations` key or expected sub-keys (`side_one_durations`, `date_used`, etc.) are missing.
241        """
242        valve_durations_toml = system_config.get_valve_durations()
243
244        with open(valve_durations_toml, "r") as f:
245            toml_file = toml.load(f)
246
247        file_durations = toml_file[type_durations]
248
249        # create 2 np arrays 8 n long with np.int32s
250        dur_side_one = np.full(VALVES_PER_SIDE, np.int32(0))
251        dur_side_two = np.full(VALVES_PER_SIDE, np.int32(0))
252
253        # keys here are names of durations i.e side_one_dur or side_two_dur, values
254        # are lists of durations, VALVES_PER_SIDE long. Will produce two lists
255        # VALVES_PER_SIDE long so that each valve in the rig is assigned a duration.
256        for key, value in file_durations.items():
257            if key == "side_one_durations":
258                for i, duration in enumerate(value):
259                    dur_side_one[i] = duration
260            elif key == "side_two_durations":
261                for i, duration in enumerate(value):
262                    dur_side_two[i] = duration
263
264        date_used = file_durations["date_used"]
265
266        return dur_side_one, dur_side_two, date_used
267
268    def load_schedule_indices(
269        self,
270    ) -> tuple[npt.NDArray[np.int8], npt.NDArray[np.int8]]:
271        """
272        Generates 0-indexed valve schedule arrays based on the experiment schedule.
273
274        Uses the stimuli names and the trial-by-trial schedule defined in the
275        associated `ExperimentProcessData` object (`self.exp_data`) to create two NumPy arrays.
276        The index for side one is first found, then the associated valve is found via `exp_data.get_paired_index`.
277
278        These arrays represent the 0-indexed valve number to be activated for each trial
279        on side one and side two, respectively. This format is suitable for sending
280        to the Arduino.
281
282        Returns
283        -------
284        - *tuple[npt.NDArray[np.int8], npt.NDArray[np.int8]]*: A tuple containing:
285            - Numpy array of 0-indexed valve numbers for side one schedule.
286            - Numpy array of 0-indexed valve numbers for side two schedule.
287
288        Raises
289        ------
290        - *AttributeError*: If `self.exp_data` or its required attributes/methods (like `stimuli_data`, `program_schedule_df`, `get_paired_index`) are missing or invalid.
291        - *KeyError*: If expected keys (like 'Num Stimuli', 'Port 1') are missing in `exp_data`.
292        - *ValueError*: If a stimulus name from the schedule is not found in the unique stimuli list.
293        """
294        stimuli_data = self.exp_data.stimuli_data
295        schedule_df = self.exp_data.program_schedule_df
296
297        num_stimuli = self.exp_data.exp_var_entries["Num Stimuli"]
298        num_trials = self.exp_data.exp_var_entries["Num Trials"]
299
300        sched_side_one = np.full(num_trials, np.int8(0))
301        sched_side_two = np.full(num_trials, np.int8(0))
302
303        # get unique stim names. this will give us all names on side one.
304        # finding the index of a given stimuli appearing in a trial on side
305        # one will easily allow us to find the paired index. We've already done
306        # this before in exp_process_data
307        names = list(stimuli_data.stimuli_vars.values())
308        unique_names = names[: num_stimuli // 2]
309
310        side_one_trial_stimuli = schedule_df["Port 1"].to_numpy()
311
312        for i, trial_stim in enumerate(side_one_trial_stimuli):
313            index = unique_names.index(trial_stim)
314            sd_two_index = self.exp_data.get_paired_index(index, num_stimuli)
315            sched_side_one[i] = index
316            sched_side_two[i] = sd_two_index
317
318        return sched_side_one, sched_side_two
319
320    def increment_licks(self, side: int, event_data: EventData):
321        """
322        Increments the lick counter for the specified side within the EventData object.
323
324        Parameters
325        ----------
326        - **side** (*int*): The side where the lick occurred (0 for side one, 1 for side two).
327        - **event_data** (*EventData*): The EventData instance associated with `self.exp_data` where lick counts are stored.
328        """
329        match side:
330            case 0:
331                event_data.side_one_licks += 1
332            case 1:
333                event_data.side_two_licks += 1
334            case _:
335                logger.error("invalid side")
336
337    def handle_licks(
338        self,
339        split_data: list[str],
340        event_data: EventData,
341        state: str,
342        trigger: Callable,
343    ) -> None:
344        """
345        Parses and records data specifically identified as a lick event.
346
347        Extracts side, lick duration, timestamps, and valve duration (if in "SAMPLE" state)
348        from the `split_data` list based on the current experiment `state` ("TTC" or "SAMPLE").
349        Calls `increment_licks` and `record_event`. For "TTC" state, it checks if
350        the lick count threshold is met to trigger a state change to "SAMPLE".
351
352        Parameters
353        ----------
354        - **split_data** (*list[str]*): The list of strings resulting from splitting the incoming Arduino data by content separator '|'.
355        - **event_data** (*EventData*): The EventData instance for accessing lick counts.
356        - **state** (*str*): The current state of the experiment FSM (e.g., "TTC", "SAMPLE").
357        - **trigger** (*Callable*): The state machine's trigger function, used here to transition state to "SAMPLE" state based on lick counts.
358
359        Raises
360        ------
361        - *IndexError*: If `split_data` does not contain the expected number of elements for the given state.
362        - *ValueError*: If elements in `split_data` cannot be converted to the expected types (int, np.int8, np.int32).
363        """
364        side = None
365        valve_duration = None
366        match state:
367            case "TTC":
368                # split_data will have the below form for ttc
369                # 0(side)|67(duration)|6541(stamp_rel_to_start)|6541(stamp_rel_to_trial)
370                # 1|16|0|496570|41 from arduino
371                side = np.int8(split_data[0])
372                self.increment_licks(side, event_data)
373
374                lick_duration = np.int32(split_data[1])
375                time_rel_to_start = np.int32(split_data[2]) / 1000
376                time_rel_to_trial = np.int32(split_data[3]) / 1000
377
378                # insert the values held in licks for respective sides in a shorter variable name
379                side_one = self.exp_data.event_data.side_one_licks
380                side_two = self.exp_data.event_data.side_two_licks
381                # if 3 or more licks in a ttc time, jump straight to sample
382                if side_one > 2 or side_two > 2:
383                    trigger("SAMPLE")
384                self.record_event(
385                    side, lick_duration, time_rel_to_start, time_rel_to_trial, state
386                )
387
388            case "SAMPLE":
389                try:
390                    # 0|87|26064|8327|8327
391                    side = np.int8(split_data[0])
392                    self.increment_licks(side, event_data)
393
394                    lick_duration = np.int32(split_data[1])
395                    valve_duration = np.int32(split_data[2])
396                    time_rel_to_start = np.int32(split_data[3]) / 1000
397                    time_rel_to_trial = np.int32(split_data[4]) / 1000
398
399                    self.record_event(
400                        side,
401                        lick_duration,
402                        time_rel_to_start,
403                        time_rel_to_trial,
404                        state,
405                        valve_duration,
406                    )
407
408                except Exception as e:
409                    logging.error(f"IMPROPER DATA.... IGNORING.....{e}")
410
411    def record_event(
412        self,
413        side: int,
414        duration: np.int32,
415        time_rel_to_start: float,
416        time_rel_to_trial: float,
417        state: str,
418        valve_dur: np.int32 | None = None,
419    ):
420        """
421        Records a processed event (lick or motor movement) into the `ExperimentProcessData` `EventData` DataFrame.
422
423        Uses the `insert_row_into_df` method of the `EventData` object (`self.exp_data.event_data`)
424        to add a new row containing the details of the event.
425
426        Parameters
427        ----------
428        - **side** (*int*): The side associated with the event (0 for side one, 1 for side two).
429        - **duration** (*np.int32*): The duration of the event (e.g., lick contact time) in milliseconds.
430        - **time_rel_to_start** (*float*): The timestamp of the event relative to the start of the entire program, in seconds.
431        - **time_rel_to_trial** (*float*): The timestamp of the event relative to the start of the current trial, in seconds.
432        - **state** (*str*): A string describing the state during the event (e.g., "TTC", "SAMPLE").
433        - **valve_dur** (*np.int32 | None, optional*): The duration the valve was open for this event (microseconds),
434        if applicable (e.g., during "SAMPLE" state licks). Defaults to None.
435
436        Raises
437        ------
438        - Propagates exceptions from `event_data.insert_row_into_df`.
439        """
440        try:
441            event_data = self.exp_data.event_data
442            current_trial = self.exp_data.current_trial_number
443            # insert the lick record into the dataframe
444            event_data.insert_row_into_df(
445                current_trial,
446                side + 1,
447                duration,
448                time_rel_to_start,
449                time_rel_to_trial,
450                state,
451                valve_duration=(valve_dur if valve_dur else None),
452            )
453
454            logging.info(f"Lick data recorded for side: {side + 1}")
455        except Exception as e:
456            logging.error(f"Error recording lick data: {e}")
457            raise
458
459    def process_data(
460        self, source: str, data: str, state: str, trigger: Callable
461    ) -> None:
462        """
463        Processes incoming data strings received from the Arduino via the serial connection.
464
465        Splits the raw data string by the '|' content separator. Determines the type of
466        data based on the first element (`split_data[0]`). Directly handles "MOTOR" events,
467        and routes lick data (`split_data[0]` is '0' or '1')
468        to `handle_licks` for further processing.
469
470        Parameters
471        ----------
472        - **source** (*str*): Identifier for the source of the data. Included for logging/debugging.
473        - **data** (*str*): The raw data string received from the Arduino.
474        - **state** (*str*): The current state of the experiment FSM, passed to handlers like `handle_licks`.
475        - **trigger** (*Callable*): The state machine's trigger function, passed to handlers like `handle_licks`.
476
477        Raises
478        ------
479        - *IndexError*: If `data.split('|')` results in an empty list or accessing `split_data[0]` fails.
480        - Propagates exceptions from `handle_licks` or `record_event`.
481        """
482        event_data = self.exp_data.event_data
483        split_data = data.split("|")
484        try:
485            if split_data[0] == "MOTOR":
486                # data will arrive in the following format
487                # MOTOR|DOWN|2338|7340|7340
488                # MOTOR|MOVEMENT|DURATION|END_TIME_REL_TO_PROG_START|END_TIME_REL_TO_PROG_TRIAL_START
489                event_data = self.exp_data.event_data
490
491                current_trial = self.exp_data.current_trial_number
492
493                event_data.insert_row_into_df(
494                    current_trial,
495                    None,
496                    np.int32(split_data[2]),
497                    (np.int32(split_data[3]) / 1000),
498                    (np.int32(split_data[4]) / 1000),
499                    f"MOTOR {split_data[1]}",
500                )
501            elif split_data[0] == "0" or split_data[0] == "1":
502                self.handle_licks(split_data, event_data, state, trigger)
503
504        except Exception as e:
505            logging.error(f"Error processing data from {source}: {e}")
506            raise

Manages Arduino-related data, focusing on valve duration storage persistence and processing incoming serial data during experiments.

This class acts as an interface for saving and retrieving valve open durations from the TOML configuration file, implementing a simple archival and 'profile' system. It also contains methods to interpret data strings sent from the Arduino (like lick events and motor status) and record relevant information into an associated ExperimentProcessData pandas dataframe.

Attributes

  • exp_data (ExperimentProcessData): A reference to the main experiment data object, used for accessing schedule information and recording processed events.

Methods

  • find_first_not_filled(...) Helper method to find the first available archive slot in the durations TOML file.
  • save_durations(...) Saves provided valve durations (side_one, side_two) either as the 'selected' profile or into an archive slot.
  • load_durations(...) Loads a specified valve duration profile (defaulting to 'selected') from the TOML file.
  • load_schedule_indices() Generates 0-indexed numpy arrays representing the valve schedule for an experiment based on ExperimentProcessData program_schedule_df.
  • increment_licks(...) Increments the appropriate lick counter in the ExperimentProcessData.
  • handle_licks(...) Parses and processes data strings specifically identified as lick events.
  • record_event(...) Records a processed event (lick or motor movement) into the ExperimentProcessData DataFrame.
  • process_data(...) Main entry point for parsing incoming data strings from the Arduino, routing to specific handlers (like handle_licks) based on content.
ArduinoData(exp_data)
82    def __init__(self, exp_data):
83        """
84        Initializes the ArduinoData manager.
85
86        Parameters
87        ----------
88        - **exp_data** (*ExperimentProcessData*): The instance of the ExperimentProcessData class holding all current experiment configuration and trial data.
89        """
90        self.exp_data = exp_data

Initializes the ArduinoData manager.

Parameters

  • exp_data (ExperimentProcessData): The instance of the ExperimentProcessData class holding all current experiment configuration and trial data.
exp_data
def find_first_not_filled(self, toml_file: dict) -> int:
 92    def find_first_not_filled(self, toml_file: dict) -> int:
 93        """
 94        Searches the loaded TOML file structure for the first available archive slot.
 95
 96        Checks `archive_1`, `archive_2`, then `archive_3` for attribute `filled = false`.
 97
 98        Parameters
 99        ----------
100        - **toml_file** (*dict*): The dictionary representation of the loaded valve durations TOML file.
101
102        Returns
103        -------
104        - *int*: The index (1, 2, or 3) of the first unfilled archive slot. Returns 0 if all slots (1, 2, 3) are marked as filled.
105        """
106        first_available = 0
107
108        for i in range(1, 4):
109            archive = toml_file[f"archive_{i}"]
110            if archive["filled"]:
111                continue
112            else:
113                first_available = i
114        return first_available

Searches the loaded TOML file structure for the first available archive slot.

Checks archive_1, archive_2, then archive_3 for attribute filled = false.

Parameters

  • toml_file (dict): The dictionary representation of the loaded valve durations TOML file.

Returns

  • int: The index (1, 2, or 3) of the first unfilled archive slot. Returns 0 if all slots (1, 2, 3) are marked as filled.
def save_durations( self, side_one: numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.int32]], side_two: numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.int32]], type_durations: str) -> None:
116    def save_durations(
117        self,
118        side_one: npt.NDArray[np.int32],
119        side_two: npt.NDArray[np.int32],
120        type_durations: str,
121    ) -> None:
122        """
123        Saves valve open durations to the `valve_durations.toml` configuration file.
124
125        This function handles saving durations either as the primary 'selected' profile
126        or archiving them. When archiving, it implements a FIFO (First-In, First-Out)
127        logic using three archive slots:
128        1. If slot 1 is free, save there.
129        2. If slot 1 is full but 2 is free, move 1->2, save new to 1.
130        3. If slots 1 and 2 are full (or all slots are full), move 2->3 (overwriting 3 if necessary), move 1->2, save new to 1.
131
132        Converts NumPy arrays to Python lists before saving to TOML for compatibility reasons. Records the
133        current datetime for the saved profile to mark when the profile was created.
134
135        Parameters
136        ----------
137        - **side_one** (*npt.NDArray[np.int32]*): Numpy array of durations (microseconds) for side one valves.
138        - **side_two** (*npt.NDArray[np.int32]*): Numpy array of durations (microseconds) for side two valves.
139        - **type_durations** (*str*): Specifies the save location. Must be either `"selected"` (to update the main profile) or `"archive"`
140        (to save to the next available archive slot).
141
142        Raises
143        ------
144        - *FileNotFoundError*: If the `valve_durations.toml` file cannot be found at the path specified by `system_config`.
145        - *toml.TomlDecodeError*: If the TOML file is malformed.
146        - *IOError*: If there are issues reading or writing the TOML file.
147        - *KeyError*: If the expected keys (`selected_durations`, `archive_1`, etc.) are missing in the TOML structure.
148        """
149
150        dur_path = system_config.get_valve_durations()
151
152        with open(dur_path, "r") as f:
153            toml_file = toml.load(f)
154
155        if type_durations == "selected":
156            selected_durations = toml_file["selected_durations"]
157
158            # toml only knows how to deal with python native types (list, int, etc)
159            # so we convert np arrays to lists
160            selected_durations["date_used"] = datetime.datetime.now()
161            selected_durations["side_one_durations"] = side_one.tolist()
162            selected_durations["side_two_durations"] = side_two.tolist()
163
164            # save the file with the updated content
165            with open(dur_path, "w") as f:
166                toml.dump(toml_file, f)
167        elif type_durations == "archive":
168            # find first archive not filled
169            first_avail = self.find_first_not_filled(toml_file)
170
171            match first_avail:
172                case 1:
173                    # 1 is available just insert
174                    archive = toml_file["archive_1"]
175
176                    archive["filled"] = True
177                    archive["date_used"] = datetime.datetime.now()
178                    archive["side_one_durations"] = side_one.tolist()
179                    archive["side_two_durations"] = side_two.tolist()
180
181                    with open(dur_path, "w") as f:
182                        toml.dump(toml_file, f)
183                case 2:
184                    # move 1-> 2, insert 1
185                    toml_file["archive_2"] = copy.deepcopy(toml_file["archive_1"])
186
187                    archive_1 = toml_file["archive_1"]
188
189                    archive_1["filled"] = True
190                    archive_1["date_used"] = datetime.datetime.now()
191                    archive_1["side_one_durations"] = side_one.tolist()
192                    archive_1["side_two_durations"] = side_two.tolist()
193
194                    with open(dur_path, "w") as f:
195                        toml.dump(toml_file, f)
196
197                case 3 | 0:
198                    # case 3) 2 -> 3, 1-> 2, insert 1
199                    # case 0 (no available archives) ) del / overwrite 3 (oldest), 2 -> 3, 1-> 2, insert 1
200                    toml_file["archive_3"] = copy.deepcopy(toml_file["archive_2"])
201                    toml_file["archive_2"] = copy.deepcopy(toml_file["archive_1"])
202
203                    archive_1 = toml_file["archive_1"]
204
205                    archive_1["filled"] = True
206                    archive_1["date_used"] = datetime.datetime.now()
207                    archive_1["side_one_durations"] = side_one.tolist()
208                    archive_1["side_two_durations"] = side_two.tolist()
209
210                    with open(dur_path, "w") as f:
211                        toml.dump(toml_file, f)

Saves valve open durations to the valve_durations.toml configuration file.

This function handles saving durations either as the primary 'selected' profile or archiving them. When archiving, it implements a FIFO (First-In, First-Out) logic using three archive slots:

  1. If slot 1 is free, save there.
  2. If slot 1 is full but 2 is free, move 1->2, save new to 1.
  3. If slots 1 and 2 are full (or all slots are full), move 2->3 (overwriting 3 if necessary), move 1->2, save new to 1.

Converts NumPy arrays to Python lists before saving to TOML for compatibility reasons. Records the current datetime for the saved profile to mark when the profile was created.

Parameters

  • side_one (npt.NDArray[np.int32]): Numpy array of durations (microseconds) for side one valves.
  • side_two (npt.NDArray[np.int32]): Numpy array of durations (microseconds) for side two valves.
  • type_durations (str): Specifies the save location. Must be either "selected" (to update the main profile) or "archive" (to save to the next available archive slot).

Raises

  • FileNotFoundError: If the valve_durations.toml file cannot be found at the path specified by system_config.
  • toml.TomlDecodeError: If the TOML file is malformed.
  • IOError: If there are issues reading or writing the TOML file.
  • KeyError: If the expected keys (selected_durations, archive_1, etc.) are missing in the TOML structure.
def load_durations( self, type_durations='selected_durations') -> tuple[numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.int32]], numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.int32]], datetime.datetime]:
213    def load_durations(
214        self, type_durations="selected_durations"
215    ) -> tuple[npt.NDArray[np.int32], npt.NDArray[np.int32], datetime.datetime]:
216        """
217        Loads a specific valve duration profile from the `valve_durations.toml` file.
218
219        Retrieves the durations for side one and side two, along with the timestamp
220        when that profile was created. Converts the lists from the TOML file back
221        into NumPy `np.int32` arrays for efficient operations.
222
223        Parameters
224        ----------
225        - **type_durations** (*str, optional*): The key corresponding to the desired profile in the TOML file
226        (e.g., `"selected_durations"`, `"default_durations"`, `"archive_1"`). Defaults to `"selected_durations"`.
227
228        Returns
229        -------
230        - *tuple[npt.NDArray[np.int32], npt.NDArray[np.int32], datetime.datetime]*: A tuple containing:
231            - Numpy array of durations for side one.
232            - Numpy array of durations for side two.
233            - The datetime object indicating when the loaded profile was saved.
234
235        Raises
236        ------
237        - *FileNotFoundError*: If the `valve_durations.toml` file cannot be found.
238        - *toml.TomlDecodeError*: If the TOML file is malformed.
239        - *IOError*: If there are issues reading the TOML file.
240        - *KeyError*: If the specified `type_durations` key or expected sub-keys (`side_one_durations`, `date_used`, etc.) are missing.
241        """
242        valve_durations_toml = system_config.get_valve_durations()
243
244        with open(valve_durations_toml, "r") as f:
245            toml_file = toml.load(f)
246
247        file_durations = toml_file[type_durations]
248
249        # create 2 np arrays 8 n long with np.int32s
250        dur_side_one = np.full(VALVES_PER_SIDE, np.int32(0))
251        dur_side_two = np.full(VALVES_PER_SIDE, np.int32(0))
252
253        # keys here are names of durations i.e side_one_dur or side_two_dur, values
254        # are lists of durations, VALVES_PER_SIDE long. Will produce two lists
255        # VALVES_PER_SIDE long so that each valve in the rig is assigned a duration.
256        for key, value in file_durations.items():
257            if key == "side_one_durations":
258                for i, duration in enumerate(value):
259                    dur_side_one[i] = duration
260            elif key == "side_two_durations":
261                for i, duration in enumerate(value):
262                    dur_side_two[i] = duration
263
264        date_used = file_durations["date_used"]
265
266        return dur_side_one, dur_side_two, date_used

Loads a specific valve duration profile from the valve_durations.toml file.

Retrieves the durations for side one and side two, along with the timestamp when that profile was created. Converts the lists from the TOML file back into NumPy np.int32 arrays for efficient operations.

Parameters

  • type_durations (str, optional): The key corresponding to the desired profile in the TOML file (e.g., "selected_durations", "default_durations", "archive_1"). Defaults to "selected_durations".

Returns

  • tuple[npt.NDArray[np.int32], npt.NDArray[np.int32], datetime.datetime]: A tuple containing:
    • Numpy array of durations for side one.
    • Numpy array of durations for side two.
    • The datetime object indicating when the loaded profile was saved.

Raises

  • FileNotFoundError: If the valve_durations.toml file cannot be found.
  • toml.TomlDecodeError: If the TOML file is malformed.
  • IOError: If there are issues reading the TOML file.
  • KeyError: If the specified type_durations key or expected sub-keys (side_one_durations, date_used, etc.) are missing.
def load_schedule_indices( self) -> tuple[numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.int8]], numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.int8]]]:
268    def load_schedule_indices(
269        self,
270    ) -> tuple[npt.NDArray[np.int8], npt.NDArray[np.int8]]:
271        """
272        Generates 0-indexed valve schedule arrays based on the experiment schedule.
273
274        Uses the stimuli names and the trial-by-trial schedule defined in the
275        associated `ExperimentProcessData` object (`self.exp_data`) to create two NumPy arrays.
276        The index for side one is first found, then the associated valve is found via `exp_data.get_paired_index`.
277
278        These arrays represent the 0-indexed valve number to be activated for each trial
279        on side one and side two, respectively. This format is suitable for sending
280        to the Arduino.
281
282        Returns
283        -------
284        - *tuple[npt.NDArray[np.int8], npt.NDArray[np.int8]]*: A tuple containing:
285            - Numpy array of 0-indexed valve numbers for side one schedule.
286            - Numpy array of 0-indexed valve numbers for side two schedule.
287
288        Raises
289        ------
290        - *AttributeError*: If `self.exp_data` or its required attributes/methods (like `stimuli_data`, `program_schedule_df`, `get_paired_index`) are missing or invalid.
291        - *KeyError*: If expected keys (like 'Num Stimuli', 'Port 1') are missing in `exp_data`.
292        - *ValueError*: If a stimulus name from the schedule is not found in the unique stimuli list.
293        """
294        stimuli_data = self.exp_data.stimuli_data
295        schedule_df = self.exp_data.program_schedule_df
296
297        num_stimuli = self.exp_data.exp_var_entries["Num Stimuli"]
298        num_trials = self.exp_data.exp_var_entries["Num Trials"]
299
300        sched_side_one = np.full(num_trials, np.int8(0))
301        sched_side_two = np.full(num_trials, np.int8(0))
302
303        # get unique stim names. this will give us all names on side one.
304        # finding the index of a given stimuli appearing in a trial on side
305        # one will easily allow us to find the paired index. We've already done
306        # this before in exp_process_data
307        names = list(stimuli_data.stimuli_vars.values())
308        unique_names = names[: num_stimuli // 2]
309
310        side_one_trial_stimuli = schedule_df["Port 1"].to_numpy()
311
312        for i, trial_stim in enumerate(side_one_trial_stimuli):
313            index = unique_names.index(trial_stim)
314            sd_two_index = self.exp_data.get_paired_index(index, num_stimuli)
315            sched_side_one[i] = index
316            sched_side_two[i] = sd_two_index
317
318        return sched_side_one, sched_side_two

Generates 0-indexed valve schedule arrays based on the experiment schedule.

Uses the stimuli names and the trial-by-trial schedule defined in the associated ExperimentProcessData object (self.exp_data) to create two NumPy arrays. The index for side one is first found, then the associated valve is found via exp_data.get_paired_index.

These arrays represent the 0-indexed valve number to be activated for each trial on side one and side two, respectively. This format is suitable for sending to the Arduino.

Returns

  • tuple[npt.NDArray[np.int8], npt.NDArray[np.int8]]: A tuple containing:
    • Numpy array of 0-indexed valve numbers for side one schedule.
    • Numpy array of 0-indexed valve numbers for side two schedule.

Raises

  • AttributeError: If self.exp_data or its required attributes/methods (like stimuli_data, program_schedule_df, get_paired_index) are missing or invalid.
  • KeyError: If expected keys (like 'Num Stimuli', 'Port 1') are missing in exp_data.
  • ValueError: If a stimulus name from the schedule is not found in the unique stimuli list.
def increment_licks(self, side: int, event_data: models.event_data.EventData):
320    def increment_licks(self, side: int, event_data: EventData):
321        """
322        Increments the lick counter for the specified side within the EventData object.
323
324        Parameters
325        ----------
326        - **side** (*int*): The side where the lick occurred (0 for side one, 1 for side two).
327        - **event_data** (*EventData*): The EventData instance associated with `self.exp_data` where lick counts are stored.
328        """
329        match side:
330            case 0:
331                event_data.side_one_licks += 1
332            case 1:
333                event_data.side_two_licks += 1
334            case _:
335                logger.error("invalid side")

Increments the lick counter for the specified side within the EventData object.

Parameters

  • side (int): The side where the lick occurred (0 for side one, 1 for side two).
  • event_data (EventData): The EventData instance associated with self.exp_data where lick counts are stored.
def handle_licks( self, split_data: list[str], event_data: models.event_data.EventData, state: str, trigger: Callable) -> None:
337    def handle_licks(
338        self,
339        split_data: list[str],
340        event_data: EventData,
341        state: str,
342        trigger: Callable,
343    ) -> None:
344        """
345        Parses and records data specifically identified as a lick event.
346
347        Extracts side, lick duration, timestamps, and valve duration (if in "SAMPLE" state)
348        from the `split_data` list based on the current experiment `state` ("TTC" or "SAMPLE").
349        Calls `increment_licks` and `record_event`. For "TTC" state, it checks if
350        the lick count threshold is met to trigger a state change to "SAMPLE".
351
352        Parameters
353        ----------
354        - **split_data** (*list[str]*): The list of strings resulting from splitting the incoming Arduino data by content separator '|'.
355        - **event_data** (*EventData*): The EventData instance for accessing lick counts.
356        - **state** (*str*): The current state of the experiment FSM (e.g., "TTC", "SAMPLE").
357        - **trigger** (*Callable*): The state machine's trigger function, used here to transition state to "SAMPLE" state based on lick counts.
358
359        Raises
360        ------
361        - *IndexError*: If `split_data` does not contain the expected number of elements for the given state.
362        - *ValueError*: If elements in `split_data` cannot be converted to the expected types (int, np.int8, np.int32).
363        """
364        side = None
365        valve_duration = None
366        match state:
367            case "TTC":
368                # split_data will have the below form for ttc
369                # 0(side)|67(duration)|6541(stamp_rel_to_start)|6541(stamp_rel_to_trial)
370                # 1|16|0|496570|41 from arduino
371                side = np.int8(split_data[0])
372                self.increment_licks(side, event_data)
373
374                lick_duration = np.int32(split_data[1])
375                time_rel_to_start = np.int32(split_data[2]) / 1000
376                time_rel_to_trial = np.int32(split_data[3]) / 1000
377
378                # insert the values held in licks for respective sides in a shorter variable name
379                side_one = self.exp_data.event_data.side_one_licks
380                side_two = self.exp_data.event_data.side_two_licks
381                # if 3 or more licks in a ttc time, jump straight to sample
382                if side_one > 2 or side_two > 2:
383                    trigger("SAMPLE")
384                self.record_event(
385                    side, lick_duration, time_rel_to_start, time_rel_to_trial, state
386                )
387
388            case "SAMPLE":
389                try:
390                    # 0|87|26064|8327|8327
391                    side = np.int8(split_data[0])
392                    self.increment_licks(side, event_data)
393
394                    lick_duration = np.int32(split_data[1])
395                    valve_duration = np.int32(split_data[2])
396                    time_rel_to_start = np.int32(split_data[3]) / 1000
397                    time_rel_to_trial = np.int32(split_data[4]) / 1000
398
399                    self.record_event(
400                        side,
401                        lick_duration,
402                        time_rel_to_start,
403                        time_rel_to_trial,
404                        state,
405                        valve_duration,
406                    )
407
408                except Exception as e:
409                    logging.error(f"IMPROPER DATA.... IGNORING.....{e}")

Parses and records data specifically identified as a lick event.

Extracts side, lick duration, timestamps, and valve duration (if in "SAMPLE" state) from the split_data list based on the current experiment state ("TTC" or "SAMPLE"). Calls increment_licks and record_event. For "TTC" state, it checks if the lick count threshold is met to trigger a state change to "SAMPLE".

Parameters

  • split_data (list[str]): The list of strings resulting from splitting the incoming Arduino data by content separator '|'.
  • event_data (EventData): The EventData instance for accessing lick counts.
  • state (str): The current state of the experiment FSM (e.g., "TTC", "SAMPLE").
  • trigger (Callable): The state machine's trigger function, used here to transition state to "SAMPLE" state based on lick counts.

Raises

  • IndexError: If split_data does not contain the expected number of elements for the given state.
  • ValueError: If elements in split_data cannot be converted to the expected types (int, np.int8, np.int32).
def record_event( self, side: int, duration: numpy.int32, time_rel_to_start: float, time_rel_to_trial: float, state: str, valve_dur: numpy.int32 | None = None):
411    def record_event(
412        self,
413        side: int,
414        duration: np.int32,
415        time_rel_to_start: float,
416        time_rel_to_trial: float,
417        state: str,
418        valve_dur: np.int32 | None = None,
419    ):
420        """
421        Records a processed event (lick or motor movement) into the `ExperimentProcessData` `EventData` DataFrame.
422
423        Uses the `insert_row_into_df` method of the `EventData` object (`self.exp_data.event_data`)
424        to add a new row containing the details of the event.
425
426        Parameters
427        ----------
428        - **side** (*int*): The side associated with the event (0 for side one, 1 for side two).
429        - **duration** (*np.int32*): The duration of the event (e.g., lick contact time) in milliseconds.
430        - **time_rel_to_start** (*float*): The timestamp of the event relative to the start of the entire program, in seconds.
431        - **time_rel_to_trial** (*float*): The timestamp of the event relative to the start of the current trial, in seconds.
432        - **state** (*str*): A string describing the state during the event (e.g., "TTC", "SAMPLE").
433        - **valve_dur** (*np.int32 | None, optional*): The duration the valve was open for this event (microseconds),
434        if applicable (e.g., during "SAMPLE" state licks). Defaults to None.
435
436        Raises
437        ------
438        - Propagates exceptions from `event_data.insert_row_into_df`.
439        """
440        try:
441            event_data = self.exp_data.event_data
442            current_trial = self.exp_data.current_trial_number
443            # insert the lick record into the dataframe
444            event_data.insert_row_into_df(
445                current_trial,
446                side + 1,
447                duration,
448                time_rel_to_start,
449                time_rel_to_trial,
450                state,
451                valve_duration=(valve_dur if valve_dur else None),
452            )
453
454            logging.info(f"Lick data recorded for side: {side + 1}")
455        except Exception as e:
456            logging.error(f"Error recording lick data: {e}")
457            raise

Records a processed event (lick or motor movement) into the ExperimentProcessData EventData DataFrame.

Uses the insert_row_into_df method of the EventData object (self.exp_data.event_data) to add a new row containing the details of the event.

Parameters

  • side (int): The side associated with the event (0 for side one, 1 for side two).
  • duration (np.int32): The duration of the event (e.g., lick contact time) in milliseconds.
  • time_rel_to_start (float): The timestamp of the event relative to the start of the entire program, in seconds.
  • time_rel_to_trial (float): The timestamp of the event relative to the start of the current trial, in seconds.
  • state (str): A string describing the state during the event (e.g., "TTC", "SAMPLE").
  • valve_dur (np.int32 | None, optional): The duration the valve was open for this event (microseconds), if applicable (e.g., during "SAMPLE" state licks). Defaults to None.

Raises

  • Propagates exceptions from event_data.insert_row_into_df.
def process_data(self, source: str, data: str, state: str, trigger: Callable) -> None:
459    def process_data(
460        self, source: str, data: str, state: str, trigger: Callable
461    ) -> None:
462        """
463        Processes incoming data strings received from the Arduino via the serial connection.
464
465        Splits the raw data string by the '|' content separator. Determines the type of
466        data based on the first element (`split_data[0]`). Directly handles "MOTOR" events,
467        and routes lick data (`split_data[0]` is '0' or '1')
468        to `handle_licks` for further processing.
469
470        Parameters
471        ----------
472        - **source** (*str*): Identifier for the source of the data. Included for logging/debugging.
473        - **data** (*str*): The raw data string received from the Arduino.
474        - **state** (*str*): The current state of the experiment FSM, passed to handlers like `handle_licks`.
475        - **trigger** (*Callable*): The state machine's trigger function, passed to handlers like `handle_licks`.
476
477        Raises
478        ------
479        - *IndexError*: If `data.split('|')` results in an empty list or accessing `split_data[0]` fails.
480        - Propagates exceptions from `handle_licks` or `record_event`.
481        """
482        event_data = self.exp_data.event_data
483        split_data = data.split("|")
484        try:
485            if split_data[0] == "MOTOR":
486                # data will arrive in the following format
487                # MOTOR|DOWN|2338|7340|7340
488                # MOTOR|MOVEMENT|DURATION|END_TIME_REL_TO_PROG_START|END_TIME_REL_TO_PROG_TRIAL_START
489                event_data = self.exp_data.event_data
490
491                current_trial = self.exp_data.current_trial_number
492
493                event_data.insert_row_into_df(
494                    current_trial,
495                    None,
496                    np.int32(split_data[2]),
497                    (np.int32(split_data[3]) / 1000),
498                    (np.int32(split_data[4]) / 1000),
499                    f"MOTOR {split_data[1]}",
500                )
501            elif split_data[0] == "0" or split_data[0] == "1":
502                self.handle_licks(split_data, event_data, state, trigger)
503
504        except Exception as e:
505            logging.error(f"Error processing data from {source}: {e}")
506            raise

Processes incoming data strings received from the Arduino via the serial connection.

Splits the raw data string by the '|' content separator. Determines the type of data based on the first element (split_data[0]). Directly handles "MOTOR" events, and routes lick data (split_data[0] is '0' or '1') to handle_licks for further processing.

Parameters

  • source (str): Identifier for the source of the data. Included for logging/debugging.
  • data (str): The raw data string received from the Arduino.
  • state (str): The current state of the experiment FSM, passed to handlers like handle_licks.
  • trigger (Callable): The state machine's trigger function, passed to handlers like handle_licks.

Raises

  • IndexError: If data.split('|') results in an empty list or accessing split_data[0] fails.
  • Propagates exceptions from handle_licks or record_event.