use ddc::Ddc; use ddc_i2c::{I2cDeviceDdc, I2cDeviceEnumerator}; use rosc; use rumqttc::mqttbytes::v4::Packet; use rumqttc::{Client, Event, MqttOptions, QoS}; use std::collections::HashMap; use std::env; use std::error::Error; use std::net::UdpSocket; use std::sync::mpsc::{channel, Receiver, Sender}; use std::thread; use std::time::{Duration, Instant}; use tracing::{event, Level}; pub type StdError = Result>; enum Command { Monitor((usize, MonitorCommand)), } enum MonitorCommand { Brightness(u8), Input(u8), } impl MonitorCommand { fn cmd_str(&self) -> &'static str { match self { Self::Brightness(_) => "brightness", Self::Input(_) => "input", } } fn vcp(&self) -> u8 { match self { Self::Brightness(_) => 0x10, Self::Input(_) => 0x60, } } } // Run an i2c command handler for a specific i2c device, rate limiting each type of command fn run_i2c(mut dev: I2cDeviceDdc, command_channel: Receiver) { let mut last_sent_command: HashMap<&str, Option> = HashMap::new(); loop { let cmd = command_channel.recv().unwrap(); if let Some(last) = last_sent_command.entry(cmd.cmd_str()).or_insert(None) { if (*last + Duration::from_millis(100)) > Instant::now() { // rate limit continue; } } match cmd { MonitorCommand::Brightness(b) => dev.set_vcp_feature(cmd.vcp(), b.into()).unwrap(), // Hack - add 15 to align with DELL monitors MonitorCommand::Input(i) => dev.set_vcp_feature(cmd.vcp(), (i + 15).into()).unwrap(), } last_sent_command.insert(cmd.cmd_str(), Some(Instant::now())); } } pub fn main() -> StdError<()> { let displays = I2cDeviceEnumerator::new().unwrap().collect::>(); println!("Enumerated {} displays", displays.len()); let txes: Vec<_> = displays .into_iter() .map(|d| { let (tx, rx) = channel(); thread::spawn(move || run_i2c(d, rx)); tx }) .collect(); run_mqtt(&txes)?; Ok(()) } fn run_mqtt(txes: &Vec>) -> StdError<()> { let client_id = env::var("MQTT_CLIENT_ID").unwrap_or("ddcmqtt".into()); let mqtt_host = env::var("MQTT_HOST").unwrap_or("localhost".into()); let mqtt_port = env::var("MQTT_PORT") .unwrap_or("1883".into()) .parse::()?; let mqtt_user = env::var("MQTT_USER").ok(); let mqtt_pass = env::var("MQTT_PASS").ok(); let mut mqttoptions = MqttOptions::new(client_id, mqtt_host.clone(), mqtt_port); // Set credentials if we have user and pass specified. // TODO: warn if only one maybe? if let (Some(u), Some(p)) = (mqtt_user, mqtt_pass) { event!(Level::INFO, user = u, "Using MQTT user/pass from env"); mqttoptions.set_credentials(u, p); } mqttoptions.set_keep_alive(Duration::from_secs(5)); let (mut client, mut connection) = Client::new(mqttoptions, 10); client.subscribe("ddcmqtt/#", QoS::AtMostOnce)?; event!(Level::INFO, mqtt_host, mqtt_port, "Running MQTT client"); for notification in connection.iter() { event!(Level::INFO, ?notification, "Got notification"); if let Ok(Event::Incoming(Packet::Publish(p))) = notification { let topic: Vec<_> = p.topic.split("/").collect(); if topic.len() != 3 { event!( Level::WARN, ?topic, "Publish topic has wrong format, ignoring." ); continue; } if topic[0] != "ddcmqtt" { event!( Level::ERROR, ?topic, "Got publish that we didn't subscribe to!" ); continue; } let mon_idx = topic[1].parse::(); match (mon_idx, topic[2]) { (Ok(idx), "input") => { // TODO: don't crash on these ?s - I'm feeling lazy let input_id = std::str::from_utf8(&p.payload)?.parse::()?; handle_cmd( Command::Monitor((idx.into(), MonitorCommand::Input(input_id))), &txes, )?; } _ => { event!(Level::ERROR, ?topic, "Unrecognised or invalid topic"); continue; } } } } Ok(()) } fn run_osc(txes: &Vec>) -> StdError<()> { let sock = UdpSocket::bind("0.0.0.0:1234")?; let mut buf = [0u8; rosc::decoder::MTU]; loop { let (size, addr) = sock.recv_from(&mut buf).unwrap(); println!("Got {} bytes from {}", size, addr); let (_, pack) = rosc::decoder::decode_udp(&buf[..size]).unwrap(); match pack { rosc::OscPacket::Message(msg) => { match osc_message_to_command(msg) { Ok(cmd) => { if let Err(e) = handle_cmd(cmd, &txes) { println!("Error handling command: {:?}", e); } } Err(e) => println!("Unrecognised OSC command: {:?}", e), }; } rosc::OscPacket::Bundle(bundle) => { println!("OSC Bundle: {:?}", bundle); } } } } fn handle_cmd(cmd: Command, txes: &Vec>) -> StdError<()> { match cmd { Command::Monitor((idx, c)) => txes.get(idx).ok_or("Bad monitor index")?.send(c)?, }; Ok(()) } fn osc_message_to_command(msg: rosc::OscMessage) -> StdError { println!("OSC: {}, args: {:?}", msg.addr, msg.args); let splitaddr: Vec<_> = msg.addr.split("/").collect(); match &splitaddr[1..] { ["monitor", idx, control] => { println!("Monitor {}, control {}, args {:?}", idx, control, msg.args); let command = match *control { "brightness" => Some(MonitorCommand::Brightness( (msg.args[0].clone().float().unwrap() * 100.0) as u8, )), "input" => Some(MonitorCommand::Input( (msg.args[0].clone().int().unwrap()) as u8, )), _ => None, } .ok_or(format!("Unrecognised monitor control: {}", *control))?; let idx = idx.parse::().or(Err("Bad monitor index"))?; Ok(Command::Monitor((idx, command))) } _ => Err("Unsupported osc address, ignoring".into()), } }