feat: Very basic tally parsing

This commit is contained in:
Baud 2024-02-12 22:53:55 +00:00
parent 2ee3d71e78
commit 9d872eecc9
11 changed files with 173 additions and 45 deletions

1
Cargo.lock generated
View File

@ -93,6 +93,7 @@ dependencies = [
"clap", "clap",
"color-eyre", "color-eyre",
"env_logger", "env_logger",
"log",
"tokio", "tokio",
] ]

View File

@ -7,13 +7,3 @@ pub struct AtemOptions {
disable_multi_threaded: bool, disable_multi_threaded: bool,
child_process_timeout: Option<u64>, child_process_timeout: Option<u64>,
} }
pub enum AtemEvents {
Error(String),
Info(String),
Debug(String),
Connected,
Disconnected,
StateChanged(Box<(AtemState, Vec<String>)>),
ReceivedCommands(Vec<Box<dyn DeserializedCommand>>),
}

View File

@ -1,13 +1,17 @@
use std::{ use std::{
io, io,
net::SocketAddr, net::SocketAddr,
sync::Arc,
time::{Duration, SystemTime}, time::{Duration, SystemTime},
}; };
use log::debug; use log::debug;
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use crate::atem_lib::atem_util; use crate::{
atem_lib::atem_util,
commands::{command_base::DeserializedCommand, parse_commands::deserialize_commands},
};
const IN_FLIGHT_TIMEOUT: u64 = 60; const IN_FLIGHT_TIMEOUT: u64 = 60;
const CONNECTION_TIMEOUT: u64 = 5000; const CONNECTION_TIMEOUT: u64 = 5000;
@ -21,6 +25,17 @@ const MAX_PACKET_PER_ACK: u16 = 16;
const MAX_PACKET_RECEIVE_SIZE: usize = 65535; const MAX_PACKET_RECEIVE_SIZE: usize = 65535;
const ACK_PACKET_LENGTH: u16 = 12; const ACK_PACKET_LENGTH: u16 = 12;
#[derive(Clone)]
pub enum AtemEvent {
Error(String),
Info(String),
Debug(String),
Connected,
Disconnected,
ReceivedCommand(Arc<dyn DeserializedCommand>),
AckedCommand(AckedPacket),
}
#[derive(PartialEq, Clone)] #[derive(PartialEq, Clone)]
enum ConnectionState { enum ConnectionState {
Closed, Closed,
@ -69,6 +84,7 @@ struct InFlightPacket {
pub resent: u16, pub resent: u16,
} }
#[derive(Clone)]
struct AckedPacket { struct AckedPacket {
packet_id: u16, packet_id: u16,
tracking_id: u64, tracking_id: u64,
@ -97,6 +113,8 @@ pub struct AtemSocketInner {
in_flight: Vec<InFlightPacket>, in_flight: Vec<InFlightPacket>,
ack_timer: Option<SystemTime>, ack_timer: Option<SystemTime>,
received_without_ack: u16, received_without_ack: u16,
atem_event_tx: tokio::sync::broadcast::Sender<AtemEvent>,
} }
enum AtemSocketReceiveError { enum AtemSocketReceiveError {
@ -114,6 +132,8 @@ enum AtemSocketWriteError {
impl AtemSocketInner { impl AtemSocketInner {
pub fn new() -> Self { pub fn new() -> Self {
let (atem_event_tx, _) = tokio::sync::broadcast::channel(100);
AtemSocketInner { AtemSocketInner {
connection_state: ConnectionState::Closed, connection_state: ConnectionState::Closed,
reconnect_timer: None, reconnect_timer: None,
@ -131,6 +151,8 @@ impl AtemSocketInner {
in_flight: vec![], in_flight: vec![],
ack_timer: None, ack_timer: None,
received_without_ack: 0, received_without_ack: 0,
atem_event_tx,
} }
} }
@ -214,6 +236,10 @@ impl AtemSocketInner {
}) })
} }
pub fn subscribe_to_events(&self) -> tokio::sync::broadcast::Receiver<AtemEvent> {
self.atem_event_tx.subscribe()
}
async fn restart_connection(&mut self) { async fn restart_connection(&mut self) {
self.disconnect(); self.disconnect();
self.connect(self.address.clone(), self.port).await.ok(); self.connect(self.address.clone(), self.port).await.ok();
@ -320,7 +346,7 @@ impl AtemSocketInner {
self.send_or_queue_ack().await; self.send_or_queue_ack().await;
if length > 12 { if length > 12 {
self.on_command_received(&packet[12..], remote_packet_id); self.on_command_received(&packet[12..]);
} }
} else if self } else if self
.is_packet_covered_by_ack(self.last_received_packed_id, remote_packet_id) .is_packet_covered_by_ack(self.last_received_packed_id, remote_packet_id)
@ -438,16 +464,18 @@ impl AtemSocketInner {
} }
} }
fn on_command_received(&mut self, payload: &[u8], packet_id: u16) { fn on_command_received(&mut self, payload: &[u8]) {
// TODO: Emit some event let commands = deserialize_commands(payload);
} }
fn on_command_acknowledged(&mut self, ids: Vec<AckedPacket>) { fn on_command_acknowledged(&mut self, packets: Vec<AckedPacket>) {
// TODO: Emit some event for ack in packets {
let _ = self.atem_event_tx.send(AtemEvent::AckedCommand(ack));
}
} }
fn on_disconnect(&mut self) { fn on_disconnect(&mut self) {
// TODO: Emit some event let _ = self.atem_event_tx.send(AtemEvent::Disconnected);
} }
fn start_timers(&mut self) { fn start_timers(&mut self) {

View File

@ -1,8 +1,11 @@
use std::collections::HashMap; use std::{collections::HashMap, sync::Arc};
use crate::{enums::ProtocolVersion, state::AtemState}; use crate::{enums::ProtocolVersion, state::AtemState};
pub trait DeserializedCommand { pub trait DeserializedCommand: Send + Sync {
fn deserialize(buffer: &[u8]) -> Self
where
Self: Sized;
fn apply_to_state(&self, state: &mut AtemState) -> Vec<String>; fn apply_to_state(&self, state: &mut AtemState) -> Vec<String>;
} }

View File

@ -1,27 +1,3 @@
use super::command_base::{BasicWritableCommand, SerializableCommand}; use super::command_base::{BasicWritableCommand, SerializableCommand};
#[derive(new)] pub mod program_input;
pub struct ProgramInput {
mix_effect: u8,
source: u16,
}
impl SerializableCommand for ProgramInput {
fn payload(&self, _version: crate::enums::ProtocolVersion) -> Vec<u8> {
let mut buf = vec![0; 4];
buf[..1].copy_from_slice(&self.mix_effect.to_be_bytes());
buf[2..].copy_from_slice(&self.source.to_be_bytes());
buf
}
}
impl BasicWritableCommand for ProgramInput {
fn get_raw_name(&self) -> &'static str {
"CPgI"
}
fn get_minimum_version(&self) -> crate::enums::ProtocolVersion {
crate::enums::ProtocolVersion::Unknown
}
}

View File

@ -0,0 +1,27 @@
use crate::commands::command_base::{BasicWritableCommand, SerializableCommand};
#[derive(new)]
pub struct ProgramInput {
mix_effect: u8,
source: u16,
}
impl SerializableCommand for ProgramInput {
fn payload(&self, _version: crate::enums::ProtocolVersion) -> Vec<u8> {
let mut buf = vec![0; 4];
buf[..1].copy_from_slice(&self.mix_effect.to_be_bytes());
buf[2..].copy_from_slice(&self.source.to_be_bytes());
buf
}
}
impl BasicWritableCommand for ProgramInput {
fn get_raw_name(&self) -> &'static str {
"CPgI"
}
fn get_minimum_version(&self) -> crate::enums::ProtocolVersion {
crate::enums::ProtocolVersion::Unknown
}
}

