Make AtemPacket operate on `&[u8] `, add field parsing
This commit is contained in:
parent
5fd5b6f764
commit
a419a78a0a
|
@ -1,14 +1,24 @@
|
||||||
pub struct AtemPacket {
|
use core::{fmt::Display, str};
|
||||||
length: u16,
|
|
||||||
flags: u8,
|
// TODO: we don't need itertools once https://github.com/rust-lang/rust/issues/79524 lands
|
||||||
session_id: u16,
|
use itertools::Itertools;
|
||||||
remote_packet_id: u16,
|
|
||||||
body: Vec<u8>,
|
/// The "hello" packet to start communication with the ATEM
|
||||||
|
pub const COMMAND_CONNECT_HELLO: [u8; 20] = [
|
||||||
|
0x10, 0x14, 0x53, 0xab, 0x00, 0x00, 0x00, 0x00, 0x00, 0x3a, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00,
|
||||||
|
];
|
||||||
|
|
||||||
|
pub struct AtemPacket<T: AsRef<[u8]>> {
|
||||||
|
buf: T,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub enum AtemPacketErr {
|
pub enum AtemPacketErr {
|
||||||
TooShort(String),
|
/// The packet was too short
|
||||||
LengthDiffers(String),
|
TooShort { got: usize },
|
||||||
|
/// The packet's stated and actual lengths were different
|
||||||
|
LengthDiffers { expected: u16, got: usize },
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(PartialEq)]
|
#[derive(PartialEq)]
|
||||||
|
@ -32,64 +42,122 @@ impl From<PacketFlag> for u8 {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AtemPacket {
|
impl<T: AsRef<[u8]>> AtemPacket<T> {
|
||||||
|
pub fn new_checked(buf: T) -> Result<Self, AtemPacketErr> {
|
||||||
|
let len = buf.as_ref().len();
|
||||||
|
if len < 12 {
|
||||||
|
return Err(AtemPacketErr::TooShort {
|
||||||
|
got: buf.as_ref().len(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
let p = Self { buf };
|
||||||
|
if p.length() as usize != len {
|
||||||
|
return Err(AtemPacketErr::LengthDiffers {
|
||||||
|
expected: p.length(),
|
||||||
|
got: len,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Ok(p)
|
||||||
|
}
|
||||||
pub fn length(&self) -> u16 {
|
pub fn length(&self) -> u16 {
|
||||||
self.length
|
u16::from_be_bytes(self.buf.as_ref()[0..=1].try_into().unwrap()) & 0x07ff
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn flags(&self) -> u8 {
|
pub fn flags(&self) -> u8 {
|
||||||
self.flags
|
self.buf.as_ref()[0] >> 3
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn session_id(&self) -> u16 {
|
pub fn session_id(&self) -> u16 {
|
||||||
self.session_id
|
u16::from_be_bytes(self.buf.as_ref()[2..=3].try_into().unwrap())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn ack_number(&self) -> u16 {
|
||||||
|
u16::from_be_bytes(self.buf.as_ref()[4..=5].try_into().unwrap())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remote_sequence_number(&self) -> u16 {
|
||||||
|
u16::from_be_bytes(self.buf.as_ref()[9..=10].try_into().unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn remote_packet_id(&self) -> u16 {
|
pub fn remote_packet_id(&self) -> u16 {
|
||||||
self.remote_packet_id
|
u16::from_be_bytes(self.buf.as_ref()[10..=11].try_into().unwrap())
|
||||||
}
|
|
||||||
|
|
||||||
pub fn body(&self) -> Vec<u8> {
|
|
||||||
self.body.clone()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return true if this packet has the given [`PacketFlag`]
|
||||||
pub fn has_flag(&self, flag: PacketFlag) -> bool {
|
pub fn has_flag(&self, flag: PacketFlag) -> bool {
|
||||||
self.flags & u8::from(flag) > 0
|
self.flags() & u8::from(flag) > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get an iterator over the `Field`s in this packet.
|
||||||
|
///
|
||||||
|
/// Returns None if this is a packet without fields.
|
||||||
|
pub fn fields(&self) -> Option<Fields> {
|
||||||
|
// TODO: do we only ever get newsessionid during the handshake (i.e. not in a packet with fields)?
|
||||||
|
if self.has_flag(PacketFlag::NewSessionId) {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(Fields {
|
||||||
|
data: self.body(),
|
||||||
|
offset: 0,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn body(&self) -> &[u8] {
|
||||||
|
&self.buf.as_ref()[12..]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TryFrom<&[u8]> for AtemPacket {
|
#[cfg(feature = "std")]
|
||||||
type Error = AtemPacketErr;
|
impl<T: AsRef<[u8]>> Display for AtemPacket<T> {
|
||||||
|
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||||
fn try_from(buffer: &[u8]) -> Result<Self, Self::Error> {
|
write!(
|
||||||
if buffer.len() < 12 {
|
f,
|
||||||
return Err(AtemPacketErr::TooShort(format!(
|
"sid: {}, len: {}, fields: ({})",
|
||||||
"Invalid packet from ATEM {:x?}",
|
self.session_id(),
|
||||||
buffer
|
self.length(),
|
||||||
)));
|
self.fields()
|
||||||
}
|
.and_then(|f| Some(
|
||||||
|
f.map(|f| str::from_utf8(f.r#type).unwrap())
|
||||||
let length = u16::from_be_bytes(buffer[0..2].try_into().unwrap()) & 0x07ff;
|
.intersperse(", ")
|
||||||
if length as usize != buffer.len() {
|
.collect::<String>()
|
||||||
return Err(AtemPacketErr::LengthDiffers(format!(
|
))
|
||||||
"Length of message differs, expected {} got {}",
|
.unwrap_or("none".into())
|
||||||
length,
|
)
|
||||||
buffer.len()
|
}
|
||||||
)));
|
}
|
||||||
}
|
|
||||||
|
/// An ATEM protocol field - a 4-ascii character type plus variable length data
|
||||||
let flags = buffer[0] >> 3;
|
pub struct Field<'a> {
|
||||||
let session_id = u16::from_be_bytes(buffer[2..4].try_into().unwrap());
|
r#type: &'a [u8; 4],
|
||||||
let remote_packet_id = u16::from_be_bytes(buffer[10..12].try_into().unwrap());
|
data: &'a [u8],
|
||||||
|
}
|
||||||
let body = buffer[12..].to_vec();
|
|
||||||
|
/// Created by [`AtemPacket::fields`]
|
||||||
Ok(AtemPacket {
|
pub struct Fields<'a> {
|
||||||
length,
|
data: &'a [u8],
|
||||||
flags,
|
offset: usize,
|
||||||
session_id,
|
}
|
||||||
remote_packet_id,
|
|
||||||
body,
|
impl<'a> Iterator for Fields<'a> {
|
||||||
})
|
type Item = Field<'a>;
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
let remain = self.data.len() - self.offset;
|
||||||
|
if remain == 0 {
|
||||||
|
return None;
|
||||||
|
} else if remain < 8 {
|
||||||
|
// TODO: is 8 indeed the minimum size for something here? (i.e. no field data)
|
||||||
|
panic!("Oh no");
|
||||||
|
}
|
||||||
|
|
||||||
|
let length =
|
||||||
|
u16::from_be_bytes(self.data[self.offset..=self.offset + 1].try_into().unwrap());
|
||||||
|
// TODO: sanity check length
|
||||||
|
let r#type: &[u8; 4] = self.data[self.offset + 4..=self.offset + 7]
|
||||||
|
.try_into()
|
||||||
|
.unwrap();
|
||||||
|
let data = &self.data[self.offset + 8..self.offset + (length as usize)];
|
||||||
|
self.offset += (length as usize);
|
||||||
|
Some(Field { r#type, data })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,10 +4,10 @@ use std::{
|
||||||
time::{Duration, SystemTime},
|
time::{Duration, SystemTime},
|
||||||
};
|
};
|
||||||
|
|
||||||
use log::debug;
|
use log::{debug, trace};
|
||||||
use tokio::net::UdpSocket;
|
use tokio::net::UdpSocket;
|
||||||
|
|
||||||
use crate::atem_lib::atem_util;
|
use crate::atem_lib::atem_packet::{AtemPacket, PacketFlag, COMMAND_CONNECT_HELLO};
|
||||||
|
|
||||||
const IN_FLIGHT_TIMEOUT: u64 = 60;
|
const IN_FLIGHT_TIMEOUT: u64 = 60;
|
||||||
const CONNECTION_TIMEOUT: u64 = 5000;
|
const CONNECTION_TIMEOUT: u64 = 5000;
|
||||||
|
@ -39,27 +39,6 @@ impl Into<u8> for ConnectionState {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(PartialEq)]
|
|
||||||
enum PacketFlag {
|
|
||||||
AckRequest,
|
|
||||||
NewSessionId,
|
|
||||||
IsRetransmit,
|
|
||||||
RetransmitRequest,
|
|
||||||
AckReply,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<PacketFlag> for u8 {
|
|
||||||
fn from(flag: PacketFlag) -> Self {
|
|
||||||
match flag {
|
|
||||||
PacketFlag::AckRequest => 0x01,
|
|
||||||
PacketFlag::NewSessionId => 0x02,
|
|
||||||
PacketFlag::IsRetransmit => 0x04,
|
|
||||||
PacketFlag::RetransmitRequest => 0x08,
|
|
||||||
PacketFlag::AckReply => 0x10,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct InFlightPacket {
|
struct InFlightPacket {
|
||||||
packet_id: u16,
|
packet_id: u16,
|
||||||
|
@ -152,7 +131,7 @@ impl AtemSocketInner {
|
||||||
self.in_flight = vec![];
|
self.in_flight = vec![];
|
||||||
debug!("Reconnect");
|
debug!("Reconnect");
|
||||||
|
|
||||||
self.send_packet(&atem_util::COMMAND_CONNECT_HELLO).await;
|
self.send_packet(&COMMAND_CONNECT_HELLO).await;
|
||||||
self.connection_state = ConnectionState::SynSent;
|
self.connection_state = ConnectionState::SynSent;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -277,30 +256,23 @@ impl AtemSocketInner {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn recieved_packet(&mut self, packet: &[u8]) {
|
async fn recieved_packet(&mut self, packet: &[u8]) {
|
||||||
debug!("RECV {:x?}", packet);
|
trace!("RX Raw: {:x?}", packet);
|
||||||
|
|
||||||
if packet.len() < 12 {
|
let checked = match AtemPacket::new_checked(packet) {
|
||||||
debug!("Invalid packet from ATEM {:x?}", packet);
|
Ok(p) => p,
|
||||||
return;
|
Err(e) => {
|
||||||
}
|
debug!("Invalid packet ({:?}): {:x?}", e, packet);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
debug!("RX: {}", checked);
|
||||||
|
|
||||||
self.last_received_at = SystemTime::now();
|
self.last_received_at = SystemTime::now();
|
||||||
let length = u16::from_be_bytes(packet[0..2].try_into().unwrap()) & 0x07ff;
|
|
||||||
|
|
||||||
if length as usize != packet.len() {
|
self.session_id = checked.session_id();
|
||||||
debug!(
|
let remote_packet_id = checked.remote_packet_id();
|
||||||
"Length of message differs, expected {} got {}",
|
|
||||||
length,
|
|
||||||
packet.len()
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let flags = packet[0] >> 3;
|
if checked.flags() & u8::from(PacketFlag::NewSessionId) > 0 {
|
||||||
self.session_id = u16::from_be_bytes(packet[2..4].try_into().unwrap());
|
|
||||||
let remote_packet_id = u16::from_be_bytes(packet[10..12].try_into().unwrap());
|
|
||||||
|
|
||||||
if flags & u8::from(PacketFlag::NewSessionId) > 0 {
|
|
||||||
debug!("New session");
|
debug!("New session");
|
||||||
self.connection_state = ConnectionState::Established;
|
self.connection_state = ConnectionState::Established;
|
||||||
self.last_received_packed_id = remote_packet_id;
|
self.last_received_packed_id = remote_packet_id;
|
||||||
|
@ -309,20 +281,20 @@ impl AtemSocketInner {
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.connection_state == ConnectionState::Established {
|
if self.connection_state == ConnectionState::Established {
|
||||||
if flags & u8::from(PacketFlag::RetransmitRequest) > 0 {
|
if checked.has_flag(PacketFlag::RetransmitRequest) {
|
||||||
let from_packet_id = u16::from_be_bytes(packet[6..8].try_into().unwrap());
|
let from_packet_id = u16::from_be_bytes(packet[6..8].try_into().unwrap());
|
||||||
debug!("Retransmit request: {:x?}", from_packet_id);
|
debug!("Retransmit request: {:x?}", from_packet_id);
|
||||||
|
|
||||||
self.retransmit_from(from_packet_id).await;
|
self.retransmit_from(from_packet_id).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
if flags & u8::from(PacketFlag::AckRequest) > 0 {
|
if checked.has_flag(PacketFlag::AckRequest) {
|
||||||
if remote_packet_id == (self.last_received_packed_id + 1) % MAX_PACKET_ID {
|
if remote_packet_id == (self.last_received_packed_id + 1) % MAX_PACKET_ID {
|
||||||
self.last_received_packed_id = remote_packet_id;
|
self.last_received_packed_id = remote_packet_id;
|
||||||
self.send_or_queue_ack().await;
|
self.send_or_queue_ack().await;
|
||||||
|
|
||||||
if length > 12 {
|
if checked.length() > 12 {
|
||||||
self.on_command_received(&packet[12..], remote_packet_id);
|
self.on_command_received(checked.body(), remote_packet_id);
|
||||||
}
|
}
|
||||||
} else if self
|
} else if self
|
||||||
.is_packet_covered_by_ack(self.last_received_packed_id, remote_packet_id)
|
.is_packet_covered_by_ack(self.last_received_packed_id, remote_packet_id)
|
||||||
|
@ -331,12 +303,12 @@ impl AtemSocketInner {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if flags & u8::from(PacketFlag::IsRetransmit) > 0 {
|
if checked.has_flag(PacketFlag::IsRetransmit) {
|
||||||
debug!("ATEM retransmitted packet {:x?}", remote_packet_id);
|
debug!("ATEM retransmitted packet {:x?}", remote_packet_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
if flags & u8::from(PacketFlag::AckReply) > 0 {
|
if checked.has_flag(PacketFlag::AckReply) {
|
||||||
let ack_packet_id = u16::from_be_bytes(packet[4..6].try_into().unwrap());
|
let ack_packet_id = checked.ack_number();
|
||||||
let mut acked_commands: Vec<AckedPacket> = vec![];
|
let mut acked_commands: Vec<AckedPacket> = vec![];
|
||||||
|
|
||||||
self.in_flight = self
|
self.in_flight = self
|
||||||
|
|
|
@ -1,4 +0,0 @@
|
||||||
pub const COMMAND_CONNECT_HELLO: [u8; 20] = [
|
|
||||||
0x10, 0x14, 0x53, 0xab, 0x00, 0x00, 0x00, 0x00, 0x00, 0x3a, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
|
|
||||||
0x00, 0x00, 0x00, 0x00,
|
|
||||||
];
|
|
|
@ -1,4 +1,3 @@
|
||||||
mod atem_packet;
|
pub mod atem_packet;
|
||||||
pub mod atem_socket;
|
pub mod atem_socket;
|
||||||
mod atem_socket_inner;
|
mod atem_socket_inner;
|
||||||
pub mod atem_util;
|
|
||||||
|
|
Loading…
Reference in New Issue