diff --git a/atem-connection-rs/src/atem_lib/atem_packet.rs b/atem-connection-rs/src/atem_lib/atem_packet.rs index 18a9b02..c1a14a9 100755 --- a/atem-connection-rs/src/atem_lib/atem_packet.rs +++ b/atem-connection-rs/src/atem_lib/atem_packet.rs @@ -1,9 +1,12 @@ -pub struct AtemPacket { +#[derive(Debug)] +pub struct AtemPacket<'packet_buffer> { length: u16, flags: u8, session_id: u16, remote_packet_id: u16, - body: Vec, + retransmit_requested_from_packet_id: Option, + ack_reply: Option, + body: &'packet_buffer [u8], } pub enum AtemPacketErr { @@ -32,7 +35,7 @@ impl From for u8 { } } -impl AtemPacket { +impl<'packet_buffer> AtemPacket<'packet_buffer> { pub fn length(&self) -> u16 { self.length } @@ -49,8 +52,16 @@ impl AtemPacket { self.remote_packet_id } - pub fn body(&self) -> Vec { - self.body.clone() + pub fn body(&self) -> &[u8] { + self.body + } + + pub fn retransmit_request(&self) -> Option { + self.retransmit_requested_from_packet_id + } + + pub fn ack_reply(&self) -> Option { + self.ack_reply } pub fn has_flag(&self, flag: PacketFlag) -> bool { @@ -58,10 +69,10 @@ impl AtemPacket { } } -impl TryFrom<&[u8]> for AtemPacket { +impl<'packet_buffer> TryFrom<&'packet_buffer [u8]> for AtemPacket<'packet_buffer> { type Error = AtemPacketErr; - fn try_from(buffer: &[u8]) -> Result { + fn try_from(buffer: &'packet_buffer [u8]) -> Result { if buffer.len() < 12 { return Err(AtemPacketErr::TooShort(format!( "Invalid packet from ATEM {:x?}", @@ -82,7 +93,20 @@ impl TryFrom<&[u8]> for AtemPacket { let session_id = u16::from_be_bytes(buffer[2..4].try_into().unwrap()); let remote_packet_id = u16::from_be_bytes(buffer[10..12].try_into().unwrap()); - let body = buffer[12..].to_vec(); + let body = &buffer[12..]; + + let retransmit_requested_from_packet_id = + if flags & u8::from(PacketFlag::RetransmitRequest) > 0 { + Some(u16::from_be_bytes(buffer[6..8].try_into().unwrap())) + } else { + None + }; + + let ack_reply = if flags & u8::from(PacketFlag::AckReply) > 0 { + Some(u16::from_be_bytes(buffer[4..6].try_into().unwrap())) + } else { + None + }; Ok(AtemPacket { length, @@ -90,6 +114,8 @@ impl TryFrom<&[u8]> for AtemPacket { session_id, remote_packet_id, body, + retransmit_requested_from_packet_id, + ack_reply, }) } } diff --git a/atem-connection-rs/src/atem_lib/atem_socket.rs b/atem-connection-rs/src/atem_lib/atem_socket.rs index 32de3ab..3c4d5c6 100644 --- a/atem-connection-rs/src/atem_lib/atem_socket.rs +++ b/atem-connection-rs/src/atem_lib/atem_socket.rs @@ -13,6 +13,8 @@ use crate::{ commands::{command_base::DeserializedCommand, parse_commands::deserialize_commands}, }; +use super::atem_packet::PacketFlag; + const IN_FLIGHT_TIMEOUT: u64 = 60; const CONNECTION_TIMEOUT: u64 = 5000; const CONNECTION_RETRY_INTERVAL: u64 = 1000; @@ -54,27 +56,6 @@ impl Into for ConnectionState { } } -#[derive(PartialEq)] -enum PacketFlag { - AckRequest, - NewSessionId, - IsRetransmit, - RetransmitRequest, - AckReply, -} - -impl From for u8 { - fn from(flag: PacketFlag) -> Self { - match flag { - PacketFlag::AckRequest => 0x01, - PacketFlag::NewSessionId => 0x02, - PacketFlag::IsRetransmit => 0x04, - PacketFlag::RetransmitRequest => 0x08, - PacketFlag::AckReply => 0x10, - } - } -} - #[derive(Clone)] struct InFlightPacket { packet_id: u16, @@ -276,34 +257,18 @@ impl AtemSocket { } async fn recieved_packet(&mut self, packet: &[u8]) { - debug!("Received {:x?}", packet); - let Ok(atem_packet): Result = packet.try_into() else { return; }; - if packet.len() < 12 { - debug!("Invalid packet from ATEM {:x?}", packet); - return; - } + debug!("Received {:x?}", atem_packet); self.last_received_at = SystemTime::now(); - let length = u16::from_be_bytes(packet[0..2].try_into().unwrap()) & 0x07ff; - if length as usize != packet.len() { - debug!( - "Length of message differs, expected {} got {}", - length, - packet.len() - ); - return; - } + self.session_id = atem_packet.session_id(); + let remote_packet_id = atem_packet.remote_packet_id(); - let flags = packet[0] >> 3; - self.session_id = u16::from_be_bytes(packet[2..4].try_into().unwrap()); - let remote_packet_id = u16::from_be_bytes(packet[10..12].try_into().unwrap()); - - if flags & u8::from(PacketFlag::NewSessionId) > 0 { + if atem_packet.has_flag(PacketFlag::NewSessionId) { debug!("New session"); self.connection_state = ConnectionState::Established; self.last_received_packed_id = remote_packet_id; @@ -312,20 +277,19 @@ impl AtemSocket { } if self.connection_state == ConnectionState::Established { - if flags & u8::from(PacketFlag::RetransmitRequest) > 0 { - let from_packet_id = u16::from_be_bytes(packet[6..8].try_into().unwrap()); + if let Some(from_packet_id) = atem_packet.retransmit_request() { debug!("Retransmit request: {:x?}", from_packet_id); self.retransmit_from(from_packet_id).await; } - if flags & u8::from(PacketFlag::AckRequest) > 0 { + if atem_packet.has_flag(PacketFlag::AckRequest) { if remote_packet_id == (self.last_received_packed_id + 1) % MAX_PACKET_ID { self.last_received_packed_id = remote_packet_id; self.send_or_queue_ack().await; - if length > 12 { - self.on_commands_received(&packet[12..]); + if atem_packet.length() > 12 { + self.on_commands_received(atem_packet.body()); } } else if self .is_packet_covered_by_ack(self.last_received_packed_id, remote_packet_id) @@ -334,12 +298,11 @@ impl AtemSocket { } } - if flags & u8::from(PacketFlag::IsRetransmit) > 0 { + if atem_packet.has_flag(PacketFlag::IsRetransmit) { debug!("ATEM retransmitted packet {:x?}", remote_packet_id); } - if flags & u8::from(PacketFlag::AckReply) > 0 { - let ack_packet_id = u16::from_be_bytes(packet[4..6].try_into().unwrap()); + if let Some(ack_packet_id) = atem_packet.ack_reply() { let mut acked_commands: Vec = vec![]; self.in_flight = self