feat: Basic communication with Node subprocess

This commit is contained in:
Baud 2024-04-30 22:17:21 +01:00
commit a61eb0734d
13 changed files with 1596 additions and 0 deletions

2
.envrc Normal file
View File

@ -0,0 +1,2 @@
watch_file rust-toolchain.toml
use flake

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
.direnv
target/
node_modules/
index.js

1084
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

16
Cargo.toml Normal file
View File

@ -0,0 +1,16 @@
[package]
name = "hyperdeck-monitor"
version = "0.1.0"
edition = "2021"
[dependencies]
color-eyre = "0.6.3"
futures-util = "0.3.30"
serde = { version = "1.0.199", features = ["derive"] }
serde_json = "1.0.116"
tokio = { version = "1.37.0", features = ["full"] }
tokio-tungstenite = "0.21.0"
tokio-util = "0.7.10"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
url = "2.5.0"

26
build.rs Normal file
View File

@ -0,0 +1,26 @@
use std::process::Command;
fn main() {
println!("cargo:rerun-if-changed=package.json");
println!("cargo:rerun-if-changed=package-lock.json");
println!("cargo:rerun-if-changed=index.ts");
let npm_install_output = Command::new("npm")
.arg("install")
.output()
.expect("Failed to run `npm install`");
if !npm_install_output.status.success() {
let err = String::from_utf8(npm_install_output.stderr).unwrap_or("Unknown".to_string());
panic!("Running `npm install` failed, reason: {err}");
}
let ts_build_output = Command::new("npm")
.arg("run")
.arg("build")
.output()
.expect("Failed to run `npm run build`");
if !ts_build_output.status.success() {
let err = String::from_utf8(ts_build_output.stderr).unwrap_or("Unknown".to_string());
panic!("Running `npm run build` failed, reason: {err}");
}
}

85
flake.lock Normal file
View File

