wip: handle received commands

This commit is contained in:
Baud 2024-03-08 17:43:53 +00:00
parent 9dd8cc0574
commit 4a075c3d1e
13 changed files with 216 additions and 15 deletions

View File

@ -1,13 +1,31 @@
use std::{collections::HashMap, net::SocketAddr, sync::Arc}; use std::{
collections::{HashMap, VecDeque},
net::SocketAddr,
sync::Arc,
};
use tokio::sync::{mpsc::error::TryRecvError, Semaphore}; use tokio::sync::{mpsc::error::TryRecvError, Semaphore};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::{ use crate::{
atem_lib::atem_socket::{AtemEvent, AtemSocketCommand, AtemSocketMessage, TrackingId}, atem_lib::atem_socket::{AtemEvent, AtemSocketCommand, AtemSocketMessage, TrackingId},
commands::command_base::BasicWritableCommand, commands::{
command_base::{BasicWritableCommand, DeserializedCommand},
device_profile::DESERIALIZE_VERSION_RAW_NAME,
init_complete::DESERIALIZE_INIT_COMPLETE_RAW_NAME,
time::DESERIALIZE_TIME_RAW_NAME,
},
state::AtemState,
}; };
#[derive(Default)]
pub enum AtemConnectionStatus {
#[default]
Closed,
Connecting,
Connected,
}
pub struct Atem { pub struct Atem {
waiting_semaphores: tokio::sync::RwLock<HashMap<TrackingId, Arc<Semaphore>>>, waiting_semaphores: tokio::sync::RwLock<HashMap<TrackingId, Arc<Semaphore>>>,
socket_message_tx: tokio::sync::mpsc::Sender<AtemSocketMessage>, socket_message_tx: tokio::sync::mpsc::Sender<AtemSocketMessage>,
@ -39,6 +57,9 @@ impl Atem {
mut atem_event_rx: tokio::sync::mpsc::UnboundedReceiver<AtemEvent>, mut atem_event_rx: tokio::sync::mpsc::UnboundedReceiver<AtemEvent>,
cancel: CancellationToken, cancel: CancellationToken,
) { ) {
let mut status = AtemConnectionStatus::default();
let mut state = AtemState::default();
while !cancel.is_cancelled() { while !cancel.is_cancelled() {
match atem_event_rx.try_recv() { match atem_event_rx.try_recv() {
Ok(event) => match event { Ok(event) => match event {
@ -49,7 +70,9 @@ impl Atem {
log::info!("Atem connected"); log::info!("Atem connected");
} }
AtemEvent::Disconnected => todo!(), AtemEvent::Disconnected => todo!(),
AtemEvent::ReceivedCommand(_) => todo!(), AtemEvent::ReceivedCommands(commands) => {
self.mutate_state(&mut state, &mut status, commands).await
}
AtemEvent::AckedCommand(tracking_id) => { AtemEvent::AckedCommand(tracking_id) => {
log::debug!("Received tracking Id {tracking_id}"); log::debug!("Received tracking Id {tracking_id}");
if let Some(semaphore) = if let Some(semaphore) =
@ -109,4 +132,25 @@ impl Atem {
self.waiting_semaphores.write().await.remove(tracking_id); self.waiting_semaphores.write().await.remove(tracking_id);
} }
} }
async fn mutate_state(
&self,
state: &mut AtemState,
status: &mut AtemConnectionStatus,
commands: VecDeque<Arc<dyn DeserializedCommand>>,
) {
for command in commands {
match command.raw_name() {
DESERIALIZE_VERSION_RAW_NAME => {
*state = AtemState::default();
*status = AtemConnectionStatus::Connecting
}
DESERIALIZE_INIT_COMPLETE_RAW_NAME => *status = AtemConnectionStatus::Connected,
DESERIALIZE_TIME_RAW_NAME => {
todo!("Time command")
}
_ => {}
}
}
}
} }

View File

@ -6,7 +6,7 @@ pub struct AtemPacket<'packet_buffer> {
remote_packet_id: u16, remote_packet_id: u16,
retransmit_requested_from_packet_id: Option<u16>, retransmit_requested_from_packet_id: Option<u16>,
ack_reply: Option<u16>, ack_reply: Option<u16>,
body: &'packet_buffer [u8], body: Option<&'packet_buffer [u8]>,
} }
pub enum AtemPacketErr { pub enum AtemPacketErr {
@ -48,7 +48,7 @@ impl<'packet_buffer> AtemPacket<'packet_buffer> {
self.remote_packet_id self.remote_packet_id
} }
pub fn body(&self) -> &[u8] { pub fn body(&self) -> Option<&[u8]> {
self.body self.body
} }
@ -89,7 +89,11 @@ impl<'packet_buffer> TryFrom<&'packet_buffer [u8]> for AtemPacket<'packet_buffer
let session_id = u16::from_be_bytes([buffer[2], buffer[3]]); let session_id = u16::from_be_bytes([buffer[2], buffer[3]]);
let remote_packet_id = u16::from_be_bytes([buffer[10], buffer[11]]); let remote_packet_id = u16::from_be_bytes([buffer[10], buffer[11]]);
let body = &buffer[12..]; let body = if buffer.len() > 12 {
Some(&buffer[12..])
} else {
None
};
let retransmit_requested_from_packet_id = let retransmit_requested_from_packet_id =
if flags & u8::from(PacketFlag::RetransmitRequest) > 0 { if flags & u8::from(PacketFlag::RetransmitRequest) > 0 {

View File

@ -1,4 +1,5 @@
use std::{ use std::{
collections::VecDeque,
fmt::Display, fmt::Display,
io, io,
net::SocketAddr, net::SocketAddr,
@ -59,7 +60,7 @@ pub enum AtemEvent {
Debug(String), Debug(String),
Connected, Connected,
Disconnected, Disconnected,
ReceivedCommand(Arc<dyn DeserializedCommand>), ReceivedCommands(VecDeque<Arc<dyn DeserializedCommand>>),
AckedCommand(TrackingId), AckedCommand(TrackingId),
} }
@ -406,8 +407,8 @@ impl AtemSocket {
self.last_received_packed_id = remote_packet_id; self.last_received_packed_id = remote_packet_id;
self.send_or_queue_ack().await; self.send_or_queue_ack().await;
if atem_packet.length() > 12 { if let Some(body) = atem_packet.body() {
self.on_commands_received(atem_packet.body()); self.on_commands_received(body);
} }
} else if self } else if self
.is_packet_covered_by_ack(self.last_received_packed_id, remote_packet_id) .is_packet_covered_by_ack(self.last_received_packed_id, remote_packet_id)
@ -526,6 +527,10 @@ impl AtemSocket {
fn on_commands_received(&mut self, payload: &[u8]) { fn on_commands_received(&mut self, payload: &[u8]) {
let commands = deserialize_commands(payload); let commands = deserialize_commands(payload);
let _ = self
.atem_event_tx
.send(AtemEvent::ReceivedCommands(commands));
} }
fn on_command_acknowledged(&mut self, packets: Vec<AckedPacket>) { fn on_command_acknowledged(&mut self, packets: Vec<AckedPacket>) {

View File

@ -3,6 +3,7 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc};
use crate::{enums::ProtocolVersion, state::AtemState}; use crate::{enums::ProtocolVersion, state::AtemState};
pub trait DeserializedCommand: Send + Sync + Debug { pub trait DeserializedCommand: Send + Sync + Debug {
fn raw_name(&self) -> &'static str;
fn apply_to_state(&self, state: &mut AtemState) -> Vec<String>; fn apply_to_state(&self, state: &mut AtemState) -> Vec<String>;
} }

View File

@ -0,0 +1,34 @@
use std::sync::Arc;
use crate::enums::ProtocolVersion;
use super::command_base::{CommandDeserializer, DeserializedCommand};
pub const DESERIALIZE_VERSION_RAW_NAME: &str = "_ver";
#[derive(Debug)]
pub struct VersionCommand {
pub version: ProtocolVersion,
}
impl DeserializedCommand for VersionCommand {
fn raw_name(&self) -> &'static str {
DESERIALIZE_VERSION_RAW_NAME
}
fn apply_to_state(&self, state: &mut crate::state::AtemState) -> Vec<String> {
todo!()
}
}
#[derive(Default)]
pub struct VersionCommandDeserializer {}
impl CommandDeserializer for VersionCommandDeserializer {
fn deserialize(&self, buffer: &[u8]) -> std::sync::Arc<dyn DeserializedCommand> {
let version = u32::from_be_bytes([buffer[0], buffer[1], buffer[2], buffer[3]]);
let version: ProtocolVersion = version.try_into().expect("Invalid protocol version");
Arc::new(VersionCommand { version })
}
}

View File

@ -0,0 +1,27 @@
use std::sync::Arc;
use super::command_base::{CommandDeserializer, DeserializedCommand};
pub const DESERIALIZE_INIT_COMPLETE_RAW_NAME: &str = "InCm";
#[derive(Debug)]
pub struct InitComplete {}
impl DeserializedCommand for InitComplete {
fn raw_name(&self) -> &'static str {
DESERIALIZE_INIT_COMPLETE_RAW_NAME
}
fn apply_to_state(&self, state: &mut crate::state::AtemState) -> Vec<String> {
todo!()
}
}
#[derive(Default)]
pub struct InitCompleteDeserializer {}
impl CommandDeserializer for InitCompleteDeserializer {
fn deserialize(&self, buffer: &[u8]) -> std::sync::Arc<dyn DeserializedCommand> {
Arc::new(InitComplete {})
}
}

View File

@ -4,6 +4,8 @@ use crate::commands::command_base::{
BasicWritableCommand, CommandDeserializer, DeserializedCommand, SerializableCommand, BasicWritableCommand, CommandDeserializer, DeserializedCommand, SerializableCommand,
}; };
pub const DESERIALIZE_PROGRAM_INPUT_RAW_NAME: &str = "PrgI";
#[derive(Debug, new)] #[derive(Debug, new)]
pub struct ProgramInput { pub struct ProgramInput {
pub mix_effect: u8, pub mix_effect: u8,
@ -31,6 +33,10 @@ impl BasicWritableCommand for ProgramInput {
} }
impl DeserializedCommand for ProgramInput { impl DeserializedCommand for ProgramInput {
fn raw_name(&self) -> &'static str {
DESERIALIZE_PROGRAM_INPUT_RAW_NAME
}
fn apply_to_state(&self, state: &mut crate::state::AtemState) -> Vec<String> { fn apply_to_state(&self, state: &mut crate::state::AtemState) -> Vec<String> {
todo!() todo!()
} }

View File

@ -1,4 +1,7 @@
pub mod command_base; pub mod command_base;
pub mod device_profile;
pub mod init_complete;
pub mod mix_effects; pub mod mix_effects;
pub mod parse_commands; pub mod parse_commands;
pub mod tally_by_source; pub mod tally_by_source;
pub mod time;

View File

@ -2,8 +2,11 @@ use std::{collections::VecDeque, sync::Arc};
use super::{ use super::{
command_base::{CommandDeserializer, DeserializedCommand}, command_base::{CommandDeserializer, DeserializedCommand},
mix_effects::program_input::ProgramInputDeserializer, device_profile::{VersionCommandDeserializer, DESERIALIZE_VERSION_RAW_NAME},
tally_by_source::TallyBySourceDeserializer, init_complete::{InitCompleteDeserializer, DESERIALIZE_INIT_COMPLETE_RAW_NAME},
mix_effects::program_input::{ProgramInputDeserializer, DESERIALIZE_PROGRAM_INPUT_RAW_NAME},
tally_by_source::{TallyBySourceDeserializer, DESERIALIZE_TALLY_BY_SOURCE_RAW_NAME},
time::{TimeDeserializer, DESERIALIZE_TIME_RAW_NAME},
}; };
pub fn deserialize_commands(payload: &[u8]) -> VecDeque<Arc<dyn DeserializedCommand>> { pub fn deserialize_commands(payload: &[u8]) -> VecDeque<Arc<dyn DeserializedCommand>> {
@ -36,8 +39,11 @@ pub fn deserialize_commands(payload: &[u8]) -> VecDeque<Arc<dyn DeserializedComm
fn command_deserializer_from_string(command_str: &str) -> Option<Box<dyn CommandDeserializer>> { fn command_deserializer_from_string(command_str: &str) -> Option<Box<dyn CommandDeserializer>> {
match command_str { match command_str {
"PrgI" => Some(Box::<ProgramInputDeserializer>::default()), DESERIALIZE_VERSION_RAW_NAME => Some(Box::<VersionCommandDeserializer>::default()),
"TlSr" => Some(Box::<TallyBySourceDeserializer>::default()), DESERIALIZE_INIT_COMPLETE_RAW_NAME => Some(Box::<InitCompleteDeserializer>::default()),
DESERIALIZE_PROGRAM_INPUT_RAW_NAME => Some(Box::<ProgramInputDeserializer>::default()),
DESERIALIZE_TALLY_BY_SOURCE_RAW_NAME => Some(Box::<TallyBySourceDeserializer>::default()),
DESERIALIZE_TIME_RAW_NAME => Some(Box::<TimeDeserializer>::default()),
_ => None, _ => None,
} }
} }

View File

@ -2,6 +2,8 @@ use std::{collections::HashMap, sync::Arc};
use super::command_base::{CommandDeserializer, DeserializedCommand}; use super::command_base::{CommandDeserializer, DeserializedCommand};
pub const DESERIALIZE_TALLY_BY_SOURCE_RAW_NAME: &str = "TlSr";
#[derive(Debug)] #[derive(Debug)]
pub struct TallySource { pub struct TallySource {
pub program: bool, pub program: bool,
@ -14,6 +16,10 @@ pub struct TallyBySource {
} }
impl DeserializedCommand for TallyBySource { impl DeserializedCommand for TallyBySource {
fn raw_name(&self) -> &'static str {
DESERIALIZE_TALLY_BY_SOURCE_RAW_NAME
}
fn apply_to_state(&self, state: &mut crate::state::AtemState) -> Vec<String> { fn apply_to_state(&self, state: &mut crate::state::AtemState) -> Vec<String> {
todo!() todo!()
} }

View File

@ -0,0 +1,49 @@
use std::sync::Arc;
use super::command_base::{CommandDeserializer, DeserializedCommand};
pub const DESERIALIZE_TIME_RAW_NAME: &str = "Time";
#[derive(Debug)]
pub struct TimeInfo {
pub hour: u8,
pub minute: u8,
pub second: u8,
pub frame: u8,
pub drop_frame: bool,
}
#[derive(Debug)]
pub struct Time {
info: TimeInfo,
}
impl DeserializedCommand for Time {
fn raw_name(&self) -> &'static str {
DESERIALIZE_TIME_RAW_NAME
}
fn apply_to_state(&self, state: &mut crate::state::AtemState) -> Vec<String> {
todo!()
}
}
#[derive(Default)]
pub struct TimeDeserializer {}
impl CommandDeserializer for TimeDeserializer {
fn deserialize(
&self,
buffer: &[u8],
) -> std::sync::Arc<dyn super::command_base::DeserializedCommand> {
let info = TimeInfo {
hour: buffer[0],
minute: buffer[1],
second: buffer[2],
frame: buffer[3],
drop_frame: buffer[5] == 1,
};
Arc::new(Time { info })
}
}

View File

@ -21,7 +21,7 @@ pub enum Model {
MiniExtremeISO = 0x11, MiniExtremeISO = 0x11,
} }
#[derive(Default)] #[derive(Debug, Default)]
pub enum ProtocolVersion { pub enum ProtocolVersion {
#[default] #[default]
Unknown = 0, Unknown = 0,
@ -32,6 +32,22 @@ pub enum ProtocolVersion {
V8_1_1 = 0x0002001e, // 2.30 V8_1_1 = 0x0002001e, // 2.30
} }
impl TryFrom<u32> for ProtocolVersion {
type Error = ();
fn try_from(value: u32) -> Result<Self, Self::Error> {
match value {
0 => Ok(ProtocolVersion::Unknown),
0x00020016 => Ok(ProtocolVersion::V7_2),
0x0002001b => Ok(ProtocolVersion::V7_5_2),
0x0002001c => Ok(ProtocolVersion::V8_0),
0x0002001d => Ok(ProtocolVersion::V8_0_1),
0x0002001e => Ok(ProtocolVersion::V8_1_1),
_ => Ok(ProtocolVersion::Unknown),
}
}
}
pub enum TransitionStyle { pub enum TransitionStyle {
MIX = 0x00, MIX = 0x00,
DIP = 0x01, DIP = 0x01,

View File

@ -16,7 +16,7 @@ pub trait UpstreamKeyerTypeSettings {
fn set_fly_enabled(&mut self, enabled: bool); fn set_fly_enabled(&mut self, enabled: bool);
} }
pub trait UpstreamKeyerMaskSettings { pub trait UpstreamKeyerMaskSettings: Send {
fn get_mask_enabled(&self) -> bool; fn get_mask_enabled(&self) -> bool;
fn set_mask_enabled(&mut self, enabled: bool); fn set_mask_enabled(&mut self, enabled: bool);
fn get_mask_top(&self) -> f64; fn get_mask_top(&self) -> f64;