From 4a075c3d1ea7007331ad39ddd45c987ab660405c Mon Sep 17 00:00:00 2001 From: Baud Date: Fri, 8 Mar 2024 17:43:53 +0000 Subject: [PATCH] wip: handle received commands --- atem-connection-rs/src/atem.rs | 50 +++++++++++++++++-- .../src/atem_lib/atem_packet.rs | 10 ++-- .../src/atem_lib/atem_socket.rs | 11 ++-- .../src/commands/command_base.rs | 1 + .../src/commands/device_profile.rs | 34 +++++++++++++ .../src/commands/init_complete.rs | 27 ++++++++++ .../src/commands/mix_effects/program_input.rs | 6 +++ atem-connection-rs/src/commands/mod.rs | 3 ++ .../src/commands/parse_commands.rs | 14 ++++-- .../src/commands/tally_by_source.rs | 6 +++ atem-connection-rs/src/commands/time.rs | 49 ++++++++++++++++++ atem-connection-rs/src/enums/mod.rs | 18 ++++++- .../src/state/video/upstream_keyers.rs | 2 +- 13 files changed, 216 insertions(+), 15 deletions(-) create mode 100644 atem-connection-rs/src/commands/device_profile.rs create mode 100644 atem-connection-rs/src/commands/init_complete.rs create mode 100644 atem-connection-rs/src/commands/time.rs diff --git a/atem-connection-rs/src/atem.rs b/atem-connection-rs/src/atem.rs index f4d21b0..7468406 100644 --- a/atem-connection-rs/src/atem.rs +++ b/atem-connection-rs/src/atem.rs @@ -1,13 +1,31 @@ -use std::{collections::HashMap, net::SocketAddr, sync::Arc}; +use std::{ + collections::{HashMap, VecDeque}, + net::SocketAddr, + sync::Arc, +}; use tokio::sync::{mpsc::error::TryRecvError, Semaphore}; use tokio_util::sync::CancellationToken; use crate::{ atem_lib::atem_socket::{AtemEvent, AtemSocketCommand, AtemSocketMessage, TrackingId}, - commands::command_base::BasicWritableCommand, + commands::{ + command_base::{BasicWritableCommand, DeserializedCommand}, + device_profile::DESERIALIZE_VERSION_RAW_NAME, + init_complete::DESERIALIZE_INIT_COMPLETE_RAW_NAME, + time::DESERIALIZE_TIME_RAW_NAME, + }, + state::AtemState, }; +#[derive(Default)] +pub enum AtemConnectionStatus { + #[default] + Closed, + Connecting, + Connected, +} + pub struct Atem { waiting_semaphores: tokio::sync::RwLock>>, socket_message_tx: tokio::sync::mpsc::Sender, @@ -39,6 +57,9 @@ impl Atem { mut atem_event_rx: tokio::sync::mpsc::UnboundedReceiver, cancel: CancellationToken, ) { + let mut status = AtemConnectionStatus::default(); + let mut state = AtemState::default(); + while !cancel.is_cancelled() { match atem_event_rx.try_recv() { Ok(event) => match event { @@ -49,7 +70,9 @@ impl Atem { log::info!("Atem connected"); } AtemEvent::Disconnected => todo!(), - AtemEvent::ReceivedCommand(_) => todo!(), + AtemEvent::ReceivedCommands(commands) => { + self.mutate_state(&mut state, &mut status, commands).await + } AtemEvent::AckedCommand(tracking_id) => { log::debug!("Received tracking Id {tracking_id}"); if let Some(semaphore) = @@ -109,4 +132,25 @@ impl Atem { self.waiting_semaphores.write().await.remove(tracking_id); } } + + async fn mutate_state( + &self, + state: &mut AtemState, + status: &mut AtemConnectionStatus, + commands: VecDeque>, + ) { + for command in commands { + match command.raw_name() { + DESERIALIZE_VERSION_RAW_NAME => { + *state = AtemState::default(); + *status = AtemConnectionStatus::Connecting + } + DESERIALIZE_INIT_COMPLETE_RAW_NAME => *status = AtemConnectionStatus::Connected, + DESERIALIZE_TIME_RAW_NAME => { + todo!("Time command") + } + _ => {} + } + } + } } diff --git a/atem-connection-rs/src/atem_lib/atem_packet.rs b/atem-connection-rs/src/atem_lib/atem_packet.rs index 2220788..4c562d1 100755 --- a/atem-connection-rs/src/atem_lib/atem_packet.rs +++ b/atem-connection-rs/src/atem_lib/atem_packet.rs @@ -6,7 +6,7 @@ pub struct AtemPacket<'packet_buffer> { remote_packet_id: u16, retransmit_requested_from_packet_id: Option, ack_reply: Option, - body: &'packet_buffer [u8], + body: Option<&'packet_buffer [u8]>, } pub enum AtemPacketErr { @@ -48,7 +48,7 @@ impl<'packet_buffer> AtemPacket<'packet_buffer> { self.remote_packet_id } - pub fn body(&self) -> &[u8] { + pub fn body(&self) -> Option<&[u8]> { self.body } @@ -89,7 +89,11 @@ impl<'packet_buffer> TryFrom<&'packet_buffer [u8]> for AtemPacket<'packet_buffer let session_id = u16::from_be_bytes([buffer[2], buffer[3]]); let remote_packet_id = u16::from_be_bytes([buffer[10], buffer[11]]); - let body = &buffer[12..]; + let body = if buffer.len() > 12 { + Some(&buffer[12..]) + } else { + None + }; let retransmit_requested_from_packet_id = if flags & u8::from(PacketFlag::RetransmitRequest) > 0 { diff --git a/atem-connection-rs/src/atem_lib/atem_socket.rs b/atem-connection-rs/src/atem_lib/atem_socket.rs index bf85e2b..4c35533 100644 --- a/atem-connection-rs/src/atem_lib/atem_socket.rs +++ b/atem-connection-rs/src/atem_lib/atem_socket.rs @@ -1,4 +1,5 @@ use std::{ + collections::VecDeque, fmt::Display, io, net::SocketAddr, @@ -59,7 +60,7 @@ pub enum AtemEvent { Debug(String), Connected, Disconnected, - ReceivedCommand(Arc), + ReceivedCommands(VecDeque>), AckedCommand(TrackingId), } @@ -406,8 +407,8 @@ impl AtemSocket { self.last_received_packed_id = remote_packet_id; self.send_or_queue_ack().await; - if atem_packet.length() > 12 { - self.on_commands_received(atem_packet.body()); + if let Some(body) = atem_packet.body() { + self.on_commands_received(body); } } else if self .is_packet_covered_by_ack(self.last_received_packed_id, remote_packet_id) @@ -526,6 +527,10 @@ impl AtemSocket { fn on_commands_received(&mut self, payload: &[u8]) { let commands = deserialize_commands(payload); + + let _ = self + .atem_event_tx + .send(AtemEvent::ReceivedCommands(commands)); } fn on_command_acknowledged(&mut self, packets: Vec) { diff --git a/atem-connection-rs/src/commands/command_base.rs b/atem-connection-rs/src/commands/command_base.rs index 0fb2b5c..a7940d9 100644 --- a/atem-connection-rs/src/commands/command_base.rs +++ b/atem-connection-rs/src/commands/command_base.rs @@ -3,6 +3,7 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc}; use crate::{enums::ProtocolVersion, state::AtemState}; pub trait DeserializedCommand: Send + Sync + Debug { + fn raw_name(&self) -> &'static str; fn apply_to_state(&self, state: &mut AtemState) -> Vec; } diff --git a/atem-connection-rs/src/commands/device_profile.rs b/atem-connection-rs/src/commands/device_profile.rs new file mode 100644 index 0000000..9fc9c73 --- /dev/null +++ b/atem-connection-rs/src/commands/device_profile.rs @@ -0,0 +1,34 @@ +use std::sync::Arc; + +use crate::enums::ProtocolVersion; + +use super::command_base::{CommandDeserializer, DeserializedCommand}; + +pub const DESERIALIZE_VERSION_RAW_NAME: &str = "_ver"; + +#[derive(Debug)] +pub struct VersionCommand { + pub version: ProtocolVersion, +} + +impl DeserializedCommand for VersionCommand { + fn raw_name(&self) -> &'static str { + DESERIALIZE_VERSION_RAW_NAME + } + + fn apply_to_state(&self, state: &mut crate::state::AtemState) -> Vec { + todo!() + } +} + +#[derive(Default)] +pub struct VersionCommandDeserializer {} + +impl CommandDeserializer for VersionCommandDeserializer { + fn deserialize(&self, buffer: &[u8]) -> std::sync::Arc { + let version = u32::from_be_bytes([buffer[0], buffer[1], buffer[2], buffer[3]]); + let version: ProtocolVersion = version.try_into().expect("Invalid protocol version"); + + Arc::new(VersionCommand { version }) + } +} diff --git a/atem-connection-rs/src/commands/init_complete.rs b/atem-connection-rs/src/commands/init_complete.rs new file mode 100644 index 0000000..71df0c3 --- /dev/null +++ b/atem-connection-rs/src/commands/init_complete.rs @@ -0,0 +1,27 @@ +use std::sync::Arc; + +use super::command_base::{CommandDeserializer, DeserializedCommand}; + +pub const DESERIALIZE_INIT_COMPLETE_RAW_NAME: &str = "InCm"; + +#[derive(Debug)] +pub struct InitComplete {} + +impl DeserializedCommand for InitComplete { + fn raw_name(&self) -> &'static str { + DESERIALIZE_INIT_COMPLETE_RAW_NAME + } + + fn apply_to_state(&self, state: &mut crate::state::AtemState) -> Vec { + todo!() + } +} + +#[derive(Default)] +pub struct InitCompleteDeserializer {} + +impl CommandDeserializer for InitCompleteDeserializer { + fn deserialize(&self, buffer: &[u8]) -> std::sync::Arc { + Arc::new(InitComplete {}) + } +} diff --git a/atem-connection-rs/src/commands/mix_effects/program_input.rs b/atem-connection-rs/src/commands/mix_effects/program_input.rs index 044a6fd..dd25fa7 100644 --- a/atem-connection-rs/src/commands/mix_effects/program_input.rs +++ b/atem-connection-rs/src/commands/mix_effects/program_input.rs @@ -4,6 +4,8 @@ use crate::commands::command_base::{ BasicWritableCommand, CommandDeserializer, DeserializedCommand, SerializableCommand, }; +pub const DESERIALIZE_PROGRAM_INPUT_RAW_NAME: &str = "PrgI"; + #[derive(Debug, new)] pub struct ProgramInput { pub mix_effect: u8, @@ -31,6 +33,10 @@ impl BasicWritableCommand for ProgramInput { } impl DeserializedCommand for ProgramInput { + fn raw_name(&self) -> &'static str { + DESERIALIZE_PROGRAM_INPUT_RAW_NAME + } + fn apply_to_state(&self, state: &mut crate::state::AtemState) -> Vec { todo!() } diff --git a/atem-connection-rs/src/commands/mod.rs b/atem-connection-rs/src/commands/mod.rs index f8c64e4..8de1246 100644 --- a/atem-connection-rs/src/commands/mod.rs +++ b/atem-connection-rs/src/commands/mod.rs @@ -1,4 +1,7 @@ pub mod command_base; +pub mod device_profile; +pub mod init_complete; pub mod mix_effects; pub mod parse_commands; pub mod tally_by_source; +pub mod time; diff --git a/atem-connection-rs/src/commands/parse_commands.rs b/atem-connection-rs/src/commands/parse_commands.rs index 2ad4aec..222c256 100644 --- a/atem-connection-rs/src/commands/parse_commands.rs +++ b/atem-connection-rs/src/commands/parse_commands.rs @@ -2,8 +2,11 @@ use std::{collections::VecDeque, sync::Arc}; use super::{ command_base::{CommandDeserializer, DeserializedCommand}, - mix_effects::program_input::ProgramInputDeserializer, - tally_by_source::TallyBySourceDeserializer, + device_profile::{VersionCommandDeserializer, DESERIALIZE_VERSION_RAW_NAME}, + init_complete::{InitCompleteDeserializer, DESERIALIZE_INIT_COMPLETE_RAW_NAME}, + mix_effects::program_input::{ProgramInputDeserializer, DESERIALIZE_PROGRAM_INPUT_RAW_NAME}, + tally_by_source::{TallyBySourceDeserializer, DESERIALIZE_TALLY_BY_SOURCE_RAW_NAME}, + time::{TimeDeserializer, DESERIALIZE_TIME_RAW_NAME}, }; pub fn deserialize_commands(payload: &[u8]) -> VecDeque> { @@ -36,8 +39,11 @@ pub fn deserialize_commands(payload: &[u8]) -> VecDeque Option> { match command_str { - "PrgI" => Some(Box::::default()), - "TlSr" => Some(Box::::default()), + DESERIALIZE_VERSION_RAW_NAME => Some(Box::::default()), + DESERIALIZE_INIT_COMPLETE_RAW_NAME => Some(Box::::default()), + DESERIALIZE_PROGRAM_INPUT_RAW_NAME => Some(Box::::default()), + DESERIALIZE_TALLY_BY_SOURCE_RAW_NAME => Some(Box::::default()), + DESERIALIZE_TIME_RAW_NAME => Some(Box::::default()), _ => None, } } diff --git a/atem-connection-rs/src/commands/tally_by_source.rs b/atem-connection-rs/src/commands/tally_by_source.rs index 8d3eeb8..07dd38d 100644 --- a/atem-connection-rs/src/commands/tally_by_source.rs +++ b/atem-connection-rs/src/commands/tally_by_source.rs @@ -2,6 +2,8 @@ use std::{collections::HashMap, sync::Arc}; use super::command_base::{CommandDeserializer, DeserializedCommand}; +pub const DESERIALIZE_TALLY_BY_SOURCE_RAW_NAME: &str = "TlSr"; + #[derive(Debug)] pub struct TallySource { pub program: bool, @@ -14,6 +16,10 @@ pub struct TallyBySource { } impl DeserializedCommand for TallyBySource { + fn raw_name(&self) -> &'static str { + DESERIALIZE_TALLY_BY_SOURCE_RAW_NAME + } + fn apply_to_state(&self, state: &mut crate::state::AtemState) -> Vec { todo!() } diff --git a/atem-connection-rs/src/commands/time.rs b/atem-connection-rs/src/commands/time.rs new file mode 100644 index 0000000..f88447d --- /dev/null +++ b/atem-connection-rs/src/commands/time.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; + +use super::command_base::{CommandDeserializer, DeserializedCommand}; + +pub const DESERIALIZE_TIME_RAW_NAME: &str = "Time"; + +#[derive(Debug)] +pub struct TimeInfo { + pub hour: u8, + pub minute: u8, + pub second: u8, + pub frame: u8, + pub drop_frame: bool, +} + +#[derive(Debug)] +pub struct Time { + info: TimeInfo, +} + +impl DeserializedCommand for Time { + fn raw_name(&self) -> &'static str { + DESERIALIZE_TIME_RAW_NAME + } + + fn apply_to_state(&self, state: &mut crate::state::AtemState) -> Vec { + todo!() + } +} + +#[derive(Default)] +pub struct TimeDeserializer {} + +impl CommandDeserializer for TimeDeserializer { + fn deserialize( + &self, + buffer: &[u8], + ) -> std::sync::Arc { + let info = TimeInfo { + hour: buffer[0], + minute: buffer[1], + second: buffer[2], + frame: buffer[3], + drop_frame: buffer[5] == 1, + }; + + Arc::new(Time { info }) + } +} diff --git a/atem-connection-rs/src/enums/mod.rs b/atem-connection-rs/src/enums/mod.rs index adcd96a..34c4192 100644 --- a/atem-connection-rs/src/enums/mod.rs +++ b/atem-connection-rs/src/enums/mod.rs @@ -21,7 +21,7 @@ pub enum Model { MiniExtremeISO = 0x11, } -#[derive(Default)] +#[derive(Debug, Default)] pub enum ProtocolVersion { #[default] Unknown = 0, @@ -32,6 +32,22 @@ pub enum ProtocolVersion { V8_1_1 = 0x0002001e, // 2.30 } +impl TryFrom for ProtocolVersion { + type Error = (); + + fn try_from(value: u32) -> Result { + match value { + 0 => Ok(ProtocolVersion::Unknown), + 0x00020016 => Ok(ProtocolVersion::V7_2), + 0x0002001b => Ok(ProtocolVersion::V7_5_2), + 0x0002001c => Ok(ProtocolVersion::V8_0), + 0x0002001d => Ok(ProtocolVersion::V8_0_1), + 0x0002001e => Ok(ProtocolVersion::V8_1_1), + _ => Ok(ProtocolVersion::Unknown), + } + } +} + pub enum TransitionStyle { MIX = 0x00, DIP = 0x01, diff --git a/atem-connection-rs/src/state/video/upstream_keyers.rs b/atem-connection-rs/src/state/video/upstream_keyers.rs index f8c39aa..511af5b 100644 --- a/atem-connection-rs/src/state/video/upstream_keyers.rs +++ b/atem-connection-rs/src/state/video/upstream_keyers.rs @@ -16,7 +16,7 @@ pub trait UpstreamKeyerTypeSettings { fn set_fly_enabled(&mut self, enabled: bool); } -pub trait UpstreamKeyerMaskSettings { +pub trait UpstreamKeyerMaskSettings: Send { fn get_mask_enabled(&self) -> bool; fn set_mask_enabled(&mut self, enabled: bool); fn get_mask_top(&self) -> f64;