arduino_control
This module is used to establish communication with the Arduino board. It sends information such as schedules, experiment variables, and valve durations to the board, along with other commands important for the operation of the Experiment Rig.
"""
This module is used to establish communication with the Arduino board. It sends information like schedule, experiment variable, and
valve duration information to the board, along with other commands important for the operation of the Experiment Rig.
"""

import serial
import serial.tools.list_ports
import time
import threading
import queue
import logging
import numpy as np

### USED FOR TYPE HINTING ###
from models.experiment_process_data import ExperimentProcessData
import numpy.typing as npt
### USED FOR TYPE HINTING ###

from views.gui_common import GUIUtils


logger = logging.getLogger(__name__)

# each side has 8 valves, therefore 8 durations in each array
VALVES_PER_SIDE = 8


class ArduinoManager:
    """
    This class establishes a Serial connection to the Arduino board and facilitates communication of information between the board and the controlling
    PC. All actions involving the Arduino are managed here.

    Attributes
    ----------
    - **BAUD_RATE** (*int*): The baud rate (115200) used for serial communication between the Arduino and the PC.
    - **arduino** (*serial.Serial | None*): This variable holds the serial.Serial connection to the Arduino. This includes what physical port the board is
    connected to, baud rate and more. It is None until a connection is established.
    - **exp_data** (*ExperimentProcessData*): An instance of `models.experiment_process_data` ExperimentProcessData, this variable allows access to
    important program attributes like num_trials and program schedules that need to be sent to the Arduino board.
    - **arduino_data** (*ArduinoData*): This is a reference to the program instance of `models.arduino_data` ArduinoData. It allows
    ArduinoManager to load data stored there such as valve duration times and schedule indices.
    - **data_queue** (*queue.Queue[tuple[str, str]]*): This is the queue that carries data from the Arduino's `listener_thread`,
    which constantly listens for any data coming from the Arduino board, to the main thread. This queue is processed in the `app_logic` method
    `process_queue`. This allows the main thread to do other important work, only processing Arduino information every so often, if there is any to process.
    - **stop_event** (*threading.Event*): This event is set in the class method `stop_listener_thread`. This is a thread-safe data type that allows
    us to exit the listener thread to avoid leaving threads busy when exiting the main application.
    - **listener_thread** (*threading.Thread | None*): This is the thread that listens constantly for new information
    from the Arduino board. The thread's target method is the `listen_for_serial` method.

    Methods
    -------
    - `connect_to_arduino`()
        Scans available ports for a device whose manufacturer name contains 'Arduino' to establish a connection with the Arduino board.
    - `listen_for_serial`()
        Continuously listens for incoming serial data from the Arduino, adding received messages to `data_queue`.
    - `stop_listener_thread`()
        Signals the listener thread to stop and safely joins it back to the main thread.
    - `reset_arduino`()
        Sends a reset command to the Arduino board. Used after connection is established to clear any residual data on the board.
    - `close_connection`()
        Closes the serial connection to the Arduino board.
    - `send_experiment_variables`()
        Transmits experimental variables (number of stimuli and trials) to the Arduino for schedule configuration.
    - `send_schedule_data`()
        Sends valve schedule data to the Arduino to define which valve should open on either side for a given trial.
    - `send_valve_durations`()
        Sends the calculated duration settings for each valve to the Arduino.
    - `verify_schedule`(side_one: npt.NDArray[np.int8], side_two: npt.NDArray[np.int8])
        Requests and verifies the valve schedule received by the Arduino by comparing against the sent values.
    - `verify_durations`(side_one: npt.NDArray[np.int32], side_two: npt.NDArray[np.int32])
        Requests and verifies the valve durations received by the Arduino by comparing against the sent values.
    - `send_command`(command: bytes)
        Writes a given command to the Arduino and reports an error if the board is not connected or the write fails.
    """

    def __init__(self, exp_data: ExperimentProcessData) -> None:
        """
        Initialize the ArduinoManager class. Here we establish the Arduino connection and reset the board to clear any
        residual data left on the Arduino board from previous experimental runs.

        Parameters
        ----------
        - **exp_data** (*ExperimentProcessData*): An instance of `models.experiment_process_data` ExperimentProcessData. Used to access
        experiment variables and send them to the Arduino board.
        """
        self.BAUD_RATE: int = 115200
        self.arduino: None | serial.Serial = None

        self.exp_data = exp_data
        self.arduino_data = exp_data.arduino_data

        self.data_queue: queue.Queue[tuple[str, str]] = queue.Queue()
        self.stop_event: threading.Event = threading.Event()
        self.listener_thread: threading.Thread | None = None

        # connect to the Arduino board if it is connected to the PC.
        self.connect_to_arduino()

        # reset the board fully to avoid improper communication on program 'reset'
        reset_arduino = "RESET\n".encode("utf-8")
        self.send_command(command=reset_arduino)

    def connect_to_arduino(self) -> None:
        """Connect to the Arduino board by searching all serial ports for a device with a manufacturer name containing 'Arduino'. If
        found, establish a serial.Serial connection. If not, notify the user."""
        ports = serial.tools.list_ports.comports()
        arduino_port = None

        # find the arduino boards so we don't waste time trying to connect to empty ports
        for p in ports:
            if p.manufacturer is not None and "Arduino" in p.manufacturer:
                arduino_port = p

        # if we cannot find the arduino, no connection can be established, inform user and return.
        if arduino_port is None:
            error_message = (
                "Arduino not connected. Reconnect Arduino and relaunch the program."
            )
            GUIUtils.display_error("Arduino Not Found", error_message)
            logger.error(error_message)
            return

        port = arduino_port.device

        self.arduino = serial.Serial(port, self.BAUD_RATE)
        logger.info(f"Arduino connected on port {port}")

    def listen_for_serial(self) -> None:
        """
        Method to constantly scan for Arduino input. If received, place it in the thread-safe `data_queue` to process it later.
        """
        # if we do not have an arduino to listen to, return and don't try to listen to it!
        if self.arduino is None:
            logger.error(
                "ARDUINO COMMUNICATION ERROR!!! Class attribute 'arduino' is None."
            )
            return

        # keep listening until stop_listener_thread sets the stop event
        while not self.stop_event.is_set():
            try:
                if self.arduino.in_waiting > 0:
                    data = self.arduino.readline().decode("utf-8").strip()
                    self.data_queue.put(("Arduino", data))

                    # log the received data
                    logger.info(f"Received -> {data} from arduino")
            except Exception as e:
                logger.error(f"Error reading from Arduino: {e}")
                break
            # sleep for a short time to avoid busy waiting
            time.sleep(0.001)

    def stop_listener_thread(self) -> None:
        """
        Method to set the stop event for the listener thread and
        join it back to the main program thread.
        """
        self.stop_event.set()

        if self.listener_thread is None:
            return

        if self.listener_thread.is_alive():
            self.listener_thread.join()

    def reset_arduino(self) -> None:
        """
        Send a reset command to the Arduino board.
        """
        try:
            command = "RESET\n".encode("utf-8")
            self.send_command(command)

            logger.info("Arduino reset.")
        except Exception as e:
            error_msg = f"Error resetting Arduino: {e}"

            GUIUtils.display_error("RESET ERROR", error_msg)
            logger.error(error_msg)

    def close_connection(self) -> None:
        """Close the serial connection to the Arduino board."""
        if self.arduino is not None:
            self.arduino.close()
        logger.info("Closed connections to Arduino.")

    def send_experiment_variables(self) -> None:
        """
        This method is defined to send program variables num_stimuli and num_trials to
        the Arduino. This is useful because it allows the Arduino to understand how long schedules should be (num_trials),
        and what valve should be selected for side 2 (num_stimuli).
        """
        num_stimuli = self.exp_data.exp_var_entries["Num Stimuli"]
        num_trials = self.exp_data.exp_var_entries["Num Trials"]

        # 2 np 8 bit numbers converted to raw bytes
        num_stimuli = np.int8(num_stimuli).tobytes()
        num_trials = np.int8(num_trials).tobytes()

        var_comm = "REC VAR\n".encode("utf-8")
        self.send_command(var_comm)

        packet = num_stimuli + num_trials
        self.send_command(packet)

    def send_schedule_data(self) -> None:
        """
        Method to send the valve schedule to the Arduino so it knows which valve to
        open on a given trial.
        First load np schedule arrays for both sides, turn them into bytes, and
        set them back to back in a packet, i.e. side two will follow side one.
        """
        side_one, side_two = self.arduino_data.load_schedule_indices()

        schedule_packet: bytes = side_one.tobytes() + side_two.tobytes()

        sched_command = "REC SCHED\n".encode("utf-8")

        self.send_command(sched_command)

        self.send_command(schedule_packet)

        self.verify_schedule(side_one, side_two)

    def send_valve_durations(self) -> None:
        """
        Get side_one and side_two durations from the `models.arduino_data` Arduino data model, which
        loads them from arduino_data.toml using the last_used 'profile', and send them to the Arduino.
        """
        side_one, side_two, _ = self.arduino_data.load_durations()

        side_one_durs = side_one.tobytes()
        side_two_durs = side_two.tobytes()

        dur_packet = side_one_durs + side_two_durs

        dur_command = "REC DURATIONS\n".encode("utf-8")

        self.send_command(dur_command)

        self.send_command(dur_packet)

        self.verify_durations(side_one, side_two)

    def verify_schedule(
        self, side_one: npt.NDArray[np.int8], side_two: npt.NDArray[np.int8]
    ) -> None:
        """
        Method to tell the Arduino to give us the schedules that it
        received. It will send data byte by byte in the order that it
        received it (side one schedule, then side two). It will send EXACTLY
        num_trials * 2 bytes (8 bit / 1 byte int for each trial on each side).
        If successful, we continue execution and log a success message.
        """
        try:
            ver_sched_command = "VER SCHED\n".encode("utf-8")
            self.send_command(ver_sched_command)

            num_trials = self.exp_data.exp_var_entries["Num Trials"]

            ver1 = np.zeros((num_trials,), dtype=np.int8)
            ver2 = np.zeros((num_trials,), dtype=np.int8)

            if self.arduino is None:
                msg = "ARDUINO IS NOT CONNECTED! Try reconnecting and restart the program."
                logger.error(msg)
                GUIUtils.display_error("ARDUINO ERROR", msg)

                return

            # wait for the data to arrive (in_waiting is never negative, so the
            # original `< 0` test never waited); the reads below then fetch each byte
            while self.arduino.in_waiting == 0:
                pass

            for i in range(num_trials):
                ver1[i] = int.from_bytes(
                    self.arduino.read(), byteorder="little", signed=False
                )
            for i in range(num_trials):
                ver2[i] = int.from_bytes(
                    self.arduino.read(), byteorder="little", signed=False
                )
            logger.info(f"Arduino received side one as => {ver1}")
            logger.info(f"Arduino received side two as => {ver2}")

            if np.array_equal(side_one, ver1) and np.array_equal(side_two, ver2):
                logger.info(
                    "arduino has received and verified experiment valve schedule"
                )
            else:
                GUIUtils.display_error(
                    "======SCHEDULE ERROR======",
                    "ARDUINO did not receive the correct schedule. Please restart the program and attempt "
                    "schedule generation again.",
                )
        except Exception as e:
            logger.error(f"error verifying arduino schedule {e}")

    def verify_durations(
        self, side_one: npt.NDArray[np.int32], side_two: npt.NDArray[np.int32]
    ) -> None:
        """
        Method to tell the Arduino to give us the valve durations that it
        received. It will send the data in the order that it
        received it (side one durations, then side two). It will send EXACTLY
        VALVES_PER_SIDE * 2 values of 4 bytes each (one 32-bit int for each valve on each side).
        If successful, we continue execution and log a success message.
        """
        try:
            ver_dur_command = "VER DURATIONS\n".encode("utf-8")
            self.send_command(ver_dur_command)

            ver1 = np.zeros((VALVES_PER_SIDE,), dtype=np.int32)
            ver2 = np.zeros((VALVES_PER_SIDE,), dtype=np.int32)

            if self.arduino is None:
                msg = "ARDUINO IS NOT CONNECTED! Try reconnecting and restart the program."
                logger.error(msg)
                GUIUtils.display_error("ARDUINO ERROR", msg)

                return

            # wait for the first side's data to arrive (4 bytes per valve)
            while self.arduino.in_waiting < 4 * VALVES_PER_SIDE:
                pass
            for i in range(VALVES_PER_SIDE):
                duration = int.from_bytes(
                    self.arduino.read(4), byteorder="little", signed=False
                )
                ver1[i] = duration
            for i in range(VALVES_PER_SIDE):
                duration = int.from_bytes(
                    self.arduino.read(4), byteorder="little", signed=False
                )
                ver2[i] = duration

            logger.info(f"Arduino received side one as => {ver1}")
            logger.info(f"Arduino received side two as => {ver2}")

            if np.array_equal(side_one, ver1) and np.array_equal(side_two, ver2):
                logger.info("arduino has received and verified valve durations")
            else:
                GUIUtils.display_error(
                    "======DURATIONS ERROR======",
                    "ARDUINO did not receive the correct valve durations. Please restart the program and attempt "
                    "schedule generation again.",
                )
        except Exception as e:
            logger.error(f"error verifying arduino durations -> {e}")

    def send_command(self, command: bytes) -> None:
        """
        Send a specific command to the Arduino. The command must be converted to a raw bytes object
        before being passed into this method.
        """
        if self.arduino is None:
            error_message = "Arduino connection was not established. Please reconnect the Arduino board and restart the program."
            GUIUtils.display_error(
                "====ARDUINO COMMUNICATION ERROR====:", error_message
            )
            logger.error(error_message)
            return

        try:
            self.arduino.write(command)
            logger.info(f"Sent {command} to arduino on -> {self.arduino.port}: ")
        except Exception as e:
            error_message = f"Error sending command to {self.arduino.port} Arduino: {e}"
            GUIUtils.display_error("Error sending command to Arduino:", error_message)
            logger.error(error_message)
This class establishes a Serial connection to the Arduino board and facilitates communication of information between the board and the controlling PC. All actions involving the Arduino are managed here.
Attributes
- BAUD_RATE (int): The baud rate (115200) used for serial communication between the Arduino and the PC.
- arduino (serial.Serial): Holds the serial.Serial connection to the Arduino, including the physical port the board is connected to, the baud rate, and more. It is None until a connection is established.
- exp_data (ExperimentProcessData): An instance of models.experiment_process_data ExperimentProcessData. It allows access to important program attributes like num_trials and program schedules that need to be sent to the Arduino board.
- arduino_data (ArduinoData): A reference to the program instance of models.arduino_data ArduinoData. It allows ArduinoManager to load data stored there, such as valve duration times and schedule indices.
- data_queue (queue.Queue[tuple[str, str]]): The queue that carries data from the Arduino's listener_thread, which constantly listens for any data coming from the Arduino board, to the main thread. This queue is processed in the app_logic method process_queue. This allows the main thread to do other important work, processing Arduino information only every so often and only when there is any to process.
- stop_event (threading.Event): This event is set in the class method stop_listener_thread. It is a thread-safe flag that lets us exit the listener thread, so that no threads are left busy when exiting the main application.
- listener_thread (threading.Thread | None): The thread that listens constantly for new information from the Arduino board. The thread's target is the listen_for_serial method.
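The interplay of data_queue, stop_event, and listener_thread described above can be sketched without any hardware. This is only an illustration under stated assumptions: `listen`, `drain`, and `fake_read_line` are hypothetical names that do not exist in the module, the fake reader stands in for the serial port, and the two messages are placeholders rather than real Arduino output.

```python
import queue
import threading
import time


def listen(read_line, data_queue, stop_event, poll_interval=0.001):
    """Poll read_line() until stop_event is set, queueing each message."""
    while not stop_event.is_set():
        line = read_line()  # hypothetical reader: a str, or None if nothing is waiting
        if line is not None:
            data_queue.put(("Arduino", line))
        time.sleep(poll_interval)  # short sleep to avoid busy waiting


def drain(data_queue):
    """Return every message currently queued, without blocking."""
    messages = []
    while True:
        try:
            messages.append(data_queue.get_nowait())
        except queue.Empty:
            return messages


# Placeholder input standing in for data arriving from the board.
incoming = ["hello", "world"]
incoming_lock = threading.Lock()


def fake_read_line():
    with incoming_lock:
        return incoming.pop(0) if incoming else None


data_queue = queue.Queue()
stop_event = threading.Event()
listener = threading.Thread(
    target=listen, args=(fake_read_line, data_queue, stop_event), daemon=True
)
listener.start()

# The main thread is free to do other work; here it just waits briefly.
deadline = time.monotonic() + 2.0
while data_queue.qsize() < 2 and time.monotonic() < deadline:
    time.sleep(0.001)

# Same shutdown order as stop_listener_thread: set the event, then join.
stop_event.set()
listener.join()
received = drain(data_queue)
```

Setting the event before joining matters: the listener only checks the event between polls, so joining first would block forever.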
Methods
- connect_to_arduino(): Scans available ports for a device whose manufacturer name contains 'Arduino' to establish a connection with the Arduino board.
- listen_for_serial(): Continuously listens for incoming serial data from the Arduino, adding received messages to data_queue.
- stop_listener_thread(): Signals the listener thread to stop and safely joins it back to the main thread.
- reset_arduino(): Sends a reset command to the Arduino board. Used after connection is established to clear any residual data on the board.
- close_connection(): Closes the serial connection to the Arduino board.
- send_experiment_variables(): Transmits experimental variables (number of stimuli and trials) to the Arduino for schedule configuration.
- send_schedule_data(): Sends valve schedule data to the Arduino to define which valve should open on either side for a given trial.
- send_valve_durations(): Sends the calculated duration settings for each valve to the Arduino.
- verify_schedule(side_one: npt.NDArray[np.int8], side_two: npt.NDArray[np.int8]): Requests and verifies the valve schedule received by the Arduino by comparing against the sent values.
- verify_durations(side_one: npt.NDArray[np.int32], side_two: npt.NDArray[np.int32]): Requests and verifies the valve durations received by the Arduino by comparing against the sent values.
- send_command(command: bytes): Writes the given bytes to the Arduino and reports an error if the board is not connected or the write fails.
    def __init__(self, exp_data: ExperimentProcessData) -> None:
        """
        Initialize and handle the ArduinoManager class. Here we establish the Arduino connection and reset the board to clear any
        residual data left on the Arduino board from previous experimental runs.

        Parameters
        ----------
        - **exp_data** (*ExperimentProcessData*): An instance of `models.experiment_process_data` ExperimentProcessData. Used to access
        experiment variables and send them to the Arduino board.
        """
        self.BAUD_RATE: int = 115200
        self.arduino: None | serial.Serial = None

        self.exp_data = exp_data
        self.arduino_data = exp_data.arduino_data

        self.data_queue: queue.Queue[tuple[str, str]] = queue.Queue()
        self.stop_event: threading.Event = threading.Event()
        self.listener_thread: threading.Thread | None = None

        # connect to the Arduino board if it is connected to the PC.
        self.connect_to_arduino()

        # reset the board fully to avoid improper communication on program 'reset'
        reset_arduino = "RESET\n".encode("utf-8")
        self.send_command(command=reset_arduino)
Initialize and handle the ArduinoManager class. Here we establish the Arduino connection and reset the board to clear any residual data left on the Arduino board from previous experimental runs.
Parameters
- exp_data (ExperimentProcessData): An instance of models.experiment_process_data ExperimentProcessData. Used to access experiment variables and send them to the Arduino board.
    def connect_to_arduino(self) -> None:
        """Connect to the Arduino board by searching all serial ports for a device with a manufacturer name 'Arduino'. If
        found, establish serial.Serial connection. If not notify user."""
        ports = serial.tools.list_ports.comports()
        arduino_port = None

        # find the arduino boards so we don't waste time trying to connect to empty ports
        for p in ports:
            if p.manufacturer is not None and "Arduino" in p.manufacturer:
                arduino_port = p

        # if we cannot find the arduino, no connection can be established, inform user and return.
        if arduino_port is None:
            error_message = (
                "Arduino not connected. Reconnect Arduino and relaunch the program."
            )
            GUIUtils.display_error("Arduino Not Found", error_message)
            logger.error(error_message)
            return
        else:
            port = arduino_port.device

        self.arduino = serial.Serial(port, self.BAUD_RATE)
        logger.info(f"Arduino connected on port {port}")
Connect to the Arduino board by searching all serial ports for a device whose manufacturer name contains 'Arduino'. If one is found, establish a serial.Serial connection. If not, notify the user.
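The port-selection step can be isolated into a small pure function, which makes it testable without hardware. The sketch below is an illustration under assumptions: find_arduino_port is a hypothetical helper, and the fake port objects only mimic the manufacturer and device attributes of the entries returned by serial.tools.list_ports.comports(). Unlike the loop in connect_to_arduino, which keeps the last matching port, this version returns the first match.

```python
from types import SimpleNamespace
from typing import Any, Iterable, Optional


def find_arduino_port(ports: Iterable[Any]) -> Optional[str]:
    """Return the device name of the first port whose manufacturer mentions 'Arduino'."""
    for p in ports:
        # manufacturer can be None for ports with no attached USB descriptor
        if p.manufacturer is not None and "Arduino" in p.manufacturer:
            return p.device
    return None


# Stand-ins for real port entries; device names and manufacturers are made up.
fake_ports = [
    SimpleNamespace(device="COM1", manufacturer=None),
    SimpleNamespace(device="COM3", manufacturer="Arduino LLC (www.arduino.cc)"),
]
selected = find_arduino_port(fake_ports)
```

Boards that report a different manufacturer string (some clones do) would not be matched by this check, which is also true of the original method.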
    def listen_for_serial(self) -> None:
        """
        Method to constantly scan for Arduino input. If received, place it in the thread-safe `data_queue` to process it later.
        """
        # if we do not have an arduino to listen to return and don't try to listen to it!
        if self.arduino is None:
            logger.error(
                "ARDUINO COMMUNICATION ERROR!!! Class attribute 'arduino' is None."
            )
            return

        # keep listening until stop_listener_thread sets the stop event
        while not self.stop_event.is_set():
            try:
                if self.arduino.in_waiting > 0:
                    data = self.arduino.readline().decode("utf-8").strip()
                    self.data_queue.put(("Arduino", data))

                    # log the received data
                    logger.info(f"Received -> {data} from arduino")
            except Exception as e:
                logger.error(f"Error reading from Arduino: {e}")
                break
            # sleep for a short time to avoid busy waiting
            time.sleep(0.001)
Method to constantly scan for Arduino input. If data is received, place it in the thread-safe data_queue to process it later.
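The listen, queue, and stop cycle can be exercised without a board. In this sketch FakeSerial is a hypothetical stand-in that exposes only the in_waiting and readline members used by the listener, and listen is a simplified free-function version of the loop, not the class method itself.

```python
import queue
import threading
import time


class FakeSerial:
    """Hypothetical stand-in for serial.Serial that serves pre-loaded lines."""

    def __init__(self, lines: list) -> None:
        self._lines = list(lines)

    @property
    def in_waiting(self) -> int:
        # Number of bytes "available": the length of the next pending line.
        return len(self._lines[0]) if self._lines else 0

    def readline(self) -> bytes:
        return self._lines.pop(0)


def listen(port: FakeSerial, out: queue.Queue, stop: threading.Event) -> None:
    """Poll the port until the stop event is set, queueing each decoded line."""
    while not stop.is_set():
        if port.in_waiting > 0:
            out.put(("Arduino", port.readline().decode("utf-8").strip()))
        time.sleep(0.001)  # avoid busy waiting


messages: queue.Queue = queue.Queue()
stop_event = threading.Event()
worker = threading.Thread(
    target=listen,
    args=(FakeSerial([b"A\n", b"B\n"]), messages, stop_event),
    daemon=True,
)
worker.start()

first = messages.get(timeout=2)
second = messages.get(timeout=2)

# Same shutdown order as stop_listener_thread: set the event, then join.
stop_event.set()
worker.join(timeout=2)
```

Because the loop checks the event on every iteration and sleeps only briefly, the join returns almost immediately after the event is set.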
    def stop_listener_thread(self) -> None:
        """
        Method to set the stop event for the listener thread and
        join it back to the main program thread.
        """
        self.stop_event.set()

        if self.listener_thread is None:
            return

        if self.listener_thread.is_alive():
            self.listener_thread.join()
Method to set the stop event for the listener thread and join it back to the main program thread.
    def reset_arduino(self) -> None:
        """
        Send a reset command to the Arduino board.
        """
        try:
            command = "RESET\n".encode("utf-8")
            self.send_command(command)

            logger.info("Arduino reset.")
        except Exception as e:
            error_msg = f"Error resetting Arduino: {e}"

            GUIUtils.display_error("RESET ERROR", error_msg)
            logger.error(error_msg)
Send a reset command to the Arduino board.
    def close_connection(self) -> None:
        """Close the serial connection to the Arduino board."""
        if self.arduino is not None:
            self.arduino.close()
        logger.info("Closed connections to Arduino.")
Close the serial connection to the Arduino board.
    def send_experiment_variables(self):
        """
        This method is defined to send program variables num_stimuli and num_trials to
        the Arduino. This is useful because it allows the Arduino to understand how long schedules should be (num_trials),
        and what valve should be selected for side 2 (num_stimuli).
        """
        num_stimuli = self.exp_data.exp_var_entries["Num Stimuli"]
        num_trials = self.exp_data.exp_var_entries["Num Trials"]

        # 2 np 8 bit numbers converted to raw bytes
        num_stimuli = np.int8(num_stimuli).tobytes()
        num_trials = np.int8(num_trials).tobytes()

        var_comm = "REC VAR\n".encode("utf-8")
        self.send_command(var_comm)

        packet = num_stimuli + num_trials
        self.send_command(packet)
This method is defined to send program variables num_stimuli and num_trials to the Arduino. This is useful because it allows the Arduino to understand how long schedules should be (num_trials), and what valve should be selected for side 2 (num_stimuli).
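The two-byte variables packet can be illustrated with the standard library. This sketch uses struct instead of NumPy so that it runs without extra dependencies; the "b" format code is one signed 8-bit integer, the same width as np.int8. The values and the fits_int8 helper are illustrative assumptions, not part of the module.

```python
import struct

num_stimuli = 4   # illustrative values, not taken from the project
num_trials = 20

# One signed byte per value, num_stimuli first, then num_trials.
packet = struct.pack("bb", num_stimuli, num_trials)

# Decoding the same way a receiver would.
decoded = struct.unpack("bb", packet)


def fits_int8(value: int) -> bool:
    """Hypothetical guard: a signed 8-bit integer holds only -128..127."""
    return -128 <= value <= 127
```

A signed byte tops out at 127, so a trial count above that would not survive an 8-bit encoding; a range check like fits_int8 before sending is one way to catch that early.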
    def send_schedule_data(self) -> None:
        """
        Method to send the valve schedule to the Arduino so it knows which valve to
        open on a given trial.
        First load the np schedule arrays for both sides, turn them into bytes, and
        set them back to back in a packet, i.e. side two will follow side one.
        """
        side_one, side_two = self.arduino_data.load_schedule_indices()

        schedule_packet: bytes = side_one.tobytes() + side_two.tobytes()

        sched_command = "REC SCHED\n".encode("utf-8")

        self.send_command(sched_command)

        self.send_command(schedule_packet)

        self.verify_schedule(side_one, side_two)
Method to send the valve schedule to the Arduino so it knows which valve to open on a given trial. First load the np schedule arrays for both sides, turn them into bytes, and set them back to back in a packet, i.e. side two will follow side one.
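The back-to-back packet layout can be sketched as follows, again with struct rather than NumPy so the example has no third-party dependencies. The function names and the schedule values are hypothetical; the only layout taken from the description above is one signed byte per trial, with all of side one preceding all of side two.

```python
import struct


def build_schedule_packet(side_one: list, side_two: list) -> bytes:
    """Concatenate two per-trial valve schedules, one signed byte per entry."""
    if len(side_one) != len(side_two):
        raise ValueError("both sides must have one entry per trial")
    n = len(side_one)
    return struct.pack(f"{n}b", *side_one) + struct.pack(f"{n}b", *side_two)


def split_schedule_packet(packet: bytes) -> tuple:
    """Inverse of build_schedule_packet: first half is side one, second half side two."""
    if len(packet) % 2 != 0:
        raise ValueError("packet must contain the same number of bytes per side")
    n = len(packet) // 2
    values = struct.unpack(f"{2 * n}b", packet)
    return list(values[:n]), list(values[n:])


# Made-up valve indices for a three-trial schedule.
packet = build_schedule_packet([0, 3, 1], [2, 5, 7])
one, two = split_schedule_packet(packet)
```

Since the packet carries no length field, the receiver has to know the trial count in advance to find the boundary between the two sides.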
    def send_valve_durations(self) -> None:
        """
        Load the side_one and side_two durations through the `models.arduino_data` ArduinoData model, which
        reads them from arduino_data.toml for the last_used 'profile', then send them to the Arduino and verify them.
        """
        side_one, side_two, _ = self.arduino_data.load_durations()

        side_one_durs = side_one.tobytes()
        side_two_durs = side_two.tobytes()

        dur_packet = side_one_durs + side_two_durs

        dur_command = "REC DURATIONS\n".encode("utf-8")

        self.send_command(dur_command)

        self.send_command(dur_packet)

        self.verify_durations(side_one, side_two)
Load the side_one and side_two durations through the models.arduino_data ArduinoData model, which reads them from arduino_data.toml for the last_used 'profile', then send them to the Arduino and verify them.
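The size and byte order of the durations packet can be made concrete with a short sketch. It assumes little-endian 32-bit integers, which is what the verification step decodes and what NumPy's tobytes produces on common little-endian hosts; it uses struct so the example needs only the standard library. The function name and duration values are made up for illustration.

```python
import struct

VALVES_PER_SIDE = 8


def build_duration_packet(side_one: list, side_two: list) -> bytes:
    """Pack 8 durations per side as little-endian signed 32-bit integers, side one first."""
    fmt = f"<{VALVES_PER_SIDE}i"
    return struct.pack(fmt, *side_one) + struct.pack(fmt, *side_two)


# Made-up durations; the real values come from the saved profile.
side_one = [24125] * VALVES_PER_SIDE
side_two = [25000] * VALVES_PER_SIDE

packet = build_duration_packet(side_one, side_two)
decoded = struct.unpack(f"<{2 * VALVES_PER_SIDE}i", packet)
```

With 8 valves per side and 4 bytes per duration, the packet is always 2 * 8 * 4 = 64 bytes, independent of the number of trials.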
    def verify_schedule(
        self, side_one: npt.NDArray[np.int8], side_two: npt.NDArray[np.int8]
    ) -> None:
        """
        Method to tell the Arduino to give us the schedules that it
        received. It will send data byte by byte in the order that it
        received it (side one schedule, then side two). It will send EXACTLY
        num_trials * 2 bytes (8 bit / 1 byte int for each trial on each side).
        If successful, we continue execution and log a success message.
        """
        try:
            ver_sched_command = "VER SCHED\n".encode("utf-8")
            self.send_command(ver_sched_command)

            num_trials = self.exp_data.exp_var_entries["Num Trials"]

            ver1 = np.zeros((num_trials,), dtype=np.int8)
            ver2 = np.zeros((num_trials,), dtype=np.int8)

            if self.arduino is None:
                msg = "ARDUINO IS NOT CONNECTED! Try reconnecting and restart the program."
                logger.error(msg)
                GUIUtils.display_error("ARDUINO ERROR", msg)

                return

            # wait for the full reply to arrive: one byte per trial for each side
            # (the original condition `in_waiting < 0` could never be true)
            while self.arduino.in_waiting < num_trials * 2:
                pass

            for i in range(num_trials):
                ver1[i] = int.from_bytes(
                    self.arduino.read(), byteorder="little", signed=False
                )
            for i in range(num_trials):
                ver2[i] = int.from_bytes(
                    self.arduino.read(), byteorder="little", signed=False
                )
            logger.info(f"Arduino received side one as => {ver1}")
            logger.info(f"Arduino received side two as => {ver2}")

            if np.array_equal(side_one, ver1) and np.array_equal(side_two, ver2):
                logger.info(
                    "arduino has received and verified experiment valve schedule"
                )
            else:
                GUIUtils.display_error(
                    "======SCHEDULE ERROR======",
                    "ARDUINO did not receive the correct schedule. Please restart the program and attempt "
                    "schedule generation again.",
                )
        except Exception as e:
            logger.error(f"error verifying arduino schedule {e}")
Method to tell the Arduino to give us the schedules that it received. It will send data byte by byte in the order that it received it (side one schedule, then side two). It will send EXACTLY num_trials * 2 bytes (8 bit / 1 byte int for each trial on each side). If successful, we continue execution and log a success message.
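Because the reply has a known length of num_trials * 2 bytes, one way to read it is to collect exactly that many bytes under a deadline instead of waiting indefinitely. The helper below is a hypothetical sketch, not part of the module. It assumes the stream's read call returns promptly with whatever is available (for a serial port, that means one opened with a short read timeout); io.BytesIO stands in for the port here.

```python
import io
import time
from typing import BinaryIO


def read_exact(stream: BinaryIO, count: int, timeout_s: float = 2.0) -> bytes:
    """Read exactly `count` bytes, raising TimeoutError if they do not arrive in time."""
    deadline = time.monotonic() + timeout_s
    buf = bytearray()
    while len(buf) < count:
        if time.monotonic() > deadline:
            raise TimeoutError(f"expected {count} bytes, got {len(buf)}")
        chunk = stream.read(count - len(buf))
        if chunk:
            buf.extend(chunk)
        else:
            time.sleep(0.001)  # nothing available yet; avoid busy waiting
    return bytes(buf)


# Made-up reply for a three-trial schedule: side one, then side two.
num_trials = 3
reply = io.BytesIO(bytes([0, 3, 1, 2, 5, 7]))

raw = read_exact(reply, num_trials * 2)
side_one = list(raw[:num_trials])
side_two = list(raw[num_trials:])
```

With a deadline in place, a board that never answers produces an error the caller can report rather than a stalled program.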
    def verify_durations(
        self, side_one: npt.NDArray[np.int32], side_two: npt.NDArray[np.int32]
    ) -> None:
        """
        Method to tell the Arduino to give us the valve durations that it
        received. It will send the data in the order that it received it
        (side one durations, then side two). It will send EXACTLY
        VALVES_PER_SIDE * 2 * 4 bytes (a 32 bit / 4 byte int for each of the 8 valves on each side).
        If successful, we continue execution and log a success message.
        """
        try:
            ver_dur_command = "VER DURATIONS\n".encode("utf-8")
            self.send_command(ver_dur_command)

            ver1 = np.zeros((VALVES_PER_SIDE,), dtype=np.int32)
            ver2 = np.zeros((VALVES_PER_SIDE,), dtype=np.int32)

            if self.arduino is None:
                msg = "ARDUINO IS NOT CONNECTED! Try reconnecting and restart the program."
                logger.error(msg)
                GUIUtils.display_error("ARDUINO ERROR", msg)

                return

            # wait for the full reply to arrive: 4 bytes per valve for both sides
            while self.arduino.in_waiting < 2 * VALVES_PER_SIDE * 4:
                pass
            for i in range(VALVES_PER_SIDE):
                duration = int.from_bytes(
                    self.arduino.read(4), byteorder="little", signed=False
                )
                ver1[i] = duration
            for i in range(VALVES_PER_SIDE):
                duration = int.from_bytes(
                    self.arduino.read(4), byteorder="little", signed=False
                )
                ver2[i] = duration

            logger.info(f"Arduino received side one as => {ver1}")
            logger.info(f"Arduino received side two as => {ver2}")

            if np.array_equal(side_one, ver1) and np.array_equal(side_two, ver2):
                logger.info("arduino has received and verified valve durations")
            else:
                GUIUtils.display_error(
                    "======DURATIONS ERROR======",
                    "ARDUINO did not receive the correct valve durations. Please restart the program and attempt "
                    "schedule generation again.",
                )
        except Exception as e:
            logger.error(f"error verifying arduino durations -> {e}")
Method to tell the Arduino to give us the valve durations that it received. It will send the data in the order that it received it (side one durations, then side two). It will send EXACTLY VALVES_PER_SIDE * 2 * 4 bytes (a 32 bit / 4 byte int for each of the 8 valves on each side). If successful, we continue execution and log a success message.
    def send_command(self, command: bytes):
        """
        Send a specific command to the Arduino. Command must be converted to raw bytes object
        before being passed into this method
        """
        if self.arduino is None:
            error_message = "Arduino connection was not established. Please reconnect the Arduino board and restart the program."
            GUIUtils.display_error(
                "====ARDUINO COMMUNICATION ERROR====:", error_message
            )
            logger.error(error_message)
            return

        try:
            self.arduino.write(command)
            logger.info(f"Sent {command} to arduino on -> {self.arduino.port}: ")
        except Exception as e:
            error_message = f"Error sending command to {self.arduino.port} Arduino: {e}"
            GUIUtils.display_error("Error sending command to Arduino:", error_message)
            logger.error(error_message)
Send a specific command to the Arduino. The command must be converted to a raw bytes object before being passed into this method.
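Text commands in this module are newline-terminated strings encoded as UTF-8 before they reach send_command. A small helper can keep that convention in one place. encode_command is a hypothetical name introduced for illustration; the command strings themselves ("RESET", "REC SCHED") are the ones used elsewhere in the class.

```python
def encode_command(name: str) -> bytes:
    """Hypothetical helper: newline-terminate a text command and encode it as UTF-8."""
    if "\n" in name:
        # An embedded newline would be read by the board as two separate commands.
        raise ValueError("command names must not contain a newline")
    return f"{name}\n".encode("utf-8")


reset = encode_command("RESET")
rec_sched = encode_command("REC SCHED")
```

Binary payloads such as the schedule and duration packets are already bytes and would be passed to send_command directly, without going through a helper like this.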