From 8be762b0ef18c12494a332fc815da414a026c507 Mon Sep 17 00:00:00 2001 From: Sam Willcocks Date: Sat, 21 Jun 2025 15:58:19 +0100 Subject: [PATCH] Add lowerlevel socket implementation --- Cargo.lock | 335 +++++++++++++++++- atem-connection-rs/Cargo.toml | 6 +- .../src/atem_lib/atem_packet.rs | 15 +- .../src/atem_lib/atem_socket_two.rs | 285 +++++++++++++++ atem-connection-rs/src/atem_lib/mod.rs | 1 + .../src/commands/command_base.rs | 5 +- atem-test/Cargo.toml | 1 + atem-test/src/{ => bin}/main.rs | 0 atem-test/src/bin/socket2.rs | 48 +++ 9 files changed, 671 insertions(+), 25 deletions(-) create mode 100644 atem-connection-rs/src/atem_lib/atem_socket_two.rs rename atem-test/src/{ => bin}/main.rs (100%) create mode 100644 atem-test/src/bin/socket2.rs diff --git a/Cargo.lock b/Cargo.lock index 2273f83..9a8714f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,8 +78,10 @@ dependencies = [ name = "atem-connection-rs" version = "0.1.0" dependencies = [ + "defmt", "derive-getters", "derive-new", + "embassy-net", "enumflags2", "itertools", "log", @@ -138,6 +140,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.5.0" @@ -187,7 +195,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.104", ] [[package]] @@ -229,6 +237,44 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "critical-section" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" + +[[package]] +name = "defmt" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "548d977b6da32fa1d1fda2876453da1e7df63ad0304c8b3dae4dbe7b96f39b78" +dependencies = [ + "bitflags", + "defmt-macros", +] + +[[package]] +name = "defmt-macros" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d4fc12a85bcf441cfe44344c4b72d58493178ce635338a3f3b78943aceb258e" +dependencies = [ + "defmt-parser", + "proc-macro-error2", + "proc-macro2", + "quote", + "syn 2.0.104", +] + +[[package]] +name = "defmt-parser" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10d60334b3b2e7c9d91ef8150abfb6fa4c1c39ebbcf4a81c2e346aad939fee3e" +dependencies = [ + "thiserror", +] + [[package]] name = "derive-getters" version = "0.2.0" @@ -248,7 +294,16 @@ checksum = "d150dea618e920167e5973d70ae6ece4385b7164e0d799fe7c122dd0a5d912ad" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.104", +] + +[[package]] +name = "document-features" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95249b50c6c185bee49034bcb378a49dc2b5dff0be90ff6616d31d64febab05d" +dependencies = [ + "litrs", ] [[package]] @@ -257,6 +312,127 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "embassy-net" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "940c4b9fe5c1375b09a0c6722c0100d6b2ed46a717a34f632f26e8d7327c4383" +dependencies = [ + "document-features", + "embassy-net-driver", + "embassy-sync", + "embassy-time", + "embedded-io-async", + "embedded-nal-async", + "heapless", + "managed", + "smoltcp", +] + +[[package]] +name = "embassy-net-driver" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524eb3c489760508f71360112bca70f6e53173e6fe48fc5f0efd0f5ab217751d" + +[[package]] +name = "embassy-sync" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d2c8cdff05a7a51ba0087489ea44b0b1d97a296ca6b1d6d1a33ea7423d34049" +dependencies = [ + "cfg-if", + "critical-section", + "embedded-io-async", + "futures-sink", + "futures-util", + "heapless", +] + +[[package]] +name = "embassy-time" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f820157f198ada183ad62e0a66f554c610cdcd1a9f27d4b316358103ced7a1f8" +dependencies = [ + "cfg-if", + "critical-section", + "document-features", + "embassy-time-driver", + "embedded-hal 0.2.7", + "embedded-hal 1.0.0", + "embedded-hal-async", + "futures-util", +] + +[[package]] +name = "embassy-time-driver" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d45f5d833b6d98bd2aab0c2de70b18bfaa10faf661a1578fd8e5dfb15eb7eba" +dependencies = [ + "document-features", +] + +[[package]] +name = "embedded-hal" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35949884794ad573cf46071e41c9b60efb0cb311e3ca01f7af807af1debc66ff" +dependencies = [ + "nb 0.1.3", + "void", +] + +[[package]] +name = "embedded-hal" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "361a90feb7004eca4019fb28352a9465666b24f840f5c3cddf0ff13920590b89" + +[[package]] +name = "embedded-hal-async" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c4c685bbef7fe13c3c6dd4da26841ed3980ef33e841cddfa15ce8a8fb3f1884" +dependencies = [ + "embedded-hal 1.0.0", +] + +[[package]] +name = "embedded-io" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edd0f118536f44f5ccd48bcb8b111bdc3de888b58c74639dfb034a357d0f206d" + +[[package]] +name = "embedded-io-async" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ff09972d4073aa8c299395be75161d582e7629cd663171d62af73c8d50dba3f" +dependencies = [ + "embedded-io", +] + +[[package]] +name = "embedded-nal" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c56a28be191a992f28f178ec338a0bf02f63d7803244add736d026a471e6ed77" +dependencies = [ + "nb 1.1.0", +] + +[[package]] +name = "embedded-nal-async" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76959917cd2b86f40a98c28dd5624eddd1fa69d746241c8257eac428d83cb211" +dependencies = [ + "embedded-io-async", + "embedded-nal", +] + [[package]] name = "enumflags2" version = "0.7.12" @@ -274,7 +450,7 @@ checksum = "67c78a4d8fdf9953a5c9d458f9efe940fd97a0cab0941c075a813ac594733827" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.104", ] [[package]] @@ -302,9 +478,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-sink" @@ -312,12 +488,49 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-core", + "futures-task", + "pin-project-lite", + "pin-utils", +] + [[package]] name = "gimli" version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0a01e0497841a3b2db4f8afa483cce65f7e96a3498bd6c541734792aeac8fe7" +[[package]] +name = "hash32" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606" +dependencies = [ + "byteorder", +] + +[[package]] +name = "heapless" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfb9eb618601c89945a70e254898da93b13be0388091d42117462b265bb3fad" +dependencies = [ + "hash32", + "stable_deref_trait", +] + [[package]] name = "heck" version = "0.4.1" @@ -372,6 +585,12 @@ version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +[[package]] +name = "litrs" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ce301924b7887e9d637144fdade93f9dfff9b60981d4ac161db09720d39aa5" + [[package]] name = "lock_api" version = "0.4.6" @@ -390,6 +609,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "managed" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ca88d725a0a943b096803bd34e73a4437208b6077654cc4ecb2947a5f91618d" + [[package]] name = "memchr" version = "2.4.0" @@ -417,6 +642,21 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "nb" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "801d31da0513b6ec5214e9bf433a77966320625a37860f910be265be6e18d06f" +dependencies = [ + "nb 1.1.0", +] + +[[package]] +name = "nb" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d5439c4ad607c3c23abf66de8c8bf57ba8adcd1f129e699851a6e43935d339d" + [[package]] name = "num_cpus" version = "1.16.0" @@ -478,10 +718,38 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" [[package]] -name = "proc-macro2" -version = "1.0.78" +name = "pin-utils" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "proc-macro-error-attr2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5" +dependencies = [ + "proc-macro2", + "quote", +] + +[[package]] +name = "proc-macro-error2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802" +dependencies = [ + "proc-macro-error-attr2", + "proc-macro2", + "quote", + "syn 2.0.104", +] + +[[package]] +name = "proc-macro2" +version = "1.0.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" dependencies = [ "unicode-ident", ] @@ -557,6 +825,19 @@ version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" +[[package]] +name = "smoltcp" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dad095989c1533c1c266d9b1e8d70a1329dd3723c3edac6d03bbd67e7bf6f4bb" +dependencies = [ + "bitflags", + "byteorder", + "cfg-if", + "heapless", + "managed", +] + [[package]] name = "socket2" version = "0.5.6" @@ -567,6 +848,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "strsim" version = "0.10.0" @@ -586,9 +873,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.48" +version = "2.0.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" +checksum = "17b6f705963418cdb9927482fa304bc562ece2fdd4f616084c50b7023b435a40" dependencies = [ "proc-macro2", "quote", @@ -604,6 +891,26 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "thiserror" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "thread_local" version = "1.1.3" @@ -640,7 +947,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.104", ] [[package]] @@ -727,6 +1034,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "void" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/atem-connection-rs/Cargo.toml b/atem-connection-rs/Cargo.toml index 1e08a94..76eca05 100644 --- a/atem-connection-rs/Cargo.toml +++ b/atem-connection-rs/Cargo.toml @@ -11,8 +11,12 @@ log = "0.4.14" tokio = { version = "1.13.0", features = ["full"], optional = true } tokio-util = { version = "0.7.10", optional = true } enumflags2 = { version = "0.7.12", default-features = false } +embassy-net = { version = "0.7.0", optional = true, features = ["proto-ipv4", "medium-ip", "udp"]} +defmt = {version = "1.0.0", optional = true} [features] -default = ["std"] +default = ["std", "embassy-net", "defmt"] std = ["dep:tokio", "dep:tokio-util", "itertools/use_std"] +embassy-net = ["dep:embassy-net"] +defmt = ["dep:defmt"] diff --git a/atem-connection-rs/src/atem_lib/atem_packet.rs b/atem-connection-rs/src/atem_lib/atem_packet.rs index c81be4e..69d2f7d 100755 --- a/atem-connection-rs/src/atem_lib/atem_packet.rs +++ b/atem-connection-rs/src/atem_lib/atem_packet.rs @@ -43,16 +43,13 @@ impl<'a> TryFrom<&'a [u8]> for AtemPacket<&'a [u8]> { } #[derive(Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum AtemPacketParseError { - /// The packet was too short - TooShort { - got: usize, - }, + /// The packet was shorter than any valid packet should be. + TooShort { got: usize }, /// The packet's stated and actual lengths were different - LengthDiffers { - expected: u16, - got: usize, - }, + LengthDiffers { expected: u16, got: usize }, + /// The packet had invalid (unknown) flags set InvalidFlags, } @@ -258,8 +255,8 @@ impl> Display for AtemPacket { self.session_id(), self.flags(), self.ack_number(), - self.remote_sequence_number(), self.local_sequence_number(), + self.remote_sequence_number(), ) } } diff --git a/atem-connection-rs/src/atem_lib/atem_socket_two.rs b/atem-connection-rs/src/atem_lib/atem_socket_two.rs new file mode 100644 index 0000000..a2903ff --- /dev/null +++ b/atem-connection-rs/src/atem_lib/atem_socket_two.rs @@ -0,0 +1,285 @@ +use core::net::{Ipv4Addr, SocketAddrV4}; + +use log::debug; + +use super::atem_packet::{AtemPacket, AtemPacketParseError, PacketFlag}; + +pub const ATEM_PORT: u16 = 9910; +pub const SYN_PAYLOAD: [u8; 8] = [0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]; + +#[derive(Debug, PartialEq, Eq)] +pub enum AtemSocketState { + Closed, + StateDump, + Established { session_id: u16 }, +} + +/// An abstraction over an async UDP socket +pub trait SocketBackend { + type SendError; + type RecvError; + async fn send_to(&mut self, data: &[u8], addr: SocketAddrV4) -> Result<(), Self::SendError>; + async fn recv(&mut self, data: &mut [u8]) -> Result; +} + +#[cfg(feature = "std")] +pub mod tokio { + use core::net::Ipv4Addr; + + use tokio::net::UdpSocket; + + use super::{AtemSocket, SocketBackend}; + + #[derive(Debug)] + pub struct TokioBackend { + sock: UdpSocket, + } + + impl TokioBackend { + pub async fn new(addr: Ipv4Addr) -> Result, std::io::Error> { + let b = Self { + sock: UdpSocket::bind("0.0.0.0:0").await?, + }; + Ok(AtemSocket::new_with_backend(addr, b)) + } + } + + impl SocketBackend for TokioBackend { + type SendError = std::io::Error; + type RecvError = std::io::Error; + + async fn send_to( + &mut self, + data: &[u8], + addr: core::net::SocketAddrV4, + ) -> Result<(), Self::SendError> { + self.sock.send_to(data, addr).await?; + Ok(()) + } + + async fn recv(&mut self, data: &mut [u8]) -> Result { + let (cnt, _) = self.sock.recv_from(data).await?; + Ok(cnt) + } + } +} + +#[cfg(feature = "embassy-net")] +pub mod enet { + use core::net::{Ipv4Addr, SocketAddrV4}; + + use embassy_net::udp::{RecvError, SendError, UdpSocket}; + + use super::{AtemSocket, SocketBackend}; + + pub struct EnetBackend<'a> { + sock: UdpSocket<'a>, + } + + impl core::fmt::Debug for EnetBackend<'_> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "EnetBackend") + } + } + + impl<'a> EnetBackend<'a> { + pub fn new(addr: Ipv4Addr, sock: UdpSocket<'a>) -> AtemSocket { + let b = Self { sock }; + AtemSocket::new_with_backend(addr, b) + } + } + + impl SocketBackend for EnetBackend<'_> { + type SendError = SendError; + type RecvError = RecvError; + + async fn send_to( + &mut self, + data: &[u8], + addr: SocketAddrV4, + ) -> Result<(), Self::SendError> { + self.sock.send_to(data, addr).await + } + + async fn recv(&mut self, data: &mut [u8]) -> Result { + let (count, _) = self.sock.recv_from(data).await?; + Ok(count) + } + } +} + +#[derive(Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum AtemError { + /// Tried to do something but the socket is closed + SocketClosed, + /// There was an error from the underlying socket while receiving + Recv(I::RecvError), + /// There was an error from the underlying socket while sending + Send(I::SendError), + /// We received an invalid ATEM protocol packet + BadPacket(AtemPacketParseError), + /// The switcher denied our connection attempt. + SwitcherDeniedConnect, + /// There was some nonspecific protocol error + ProtocolError, +} + +impl From for AtemError +where + I: SocketBackend, +{ + fn from(value: AtemPacketParseError) -> Self { + Self::BadPacket(value) + } +} + +pub struct AtemSocket { + inner: I, + addr: SocketAddrV4, + state: AtemSocketState, + local_seq_num: u16, +} + +impl AtemSocket +where + I: SocketBackend, +{ + fn new_with_backend(addr: Ipv4Addr, backend: I) -> Self { + Self { + inner: backend, + addr: SocketAddrV4::new(addr, ATEM_PORT), + state: AtemSocketState::Closed, + local_seq_num: 0, + } + } + + /// Connect to the ATEM, performing the connection handshake. + pub async fn connect(&mut self) -> Result<(), AtemError> { + // Reset state + self.state = AtemSocketState::Closed; + self.local_seq_num = 0; + let mut buf = [0u8; 20]; + // FIXME: this should be random + let initial_sid = 0x53ab; + + // Step 1: Send the "SYN" with a random session id and magic payload + + let (mut p, _) = AtemPacket::init( + &mut buf, + PacketFlag::NewSessionId.into(), + initial_sid, + self.local_seq_num, + Some(&SYN_PAYLOAD), + ) + .unwrap(); + p.set_remote_seq_num(0x3a); + + self.inner + .send_to(p.inner(), self.addr) + .await + .map_err(|e| AtemError::Send(e))?; + + debug!("Connect step 1: sent SYN"); + + // Step 2: Wait for the response from the ATEM. + // this should also have the SYN flag set and have the same session id that we sent. + // The first byte of the payload should be 0x02 - if it's 0x04 we should restart + // the connection + + self.inner + .recv(&mut buf) + .await + .map_err(|e| AtemError::Recv(e))?; + + let ack = AtemPacket::new_checked(buf)?; + debug!("RX: {}", ack); + if ack.flags() != PacketFlag::NewSessionId { + return Err(AtemError::ProtocolError); + } + if ack.session_id() != initial_sid { + return Err(AtemError::ProtocolError); + } + match ack.body()[0] { + 0x02 => { + // all good + debug!("Connect step 2: got reply"); + } + 0x04 => { + // Restart connection + return Err(AtemError::SwitcherDeniedConnect); + } + _ => { + return Err(AtemError::ProtocolError); + } + } + + // Step 3: Send an ACK for the packet we just received. + self.ack(initial_sid, ack.local_sequence_number()).await?; + debug!("Connect step 3: sent ACK"); + self.state = AtemSocketState::StateDump; + + Ok(()) + } + + async fn ack(&mut self, sid: u16, ack_num: u16) -> Result<(), AtemError> { + let mut buf = [0u8; 12]; + let (mut p, _) = AtemPacket::init( + &mut buf, + PacketFlag::AckReply.into(), + sid, + // N.B: local seq num is set to 0 in ACKs + 0, + None, + ) + .unwrap(); + p.set_ack_num(ack_num); + debug!("TX: {}", p); + self.inner + .send_to(p.inner(), self.addr) + .await + .map_err(|e| AtemError::Send(e))?; + Ok(()) + } + + /// Receive a packet from the ATEM. Automatically handles sending ACKs for received packets. + pub async fn recv<'a>( + &mut self, + buf: &'a mut [u8], + ) -> Result, AtemError> { + if self.state == AtemSocketState::Closed { + return Err(AtemError::SocketClosed); + } + let size = self.inner.recv(buf).await.map_err(|e| AtemError::Recv(e))?; + let p: AtemPacket<_> = buf[0..size].try_into()?; + debug!("RX: {}", p); + match self.state { + AtemSocketState::StateDump => { + // We've just connected and the state's being dumped to us. + if p.body().len() == 0 { + // State dump is done. + self.state = AtemSocketState::Established { + session_id: p.session_id(), + }; + debug!("ATEM state dump done, session ID is {}", p.session_id()); + self.ack(p.session_id(), p.local_sequence_number()).await?; + } + } + AtemSocketState::Established { session_id } => { + if p.flags().contains(PacketFlag::AckRequest) { + self.ack(session_id, p.local_sequence_number()).await?; + } + } + AtemSocketState::Closed => unreachable!(), + } + Ok(p) + } + + /// Send a packet to the ATEM. The ATEM will respond with an ACK - it's up to the caller to handle packet retransmission + pub async fn send>(&mut self, p: AtemPacket) -> Result<(), AtemError> { + self.inner + .send_to(p.inner().as_ref(), self.addr) + .await + .map_err(|e| AtemError::Send(e)) + } +} diff --git a/atem-connection-rs/src/atem_lib/mod.rs b/atem-connection-rs/src/atem_lib/mod.rs index d0a6cd7..a51aab8 100644 --- a/atem-connection-rs/src/atem_lib/mod.rs +++ b/atem-connection-rs/src/atem_lib/mod.rs @@ -2,3 +2,4 @@ pub mod atem_field; pub mod atem_packet; #[cfg(feature = "std")] pub mod atem_socket; +pub mod atem_socket_two; diff --git a/atem-connection-rs/src/commands/command_base.rs b/atem-connection-rs/src/commands/command_base.rs index f9321b9..d7374ee 100644 --- a/atem-connection-rs/src/commands/command_base.rs +++ b/atem-connection-rs/src/commands/command_base.rs @@ -1,7 +1,4 @@ -use std::{ - boxed::Box, collections::HashMap, fmt::Debug, string::String, sync::Arc, - vec::Vec, -}; +use std::{boxed::Box, collections::HashMap, fmt::Debug, string::String, sync::Arc, vec::Vec}; use crate::{enums::ProtocolVersion, state::AtemState}; diff --git a/atem-test/Cargo.toml b/atem-test/Cargo.toml index 028b623..31115c2 100644 --- a/atem-test/Cargo.toml +++ b/atem-test/Cargo.toml @@ -2,6 +2,7 @@ name = "atem-test" version = "0.1.0" edition = "2024" +default-run = "main" [dependencies] atem-connection-rs = { path = "../atem-connection-rs" } diff --git a/atem-test/src/main.rs b/atem-test/src/bin/main.rs similarity index 100% rename from atem-test/src/main.rs rename to atem-test/src/bin/main.rs diff --git a/atem-test/src/bin/socket2.rs b/atem-test/src/bin/socket2.rs new file mode 100644 index 0000000..4ac464a --- /dev/null +++ b/atem-test/src/bin/socket2.rs @@ -0,0 +1,48 @@ +use atem_connection_rs::atem_lib::{atem_field::PrgI, atem_socket_two::tokio::TokioBackend}; +use clap::Parser; +use color_eyre::Report; + +/// ATEM Rust Library Test App +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// IP of the ATEM to connect to + #[arg(short, long)] + ip: String, +} + +#[tokio::main] +async fn main() { + let args = Args::parse(); + + setup_logging().unwrap(); + + let mut s = TokioBackend::new(args.ip.parse().unwrap()).await.unwrap(); + s.connect().await.expect("aa"); + let mut buf = [0u8; 2048]; + loop { + let p = s.recv(&mut buf).await.unwrap(); + let prg: Vec = p.fields::().collect::>().unwrap(); + if !prg.is_empty() { + println!("{:?}", prg); + } + } +} + +fn setup_logging() -> Result<(), Report> { + if std::env::var("RUST_LIB_BACKTRACE").is_err() { + unsafe { + std::env::set_var("RUST_LIB_BACKTRACE", "1"); + } + } + color_eyre::install()?; + + if std::env::var("RUST_LOG").is_err() { + unsafe { + std::env::set_var("RUST_LOG", "debug"); + } + } + env_logger::init(); + + Ok(()) +}