diff --git a/src/lib.rs b/src/lib.rs index e6f82d9..34d2db7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,24 +1,24 @@ use ddc::Ddc; use ddc_i2c::{I2cDeviceDdc, I2cDeviceEnumerator}; -use rosc; -use rumqttc::mqttbytes::v4::Packet; -use rumqttc::{Client, Event, MqttOptions, QoS}; use std::collections::HashMap; use std::env; use std::error::Error; -use std::net::UdpSocket; use std::sync::mpsc::{channel, Receiver, Sender}; use std::thread; use std::time::{Duration, Instant}; use tracing::{event, Level}; +mod mqtt; +mod osc; + pub type StdError = Result>; enum Command { Monitor((usize, MonitorCommand)), } -enum MonitorCommand { +#[derive(Debug)] +pub enum MonitorCommand { Brightness(u8), Input(u8), } @@ -40,7 +40,7 @@ impl MonitorCommand { } // Run an i2c command handler for a specific i2c device, rate limiting each type of command -fn run_i2c(mut dev: I2cDeviceDdc, command_channel: Receiver) { +fn run_i2c(idx: usize, mut dev: I2cDeviceDdc, command_channel: Receiver) { let mut last_sent_command: HashMap<&str, Option> = HashMap::new(); loop { let cmd = command_channel.recv().unwrap(); @@ -50,6 +50,7 @@ fn run_i2c(mut dev: I2cDeviceDdc, command_channel: Receiver) { continue; } } + event!(Level::INFO, monitor_index = idx, ?cmd, "Sending DDC comand"); match cmd { MonitorCommand::Brightness(b) => dev.set_vcp_feature(cmd.vcp(), b.into()).unwrap(), // Hack - add 15 to align with DELL monitors @@ -60,134 +61,29 @@ fn run_i2c(mut dev: I2cDeviceDdc, command_channel: Receiver) { } pub fn main() -> StdError<()> { + event!(Level::INFO, "Starting"); let displays = I2cDeviceEnumerator::new().unwrap().collect::>(); - println!("Enumerated {} displays", displays.len()); + event!(Level::INFO, "Enumerated {} displays", displays.len()); let txes: Vec<_> = displays .into_iter() - .map(|d| { + .enumerate() + .map(|(idx, d)| { let (tx, rx) = channel(); - thread::spawn(move || run_i2c(d, rx)); + thread::spawn(move || run_i2c(idx, d, rx)); tx }) .collect(); - run_mqtt(&txes)?; - Ok(()) -} - -fn run_mqtt(txes: &Vec>) -> StdError<()> { - let client_id = env::var("MQTT_CLIENT_ID").unwrap_or("ddcmqtt".into()); - let mqtt_host = env::var("MQTT_HOST").unwrap_or("localhost".into()); - let mqtt_port = env::var("MQTT_PORT") - .unwrap_or("1883".into()) - .parse::()?; - let mqtt_user = env::var("MQTT_USER").ok(); - let mqtt_pass = env::var("MQTT_PASS").ok(); - let mut mqttoptions = MqttOptions::new(client_id, mqtt_host.clone(), mqtt_port); - // Set credentials if we have user and pass specified. - // TODO: warn if only one maybe? - if let (Some(u), Some(p)) = (mqtt_user, mqtt_pass) { - event!(Level::INFO, user = u, "Using MQTT user/pass from env"); - mqttoptions.set_credentials(u, p); - } - mqttoptions.set_keep_alive(Duration::from_secs(5)); - - let (mut client, mut connection) = Client::new(mqttoptions, 10); - client.subscribe("ddcmqtt/#", QoS::AtMostOnce)?; - - event!(Level::INFO, mqtt_host, mqtt_port, "Running MQTT client"); - - for notification in connection.iter() { - event!(Level::INFO, ?notification, "Got notification"); - if let Ok(Event::Incoming(Packet::Publish(p))) = notification { - let topic: Vec<_> = p.topic.split("/").collect(); - if topic.len() != 3 { - event!( - Level::WARN, - ?topic, - "Publish topic has wrong format, ignoring." - ); - continue; - } - if topic[0] != "ddcmqtt" { - event!( - Level::ERROR, - ?topic, - "Got publish that we didn't subscribe to!" - ); - continue; - } - let mon_idx = topic[1].parse::(); - match (mon_idx, topic[2]) { - (Ok(idx), "input") => { - // TODO: don't crash on these ?s - I'm feeling lazy - let input_id = std::str::from_utf8(&p.payload)?.parse::()?; - handle_cmd( - Command::Monitor((idx.into(), MonitorCommand::Input(input_id))), - &txes, - )?; - } - _ => { - event!(Level::ERROR, ?topic, "Unrecognised or invalid topic"); - continue; - } - } - } + if let Ok(_) = env::var("OSC_MODE") { + osc::run_osc(&txes)?; + } else { + mqtt::run_mqtt(&txes)?; } Ok(()) } -fn run_osc(txes: &Vec>) -> StdError<()> { - let sock = UdpSocket::bind("0.0.0.0:1234")?; - let mut buf = [0u8; rosc::decoder::MTU]; - - loop { - let (size, addr) = sock.recv_from(&mut buf).unwrap(); - println!("Got {} bytes from {}", size, addr); - let (_, pack) = rosc::decoder::decode_udp(&buf[..size]).unwrap(); - match pack { - rosc::OscPacket::Message(msg) => { - match osc_message_to_command(msg) { - Ok(cmd) => { - if let Err(e) = handle_cmd(cmd, &txes) { - println!("Error handling command: {:?}", e); - } - } - Err(e) => println!("Unrecognised OSC command: {:?}", e), - }; - } - rosc::OscPacket::Bundle(bundle) => { - println!("OSC Bundle: {:?}", bundle); - } - } - } -} - fn handle_cmd(cmd: Command, txes: &Vec>) -> StdError<()> { match cmd { Command::Monitor((idx, c)) => txes.get(idx).ok_or("Bad monitor index")?.send(c)?, }; Ok(()) } - -fn osc_message_to_command(msg: rosc::OscMessage) -> StdError { - println!("OSC: {}, args: {:?}", msg.addr, msg.args); - let splitaddr: Vec<_> = msg.addr.split("/").collect(); - match &splitaddr[1..] { - ["monitor", idx, control] => { - println!("Monitor {}, control {}, args {:?}", idx, control, msg.args); - let command = match *control { - "brightness" => Some(MonitorCommand::Brightness( - (msg.args[0].clone().float().unwrap() * 100.0) as u8, - )), - "input" => Some(MonitorCommand::Input( - (msg.args[0].clone().int().unwrap()) as u8, - )), - _ => None, - } - .ok_or(format!("Unrecognised monitor control: {}", *control))?; - let idx = idx.parse::().or(Err("Bad monitor index"))?; - Ok(Command::Monitor((idx, command))) - } - _ => Err("Unsupported osc address, ignoring".into()), - } -} diff --git a/src/mqtt.rs b/src/mqtt.rs new file mode 100644 index 0000000..74c31c3 --- /dev/null +++ b/src/mqtt.rs @@ -0,0 +1,69 @@ +use crate::{handle_cmd, Command, MonitorCommand, StdError}; +use rumqttc::mqttbytes::v4::Packet; +use rumqttc::{Client, Event, MqttOptions, QoS}; +use std::env; +use std::sync::mpsc::Sender; +use std::time::Duration; +use tracing::{event, Level}; + +pub fn run_mqtt(txes: &Vec>) -> StdError<()> { + let client_id = env::var("MQTT_CLIENT_ID").unwrap_or("ddcmqtt".into()); + let mqtt_host = env::var("MQTT_HOST").unwrap_or("localhost".into()); + let mqtt_port = env::var("MQTT_PORT") + .unwrap_or("1883".into()) + .parse::()?; + let mqtt_user = env::var("MQTT_USER").ok(); + let mqtt_pass = env::var("MQTT_PASS").ok(); + let mut mqttoptions = MqttOptions::new(client_id, mqtt_host.clone(), mqtt_port); + // Set credentials if we have user and pass specified. + // TODO: warn if only one maybe? + if let (Some(u), Some(p)) = (mqtt_user, mqtt_pass) { + event!(Level::INFO, user = u, "Using MQTT user/pass from env"); + mqttoptions.set_credentials(u, p); + } + mqttoptions.set_keep_alive(Duration::from_secs(5)); + + let (mut client, mut connection) = Client::new(mqttoptions, 10); + client.subscribe("ddcmqtt/#", QoS::AtMostOnce)?; + + event!(Level::INFO, mqtt_host, mqtt_port, "Running MQTT client"); + + for notification in connection.iter() { + event!(Level::INFO, ?notification, "Got notification"); + if let Ok(Event::Incoming(Packet::Publish(p))) = notification { + let topic: Vec<_> = p.topic.split("/").collect(); + if topic.len() != 3 { + event!( + Level::WARN, + ?topic, + "Publish topic has wrong format, ignoring." + ); + continue; + } + if topic[0] != "ddcmqtt" { + event!( + Level::ERROR, + ?topic, + "Got publish that we didn't subscribe to!" + ); + continue; + } + let mon_idx = topic[1].parse::(); + match (mon_idx, topic[2]) { + (Ok(idx), "input") => { + // TODO: don't crash on these ?s - I'm feeling lazy + let input_id = std::str::from_utf8(&p.payload)?.parse::()?; + handle_cmd( + Command::Monitor((idx.into(), MonitorCommand::Input(input_id))), + &txes, + )?; + } + _ => { + event!(Level::ERROR, ?topic, "Unrecognised or invalid topic"); + continue; + } + } + } + } + Ok(()) +} diff --git a/src/osc.rs b/src/osc.rs new file mode 100644 index 0000000..fb5a6d6 --- /dev/null +++ b/src/osc.rs @@ -0,0 +1,53 @@ +use crate::{handle_cmd, Command, MonitorCommand, StdError}; +use rosc; +use std::net::UdpSocket; +use std::sync::mpsc::Sender; + +pub fn run_osc(txes: &Vec>) -> StdError<()> { + let sock = UdpSocket::bind("0.0.0.0:1234")?; + let mut buf = [0u8; rosc::decoder::MTU]; + + loop { + let (size, addr) = sock.recv_from(&mut buf).unwrap(); + println!("Got {} bytes from {}", size, addr); + let (_, pack) = rosc::decoder::decode_udp(&buf[..size]).unwrap(); + match pack { + rosc::OscPacket::Message(msg) => { + match osc_message_to_command(msg) { + Ok(cmd) => { + if let Err(e) = handle_cmd(cmd, &txes) { + println!("Error handling command: {:?}", e); + } + } + Err(e) => println!("Unrecognised OSC command: {:?}", e), + }; + } + rosc::OscPacket::Bundle(bundle) => { + println!("OSC Bundle: {:?}", bundle); + } + } + } +} + +fn osc_message_to_command(msg: rosc::OscMessage) -> StdError { + println!("OSC: {}, args: {:?}", msg.addr, msg.args); + let splitaddr: Vec<_> = msg.addr.split("/").collect(); + match &splitaddr[1..] { + ["monitor", idx, control] => { + println!("Monitor {}, control {}, args {:?}", idx, control, msg.args); + let command = match *control { + "brightness" => Some(MonitorCommand::Brightness( + (msg.args[0].clone().float().unwrap() * 100.0) as u8, + )), + "input" => Some(MonitorCommand::Input( + (msg.args[0].clone().int().unwrap()) as u8, + )), + _ => None, + } + .ok_or(format!("Unrecognised monitor control: {}", *control))?; + let idx = idx.parse::().or(Err("Bad monitor index"))?; + Ok(Command::Monitor((idx, command))) + } + _ => Err("Unsupported osc address, ignoring".into()), + } +}