@ -0,0 +1,85 @@
{
"nodes": {
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1710146030,
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1714253743,
"narHash": "sha256-mdTQw2XlariysyScCv2tTE45QSU9v/ezLcHJ22f0Nxc=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "58a1abdbae3217ca6b702f03d3b35125d88a2994",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"flake-utils": "flake-utils",
"nixpkgs": "nixpkgs",
"rust-overlay": "rust-overlay"
}
},
"rust-overlay": {
"inputs": {
"flake-utils": [
"flake-utils"
],
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1714356894,
"narHash": "sha256-W6Mss7AG6bnFT1BqRApHXvLXBrFOu7V0+EUe9iML30s=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "d9b44509b4064f0a3fc9c7c92a603861f52fbedc",
"type": "github"
},
"original": {
"owner": "oxalica",
"repo": "rust-overlay",
"type": "github"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

46
flake.nix Normal file
View File

@ -0,0 +1,46 @@
{
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
flake-utils.url = "github:numtide/flake-utils";
rust-overlay = {
url = "github:oxalica/rust-overlay";
inputs = {
nixpkgs.follows = "nixpkgs";
flake-utils.follows = "flake-utils";
};
};
};
outputs = {
self,
nixpkgs,
flake-utils,
rust-overlay,
}:
flake-utils.lib.eachDefaultSystem
(
system: let
overlays = [(import rust-overlay)];
pkgs = import nixpkgs {
inherit system overlays;
};
rustToolchain = pkgs.pkgsBuildHost.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml;
nativeBuildInputs = with pkgs; [
rustToolchain
nodejs
];
buildInputs = with pkgs; [
rustToolchain
nodejs
];
in
with pkgs; {
formatter = pkgs.alejandra;
devShells.default = mkShell {
inherit buildInputs nativeBuildInputs;
RUST_SRC_PATH = "${rustToolchain}/lib/rustlib/src/rust/library";
};
}
);
}

14
index.ts Normal file
View File

@ -0,0 +1,14 @@
import WebSocket from 'ws';
const wss = new WebSocket.Server({ port: 7867 });
wss.on('connection', function connection(ws) {
ws.on('message', function message(data) {
console.log('received: %s', data);
});
ws.send(JSON.stringify({
event: "log",
message: "Hello"
}));
});

100
package-lock.json generated Normal file
View File

@ -0,0 +1,100 @@
{
"name": "hyperdeck-monitor",
"version": "1.0.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "hyperdeck-monitor",
"version": "1.0.0",
"license": "ISC",
"dependencies": {
"hyperdeck-connection": "^2.0.1",
"ws": "^8.17.0"
},
"devDependencies": {
"@types/ws": "^8.5.10",
"typescript": "^5.4.5"
}
},
"node_modules/@types/node": {
"version": "20.12.7",
"resolved": "https://registry.npmjs.org/@types/node/-/node-20.12.7.tgz",
"integrity": "sha512-wq0cICSkRLVaf3UGLMGItu/PtdY7oaXaI/RVU+xliKVOtRna3PRY57ZDfztpDL0n11vfymMUnXv8QwYCO7L1wg==",
"dev": true,
"dependencies": {
"undici-types": "~5.26.4"
}
},
"node_modules/@types/ws": {
"version": "8.5.10",
"resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.5.10.tgz",
"integrity": "sha512-vmQSUcfalpIq0R9q7uTo2lXs6eGIpt9wtnLdMv9LVpIjCA/+ufZRozlVoVelIYixx1ugCBKDhn89vnsEGOCx9A==",
"dev": true,
"dependencies": {
"@types/node": "*"
}
},
"node_modules/eventemitter3": {
"version": "4.0.7",
"resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz",
"integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw=="
},
"node_modules/hyperdeck-connection": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/hyperdeck-connection/-/hyperdeck-connection-2.0.1.tgz",
"integrity": "sha512-80fBsX57048ps253+t+BiG4d0JnVXWJESxAVrlQz3u7f2BLjMNguvUoTAR/N5xbdkHGcY2f6T4XCKzIMegpg4g==",
"dependencies": {
"eventemitter3": "^4.0.7",
"tslib": "^2.6.2"
},
"engines": {
"node": ">=14.18"
}
},
"node_modules/tslib": {
"version": "2.6.2",
"resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz",
"integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q=="
},
"node_modules/typescript": {
"version": "5.4.5",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-5.4.5.tgz",
"integrity": "sha512-vcI4UpRgg81oIRUFwR0WSIHKt11nJ7SAVlYNIu+QpqeyXP+gpQJy/Z4+F0aGxSE4MqwjyXvW/TzgkLAx2AGHwQ==",
"dev": true,
"bin": {
"tsc": "bin/tsc",
"tsserver": "bin/tsserver"
},
"engines": {
"node": ">=14.17"
}
},
"node_modules/undici-types": {
"version": "5.26.5",
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz",
"integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==",
"dev": true
},
"node_modules/ws": {
"version": "8.17.0",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.17.0.tgz",
"integrity": "sha512-uJq6108EgZMAl20KagGkzCKfMEjxmKvZHG7Tlq0Z6nOky7YF7aq4mOx6xK8TJ/i1LeK4Qus7INktacctDgY8Ow==",
"engines": {
"node": ">=10.0.0"
},
"peerDependencies": {
"bufferutil": "^4.0.1",
"utf-8-validate": ">=5.0.2"
},
"peerDependenciesMeta": {
"bufferutil": {
"optional": true
},
"utf-8-validate": {
"optional": true
}
}
}
}
}

16
package.json Normal file
View File

@ -0,0 +1,16 @@
{
"name": "hyperdeck-monitor",
"version": "1.0.0",
"main": "index.js",
"scripts": {
"build": "tsc"
},
"dependencies": {
"hyperdeck-connection": "^2.0.1",
"ws": "^8.17.0"
},
"devDependencies": {
"@types/ws": "^8.5.10",
"typescript": "^5.4.5"
}
}

3
rust-toolchain.toml Normal file
View File

@ -0,0 +1,3 @@
[toolchain]
channel = "stable"
components = [ "rust-src", "cargo", "rustc" ]

192
src/main.rs Normal file
View File

