From 90b2cfd98491cf600d2d8232664f33aab00fd131 Mon Sep 17 00:00:00 2001 From: Baud Date: Sun, 10 Mar 2024 21:53:30 +0000 Subject: [PATCH] chore: Various cleanups --- atem-connection-rs/src/atem.rs | 51 ++++---- .../src/atem_lib/atem_socket.rs | 110 +++++++++--------- .../src/commands/command_base.rs | 2 +- .../src/commands/device_profile.rs | 4 +- .../src/commands/init_complete.rs | 2 +- .../src/commands/mix_effects/program_input.rs | 2 +- .../src/commands/tally_by_source.rs | 2 +- atem-connection-rs/src/commands/time.rs | 4 +- atem-connection-rs/src/enums/mod.rs | 2 +- atem-connection-rs/src/state/mod.rs | 22 ++-- 10 files changed, 100 insertions(+), 101 deletions(-) diff --git a/atem-connection-rs/src/atem.rs b/atem-connection-rs/src/atem.rs index aacc2cc..79cb83d 100644 --- a/atem-connection-rs/src/atem.rs +++ b/atem-connection-rs/src/atem.rs @@ -4,7 +4,7 @@ use std::{ sync::Arc, }; -use tokio::sync::{mpsc::error::TryRecvError, Semaphore}; +use tokio::{select, sync::Semaphore}; use tokio_util::sync::CancellationToken; use crate::{ @@ -61,33 +61,32 @@ impl Atem { let mut state = AtemState::default(); while !cancel.is_cancelled() { - match atem_event_rx.try_recv() { - Ok(event) => match event { - AtemEvent::Error(_) => todo!(), - AtemEvent::Info(_) => todo!(), - AtemEvent::Debug(_) => todo!(), - AtemEvent::Connected => { - log::info!("Atem connected"); - } - AtemEvent::Disconnected => todo!(), - AtemEvent::ReceivedCommands(commands) => { - self.mutate_state(&mut state, &mut status, commands).await - } - AtemEvent::AckedCommand(tracking_id) => { - log::debug!("Received tracking Id {tracking_id}"); - if let Some(semaphore) = - self.waiting_semaphores.read().await.get(&tracking_id) - { - semaphore.add_permits(1); - } else { - log::warn!("Received tracking Id {tracking_id} with no-one waiting for it to be resolved.") + select! { + _ = cancel.cancelled() => {}, + message = atem_event_rx.recv() => match message { + Some(event) => match event { + AtemEvent::Connected => { + log::info!("Atem connected"); } + AtemEvent::Disconnected => todo!(), + AtemEvent::ReceivedCommands(commands) => { + self.mutate_state(&mut state, &mut status, commands).await + } + AtemEvent::AckedCommand(tracking_id) => { + log::debug!("Received tracking Id {tracking_id}"); + if let Some(semaphore) = + self.waiting_semaphores.read().await.get(&tracking_id) + { + semaphore.add_permits(1); + } else { + log::warn!("Received tracking Id {tracking_id} with no-one waiting for it to be resolved.") + } + } + }, + None => { + log::info!("ATEM event channel has closed, exiting event loop."); + cancel.cancel(); } - }, - Err(TryRecvError::Empty) => {} - Err(TryRecvError::Disconnected) => { - log::info!("ATEM event channel has closed, exiting event loop."); - cancel.cancel(); } } } diff --git a/atem-connection-rs/src/atem_lib/atem_socket.rs b/atem-connection-rs/src/atem_lib/atem_socket.rs index 4c35533..554aa04 100644 --- a/atem-connection-rs/src/atem_lib/atem_socket.rs +++ b/atem-connection-rs/src/atem_lib/atem_socket.rs @@ -9,8 +9,8 @@ use std::{ use tokio::{ net::UdpSocket, + select, sync::{Barrier, Mutex}, - task::yield_now, }; use crate::{ @@ -55,9 +55,6 @@ pub struct TrackingIdsCallback { #[derive(Clone)] pub enum AtemEvent { - Error(String), - Info(String), - Debug(String), Connected, Disconnected, ReceivedCommands(VecDeque>), @@ -179,60 +176,65 @@ impl AtemSocket { cancel: tokio_util::sync::CancellationToken, ) { while !cancel.is_cancelled() { - if let Ok(msg) = atem_message_rx.try_recv() { - match msg { - AtemSocketMessage::Connect { - address, - result_callback, - } => { - { - let mut connected_callbacks = self.connected_callbacks.lock().await; - connected_callbacks.push(result_callback); - } - if self.connect(address).await.is_err() { - let mut connected_callbacks = self.connected_callbacks.lock().await; - for callback in connected_callbacks.drain(0..) { - let _ = callback.send(false); + select! { + _ = cancel.cancelled() => {}, + message = atem_message_rx.recv() => { + match message { + Some(AtemSocketMessage::Connect { + address, + result_callback, + }) => { + { + let mut connected_callbacks = self.connected_callbacks.lock().await; + connected_callbacks.push(result_callback); + } + if self.connect(address).await.is_err() { + let mut connected_callbacks = self.connected_callbacks.lock().await; + for callback in connected_callbacks.drain(0..) { + let _ = callback.send(false); + } } } - } - AtemSocketMessage::Disconnect => self.disconnect(), - AtemSocketMessage::SendCommands { - commands, - tracking_ids_callback, - } => { - let barrier = Arc::new(Barrier::new(2)); - tracking_ids_callback - .send(TrackingIdsCallback { - tracking_ids: self.send_commands(commands).await, - barrier: barrier.clone(), - }) - .ok(); + Some(AtemSocketMessage::Disconnect) => self.disconnect(), + Some(AtemSocketMessage::SendCommands { + commands, + tracking_ids_callback, + }) => { + let barrier = Arc::new(Barrier::new(2)); + tracking_ids_callback + .send(TrackingIdsCallback { + tracking_ids: self.send_commands(commands).await, + barrier: barrier.clone(), + }) + .ok(); - // Let's play the game "Synchronisation Shenanigans"! - // So, we are sending tracking Ids to the sender of this message, the sender will then wait - // for each of these tracking Ids to be ACK'd by the ATEM. However, the sender will need to - // do ✨ some form of shenanigans ✨ in order to be ready to receive tracking Ids. So we send - // them a barrier as part of the callback so that they can tell us that they are ready for - // us to continue with ATEM communication, at which point we may immediately inform them of a - // received tracking Id matching one included in this callback. - // - // Now, if we were being 🚩 Real Proper Software Developers 🚩 we'd probably expect the receiver - // of the callback to do clever things so that if a tracking Id is received immediately, they - // then wait for something that wants that tracking Id on their side, rather than blocking this - // task so that the caller can do ✨ shenanigans ✨. However, that sounds far too clever and too - // much like 🚩 Real Actual Work 🚩 so instead we've chosen to do this and hope that whichever - // actor we're waiting on doesn't take _too_ long to do ✨ shenanigans ✨ before signalling that - // they are ready. If they do, I suggest finding whoever wrote that code and bonking them 🔨. - barrier.wait().await; + // Let's play the game "Synchronisation Shenanigans"! + // So, we are sending tracking Ids to the sender of this message, the sender will then wait + // for each of these tracking Ids to be ACK'd by the ATEM. However, the sender will need to + // do ✨ some form of shenanigans ✨ in order to be ready to receive tracking Ids. So we send + // them a barrier as part of the callback so that they can tell us that they are ready for + // us to continue with ATEM communication, at which point we may immediately inform them of a + // received tracking Id matching one included in this callback. + // + // Now, if we were being 🚩 Real Proper Software Developers 🚩 we'd probably expect the receiver + // of the callback to do clever things so that if a tracking Id is received immediately, they + // then wait for something that wants that tracking Id on their side, rather than blocking this + // task so that the caller can do ✨ shenanigans ✨. However, that sounds far too clever and too + // much like 🚩 Real Actual Work 🚩 so instead we've chosen to do this and hope that whichever + // actor we're waiting on doesn't take _too_ long to do ✨ shenanigans ✨ before signalling that + // they are ready. If they do, I suggest finding whoever wrote that code and bonking them 🔨. + barrier.wait().await; + }, + None => { + log::info!("ATEM message channel has closed, exiting event loop."); + cancel.cancel(); + } } } - - yield_now().await; - } - - self.tick().await; + }; } + + self.tick().await; } pub async fn connect(&mut self, address: SocketAddr) -> Result<(), io::Error> { @@ -316,7 +318,7 @@ impl AtemSocket { async fn restart_connection(&mut self) { self.disconnect(); - self.connect(self.address.clone()).await.ok(); + self.connect(self.address).await.ok(); } async fn tick(&mut self) { @@ -470,7 +472,7 @@ impl AtemSocket { let flag: u8 = PacketFlag::AckReply.into(); let opcode = u16::from(flag) << 11; let mut buffer: [u8; ACK_PACKET_LENGTH as _] = [0; 12]; - buffer[0..2].copy_from_slice(&u16::to_be_bytes(opcode as u16 | ACK_PACKET_LENGTH)); + buffer[0..2].copy_from_slice(&u16::to_be_bytes(opcode | ACK_PACKET_LENGTH)); buffer[2..4].copy_from_slice(&u16::to_be_bytes(self.session_id)); buffer[4..6].copy_from_slice(&u16::to_be_bytes(packet_id)); self.send_packet(&buffer).await; diff --git a/atem-connection-rs/src/commands/command_base.rs b/atem-connection-rs/src/commands/command_base.rs index 768db79..80001e3 100644 --- a/atem-connection-rs/src/commands/command_base.rs +++ b/atem-connection-rs/src/commands/command_base.rs @@ -4,7 +4,7 @@ use crate::{enums::ProtocolVersion, state::AtemState}; pub trait DeserializedCommand: Send + Sync + Debug { fn raw_name(&self) -> &'static str; - fn apply_to_state(&self, state: &mut AtemState) -> bool; + fn apply_to_state(&self, state: &mut AtemState); } pub trait CommandDeserializer: Send + Sync { diff --git a/atem-connection-rs/src/commands/device_profile.rs b/atem-connection-rs/src/commands/device_profile.rs index b60d968..910c1c9 100644 --- a/atem-connection-rs/src/commands/device_profile.rs +++ b/atem-connection-rs/src/commands/device_profile.rs @@ -16,8 +16,8 @@ impl DeserializedCommand for VersionCommand { DESERIALIZE_VERSION_RAW_NAME } - fn apply_to_state(&self, state: &mut crate::state::AtemState) -> bool { - todo!("Apply to state: Version") + fn apply_to_state(&self, state: &mut crate::state::AtemState) { + state.info.api_version = self.version; } } diff --git a/atem-connection-rs/src/commands/init_complete.rs b/atem-connection-rs/src/commands/init_complete.rs index 85cec9c..43b934e 100644 --- a/atem-connection-rs/src/commands/init_complete.rs +++ b/atem-connection-rs/src/commands/init_complete.rs @@ -12,7 +12,7 @@ impl DeserializedCommand for InitComplete { DESERIALIZE_INIT_COMPLETE_RAW_NAME } - fn apply_to_state(&self, state: &mut crate::state::AtemState) -> bool { + fn apply_to_state(&self, state: &mut crate::state::AtemState) { todo!("Apply to state: Init Complete") } } diff --git a/atem-connection-rs/src/commands/mix_effects/program_input.rs b/atem-connection-rs/src/commands/mix_effects/program_input.rs index 0253a59..c51eb82 100644 --- a/atem-connection-rs/src/commands/mix_effects/program_input.rs +++ b/atem-connection-rs/src/commands/mix_effects/program_input.rs @@ -37,7 +37,7 @@ impl DeserializedCommand for ProgramInput { DESERIALIZE_PROGRAM_INPUT_RAW_NAME } - fn apply_to_state(&self, state: &mut crate::state::AtemState) -> bool { + fn apply_to_state(&self, state: &mut crate::state::AtemState) { todo!("Apply to state: Program Input") } } diff --git a/atem-connection-rs/src/commands/tally_by_source.rs b/atem-connection-rs/src/commands/tally_by_source.rs index 25c00c9..e0e2764 100644 --- a/atem-connection-rs/src/commands/tally_by_source.rs +++ b/atem-connection-rs/src/commands/tally_by_source.rs @@ -20,7 +20,7 @@ impl DeserializedCommand for TallyBySource { DESERIALIZE_TALLY_BY_SOURCE_RAW_NAME } - fn apply_to_state(&self, state: &mut crate::state::AtemState) -> bool { + fn apply_to_state(&self, state: &mut crate::state::AtemState) { todo!("Apply to state: Tally By Source") } } diff --git a/atem-connection-rs/src/commands/time.rs b/atem-connection-rs/src/commands/time.rs index dd108c8..f7604e7 100644 --- a/atem-connection-rs/src/commands/time.rs +++ b/atem-connection-rs/src/commands/time.rs @@ -23,9 +23,7 @@ impl DeserializedCommand for Time { DESERIALIZE_TIME_RAW_NAME } - fn apply_to_state(&self, state: &mut crate::state::AtemState) -> bool { - false - } + fn apply_to_state(&self, state: &mut crate::state::AtemState) {} } #[derive(Default)] diff --git a/atem-connection-rs/src/enums/mod.rs b/atem-connection-rs/src/enums/mod.rs index cd531ce..e17d3ae 100644 --- a/atem-connection-rs/src/enums/mod.rs +++ b/atem-connection-rs/src/enums/mod.rs @@ -21,7 +21,7 @@ pub enum Model { MiniExtremeISO = 0x11, } -#[derive(Debug, Default, Clone, PartialEq)] +#[derive(Debug, Default, Clone, Copy, PartialEq)] pub enum ProtocolVersion { #[default] Unknown = 0, diff --git a/atem-connection-rs/src/state/mod.rs b/atem-connection-rs/src/state/mod.rs index 143a873..407a300 100644 --- a/atem-connection-rs/src/state/mod.rs +++ b/atem-connection-rs/src/state/mod.rs @@ -16,16 +16,16 @@ pub mod video; #[derive(Default, Clone, PartialEq)] pub struct AtemState { - info: info::DeviceInfo, - video: video::AtemVideoState, - audio: Option, - fairlight: Option, - media: media::MediaState, - inputs: HashMap, + pub info: info::DeviceInfo, + pub video: video::AtemVideoState, + pub audio: Option, + pub fairlight: Option, + pub media: media::MediaState, + pub inputs: HashMap, // macro is a rust keyword - atem_macro: atem_macro::MacroState, - settings: settings::SettingsState, - recording: Option, - streaming: Option, - color_generators: HashMap, + pub atem_macro: atem_macro::MacroState, + pub settings: settings::SettingsState, + pub recording: Option, + pub streaming: Option, + pub color_generators: HashMap, }