From 9f45b7b6d9318c91764b402315feb86cca24f34d Mon Sep 17 00:00:00 2001 From: Sam Willcocks Date: Mon, 9 Jun 2025 23:23:11 +0100 Subject: [PATCH 01/10] flake: friendship ended with devshell and alejandra --- flake.lock | 62 ++++++++++++------------------------------------------ flake.nix | 16 +++----------- 2 files changed, 17 insertions(+), 61 deletions(-) diff --git a/flake.lock b/flake.lock index 73a9e2f..0dea3fb 100644 --- a/flake.lock +++ b/flake.lock @@ -1,26 +1,8 @@ { "nodes": { - "devshell": { - "inputs": { - "nixpkgs": "nixpkgs" - }, - "locked": { - "lastModified": 1741473158, - "narHash": "sha256-kWNaq6wQUbUMlPgw8Y+9/9wP0F8SHkjy24/mN3UAppg=", - "owner": "numtide", - "repo": "devshell", - "rev": "7c9e793ebe66bcba8292989a68c0419b737a22a0", - "type": "github" - }, - "original": { - "owner": "numtide", - "repo": "devshell", - "type": "github" - } - }, "naersk": { "inputs": { - "nixpkgs": "nixpkgs_2" + "nixpkgs": "nixpkgs" }, "locked": { "lastModified": 1745925850, @@ -38,11 +20,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1722073938, - "narHash": "sha256-OpX0StkL8vpXyWOGUD6G+MA26wAXK6SpT94kLJXo6B4=", + "lastModified": 1749619289, + "narHash": "sha256-qX6gXVjaCXXbcn6A9eSLUf8Fm07MgPGe5ir3++y2O1Q=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "e36e9f57337d0ff0cf77aceb58af4c805472bfae", + "rev": "f72be405a10668b8b00937b452f2145244103ebc", "type": "github" }, "original": { @@ -54,25 +36,10 @@ }, "nixpkgs_2": { "locked": { - "lastModified": 1749401433, - "narHash": "sha256-HXIQzULIG/MEUW2Q/Ss47oE3QrjxvpUX7gUl4Xp6lnc=", - "owner": "NixOS", - "repo": "nixpkgs", - "rev": "08fcb0dcb59df0344652b38ea6326a2d8271baff", - "type": "github" - }, - "original": { - "owner": "NixOS", - "ref": "nixpkgs-unstable", - "repo": "nixpkgs", - "type": "github" - } - }, - "nixpkgs_3": { - "locked": { - "lastModified": 0, - "narHash": "sha256-DDe16FJk18sadknQKKG/9FbwEro7A57tg9vB5kxZ8kY=", - "path": "/nix/store/2d1ahim48jhzg4bbm97mvjlb4p7fpan3-source", + "lastModified": 1733412085, + "narHash": "sha256-FillH0qdWDt/nlO6ED7h4cmN+G9uXwGjwmCnHs0QVYM=", + "path": "/nix/store/5wvrlpfrwlmg24ywkybvidyzcvki6bwx-source", + "rev": "4dc2fc4e62dbf62b84132fe526356fbac7b03541", "type": "path" }, "original": { @@ -80,7 +47,7 @@ "type": "indirect" } }, - "nixpkgs_4": { + "nixpkgs_3": { "locked": { "lastModified": 1744536153, "narHash": "sha256-awS2zRgF4uTwrOKwwiJcByDzDOdo3Q1rPZbiHQg/N38=", @@ -98,23 +65,22 @@ }, "root": { "inputs": { - "devshell": "devshell", "naersk": "naersk", - "nixpkgs": "nixpkgs_3", + "nixpkgs": "nixpkgs_2", "rust-overlay": "rust-overlay", "utils": "utils" } }, "rust-overlay": { "inputs": { - "nixpkgs": "nixpkgs_4" + "nixpkgs": "nixpkgs_3" }, "locked": { - "lastModified": 1749436897, - "narHash": "sha256-OkDtaCGQQVwVFz5HWfbmrMJR99sFIMXHCHEYXzUJEJY=", + "lastModified": 1749695868, + "narHash": "sha256-debjTLOyqqsYOUuUGQsAHskFXH5+Kx2t3dOo/FCoNRA=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "e7876c387e35dc834838aff254d8e74cf5bd4f19", + "rev": "55f914d5228b5c8120e9e0f9698ed5b7214d09cd", "type": "github" }, "original": { diff --git a/flake.nix b/flake.nix index e7101ef..883f484 100644 --- a/flake.nix +++ b/flake.nix @@ -3,7 +3,6 @@ inputs = { utils.url = "github:numtide/flake-utils"; - devshell.url = "github:numtide/devshell"; naersk.url = "github:nix-community/naersk"; rust-overlay.url = "github:oxalica/rust-overlay"; }; @@ -13,7 +12,6 @@ nixpkgs, utils, naersk, - devshell, rust-overlay, }: utils.lib.eachDefaultSystem (system: let @@ -35,18 +33,10 @@ apps.default = utils.lib.mkApp {drv = packages.default;}; - # Provide a dev env with rust and rust-analyzer - devShells.default = let - pkgs = import nixpkgs { - inherit system; - overlays = [devshell.overlays.default]; - }; - in - pkgs.devshell.mkShell { - motd = "Hello you wonderful person, I hope you are having a lovely day 💜"; - + devShells.default = pkgs.mkShell { packages = with pkgs; [(rust.override {extensions = ["rust-src"];}) rust-analyzer gcc]; + }; - formatter = pkgs.alejandra; + formatter = pkgs.nixfmt-rfc-style; }); } -- 2.44.1 From 16829080db165d2d0799aafef64bb35701f869b6 Mon Sep 17 00:00:00 2001 From: Sam Willcocks Date: Tue, 10 Jun 2025 10:19:09 +0100 Subject: [PATCH 02/10] Make AtemPacket operate on `&[u8] `, add field parsing --- Cargo.lock | 18 +- atem-connection-rs/Cargo.toml | 10 +- atem-connection-rs/src/atem_lib/atem_field.rs | 88 ++++++++ .../src/atem_lib/atem_packet.rs | 201 ++++++++++++------ .../src/atem_lib/atem_socket.rs | 28 ++- atem-connection-rs/src/atem_lib/atem_util.rs | 4 - atem-connection-rs/src/atem_lib/mod.rs | 3 +- .../src/commands/parse_commands.rs | 34 +-- 8 files changed, 273 insertions(+), 113 deletions(-) create mode 100644 atem-connection-rs/src/atem_lib/atem_field.rs delete mode 100644 atem-connection-rs/src/atem_lib/atem_util.rs diff --git a/Cargo.lock b/Cargo.lock index 4422846..e9e4d12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -80,6 +80,7 @@ version = "0.1.0" dependencies = [ "derive-getters", "derive-new", + "itertools", "log", "tokio", "tokio-util", @@ -249,6 +250,12 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + [[package]] name = "env_logger" version = "0.9.0" @@ -323,6 +330,15 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "lazy_static" version = "1.4.0" diff --git a/atem-connection-rs/Cargo.toml b/atem-connection-rs/Cargo.toml index 58dc8eb..0eae179 100644 --- a/atem-connection-rs/Cargo.toml +++ b/atem-connection-rs/Cargo.toml @@ -6,6 +6,12 @@ edition = "2021" [dependencies] derive-getters = "0.2.0" derive-new = "0.6.0" +itertools = {version = "0.14.0"} log = "0.4.14" -tokio = { version = "1.13.0", features = ["full"] } -tokio-util = "0.7.10" +tokio = { version = "1.13.0", features = ["full"], optional = true } +tokio-util = { version = "0.7.10", optional = true } + +[features] +default = ["std"] + +std = ["dep:tokio", "dep:tokio-util"] diff --git a/atem-connection-rs/src/atem_lib/atem_field.rs b/atem-connection-rs/src/atem_lib/atem_field.rs new file mode 100644 index 0000000..1fa07d7 --- /dev/null +++ b/atem-connection-rs/src/atem_lib/atem_field.rs @@ -0,0 +1,88 @@ +//! Definitions and decoding of ATEM protocol fields + +/// An uninterpreted ATEM protocol field - a 4-ascii character type plus variable length data +pub struct RawField<'a> { + pub r#type: &'a [u8; 4], + pub data: &'a [u8], +} + +#[derive(Debug)] +pub struct _Ver { + pub major: u16, + pub minor: u16, +} + +impl<'a> Field<'a> for _Ver { + const TYPE: [u8; 4] = [b'_', b'v', b'e', b'r']; + + fn decode(data: &'a [u8]) -> Result { + let data = checked_len::<4>(data)?; + Ok(Self { + major: u16::from_be_bytes(data[0..=1].try_into().unwrap()), + minor: u16::from_be_bytes(data[2..=3].try_into().unwrap()), + }) + } +} + +#[derive(Debug)] +pub struct PrvI { + pub m_e_index: u8, + pub source_index: u16, + pub pvw_in_pgm: bool, +} + +impl<'a> Field<'a> for PrvI { + const TYPE: [u8; 4] = [b'P', b'r', b'v', b'I']; + fn decode(data: &'a [u8]) -> Result { + let data = checked_len::<8>(data)?; + Ok(Self { + m_e_index: data[0], + source_index: u16::from_be_bytes(data[2..=3].try_into().unwrap()), + pvw_in_pgm: data[4] != 0, + }) + } +} + +#[derive(Debug)] +pub struct PrgI { + pub m_e_index: u8, + pub source_index: u16, +} + +impl<'a> Field<'a> for PrgI { + const TYPE: [u8; 4] = [b'P', b'r', b'g', b'I']; + fn decode(data: &'a [u8]) -> Result { + let data = checked_len::<4>(data)?; + Ok(Self { + m_e_index: data[0], + source_index: u16::from_be_bytes(data[2..=3].try_into().unwrap()), + }) + } +} + +pub trait Field<'a>: Sized { + const TYPE: [u8; 4]; + fn decode(data: &'a [u8]) -> Result; + fn try_from_raw(raw: RawField<'a>) -> Result { + if Self::TYPE != *raw.r#type { + Err(FieldParsingError::MismatchedFieldType) + } else { + Self::decode(&raw.data) + } + } +} + +#[derive(Debug)] +pub enum FieldParsingError { + UnexpectedLength { expected: usize, got: usize }, + UnknownFieldType { r#type: [u8; 4] }, + MismatchedFieldType, +} + +fn checked_len<'a, const LEN: usize>(data: &'a [u8]) -> Result<&'a [u8; LEN], FieldParsingError> { + data.try_into() + .map_err(|_| FieldParsingError::UnexpectedLength { + expected: LEN, + got: data.len(), + }) +} diff --git a/atem-connection-rs/src/atem_lib/atem_packet.rs b/atem-connection-rs/src/atem_lib/atem_packet.rs index deeb5b9..2d3cfa8 100755 --- a/atem-connection-rs/src/atem_lib/atem_packet.rs +++ b/atem-connection-rs/src/atem_lib/atem_packet.rs @@ -1,16 +1,28 @@ +use core::{fmt::Display, str}; + +/// The "hello" packet to start communication with the ATEM +pub const COMMAND_CONNECT_HELLO: [u8; 20] = [ + 0x10, 0x14, 0x53, 0xab, 0x00, 0x00, 0x00, 0x00, 0x00, 0x3a, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, +]; + #[derive(Debug)] -pub struct AtemPacket<'packet_buffer> { - flags: u8, - session_id: u16, - remote_packet_id: u16, - retransmit_requested_from_packet_id: Option, - ack_reply: Option, - body: Option<&'packet_buffer [u8]>, +pub struct AtemPacket> { + buf: T, } +impl<'a> From<&'a [u8]> for AtemPacket<&'a [u8]> { + fn from(buf: &'a [u8]) -> Self { + AtemPacket { buf } + } +} + +#[derive(Debug)] pub enum AtemPacketErr { - TooShort(String), - LengthDiffers(String), + /// The packet was too short + TooShort { got: usize }, + /// The packet's stated and actual lengths were different + LengthDiffers { expected: u16, got: usize }, } #[derive(PartialEq)] @@ -34,82 +46,133 @@ impl From for u8 { } } -impl<'packet_buffer> AtemPacket<'packet_buffer> { +impl> AtemPacket { + pub fn new_checked(buf: T) -> Result { + let len = buf.as_ref().len(); + if len < 12 { + return Err(AtemPacketErr::TooShort { + got: buf.as_ref().len(), + }); + } + let p = Self { buf }; + if p.length() as usize != len { + return Err(AtemPacketErr::LengthDiffers { + expected: p.length(), + got: len, + }); + } + Ok(p) + } + pub fn length(&self) -> u16 { + u16::from_be_bytes(self.buf.as_ref()[0..=1].try_into().unwrap()) & 0x07ff + } + + pub fn flags(&self) -> u8 { + self.buf.as_ref()[0] >> 3 + } + pub fn session_id(&self) -> u16 { - self.session_id + u16::from_be_bytes(self.buf.as_ref()[2..=3].try_into().unwrap()) } - pub fn remote_packet_id(&self) -> u16 { - self.remote_packet_id + pub fn ack_number(&self) -> u16 { + u16::from_be_bytes(self.buf.as_ref()[4..=5].try_into().unwrap()) } - pub fn body(&self) -> Option<&[u8]> { - self.body + pub fn remote_sequence_number(&self) -> u16 { + u16::from_be_bytes(self.buf.as_ref()[9..=10].try_into().unwrap()) + } + + pub fn local_sequence_number(&self) -> u16 { + u16::from_be_bytes(self.buf.as_ref()[10..=11].try_into().unwrap()) } pub fn retransmit_request(&self) -> Option { - self.retransmit_requested_from_packet_id + self.has_flag(PacketFlag::RetransmitRequest) + .then_some(u16::from_be_bytes([ + self.buf.as_ref()[6], + self.buf.as_ref()[7], + ])) } pub fn ack_reply(&self) -> Option { - self.ack_reply + self.has_flag(PacketFlag::AckReply) + .then_some(u16::from_be_bytes([ + self.buf.as_ref()[4], + self.buf.as_ref()[5], + ])) } + /// Return true if this packet has the given [`PacketFlag`] pub fn has_flag(&self, flag: PacketFlag) -> bool { - self.flags & u8::from(flag) > 0 + self.flags() & u8::from(flag) > 0 + } + + /// Get an iterator over the `Field`s in this packet. + /// + /// Returns None if this is a packet without fields. + pub fn fields(&self) -> Fields { + // TODO: do we only ever get newsessionid during the handshake (i.e. not in a packet with fields)? + let has_fields = !self.has_flag(PacketFlag::NewSessionId); + Fields::new(if has_fields { self.body() } else { &[] }) + } + + pub fn body(&self) -> &[u8] { + &self.buf.as_ref()[12..] } } -impl<'packet_buffer> TryFrom<&'packet_buffer [u8]> for AtemPacket<'packet_buffer> { - type Error = AtemPacketErr; - - fn try_from(buffer: &'packet_buffer [u8]) -> Result { - if buffer.len() < 12 { - return Err(AtemPacketErr::TooShort(format!( - "Invalid packet from ATEM {:x?}", - buffer - ))); - } - - let length = u16::from_be_bytes([buffer[0], buffer[1]]) & 0x07ff; - if length as usize != buffer.len() { - return Err(AtemPacketErr::LengthDiffers(format!( - "Length of message differs, expected {} got {}", - length, - buffer.len() - ))); - } - - let flags = buffer[0] >> 3; - let session_id = u16::from_be_bytes([buffer[2], buffer[3]]); - let remote_packet_id = u16::from_be_bytes([buffer[10], buffer[11]]); - - let body = if buffer.len() > 12 { - Some(&buffer[12..]) - } else { - None - }; - - let retransmit_requested_from_packet_id = - if flags & u8::from(PacketFlag::RetransmitRequest) > 0 { - Some(u16::from_be_bytes([buffer[6], buffer[7]])) - } else { - None - }; - - let ack_reply = if flags & u8::from(PacketFlag::AckReply) > 0 { - Some(u16::from_be_bytes([buffer[4], buffer[5]])) - } else { - None - }; - - Ok(AtemPacket { - flags, - session_id, - remote_packet_id, - body, - retransmit_requested_from_packet_id, - ack_reply, - }) +impl> Display for AtemPacket { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!( + f, + "len: {}, sid: {}, flags: {}, ack: {}, localseq: {}, remoteseq: {}", + self.length(), + self.session_id(), + self.flags(), + self.ack_number(), + self.remote_sequence_number(), + self.local_sequence_number(), + ) + } +} + +/// An ATEM protocol field - a 4-ascii character type plus variable length data +pub struct Field<'a> { + pub r#type: &'a str, + pub data: &'a [u8], +} + +/// Created by [`AtemPacket::fields`] +pub struct Fields<'a> { + data: &'a [u8], + // The offset of the next field in the packet + offset: usize, +} + +impl<'a> Fields<'a> { + pub(crate) fn new(data: &'a [u8]) -> Self { + Self { data, offset: 0 } + } +} + +impl<'a> Iterator for Fields<'a> { + type Item = Field<'a>; + fn next(&mut self) -> Option { + let remain = self.data.len() - self.offset; + if remain == 0 { + return None; + } else if remain < 8 { + // TODO: is 8 indeed the minimum size for something here? (i.e. no field data) + panic!("Oh no"); + } + + let length = + u16::from_be_bytes(self.data[self.offset..=self.offset + 1].try_into().unwrap()); + // TODO: sanity check length + let r#type = str::from_utf8(&self.data[self.offset + 4..=self.offset + 7]).unwrap(); + let data = &self.data[self.offset + 8..self.offset + (length as usize)]; + self.offset += (length as usize); + Some(Field { r#type, data }) } } diff --git a/atem-connection-rs/src/atem_lib/atem_socket.rs b/atem-connection-rs/src/atem_lib/atem_socket.rs index 4945b54..e96529b 100644 --- a/atem-connection-rs/src/atem_lib/atem_socket.rs +++ b/atem-connection-rs/src/atem_lib/atem_socket.rs @@ -7,6 +7,7 @@ use std::{ time::{Duration, SystemTime}, }; +use itertools::Itertools; use tokio::{ net::UdpSocket, select, @@ -14,7 +15,7 @@ use tokio::{ }; use crate::{ - atem_lib::{atem_packet::AtemPacket, atem_util}, + atem_lib::atem_packet::{self, AtemPacket, COMMAND_CONNECT_HELLO}, commands::{ command_base::{BasicWritableCommand, DeserializedCommand}, parse_commands::deserialize_commands, @@ -258,7 +259,7 @@ impl AtemSocket { self.in_flight = vec![]; log::debug!("Reconnect"); - self.send_packet(&atem_util::COMMAND_CONNECT_HELLO).await; + self.send_packet(&COMMAND_CONNECT_HELLO).await; self.connection_state = ConnectionState::SynSent; Ok(()) @@ -386,16 +387,21 @@ impl AtemSocket { } async fn recieved_packet(&mut self, packet: &[u8]) { - let Ok(atem_packet): Result = packet.try_into() else { + let Ok(atem_packet): Result, _> = packet.try_into() else { return; }; - log::debug!("Received {:x?}", atem_packet); + log::debug!("Received {}", atem_packet,); + log::debug!( + "fields: {}", + atem_packet.fields().map(|f| f.r#type).join(",") + ); self.last_received_at = SystemTime::now(); self.session_id = atem_packet.session_id(); - let remote_packet_id = atem_packet.remote_packet_id(); + // TODO: naming seems rather off here + let remote_packet_id = atem_packet.local_sequence_number(); if atem_packet.has_flag(PacketFlag::NewSessionId) { log::debug!("New session"); @@ -418,9 +424,7 @@ impl AtemSocket { self.last_received_packed_id = remote_packet_id; self.send_or_queue_ack().await; - if let Some(body) = atem_packet.body() { - self.on_commands_received(body); - } + self.on_commands_received(atem_packet.body()); } else if self .is_packet_covered_by_ack(self.last_received_packed_id, remote_packet_id) { @@ -537,9 +541,11 @@ impl AtemSocket { } fn on_commands_received(&mut self, payload: &[u8]) { - let _ = self - .atem_event_tx - .send(AtemSocketEvent::ReceivedCommands(payload.to_vec())); + if !payload.is_empty() { + let _ = self + .atem_event_tx + .send(AtemSocketEvent::ReceivedCommands(payload.to_vec())); + } } fn on_command_acknowledged(&mut self, packets: Vec) { diff --git a/atem-connection-rs/src/atem_lib/atem_util.rs b/atem-connection-rs/src/atem_lib/atem_util.rs deleted file mode 100644 index 09b970a..0000000 --- a/atem-connection-rs/src/atem_lib/atem_util.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub const COMMAND_CONNECT_HELLO: [u8; 20] = [ - 0x10, 0x14, 0x53, 0xab, 0x00, 0x00, 0x00, 0x00, 0x00, 0x3a, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, -]; diff --git a/atem-connection-rs/src/atem_lib/mod.rs b/atem-connection-rs/src/atem_lib/mod.rs index 9b3090b..388a4e1 100644 --- a/atem-connection-rs/src/atem_lib/mod.rs +++ b/atem-connection-rs/src/atem_lib/mod.rs @@ -1,3 +1,2 @@ -mod atem_packet; +pub mod atem_packet; pub mod atem_socket; -pub mod atem_util; diff --git a/atem-connection-rs/src/commands/parse_commands.rs b/atem-connection-rs/src/commands/parse_commands.rs index 1c51d1d..e32386e 100644 --- a/atem-connection-rs/src/commands/parse_commands.rs +++ b/atem-connection-rs/src/commands/parse_commands.rs @@ -1,6 +1,7 @@ use std::{collections::VecDeque, sync::Arc}; use crate::{ + atem_lib::atem_packet::{AtemPacket, Fields}, commands::device_profile::version::{deserialize_version, DESERIALIZE_VERSION_RAW_NAME}, enums::ProtocolVersion, }; @@ -25,43 +26,28 @@ use super::{ time::{TimeDeserializer, DESERIALIZE_TIME_RAW_NAME}, }; -pub fn deserialize_commands( - payload: &[u8], +pub fn deserialize_commands>( + payload: T, version: &mut ProtocolVersion, ) -> VecDeque> { let mut parsed_commands: VecDeque> = VecDeque::new(); - let mut head = 0; - while payload.len() > head + 8 { - let length = u16::from_be_bytes([payload[head], payload[head + 1]]) as usize; - let Ok(name) = String::from_utf8(payload[(head + 4)..(head + 8)].to_vec()) else { - break; - }; + for field in Fields::new(payload.as_ref()) { + let name: &str = field.r#type.try_into().unwrap(); + log::debug!("Received command {} with length {}", name, field.data.len(),); - if length < 8 { - break; - } - - log::debug!("Received command {} with length {}", name, length); - - let command_buffer = &payload[head + 8..head + length]; - - if name == DESERIALIZE_VERSION_RAW_NAME { - let version_command = deserialize_version(command_buffer); + if field.r#type == DESERIALIZE_VERSION_RAW_NAME { + let version_command = deserialize_version(field.data); *version = version_command.version.clone(); log::info!("Switched to protocol version {}", version); parsed_commands.push_back(Arc::new(version_command)); - } else if let Some(deserializer) = command_deserializer_from_string(name.as_str()) { - let deserialized_command = deserializer.deserialize(command_buffer, version); + } else if let Some(deserializer) = command_deserializer_from_string(name) { + let deserialized_command = deserializer.deserialize(field.data, version); log::debug!("Received {:?}", deserialized_command); parsed_commands.push_back(deserialized_command); } else { log::warn!("Received command {name} for which there is no deserializer."); - // TODO: Remove! - todo!("Write deserializer for {name}."); } - - head += length; } parsed_commands -- 2.44.1 From d88cf9249bc0186f2a557cf746ad09f6d172cf0a Mon Sep 17 00:00:00 2001 From: Sam Willcocks Date: Thu, 19 Jun 2025 10:05:36 +0100 Subject: [PATCH 03/10] Make AtemPacket operate on `&[u8] `, add field parsing --- atem-connection-rs/src/atem_lib/atem_field.rs | 13 +++-- .../src/atem_lib/atem_packet.rs | 47 ++++++++++--------- .../src/atem_lib/atem_socket.rs | 2 +- atem-connection-rs/src/atem_lib/mod.rs | 1 + .../src/commands/parse_commands.rs | 4 +- 5 files changed, 35 insertions(+), 32 deletions(-) diff --git a/atem-connection-rs/src/atem_lib/atem_field.rs b/atem-connection-rs/src/atem_lib/atem_field.rs index 1fa07d7..3ea7a22 100644 --- a/atem-connection-rs/src/atem_lib/atem_field.rs +++ b/atem-connection-rs/src/atem_lib/atem_field.rs @@ -2,7 +2,7 @@ /// An uninterpreted ATEM protocol field - a 4-ascii character type plus variable length data pub struct RawField<'a> { - pub r#type: &'a [u8; 4], + pub r#type: &'a str, pub data: &'a [u8], } @@ -13,7 +13,7 @@ pub struct _Ver { } impl<'a> Field<'a> for _Ver { - const TYPE: [u8; 4] = [b'_', b'v', b'e', b'r']; + const TYPE: &'static str = "_ver"; fn decode(data: &'a [u8]) -> Result { let data = checked_len::<4>(data)?; @@ -32,7 +32,7 @@ pub struct PrvI { } impl<'a> Field<'a> for PrvI { - const TYPE: [u8; 4] = [b'P', b'r', b'v', b'I']; + const TYPE: &'static str = "PrvI"; fn decode(data: &'a [u8]) -> Result { let data = checked_len::<8>(data)?; Ok(Self { @@ -50,7 +50,7 @@ pub struct PrgI { } impl<'a> Field<'a> for PrgI { - const TYPE: [u8; 4] = [b'P', b'r', b'g', b'I']; + const TYPE: &'static str = "PrgI"; fn decode(data: &'a [u8]) -> Result { let data = checked_len::<4>(data)?; Ok(Self { @@ -61,10 +61,10 @@ impl<'a> Field<'a> for PrgI { } pub trait Field<'a>: Sized { - const TYPE: [u8; 4]; + const TYPE: &'static str; fn decode(data: &'a [u8]) -> Result; fn try_from_raw(raw: RawField<'a>) -> Result { - if Self::TYPE != *raw.r#type { + if Self::TYPE != raw.r#type { Err(FieldParsingError::MismatchedFieldType) } else { Self::decode(&raw.data) @@ -75,7 +75,6 @@ pub trait Field<'a>: Sized { #[derive(Debug)] pub enum FieldParsingError { UnexpectedLength { expected: usize, got: usize }, - UnknownFieldType { r#type: [u8; 4] }, MismatchedFieldType, } diff --git a/atem-connection-rs/src/atem_lib/atem_packet.rs b/atem-connection-rs/src/atem_lib/atem_packet.rs index 2d3cfa8..832ef11 100755 --- a/atem-connection-rs/src/atem_lib/atem_packet.rs +++ b/atem-connection-rs/src/atem_lib/atem_packet.rs @@ -1,5 +1,7 @@ use core::{fmt::Display, str}; +use super::atem_field::{Field, FieldParsingError, RawField}; + /// The "hello" packet to start communication with the ATEM pub const COMMAND_CONNECT_HELLO: [u8; 20] = [ 0x10, 0x14, 0x53, 0xab, 0x00, 0x00, 0x00, 0x00, 0x00, 0x3a, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, @@ -11,9 +13,10 @@ pub struct AtemPacket> { buf: T, } -impl<'a> From<&'a [u8]> for AtemPacket<&'a [u8]> { - fn from(buf: &'a [u8]) -> Self { - AtemPacket { buf } +impl<'a> TryFrom<&'a [u8]> for AtemPacket<&'a [u8]> { + type Error = AtemPacketErr; + fn try_from(buf: &'a [u8]) -> Result { + AtemPacket::new_checked(buf) } } @@ -97,10 +100,7 @@ impl> AtemPacket { pub fn ack_reply(&self) -> Option { self.has_flag(PacketFlag::AckReply) - .then_some(u16::from_be_bytes([ - self.buf.as_ref()[4], - self.buf.as_ref()[5], - ])) + .then_some(self.ack_number()) } /// Return true if this packet has the given [`PacketFlag`] @@ -111,10 +111,19 @@ impl> AtemPacket { /// Get an iterator over the `Field`s in this packet. /// /// Returns None if this is a packet without fields. - pub fn fields(&self) -> Fields { + pub fn raw_fields(&self) -> RawFields { // TODO: do we only ever get newsessionid during the handshake (i.e. not in a packet with fields)? let has_fields = !self.has_flag(PacketFlag::NewSessionId); - Fields::new(if has_fields { self.body() } else { &[] }) + RawFields::new(if has_fields { self.body() } else { &[] }) + } + + pub fn fields<'a, F: Field<'a>>( + &'a self, + ) -> impl Iterator> + use<'a, F, T> { + self.raw_fields().filter_map(|f| match F::try_from_raw(f) { + Err(FieldParsingError::MismatchedFieldType) => None, + x => Some(x), + }) } pub fn body(&self) -> &[u8] { @@ -137,27 +146,21 @@ impl> Display for AtemPacket { } } -/// An ATEM protocol field - a 4-ascii character type plus variable length data -pub struct Field<'a> { - pub r#type: &'a str, - pub data: &'a [u8], -} - -/// Created by [`AtemPacket::fields`] -pub struct Fields<'a> { +/// Created by [`AtemPacket::raw_fields`] +pub struct RawFields<'a> { data: &'a [u8], // The offset of the next field in the packet offset: usize, } -impl<'a> Fields<'a> { +impl<'a> RawFields<'a> { pub(crate) fn new(data: &'a [u8]) -> Self { Self { data, offset: 0 } } } -impl<'a> Iterator for Fields<'a> { - type Item = Field<'a>; +impl<'a> Iterator for RawFields<'a> { + type Item = RawField<'a>; fn next(&mut self) -> Option { let remain = self.data.len() - self.offset; if remain == 0 { @@ -172,7 +175,7 @@ impl<'a> Iterator for Fields<'a> { // TODO: sanity check length let r#type = str::from_utf8(&self.data[self.offset + 4..=self.offset + 7]).unwrap(); let data = &self.data[self.offset + 8..self.offset + (length as usize)]; - self.offset += (length as usize); - Some(Field { r#type, data }) + self.offset += length as usize; + Some(RawField { r#type, data }) } } diff --git a/atem-connection-rs/src/atem_lib/atem_socket.rs b/atem-connection-rs/src/atem_lib/atem_socket.rs index e96529b..a006a77 100644 --- a/atem-connection-rs/src/atem_lib/atem_socket.rs +++ b/atem-connection-rs/src/atem_lib/atem_socket.rs @@ -394,7 +394,7 @@ impl AtemSocket { log::debug!("Received {}", atem_packet,); log::debug!( "fields: {}", - atem_packet.fields().map(|f| f.r#type).join(",") + atem_packet.raw_fields().map(|f| f.r#type).join(",") ); self.last_received_at = SystemTime::now(); diff --git a/atem-connection-rs/src/atem_lib/mod.rs b/atem-connection-rs/src/atem_lib/mod.rs index 388a4e1..9e604ce 100644 --- a/atem-connection-rs/src/atem_lib/mod.rs +++ b/atem-connection-rs/src/atem_lib/mod.rs @@ -1,2 +1,3 @@ +pub mod atem_field; pub mod atem_packet; pub mod atem_socket; diff --git a/atem-connection-rs/src/commands/parse_commands.rs b/atem-connection-rs/src/commands/parse_commands.rs index e32386e..a463c0e 100644 --- a/atem-connection-rs/src/commands/parse_commands.rs +++ b/atem-connection-rs/src/commands/parse_commands.rs @@ -1,7 +1,7 @@ use std::{collections::VecDeque, sync::Arc}; use crate::{ - atem_lib::atem_packet::{AtemPacket, Fields}, + atem_lib::atem_packet::{AtemPacket, RawFields}, commands::device_profile::version::{deserialize_version, DESERIALIZE_VERSION_RAW_NAME}, enums::ProtocolVersion, }; @@ -32,7 +32,7 @@ pub fn deserialize_commands>( ) -> VecDeque> { let mut parsed_commands: VecDeque> = VecDeque::new(); - for field in Fields::new(payload.as_ref()) { + for field in RawFields::new(payload.as_ref()) { let name: &str = field.r#type.try_into().unwrap(); log::debug!("Received command {} with length {}", name, field.data.len(),); -- 2.44.1 From fe7243e359893d84c7a46c673096424e5c99e400 Mon Sep 17 00:00:00 2001 From: Sam Willcocks Date: Thu, 12 Jun 2025 18:45:14 +0100 Subject: [PATCH 04/10] Use enumflags for PacketFlag --- Cargo.lock | 21 +++++++ atem-connection-rs/Cargo.toml | 1 + .../src/atem_lib/atem_packet.rs | 62 ++++++++++--------- .../src/atem_lib/atem_socket.rs | 10 +-- 4 files changed, 60 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e9e4d12..2273f83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -80,6 +80,7 @@ version = "0.1.0" dependencies = [ "derive-getters", "derive-new", + "enumflags2", "itertools", "log", "tokio", @@ -256,6 +257,26 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "enumflags2" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1027f7680c853e056ebcec683615fb6fbbc07dbaa13b4d5d9442b146ded4ecef" +dependencies = [ + "enumflags2_derive", +] + +[[package]] +name = "enumflags2_derive" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67c78a4d8fdf9953a5c9d458f9efe940fd97a0cab0941c075a813ac594733827" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "env_logger" version = "0.9.0" diff --git a/atem-connection-rs/Cargo.toml b/atem-connection-rs/Cargo.toml index 0eae179..ffc5820 100644 --- a/atem-connection-rs/Cargo.toml +++ b/atem-connection-rs/Cargo.toml @@ -10,6 +10,7 @@ itertools = {version = "0.14.0"} log = "0.4.14" tokio = { version = "1.13.0", features = ["full"], optional = true } tokio-util = { version = "0.7.10", optional = true } +enumflags2 = { version = "0.7.12", default-features = false } [features] default = ["std"] diff --git a/atem-connection-rs/src/atem_lib/atem_packet.rs b/atem-connection-rs/src/atem_lib/atem_packet.rs index 832ef11..94cd59f 100755 --- a/atem-connection-rs/src/atem_lib/atem_packet.rs +++ b/atem-connection-rs/src/atem_lib/atem_packet.rs @@ -1,4 +1,5 @@ use core::{fmt::Display, str}; +use enumflags2::{bitflags, BitFlags}; use super::atem_field::{Field, FieldParsingError, RawField}; @@ -23,30 +24,26 @@ impl<'a> TryFrom<&'a [u8]> for AtemPacket<&'a [u8]> { #[derive(Debug)] pub enum AtemPacketErr { /// The packet was too short - TooShort { got: usize }, + TooShort { + got: usize, + }, /// The packet's stated and actual lengths were different - LengthDiffers { expected: u16, got: usize }, + LengthDiffers { + expected: u16, + got: usize, + }, + InvalidFlags, } -#[derive(PartialEq)] +#[bitflags] +#[repr(u8)] +#[derive(PartialEq, Copy, Clone, Debug)] pub enum PacketFlag { - AckRequest, - NewSessionId, - IsRetransmit, - RetransmitRequest, - AckReply, -} - -impl From for u8 { - fn from(flag: PacketFlag) -> Self { - match flag { - PacketFlag::AckRequest => 0x01, - PacketFlag::NewSessionId => 0x02, - PacketFlag::IsRetransmit => 0x04, - PacketFlag::RetransmitRequest => 0x08, - PacketFlag::AckReply => 0x10, - } - } + AckRequest = 0x1, + NewSessionId = 0x2, + IsRetransmit = 0x4, + RetransmitRequest = 0x8, + AckReply = 0x10, } impl> AtemPacket { @@ -64,16 +61,26 @@ impl> AtemPacket { got: len, }); } + // Check flags are valid + let _: BitFlags = p + .flags_raw() + .try_into() + .map_err(|_| AtemPacketErr::InvalidFlags)?; Ok(p) } pub fn length(&self) -> u16 { u16::from_be_bytes(self.buf.as_ref()[0..=1].try_into().unwrap()) & 0x07ff } - pub fn flags(&self) -> u8 { + fn flags_raw(&self) -> u8 { self.buf.as_ref()[0] >> 3 } + pub fn flags(&self) -> BitFlags { + // We `unwrap` here, but given we check the flags in the constructor this should never panic. + self.flags_raw().try_into().unwrap() + } + pub fn session_id(&self) -> u16 { u16::from_be_bytes(self.buf.as_ref()[2..=3].try_into().unwrap()) } @@ -91,7 +98,8 @@ impl> AtemPacket { } pub fn retransmit_request(&self) -> Option { - self.has_flag(PacketFlag::RetransmitRequest) + self.flags() + .contains(PacketFlag::RetransmitRequest) .then_some(u16::from_be_bytes([ self.buf.as_ref()[6], self.buf.as_ref()[7], @@ -99,21 +107,17 @@ impl> AtemPacket { } pub fn ack_reply(&self) -> Option { - self.has_flag(PacketFlag::AckReply) + self.flags() + .contains(PacketFlag::AckReply) .then_some(self.ack_number()) } - /// Return true if this packet has the given [`PacketFlag`] - pub fn has_flag(&self, flag: PacketFlag) -> bool { - self.flags() & u8::from(flag) > 0 - } - /// Get an iterator over the `Field`s in this packet. /// /// Returns None if this is a packet without fields. pub fn raw_fields(&self) -> RawFields { // TODO: do we only ever get newsessionid during the handshake (i.e. not in a packet with fields)? - let has_fields = !self.has_flag(PacketFlag::NewSessionId); + let has_fields = !self.flags().contains(PacketFlag::NewSessionId); RawFields::new(if has_fields { self.body() } else { &[] }) } diff --git a/atem-connection-rs/src/atem_lib/atem_socket.rs b/atem-connection-rs/src/atem_lib/atem_socket.rs index a006a77..06b94cb 100644 --- a/atem-connection-rs/src/atem_lib/atem_socket.rs +++ b/atem-connection-rs/src/atem_lib/atem_socket.rs @@ -300,7 +300,7 @@ impl AtemSocket { self.next_send_packet_id = 0; } - let opcode = u16::from(u8::from(PacketFlag::AckRequest)) << 11; + let opcode = u16::from(PacketFlag::AckRequest as u8) << 11; let mut buffer = vec![0; 20 + payload.len()]; @@ -403,7 +403,7 @@ impl AtemSocket { // TODO: naming seems rather off here let remote_packet_id = atem_packet.local_sequence_number(); - if atem_packet.has_flag(PacketFlag::NewSessionId) { + if atem_packet.flags().contains(PacketFlag::NewSessionId) { log::debug!("New session"); self.connection_state = ConnectionState::Established; self.last_received_packed_id = remote_packet_id; @@ -419,7 +419,7 @@ impl AtemSocket { self.retransmit_from(from_packet_id).await; } - if atem_packet.has_flag(PacketFlag::AckRequest) { + if atem_packet.flags().contains(PacketFlag::AckRequest) { if remote_packet_id == (self.last_received_packed_id + 1) % MAX_PACKET_ID { self.last_received_packed_id = remote_packet_id; self.send_or_queue_ack().await; @@ -432,7 +432,7 @@ impl AtemSocket { } } - if atem_packet.has_flag(PacketFlag::IsRetransmit) { + if atem_packet.flags().contains(PacketFlag::IsRetransmit) { log::debug!("ATEM retransmitted packet {:x?}", remote_packet_id); } @@ -482,7 +482,7 @@ impl AtemSocket { async fn send_ack(&mut self, packet_id: u16) { log::debug!("Sending ack for packet {:x?}", packet_id); - let flag: u8 = PacketFlag::AckReply.into(); + let flag: u8 = PacketFlag::AckReply as u8; let opcode = u16::from(flag) << 11; let mut buffer: [u8; ACK_PACKET_LENGTH as _] = [0; 12]; buffer[0..2].copy_from_slice(&u16::to_be_bytes(opcode | ACK_PACKET_LENGTH)); -- 2.44.1 From 3e1ecb0df105205596ac6da31998796422be36cb Mon Sep 17 00:00:00 2001 From: Sam Willcocks Date: Thu, 19 Jun 2025 00:25:56 +0100 Subject: [PATCH 05/10] use Instant over SystemTime --- .../src/atem_lib/atem_socket.rs | 49 +++++++++---------- 1 file changed, 22 insertions(+), 27 deletions(-) diff --git a/atem-connection-rs/src/atem_lib/atem_socket.rs b/atem-connection-rs/src/atem_lib/atem_socket.rs index 06b94cb..89cffe3 100644 --- a/atem-connection-rs/src/atem_lib/atem_socket.rs +++ b/atem-connection-rs/src/atem_lib/atem_socket.rs @@ -4,7 +4,7 @@ use std::{ io, net::SocketAddr, sync::Arc, - time::{Duration, SystemTime}, + time::{Duration, Instant}, }; use itertools::Itertools; @@ -25,10 +25,10 @@ use crate::{ use super::atem_packet::PacketFlag; -const IN_FLIGHT_TIMEOUT: u64 = 60; -const CONNECTION_TIMEOUT: u64 = 5000; -const CONNECTION_RETRY_INTERVAL: u64 = 1000; -const RETRANSMIT_CHECK_INTERVAL: u64 = 1000; +const IN_FLIGHT_TIMEOUT: Duration = Duration::from_millis(60); +const CONNECTION_TIMEOUT: Duration = Duration::from_millis(5000); +const CONNECTION_RETRY_INTERVAL: Duration = Duration::from_millis(1000); +const RETRANSMIT_CHECK_INTERVAL: Duration = Duration::from_millis(1000); const MAX_PACKET_RETRIES: u16 = 10; const MAX_PACKET_ID: u16 = 1 << 15; const MAX_PACKET_PER_ACK: u16 = 16; @@ -93,8 +93,8 @@ impl AtemSocketCommand { pub struct AtemSocket { connection_state: ConnectionState, - reconnect_timer: Option, - retransmit_timer: Option, + reconnect_timer: Option, + retransmit_timer: Option, next_tracking_id: u64, @@ -106,10 +106,10 @@ pub struct AtemSocket { protocol_version: ProtocolVersion, - last_received_at: SystemTime, + last_received_at: Instant, last_received_packed_id: u16, in_flight: Vec, - ack_timer: Option, + ack_timer: Option, received_without_ack: u16, atem_message_rx: tokio::sync::mpsc::Receiver, @@ -142,7 +142,7 @@ struct InFlightPacket { packet_id: u16, tracking_id: u64, payload: Vec, - pub last_sent: SystemTime, + pub last_sent: Instant, pub resent: u16, } @@ -171,7 +171,7 @@ impl AtemSocket { protocol_version: ProtocolVersion::V7_2, - last_received_at: SystemTime::now(), + last_received_at: Instant::now(), last_received_packed_id: 0, in_flight: vec![], ack_timer: None, @@ -321,7 +321,7 @@ impl AtemSocket { packet_id, tracking_id, payload: buffer, - last_sent: SystemTime::now(), + last_sent: Instant::now(), resent: 0, }) } @@ -339,17 +339,15 @@ impl AtemSocket { } } if let Some(ack_time) = self.ack_timer { - if ack_time <= SystemTime::now() { + if ack_time <= Instant::now() { self.ack_timer = None; self.received_without_ack = 0; self.send_ack(self.last_received_packed_id).await; } } if let Some(reconnect_time) = self.reconnect_timer { - if reconnect_time <= SystemTime::now() { - if self.last_received_at + Duration::from_millis(CONNECTION_TIMEOUT) - <= SystemTime::now() - { + if reconnect_time <= Instant::now() { + if self.last_received_at + CONNECTION_TIMEOUT <= Instant::now() { log::debug!("{:?}", self.last_received_at); log::debug!("Connection timed out, restarting"); self.restart_connection().await; @@ -358,7 +356,7 @@ impl AtemSocket { } } if let Some(retransmit_time) = self.retransmit_timer { - if retransmit_time <= SystemTime::now() { + if retransmit_time <= Instant::now() { self.check_for_retransmit().await; self.start_retransmit_timer(); } @@ -397,7 +395,7 @@ impl AtemSocket { atem_packet.raw_fields().map(|f| f.r#type).join(",") ); - self.last_received_at = SystemTime::now(); + self.last_received_at = Instant::now(); self.session_id = atem_packet.session_id(); // TODO: naming seems rather off here @@ -476,7 +474,7 @@ impl AtemSocket { self.ack_timer = None; self.send_ack(self.last_received_packed_id).await; } else if self.ack_timer.is_none() { - self.ack_timer = Some(SystemTime::now() + Duration::from_millis(5)); + self.ack_timer = Some(Instant::now() + Duration::from_millis(5)); } } @@ -509,7 +507,7 @@ impl AtemSocket { if sent_packet.packet_id == from_id || !self.is_packet_covered_by_ack(from_id, sent_packet.packet_id) { - sent_packet.last_sent = SystemTime::now(); + sent_packet.last_sent = Instant::now(); sent_packet.resent += 1; self.send_packet(&sent_packet.payload).await; @@ -523,8 +521,7 @@ impl AtemSocket { async fn check_for_retransmit(&mut self) { for sent_packet in self.in_flight.clone() { - if sent_packet.last_sent + Duration::from_millis(IN_FLIGHT_TIMEOUT) < SystemTime::now() - { + if sent_packet.last_sent + IN_FLIGHT_TIMEOUT < Instant::now() { if sent_packet.resent <= MAX_PACKET_RETRIES && self .is_packet_covered_by_ack(self.next_send_packet_id, sent_packet.packet_id) @@ -580,13 +577,11 @@ impl AtemSocket { } fn start_reconnect_timer(&mut self) { - self.reconnect_timer = - Some(SystemTime::now() + Duration::from_millis(CONNECTION_RETRY_INTERVAL)); + self.reconnect_timer = Some(Instant::now() + CONNECTION_RETRY_INTERVAL); } fn start_retransmit_timer(&mut self) { - self.retransmit_timer = - Some(SystemTime::now() + Duration::from_millis(RETRANSMIT_CHECK_INTERVAL)); + self.retransmit_timer = Some(Instant::now() + RETRANSMIT_CHECK_INTERVAL); } fn next_packet_tracking_id(&mut self) -> u64 { -- 2.44.1 From aaa8640a96ff977d1469d29ac8dfcd4bbec37b03 Mon Sep 17 00:00:00 2001 From: Sam Willcocks Date: Thu, 19 Jun 2025 00:36:38 +0100 Subject: [PATCH 06/10] edition 2024 --- atem-connection-rs/Cargo.toml | 2 +- .../src/atem_lib/atem_packet.rs | 2 +- .../src/commands/command_base.rs | 2 +- .../src/commands/mix_effects/program_input.rs | 1 + .../src/commands/parse_commands.rs | 20 +++++++++---------- .../src/commands/tally_by_source.rs | 1 + atem-connection-rs/src/lib.rs | 5 +---- atem-connection-rs/src/state/atem_macro.rs | 2 ++ atem-connection-rs/src/state/audio.rs | 2 ++ atem-connection-rs/src/state/color.rs | 2 ++ atem-connection-rs/src/state/common.rs | 2 ++ atem-connection-rs/src/state/fairlight.rs | 2 ++ atem-connection-rs/src/state/info.rs | 2 ++ atem-connection-rs/src/state/input.rs | 2 ++ atem-connection-rs/src/state/media.rs | 2 ++ atem-connection-rs/src/state/recording.rs | 2 ++ atem-connection-rs/src/state/settings.rs | 2 ++ atem-connection-rs/src/state/streaming.rs | 2 ++ atem-connection-rs/src/state/util.rs | 2 +- .../src/state/video/downstream_keyers.rs | 2 ++ atem-connection-rs/src/state/video/mod.rs | 2 ++ .../src/state/video/super_source.rs | 2 ++ atem-test/Cargo.toml | 4 +--- atem-test/src/main.rs | 8 ++++++-- 24 files changed, 52 insertions(+), 23 deletions(-) diff --git a/atem-connection-rs/Cargo.toml b/atem-connection-rs/Cargo.toml index ffc5820..732362e 100644 --- a/atem-connection-rs/Cargo.toml +++ b/atem-connection-rs/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "atem-connection-rs" version = "0.1.0" -edition = "2021" +edition = "2024" [dependencies] derive-getters = "0.2.0" diff --git a/atem-connection-rs/src/atem_lib/atem_packet.rs b/atem-connection-rs/src/atem_lib/atem_packet.rs index 94cd59f..d597c0b 100755 --- a/atem-connection-rs/src/atem_lib/atem_packet.rs +++ b/atem-connection-rs/src/atem_lib/atem_packet.rs @@ -1,5 +1,5 @@ use core::{fmt::Display, str}; -use enumflags2::{bitflags, BitFlags}; +use enumflags2::{BitFlags, bitflags}; use super::atem_field::{Field, FieldParsingError, RawField}; diff --git a/atem-connection-rs/src/commands/command_base.rs b/atem-connection-rs/src/commands/command_base.rs index b708af6..f754fc8 100644 --- a/atem-connection-rs/src/commands/command_base.rs +++ b/atem-connection-rs/src/commands/command_base.rs @@ -9,7 +9,7 @@ pub trait DeserializedCommand: Send + Sync + Debug { pub trait CommandDeserializer: Send + Sync { fn deserialize(&self, buffer: &[u8], version: &ProtocolVersion) - -> Arc; + -> Arc; } pub trait SerializableCommand: Send + Sync { diff --git a/atem-connection-rs/src/commands/mix_effects/program_input.rs b/atem-connection-rs/src/commands/mix_effects/program_input.rs index 47a6738..85da9ac 100644 --- a/atem-connection-rs/src/commands/mix_effects/program_input.rs +++ b/atem-connection-rs/src/commands/mix_effects/program_input.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use derive_new::new; use crate::{ commands::command_base::{ diff --git a/atem-connection-rs/src/commands/parse_commands.rs b/atem-connection-rs/src/commands/parse_commands.rs index a463c0e..2f83e10 100644 --- a/atem-connection-rs/src/commands/parse_commands.rs +++ b/atem-connection-rs/src/commands/parse_commands.rs @@ -2,7 +2,7 @@ use std::{collections::VecDeque, sync::Arc}; use crate::{ atem_lib::atem_packet::{AtemPacket, RawFields}, - commands::device_profile::version::{deserialize_version, DESERIALIZE_VERSION_RAW_NAME}, + commands::device_profile::version::{DESERIALIZE_VERSION_RAW_NAME, deserialize_version}, enums::ProtocolVersion, }; @@ -10,20 +10,20 @@ use super::{ command_base::{CommandDeserializer, DeserializedCommand}, device_profile::{ audio_mixer_config::{AudioMixerConfigDeserializer, DESERIALIZE_AUDIO_MIXER_CONFIG_NAME}, - media_pool_config::{MediaPoolConfigDeserializer, DESERIALIZE_MEDIA_POOL_CONFIG_NAME}, + media_pool_config::{DESERIALIZE_MEDIA_POOL_CONFIG_NAME, MediaPoolConfigDeserializer}, mix_effect_block_config::{ - MixEffectBlockConfigDeserializer, DESERIALIZE_MIX_EFFECT_BLOCK_CONFIG_NAME, + DESERIALIZE_MIX_EFFECT_BLOCK_CONFIG_NAME, MixEffectBlockConfigDeserializer, }, - multiviewer_config::{MultiviewerConfigDeserializer, DESERIALIZE_MULTIVIEWER_NAME}, + multiviewer_config::{DESERIALIZE_MULTIVIEWER_NAME, MultiviewerConfigDeserializer}, product_identifier::{ - ProductIdentifierDeserializer, DESERIALIZE_PRODUCT_IDENTIFIER_RAW_NAME, + DESERIALIZE_PRODUCT_IDENTIFIER_RAW_NAME, ProductIdentifierDeserializer, }, - topology::{TopologyDeserializer, DESERIALIZE_TOPOLOGY_RAW_NAME}, + topology::{DESERIALIZE_TOPOLOGY_RAW_NAME, TopologyDeserializer}, }, - init_complete::{InitCompleteDeserializer, DESERIALIZE_INIT_COMPLETE_RAW_NAME}, - mix_effects::program_input::{ProgramInputDeserializer, DESERIALIZE_PROGRAM_INPUT_RAW_NAME}, - tally_by_source::{TallyBySourceDeserializer, DESERIALIZE_TALLY_BY_SOURCE_RAW_NAME}, - time::{TimeDeserializer, DESERIALIZE_TIME_RAW_NAME}, + init_complete::{DESERIALIZE_INIT_COMPLETE_RAW_NAME, InitCompleteDeserializer}, + mix_effects::program_input::{DESERIALIZE_PROGRAM_INPUT_RAW_NAME, ProgramInputDeserializer}, + tally_by_source::{DESERIALIZE_TALLY_BY_SOURCE_RAW_NAME, TallyBySourceDeserializer}, + time::{DESERIALIZE_TIME_RAW_NAME, TimeDeserializer}, }; pub fn deserialize_commands>( diff --git a/atem-connection-rs/src/commands/tally_by_source.rs b/atem-connection-rs/src/commands/tally_by_source.rs index dd6e341..a3f184b 100644 --- a/atem-connection-rs/src/commands/tally_by_source.rs +++ b/atem-connection-rs/src/commands/tally_by_source.rs @@ -1,3 +1,4 @@ +use derive_new::new; use std::{collections::HashMap, sync::Arc}; use crate::enums::ProtocolVersion; diff --git a/atem-connection-rs/src/lib.rs b/atem-connection-rs/src/lib.rs index b82f233..3b421b2 100644 --- a/atem-connection-rs/src/lib.rs +++ b/atem-connection-rs/src/lib.rs @@ -1,7 +1,4 @@ -#[macro_use] -extern crate derive_new; -#[macro_use] -extern crate derive_getters; + pub mod atem; pub mod atem_lib; diff --git a/atem-connection-rs/src/state/atem_macro.rs b/atem-connection-rs/src/state/atem_macro.rs index 4236470..0b5dd01 100644 --- a/atem-connection-rs/src/state/atem_macro.rs +++ b/atem-connection-rs/src/state/atem_macro.rs @@ -1,3 +1,5 @@ +use derive_getters::Getters; +use derive_new::new; #[derive(Clone, PartialEq, Getters, new, Default)] pub struct MacroPlayerState { pub is_running: bool, diff --git a/atem-connection-rs/src/state/audio.rs b/atem-connection-rs/src/state/audio.rs index 0255afd..6df76e1 100644 --- a/atem-connection-rs/src/state/audio.rs +++ b/atem-connection-rs/src/state/audio.rs @@ -1,6 +1,8 @@ use std::collections::HashMap; use crate::enums::{AudioMixOption, AudioSourceType, ExternalPortType}; +use derive_getters::Getters; +use derive_new::new; pub type AudioChannel = ClassicAudioChannel; pub type AudioMasterChannel = ClassicAudioMasterChannel; diff --git a/atem-connection-rs/src/state/color.rs b/atem-connection-rs/src/state/color.rs index 9fd9290..a867ccf 100644 --- a/atem-connection-rs/src/state/color.rs +++ b/atem-connection-rs/src/state/color.rs @@ -1,3 +1,5 @@ +use derive_getters::Getters; +use derive_new::new; #[derive(Clone, PartialEq, Getters, new)] pub struct ColorGeneratorState { pub hue: u64, diff --git a/atem-connection-rs/src/state/common.rs b/atem-connection-rs/src/state/common.rs index aaf2f02..f69e0b4 100644 --- a/atem-connection-rs/src/state/common.rs +++ b/atem-connection-rs/src/state/common.rs @@ -1,3 +1,5 @@ +use derive_getters::Getters; +use derive_new::new; #[derive(Clone, PartialEq, Getters, new)] pub struct Timecode { pub hours: u64, diff --git a/atem-connection-rs/src/state/fairlight.rs b/atem-connection-rs/src/state/fairlight.rs index ba97e30..981adf7 100644 --- a/atem-connection-rs/src/state/fairlight.rs +++ b/atem-connection-rs/src/state/fairlight.rs @@ -1,4 +1,6 @@ use std::collections::HashMap; +use derive_getters::Getters; +use derive_new::new; use crate::enums::{ ExternalPortType, FairlightAnalogInputLevel, FairlightAudioMixOption, FairlightAudioSourceType, diff --git a/atem-connection-rs/src/state/info.rs b/atem-connection-rs/src/state/info.rs index c97f2be..79a37b6 100644 --- a/atem-connection-rs/src/state/info.rs +++ b/atem-connection-rs/src/state/info.rs @@ -1,4 +1,6 @@ use crate::enums::{Model, ProtocolVersion}; +use derive_getters::Getters; +use derive_new::new; #[derive(Clone, PartialEq, Getters, new)] pub struct AtemCapabilites { diff --git a/atem-connection-rs/src/state/input.rs b/atem-connection-rs/src/state/input.rs index 0c3c654..9b312b2 100644 --- a/atem-connection-rs/src/state/input.rs +++ b/atem-connection-rs/src/state/input.rs @@ -1,4 +1,6 @@ use crate::enums::{ExternalPortType, InternalPortType, MeAvailability, SourceAvailability}; +use derive_getters::Getters; +use derive_new::new; #[derive(Clone, PartialEq, Getters, new)] pub struct InputChannel { diff --git a/atem-connection-rs/src/state/media.rs b/atem-connection-rs/src/state/media.rs index f32824e..7d10789 100644 --- a/atem-connection-rs/src/state/media.rs +++ b/atem-connection-rs/src/state/media.rs @@ -1,4 +1,6 @@ use crate::enums; +use derive_getters::Getters; +use derive_new::new; #[derive(Clone, PartialEq, Getters, new)] pub struct MediaPlayer { diff --git a/atem-connection-rs/src/state/recording.rs b/atem-connection-rs/src/state/recording.rs index 4aa8220..58b62e4 100644 --- a/atem-connection-rs/src/state/recording.rs +++ b/atem-connection-rs/src/state/recording.rs @@ -1,3 +1,5 @@ +use derive_getters::Getters; +use derive_new::new; use std::collections::HashMap; use crate::enums::{RecordingDiskStatus, RecordingError, RecordingStatus}; diff --git a/atem-connection-rs/src/state/settings.rs b/atem-connection-rs/src/state/settings.rs index 2f601cc..7f965ba 100644 --- a/atem-connection-rs/src/state/settings.rs +++ b/atem-connection-rs/src/state/settings.rs @@ -1,4 +1,6 @@ use crate::enums::{MultiViewerLayout, VideoMode}; +use derive_getters::Getters; +use derive_new::new; pub trait MultiViewerSourceState { fn get_source(&self) -> u64; diff --git a/atem-connection-rs/src/state/streaming.rs b/atem-connection-rs/src/state/streaming.rs index c94644b..aa04def 100644 --- a/atem-connection-rs/src/state/streaming.rs +++ b/atem-connection-rs/src/state/streaming.rs @@ -1,4 +1,6 @@ use crate::enums::{StreamingError, StreamingStatus}; +use derive_getters::Getters; +use derive_new::new; use super::common::Timecode; diff --git a/atem-connection-rs/src/state/util.rs b/atem-connection-rs/src/state/util.rs index 776e7b5..de8dacf 100644 --- a/atem-connection-rs/src/state/util.rs +++ b/atem-connection-rs/src/state/util.rs @@ -1,9 +1,9 @@ use crate::enums::{TransitionSelection, TransitionStyle}; use super::{ + AtemState, settings::MultiViewer, video::{MixEffect, TransitionPosition, TransitionProperties, TransitionSettings}, - AtemState, }; pub fn create() -> AtemState { diff --git a/atem-connection-rs/src/state/video/downstream_keyers.rs b/atem-connection-rs/src/state/video/downstream_keyers.rs index 06d244d..a18cb90 100644 --- a/atem-connection-rs/src/state/video/downstream_keyers.rs +++ b/atem-connection-rs/src/state/video/downstream_keyers.rs @@ -1,3 +1,5 @@ +use derive_getters::Getters; +use derive_new::new; pub trait DownstreamKeyerBase { fn get_in_transition(&self) -> bool; fn get_remaining_frames(&self) -> f64; diff --git a/atem-connection-rs/src/state/video/mod.rs b/atem-connection-rs/src/state/video/mod.rs index fba70b5..4b3c396 100644 --- a/atem-connection-rs/src/state/video/mod.rs +++ b/atem-connection-rs/src/state/video/mod.rs @@ -1,4 +1,6 @@ use crate::enums; +use derive_getters::Getters; +use derive_new::new; mod downstream_keyers; mod super_source; diff --git a/atem-connection-rs/src/state/video/super_source.rs b/atem-connection-rs/src/state/video/super_source.rs index 92d85cd..b7c4ae4 100644 --- a/atem-connection-rs/src/state/video/super_source.rs +++ b/atem-connection-rs/src/state/video/super_source.rs @@ -1,4 +1,6 @@ use crate::enums; +use derive_getters::Getters; +use derive_new::new; #[derive(Clone, PartialEq, Getters, new)] pub struct SuperSourceBox { diff --git a/atem-test/Cargo.toml b/atem-test/Cargo.toml index 6496268..028b623 100644 --- a/atem-test/Cargo.toml +++ b/atem-test/Cargo.toml @@ -1,9 +1,7 @@ [package] name = "atem-test" version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +edition = "2024" [dependencies] atem-connection-rs = { path = "../atem-connection-rs" } diff --git a/atem-test/src/main.rs b/atem-test/src/main.rs index 3fe8795..72e0911 100644 --- a/atem-test/src/main.rs +++ b/atem-test/src/main.rs @@ -69,12 +69,16 @@ async fn main() { fn setup_logging() -> Result<(), Report> { if std::env::var("RUST_LIB_BACKTRACE").is_err() { - std::env::set_var("RUST_LIB_BACKTRACE", "1"); + unsafe { + std::env::set_var("RUST_LIB_BACKTRACE", "1"); + } } color_eyre::install()?; if std::env::var("RUST_LOG").is_err() { - std::env::set_var("RUST_LOG", "debug"); + unsafe { + std::env::set_var("RUST_LOG", "debug"); + } } env_logger::init(); -- 2.44.1 From cd614c0956fac71170e7ce4b3046da2f98215d87 Mon Sep 17 00:00:00 2001 From: Sam Willcocks Date: Thu, 19 Jun 2025 10:02:07 +0100 Subject: [PATCH 07/10] feature gate std stuff --- atem-connection-rs/Cargo.toml | 4 ++-- atem-connection-rs/src/atem.rs | 2 ++ atem-connection-rs/src/atem_lib/atem_socket.rs | 5 ++++- atem-connection-rs/src/atem_lib/mod.rs | 1 + atem-connection-rs/src/commands/command_base.rs | 5 ++++- .../src/commands/device_profile/product_identifier.rs | 7 ++++++- .../src/commands/device_profile/topology.rs | 2 +- .../src/commands/mix_effects/program_input.rs | 2 +- atem-connection-rs/src/commands/parse_commands.rs | 2 +- atem-connection-rs/src/lib.rs | 8 ++++++++ atem-connection-rs/src/state/atem_macro.rs | 2 ++ atem-connection-rs/src/state/fairlight.rs | 2 +- atem-connection-rs/src/state/info.rs | 2 ++ atem-connection-rs/src/state/input.rs | 2 ++ atem-connection-rs/src/state/media.rs | 2 ++ atem-connection-rs/src/state/recording.rs | 1 + atem-connection-rs/src/state/settings.rs | 1 + atem-connection-rs/src/state/streaming.rs | 1 + atem-connection-rs/src/state/util.rs | 1 + atem-connection-rs/src/state/video/mod.rs | 1 + atem-connection-rs/src/tally.rs | 1 + 21 files changed, 45 insertions(+), 9 deletions(-) diff --git a/atem-connection-rs/Cargo.toml b/atem-connection-rs/Cargo.toml index 732362e..1e08a94 100644 --- a/atem-connection-rs/Cargo.toml +++ b/atem-connection-rs/Cargo.toml @@ -6,7 +6,7 @@ edition = "2024" [dependencies] derive-getters = "0.2.0" derive-new = "0.6.0" -itertools = {version = "0.14.0"} +itertools = {version = "0.14.0", default-features = false} log = "0.4.14" tokio = { version = "1.13.0", features = ["full"], optional = true } tokio-util = { version = "0.7.10", optional = true } @@ -15,4 +15,4 @@ enumflags2 = { version = "0.7.12", default-features = false } [features] default = ["std"] -std = ["dep:tokio", "dep:tokio-util"] +std = ["dep:tokio", "dep:tokio-util", "itertools/use_std"] diff --git a/atem-connection-rs/src/atem.rs b/atem-connection-rs/src/atem.rs index bbfd9a0..f588212 100644 --- a/atem-connection-rs/src/atem.rs +++ b/atem-connection-rs/src/atem.rs @@ -1,9 +1,11 @@ use std::{ + boxed::Box, collections::{HashMap, VecDeque}, net::SocketAddr, ops::DerefMut, sync::Arc, time::Duration, + vec::Vec, }; use tokio::{select, sync::Semaphore}; diff --git a/atem-connection-rs/src/atem_lib/atem_socket.rs b/atem-connection-rs/src/atem_lib/atem_socket.rs index 89cffe3..f6dcd10 100644 --- a/atem-connection-rs/src/atem_lib/atem_socket.rs +++ b/atem-connection-rs/src/atem_lib/atem_socket.rs @@ -1,10 +1,13 @@ use std::{ - collections::VecDeque, + borrow::ToOwned, fmt::Display, io, net::SocketAddr, + string::{String, ToString}, sync::Arc, time::{Duration, Instant}, + vec, + vec::Vec, }; use itertools::Itertools; diff --git a/atem-connection-rs/src/atem_lib/mod.rs b/atem-connection-rs/src/atem_lib/mod.rs index 9e604ce..d0a6cd7 100644 --- a/atem-connection-rs/src/atem_lib/mod.rs +++ b/atem-connection-rs/src/atem_lib/mod.rs @@ -1,3 +1,4 @@ pub mod atem_field; pub mod atem_packet; +#[cfg(feature = "std")] pub mod atem_socket; diff --git a/atem-connection-rs/src/commands/command_base.rs b/atem-connection-rs/src/commands/command_base.rs index f754fc8..e55caef 100644 --- a/atem-connection-rs/src/commands/command_base.rs +++ b/atem-connection-rs/src/commands/command_base.rs @@ -1,4 +1,7 @@ -use std::{collections::HashMap, fmt::Debug, process::Command, sync::Arc}; +use std::{ + boxed::Box, collections::HashMap, fmt::Debug, process::Command, string::String, sync::Arc, + vec::Vec, +}; use crate::{enums::ProtocolVersion, state::AtemState}; diff --git a/atem-connection-rs/src/commands/device_profile/product_identifier.rs b/atem-connection-rs/src/commands/device_profile/product_identifier.rs index 6d79764..a51052e 100644 --- a/atem-connection-rs/src/commands/device_profile/product_identifier.rs +++ b/atem-connection-rs/src/commands/device_profile/product_identifier.rs @@ -1,4 +1,9 @@ -use std::{ffi::CString, sync::Arc}; +use std::{ + ffi::CString, + string::{String, ToString}, + sync::Arc, + vec, +}; use crate::{ commands::command_base::{CommandDeserializer, DeserializedCommand}, diff --git a/atem-connection-rs/src/commands/device_profile/topology.rs b/atem-connection-rs/src/commands/device_profile/topology.rs index 0f69cd1..4010610 100644 --- a/atem-connection-rs/src/commands/device_profile/topology.rs +++ b/atem-connection-rs/src/commands/device_profile/topology.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, vec}; use crate::{ commands::command_base::{CommandDeserializer, DeserializedCommand}, diff --git a/atem-connection-rs/src/commands/mix_effects/program_input.rs b/atem-connection-rs/src/commands/mix_effects/program_input.rs index 85da9ac..8a38883 100644 --- a/atem-connection-rs/src/commands/mix_effects/program_input.rs +++ b/atem-connection-rs/src/commands/mix_effects/program_input.rs @@ -1,5 +1,5 @@ -use std::sync::Arc; use derive_new::new; +use std::{sync::Arc, vec, vec::Vec}; use crate::{ commands::command_base::{ diff --git a/atem-connection-rs/src/commands/parse_commands.rs b/atem-connection-rs/src/commands/parse_commands.rs index 2f83e10..3c4eb21 100644 --- a/atem-connection-rs/src/commands/parse_commands.rs +++ b/atem-connection-rs/src/commands/parse_commands.rs @@ -1,4 +1,4 @@ -use std::{collections::VecDeque, sync::Arc}; +use std::{boxed::Box, collections::VecDeque, sync::Arc}; use crate::{ atem_lib::atem_packet::{AtemPacket, RawFields}, diff --git a/atem-connection-rs/src/lib.rs b/atem-connection-rs/src/lib.rs index 3b421b2..8304049 100644 --- a/atem-connection-rs/src/lib.rs +++ b/atem-connection-rs/src/lib.rs @@ -1,8 +1,16 @@ +#![no_std] +#[cfg(feature = "std")] +extern crate std; +#[cfg(feature = "std")] pub mod atem; pub mod atem_lib; +#[cfg(feature = "std")] pub mod commands; +#[cfg(feature = "std")] pub mod enums; +#[cfg(feature = "std")] pub mod state; +#[cfg(feature = "std")] pub mod tally; diff --git a/atem-connection-rs/src/state/atem_macro.rs b/atem-connection-rs/src/state/atem_macro.rs index 0b5dd01..0a3a5d3 100644 --- a/atem-connection-rs/src/state/atem_macro.rs +++ b/atem-connection-rs/src/state/atem_macro.rs @@ -1,3 +1,5 @@ +use std::{string::String, vec::Vec}; + use derive_getters::Getters; use derive_new::new; #[derive(Clone, PartialEq, Getters, new, Default)] diff --git a/atem-connection-rs/src/state/fairlight.rs b/atem-connection-rs/src/state/fairlight.rs index 981adf7..bb7a861 100644 --- a/atem-connection-rs/src/state/fairlight.rs +++ b/atem-connection-rs/src/state/fairlight.rs @@ -1,6 +1,6 @@ -use std::collections::HashMap; use derive_getters::Getters; use derive_new::new; +use std::{collections::HashMap, string::String, vec::Vec}; use crate::enums::{ ExternalPortType, FairlightAnalogInputLevel, FairlightAudioMixOption, FairlightAudioSourceType, diff --git a/atem-connection-rs/src/state/info.rs b/atem-connection-rs/src/state/info.rs index 79a37b6..5f2699b 100644 --- a/atem-connection-rs/src/state/info.rs +++ b/atem-connection-rs/src/state/info.rs @@ -1,3 +1,5 @@ +use std::{string::String, vec::Vec}; + use crate::enums::{Model, ProtocolVersion}; use derive_getters::Getters; use derive_new::new; diff --git a/atem-connection-rs/src/state/input.rs b/atem-connection-rs/src/state/input.rs index 9b312b2..ab4a013 100644 --- a/atem-connection-rs/src/state/input.rs +++ b/atem-connection-rs/src/state/input.rs @@ -1,6 +1,8 @@ use crate::enums::{ExternalPortType, InternalPortType, MeAvailability, SourceAvailability}; use derive_getters::Getters; use derive_new::new; +use std::string::String; +use std::vec::Vec; #[derive(Clone, PartialEq, Getters, new)] pub struct InputChannel { diff --git a/atem-connection-rs/src/state/media.rs b/atem-connection-rs/src/state/media.rs index 7d10789..6ddce79 100644 --- a/atem-connection-rs/src/state/media.rs +++ b/atem-connection-rs/src/state/media.rs @@ -1,6 +1,8 @@ use crate::enums; use derive_getters::Getters; use derive_new::new; +use std::string::String; +use std::vec::Vec; #[derive(Clone, PartialEq, Getters, new)] pub struct MediaPlayer { diff --git a/atem-connection-rs/src/state/recording.rs b/atem-connection-rs/src/state/recording.rs index 58b62e4..1320f7f 100644 --- a/atem-connection-rs/src/state/recording.rs +++ b/atem-connection-rs/src/state/recording.rs @@ -1,6 +1,7 @@ use derive_getters::Getters; use derive_new::new; use std::collections::HashMap; +use std::string::String; use crate::enums::{RecordingDiskStatus, RecordingError, RecordingStatus}; diff --git a/atem-connection-rs/src/state/settings.rs b/atem-connection-rs/src/state/settings.rs index 7f965ba..01548fb 100644 --- a/atem-connection-rs/src/state/settings.rs +++ b/atem-connection-rs/src/state/settings.rs @@ -1,6 +1,7 @@ use crate::enums::{MultiViewerLayout, VideoMode}; use derive_getters::Getters; use derive_new::new; +use std::vec::Vec; pub trait MultiViewerSourceState { fn get_source(&self) -> u64; diff --git a/atem-connection-rs/src/state/streaming.rs b/atem-connection-rs/src/state/streaming.rs index aa04def..0ddb578 100644 --- a/atem-connection-rs/src/state/streaming.rs +++ b/atem-connection-rs/src/state/streaming.rs @@ -1,6 +1,7 @@ use crate::enums::{StreamingError, StreamingStatus}; use derive_getters::Getters; use derive_new::new; +use std::string::String; use super::common::Timecode; diff --git a/atem-connection-rs/src/state/util.rs b/atem-connection-rs/src/state/util.rs index de8dacf..c8f4654 100644 --- a/atem-connection-rs/src/state/util.rs +++ b/atem-connection-rs/src/state/util.rs @@ -1,4 +1,5 @@ use crate::enums::{TransitionSelection, TransitionStyle}; +use std::vec; use super::{ AtemState, diff --git a/atem-connection-rs/src/state/video/mod.rs b/atem-connection-rs/src/state/video/mod.rs index 4b3c396..c1a4ae5 100644 --- a/atem-connection-rs/src/state/video/mod.rs +++ b/atem-connection-rs/src/state/video/mod.rs @@ -1,6 +1,7 @@ use crate::enums; use derive_getters::Getters; use derive_new::new; +use std::vec::Vec; mod downstream_keyers; mod super_source; diff --git a/atem-connection-rs/src/tally.rs b/atem-connection-rs/src/tally.rs index 4b25bb0..c89512f 100644 --- a/atem-connection-rs/src/tally.rs +++ b/atem-connection-rs/src/tally.rs @@ -1,3 +1,4 @@ +use std::string::String; #[derive(Debug)] pub struct TallyEvent { tally_state: TallyState, -- 2.44.1 From c9b3879b6cb74d7d828f955939cb8cadca4c613a Mon Sep 17 00:00:00 2001 From: Sam Willcocks Date: Fri, 20 Jun 2025 13:49:55 +0100 Subject: [PATCH 08/10] add AtemPacket setters, init function --- .../src/atem_lib/atem_packet.rs | 156 ++++++++++++++++-- .../src/atem_lib/atem_socket.rs | 40 ++--- 2 files changed, 164 insertions(+), 32 deletions(-) diff --git a/atem-connection-rs/src/atem_lib/atem_packet.rs b/atem-connection-rs/src/atem_lib/atem_packet.rs index d597c0b..c81be4e 100755 --- a/atem-connection-rs/src/atem_lib/atem_packet.rs +++ b/atem-connection-rs/src/atem_lib/atem_packet.rs @@ -1,3 +1,4 @@ +//! This module contains [`AtemPacket`] which is a zero*ish* copy abstraction over a [`AsRef`]`<[u8]>`. use core::{fmt::Display, str}; use enumflags2::{BitFlags, bitflags}; @@ -9,20 +10,40 @@ pub const COMMAND_CONNECT_HELLO: [u8; 20] = [ 0x00, 0x00, 0x00, 0x00, ]; +/// Maximum atem packet length. This is determined by the maximum value that can be stored +/// in the length field. +pub const MAX_LEN: usize = field::LEN_MASK as usize; + +mod field { + use core::ops::{Range, RangeInclusive}; + + pub(crate) const FLAGS_LEN_H: usize = 0; + pub(crate) const FLAGS_MASK: u8 = 0b1111_1000; + pub(crate) const LEN_L: usize = 1; + pub(crate) const LEN: RangeInclusive = FLAGS_LEN_H..=LEN_L; + pub(crate) const LEN_MASK: u16 = 0x7ff; + + pub(crate) const SESSION_ID: Range = 2..4; + pub(crate) const ACK_NUMBER: Range = 4..6; + pub(crate) const REMOTE_SEQ_NUM: Range = 8..10; + pub(crate) const LOCAL_SEQ_NUM: Range = 10..12; +} + +/// An ATEM protocol packet #[derive(Debug)] pub struct AtemPacket> { buf: T, } impl<'a> TryFrom<&'a [u8]> for AtemPacket<&'a [u8]> { - type Error = AtemPacketErr; + type Error = AtemPacketParseError; fn try_from(buf: &'a [u8]) -> Result { AtemPacket::new_checked(buf) } } #[derive(Debug)] -pub enum AtemPacketErr { +pub enum AtemPacketParseError { /// The packet was too short TooShort { got: usize, @@ -35,28 +56,116 @@ pub enum AtemPacketErr { InvalidFlags, } +/// ATEM packet flags #[bitflags] #[repr(u8)] #[derive(PartialEq, Copy, Clone, Debug)] pub enum PacketFlag { + /// AKA "reliable". This packet should be ACKed by the other end AckRequest = 0x1, + /// AKA "syn" - used in the connection handshake NewSessionId = 0x2, + /// This packet is a retransmit of a previous one IsRetransmit = 0x4, + /// Request retransmission of a previous sequence ID RetransmitRequest = 0x8, + /// This packet is an ACK AckReply = 0x10, } +/// An error while constructing a packet +#[derive(Debug)] +pub enum AtemPacketInitError { + /// The provided buf was too small + BufTooSmall { need: usize, got: usize }, + /// The provided payload was too long + DataTooLong { got: usize, max: usize }, +} + +impl<'a> AtemPacket<&'a mut [u8]> { + /// Initialise an [`AtemPacket`] into the given `buf`, with optional payload copied from `data`. + /// Returns the constructed packet, and the remaining spare space in `buf`, if there was any. + pub fn init<'b>( + buf: &'a mut [u8], + flags: BitFlags, + session_id: u16, + local_seq_num: u16, + data: Option<&'b [u8]>, + ) -> Result<(Self, &'a mut [u8]), AtemPacketInitError> { + let len = 12 + data.map_or(0, |d| d.len()); + if len > buf.len() { + return Err(AtemPacketInitError::BufTooSmall { + need: len, + got: buf.len(), + }); + } + if len > MAX_LEN { + return Err(AtemPacketInitError::DataTooLong { + got: len - 12, + max: MAX_LEN - 12, + }); + } + let (pkt, rem) = buf.split_at_mut(len); + let mut p = AtemPacket { buf: pkt }; + p.set_flags(flags) + .set_session_id(session_id) + .set_local_seq_num(local_seq_num) + .set_len(len.try_into().unwrap()); + if let Some(d) = data { + p.set_data(d); + } + Ok((p, rem)) + } +} + +impl + AsMut<[u8]>> AtemPacket { + pub fn set_flags(&mut self, flags: BitFlags) -> &mut Self { + let prev = self.buf.as_ref()[field::FLAGS_LEN_H]; + self.buf.as_mut()[field::FLAGS_LEN_H] = (prev & !field::FLAGS_MASK) | (flags.bits() << 3); + self + } + + pub fn set_len(&mut self, value: u16) -> &mut Self { + let v = value.to_be_bytes(); + self.buf.as_mut()[field::FLAGS_LEN_H] &= v[0] | field::FLAGS_MASK; + self.buf.as_mut()[field::LEN_L] = v[1]; + self + } + + pub fn set_session_id(&mut self, value: u16) -> &mut Self { + self.buf.as_mut()[field::SESSION_ID].copy_from_slice(&value.to_be_bytes()); + self + } + pub fn set_local_seq_num(&mut self, value: u16) -> &mut Self { + self.buf.as_mut()[field::LOCAL_SEQ_NUM].copy_from_slice(&value.to_be_bytes()); + self + } + pub fn set_remote_seq_num(&mut self, value: u16) -> &mut Self { + self.buf.as_mut()[field::REMOTE_SEQ_NUM].copy_from_slice(&value.to_be_bytes()); + self + } + pub fn set_data<'a>(&mut self, value: &'a [u8]) -> &mut Self { + self.buf.as_mut()[12..].copy_from_slice(value); + self + } + + pub fn set_ack_num(&mut self, value: u16) -> &mut Self { + self.buf.as_mut()[field::ACK_NUMBER].copy_from_slice(&value.to_be_bytes()); + self + } +} + impl> AtemPacket { - pub fn new_checked(buf: T) -> Result { + pub fn new_checked(buf: T) -> Result { let len = buf.as_ref().len(); if len < 12 { - return Err(AtemPacketErr::TooShort { + return Err(AtemPacketParseError::TooShort { got: buf.as_ref().len(), }); } let p = Self { buf }; if p.length() as usize != len { - return Err(AtemPacketErr::LengthDiffers { + return Err(AtemPacketParseError::LengthDiffers { expected: p.length(), got: len, }); @@ -65,15 +174,20 @@ impl> AtemPacket { let _: BitFlags = p .flags_raw() .try_into() - .map_err(|_| AtemPacketErr::InvalidFlags)?; + .map_err(|_| AtemPacketParseError::InvalidFlags)?; Ok(p) } + + /// Consumes self, returning the inner packet buffer + pub fn inner(self) -> T { + self.buf + } pub fn length(&self) -> u16 { - u16::from_be_bytes(self.buf.as_ref()[0..=1].try_into().unwrap()) & 0x07ff + u16::from_be_bytes(self.buf.as_ref()[field::LEN].try_into().unwrap()) & field::LEN_MASK } fn flags_raw(&self) -> u8 { - self.buf.as_ref()[0] >> 3 + self.buf.as_ref()[field::FLAGS_LEN_H] >> 3 } pub fn flags(&self) -> BitFlags { @@ -82,19 +196,19 @@ impl> AtemPacket { } pub fn session_id(&self) -> u16 { - u16::from_be_bytes(self.buf.as_ref()[2..=3].try_into().unwrap()) + u16::from_be_bytes(self.buf.as_ref()[field::SESSION_ID].try_into().unwrap()) } pub fn ack_number(&self) -> u16 { - u16::from_be_bytes(self.buf.as_ref()[4..=5].try_into().unwrap()) + u16::from_be_bytes(self.buf.as_ref()[field::ACK_NUMBER].try_into().unwrap()) } pub fn remote_sequence_number(&self) -> u16 { - u16::from_be_bytes(self.buf.as_ref()[9..=10].try_into().unwrap()) + u16::from_be_bytes(self.buf.as_ref()[field::REMOTE_SEQ_NUM].try_into().unwrap()) } pub fn local_sequence_number(&self) -> u16 { - u16::from_be_bytes(self.buf.as_ref()[10..=11].try_into().unwrap()) + u16::from_be_bytes(self.buf.as_ref()[field::LOCAL_SEQ_NUM].try_into().unwrap()) } pub fn retransmit_request(&self) -> Option { @@ -183,3 +297,21 @@ impl<'a> Iterator for RawFields<'a> { Some(RawField { r#type, data }) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_init() { + let mut buf = [0u8; 12]; + let (mut p, _) = + AtemPacket::init(&mut buf, PacketFlag::AckReply.into(), 1234, 5678, None).unwrap(); + p.set_remote_seq_num(9012); + assert!(p.flags().contains(PacketFlag::AckReply)); + assert_eq!(p.session_id(), 1234); + assert_eq!(p.local_sequence_number(), 5678); + assert_eq!(p.remote_sequence_number(), 9012); + assert_eq!(p.length(), 12); + } +} diff --git a/atem-connection-rs/src/atem_lib/atem_socket.rs b/atem-connection-rs/src/atem_lib/atem_socket.rs index f6dcd10..2739436 100644 --- a/atem-connection-rs/src/atem_lib/atem_socket.rs +++ b/atem-connection-rs/src/atem_lib/atem_socket.rs @@ -303,22 +303,18 @@ impl AtemSocket { self.next_send_packet_id = 0; } - let opcode = u16::from(PacketFlag::AckRequest as u8) << 11; - let mut buffer = vec![0; 20 + payload.len()]; - // Headers - buffer[0..2].copy_from_slice(&u16::to_be_bytes(opcode | (payload.len() as u16 + 20))); - buffer[2..4].copy_from_slice(&u16::to_be_bytes(self.session_id)); - buffer[10..12].copy_from_slice(&u16::to_be_bytes(packet_id)); + let (p, _) = AtemPacket::init( + &mut buffer, + PacketFlag::AckRequest.into(), + self.session_id, + packet_id, + Some([raw_name.as_bytes(), payload].concat().as_slice()), + ) + .unwrap(); - // Command - buffer[12..14].copy_from_slice(&u16::to_be_bytes(payload.len() as u16 + 8)); - buffer[16..20].copy_from_slice(raw_name.as_bytes()); - - // Body - buffer[20..20 + payload.len()].copy_from_slice(payload); - self.send_packet(&buffer).await; + self.send_packet(&p.inner()).await; self.in_flight.push(InFlightPacket { packet_id, @@ -401,7 +397,7 @@ impl AtemSocket { self.last_received_at = Instant::now(); self.session_id = atem_packet.session_id(); - // TODO: naming seems rather off here + // TODO: bit of a naming clash here let remote_packet_id = atem_packet.local_sequence_number(); if atem_packet.flags().contains(PacketFlag::NewSessionId) { @@ -483,13 +479,17 @@ impl AtemSocket { async fn send_ack(&mut self, packet_id: u16) { log::debug!("Sending ack for packet {:x?}", packet_id); - let flag: u8 = PacketFlag::AckReply as u8; - let opcode = u16::from(flag) << 11; let mut buffer: [u8; ACK_PACKET_LENGTH as _] = [0; 12]; - buffer[0..2].copy_from_slice(&u16::to_be_bytes(opcode | ACK_PACKET_LENGTH)); - buffer[2..4].copy_from_slice(&u16::to_be_bytes(self.session_id)); - buffer[4..6].copy_from_slice(&u16::to_be_bytes(packet_id)); - self.send_packet(&buffer).await; + let (mut p, _) = AtemPacket::init( + &mut buffer, + PacketFlag::AckReply.into(), + self.session_id, + 0, + None, + ) + .unwrap(); + p.set_ack_num(packet_id); + self.send_packet(p.inner()).await; } async fn retransmit_from(&mut self, from_id: u16) { -- 2.44.1 From 5fb847aec614fd86590ad717dc3e7c00406494ac Mon Sep 17 00:00:00 2001 From: Sam Willcocks Date: Fri, 20 Jun 2025 13:51:07 +0100 Subject: [PATCH 09/10] cargo fix --- atem-connection-rs/src/atem_lib/atem_socket.rs | 7 ++----- atem-connection-rs/src/commands/command_base.rs | 2 +- atem-connection-rs/src/commands/parse_commands.rs | 2 +- atem-test/src/main.rs | 2 +- 4 files changed, 5 insertions(+), 8 deletions(-) diff --git a/atem-connection-rs/src/atem_lib/atem_socket.rs b/atem-connection-rs/src/atem_lib/atem_socket.rs index 2739436..59754a0 100644 --- a/atem-connection-rs/src/atem_lib/atem_socket.rs +++ b/atem-connection-rs/src/atem_lib/atem_socket.rs @@ -18,11 +18,8 @@ use tokio::{ }; use crate::{ - atem_lib::atem_packet::{self, AtemPacket, COMMAND_CONNECT_HELLO}, - commands::{ - command_base::{BasicWritableCommand, DeserializedCommand}, - parse_commands::deserialize_commands, - }, + atem_lib::atem_packet::{AtemPacket, COMMAND_CONNECT_HELLO}, + commands::command_base::{BasicWritableCommand, DeserializedCommand}, enums::ProtocolVersion, }; diff --git a/atem-connection-rs/src/commands/command_base.rs b/atem-connection-rs/src/commands/command_base.rs index e55caef..f9321b9 100644 --- a/atem-connection-rs/src/commands/command_base.rs +++ b/atem-connection-rs/src/commands/command_base.rs @@ -1,5 +1,5 @@ use std::{ - boxed::Box, collections::HashMap, fmt::Debug, process::Command, string::String, sync::Arc, + boxed::Box, collections::HashMap, fmt::Debug, string::String, sync::Arc, vec::Vec, }; diff --git a/atem-connection-rs/src/commands/parse_commands.rs b/atem-connection-rs/src/commands/parse_commands.rs index 3c4eb21..c8458f2 100644 --- a/atem-connection-rs/src/commands/parse_commands.rs +++ b/atem-connection-rs/src/commands/parse_commands.rs @@ -1,7 +1,7 @@ use std::{boxed::Box, collections::VecDeque, sync::Arc}; use crate::{ - atem_lib::atem_packet::{AtemPacket, RawFields}, + atem_lib::atem_packet::RawFields, commands::device_profile::version::{DESERIALIZE_VERSION_RAW_NAME, deserialize_version}, enums::ProtocolVersion, }; diff --git a/atem-test/src/main.rs b/atem-test/src/main.rs index 72e0911..2964e10 100644 --- a/atem-test/src/main.rs +++ b/atem-test/src/main.rs @@ -36,7 +36,7 @@ async fn main() { let (atem_event_tx, atem_event_rx) = tokio::sync::mpsc::unbounded_channel(); let cancel = CancellationToken::new(); - let mut atem_socket = AtemSocket::new(socket_message_rx, atem_event_tx); + let atem_socket = AtemSocket::new(socket_message_rx, atem_event_tx); let atem = Arc::new(Atem::new(atem_socket, socket_message_tx)); let atem_thread = atem.clone(); -- 2.44.1 From 8be762b0ef18c12494a332fc815da414a026c507 Mon Sep 17 00:00:00 2001 From: Sam Willcocks Date: Sat, 21 Jun 2025 15:58:19 +0100 Subject: [PATCH 10/10] Add lowerlevel socket implementation --- Cargo.lock | 335 +++++++++++++++++- atem-connection-rs/Cargo.toml | 6 +- .../src/atem_lib/atem_packet.rs | 15 +- .../src/atem_lib/atem_socket_two.rs | 285 +++++++++++++++ atem-connection-rs/src/atem_lib/mod.rs | 1 + .../src/commands/command_base.rs | 5 +- atem-test/Cargo.toml | 1 + atem-test/src/{ => bin}/main.rs | 0 atem-test/src/bin/socket2.rs | 48 +++ 9 files changed, 671 insertions(+), 25 deletions(-) create mode 100644 atem-connection-rs/src/atem_lib/atem_socket_two.rs rename atem-test/src/{ => bin}/main.rs (100%) create mode 100644 atem-test/src/bin/socket2.rs diff --git a/Cargo.lock b/Cargo.lock index 2273f83..9a8714f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,8 +78,10 @@ dependencies = [ name = "atem-connection-rs" version = "0.1.0" dependencies = [ + "defmt", "derive-getters", "derive-new", + "embassy-net", "enumflags2", "itertools", "log", @@ -138,6 +140,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.5.0" @@ -187,7 +195,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.104", ] [[package]] @@ -229,6 +237,44 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "critical-section" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" + +[[package]] +name = "defmt" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "548d977b6da32fa1d1fda2876453da1e7df63ad0304c8b3dae4dbe7b96f39b78" +dependencies = [ + "bitflags", + "defmt-macros", +] + +[[package]] +name = "defmt-macros" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d4fc12a85bcf441cfe44344c4b72d58493178ce635338a3f3b78943aceb258e" +dependencies = [ + "defmt-parser", + "proc-macro-error2", + "proc-macro2", + "quote", + "syn 2.0.104", +] + +[[package]] +name = "defmt-parser" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10d60334b3b2e7c9d91ef8150abfb6fa4c1c39ebbcf4a81c2e346aad939fee3e" +dependencies = [ + "thiserror", +] + [[package]] name = "derive-getters" version = "0.2.0" @@ -248,7 +294,16 @@ checksum = "d150dea618e920167e5973d70ae6ece4385b7164e0d799fe7c122dd0a5d912ad" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.104", +] + +[[package]] +name = "document-features" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95249b50c6c185bee49034bcb378a49dc2b5dff0be90ff6616d31d64febab05d" +dependencies = [ + "litrs", ] [[package]] @@ -257,6 +312,127 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "embassy-net" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "940c4b9fe5c1375b09a0c6722c0100d6b2ed46a717a34f632f26e8d7327c4383" +dependencies = [ + "document-features", + "embassy-net-driver", + "embassy-sync", + "embassy-time", + "embedded-io-async", + "embedded-nal-async", + "heapless", + "managed", + "smoltcp", +] + +[[package]] +name = "embassy-net-driver" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524eb3c489760508f71360112bca70f6e53173e6fe48fc5f0efd0f5ab217751d" + +[[package]] +name = "embassy-sync" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d2c8cdff05a7a51ba0087489ea44b0b1d97a296ca6b1d6d1a33ea7423d34049" +dependencies = [ + "cfg-if", + "critical-section", + "embedded-io-async", + "futures-sink", + "futures-util", + "heapless", +] + +[[package]] +name = "embassy-time" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f820157f198ada183ad62e0a66f554c610cdcd1a9f27d4b316358103ced7a1f8" +dependencies = [ + "cfg-if", + "critical-section", + "document-features", + "embassy-time-driver", + "embedded-hal 0.2.7", + "embedded-hal 1.0.0", + "embedded-hal-async", + "futures-util", +] + +[[package]] +name = "embassy-time-driver" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d45f5d833b6d98bd2aab0c2de70b18bfaa10faf661a1578fd8e5dfb15eb7eba" +dependencies = [ + "document-features", +] + +[[package]] +name = "embedded-hal" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35949884794ad573cf46071e41c9b60efb0cb311e3ca01f7af807af1debc66ff" +dependencies = [ + "nb 0.1.3", + "void", +] + +[[package]] +name = "embedded-hal" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "361a90feb7004eca4019fb28352a9465666b24f840f5c3cddf0ff13920590b89" + +[[package]] +name = "embedded-hal-async" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c4c685bbef7fe13c3c6dd4da26841ed3980ef33e841cddfa15ce8a8fb3f1884" +dependencies = [ + "embedded-hal 1.0.0", +] + +[[package]] +name = "embedded-io" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edd0f118536f44f5ccd48bcb8b111bdc3de888b58c74639dfb034a357d0f206d" + +[[package]] +name = "embedded-io-async" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ff09972d4073aa8c299395be75161d582e7629cd663171d62af73c8d50dba3f" +dependencies = [ + "embedded-io", +] + +[[package]] +name = "embedded-nal" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c56a28be191a992f28f178ec338a0bf02f63d7803244add736d026a471e6ed77" +dependencies = [ + "nb 1.1.0", +] + +[[package]] +name = "embedded-nal-async" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76959917cd2b86f40a98c28dd5624eddd1fa69d746241c8257eac428d83cb211" +dependencies = [ + "embedded-io-async", + "embedded-nal", +] + [[package]] name = "enumflags2" version = "0.7.12" @@ -274,7 +450,7 @@ checksum = "67c78a4d8fdf9953a5c9d458f9efe940fd97a0cab0941c075a813ac594733827" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.104", ] [[package]] @@ -302,9 +478,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-sink" @@ -312,12 +488,49 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-core", + "futures-task", + "pin-project-lite", + "pin-utils", +] + [[package]] name = "gimli" version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0a01e0497841a3b2db4f8afa483cce65f7e96a3498bd6c541734792aeac8fe7" +[[package]] +name = "hash32" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606" +dependencies = [ + "byteorder", +] + +[[package]] +name = "heapless" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfb9eb618601c89945a70e254898da93b13be0388091d42117462b265bb3fad" +dependencies = [ + "hash32", + "stable_deref_trait", +] + [[package]] name = "heck" version = "0.4.1" @@ -372,6 +585,12 @@ version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +[[package]] +name = "litrs" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ce301924b7887e9d637144fdade93f9dfff9b60981d4ac161db09720d39aa5" + [[package]] name = "lock_api" version = "0.4.6" @@ -390,6 +609,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "managed" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ca88d725a0a943b096803bd34e73a4437208b6077654cc4ecb2947a5f91618d" + [[package]] name = "memchr" version = "2.4.0" @@ -417,6 +642,21 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "nb" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "801d31da0513b6ec5214e9bf433a77966320625a37860f910be265be6e18d06f" +dependencies = [ + "nb 1.1.0", +] + +[[package]] +name = "nb" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d5439c4ad607c3c23abf66de8c8bf57ba8adcd1f129e699851a6e43935d339d" + [[package]] name = "num_cpus" version = "1.16.0" @@ -478,10 +718,38 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" [[package]] -name = "proc-macro2" -version = "1.0.78" +name = "pin-utils" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "proc-macro-error-attr2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5" +dependencies = [ + "proc-macro2", + "quote", +] + +[[package]] +name = "proc-macro-error2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802" +dependencies = [ + "proc-macro-error-attr2", + "proc-macro2", + "quote", + "syn 2.0.104", +] + +[[package]] +name = "proc-macro2" +version = "1.0.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" dependencies = [ "unicode-ident", ] @@ -557,6 +825,19 @@ version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" +[[package]] +name = "smoltcp" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dad095989c1533c1c266d9b1e8d70a1329dd3723c3edac6d03bbd67e7bf6f4bb" +dependencies = [ + "bitflags", + "byteorder", + "cfg-if", + "heapless", + "managed", +] + [[package]] name = "socket2" version = "0.5.6" @@ -567,6 +848,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "strsim" version = "0.10.0" @@ -586,9 +873,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.48" +version = "2.0.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" +checksum = "17b6f705963418cdb9927482fa304bc562ece2fdd4f616084c50b7023b435a40" dependencies = [ "proc-macro2", "quote", @@ -604,6 +891,26 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "thiserror" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "thread_local" version = "1.1.3" @@ -640,7 +947,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.104", ] [[package]] @@ -727,6 +1034,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "void" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/atem-connection-rs/Cargo.toml b/atem-connection-rs/Cargo.toml index 1e08a94..76eca05 100644 --- a/atem-connection-rs/Cargo.toml +++ b/atem-connection-rs/Cargo.toml @@ -11,8 +11,12 @@ log = "0.4.14" tokio = { version = "1.13.0", features = ["full"], optional = true } tokio-util = { version = "0.7.10", optional = true } enumflags2 = { version = "0.7.12", default-features = false } +embassy-net = { version = "0.7.0", optional = true, features = ["proto-ipv4", "medium-ip", "udp"]} +defmt = {version = "1.0.0", optional = true} [features] -default = ["std"] +default = ["std", "embassy-net", "defmt"] std = ["dep:tokio", "dep:tokio-util", "itertools/use_std"] +embassy-net = ["dep:embassy-net"] +defmt = ["dep:defmt"] diff --git a/atem-connection-rs/src/atem_lib/atem_packet.rs b/atem-connection-rs/src/atem_lib/atem_packet.rs index c81be4e..69d2f7d 100755 --- a/atem-connection-rs/src/atem_lib/atem_packet.rs +++ b/atem-connection-rs/src/atem_lib/atem_packet.rs @@ -43,16 +43,13 @@ impl<'a> TryFrom<&'a [u8]> for AtemPacket<&'a [u8]> { } #[derive(Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum AtemPacketParseError { - /// The packet was too short - TooShort { - got: usize, - }, + /// The packet was shorter than any valid packet should be. + TooShort { got: usize }, /// The packet's stated and actual lengths were different - LengthDiffers { - expected: u16, - got: usize, - }, + LengthDiffers { expected: u16, got: usize }, + /// The packet had invalid (unknown) flags set InvalidFlags, } @@ -258,8 +255,8 @@ impl> Display for AtemPacket { self.session_id(), self.flags(), self.ack_number(), - self.remote_sequence_number(), self.local_sequence_number(), + self.remote_sequence_number(), ) } } diff --git a/atem-connection-rs/src/atem_lib/atem_socket_two.rs b/atem-connection-rs/src/atem_lib/atem_socket_two.rs new file mode 100644 index 0000000..a2903ff --- /dev/null +++ b/atem-connection-rs/src/atem_lib/atem_socket_two.rs @@ -0,0 +1,285 @@ +use core::net::{Ipv4Addr, SocketAddrV4}; + +use log::debug; + +use super::atem_packet::{AtemPacket, AtemPacketParseError, PacketFlag}; + +pub const ATEM_PORT: u16 = 9910; +pub const SYN_PAYLOAD: [u8; 8] = [0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]; + +#[derive(Debug, PartialEq, Eq)] +pub enum AtemSocketState { + Closed, + StateDump, + Established { session_id: u16 }, +} + +/// An abstraction over an async UDP socket +pub trait SocketBackend { + type SendError; + type RecvError; + async fn send_to(&mut self, data: &[u8], addr: SocketAddrV4) -> Result<(), Self::SendError>; + async fn recv(&mut self, data: &mut [u8]) -> Result; +} + +#[cfg(feature = "std")] +pub mod tokio { + use core::net::Ipv4Addr; + + use tokio::net::UdpSocket; + + use super::{AtemSocket, SocketBackend}; + + #[derive(Debug)] + pub struct TokioBackend { + sock: UdpSocket, + } + + impl TokioBackend { + pub async fn new(addr: Ipv4Addr) -> Result, std::io::Error> { + let b = Self { + sock: UdpSocket::bind("0.0.0.0:0").await?, + }; + Ok(AtemSocket::new_with_backend(addr, b)) + } + } + + impl SocketBackend for TokioBackend { + type SendError = std::io::Error; + type RecvError = std::io::Error; + + async fn send_to( + &mut self, + data: &[u8], + addr: core::net::SocketAddrV4, + ) -> Result<(), Self::SendError> { + self.sock.send_to(data, addr).await?; + Ok(()) + } + + async fn recv(&mut self, data: &mut [u8]) -> Result { + let (cnt, _) = self.sock.recv_from(data).await?; + Ok(cnt) + } + } +} + +#[cfg(feature = "embassy-net")] +pub mod enet { + use core::net::{Ipv4Addr, SocketAddrV4}; + + use embassy_net::udp::{RecvError, SendError, UdpSocket}; + + use super::{AtemSocket, SocketBackend}; + + pub struct EnetBackend<'a> { + sock: UdpSocket<'a>, + } + + impl core::fmt::Debug for EnetBackend<'_> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "EnetBackend") + } + } + + impl<'a> EnetBackend<'a> { + pub fn new(addr: Ipv4Addr, sock: UdpSocket<'a>) -> AtemSocket { + let b = Self { sock }; + AtemSocket::new_with_backend(addr, b) + } + } + + impl SocketBackend for EnetBackend<'_> { + type SendError = SendError; + type RecvError = RecvError; + + async fn send_to( + &mut self, + data: &[u8], + addr: SocketAddrV4, + ) -> Result<(), Self::SendError> { + self.sock.send_to(data, addr).await + } + + async fn recv(&mut self, data: &mut [u8]) -> Result { + let (count, _) = self.sock.recv_from(data).await?; + Ok(count) + } + } +} + +#[derive(Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum AtemError { + /// Tried to do something but the socket is closed + SocketClosed, + /// There was an error from the underlying socket while receiving + Recv(I::RecvError), + /// There was an error from the underlying socket while sending + Send(I::SendError), + /// We received an invalid ATEM protocol packet + BadPacket(AtemPacketParseError), + /// The switcher denied our connection attempt. + SwitcherDeniedConnect, + /// There was some nonspecific protocol error + ProtocolError, +} + +impl From for AtemError +where + I: SocketBackend, +{ + fn from(value: AtemPacketParseError) -> Self { + Self::BadPacket(value) + } +} + +pub struct AtemSocket { + inner: I, + addr: SocketAddrV4, + state: AtemSocketState, + local_seq_num: u16, +} + +impl AtemSocket +where + I: SocketBackend, +{ + fn new_with_backend(addr: Ipv4Addr, backend: I) -> Self { + Self { + inner: backend, + addr: SocketAddrV4::new(addr, ATEM_PORT), + state: AtemSocketState::Closed, + local_seq_num: 0, + } + } + + /// Connect to the ATEM, performing the connection handshake. + pub async fn connect(&mut self) -> Result<(), AtemError> { + // Reset state + self.state = AtemSocketState::Closed; + self.local_seq_num = 0; + let mut buf = [0u8; 20]; + // FIXME: this should be random + let initial_sid = 0x53ab; + + // Step 1: Send the "SYN" with a random session id and magic payload + + let (mut p, _) = AtemPacket::init( + &mut buf, + PacketFlag::NewSessionId.into(), + initial_sid, + self.local_seq_num, + Some(&SYN_PAYLOAD), + ) + .unwrap(); + p.set_remote_seq_num(0x3a); + + self.inner + .send_to(p.inner(), self.addr) + .await + .map_err(|e| AtemError::Send(e))?; + + debug!("Connect step 1: sent SYN"); + + // Step 2: Wait for the response from the ATEM. + // this should also have the SYN flag set and have the same session id that we sent. + // The first byte of the payload should be 0x02 - if it's 0x04 we should restart + // the connection + + self.inner + .recv(&mut buf) + .await + .map_err(|e| AtemError::Recv(e))?; + + let ack = AtemPacket::new_checked(buf)?; + debug!("RX: {}", ack); + if ack.flags() != PacketFlag::NewSessionId { + return Err(AtemError::ProtocolError); + } + if ack.session_id() != initial_sid { + return Err(AtemError::ProtocolError); + } + match ack.body()[0] { + 0x02 => { + // all good + debug!("Connect step 2: got reply"); + } + 0x04 => { + // Restart connection + return Err(AtemError::SwitcherDeniedConnect); + } + _ => { + return Err(AtemError::ProtocolError); + } + } + + // Step 3: Send an ACK for the packet we just received. + self.ack(initial_sid, ack.local_sequence_number()).await?; + debug!("Connect step 3: sent ACK"); + self.state = AtemSocketState::StateDump; + + Ok(()) + } + + async fn ack(&mut self, sid: u16, ack_num: u16) -> Result<(), AtemError> { + let mut buf = [0u8; 12]; + let (mut p, _) = AtemPacket::init( + &mut buf, + PacketFlag::AckReply.into(), + sid, + // N.B: local seq num is set to 0 in ACKs + 0, + None, + ) + .unwrap(); + p.set_ack_num(ack_num); + debug!("TX: {}", p); + self.inner + .send_to(p.inner(), self.addr) + .await + .map_err(|e| AtemError::Send(e))?; + Ok(()) + } + + /// Receive a packet from the ATEM. Automatically handles sending ACKs for received packets. + pub async fn recv<'a>( + &mut self, + buf: &'a mut [u8], + ) -> Result, AtemError> { + if self.state == AtemSocketState::Closed { + return Err(AtemError::SocketClosed); + } + let size = self.inner.recv(buf).await.map_err(|e| AtemError::Recv(e))?; + let p: AtemPacket<_> = buf[0..size].try_into()?; + debug!("RX: {}", p); + match self.state { + AtemSocketState::StateDump => { + // We've just connected and the state's being dumped to us. + if p.body().len() == 0 { + // State dump is done. + self.state = AtemSocketState::Established { + session_id: p.session_id(), + }; + debug!("ATEM state dump done, session ID is {}", p.session_id()); + self.ack(p.session_id(), p.local_sequence_number()).await?; + } + } + AtemSocketState::Established { session_id } => { + if p.flags().contains(PacketFlag::AckRequest) { + self.ack(session_id, p.local_sequence_number()).await?; + } + } + AtemSocketState::Closed => unreachable!(), + } + Ok(p) + } + + /// Send a packet to the ATEM. The ATEM will respond with an ACK - it's up to the caller to handle packet retransmission + pub async fn send>(&mut self, p: AtemPacket) -> Result<(), AtemError> { + self.inner + .send_to(p.inner().as_ref(), self.addr) + .await + .map_err(|e| AtemError::Send(e)) + } +} diff --git a/atem-connection-rs/src/atem_lib/mod.rs b/atem-connection-rs/src/atem_lib/mod.rs index d0a6cd7..a51aab8 100644 --- a/atem-connection-rs/src/atem_lib/mod.rs +++ b/atem-connection-rs/src/atem_lib/mod.rs @@ -2,3 +2,4 @@ pub mod atem_field; pub mod atem_packet; #[cfg(feature = "std")] pub mod atem_socket; +pub mod atem_socket_two; diff --git a/atem-connection-rs/src/commands/command_base.rs b/atem-connection-rs/src/commands/command_base.rs index f9321b9..d7374ee 100644 --- a/atem-connection-rs/src/commands/command_base.rs +++ b/atem-connection-rs/src/commands/command_base.rs @@ -1,7 +1,4 @@ -use std::{ - boxed::Box, collections::HashMap, fmt::Debug, string::String, sync::Arc, - vec::Vec, -}; +use std::{boxed::Box, collections::HashMap, fmt::Debug, string::String, sync::Arc, vec::Vec}; use crate::{enums::ProtocolVersion, state::AtemState}; diff --git a/atem-test/Cargo.toml b/atem-test/Cargo.toml index 028b623..31115c2 100644 --- a/atem-test/Cargo.toml +++ b/atem-test/Cargo.toml @@ -2,6 +2,7 @@ name = "atem-test" version = "0.1.0" edition = "2024" +default-run = "main" [dependencies] atem-connection-rs = { path = "../atem-connection-rs" } diff --git a/atem-test/src/main.rs b/atem-test/src/bin/main.rs similarity index 100% rename from atem-test/src/main.rs rename to atem-test/src/bin/main.rs diff --git a/atem-test/src/bin/socket2.rs b/atem-test/src/bin/socket2.rs new file mode 100644 index 0000000..4ac464a --- /dev/null +++ b/atem-test/src/bin/socket2.rs @@ -0,0 +1,48 @@ +use atem_connection_rs::atem_lib::{atem_field::PrgI, atem_socket_two::tokio::TokioBackend}; +use clap::Parser; +use color_eyre::Report; + +/// ATEM Rust Library Test App +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// IP of the ATEM to connect to + #[arg(short, long)] + ip: String, +} + +#[tokio::main] +async fn main() { + let args = Args::parse(); + + setup_logging().unwrap(); + + let mut s = TokioBackend::new(args.ip.parse().unwrap()).await.unwrap(); + s.connect().await.expect("aa"); + let mut buf = [0u8; 2048]; + loop { + let p = s.recv(&mut buf).await.unwrap(); + let prg: Vec = p.fields::().collect::>().unwrap(); + if !prg.is_empty() { + println!("{:?}", prg); + } + } +} + +fn setup_logging() -> Result<(), Report> { + if std::env::var("RUST_LIB_BACKTRACE").is_err() { + unsafe { + std::env::set_var("RUST_LIB_BACKTRACE", "1"); + } + } + color_eyre::install()?; + + if std::env::var("RUST_LOG").is_err() { + unsafe { + std::env::set_var("RUST_LOG", "debug"); + } + } + env_logger::init(); + + Ok(()) +} -- 2.44.1