From 676ff7630e705b56f11b6425fb59154cec575d31 Mon Sep 17 00:00:00 2001 From: Baud Date: Mon, 4 Mar 2024 21:33:46 +0000 Subject: [PATCH] feat: Wait for connect --- atem-connection-rs/src/atem.rs | 4 +-- .../src/atem_lib/atem_socket.rs | 25 ++++++++++++++++--- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/atem-connection-rs/src/atem.rs b/atem-connection-rs/src/atem.rs index f50f504..f4d21b0 100644 --- a/atem-connection-rs/src/atem.rs +++ b/atem-connection-rs/src/atem.rs @@ -21,7 +21,7 @@ impl Atem { } } - pub async fn connect(&self, address: SocketAddr) { + pub async fn connect(&self, address: SocketAddr) -> bool { let (callback_tx, callback_rx) = tokio::sync::oneshot::channel(); self.socket_message_tx .send(AtemSocketMessage::Connect { @@ -31,7 +31,7 @@ impl Atem { .await .unwrap(); - callback_rx.await.unwrap().unwrap(); + callback_rx.await.unwrap() } pub async fn run( diff --git a/atem-connection-rs/src/atem_lib/atem_socket.rs b/atem-connection-rs/src/atem_lib/atem_socket.rs index 612d262..bf85e2b 100644 --- a/atem-connection-rs/src/atem_lib/atem_socket.rs +++ b/atem-connection-rs/src/atem_lib/atem_socket.rs @@ -6,7 +6,11 @@ use std::{ time::{Duration, SystemTime}, }; -use tokio::{net::UdpSocket, sync::Barrier, task::yield_now}; +use tokio::{ + net::UdpSocket, + sync::{Barrier, Mutex}, + task::yield_now, +}; use crate::{ atem_lib::{atem_packet::AtemPacket, atem_util}, @@ -34,7 +38,7 @@ const ACK_PACKET_LENGTH: u16 = 12; pub enum AtemSocketMessage { Connect { address: SocketAddr, - result_callback: tokio::sync::oneshot::Sender>, + result_callback: tokio::sync::oneshot::Sender, }, Disconnect, SendCommands { @@ -108,6 +112,7 @@ pub struct AtemSocket { received_without_ack: u16, atem_event_tx: tokio::sync::mpsc::UnboundedSender, + connected_callbacks: Mutex>>, } #[derive(PartialEq, Clone)] @@ -163,6 +168,7 @@ impl AtemSocket { received_without_ack: 0, atem_event_tx, + connected_callbacks: Mutex::default(), } } @@ -178,7 +184,16 @@ impl AtemSocket { address, result_callback, } => { - result_callback.send(self.connect(address).await).ok(); + { + let mut connected_callbacks = self.connected_callbacks.lock().await; + connected_callbacks.push(result_callback); + } + if self.connect(address).await.is_err() { + let mut connected_callbacks = self.connected_callbacks.lock().await; + for callback in connected_callbacks.drain(0..) { + let _ = callback.send(false); + } + } } AtemSocketMessage::Disconnect => self.disconnect(), AtemSocketMessage::SendCommands { @@ -523,6 +538,10 @@ impl AtemSocket { fn on_connect(&mut self) { let _ = self.atem_event_tx.send(AtemEvent::Connected); + let mut connected_callbacks = self.connected_callbacks.blocking_lock(); + for callback in connected_callbacks.drain(0..) { + let _ = callback.send(false); + } } fn on_disconnect(&mut self) {