arduino_control

This module establishes communication with the Arduino board. It sends schedules, experiment variables, and valve durations to the board, along with other commands needed to operate the Experiment Rig.
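
As a rough illustration of the kind of data this module sends, the sketch below packs two per-side arrays of valve durations into one packet (side one first, then side two) and decodes it four bytes at a time, the way `verify_durations` reads the echo back. It uses only the standard library: `struct.pack` stands in for NumPy's `tobytes()`, the helper names `build_duration_packet` and `decode_duration_packet` are hypothetical, and the duration values are made up. Little-endian 32 bit integers are assumed, matching the `byteorder="little"` reads in the module.

```python
import struct

VALVES_PER_SIDE = 8  # same constant as in the module
BYTES_PER_DURATION = 4  # each duration is assumed to be a 32 bit integer


def build_duration_packet(side_one: list[int], side_two: list[int]) -> bytes:
    """Pack both sides back to back as little-endian int32 values (hypothetical helper)."""
    fmt = f"<{VALVES_PER_SIDE}i"
    return struct.pack(fmt, *side_one) + struct.pack(fmt, *side_two)


def decode_duration_packet(packet: bytes) -> tuple[list[int], list[int]]:
    """Decode the packet four bytes at a time, side one first (hypothetical helper)."""
    values = [
        # signed=True mirrors the signed "i" format used when packing
        int.from_bytes(packet[i : i + BYTES_PER_DURATION], byteorder="little", signed=True)
        for i in range(0, len(packet), BYTES_PER_DURATION)
    ]
    return values[:VALVES_PER_SIDE], values[VALVES_PER_SIDE:]


# made-up durations, for illustration only
side_one = [24125] * VALVES_PER_SIDE
side_two = [25000 + i for i in range(VALVES_PER_SIDE)]

packet = build_duration_packet(side_one, side_two)
echoed_one, echoed_two = decode_duration_packet(packet)
```

Comparing `echoed_one` and `echoed_two` against the arrays that were sent is the same check the module performs after each transfer.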

"""
This module establishes communication with the Arduino board. It sends schedules, experiment variables, and
valve durations to the board, along with other commands needed to operate the Experiment Rig.
"""

import serial
import serial.tools.list_ports
import time
import threading
import queue
import logging
import numpy as np

### USED FOR TYPE HINTING ###
from models.experiment_process_data import ExperimentProcessData
import numpy.typing as npt
### USED FOR TYPE HINTING ###

from views.gui_common import GUIUtils


logger = logging.getLogger(__name__)

# each side has 8 valves, therefore 8 durations in each array
VALVES_PER_SIDE = 8


class ArduinoManager:
    """
    This class establishes a serial connection to the Arduino board and handles all communication between the board and the controlling
    PC. All actions involving the Arduino are managed here.

    Attributes
    ----------
    - **BAUD_RATE** (*int*): The baud rate (115200) used for the serial connection between the Arduino and the PC.
    - **arduino** (*serial.Serial | None*): The serial.Serial connection to the Arduino. This includes the physical port the board is
    connected to, the baud rate, and more. It is None until a connection is established.
    - **exp_data** (*ExperimentProcessData*): An instance of `models.experiment_process_data` ExperimentProcessData. This variable gives access to
    important program attributes like num_trials and program schedules that need to be sent to the Arduino board.
    - **arduino_data** (*ArduinoData*): A reference to the program instance of `models.arduino_data` ArduinoData. It lets
    ArduinoManager load data stored there, such as valve duration times and schedule indices.
    - **data_queue** (*queue.Queue[tuple[str, str]]*): The queue that carries data from the `listener_thread`,
    which constantly listens for any data coming from the Arduino board, to the main thread. This queue is processed in the `app_logic` method `process_queue`. This allows
    the main thread to do other important work, only processing Arduino information every so often, if there is any to process.
    - **stop_event** (*threading.Event*): This event is set in the class method `stop_listener_thread`. It is a thread-safe data type that lets
    us exit the listener thread, so no threads are left busy when exiting the main application.
    - **listener_thread** (*threading.Thread | None*): The thread that listens constantly for new information
    from the Arduino board. The thread's target is the `listen_for_serial` method.

    Methods
    -------
    - `connect_to_arduino`()
        Scans available ports for a device whose manufacturer name contains 'Arduino' to establish a connection with the Arduino board.
    - `listen_for_serial`()
        Continuously listens for incoming serial data from the Arduino, adding received messages to `data_queue`.
    - `stop_listener_thread`()
        Signals the listener thread to stop and safely joins it back to the main thread.
    - `reset_arduino`()
        Sends a reset command to the Arduino board. Used after connection is established to clear any residual data on the board.
    - `close_connection`()
        Closes the serial connection to the Arduino board.
    - `send_experiment_variables`()
        Transmits experimental variables (number of stimuli and trials) to the Arduino for schedule configuration.
    - `send_schedule_data`()
        Sends valve schedule data to the Arduino to define which valve should open on either side for a given trial.
    - `send_valve_durations`()
        Sends the calculated duration settings for each valve to the Arduino.
    - `verify_schedule`(side_one: npt.NDArray[np.int8], side_two: npt.NDArray[np.int8])
        Requests and verifies the valve schedule received by the Arduino by comparing against the sent values.
    - `verify_durations`(side_one: npt.NDArray[np.int32], side_two: npt.NDArray[np.int32])
        Requests and verifies the valve durations received by the Arduino by comparing against the sent values.
    - `send_command`(command: bytes)
        Writes a given command to the Arduino, logging it and reporting any write error to the user.
    """

    def __init__(self, exp_data: ExperimentProcessData) -> None:
        """
        Initialize the ArduinoManager. Here we establish the Arduino connection and reset the board to clear any
        residual data left on the Arduino board from previous experimental runs.

        Parameters
        ----------
        - **exp_data** (*ExperimentProcessData*): An instance of `models.experiment_process_data` ExperimentProcessData. Used to access
        experiment variables and send them to the Arduino board.
        """
        self.BAUD_RATE: int = 115200
        self.arduino: None | serial.Serial = None

        self.exp_data = exp_data
        self.arduino_data = exp_data.arduino_data

        self.data_queue: queue.Queue[tuple[str, str]] = queue.Queue()
        self.stop_event: threading.Event = threading.Event()
        self.listener_thread: threading.Thread | None = None

        # connect to the Arduino board if it is connected to the PC.
        self.connect_to_arduino()

        # reset the board fully to avoid improper communication on program 'reset'
        reset_arduino = "RESET\n".encode("utf-8")
        self.send_command(command=reset_arduino)

    def connect_to_arduino(self) -> None:
        """Connect to the Arduino board by searching all serial ports for a device whose manufacturer name contains 'Arduino'. If
        found, establish a serial.Serial connection. If not, notify the user."""
        ports = serial.tools.list_ports.comports()
        arduino_port = None

        # find the arduino board so we don't waste time trying to connect to other ports
        # (if several ports match, the last one found is used)
        for p in ports:
            if p.manufacturer is not None and "Arduino" in p.manufacturer:
                arduino_port = p

        # if we cannot find the arduino, no connection can be established, inform user and return.
        if arduino_port is None:
            error_message = (
                "Arduino not connected. Reconnect Arduino and relaunch the program."
            )
            GUIUtils.display_error("Arduino Not Found", error_message)
            logger.error(error_message)
            return
        else:
            port = arduino_port.device

            self.arduino = serial.Serial(port, self.BAUD_RATE)
            logger.info(f"Arduino connected on port {port}")

    def listen_for_serial(self) -> None:
        """
        Continuously poll the Arduino for input. Each received line is placed in the thread-safe `data_queue`
        so that it can be processed later.
        """
        # if we do not have an arduino to listen to, return and don't try to listen to it!
        if self.arduino is None:
            logger.error(
                "ARDUINO COMMUNICATION ERROR!!! Class attribute 'arduino' is None."
            )
            return

        # loop until stop_listener_thread sets the stop event
        while not self.stop_event.is_set():
            try:
                if self.arduino.in_waiting > 0:
                    data = self.arduino.readline().decode("utf-8").strip()
                    self.data_queue.put(("Arduino", data))

                    # log the received data
                    logger.info(f"Received -> {data} from arduino")
            except Exception as e:
                logger.error(f"Error reading from Arduino: {e}")
                break
            # sleep for a short time to avoid busy waiting
            time.sleep(0.001)

    def stop_listener_thread(self) -> None:
        """
        Method to set the stop event for the listener thread and
        join it back to the main program thread.
        """
        self.stop_event.set()

        if self.listener_thread is None:
            return

        if self.listener_thread.is_alive():
            self.listener_thread.join()

    def reset_arduino(self) -> None:
        """
        Send a reset command to the Arduino board.
        """
        try:
            command = "RESET\n".encode("utf-8")
            self.send_command(command)

            logger.info("Arduino reset.")
        except Exception as e:
            error_msg = f"Error resetting Arduino: {e}"

            GUIUtils.display_error("RESET ERROR", error_msg)
            logger.error(error_msg)

    def close_connection(self) -> None:
        """Close the serial connection to the Arduino board."""
        if self.arduino is not None:
            self.arduino.close()
        logger.info("Closed connections to Arduino.")

    def send_experiment_variables(self) -> None:
        """
        Send the program variables num_stimuli and num_trials to
        the Arduino. This tells the Arduino how long schedules should be (num_trials)
        and which valve should be selected for side 2 (num_stimuli).
        """
        num_stimuli = self.exp_data.exp_var_entries["Num Stimuli"]
        num_trials = self.exp_data.exp_var_entries["Num Trials"]

        # convert both values to 8 bit integers as raw bytes
        # (np.int8 holds -128..127, so each value must be at most 127)
        num_stimuli = np.int8(num_stimuli).tobytes()
        num_trials = np.int8(num_trials).tobytes()

        var_comm = "REC VAR\n".encode("utf-8")
        self.send_command(var_comm)

        packet = num_stimuli + num_trials
        self.send_command(packet)

    def send_schedule_data(self) -> None:
        """
        Send the valve schedule to the Arduino so it knows which valve to
        open on a given trial.
        First load the np schedule arrays for both sides, turn them into bytes, and
        place them back to back in a packet, i.e. side two follows side one.
        The Arduino is then asked to echo the schedule back for verification.
        """
        side_one, side_two = self.arduino_data.load_schedule_indices()

        schedule_packet: bytes = side_one.tobytes() + side_two.tobytes()

        sched_command = "REC SCHED\n".encode("utf-8")

        self.send_command(sched_command)

        self.send_command(schedule_packet)

        self.verify_schedule(side_one, side_two)

    def send_valve_durations(self) -> None:
        """
        Load the side_one and side_two durations through the `models.arduino_data` ArduinoData model,
        which reads them from arduino_data.toml using the last_used 'profile'. Send them to the
        Arduino back to back (side one, then side two) and verify what the board received.
        """
        side_one, side_two, _ = self.arduino_data.load_durations()

        side_one_durs = side_one.tobytes()
        side_two_durs = side_two.tobytes()

        dur_packet = side_one_durs + side_two_durs

        dur_command = "REC DURATIONS\n".encode("utf-8")

        self.send_command(dur_command)

        self.send_command(dur_packet)

        self.verify_durations(side_one, side_two)

    def verify_schedule(
        self, side_one: npt.NDArray[np.int8], side_two: npt.NDArray[np.int8]
    ) -> None:
        """
        Ask the Arduino to send back the schedules that it
        received. It sends the data byte by byte in the order that it
        received it (side one schedule, then side two). It sends EXACTLY
        num_trials * 2 bytes (8 bit / 1 byte int for each trial on each side).
        If the echoed schedules match what was sent, we continue execution and log a success message.
        """
        try:
            ver_sched_command = "VER SCHED\n".encode("utf-8")
            self.send_command(ver_sched_command)

            num_trials = self.exp_data.exp_var_entries["Num Trials"]

            ver1 = np.zeros((num_trials,), dtype=np.int8)
            ver2 = np.zeros((num_trials,), dtype=np.int8)

            if self.arduino is None:
                msg = "ARDUINO IS NOT CONNECTED! Try reconnecting and restart the program."
                logger.error(msg)
                GUIUtils.display_error("ARDUINO ERROR", msg)

                return

            # wait until both schedules (one byte per trial per side) have arrived
            while self.arduino.in_waiting < 2 * num_trials:
                time.sleep(0.001)

            for i in range(num_trials):
                ver1[i] = int.from_bytes(
                    self.arduino.read(), byteorder="little", signed=False
                )
            for i in range(num_trials):
                ver2[i] = int.from_bytes(
                    self.arduino.read(), byteorder="little", signed=False
                )
            logger.info(f"Arduino received side one as => {ver1}")
            logger.info(f"Arduino received side two as => {ver2}")

            if np.array_equal(side_one, ver1) and np.array_equal(side_two, ver2):
                logger.info(
                    "arduino has received and verified experiment valve schedule"
                )
            else:
                GUIUtils.display_error(
                    "======SCHEDULE ERROR======",
                    "ARDUINO did not receive the correct schedule. Please restart the program and attempt "
                    "schedule generation again.",
                )
        except Exception as e:
            logger.error(f"error verifying arduino schedule -> {e}")

    def verify_durations(
        self, side_one: npt.NDArray[np.int32], side_two: npt.NDArray[np.int32]
    ) -> None:
        """
        Ask the Arduino to send back the valve durations that it
        received. It sends the data in the order that it
        received it (side one durations, then side two). It sends EXACTLY
        VALVES_PER_SIDE * 2 * 4 bytes (32 bit / 4 byte int for each valve on each side).
        If the echoed durations match what was sent, we continue execution and log a success message.
        """
        try:
            ver_dur_command = "VER DURATIONS\n".encode("utf-8")
            self.send_command(ver_dur_command)

            ver1 = np.zeros((VALVES_PER_SIDE,), dtype=np.int32)
            ver2 = np.zeros((VALVES_PER_SIDE,), dtype=np.int32)

            if self.arduino is None:
                msg = "ARDUINO IS NOT CONNECTED! Try reconnecting and restart the program."
                logger.error(msg)
                GUIUtils.display_error("ARDUINO ERROR", msg)

                return

            # wait until the durations for both sides (4 bytes per valve) have arrived
            while self.arduino.in_waiting < 4 * VALVES_PER_SIDE * 2:
                time.sleep(0.001)

            for i in range(VALVES_PER_SIDE):
                duration = int.from_bytes(
                    self.arduino.read(4), byteorder="little", signed=False
                )
                ver1[i] = duration
            for i in range(VALVES_PER_SIDE):
                duration = int.from_bytes(
                    self.arduino.read(4), byteorder="little", signed=False
                )
                ver2[i] = duration

            logger.info(f"Arduino received side one as => {ver1}")
            logger.info(f"Arduino received side two as => {ver2}")

            if np.array_equal(side_one, ver1) and np.array_equal(side_two, ver2):
                logger.info("arduino has received and verified valve durations")
            else:
                GUIUtils.display_error(
                    "======DURATIONS ERROR======",
                    "ARDUINO did not receive the correct valve durations. Please restart the program and attempt "
                    "schedule generation again.",
                )
        except Exception as e:
            logger.error(f"error verifying arduino durations -> {e}")

    def send_command(self, command: bytes) -> None:
        """
        Send a specific command to the Arduino. The command must be converted to a raw bytes object
        before being passed into this method.
        """
        if self.arduino is None:
            error_message = "Arduino connection was not established. Please reconnect the Arduino board and restart the program."
            GUIUtils.display_error(
                "====ARDUINO COMMUNICATION ERROR====:", error_message
            )
            logger.error(error_message)
            return

        try:
            self.arduino.write(command)
            logger.info(f"Sent {command} to arduino on -> {self.arduino.port}")
        except Exception as e:
            error_message = f"Error sending command to {self.arduino.port} Arduino: {e}"
            GUIUtils.display_error("Error sending command to Arduino:", error_message)
            logger.error(error_message)
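
The listener pattern used by `listen_for_serial` and `stop_listener_thread` can be sketched in isolation as follows. This is a minimal illustration, not the application's own wiring: `FakeSerial` is a hypothetical stand-in for the serial port, the two message lines are made up, and starting the thread with `threading.Thread(target=...)` is an assumption, since the source does not show where `listener_thread` is created.

```python
import queue
import threading
import time


class FakeSerial:
    """Hypothetical stand-in for a serial port that already holds a few lines."""

    def __init__(self, lines: list[bytes]) -> None:
        self._lines = list(lines)
        self._lock = threading.Lock()

    @property
    def in_waiting(self) -> int:
        # number of bytes still waiting to be read
        with self._lock:
            return sum(len(line) for line in self._lines)

    def readline(self) -> bytes:
        with self._lock:
            return self._lines.pop(0)


def listen(port: FakeSerial, data_queue: queue.Queue, stop_event: threading.Event) -> None:
    """Poll the port until stop_event is set, queueing each decoded line."""
    while not stop_event.is_set():
        if port.in_waiting > 0:
            data = port.readline().decode("utf-8").strip()
            data_queue.put(("Arduino", data))
        time.sleep(0.001)  # sleep briefly to avoid busy waiting


port = FakeSerial([b"first line\n", b"second line\n"])
data_queue: queue.Queue = queue.Queue()
stop_event = threading.Event()

listener_thread = threading.Thread(target=listen, args=(port, data_queue, stop_event))
listener_thread.start()

# the main thread drains the queue whenever it has time to do so
received = [data_queue.get(timeout=2) for _ in range(2)]

# signal the listener to exit, then join it back to the main thread
stop_event.set()
listener_thread.join()
```

Because the loop checks the event on every pass and sleeps only briefly, `join()` returns almost immediately after `stop_event.set()`.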
logger = <Logger arduino_control (INFO)>
VALVES_PER_SIDE = 8
class ArduinoManager:
 29class ArduinoManager:
 30    """
 31    This class establishes a Serial connection to the Arduino board and facilitates communication of information between the board and the controlling
 32    PC. All actions involving the Arduino are managed here.
 33
 34    Attributes
 35    ----------
 36    - **BAUD_RATE** (*int*): This holds the class constant baud rate which is essentially the max rate of communication between the Arduino and the PC.
 37    - **arduino** (*serial.Serial*): This variable hold the serial.Serial information for the Arduino. This includes what physical port the board is
 38    connected to, baud rate and more.
 39    - **exp_data** (*ExperimentProcessData*): An instance of `models.experiment_process_data` ExperimentProcessData, this variable allows access to
 40    important program attributes like num_trials and program schedules that need to be send to the arduino board.
 41    - **arduino_data** (*ArduinoData*): This is a reference to the program instance of `models.arduino_data` ArduinoData. It allows access to this
 42    class and its methods which allows ArduinoManager to load data stored there such as valve duration times and schedule indicies.
 43    - **data_queue** (*queue.Queue[tuple[str, str]]*): This is the queue that facilitates data transmission between the arduino's `listener_thread`,
 44    which constantly listens for any data coming from the arduino board. This queue is processed in the `app_logic` method `process_queue`. This allows
 45    the main thread to do other important work, only processing arduino information every so often, if there is any to process.
 46    - **stop_event** (*threading.Event*): This event is set in the class method `stop_listener_thread`. This is a thread safe data type that allows
 47    us to exit the listener thread to avoid leaving threads busy when exiting the main application.
 48    - **listener_thread** (*threading.Thread | None*): Previously discussed peripherally, this is the thread that listens constantly for new information
 49    from the arduino board. The threads target method is the `listen_for_serial` method.
 50
 51    Methods
 52    -------
 53    - `connect_to_arduino`()
 54        Scans available ports for devices named 'Arduino' to establish a connection with the Arduino board.
 55    - `listen_for_serial`()
 56        Continuously listens for incoming serial data from the Arduino, adding received messages to `data_queue`.
 57    - `stop_listener_thread`()
 58        Signals the listener thread to stop and safely joins it back to the main thread.
 59    - `reset_arduino`()
 60        Sends a reset command to the Arduino board. Used after connection is established to clear any residual data on the board.
 61    - `close_connection`()
 62        Closes the serial connection to the Arduino board.
 63    - `send_experiment_variables`()
 64        Transmits experimental variables (number of stimuli and trials) to the Arduino for schedule configuration.
 65    - `send_schedule_data`()
 66        Sends valve schedule data to the Arduino to define which valve should open on either side for a given trial.
 67    - `send_valve_durations`()
 68        Sends the calculated duration settings for each valve to the Arduino.
 69    - `verify_schedule`(side_one: npt.NDArray[np.int8], side_two: npt.NDArray[np.int8])
 70        Requests and verifies the valve schedule received by the Arduino by comparing against the sent values.
 71    - `verify_durations`(side_one: npt.NDArray[np.int32], side_two: npt.NDArray[np.int32])
 72        Requests and verifies the valve durations received by the Arduino by comparing against the sent values.
 73    - `send_command`(command: bytes)
 74        Sends a given command to the Arduino, ensuring communication reliability.
 75    """
 76
 77    def __init__(self, exp_data: ExperimentProcessData) -> None:
 78        """
 79        Initialize and handle the ArduinoManager class. Here we establish the Arduino connection and reset the board to clear any
 80        residual data left on the Arduino board from previous experimental runs.
 81
 82        Parameters
 83        ----------
 84        - **exp_data** (*ExperimentProcessData*): An instance of `models.experiment_process_data` ExperimentProcessData. Used to access
 85        experiment variables and send them to the Arduino board.
 86        """
 87        self.BAUD_RATE: int = 115200
 88        self.arduino: None | serial.Serial = None
 89
 90        self.exp_data = exp_data
 91        self.arduino_data = exp_data.arduino_data
 92
 93        self.data_queue: queue.Queue[tuple[str, str]] = queue.Queue()
 94        self.stop_event: threading.Event = threading.Event()
 95        self.listener_thread: threading.Thread | None = None
 96
 97        # connect to the Arduino board if it is connected to the PC.
 98        self.connect_to_arduino()
 99
100        # reset the board fully to avoid improper communication on program 'reset'
101        reset_arduino = "RESET\n".encode("utf-8")
102        self.send_command(command=reset_arduino)
103
104    def connect_to_arduino(self) -> None:
105        """Connect to the Arduino board by searching all serial ports for a device with a manufacturer name 'Arduino'. If
106        found, establish serial.Serial connection. If not notify user."""
107        ports = serial.tools.list_ports.comports()
108        arduino_port = None
109
110        # find the arduino boards so we don't waste time trying to connect to empty ports
111        for p in ports:
112            if p.manufacturer is not None and "Arduino" in p.manufacturer:
113                arduino_port = p
114
115        # if we cannot find the arduino, no connection can be established, inform user and return.
116        if arduino_port is None:
117            error_message = (
118                "Arduino not connected. Reconnect Arduino and relaunch the program."
119            )
120            GUIUtils.display_error("Arduino Not Found", error_message)
121            logger.error(error_message)
122            return
123        else:
124            port = arduino_port.device
125
126            self.arduino = serial.Serial(port, self.BAUD_RATE)
127            logger.info(f"Arduino connected on port {port}")
128
129    def listen_for_serial(self) -> None:
130        """
131        Method to constantly scan for Arduino input. If received place in thread-save `data_queue` to process it later.
132        """
133        # if we do not have an arduino to listen to return and don't try to listen to it!
134        if self.arduino is None:
135            logger.error(
136                "ARDUINO COMMUNICATION ERROR!!! Class attribute 'arduino' is None."
137            )
138            return
139
140        while 1:
141            if self.stop_event.is_set():
142                break
143            try:
144                if self.arduino.in_waiting > 0:
145                    data = self.arduino.readline().decode("utf-8").strip()
146                    self.data_queue.put(("Arduino", data))
147
148                    # log the received data
149                    logger.info(f"Received -> {data} from arduino")
150            except Exception as e:
151                logger.error(f"Error reading from Arduino: {e}")
152                break
153            # sleep for a short time to avoid busy waiting
154            time.sleep(0.001)
155
156    def stop_listener_thread(self) -> None:
157        """
158        Method to set the stop event for the listener thread and
159        join it back to the main program thread.
160        """
161        self.stop_event.set()
162
163        if self.listener_thread is None:
164            return
165
166        if self.listener_thread.is_alive():
167            self.listener_thread.join()
168
169    def reset_arduino(self) -> None:
170        """
171        Send a reset command to the Arduino board.
172        """
173        try:
174            command = "RESET\n".encode("utf-8")
175            self.send_command(command)
176
177            logger.info("Arduino reset.")
178        except Exception as e:
179            error_msg = f"Error resetting Arduino: {e}"
180
181            GUIUtils.display_error("RESET ERROR", error_msg)
182            logger.error(error_msg)
183
184    def close_connection(self) -> None:
185        """Close the serial connection to the Arduino board."""
186        if self.arduino is not None:
187            self.arduino.close()
188        logger.info("Closed connections to Arduino.")
189
190    def send_experiment_variables(self):
191        """
192        This method is defined to send program variables num_stimuli and num_trials to
193        the Arduino. This is useful because it allows the Arduino to understand how long schedules should be (num_trials),
194        and what valve should be selected for side 2 (num_stimuli).
195        """
196        num_stimuli = self.exp_data.exp_var_entries["Num Stimuli"]
197        num_trials = self.exp_data.exp_var_entries["Num Trials"]
198
199        # 2 np 8 bit numbers converted to raw bytes
200        num_stimuli = np.int8(num_stimuli).tobytes()
201        num_trials = np.int8(num_trials).tobytes()
202
203        var_comm = "REC VAR\n".encode("utf-8")
204        self.send_command(var_comm)
205
206        packet = num_stimuli + num_trials
207        self.send_command(packet)
208
209    def send_schedule_data(self) -> None:
210        """
211        Method to send valve schedule to the arduino so it know which valve to
212        open on a given trial.
213        First load np schedule arrays for both sides, turn them into bytes, and
214        set them back to back in a packet. i.e side two will follow side one
215        """
216        side_one, side_two = self.arduino_data.load_schedule_indices()
217
218        schedule_packet: bytes = side_one.tobytes() + side_two.tobytes()
219
220        sched_command = "REC SCHED\n".encode("utf-8")
221
222        self.send_command(sched_command)
223
224        self.send_command(schedule_packet)
225
226        self.verify_schedule(side_one, side_two)
227
228    def send_valve_durations(self) -> None:
229        """
230        Get side_one and side_two durations from `models.arduino_data` Arduino data model
231        will load from arduino_data.toml from the last_used 'profile'.
232        """
233        side_one, side_two, _ = self.arduino_data.load_durations()
234
235        side_one_durs = side_one.tobytes()
236        side_two_durs = side_two.tobytes()
237
238        dur_packet = side_one_durs + side_two_durs
239
240        dur_command = "REC DURATIONS\n".encode("utf-8")
241
242        self.send_command(dur_command)
243
244        self.send_command(dur_packet)
245
246        self.verify_durations(side_one, side_two)
247
248    def verify_schedule(
249        self, side_one: npt.NDArray[np.int8], side_two: npt.NDArray[np.int8]
250    ) -> None:
251        """
252        Method to tell arduino to give us the schedules that it
253        recieved. It will send data byte by byte in the order that it
254        recieved it (side one schedule, then side two) it will send EXACTLY
255        num_trials * 2 bytes (8 bit / 1 byte int for each trial on each side).
256        if successful we continue execution and log success message.
257        """
258        try:
259            ver_sched_command = "VER SCHED\n".encode("utf-8")
260            self.send_command(ver_sched_command)
261
262            num_trials = self.exp_data.exp_var_entries["Num Trials"]
263
264            ver1 = np.zeros((num_trials,), dtype=np.int8)
265            ver2 = np.zeros((num_trials,), dtype=np.int8)
266
267            if self.arduino is None:
268                msg = "ARDUINO IS NOT CONNECTED! Try reconnecting and restart the program."
269                logger.error(msg)
270                GUIUtils.display_error("ARDUINO ERROR", msg)
271
272                return
273
274            # wait for the data to arrive
275            while self.arduino.in_waiting < 0:
276                pass
277
278            for i in range(num_trials):
279                ver1[i] = int.from_bytes(
280                    self.arduino.read(), byteorder="little", signed=False
281                )
282            for i in range(num_trials):
283                ver2[i] = int.from_bytes(
284                    self.arduino.read(), byteorder="little", signed=False
285                )
286            logger.info(f"Arduino recieved side one as => {ver1}")
287            logger.info(f"Arduino recieved side two as => {ver2}")
288
289            if np.array_equal(side_one, ver1) and np.array_equal(side_two, ver2):
290                logger.info(
291                    "arduino has recieved and verified experiment valve schedule"
292                )
293            else:
294                GUIUtils.display_error(
295                    "======SCHEDULE ERROR======",
296                    "ARDUINO did not recieve the correct schedule. Please restart the program and attempt\
297                        schedule generation again.",
298                )
299        except Exception as e:
300            logger.error(f"error verifying arduino schedule {e}")
301
    def verify_durations(
        self, side_one: npt.NDArray[np.int32], side_two: npt.NDArray[np.int32]
    ) -> None:
        """
        Ask the Arduino to echo back the valve durations that it received and
        compare them with the values that were sent. The board replies with side
        one followed by side two: VALVES_PER_SIDE durations per side, each a
        4-byte little-endian integer (2 * 8 * 4 = 64 bytes in total).
        If the echo matches, a success message is logged; otherwise an error is displayed.
        """
        try:
            ver_dur_command = "VER DURATIONS\n".encode("utf-8")
            self.send_command(ver_dur_command)

            ver1 = np.zeros((VALVES_PER_SIDE,), dtype=np.int32)
            ver2 = np.zeros((VALVES_PER_SIDE,), dtype=np.int32)

            if self.arduino is None:
                msg = "ARDUINO IS NOT CONNECTED! Try reconnecting and restart the program."
                logger.error(msg)
                GUIUtils.display_error("ARDUINO ERROR", msg)

                return

            # wait for the first side (4 bytes per valve) to arrive; the blocking
            # reads below wait for any bytes that are still in flight
            while self.arduino.in_waiting < 4 * VALVES_PER_SIDE:
                time.sleep(0.001)
            for i in range(VALVES_PER_SIDE):
                duration = int.from_bytes(
                    self.arduino.read(4), byteorder="little", signed=False
                )
                ver1[i] = duration
            for i in range(VALVES_PER_SIDE):
                duration = int.from_bytes(
                    self.arduino.read(4), byteorder="little", signed=False
                )
                ver2[i] = duration

            logger.info(f"Arduino received side one as => {ver1}")
            logger.info(f"Arduino received side two as => {ver2}")

            if np.array_equal(side_one, ver1) and np.array_equal(side_two, ver2):
                logger.info("arduino has received and verified valve durations")
            else:
                GUIUtils.display_error(
                    "======DURATIONS ERROR======",
                    "ARDUINO did not receive the correct valve durations. Please restart "
                    "the program and attempt schedule generation again.",
                )
        except Exception as e:
            logger.error(f"error verifying arduino durations -> {e}")

    def send_command(self, command: bytes):
        """
        Send a command or data packet to the Arduino. The command must already be
        converted to a raw bytes object before being passed into this method.
        """
        if self.arduino is None:
            error_message = "Arduino connection was not established. Please reconnect the Arduino board and restart the program."
            GUIUtils.display_error(
                "====ARDUINO COMMUNICATION ERROR====:", error_message
            )
            logger.error(error_message)
            return

        try:
            self.arduino.write(command)
            logger.info(f"Sent {command} to arduino on -> {self.arduino.port}")
        except Exception as e:
            error_message = f"Error sending command to Arduino on {self.arduino.port}: {e}"
            GUIUtils.display_error("Error sending command to Arduino:", error_message)
            logger.error(error_message)

This class establishes a Serial connection to the Arduino board and facilitates communication of information between the board and the controlling PC. All actions involving the Arduino are managed here.

Attributes

  • BAUD_RATE (int): The baud rate (115200) used for the serial connection, i.e. the maximum rate of communication between the Arduino and the PC.
  • arduino (serial.Serial): Holds the serial.Serial connection to the Arduino, including the physical port the board is connected to, the baud rate, and more. It is None until a connection is established.
  • exp_data (ExperimentProcessData): An instance of models.experiment_process_data ExperimentProcessData. This variable allows access to important program attributes, such as num_trials and the program schedules, that need to be sent to the Arduino board.
  • arduino_data (ArduinoData): A reference to the program's instance of models.arduino_data ArduinoData. It gives ArduinoManager access to that class and its methods, which load data stored there such as valve duration times and schedule indices.
  • data_queue (queue.Queue[tuple[str, str]]): The queue that carries data from the listener_thread, which constantly listens for data coming from the Arduino board, to the main thread. The queue is processed in the app_logic method process_queue. This lets the main thread do other important work and process Arduino information only every so often, if there is any to process.
  • stop_event (threading.Event): This event is set in the class method stop_listener_thread. It is a thread-safe flag that lets us exit the listener thread, so that no thread is left running when the main application exits.
  • listener_thread (threading.Thread | None): The thread that constantly listens for new information from the Arduino board. The thread's target method is listen_for_serial.
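
The hand-off through data_queue described above can be sketched as follows. This is only an illustration: `drain_queue` and the message strings are hypothetical, and the real consumer is the app_logic method process_queue, which is not shown on this page.

```python
import queue


def drain_queue(data_queue):
    """Remove and return every message currently waiting, without blocking."""
    messages = []
    while True:
        try:
            messages.append(data_queue.get_nowait())
        except queue.Empty:
            # nothing left to process right now; the caller can try again later
            return messages


# the listener thread would put ("Arduino", <text>) tuples on the queue
q = queue.Queue()
q.put(("Arduino", "MSG A"))  # made-up message text
q.put(("Arduino", "MSG B"))  # made-up message text

pending = drain_queue(q)
```

Because `get_nowait` never blocks, the main thread can call a function like this periodically and return to its other work as soon as the queue is empty.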

Methods

  • connect_to_arduino() Scans the available serial ports for a device whose manufacturer name contains 'Arduino' and opens a connection to it.
  • listen_for_serial() Continuously listens for incoming serial data from the Arduino, adding received messages to data_queue.
  • stop_listener_thread() Signals the listener thread to stop and safely joins it back to the main thread.
  • reset_arduino() Sends a reset command to the Arduino board. Used after connection is established to clear any residual data on the board.
  • close_connection() Closes the serial connection to the Arduino board.
  • send_experiment_variables() Transmits experimental variables (number of stimuli and trials) to the Arduino for schedule configuration.
  • send_schedule_data() Sends valve schedule data to the Arduino to define which valve should open on either side for a given trial.
  • send_valve_durations() Sends the calculated duration settings for each valve to the Arduino.
  • verify_schedule(side_one: npt.NDArray[np.int8], side_two: npt.NDArray[np.int8]) Requests and verifies the valve schedule received by the Arduino by comparing against the sent values.
  • verify_durations(side_one: npt.NDArray[np.int32], side_two: npt.NDArray[np.int32]) Requests and verifies the valve durations received by the Arduino by comparing against the sent values.
  • send_command(command: bytes) Writes the given bytes to the Arduino and reports an error if the board is not connected or the write fails.
ArduinoManager(exp_data: models.experiment_process_data.ExperimentProcessData)
    def __init__(self, exp_data: ExperimentProcessData) -> None:
        """
        Initialize and handle the ArduinoManager class. Here we establish the Arduino connection and reset the board to clear any
        residual data left on the Arduino board from previous experimental runs.

        Parameters
        ----------
        - **exp_data** (*ExperimentProcessData*): An instance of `models.experiment_process_data` ExperimentProcessData. Used to access
        experiment variables and send them to the Arduino board.
        """
        self.BAUD_RATE: int = 115200
        self.arduino: None | serial.Serial = None

        self.exp_data = exp_data
        self.arduino_data = exp_data.arduino_data

        self.data_queue: queue.Queue[tuple[str, str]] = queue.Queue()
        self.stop_event: threading.Event = threading.Event()
        self.listener_thread: threading.Thread | None = None

        # connect to the Arduino board if it is connected to the PC.
        self.connect_to_arduino()

        # reset the board fully to avoid improper communication on program 'reset'
        reset_arduino = "RESET\n".encode("utf-8")
        self.send_command(command=reset_arduino)

Initialize and handle the ArduinoManager class. Here we establish the Arduino connection and reset the board to clear any residual data left on the Arduino board from previous experimental runs.

Parameters

  • exp_data (ExperimentProcessData): An instance of models.experiment_process_data ExperimentProcessData. Used to access experiment variables and send them to the Arduino board.
BAUD_RATE: int
arduino: None | serial.serialposix.Serial
exp_data
arduino_data
data_queue: queue.Queue[tuple[str, str]]
stop_event: threading.Event
listener_thread: threading.Thread | None
def connect_to_arduino(self) -> None:
    def connect_to_arduino(self) -> None:
        """Connect to the Arduino board by searching all serial ports for a device with a manufacturer name containing 'Arduino'. If
        found, establish a serial.Serial connection. If not, notify the user."""
        ports = serial.tools.list_ports.comports()
        arduino_port = None

        # find the arduino boards so we don't waste time trying to connect to empty ports
        for p in ports:
            if p.manufacturer is not None and "Arduino" in p.manufacturer:
                arduino_port = p

        # if we cannot find the arduino, no connection can be established, inform user and return.
        if arduino_port is None:
            error_message = (
                "Arduino not connected. Reconnect Arduino and relaunch the program."
            )
            GUIUtils.display_error("Arduino Not Found", error_message)
            logger.error(error_message)
            return
        else:
            port = arduino_port.device

            self.arduino = serial.Serial(port, self.BAUD_RATE)
            logger.info(f"Arduino connected on port {port}")

Connect to the Arduino board by searching all serial ports for a device with a manufacturer name containing 'Arduino'. If found, establish a serial.Serial connection. If not, notify the user.
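
The port search can be tried without any hardware attached. The sketch below is an assumption-laden illustration: `find_arduino_device` is a hypothetical helper, and the fake port objects only mimic the `device` and `manufacturer` attributes of the entries that `serial.tools.list_ports.comports()` returns.

```python
from types import SimpleNamespace


def find_arduino_device(ports):
    """Return the device path of the last port whose manufacturer mentions 'Arduino'."""
    device = None
    for p in ports:
        # manufacturer can be None for ports that do not report one
        if p.manufacturer is not None and "Arduino" in p.manufacturer:
            device = p.device
    return device


# hypothetical port entries standing in for real hardware
fake_ports = [
    SimpleNamespace(device="/dev/ttyS0", manufacturer=None),
    SimpleNamespace(device="/dev/ttyACM0", manufacturer="Arduino (www.arduino.cc)"),
]

found = find_arduino_device(fake_ports)
missing = find_arduino_device(fake_ports[:1])
```

Like the loop in connect_to_arduino, this keeps the last matching port, so with several boards attached the one listed last is used.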

def listen_for_serial(self) -> None:
    def listen_for_serial(self) -> None:
        """
        Method to constantly scan for Arduino input. If any is received, place it in the thread-safe `data_queue` to process it later.
        """
        # if we do not have an arduino to listen to, return and don't try to listen to it!
        if self.arduino is None:
            logger.error(
                "ARDUINO COMMUNICATION ERROR!!! Class attribute 'arduino' is None."
            )
            return

        # keep polling until stop_listener_thread() sets the stop event
        while not self.stop_event.is_set():
            try:
                if self.arduino.in_waiting > 0:
                    data = self.arduino.readline().decode("utf-8").strip()
                    self.data_queue.put(("Arduino", data))

                    # log the received data
                    logger.info(f"Received -> {data} from arduino")
            except Exception as e:
                logger.error(f"Error reading from Arduino: {e}")
                break
            # sleep for a short time to avoid busy waiting
            time.sleep(0.001)

Method to constantly scan for Arduino input. If any is received, place it in the thread-safe data_queue to process it later.
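
A minimal sketch of the listener pattern, including a clean shutdown, might look like this. `FakeSerial`, `listen`, and the message strings are hypothetical stand-ins used so the example runs without a board; they are not part of this module.

```python
import queue
import threading
import time


class FakeSerial:
    """Hypothetical stand-in for serial.Serial that serves canned lines."""

    def __init__(self, lines):
        self._lines = list(lines)

    @property
    def in_waiting(self):
        return len(self._lines)

    def readline(self):
        return self._lines.pop(0)


def listen(port, data_queue, stop_event):
    # poll until the stop event is set, mirroring listen_for_serial
    while not stop_event.is_set():
        if port.in_waiting > 0:
            data = port.readline().decode("utf-8").strip()
            data_queue.put(("Arduino", data))
        time.sleep(0.001)


data_queue = queue.Queue()
stop_event = threading.Event()
port = FakeSerial([b"READY\n", b"TRIAL 1\n"])  # made-up messages

listener = threading.Thread(target=listen, args=(port, data_queue, stop_event), daemon=True)
listener.start()

first = data_queue.get(timeout=2)
second = data_queue.get(timeout=2)

# same two steps as stop_listener_thread: set the event, then join
stop_event.set()
listener.join(timeout=2)
```

Checking the event once per loop iteration means the thread exits within roughly one sleep interval after the event is set.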

def stop_listener_thread(self) -> None:
    def stop_listener_thread(self) -> None:
        """
        Method to set the stop event for the listener thread and
        join it back to the main program thread.
        """
        self.stop_event.set()

        if self.listener_thread is None:
            return

        if self.listener_thread.is_alive():
            self.listener_thread.join()

Method to set the stop event for the listener thread and join it back to the main program thread.

def reset_arduino(self) -> None:
    def reset_arduino(self) -> None:
        """
        Send a reset command to the Arduino board.
        """
        try:
            command = "RESET\n".encode("utf-8")
            self.send_command(command)

            logger.info("Arduino reset.")
        except Exception as e:
            error_msg = f"Error resetting Arduino: {e}"

            GUIUtils.display_error("RESET ERROR", error_msg)
            logger.error(error_msg)

Send a reset command to the Arduino board.

def close_connection(self) -> None:
    def close_connection(self) -> None:
        """Close the serial connection to the Arduino board."""
        if self.arduino is not None:
            self.arduino.close()
        logger.info("Closed connections to Arduino.")

Close the serial connection to the Arduino board.

def send_experiment_variables(self):
    def send_experiment_variables(self):
        """
        Send the program variables num_stimuli and num_trials to the Arduino.
        This lets the Arduino know how long the schedules are (num_trials)
        and which valve should be selected for side 2 (num_stimuli).
        """
        num_stimuli = self.exp_data.exp_var_entries["Num Stimuli"]
        num_trials = self.exp_data.exp_var_entries["Num Trials"]

        # two NumPy 8-bit integers converted to raw bytes (one byte each)
        num_stimuli = np.int8(num_stimuli).tobytes()
        num_trials = np.int8(num_trials).tobytes()

        var_comm = "REC VAR\n".encode("utf-8")
        self.send_command(var_comm)

        packet = num_stimuli + num_trials
        self.send_command(packet)

Send the program variables num_stimuli and num_trials to the Arduino. This lets the Arduino know how long the schedules are (num_trials) and which valve should be selected for side 2 (num_stimuli).
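
The two-byte variable packet can be illustrated with the standard library. This sketch uses `struct` as a stand-in for the `np.int8(...).tobytes()` calls in the source; `build_variable_packet` and the example values are hypothetical.

```python
import struct


def build_variable_packet(num_stimuli, num_trials):
    # "b" is a signed 8-bit integer, so the packet is exactly two bytes:
    # num_stimuli first, then num_trials
    return struct.pack("<bb", num_stimuli, num_trials)


var_packet = build_variable_packet(4, 20)  # made-up experiment settings
```

Since each value occupies one signed byte, neither the number of stimuli nor the number of trials can exceed 127 in this format.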

def send_schedule_data(self) -> None:
    def send_schedule_data(self) -> None:
        """
        Method to send the valve schedule to the Arduino so it knows which valve to
        open on a given trial.
        First load the np schedule arrays for both sides, turn them into bytes, and
        set them back to back in a packet, i.e. side two follows side one.
        """
        side_one, side_two = self.arduino_data.load_schedule_indices()

        schedule_packet: bytes = side_one.tobytes() + side_two.tobytes()

        sched_command = "REC SCHED\n".encode("utf-8")

        self.send_command(sched_command)

        self.send_command(schedule_packet)

        self.verify_schedule(side_one, side_two)

Method to send the valve schedule to the Arduino so it knows which valve to open on a given trial. First load the np schedule arrays for both sides, turn them into bytes, and set them back to back in a packet, i.e. side two follows side one.
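
The back-to-back layout of the schedule packet can be sketched with plain `bytes`. This assumes one unsigned byte per trial and uses made-up valve indices; `build_schedule_packet` is a hypothetical helper, whereas the source builds the packet from NumPy int8 arrays with `tobytes()`.

```python
def build_schedule_packet(side_one, side_two):
    # bytes() of a list of ints in 0..255 yields one byte per value,
    # so side two starts immediately after the last byte of side one
    return bytes(side_one) + bytes(side_two)


side_one = [0, 3, 1, 2]  # made-up valve index for each of 4 trials
side_two = [4, 7, 5, 6]  # made-up valve index for each of 4 trials

schedule_packet = build_schedule_packet(side_one, side_two)
```

With this layout the receiver only needs num_trials to know where side one ends and side two begins.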

def send_valve_durations(self) -> None:
    def send_valve_durations(self) -> None:
        """
        Load the side_one and side_two durations from the `models.arduino_data` ArduinoData model
        (read from arduino_data.toml using the last_used 'profile'), send them to the Arduino
        as one packet with side two following side one, and verify what the board received.
        """
        side_one, side_two, _ = self.arduino_data.load_durations()

        side_one_durs = side_one.tobytes()
        side_two_durs = side_two.tobytes()

        dur_packet = side_one_durs + side_two_durs

        dur_command = "REC DURATIONS\n".encode("utf-8")

        self.send_command(dur_command)

        self.send_command(dur_packet)

        self.verify_durations(side_one, side_two)

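Load the side_one and side_two durations from the models.arduino_data ArduinoData model (read from arduino_data.toml using the last_used 'profile'), send them to the Arduino as one packet with side two following side one, and verify what the board received.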
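
The size of the duration packet follows from the data types: 8 valves per side, 4 bytes per int32 value, two sides. The sketch below uses `struct` in place of the NumPy `tobytes()` calls and assumes a little-endian host, which matches the byte order verify_durations reads back; `build_duration_packet` and the duration values are hypothetical.

```python
import struct

VALVES_PER_SIDE = 8  # same value as the module constant


def build_duration_packet(side_one, side_two):
    # "<8i" packs eight signed 32-bit integers in little-endian order
    fmt = f"<{VALVES_PER_SIDE}i"
    return struct.pack(fmt, *side_one) + struct.pack(fmt, *side_two)


side_one = [100, 200, 300, 400, 500, 600, 700, 800]  # made-up durations
side_two = [105, 205, 305, 405, 505, 605, 705, 805]  # made-up durations

dur_packet = build_duration_packet(side_one, side_two)
```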

def verify_schedule( self, side_one: numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.int8]], side_two: numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.int8]]) -> None:
    def verify_schedule(
        self, side_one: npt.NDArray[np.int8], side_two: npt.NDArray[np.int8]
    ) -> None:
        """
        Ask the Arduino to echo back the schedules that it received. The board
        sends the data byte by byte in the order it received it (side one
        schedule, then side two), EXACTLY num_trials * 2 bytes in total
        (one 8-bit integer for each trial on each side).
        If the echo matches what was sent, we log a success message and continue execution.
        """
        try:
            ver_sched_command = "VER SCHED\n".encode("utf-8")
            self.send_command(ver_sched_command)

            num_trials = self.exp_data.exp_var_entries["Num Trials"]

            ver1 = np.zeros((num_trials,), dtype=np.int8)
            ver2 = np.zeros((num_trials,), dtype=np.int8)

            if self.arduino is None:
                msg = "ARDUINO IS NOT CONNECTED! Try reconnecting and restart the program."
                logger.error(msg)
                GUIUtils.display_error("ARDUINO ERROR", msg)

                return

            # wait for the data to start arriving; each read() below blocks
            # until its byte is available
            while self.arduino.in_waiting == 0:
                time.sleep(0.001)

            for i in range(num_trials):
                ver1[i] = int.from_bytes(
                    self.arduino.read(), byteorder="little", signed=False
                )
            for i in range(num_trials):
                ver2[i] = int.from_bytes(
                    self.arduino.read(), byteorder="little", signed=False
                )
            logger.info(f"Arduino received side one as => {ver1}")
            logger.info(f"Arduino received side two as => {ver2}")

            if np.array_equal(side_one, ver1) and np.array_equal(side_two, ver2):
                logger.info(
                    "arduino has received and verified experiment valve schedule"
                )
            else:
                # adjacent string literals are joined, so no stray indentation ends up in the message
                GUIUtils.display_error(
                    "======SCHEDULE ERROR======",
                    "ARDUINO did not receive the correct schedule. Please restart the "
                    "program and attempt schedule generation again.",
                )
        except Exception as e:
            logger.error(f"error verifying arduino schedule {e}")

Asks the Arduino to echo back the schedules that it received. The board sends the data byte by byte in the order it received it (side one schedule, then side two), EXACTLY num_trials * 2 bytes in total (one 8-bit integer for each trial on each side). If the echo matches what was sent, a success message is logged and execution continues.
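
The read-back comparison can be sketched against an in-memory stream instead of a serial port. `read_schedule_echo` is a hypothetical helper and the schedules are made-up values; `io.BytesIO` stands in for the board's reply.

```python
import io


def read_schedule_echo(stream, num_trials):
    """Read num_trials single bytes for side one, then the same for side two."""

    def read_side():
        return [
            int.from_bytes(stream.read(1), byteorder="little", signed=False)
            for _ in range(num_trials)
        ]

    side_one = read_side()
    side_two = read_side()
    return side_one, side_two


sent_one = [0, 3, 1, 2]  # made-up schedule, side one
sent_two = [4, 7, 5, 6]  # made-up schedule, side two

# pretend the board echoed back exactly what was sent
reply = io.BytesIO(bytes(sent_one + sent_two))
got_one, got_two = read_schedule_echo(reply, num_trials=4)

schedule_verified = got_one == sent_one and got_two == sent_two
```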

def verify_durations( self, side_one: numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.int32]], side_two: numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.int32]]) -> None:
    def verify_durations(
        self, side_one: npt.NDArray[np.int32], side_two: npt.NDArray[np.int32]
    ) -> None:
        """
        Ask the Arduino to echo back the valve durations that it received and
        compare them with the values that were sent. The board replies with side
        one followed by side two: VALVES_PER_SIDE durations per side, each a
        4-byte little-endian integer (2 * 8 * 4 = 64 bytes in total).
        If the echo matches, a success message is logged; otherwise an error is displayed.
        """
        try:
            ver_dur_command = "VER DURATIONS\n".encode("utf-8")
            self.send_command(ver_dur_command)

            ver1 = np.zeros((VALVES_PER_SIDE,), dtype=np.int32)
            ver2 = np.zeros((VALVES_PER_SIDE,), dtype=np.int32)

            if self.arduino is None:
                msg = "ARDUINO IS NOT CONNECTED! Try reconnecting and restart the program."
                logger.error(msg)
                GUIUtils.display_error("ARDUINO ERROR", msg)

                return

            # wait for the first side (4 bytes per valve) to arrive; the blocking
            # reads below wait for any bytes that are still in flight
            while self.arduino.in_waiting < 4 * VALVES_PER_SIDE:
                time.sleep(0.001)
            for i in range(VALVES_PER_SIDE):
                duration = int.from_bytes(
                    self.arduino.read(4), byteorder="little", signed=False
                )
                ver1[i] = duration
            for i in range(VALVES_PER_SIDE):
                duration = int.from_bytes(
                    self.arduino.read(4), byteorder="little", signed=False
                )
                ver2[i] = duration

            logger.info(f"Arduino received side one as => {ver1}")
            logger.info(f"Arduino received side two as => {ver2}")

            if np.array_equal(side_one, ver1) and np.array_equal(side_two, ver2):
                logger.info("arduino has received and verified valve durations")
            else:
                GUIUtils.display_error(
                    "======DURATIONS ERROR======",
                    "ARDUINO did not receive the correct valve durations. Please restart "
                    "the program and attempt schedule generation again.",
                )
        except Exception as e:
            logger.error(f"error verifying arduino durations -> {e}")

Asks the Arduino to echo back the valve durations that it received and compares them with the values that were sent. The board replies with side one followed by side two: VALVES_PER_SIDE (8) durations per side, each a 4-byte little-endian integer, for 64 bytes in total. If the echo matches, a success message is logged; otherwise an error is displayed.
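
Going by the code of verify_durations, the reply is decoded four bytes at a time, eight values per side. The sketch below reproduces that decoding against an in-memory stream; `read_duration_echo` and the duration values are hypothetical, and `io.BytesIO` stands in for the serial port.

```python
import io
import struct

VALVES_PER_SIDE = 8  # same value as the module constant


def read_duration_echo(stream):
    """Read two sides of VALVES_PER_SIDE unsigned 4-byte little-endian integers."""

    def read_side():
        return [
            int.from_bytes(stream.read(4), byteorder="little", signed=False)
            for _ in range(VALVES_PER_SIDE)
        ]

    side_one = read_side()
    side_two = read_side()
    return side_one, side_two


sent_one = [100, 200, 300, 400, 500, 600, 700, 800]  # made-up durations
sent_two = [105, 205, 305, 405, 505, 605, 705, 805]  # made-up durations

# pretend the board echoed back exactly what was sent (64 bytes)
reply = io.BytesIO(struct.pack("<8i", *sent_one) + struct.pack("<8i", *sent_two))
echo_one, echo_two = read_duration_echo(reply)

durations_verified = echo_one == sent_one and echo_two == sent_two
```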

def send_command(self, command: bytes):
    def send_command(self, command: bytes):
        """
        Send a command or data packet to the Arduino. The command must already be
        converted to a raw bytes object before being passed into this method.
        """
        if self.arduino is None:
            error_message = "Arduino connection was not established. Please reconnect the Arduino board and restart the program."
            GUIUtils.display_error(
                "====ARDUINO COMMUNICATION ERROR====:", error_message
            )
            logger.error(error_message)
            return

        try:
            self.arduino.write(command)
            logger.info(f"Sent {command} to arduino on -> {self.arduino.port}")
        except Exception as e:
            error_message = f"Error sending command to Arduino on {self.arduino.port}: {e}"
            GUIUtils.display_error("Error sending command to Arduino:", error_message)
            logger.error(error_message)

Send a command or data packet to the Arduino. The command must already be converted to a raw bytes object before being passed into this method.
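
Throughout this class, a transfer is a newline-terminated text command followed by a raw binary payload, both passed through send_command as bytes. A rough sketch of that sequence, using a hypothetical `RecordingPort` in place of serial.Serial and a hypothetical `encode_command` helper:

```python
class RecordingPort:
    """Hypothetical stand-in for serial.Serial that records what is written."""

    def __init__(self):
        self.written = []

    def write(self, data):
        self.written.append(data)
        return len(data)


def encode_command(name):
    # text commands are UTF-8 encoded and end with a newline
    return f"{name}\n".encode("utf-8")


port = RecordingPort()

# announce the transfer, then send the payload (made-up values: 4 stimuli, 20 trials)
port.write(encode_command("REC VAR"))
port.write(bytes([4, 20]))
```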