From 7e973af192c1cede32e2661264149730fb4698c2 Mon Sep 17 00:00:00 2001 From: Baud Date: Sat, 16 Mar 2024 17:41:16 +0000 Subject: [PATCH] fix: Not ticking --- atem-connection-rs/src/atem.rs | 9 +++++++-- atem-connection-rs/src/atem_lib/atem_socket.rs | 15 ++++++++++----- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/atem-connection-rs/src/atem.rs b/atem-connection-rs/src/atem.rs index 79cb83d..1d9aeb5 100644 --- a/atem-connection-rs/src/atem.rs +++ b/atem-connection-rs/src/atem.rs @@ -68,7 +68,7 @@ impl Atem { AtemEvent::Connected => { log::info!("Atem connected"); } - AtemEvent::Disconnected => todo!(), + AtemEvent::Disconnected => todo!("Disconnected"), AtemEvent::ReceivedCommands(commands) => { self.mutate_state(&mut state, &mut status, commands).await } @@ -142,14 +142,19 @@ impl Atem { for command in commands { match command.raw_name() { DESERIALIZE_VERSION_RAW_NAME => { + log::debug!("Received version response"); *state = AtemState::default(); *status = AtemConnectionStatus::Connecting } - DESERIALIZE_INIT_COMPLETE_RAW_NAME => *status = AtemConnectionStatus::Connected, + DESERIALIZE_INIT_COMPLETE_RAW_NAME => { + log::debug!("Received init complete from ATEM"); + *status = AtemConnectionStatus::Connected + } DESERIALIZE_TIME_RAW_NAME => { todo!("Time command") } _ => { + log::debug!("Applying {} to state", command.raw_name()); command.apply_to_state(state); } } diff --git a/atem-connection-rs/src/atem_lib/atem_socket.rs b/atem-connection-rs/src/atem_lib/atem_socket.rs index 3653852..5476b90 100644 --- a/atem-connection-rs/src/atem_lib/atem_socket.rs +++ b/atem-connection-rs/src/atem_lib/atem_socket.rs @@ -175,9 +175,12 @@ impl AtemSocket { mut atem_message_rx: tokio::sync::mpsc::Receiver, cancel: tokio_util::sync::CancellationToken, ) { + let mut interval = tokio::time::interval(Duration::from_millis(5)); while !cancel.is_cancelled() { + let tick = interval.tick(); select! { _ = cancel.cancelled() => {}, + _ = tick => {}, message = atem_message_rx.recv() => { match message { Some(AtemSocketMessage::Connect { @@ -189,6 +192,7 @@ impl AtemSocket { connected_callbacks.push(result_callback); } if self.connect(address).await.is_err() { + log::debug!("Connect failed"); let mut connected_callbacks = self.connected_callbacks.lock().await; for callback in connected_callbacks.drain(0..) { let _ = callback.send(false); @@ -232,9 +236,9 @@ impl AtemSocket { } } }; - } - self.tick().await; + self.tick().await; + } } pub async fn connect(&mut self, address: SocketAddr) -> Result<(), io::Error> { @@ -393,7 +397,7 @@ impl AtemSocket { self.connection_state = ConnectionState::Established; self.last_received_packed_id = remote_packet_id; self.send_ack(remote_packet_id).await; - self.on_connect(); + self.on_connect().await; return; } @@ -543,9 +547,9 @@ impl AtemSocket { } } - fn on_connect(&mut self) { + async fn on_connect(&mut self) { let _ = self.atem_event_tx.send(AtemEvent::Connected); - let mut connected_callbacks = self.connected_callbacks.blocking_lock(); + let mut connected_callbacks = self.connected_callbacks.lock().await; for callback in connected_callbacks.drain(0..) { let _ = callback.send(false); } @@ -556,6 +560,7 @@ impl AtemSocket { } fn start_timers(&mut self) { + log::debug!("Starting timers"); self.start_reconnect_timer(); self.start_retransmit_timer(); }