feat: Use AtemPacket struct

This commit is contained in:
Baud 2024-02-28 18:43:21 +00:00
parent 34a268f0bf
commit 5db8843ce7
2 changed files with 46 additions and 57 deletions

View File

@ -1,9 +1,12 @@
pub struct AtemPacket { #[derive(Debug)]
pub struct AtemPacket<'packet_buffer> {
length: u16, length: u16,
flags: u8, flags: u8,
session_id: u16, session_id: u16,
remote_packet_id: u16, remote_packet_id: u16,
body: Vec<u8>, retransmit_requested_from_packet_id: Option<u16>,
ack_reply: Option<u16>,
body: &'packet_buffer [u8],
} }
pub enum AtemPacketErr { pub enum AtemPacketErr {
@ -32,7 +35,7 @@ impl From<PacketFlag> for u8 {
} }
} }
impl AtemPacket { impl<'packet_buffer> AtemPacket<'packet_buffer> {
pub fn length(&self) -> u16 { pub fn length(&self) -> u16 {
self.length self.length
} }
@ -49,8 +52,16 @@ impl AtemPacket {
self.remote_packet_id self.remote_packet_id
} }
pub fn body(&self) -> Vec<u8> { pub fn body(&self) -> &[u8] {
self.body.clone() self.body
}
pub fn retransmit_request(&self) -> Option<u16> {
self.retransmit_requested_from_packet_id
}
pub fn ack_reply(&self) -> Option<u16> {
self.ack_reply
} }
pub fn has_flag(&self, flag: PacketFlag) -> bool { pub fn has_flag(&self, flag: PacketFlag) -> bool {
@ -58,10 +69,10 @@ impl AtemPacket {
} }
} }
impl TryFrom<&[u8]> for AtemPacket { impl<'packet_buffer> TryFrom<&'packet_buffer [u8]> for AtemPacket<'packet_buffer> {
type Error = AtemPacketErr; type Error = AtemPacketErr;
fn try_from(buffer: &[u8]) -> Result<Self, Self::Error> { fn try_from(buffer: &'packet_buffer [u8]) -> Result<Self, Self::Error> {
if buffer.len() < 12 { if buffer.len() < 12 {
return Err(AtemPacketErr::TooShort(format!( return Err(AtemPacketErr::TooShort(format!(
"Invalid packet from ATEM {:x?}", "Invalid packet from ATEM {:x?}",
@ -82,7 +93,20 @@ impl TryFrom<&[u8]> for AtemPacket {
let session_id = u16::from_be_bytes(buffer[2..4].try_into().unwrap()); let session_id = u16::from_be_bytes(buffer[2..4].try_into().unwrap());
let remote_packet_id = u16::from_be_bytes(buffer[10..12].try_into().unwrap()); let remote_packet_id = u16::from_be_bytes(buffer[10..12].try_into().unwrap());
let body = buffer[12..].to_vec(); let body = &buffer[12..];
let retransmit_requested_from_packet_id =
if flags & u8::from(PacketFlag::RetransmitRequest) > 0 {
Some(u16::from_be_bytes(buffer[6..8].try_into().unwrap()))
} else {
None
};
let ack_reply = if flags & u8::from(PacketFlag::AckReply) > 0 {
Some(u16::from_be_bytes(buffer[4..6].try_into().unwrap()))
} else {
None
};
Ok(AtemPacket { Ok(AtemPacket {
length, length,
@ -90,6 +114,8 @@ impl TryFrom<&[u8]> for AtemPacket {
session_id, session_id,
remote_packet_id, remote_packet_id,
body, body,
retransmit_requested_from_packet_id,
ack_reply,
}) })
} }
} }

View File

@ -13,6 +13,8 @@ use crate::{
commands::{command_base::DeserializedCommand, parse_commands::deserialize_commands}, commands::{command_base::DeserializedCommand, parse_commands::deserialize_commands},
}; };
use super::atem_packet::PacketFlag;
const IN_FLIGHT_TIMEOUT: u64 = 60; const IN_FLIGHT_TIMEOUT: u64 = 60;
const CONNECTION_TIMEOUT: u64 = 5000; const CONNECTION_TIMEOUT: u64 = 5000;
const CONNECTION_RETRY_INTERVAL: u64 = 1000; const CONNECTION_RETRY_INTERVAL: u64 = 1000;
@ -54,27 +56,6 @@ impl Into<u8> for ConnectionState {
} }
} }
#[derive(PartialEq)]
enum PacketFlag {
AckRequest,
NewSessionId,
IsRetransmit,
RetransmitRequest,
AckReply,
}
impl From<PacketFlag> for u8 {
fn from(flag: PacketFlag) -> Self {
match flag {
PacketFlag::AckRequest => 0x01,
PacketFlag::NewSessionId => 0x02,
PacketFlag::IsRetransmit => 0x04,
PacketFlag::RetransmitRequest => 0x08,
PacketFlag::AckReply => 0x10,
}
}
}
#[derive(Clone)] #[derive(Clone)]
struct InFlightPacket { struct InFlightPacket {
packet_id: u16, packet_id: u16,
@ -276,34 +257,18 @@ impl AtemSocket {
} }
async fn recieved_packet(&mut self, packet: &[u8]) { async fn recieved_packet(&mut self, packet: &[u8]) {
debug!("Received {:x?}", packet);
let Ok(atem_packet): Result<AtemPacket, _> = packet.try_into() else { let Ok(atem_packet): Result<AtemPacket, _> = packet.try_into() else {
return; return;
}; };
if packet.len() < 12 { debug!("Received {:x?}", atem_packet);
debug!("Invalid packet from ATEM {:x?}", packet);
return;
}
self.last_received_at = SystemTime::now(); self.last_received_at = SystemTime::now();
let length = u16::from_be_bytes(packet[0..2].try_into().unwrap()) & 0x07ff;
if length as usize != packet.len() { self.session_id = atem_packet.session_id();
debug!( let remote_packet_id = atem_packet.remote_packet_id();
"Length of message differs, expected {} got {}",
length,
packet.len()
);
return;
}
let flags = packet[0] >> 3; if atem_packet.has_flag(PacketFlag::NewSessionId) {
self.session_id = u16::from_be_bytes(packet[2..4].try_into().unwrap());
let remote_packet_id = u16::from_be_bytes(packet[10..12].try_into().unwrap());
if flags & u8::from(PacketFlag::NewSessionId) > 0 {
debug!("New session"); debug!("New session");
self.connection_state = ConnectionState::Established; self.connection_state = ConnectionState::Established;
self.last_received_packed_id = remote_packet_id; self.last_received_packed_id = remote_packet_id;
@ -312,20 +277,19 @@ impl AtemSocket {
} }
if self.connection_state == ConnectionState::Established { if self.connection_state == ConnectionState::Established {
if flags & u8::from(PacketFlag::RetransmitRequest) > 0 { if let Some(from_packet_id) = atem_packet.retransmit_request() {
let from_packet_id = u16::from_be_bytes(packet[6..8].try_into().unwrap());
debug!("Retransmit request: {:x?}", from_packet_id); debug!("Retransmit request: {:x?}", from_packet_id);
self.retransmit_from(from_packet_id).await; self.retransmit_from(from_packet_id).await;
} }
if flags & u8::from(PacketFlag::AckRequest) > 0 { if atem_packet.has_flag(PacketFlag::AckRequest) {
if remote_packet_id == (self.last_received_packed_id + 1) % MAX_PACKET_ID { if remote_packet_id == (self.last_received_packed_id + 1) % MAX_PACKET_ID {
self.last_received_packed_id = remote_packet_id; self.last_received_packed_id = remote_packet_id;
self.send_or_queue_ack().await; self.send_or_queue_ack().await;
if length > 12 { if atem_packet.length() > 12 {
self.on_commands_received(&packet[12..]); self.on_commands_received(atem_packet.body());
} }
} else if self } else if self
.is_packet_covered_by_ack(self.last_received_packed_id, remote_packet_id) .is_packet_covered_by_ack(self.last_received_packed_id, remote_packet_id)
@ -334,12 +298,11 @@ impl AtemSocket {
} }
} }
if flags & u8::from(PacketFlag::IsRetransmit) > 0 { if atem_packet.has_flag(PacketFlag::IsRetransmit) {
debug!("ATEM retransmitted packet {:x?}", remote_packet_id); debug!("ATEM retransmitted packet {:x?}", remote_packet_id);
} }
if flags & u8::from(PacketFlag::AckReply) > 0 { if let Some(ack_packet_id) = atem_packet.ack_reply() {
let ack_packet_id = u16::from_be_bytes(packet[4..6].try_into().unwrap());
let mut acked_commands: Vec<AckedPacket> = vec![]; let mut acked_commands: Vec<AckedPacket> = vec![];
self.in_flight = self self.in_flight = self