feat: Remove hyperdecks, bundle client

This commit is contained in:
Baud 2024-05-22 01:20:03 +01:00
parent f9cac35a29
commit 8cce21193a
19 changed files with 3789 additions and 2575 deletions

2
.gitignore vendored
View File

@ -1,2 +1,4 @@
.direnv .direnv
target/ target/
index.js
node_modules/

2482
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,12 +1,24 @@
[workspace] [package]
members = [ name = "hyperdeck-monitor"
"monitor", version = "0.1.0"
"frontend", edition = "2021"
]
resolver = "2"
[profile.release] default-run = "hyperdeck-monitor"
opt-level = 2
[profile.dev.package."*"] [dependencies]
opt-level = 2 axum = { version = "0.6.10", features = ["macros", "ws"] }
color-eyre = "0.6.3"
futures = { version = "0.3.25" }
futures-util = "0.3.30"
serde = { version = "1.0.199", features = ["derive"] }
serde_json = "1.0.116"
tokio = { version = "1.37.0", features = ["full"] }
tokio-stream = { version = "0.1.12", features = ["sync"] }
tokio-tungstenite = "0.21.0"
tokio-util = { version = "0.7.11", features = ["full"] }
tower = { version = "0.4.13", features = ["full"] }
tower-http = { version = "0.4.0", features = ["full"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
url = "2.5.0"
uuid = { version = "1.2.2", features = ["serde", "v4"] }

140
build.rs Normal file
View File

@ -0,0 +1,140 @@
use std::{
fs::{self, DirEntry},
io::Error,
path::PathBuf,
process::Command,
};
fn main() {
println!("cargo:rerun-if-changed=package.json");
println!("cargo:rerun-if-changed=package-lock.json");
println!("cargo:rerun-if-changed=index.ts");
println!("cargo:rerun-if-changed=./frontend");
let npm_install_output = Command::new("npm")
.arg("install")
.output()
.expect("Failed to run `npm install`");
if !npm_install_output.status.success() {
let err = String::from_utf8(npm_install_output.stderr).unwrap_or("Unknown".to_string());
panic!("Running `npm install` failed, reason: {err}");
}
let ts_build_output = Command::new("npm")
.arg("run")
.arg("build")
.output()
.expect("Failed to run `npm run build`");
if !ts_build_output.status.success() {
let err = String::from_utf8(ts_build_output.stderr).unwrap_or("Unknown".to_string());
panic!("Running `npm run build` failed, reason: {err}");
}
let trunk_build_output = Command::new("trunk")
.arg("build")
.arg("--dist")
.arg("./frontend/dist")
.arg("./frontend/index.html")
.spawn()
.unwrap()
.wait_with_output()
.expect("Failed to run `trunk build`");
if !trunk_build_output.status.success() {
let err = String::from_utf8(trunk_build_output.stderr).unwrap_or("Unknown".to_string());
panic!("Running `trunk build` failed, reason {err}");
}
let dist_paths = fs::read_dir("./frontend/dist").expect("Could not read dist directory");
let paths: Vec<Result<DirEntry, Error>> = dist_paths.collect();
let paths: Vec<(String, PathBuf)> = paths
.into_iter()
.filter_map(|path| path.ok())
.collect::<Vec<DirEntry>>()
.into_iter()
.filter_map(|entry| {
let Ok(file_type) = entry.file_type() else {
return None;
};
if file_type.is_file() {
let Ok(canonical_path) = entry.path().canonicalize() else {
return None;
};
canonical_path
.to_str()
.map(|path_str| (path_str.to_string(), canonical_path.clone()))
} else {
None
}
})
.collect();
for (path_str, path) in paths.iter() {
println!("{path_str} {path:?}");
}
let (index_str, index_path) = paths
.iter()
.find(|(path_str, _)| path_str.ends_with("index.html"))
.expect("Could not find index.html");
let (wasm_str, wasm_path) = paths
.iter()
.find(|(path_str, _)| {
path_str.contains("hyperdeck_monitor_gui") && path_str.ends_with(".wasm")
})
.expect("Could not find WASM source file");
let (js_str, js_path) = paths
.iter()
.find(|(path_str, _)| {
path_str.contains("hyperdeck_monitor_gui") && path_str.ends_with(".js")
})
.expect("Could not find JS source file");
let (manifest_str, manifest_path) = paths
.iter()
.find(|(path_str, _)| path_str.contains("manifest") && path_str.ends_with(".json"))
.expect("Could not find manifest.json");
let (service_worker_str, service_worker_path) = paths
.iter()
.find(|(path_str, _)| path_str.contains("sw") && path_str.ends_with(".js"))
.expect("Could not find sw.js");
// Yeah it's a bit redundant but 🤷
let index_name = index_path
.file_name()
.expect("Could not get file name for index.html")
.to_str()
.expect("Could not get file name for index.html");
let wasm_name = wasm_path
.file_name()
.expect("Could not get file name for WASM source file")
.to_str()
.expect("Could not get file name for WASM source file");
let js_name = js_path
.file_name()
.expect("Could not get file name for JS source file")
.to_str()
.expect("Could not get file name for JS source file");
let manifest_name = manifest_path
.file_name()
.expect("Could not get file name for manifest.json file")
.to_str()
.expect("Could not get file name for manifest.json file");
let service_worker_name = service_worker_path
.file_name()
.expect("Could not get file name for service worker file")
.to_str()
.expect("Could not get file name for service worker file");
println!("cargo:rustc-env=INCLUDE_PATH_INDEX={index_str}");
println!("cargo:rustc-env=INCLUDE_PATH_WASM={wasm_str}");
println!("cargo:rustc-env=INCLUDE_PATH_JS={js_str}");
println!("cargo:rustc-env=INCLUDE_PATH_MANIFEST={manifest_str}");
println!("cargo:rustc-env=INCLUDE_PATH_SERVICE_WORKER={service_worker_str}");
println!("cargo:rustc-env=FILE_NAME_INDEX={index_name}");
println!("cargo:rustc-env=FILE_NAME_WASM={wasm_name}");
println!("cargo:rustc-env=FILE_NAME_JS={js_name}");
println!("cargo:rustc-env=FILE_NAME_MANIFEST={manifest_name}");
println!("cargo:rustc-env=FILE_NAME_SERVICE_WORKER={service_worker_name}");
}

3427
frontend/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
fn main() { fn main() {
// Sorry, I know it's horrible but I don't have the spoons to make it right. // Sorry, I know it's horrible but I don't have the spoons to make it right.
const API_DEFINITION_FILE: &str = "../monitor/src/api/message.rs"; const API_DEFINITION_FILE: &str = "../src/api/message.rs";
println!("cargo:rerun-if-changed={API_DEFINITION_FILE}"); println!("cargo:rerun-if-changed={API_DEFINITION_FILE}");
std::fs::copy(API_DEFINITION_FILE, "./src/websocket.rs") std::fs::copy(API_DEFINITION_FILE, "./src/websocket.rs")
.expect("Failed to copy API definition"); .expect("Failed to copy API definition");

View File

@ -9,7 +9,10 @@ use egui::{Button, Color32, RichText, Sense, Stroke, Vec2};
use ewebsock::{Options, WsReceiver, WsSender}; use ewebsock::{Options, WsReceiver, WsSender};
use wasm_timer::Instant; use wasm_timer::Instant;
use crate::websocket::{AddHyperdeckRequest, ClientRequest}; use crate::websocket::{
AddHyperdeckRequest, ClientRequest, HyperdeckConnectionState, RemoveHyperdeckRequest,
ServerEvent,
};
pub struct HyperdeckMonitorApp { pub struct HyperdeckMonitorApp {
blink: bool, blink: bool,
@ -34,26 +37,28 @@ impl Default for HyperdeckMonitorApp {
new_hyperdeck_name: "".to_owned(), new_hyperdeck_name: "".to_owned(),
new_hyperdeck_port: 9993.to_string(), new_hyperdeck_port: 9993.to_string(),
hyperdecks: vec![ hyperdecks: vec![
Hyperdeck { // Hyperdeck {
name: "Test Hyperdeck 1".to_string(), // id: "test-1".to_string(),
ip: IpAddr::V4(Ipv4Addr::new(192, 168, 10, 1)), // name: "Test Hyperdeck 1".to_string(),
status: HyperdeckStatus::Connected, // ip: IpAddr::V4(Ipv4Addr::new(192, 168, 10, 1)),
recording_bays: vec![HyperdeckRecordBay { // status: HyperdeckStatus::Connected,
status: RecordingStatus::NotRecording, // recording_bays: vec![HyperdeckRecordBay {
storage_capacity_mb: 500_000, // status: RecordingStatus::NotRecording,
recording_time_remaining: TimeRemaining(60), // storage_capacity_mb: 500_000,
}], // recording_time_remaining: TimeRemaining(60),
}, // }],
Hyperdeck { // },
name: "Test Hyperdeck 2".to_string(), // Hyperdeck {
ip: IpAddr::V4(Ipv4Addr::new(192, 168, 10, 2)), // id: "test-2".to_string(),
status: HyperdeckStatus::Disconnected, // name: "Test Hyperdeck 2".to_string(),
recording_bays: vec![HyperdeckRecordBay { // ip: IpAddr::V4(Ipv4Addr::new(192, 168, 10, 2)),
status: RecordingStatus::NotRecording, // status: HyperdeckStatus::Disconnected,
storage_capacity_mb: 500_000, // recording_bays: vec![HyperdeckRecordBay {
recording_time_remaining: TimeRemaining(3600 * 5), // 5 Hours // status: RecordingStatus::NotRecording,
}], // storage_capacity_mb: 500_000,
}, // recording_time_remaining: TimeRemaining(3600 * 5), // 5 Hours
// }],
// },
], ],
websocket_message_queue: VecDeque::new(), websocket_message_queue: VecDeque::new(),
ws_sender, ws_sender,
@ -69,6 +74,26 @@ impl eframe::App for HyperdeckMonitorApp {
serde_json::to_string(&message).expect("Could not serialize message"), serde_json::to_string(&message).expect("Could not serialize message"),
)); ));
} }
if let Some(ewebsock::WsEvent::Message(ewebsock::WsMessage::Text(event))) =
self.ws_receiver.try_recv()
{
if let Ok(received) = serde_json::from_str::<ServerEvent>(&event) {
match received {
ServerEvent::HyperdeckMonitorState(state) => {
self.hyperdecks = Default::default();
for (id, hyperdeck) in state.hyperdecks {
self.hyperdecks.push(Hyperdeck {
id,
name: hyperdeck.name,
ip: hyperdeck.ip.parse().unwrap(),
status: hyperdeck.connection_state.into(),
recording_bays: vec![],
})
}
}
}
}
}
egui::TopBottomPanel::top("top_panel").show(ctx, |ui| { egui::TopBottomPanel::top("top_panel").show(ctx, |ui| {
egui::menu::bar(ui, |ui| { egui::menu::bar(ui, |ui| {
egui::widgets::global_dark_light_mode_buttons(ui); egui::widgets::global_dark_light_mode_buttons(ui);
@ -86,7 +111,12 @@ impl eframe::App for HyperdeckMonitorApp {
ui.separator(); ui.separator();
ui.vertical(|ui| { ui.vertical(|ui| {
hyperdeck_list(ui, &self.hyperdecks, self.blink); hyperdeck_list(
ui,
&self.hyperdecks,
self.blink,
&mut self.websocket_message_queue,
);
}); });
ui.with_layout(egui::Layout::bottom_up(egui::Align::LEFT), |ui| { ui.with_layout(egui::Layout::bottom_up(egui::Align::LEFT), |ui| {
@ -135,7 +165,12 @@ fn add_hyperdeck_panel(
}); });
} }
fn hyperdeck_list(ui: &mut egui::Ui, hyperdecks: &[Hyperdeck], blink: bool) { fn hyperdeck_list(
ui: &mut egui::Ui,
hyperdecks: &[Hyperdeck],
blink: bool,
message_queue: &mut VecDeque<ClientRequest>,
) {
for hyperdeck in hyperdecks { for hyperdeck in hyperdecks {
ui.vertical(|ui| { ui.vertical(|ui| {
ui.horizontal(|ui| { ui.horizontal(|ui| {
@ -152,6 +187,13 @@ fn hyperdeck_list(ui: &mut egui::Ui, hyperdecks: &[Hyperdeck], blink: bool) {
let hyperdeck_heading: RichText = let hyperdeck_heading: RichText =
format!("{} [{}]", hyperdeck.name, hyperdeck.ip).into(); format!("{} [{}]", hyperdeck.name, hyperdeck.ip).into();
ui.heading(hyperdeck_heading.strong()); ui.heading(hyperdeck_heading.strong());
if ui.button("Remove").clicked() {
message_queue.push_back(ClientRequest::RemoveHyperdeck(
RemoveHyperdeckRequest {
id: hyperdeck.id.clone(),
},
));
}
}); });
if !hyperdeck.recording_bays.is_empty() if !hyperdeck.recording_bays.is_empty()
&& matches!(hyperdeck.status, HyperdeckStatus::Connected) && matches!(hyperdeck.status, HyperdeckStatus::Connected)
@ -195,6 +237,7 @@ fn connection_status(ui: &mut egui::Ui) {
#[derive(serde::Deserialize, serde::Serialize)] #[derive(serde::Deserialize, serde::Serialize)]
struct Hyperdeck { struct Hyperdeck {
id: String,
name: String, name: String,
ip: IpAddr, ip: IpAddr,
status: HyperdeckStatus, status: HyperdeckStatus,
@ -207,6 +250,15 @@ enum HyperdeckStatus {
Disconnected, Disconnected,
} }
impl From<HyperdeckConnectionState> for HyperdeckStatus {
fn from(value: HyperdeckConnectionState) -> Self {
match value {
HyperdeckConnectionState::Connected => HyperdeckStatus::Connected,
HyperdeckConnectionState::Disconnected => HyperdeckStatus::Disconnected,
}
}
}
#[derive(serde::Deserialize, serde::Serialize)] #[derive(serde::Deserialize, serde::Serialize)]
struct HyperdeckRecordBay { struct HyperdeckRecordBay {
status: RecordingStatus, status: RecordingStatus,

View File

@ -11,7 +11,7 @@ fn main() -> eframe::Result<()> {
eframe::run_native( eframe::run_native(
"eframe template", "eframe template",
native_options, native_options,
Box::new(|_| Box::new(hyperdeck_monitor_gui::HyperdeckMonitorApp::default())), Box::new(|_| Box::<hyperdeck_monitor_gui::HyperdeckMonitorApp>::default()),
) )
} }
@ -26,7 +26,7 @@ fn main() {
.start( .start(
"the_canvas_id", // Where to draw to. "the_canvas_id", // Where to draw to.
web_options, web_options,
Box::new(|_| Box::new(hyperdeck_monitor_gui::HyperdeckMonitorApp::default())), Box::new(|_| Box::<hyperdeck_monitor_gui::HyperdeckMonitorApp>::default()),
) )
.await .await
.expect("failed to start eframe"); .expect("failed to start eframe");

View File

@ -10,7 +10,8 @@ interface WrappedHyperdeck {
const hyperdecks: Map<string, WrappedHyperdeck> = new Map() const hyperdecks: Map<string, WrappedHyperdeck> = new Map()
enum WebSocketMessageType { enum WebSocketMessageType {
AddHyperdeck = "add_hyperdeck" AddHyperdeck = "add_hyperdeck",
RemoveHyperdeck = "remove_hyperdeck"
} }
type WebSocketMessage = { type WebSocketMessage = {
@ -18,6 +19,9 @@ type WebSocketMessage = {
id: string, id: string,
ip: string, ip: string,
port: number port: number
} | {
type: WebSocketMessageType.RemoveHyperdeck,
id: string
} }
const wss = new WebSocket.Server({ port: 7867 }); const wss = new WebSocket.Server({ port: 7867 });
@ -58,11 +62,11 @@ function handle_message(message: Partial<WebSocketMessage>) {
const newHyperdeck = new Hyperdeck() const newHyperdeck = new Hyperdeck()
// hyperdecks.set(message.id, { hyperdecks.set(message.id, {
// ip: message.ip, ip: message.ip,
// port: message.port, port: message.port,
// hyperdeck: newHyperdeck hyperdeck: newHyperdeck
// }); });
newHyperdeck.on('connected', (info) => { newHyperdeck.on('connected', (info) => {
console.log(JSON.stringify(info)) console.log(JSON.stringify(info))
@ -84,8 +88,20 @@ function handle_message(message: Partial<WebSocketMessage>) {
newHyperdeck.connect(message.ip, message.port) newHyperdeck.connect(message.ip, message.port)
break;
case WebSocketMessageType.RemoveHyperdeck:
if (message.id === undefined) return;
console.log("Removing hyperdeck");
let hyperdeck = hyperdecks.get(message.id)
if (hyperdeck === undefined) return;
hyperdeck.hyperdeck.disconnect()
hyperdecks.delete(message.id)
break; break;
default: default:
exhaustiveMatch(message.type) exhaustiveMatch(message)
} }
} }

2
monitor/.gitignore vendored
View File

@ -1,2 +0,0 @@
node_modules/
index.js

View File

@ -1,24 +0,0 @@
[package]
name = "hyperdeck-monitor"
version = "0.1.0"
edition = "2021"
default-run = "hyperdeck-monitor"
[dependencies]
axum = { version = "0.6.10", features = ["macros", "ws"] }
color-eyre = "0.6.3"
futures = { version = "0.3.25" }
futures-util = "0.3.30"
serde = { version = "1.0.199", features = ["derive"] }
serde_json = "1.0.116"
tokio = { version = "1.37.0", features = ["full"] }
tokio-stream = { version = "0.1.12", features = ["sync"] }
tokio-tungstenite = "0.21.0"
tokio-util = { version = "0.7.11", features = ["full"] }
tower = { version = "0.4.13", features = ["full"] }
tower-http = { version = "0.4.0", features = ["full"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
url = "2.5.0"
uuid = { version = "1.2.2", features = ["serde", "v4"] }

View File

@ -1,26 +0,0 @@
use std::process::Command;
fn main() {
println!("cargo:rerun-if-changed=package.json");
println!("cargo:rerun-if-changed=package-lock.json");
println!("cargo:rerun-if-changed=index.ts");
let npm_install_output = Command::new("npm")
.arg("install")
.output()
.expect("Failed to run `npm install`");
if !npm_install_output.status.success() {
let err = String::from_utf8(npm_install_output.stderr).unwrap_or("Unknown".to_string());
panic!("Running `npm install` failed, reason: {err}");
}
let ts_build_output = Command::new("npm")
.arg("run")
.arg("build")
.output()
.expect("Failed to run `npm run build`");
if !ts_build_output.status.success() {
let err = String::from_utf8(ts_build_output.stderr).unwrap_or("Unknown".to_string());
panic!("Running `npm run build` failed, reason: {err}");
}
}

View File

@ -1,6 +1,7 @@
use axum::extract::ws::Message; use axum::extract::ws::Message;
use axum::extract::{State, WebSocketUpgrade}; use axum::extract::{State, WebSocketUpgrade};
use axum::response::Html; use axum::response::Html;
use axum::Json;
use axum::{ use axum::{
body::Bytes, body::Bytes,
extract::Path, extract::Path,
@ -32,6 +33,12 @@ use uuid::Uuid;
pub mod message; pub mod message;
mod ws; mod ws;
const FILE_NAME_INDEX: &str = env!("FILE_NAME_INDEX");
const FILE_NAME_WASM: &str = env!("FILE_NAME_WASM");
const FILE_NAME_JS: &str = env!("FILE_NAME_JS");
const FILE_NAME_MANIFEST: &str = env!("FILE_NAME_MANIFEST");
const FILE_NAME_SERVICE_WORKER: &str = env!("FILE_NAME_SERVICE_WORKER");
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Client { pub struct Client {
pub sender: Option<tokio::sync::broadcast::Sender<Message>>, pub sender: Option<tokio::sync::broadcast::Sender<Message>>,
@ -136,6 +143,14 @@ fn app(state: AppState) -> Router {
Router::new() Router::new()
.route("/", get(get_index)) .route("/", get(get_index))
.route(&format!("/{}", FILE_NAME_INDEX), get(get_index))
.route(&format!("/{}", FILE_NAME_WASM), get(get_wasm))
.route(&format!("/{}", FILE_NAME_JS), get(get_js))
.route(&format!("/{}", FILE_NAME_MANIFEST), get(get_manifest))
.route(
&format!("/{}", FILE_NAME_SERVICE_WORKER),
get(get_service_worker),
)
.route("/ws", get(upgrade_ws)) .route("/ws", get(upgrade_ws))
.layer(middleware) .layer(middleware)
.layer(cors) .layer(cors)
@ -146,7 +161,32 @@ fn app(state: AppState) -> Router {
pub struct WebSocketUpgradeRequest {} pub struct WebSocketUpgradeRequest {}
async fn get_index() -> Html<String> { async fn get_index() -> Html<String> {
Html(format!("Hello!")) Html(include_str!(env!("INCLUDE_PATH_INDEX")).to_string())
}
async fn get_wasm() -> impl IntoResponse {
(
[(header::CONTENT_TYPE, "application/wasm")],
include_bytes!(env!("INCLUDE_PATH_WASM")),
)
}
async fn get_js() -> impl IntoResponse {
(
[(header::CONTENT_TYPE, "text/javascript")],
include_str!(env!("INCLUDE_PATH_JS")),
)
}
async fn get_manifest() -> Json<String> {
Json(include_str!(env!("INCLUDE_PATH_MANIFEST")).to_string())
}
async fn get_service_worker() -> impl IntoResponse {
(
[(header::CONTENT_TYPE, "text/javascript")],
include_str!(env!("INCLUDE_PATH_SERVICE_WORKER")),
)
} }
#[axum::debug_handler] #[axum::debug_handler]
@ -157,7 +197,7 @@ async fn upgrade_ws(state: State<AppState>, ws: WebSocketUpgrade) -> impl IntoRe
.clients .clients
.lock() .lock()
.await .await
.insert(client_id.clone(), Client { sender: None }); .insert(client_id, Client { sender: None });
let client = state.clients.lock().await.get(&client_id).cloned().unwrap(); let client = state.clients.lock().await.get(&client_id).cloned().unwrap();
ws.on_upgrade(move |socket| { ws.on_upgrade(move |socket| {
ws::client_connection( ws::client_connection(

View File

@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};
#[serde(tag = "type")] #[serde(tag = "type")]
pub enum ClientRequest { pub enum ClientRequest {
AddHyperdeck(AddHyperdeckRequest), AddHyperdeck(AddHyperdeckRequest),
RemoveHyperdeck(RemoveHyperdeckRequest),
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
@ -16,6 +17,11 @@ pub struct AddHyperdeckRequest {
pub port: u16, pub port: u16,
} }
#[derive(Debug, Serialize, Deserialize)]
pub struct RemoveHyperdeckRequest {
pub id: String,
}
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
#[serde(tag = "type")] #[serde(tag = "type")]
@ -33,4 +39,11 @@ pub struct HyperdeckState {
pub name: String, pub name: String,
pub ip: String, pub ip: String,
pub port: u16, pub port: u16,
pub connection_state: HyperdeckConnectionState,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum HyperdeckConnectionState {
Connected,
Disconnected,
} }

View File

@ -1,6 +1,9 @@
use std::{process::Stdio, time::Duration}; use std::{process::Stdio, time::Duration};
use api::message::{AddHyperdeckRequest, ClientRequest, HyperdeckMonitorState, HyperdeckState}; use api::message::{
AddHyperdeckRequest, ClientRequest, HyperdeckConnectionState, HyperdeckMonitorState,
HyperdeckState, RemoveHyperdeckRequest,
};
use color_eyre::Report; use color_eyre::Report;
use futures_util::{ use futures_util::{
pin_mut, select, pin_mut, select,
@ -71,15 +74,8 @@ async fn run(
let mut state = HyperdeckMonitorState::default(); let mut state = HyperdeckMonitorState::default();
let _ = state_tx.send(state.clone()); let _ = state_tx.send(state.clone());
let mut ping_interval = tokio::time::interval(Duration::from_millis(500));
ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
while !cancel.is_cancelled() { while !cancel.is_cancelled() {
let state_modified = select! { let state_modified = select! {
_ = ping_interval.tick().fuse() => {
// TODO: Ping node, check it's still alive
false
},
message_from_node = node_ws_message_rx.recv().fuse() => { message_from_node = node_ws_message_rx.recv().fuse() => {
if let Some(msg) = message_from_node { if let Some(msg) = message_from_node {
handle_message_from_node(msg, &mut node_commands_tx, &mut state).await handle_message_from_node(msg, &mut node_commands_tx, &mut state).await
@ -112,6 +108,18 @@ async fn handle_message_from_node(
tracing::info!("[NODE] {message}"); tracing::info!("[NODE] {message}");
false false
} }
NodeWsMessageReceived::HyperdeckConnected { id } => {
state.hyperdecks.entry(id).and_modify(|hyperdeck| {
hyperdeck.connection_state = HyperdeckConnectionState::Connected
});
true
}
NodeWsMessageReceived::HypderdeckDisconnected { id } => {
state.hyperdecks.entry(id).and_modify(|hyperdeck| {
hyperdeck.connection_state = HyperdeckConnectionState::Disconnected
});
true
}
} }
} }
@ -130,6 +138,7 @@ async fn handle_message_from_client(
name, name,
ip: ip.clone(), ip: ip.clone(),
port, port,
connection_state: api::message::HyperdeckConnectionState::Disconnected,
}, },
); );
let _ = node_commands_tx.send(NodeWsCommand::AddHyperdeck(AddHyperdeckCommand { let _ = node_commands_tx.send(NodeWsCommand::AddHyperdeck(AddHyperdeckCommand {
@ -139,6 +148,13 @@ async fn handle_message_from_client(
})); }));
true true
} }
ClientRequest::RemoveHyperdeck(RemoveHyperdeckRequest { id }) => {
let _ = state.hyperdecks.remove(&id);
let _ = node_commands_tx.send(NodeWsCommand::RemoveHyperdeck(RemoveHyperdeckCommand {
id,
}));
true
}
} }
} }
@ -148,7 +164,7 @@ async fn run_node_process(cancel: CancellationToken) {
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_secs(1)).await;
let result = tokio::process::Command::new("node") let result = tokio::process::Command::new("node")
.arg("monitor/index.js") .arg("./index.js")
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.stderr(Stdio::piped()) .stderr(Stdio::piped())
@ -200,8 +216,6 @@ struct AppState {}
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")] #[serde(tag = "type")]
enum NodeWsCommand { enum NodeWsCommand {
#[serde(rename = "ping")]
Ping,
#[serde(rename = "add_hyperdeck")] #[serde(rename = "add_hyperdeck")]
AddHyperdeck(AddHyperdeckCommand), AddHyperdeck(AddHyperdeckCommand),
#[serde(rename = "remove_hyperdeck")] #[serde(rename = "remove_hyperdeck")]
@ -213,6 +227,8 @@ enum NodeWsCommand {
#[serde(tag = "event")] #[serde(tag = "event")]
enum NodeWsMessageReceived { enum NodeWsMessageReceived {
Log { message: String }, Log { message: String },
HyperdeckConnected { id: String },
HypderdeckDisconnected { id: String },
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]