@ -0,0 +1,192 @@
use std::{net::IpAddr, time::Duration};
use color_eyre::Report;
use futures_util::{
pin_mut, select,
stream::{SplitSink, SplitStream},
FutureExt, SinkExt, StreamExt,
};
use serde::{Deserialize, Serialize};
use tokio::net::TcpStream;
use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream};
use tokio_util::sync::CancellationToken;
use tracing_subscriber::EnvFilter;
#[tokio::main]
async fn main() {
setup_logging().expect("Failed to setup logging");
tracing::info!("Hello, world!");
let cancel = CancellationToken::new();
let node_process = run_node_process(cancel.clone()).fuse();
let (ws_message_tx, ws_message_rx) = tokio::sync::mpsc::unbounded_channel();
let (commands_tx, commands_rx) = tokio::sync::mpsc::unbounded_channel();
let state = AppState::default();
let ws_process = talk_to_node_ws(state, ws_message_tx, commands_rx, cancel.clone()).fuse();
pin_mut!(node_process);
pin_mut!(ws_process);
select! {
_ = node_process => {},
_ = ws_process => {},
_ = cancel.cancelled().fuse() => {}
};
cancel.cancel();
}
async fn run_node_process(cancel: CancellationToken) {
while !cancel.is_cancelled() {
let result = tokio::process::Command::new("node")
.arg("index.js")
.output()
.await;
if let Ok(output) = result {
if !output.status.success() {
let err = String::from_utf8(output.stderr).unwrap_or("Unknown".to_string());
tracing::error!("Node process exited with error: {}", err);
// Back-off in case we are immediately crashing in a loop.
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
}
#[derive(Default)]
struct AppState {}
enum NodeWsCommand {
Ping,
AddHyperdeck(AddHyperdeckCommand),
RemoveHyperdeck(RemoveHyperdeckCommand),
}
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "event")]
enum NodeWsMessageReceived {
Log { message: String },
}
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct AddHyperdeckCommand {
ip: IpAddr,
}
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct RemoveHyperdeckCommand {
ip: IpAddr,
}
async fn talk_to_node_ws(
state: AppState,
ws_message_tx: tokio::sync::mpsc::UnboundedSender<NodeWsMessageReceived>,
commands_rx: tokio::sync::mpsc::UnboundedReceiver<NodeWsCommand>,
cancel: CancellationToken,
) {
let ws_stream = wait_for_connection().await;
let (write, read) = ws_stream.split();
let outgoing = handle_outbound_messages(commands_rx, write).fuse();
let incoming = handle_inbound_messages(read, ws_message_tx).fuse();
pin_mut!(outgoing);
pin_mut!(incoming);
select! {
_ = outgoing => {},
_ = incoming => {},
_ = cancel.cancelled().fuse() => {},
}
}
async fn wait_for_connection() -> WebSocketStream<MaybeTlsStream<TcpStream>> {
loop {
// Wait for Node to wake up...
tokio::time::sleep(Duration::from_secs(1)).await;
let ws_url = url::Url::parse("ws://127.0.0.1:7867").expect("Invalid websocket URL");
match tokio_tungstenite::connect_async(ws_url.clone()).await {
Ok((ws_stream, _)) => {
tracing::info!("Connected to Node process on {ws_url}");
return ws_stream;
}
Err(err) => {
tracing::error!("Error connecting to Node process: {:?}", err)
}
}
}
}
async fn handle_outbound_messages(
mut commands_rx: tokio::sync::mpsc::UnboundedReceiver<NodeWsCommand>,
mut socket_tx: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
) {
while let Some(command) = commands_rx.recv().await {
match command {
NodeWsCommand::Ping => {
let _ = socket_tx
.send(tokio_tungstenite::tungstenite::Message::Ping(vec![]))
.await;
}
NodeWsCommand::AddHyperdeck(command) => {
let _ = socket_tx
.send(tokio_tungstenite::tungstenite::Message::Text(
serde_json::to_string(&command)
.expect("Could not serialize AddHyperdeck command"),
))
.await;
}
NodeWsCommand::RemoveHyperdeck(command) => {
let _ = socket_tx
.send(tokio_tungstenite::tungstenite::Message::Text(
serde_json::to_string(&command)
.expect("Could not serialize RemoveHyperdeck command"),
))
.await;
}
}
}
}
async fn handle_inbound_messages(
socket_rx: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
ws_message_tx: tokio::sync::mpsc::UnboundedSender<NodeWsMessageReceived>,
) {
socket_rx
.for_each(|message| async {
match message {
Ok(tokio_tungstenite::tungstenite::Message::Text(text)) => {
if let Ok(received) = serde_json::from_str::<NodeWsMessageReceived>(&text) {
match received {
NodeWsMessageReceived::Log { message } => {
tracing::info!("Message from Node process: {message}");
}
}
}
}
Ok(tokio_tungstenite::tungstenite::Message::Pong(_)) => {}
_ => {}
}
})
.await;
}
fn setup_logging() -> Result<(), Report> {
if std::env::var("RUST_LIB_BACKTRACE").is_err() {
std::env::set_var("RUST_LIB_BACKTRACE", "1");
}
color_eyre::install()?;
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "debug");
}
tracing_subscriber::fmt::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();
Ok(())
}

8
tsconfig.json Normal file
View File

@ -0,0 +1,8 @@
{
"include": [
"index.ts"
],
"compilerOptions": {
"esModuleInterop": true
},
}