chore: Various cleanups

This commit is contained in:
Baud 2024-03-10 21:53:30 +00:00
parent 2bbf2c5c6b
commit 90b2cfd984
10 changed files with 100 additions and 101 deletions

View File

@ -4,7 +4,7 @@ use std::{
sync::Arc, sync::Arc,
}; };
use tokio::sync::{mpsc::error::TryRecvError, Semaphore}; use tokio::{select, sync::Semaphore};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::{ use crate::{
@ -61,33 +61,32 @@ impl Atem {
let mut state = AtemState::default(); let mut state = AtemState::default();
while !cancel.is_cancelled() { while !cancel.is_cancelled() {
match atem_event_rx.try_recv() { select! {
Ok(event) => match event { _ = cancel.cancelled() => {},
AtemEvent::Error(_) => todo!(), message = atem_event_rx.recv() => match message {
AtemEvent::Info(_) => todo!(), Some(event) => match event {
AtemEvent::Debug(_) => todo!(), AtemEvent::Connected => {
AtemEvent::Connected => { log::info!("Atem connected");
log::info!("Atem connected");
}
AtemEvent::Disconnected => todo!(),
AtemEvent::ReceivedCommands(commands) => {
self.mutate_state(&mut state, &mut status, commands).await
}
AtemEvent::AckedCommand(tracking_id) => {
log::debug!("Received tracking Id {tracking_id}");
if let Some(semaphore) =
self.waiting_semaphores.read().await.get(&tracking_id)
{
semaphore.add_permits(1);
} else {
log::warn!("Received tracking Id {tracking_id} with no-one waiting for it to be resolved.")
} }
AtemEvent::Disconnected => todo!(),
AtemEvent::ReceivedCommands(commands) => {
self.mutate_state(&mut state, &mut status, commands).await
}
AtemEvent::AckedCommand(tracking_id) => {
log::debug!("Received tracking Id {tracking_id}");
if let Some(semaphore) =
self.waiting_semaphores.read().await.get(&tracking_id)
{
semaphore.add_permits(1);
} else {
log::warn!("Received tracking Id {tracking_id} with no-one waiting for it to be resolved.")
}
}
},
None => {
log::info!("ATEM event channel has closed, exiting event loop.");
cancel.cancel();
} }
},
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Disconnected) => {
log::info!("ATEM event channel has closed, exiting event loop.");
cancel.cancel();
} }
} }
} }

View File

@ -9,8 +9,8 @@ use std::{
use tokio::{ use tokio::{
net::UdpSocket, net::UdpSocket,
select,
sync::{Barrier, Mutex}, sync::{Barrier, Mutex},
task::yield_now,
}; };
use crate::{ use crate::{
@ -55,9 +55,6 @@ pub struct TrackingIdsCallback {
#[derive(Clone)] #[derive(Clone)]
pub enum AtemEvent { pub enum AtemEvent {
Error(String),
Info(String),
Debug(String),
Connected, Connected,
Disconnected, Disconnected,
ReceivedCommands(VecDeque<Arc<dyn DeserializedCommand>>), ReceivedCommands(VecDeque<Arc<dyn DeserializedCommand>>),
@ -179,60 +176,65 @@ impl AtemSocket {
cancel: tokio_util::sync::CancellationToken, cancel: tokio_util::sync::CancellationToken,
) { ) {
while !cancel.is_cancelled() { while !cancel.is_cancelled() {
if let Ok(msg) = atem_message_rx.try_recv() { select! {
match msg { _ = cancel.cancelled() => {},
AtemSocketMessage::Connect { message = atem_message_rx.recv() => {
address, match message {
result_callback, Some(AtemSocketMessage::Connect {
} => { address,
{ result_callback,
let mut connected_callbacks = self.connected_callbacks.lock().await; }) => {
connected_callbacks.push(result_callback); {
} let mut connected_callbacks = self.connected_callbacks.lock().await;
if self.connect(address).await.is_err() { connected_callbacks.push(result_callback);
let mut connected_callbacks = self.connected_callbacks.lock().await; }
for callback in connected_callbacks.drain(0..) { if self.connect(address).await.is_err() {
let _ = callback.send(false); let mut connected_callbacks = self.connected_callbacks.lock().await;
for callback in connected_callbacks.drain(0..) {
let _ = callback.send(false);
}
} }
} }
} Some(AtemSocketMessage::Disconnect) => self.disconnect(),
AtemSocketMessage::Disconnect => self.disconnect(), Some(AtemSocketMessage::SendCommands {
AtemSocketMessage::SendCommands { commands,
commands, tracking_ids_callback,
tracking_ids_callback, }) => {
} => { let barrier = Arc::new(Barrier::new(2));
let barrier = Arc::new(Barrier::new(2)); tracking_ids_callback
tracking_ids_callback .send(TrackingIdsCallback {
.send(TrackingIdsCallback { tracking_ids: self.send_commands(commands).await,
tracking_ids: self.send_commands(commands).await, barrier: barrier.clone(),
barrier: barrier.clone(), })
}) .ok();
.ok();
// Let's play the game "Synchronisation Shenanigans"! // Let's play the game "Synchronisation Shenanigans"!
// So, we are sending tracking Ids to the sender of this message, the sender will then wait // So, we are sending tracking Ids to the sender of this message, the sender will then wait
// for each of these tracking Ids to be ACK'd by the ATEM. However, the sender will need to // for each of these tracking Ids to be ACK'd by the ATEM. However, the sender will need to
// do ✨ some form of shenanigans ✨ in order to be ready to receive tracking Ids. So we send // do ✨ some form of shenanigans ✨ in order to be ready to receive tracking Ids. So we send
// them a barrier as part of the callback so that they can tell us that they are ready for // them a barrier as part of the callback so that they can tell us that they are ready for
// us to continue with ATEM communication, at which point we may immediately inform them of a // us to continue with ATEM communication, at which point we may immediately inform them of a
// received tracking Id matching one included in this callback. // received tracking Id matching one included in this callback.
// //
// Now, if we were being 🚩 Real Proper Software Developers 🚩 we'd probably expect the receiver // Now, if we were being 🚩 Real Proper Software Developers 🚩 we'd probably expect the receiver
// of the callback to do clever things so that if a tracking Id is received immediately, they // of the callback to do clever things so that if a tracking Id is received immediately, they
// then wait for something that wants that tracking Id on their side, rather than blocking this // then wait for something that wants that tracking Id on their side, rather than blocking this
// task so that the caller can do ✨ shenanigans ✨. However, that sounds far too clever and too // task so that the caller can do ✨ shenanigans ✨. However, that sounds far too clever and too
// much like 🚩 Real Actual Work 🚩 so instead we've chosen to do this and hope that whichever // much like 🚩 Real Actual Work 🚩 so instead we've chosen to do this and hope that whichever
// actor we're waiting on doesn't take _too_ long to do ✨ shenanigans ✨ before signalling that // actor we're waiting on doesn't take _too_ long to do ✨ shenanigans ✨ before signalling that
// they are ready. If they do, I suggest finding whoever wrote that code and bonking them 🔨. // they are ready. If they do, I suggest finding whoever wrote that code and bonking them 🔨.
barrier.wait().await; barrier.wait().await;
},
None => {
log::info!("ATEM message channel has closed, exiting event loop.");
cancel.cancel();
}
} }
} }
};
yield_now().await;
}
self.tick().await;
} }
self.tick().await;
} }
pub async fn connect(&mut self, address: SocketAddr) -> Result<(), io::Error> { pub async fn connect(&mut self, address: SocketAddr) -> Result<(), io::Error> {
@ -316,7 +318,7 @@ impl AtemSocket {
async fn restart_connection(&mut self) { async fn restart_connection(&mut self) {
self.disconnect(); self.disconnect();
self.connect(self.address.clone()).await.ok(); self.connect(self.address).await.ok();
} }
async fn tick(&mut self) { async fn tick(&mut self) {
@ -470,7 +472,7 @@ impl AtemSocket {
let flag: u8 = PacketFlag::AckReply.into(); let flag: u8 = PacketFlag::AckReply.into();
let opcode = u16::from(flag) << 11; let opcode = u16::from(flag) << 11;
let mut buffer: [u8; ACK_PACKET_LENGTH as _] = [0; 12]; let mut buffer: [u8; ACK_PACKET_LENGTH as _] = [0; 12];
buffer[0..2].copy_from_slice(&u16::to_be_bytes(opcode as u16 | ACK_PACKET_LENGTH)); buffer[0..2].copy_from_slice(&u16::to_be_bytes(opcode | ACK_PACKET_LENGTH));
buffer[2..4].copy_from_slice(&u16::to_be_bytes(self.session_id)); buffer[2..4].copy_from_slice(&u16::to_be_bytes(self.session_id));
buffer[4..6].copy_from_slice(&u16::to_be_bytes(packet_id)); buffer[4..6].copy_from_slice(&u16::to_be_bytes(packet_id));
self.send_packet(&buffer).await; self.send_packet(&buffer).await;

View File

@ -4,7 +4,7 @@ use crate::{enums::ProtocolVersion, state::AtemState};
pub trait DeserializedCommand: Send + Sync + Debug { pub trait DeserializedCommand: Send + Sync + Debug {
fn raw_name(&self) -> &'static str; fn raw_name(&self) -> &'static str;
fn apply_to_state(&self, state: &mut AtemState) -> bool; fn apply_to_state(&self, state: &mut AtemState);
} }
pub trait CommandDeserializer: Send + Sync { pub trait CommandDeserializer: Send + Sync {

View File

@ -16,8 +16,8 @@ impl DeserializedCommand for VersionCommand {
DESERIALIZE_VERSION_RAW_NAME DESERIALIZE_VERSION_RAW_NAME
} }
fn apply_to_state(&self, state: &mut crate::state::AtemState) -> bool { fn apply_to_state(&self, state: &mut crate::state::AtemState) {
todo!("Apply to state: Version") state.info.api_version = self.version;
} }
} }

View File

@ -12,7 +12,7 @@ impl DeserializedCommand for InitComplete {
DESERIALIZE_INIT_COMPLETE_RAW_NAME DESERIALIZE_INIT_COMPLETE_RAW_NAME
} }
fn apply_to_state(&self, state: &mut crate::state::AtemState) -> bool { fn apply_to_state(&self, state: &mut crate::state::AtemState) {
todo!("Apply to state: Init Complete") todo!("Apply to state: Init Complete")
} }
} }

View File

@ -37,7 +37,7 @@ impl DeserializedCommand for ProgramInput {
DESERIALIZE_PROGRAM_INPUT_RAW_NAME DESERIALIZE_PROGRAM_INPUT_RAW_NAME
} }
fn apply_to_state(&self, state: &mut crate::state::AtemState) -> bool { fn apply_to_state(&self, state: &mut crate::state::AtemState) {
todo!("Apply to state: Program Input") todo!("Apply to state: Program Input")
} }
} }

View File

@ -20,7 +20,7 @@ impl DeserializedCommand for TallyBySource {
DESERIALIZE_TALLY_BY_SOURCE_RAW_NAME DESERIALIZE_TALLY_BY_SOURCE_RAW_NAME
} }
fn apply_to_state(&self, state: &mut crate::state::AtemState) -> bool { fn apply_to_state(&self, state: &mut crate::state::AtemState) {
todo!("Apply to state: Tally By Source") todo!("Apply to state: Tally By Source")
} }
} }

View File

@ -23,9 +23,7 @@ impl DeserializedCommand for Time {
DESERIALIZE_TIME_RAW_NAME DESERIALIZE_TIME_RAW_NAME
} }
fn apply_to_state(&self, state: &mut crate::state::AtemState) -> bool { fn apply_to_state(&self, state: &mut crate::state::AtemState) {}
false
}
} }
#[derive(Default)] #[derive(Default)]

