diff --git a/Cargo.lock b/Cargo.lock index 7f2c13a..cfc8410 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,6 +93,7 @@ dependencies = [ "clap", "color-eyre", "env_logger", + "log", "tokio", ] diff --git a/atem-connection-rs/src/atem.rs b/atem-connection-rs/src/atem.rs index 07e00aa..2bc214e 100644 --- a/atem-connection-rs/src/atem.rs +++ b/atem-connection-rs/src/atem.rs @@ -7,13 +7,3 @@ pub struct AtemOptions { disable_multi_threaded: bool, child_process_timeout: Option, } - -pub enum AtemEvents { - Error(String), - Info(String), - Debug(String), - Connected, - Disconnected, - StateChanged(Box<(AtemState, Vec)>), - ReceivedCommands(Vec>), -} diff --git a/atem-connection-rs/src/atem_lib/atem_socket_inner.rs b/atem-connection-rs/src/atem_lib/atem_socket_inner.rs index 4126687..2584f70 100644 --- a/atem-connection-rs/src/atem_lib/atem_socket_inner.rs +++ b/atem-connection-rs/src/atem_lib/atem_socket_inner.rs @@ -1,13 +1,17 @@ use std::{ io, net::SocketAddr, + sync::Arc, time::{Duration, SystemTime}, }; use log::debug; use tokio::net::UdpSocket; -use crate::atem_lib::atem_util; +use crate::{ + atem_lib::atem_util, + commands::{command_base::DeserializedCommand, parse_commands::deserialize_commands}, +}; const IN_FLIGHT_TIMEOUT: u64 = 60; const CONNECTION_TIMEOUT: u64 = 5000; @@ -21,6 +25,17 @@ const MAX_PACKET_PER_ACK: u16 = 16; const MAX_PACKET_RECEIVE_SIZE: usize = 65535; const ACK_PACKET_LENGTH: u16 = 12; +#[derive(Clone)] +pub enum AtemEvent { + Error(String), + Info(String), + Debug(String), + Connected, + Disconnected, + ReceivedCommand(Arc), + AckedCommand(AckedPacket), +} + #[derive(PartialEq, Clone)] enum ConnectionState { Closed, @@ -69,6 +84,7 @@ struct InFlightPacket { pub resent: u16, } +#[derive(Clone)] struct AckedPacket { packet_id: u16, tracking_id: u64, @@ -97,6 +113,8 @@ pub struct AtemSocketInner { in_flight: Vec, ack_timer: Option, received_without_ack: u16, + + atem_event_tx: tokio::sync::broadcast::Sender, } enum AtemSocketReceiveError { @@ -114,6 +132,8 @@ enum AtemSocketWriteError { impl AtemSocketInner { pub fn new() -> Self { + let (atem_event_tx, _) = tokio::sync::broadcast::channel(100); + AtemSocketInner { connection_state: ConnectionState::Closed, reconnect_timer: None, @@ -131,6 +151,8 @@ impl AtemSocketInner { in_flight: vec![], ack_timer: None, received_without_ack: 0, + + atem_event_tx, } } @@ -214,6 +236,10 @@ impl AtemSocketInner { }) } + pub fn subscribe_to_events(&self) -> tokio::sync::broadcast::Receiver { + self.atem_event_tx.subscribe() + } + async fn restart_connection(&mut self) { self.disconnect(); self.connect(self.address.clone(), self.port).await.ok(); @@ -320,7 +346,7 @@ impl AtemSocketInner { self.send_or_queue_ack().await; if length > 12 { - self.on_command_received(&packet[12..], remote_packet_id); + self.on_command_received(&packet[12..]); } } else if self .is_packet_covered_by_ack(self.last_received_packed_id, remote_packet_id) @@ -438,16 +464,18 @@ impl AtemSocketInner { } } - fn on_command_received(&mut self, payload: &[u8], packet_id: u16) { - // TODO: Emit some event + fn on_command_received(&mut self, payload: &[u8]) { + let commands = deserialize_commands(payload); } - fn on_command_acknowledged(&mut self, ids: Vec) { - // TODO: Emit some event + fn on_command_acknowledged(&mut self, packets: Vec) { + for ack in packets { + let _ = self.atem_event_tx.send(AtemEvent::AckedCommand(ack)); + } } fn on_disconnect(&mut self) { - // TODO: Emit some event + let _ = self.atem_event_tx.send(AtemEvent::Disconnected); } fn start_timers(&mut self) { diff --git a/atem-connection-rs/src/commands/command_base.rs b/atem-connection-rs/src/commands/command_base.rs index 896e73a..b9ef1c7 100644 --- a/atem-connection-rs/src/commands/command_base.rs +++ b/atem-connection-rs/src/commands/command_base.rs @@ -1,8 +1,11 @@ -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use crate::{enums::ProtocolVersion, state::AtemState}; -pub trait DeserializedCommand { +pub trait DeserializedCommand: Send + Sync { + fn deserialize(buffer: &[u8]) -> Self + where + Self: Sized; fn apply_to_state(&self, state: &mut AtemState) -> Vec; } diff --git a/atem-connection-rs/src/commands/mix_effects.rs b/atem-connection-rs/src/commands/mix_effects.rs index b0a6e40..cca5d48 100644 --- a/atem-connection-rs/src/commands/mix_effects.rs +++ b/atem-connection-rs/src/commands/mix_effects.rs @@ -1,27 +1,3 @@ use super::command_base::{BasicWritableCommand, SerializableCommand}; -#[derive(new)] -pub struct ProgramInput { - mix_effect: u8, - source: u16, -} - -impl SerializableCommand for ProgramInput { - fn payload(&self, _version: crate::enums::ProtocolVersion) -> Vec { - let mut buf = vec![0; 4]; - buf[..1].copy_from_slice(&self.mix_effect.to_be_bytes()); - buf[2..].copy_from_slice(&self.source.to_be_bytes()); - - buf - } -} - -impl BasicWritableCommand for ProgramInput { - fn get_raw_name(&self) -> &'static str { - "CPgI" - } - - fn get_minimum_version(&self) -> crate::enums::ProtocolVersion { - crate::enums::ProtocolVersion::Unknown - } -} +pub mod program_input; diff --git a/atem-connection-rs/src/commands/mix_effects/program_input.rs b/atem-connection-rs/src/commands/mix_effects/program_input.rs new file mode 100644 index 0000000..2d8ec5e --- /dev/null +++ b/atem-connection-rs/src/commands/mix_effects/program_input.rs @@ -0,0 +1,27 @@ +use crate::commands::command_base::{BasicWritableCommand, SerializableCommand}; + +#[derive(new)] +pub struct ProgramInput { + mix_effect: u8, + source: u16, +} + +impl SerializableCommand for ProgramInput { + fn payload(&self, _version: crate::enums::ProtocolVersion) -> Vec { + let mut buf = vec![0; 4]; + buf[..1].copy_from_slice(&self.mix_effect.to_be_bytes()); + buf[2..].copy_from_slice(&self.source.to_be_bytes()); + + buf + } +} + +impl BasicWritableCommand for ProgramInput { + fn get_raw_name(&self) -> &'static str { + "CPgI" + } + + fn get_minimum_version(&self) -> crate::enums::ProtocolVersion { + crate::enums::ProtocolVersion::Unknown + } +} diff --git a/atem-connection-rs/src/commands/mod.rs b/atem-connection-rs/src/commands/mod.rs index a95dff4..f8c64e4 100644 --- a/atem-connection-rs/src/commands/mod.rs +++ b/atem-connection-rs/src/commands/mod.rs @@ -1,2 +1,4 @@ pub mod command_base; pub mod mix_effects; +pub mod parse_commands; +pub mod tally_by_source; diff --git a/atem-connection-rs/src/commands/parse_commands.rs b/atem-connection-rs/src/commands/parse_commands.rs new file mode 100644 index 0000000..8bae3d3 --- /dev/null +++ b/atem-connection-rs/src/commands/parse_commands.rs @@ -0,0 +1,43 @@ +use std::sync::Arc; + +use crate::commands::tally_by_source::TallyBySource; + +use super::command_base::DeserializedCommand; + +pub fn deserialize_commands(payload: &[u8]) -> Vec> { + let mut parsed_commands = vec![]; + let mut head = 0; + + while payload.len() > head + 8 { + // log::debug!("Head at {} out of {}", head, payload.len()); + let length = u16::from_be_bytes([payload[head], payload[head + 1]]) as usize; + let Ok(name) = String::from_utf8(payload[(head + 4)..(head + 8)].to_vec()) else { + break; + }; + + if length < 8 { + break; + } + + log::debug!("Received command {} with length {}", name, length); + + match name.as_str() { + "TlSr" => { + let tally = TallyBySource::deserialize(&payload[head + 8..head + length]); + for (source, state) in tally.sources { + log::info!( + "Source {} Program: {}, Preview: {}", + source, + state.program, + state.preview + ) + } + } + _ => {} + } + + head += length; + } + + parsed_commands +} diff --git a/atem-connection-rs/src/commands/tally_by_source.rs b/atem-connection-rs/src/commands/tally_by_source.rs new file mode 100644 index 0000000..6019f5c --- /dev/null +++ b/atem-connection-rs/src/commands/tally_by_source.rs @@ -0,0 +1,56 @@ +use std::collections::HashMap; + +use crate::commands::command_base::{BasicWritableCommand, SerializableCommand}; + +use super::command_base::{DeserializableCommand, DeserializedCommand}; + +pub struct TallySource { + pub program: bool, + pub preview: bool, +} + +#[derive(new)] +pub struct TallyBySource { + pub sources: HashMap, +} + +impl DeserializableCommand for TallyBySource { + fn get_raw_name(&self) -> &'static str { + "TlSr" + } + + fn get_minimum_version(&self) -> crate::enums::ProtocolVersion { + crate::enums::ProtocolVersion::Unknown + } +} + +impl DeserializedCommand for TallyBySource { + fn deserialize(buffer: &[u8]) -> Self { + let source_count = u16::from_be_bytes([buffer[0], buffer[1]]) as usize; + + log::debug!("{:?}", buffer); + log::debug!("Source count: {}", source_count); + + let mut sources = HashMap::new(); + for i in 0..source_count { + let source_byte_offset = 2 + (i * 3); + let source = + u16::from_be_bytes([buffer[source_byte_offset], buffer[source_byte_offset + 1]]); + let value_byte_offset = 4 + (i * 3); + let value = u8::from_be_bytes([buffer[value_byte_offset]]); + sources.insert( + source, + TallySource { + program: (value & 0x01) > 0, + preview: (value & 0x02) > 0, + }, + ); + } + + Self { sources } + } + + fn apply_to_state(&self, state: &mut crate::state::AtemState) -> Vec { + todo!() + } +} diff --git a/atem-test/Cargo.toml b/atem-test/Cargo.toml index 1bd44e8..64ade34 100644 --- a/atem-test/Cargo.toml +++ b/atem-test/Cargo.toml @@ -10,4 +10,5 @@ atem-connection-rs = { path = "../atem-connection-rs" } clap = { version = "4.4.18", features = ["derive"] } color-eyre = "0.5.11" env_logger = "0.9.0" +log = "0.4.14" tokio = "1.14.0" diff --git a/atem-test/src/main.rs b/atem-test/src/main.rs index 62a6999..28b1d98 100644 --- a/atem-test/src/main.rs +++ b/atem-test/src/main.rs @@ -4,7 +4,7 @@ use atem_connection_rs::{ atem_lib::atem_socket::AtemSocket, commands::{ command_base::{BasicWritableCommand, SerializableCommand}, - mix_effects::ProgramInput, + mix_effects::program_input::ProgramInput, }, }; @@ -35,6 +35,7 @@ async fn main() { let mut tracking_id = 0; loop { + tracking_id += 1; sleep(Duration::from_millis(5000)).await; atem.send_command( &switch_to_source_1.payload(atem_connection_rs::enums::ProtocolVersion::Unknown),