View File

@ -1,2 +1,4 @@
pub mod command_base; pub mod command_base;
pub mod mix_effects; pub mod mix_effects;
pub mod parse_commands;
pub mod tally_by_source;

View File

@ -0,0 +1,43 @@
use std::sync::Arc;
use crate::commands::tally_by_source::TallyBySource;
use super::command_base::DeserializedCommand;
pub fn deserialize_commands(payload: &[u8]) -> Vec<Arc<dyn DeserializedCommand>> {
let mut parsed_commands = vec![];
let mut head = 0;
while payload.len() > head + 8 {
// log::debug!("Head at {} out of {}", head, payload.len());
let length = u16::from_be_bytes([payload[head], payload[head + 1]]) as usize;
let Ok(name) = String::from_utf8(payload[(head + 4)..(head + 8)].to_vec()) else {
break;
};
if length < 8 {
break;
}
log::debug!("Received command {} with length {}", name, length);
match name.as_str() {
"TlSr" => {
let tally = TallyBySource::deserialize(&payload[head + 8..head + length]);
for (source, state) in tally.sources {
log::info!(
"Source {} Program: {}, Preview: {}",
source,
state.program,
state.preview
)
}
}
_ => {}
}
head += length;
}
parsed_commands
}

View File

@ -0,0 +1,56 @@
use std::collections::HashMap;
use crate::commands::command_base::{BasicWritableCommand, SerializableCommand};
use super::command_base::{DeserializableCommand, DeserializedCommand};
pub struct TallySource {
pub program: bool,
pub preview: bool,
}
#[derive(new)]
pub struct TallyBySource {
pub sources: HashMap<u16, TallySource>,
}
impl DeserializableCommand for TallyBySource {
fn get_raw_name(&self) -> &'static str {
"TlSr"
}
fn get_minimum_version(&self) -> crate::enums::ProtocolVersion {
crate::enums::ProtocolVersion::Unknown
}
}
impl DeserializedCommand for TallyBySource {
fn deserialize(buffer: &[u8]) -> Self {
let source_count = u16::from_be_bytes([buffer[0], buffer[1]]) as usize;
log::debug!("{:?}", buffer);
log::debug!("Source count: {}", source_count);
let mut sources = HashMap::new();
for i in 0..source_count {
let source_byte_offset = 2 + (i * 3);
let source =
u16::from_be_bytes([buffer[source_byte_offset], buffer[source_byte_offset + 1]]);
let value_byte_offset = 4 + (i * 3);
let value = u8::from_be_bytes([buffer[value_byte_offset]]);
sources.insert(
source,
TallySource {
program: (value & 0x01) > 0,
preview: (value & 0x02) > 0,
},
);
}
Self { sources }
}
fn apply_to_state(&self, state: &mut crate::state::AtemState) -> Vec<String> {
todo!()
}
}