View File

@ -21,7 +21,7 @@ pub enum Model {
MiniExtremeISO = 0x11, MiniExtremeISO = 0x11,
} }
#[derive(Debug, Default, Clone, PartialEq)] #[derive(Debug, Default, Clone, Copy, PartialEq)]
pub enum ProtocolVersion { pub enum ProtocolVersion {
#[default] #[default]
Unknown = 0, Unknown = 0,

View File

@ -16,16 +16,16 @@ pub mod video;
#[derive(Default, Clone, PartialEq)] #[derive(Default, Clone, PartialEq)]
pub struct AtemState { pub struct AtemState {
info: info::DeviceInfo, pub info: info::DeviceInfo,
video: video::AtemVideoState, pub video: video::AtemVideoState,
audio: Option<audio::AtemClassicAudioState>, pub audio: Option<audio::AtemClassicAudioState>,
fairlight: Option<fairlight::AtemFairlightAudioState>, pub fairlight: Option<fairlight::AtemFairlightAudioState>,
media: media::MediaState, pub media: media::MediaState,
inputs: HashMap<u64, input::InputChannel>, pub inputs: HashMap<u64, input::InputChannel>,
// macro is a rust keyword // macro is a rust keyword
atem_macro: atem_macro::MacroState, pub atem_macro: atem_macro::MacroState,
settings: settings::SettingsState, pub settings: settings::SettingsState,
recording: Option<recording::RecordingState>, pub recording: Option<recording::RecordingState>,
streaming: Option<streaming::StreamingState>, pub streaming: Option<streaming::StreamingState>,
color_generators: HashMap<u64, color::ColorGeneratorState>, pub color_generators: HashMap<u64, color::ColorGeneratorState>,
} }