View File

@ -10,4 +10,5 @@ atem-connection-rs = { path = "../atem-connection-rs" }
clap = { version = "4.4.18", features = ["derive"] } clap = { version = "4.4.18", features = ["derive"] }
color-eyre = "0.5.11" color-eyre = "0.5.11"
env_logger = "0.9.0" env_logger = "0.9.0"
log = "0.4.14"
tokio = "1.14.0" tokio = "1.14.0"

View File

@ -4,7 +4,7 @@ use atem_connection_rs::{
atem_lib::atem_socket::AtemSocket, atem_lib::atem_socket::AtemSocket,
commands::{ commands::{
command_base::{BasicWritableCommand, SerializableCommand}, command_base::{BasicWritableCommand, SerializableCommand},
mix_effects::ProgramInput, mix_effects::program_input::ProgramInput,
}, },
}; };
@ -35,6 +35,7 @@ async fn main() {
let mut tracking_id = 0; let mut tracking_id = 0;
loop { loop {
tracking_id += 1;
sleep(Duration::from_millis(5000)).await; sleep(Duration::from_millis(5000)).await;
atem.send_command( atem.send_command(
&switch_to_source_1.payload(atem_connection_rs::enums::ProtocolVersion::Unknown), &switch_to_source_1.payload(atem_connection_rs::enums::ProtocolVersion::Unknown),