Separate mqtt and osc, fix cargo check warns
This commit is contained in:
parent
c5d58a4ff5
commit
1fccc92bce
136
src/lib.rs
136
src/lib.rs
|
@ -1,24 +1,24 @@
|
||||||
use ddc::Ddc;
|
use ddc::Ddc;
|
||||||
use ddc_i2c::{I2cDeviceDdc, I2cDeviceEnumerator};
|
use ddc_i2c::{I2cDeviceDdc, I2cDeviceEnumerator};
|
||||||
use rosc;
|
|
||||||
use rumqttc::mqttbytes::v4::Packet;
|
|
||||||
use rumqttc::{Client, Event, MqttOptions, QoS};
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::net::UdpSocket;
|
|
||||||
use std::sync::mpsc::{channel, Receiver, Sender};
|
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use tracing::{event, Level};
|
use tracing::{event, Level};
|
||||||
|
|
||||||
|
mod mqtt;
|
||||||
|
mod osc;
|
||||||
|
|
||||||
pub type StdError<T> = Result<T, Box<dyn Error>>;
|
pub type StdError<T> = Result<T, Box<dyn Error>>;
|
||||||
|
|
||||||
enum Command {
|
enum Command {
|
||||||
Monitor((usize, MonitorCommand)),
|
Monitor((usize, MonitorCommand)),
|
||||||
}
|
}
|
||||||
|
|
||||||
enum MonitorCommand {
|
#[derive(Debug)]
|
||||||
|
pub enum MonitorCommand {
|
||||||
Brightness(u8),
|
Brightness(u8),
|
||||||
Input(u8),
|
Input(u8),
|
||||||
}
|
}
|
||||||
|
@ -40,7 +40,7 @@ impl MonitorCommand {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run an i2c command handler for a specific i2c device, rate limiting each type of command
|
// Run an i2c command handler for a specific i2c device, rate limiting each type of command
|
||||||
fn run_i2c(mut dev: I2cDeviceDdc, command_channel: Receiver<MonitorCommand>) {
|
fn run_i2c(idx: usize, mut dev: I2cDeviceDdc, command_channel: Receiver<MonitorCommand>) {
|
||||||
let mut last_sent_command: HashMap<&str, Option<Instant>> = HashMap::new();
|
let mut last_sent_command: HashMap<&str, Option<Instant>> = HashMap::new();
|
||||||
loop {
|
loop {
|
||||||
let cmd = command_channel.recv().unwrap();
|
let cmd = command_channel.recv().unwrap();
|
||||||
|
@ -50,6 +50,7 @@ fn run_i2c(mut dev: I2cDeviceDdc, command_channel: Receiver<MonitorCommand>) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
event!(Level::INFO, monitor_index = idx, ?cmd, "Sending DDC comand");
|
||||||
match cmd {
|
match cmd {
|
||||||
MonitorCommand::Brightness(b) => dev.set_vcp_feature(cmd.vcp(), b.into()).unwrap(),
|
MonitorCommand::Brightness(b) => dev.set_vcp_feature(cmd.vcp(), b.into()).unwrap(),
|
||||||
// Hack - add 15 to align with DELL monitors
|
// Hack - add 15 to align with DELL monitors
|
||||||
|
@ -60,134 +61,29 @@ fn run_i2c(mut dev: I2cDeviceDdc, command_channel: Receiver<MonitorCommand>) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn main() -> StdError<()> {
|
pub fn main() -> StdError<()> {
|
||||||
|
event!(Level::INFO, "Starting");
|
||||||
let displays = I2cDeviceEnumerator::new().unwrap().collect::<Vec<_>>();
|
let displays = I2cDeviceEnumerator::new().unwrap().collect::<Vec<_>>();
|
||||||
println!("Enumerated {} displays", displays.len());
|
event!(Level::INFO, "Enumerated {} displays", displays.len());
|
||||||
let txes: Vec<_> = displays
|
let txes: Vec<_> = displays
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|d| {
|
.enumerate()
|
||||||
|
.map(|(idx, d)| {
|
||||||
let (tx, rx) = channel();
|
let (tx, rx) = channel();
|
||||||
thread::spawn(move || run_i2c(d, rx));
|
thread::spawn(move || run_i2c(idx, d, rx));
|
||||||
tx
|
tx
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
run_mqtt(&txes)?;
|
if let Ok(_) = env::var("OSC_MODE") {
|
||||||
Ok(())
|
osc::run_osc(&txes)?;
|
||||||
}
|
} else {
|
||||||
|
mqtt::run_mqtt(&txes)?;
|
||||||
fn run_mqtt(txes: &Vec<Sender<MonitorCommand>>) -> StdError<()> {
|
|
||||||
let client_id = env::var("MQTT_CLIENT_ID").unwrap_or("ddcmqtt".into());
|
|
||||||
let mqtt_host = env::var("MQTT_HOST").unwrap_or("localhost".into());
|
|
||||||
let mqtt_port = env::var("MQTT_PORT")
|
|
||||||
.unwrap_or("1883".into())
|
|
||||||
.parse::<u16>()?;
|
|
||||||
let mqtt_user = env::var("MQTT_USER").ok();
|
|
||||||
let mqtt_pass = env::var("MQTT_PASS").ok();
|
|
||||||
let mut mqttoptions = MqttOptions::new(client_id, mqtt_host.clone(), mqtt_port);
|
|
||||||
// Set credentials if we have user and pass specified.
|
|
||||||
// TODO: warn if only one maybe?
|
|
||||||
if let (Some(u), Some(p)) = (mqtt_user, mqtt_pass) {
|
|
||||||
event!(Level::INFO, user = u, "Using MQTT user/pass from env");
|
|
||||||
mqttoptions.set_credentials(u, p);
|
|
||||||
}
|
|
||||||
mqttoptions.set_keep_alive(Duration::from_secs(5));
|
|
||||||
|
|
||||||
let (mut client, mut connection) = Client::new(mqttoptions, 10);
|
|
||||||
client.subscribe("ddcmqtt/#", QoS::AtMostOnce)?;
|
|
||||||
|
|
||||||
event!(Level::INFO, mqtt_host, mqtt_port, "Running MQTT client");
|
|
||||||
|
|
||||||
for notification in connection.iter() {
|
|
||||||
event!(Level::INFO, ?notification, "Got notification");
|
|
||||||
if let Ok(Event::Incoming(Packet::Publish(p))) = notification {
|
|
||||||
let topic: Vec<_> = p.topic.split("/").collect();
|
|
||||||
if topic.len() != 3 {
|
|
||||||
event!(
|
|
||||||
Level::WARN,
|
|
||||||
?topic,
|
|
||||||
"Publish topic has wrong format, ignoring."
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if topic[0] != "ddcmqtt" {
|
|
||||||
event!(
|
|
||||||
Level::ERROR,
|
|
||||||
?topic,
|
|
||||||
"Got publish that we didn't subscribe to!"
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let mon_idx = topic[1].parse::<u8>();
|
|
||||||
match (mon_idx, topic[2]) {
|
|
||||||
(Ok(idx), "input") => {
|
|
||||||
// TODO: don't crash on these ?s - I'm feeling lazy
|
|
||||||
let input_id = std::str::from_utf8(&p.payload)?.parse::<u8>()?;
|
|
||||||
handle_cmd(
|
|
||||||
Command::Monitor((idx.into(), MonitorCommand::Input(input_id))),
|
|
||||||
&txes,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
event!(Level::ERROR, ?topic, "Unrecognised or invalid topic");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_osc(txes: &Vec<Sender<MonitorCommand>>) -> StdError<()> {
|
|
||||||
let sock = UdpSocket::bind("0.0.0.0:1234")?;
|
|
||||||
let mut buf = [0u8; rosc::decoder::MTU];
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let (size, addr) = sock.recv_from(&mut buf).unwrap();
|
|
||||||
println!("Got {} bytes from {}", size, addr);
|
|
||||||
let (_, pack) = rosc::decoder::decode_udp(&buf[..size]).unwrap();
|
|
||||||
match pack {
|
|
||||||
rosc::OscPacket::Message(msg) => {
|
|
||||||
match osc_message_to_command(msg) {
|
|
||||||
Ok(cmd) => {
|
|
||||||
if let Err(e) = handle_cmd(cmd, &txes) {
|
|
||||||
println!("Error handling command: {:?}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => println!("Unrecognised OSC command: {:?}", e),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
rosc::OscPacket::Bundle(bundle) => {
|
|
||||||
println!("OSC Bundle: {:?}", bundle);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn handle_cmd(cmd: Command, txes: &Vec<Sender<MonitorCommand>>) -> StdError<()> {
|
fn handle_cmd(cmd: Command, txes: &Vec<Sender<MonitorCommand>>) -> StdError<()> {
|
||||||
match cmd {
|
match cmd {
|
||||||
Command::Monitor((idx, c)) => txes.get(idx).ok_or("Bad monitor index")?.send(c)?,
|
Command::Monitor((idx, c)) => txes.get(idx).ok_or("Bad monitor index")?.send(c)?,
|
||||||
};
|
};
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn osc_message_to_command(msg: rosc::OscMessage) -> StdError<Command> {
|
|
||||||
println!("OSC: {}, args: {:?}", msg.addr, msg.args);
|
|
||||||
let splitaddr: Vec<_> = msg.addr.split("/").collect();
|
|
||||||
match &splitaddr[1..] {
|
|
||||||
["monitor", idx, control] => {
|
|
||||||
println!("Monitor {}, control {}, args {:?}", idx, control, msg.args);
|
|
||||||
let command = match *control {
|
|
||||||
"brightness" => Some(MonitorCommand::Brightness(
|
|
||||||
(msg.args[0].clone().float().unwrap() * 100.0) as u8,
|
|
||||||
)),
|
|
||||||
"input" => Some(MonitorCommand::Input(
|
|
||||||
(msg.args[0].clone().int().unwrap()) as u8,
|
|
||||||
)),
|
|
||||||
_ => None,
|
|
||||||
}
|
|
||||||
.ok_or(format!("Unrecognised monitor control: {}", *control))?;
|
|
||||||
let idx = idx.parse::<usize>().or(Err("Bad monitor index"))?;
|
|
||||||
Ok(Command::Monitor((idx, command)))
|
|
||||||
}
|
|
||||||
_ => Err("Unsupported osc address, ignoring".into()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,69 @@
|
||||||
|
use crate::{handle_cmd, Command, MonitorCommand, StdError};
|
||||||
|
use rumqttc::mqttbytes::v4::Packet;
|
||||||
|
use rumqttc::{Client, Event, MqttOptions, QoS};
|
||||||
|
use std::env;
|
||||||
|
use std::sync::mpsc::Sender;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tracing::{event, Level};
|
||||||
|
|
||||||
|
pub fn run_mqtt(txes: &Vec<Sender<MonitorCommand>>) -> StdError<()> {
|
||||||
|
let client_id = env::var("MQTT_CLIENT_ID").unwrap_or("ddcmqtt".into());
|
||||||
|
let mqtt_host = env::var("MQTT_HOST").unwrap_or("localhost".into());
|
||||||
|
let mqtt_port = env::var("MQTT_PORT")
|
||||||
|
.unwrap_or("1883".into())
|
||||||
|
.parse::<u16>()?;
|
||||||
|
let mqtt_user = env::var("MQTT_USER").ok();
|
||||||
|
let mqtt_pass = env::var("MQTT_PASS").ok();
|
||||||
|
let mut mqttoptions = MqttOptions::new(client_id, mqtt_host.clone(), mqtt_port);
|
||||||
|
// Set credentials if we have user and pass specified.
|
||||||
|
// TODO: warn if only one maybe?
|
||||||
|
if let (Some(u), Some(p)) = (mqtt_user, mqtt_pass) {
|
||||||
|
event!(Level::INFO, user = u, "Using MQTT user/pass from env");
|
||||||
|
mqttoptions.set_credentials(u, p);
|
||||||
|
}
|
||||||
|
mqttoptions.set_keep_alive(Duration::from_secs(5));
|
||||||
|
|
||||||
|
let (mut client, mut connection) = Client::new(mqttoptions, 10);
|
||||||
|
client.subscribe("ddcmqtt/#", QoS::AtMostOnce)?;
|
||||||
|
|
||||||
|
event!(Level::INFO, mqtt_host, mqtt_port, "Running MQTT client");
|
||||||
|
|
||||||
|
for notification in connection.iter() {
|
||||||
|
event!(Level::INFO, ?notification, "Got notification");
|
||||||
|
if let Ok(Event::Incoming(Packet::Publish(p))) = notification {
|
||||||
|
let topic: Vec<_> = p.topic.split("/").collect();
|
||||||
|
if topic.len() != 3 {
|
||||||
|
event!(
|
||||||
|
Level::WARN,
|
||||||
|
?topic,
|
||||||
|
"Publish topic has wrong format, ignoring."
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if topic[0] != "ddcmqtt" {
|
||||||
|
event!(
|
||||||
|
Level::ERROR,
|
||||||
|
?topic,
|
||||||
|
"Got publish that we didn't subscribe to!"
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let mon_idx = topic[1].parse::<u8>();
|
||||||
|
match (mon_idx, topic[2]) {
|
||||||
|
(Ok(idx), "input") => {
|
||||||
|
// TODO: don't crash on these ?s - I'm feeling lazy
|
||||||
|
let input_id = std::str::from_utf8(&p.payload)?.parse::<u8>()?;
|
||||||
|
handle_cmd(
|
||||||
|
Command::Monitor((idx.into(), MonitorCommand::Input(input_id))),
|
||||||
|
&txes,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
event!(Level::ERROR, ?topic, "Unrecognised or invalid topic");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -0,0 +1,53 @@
|
||||||
|
use crate::{handle_cmd, Command, MonitorCommand, StdError};
|
||||||
|
use rosc;
|
||||||
|
use std::net::UdpSocket;
|
||||||
|
use std::sync::mpsc::Sender;
|
||||||
|
|
||||||
|
pub fn run_osc(txes: &Vec<Sender<MonitorCommand>>) -> StdError<()> {
|
||||||
|
let sock = UdpSocket::bind("0.0.0.0:1234")?;
|
||||||
|
let mut buf = [0u8; rosc::decoder::MTU];
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let (size, addr) = sock.recv_from(&mut buf).unwrap();
|
||||||
|
println!("Got {} bytes from {}", size, addr);
|
||||||
|
let (_, pack) = rosc::decoder::decode_udp(&buf[..size]).unwrap();
|
||||||
|
match pack {
|
||||||
|
rosc::OscPacket::Message(msg) => {
|
||||||
|
match osc_message_to_command(msg) {
|
||||||
|
Ok(cmd) => {
|
||||||
|
if let Err(e) = handle_cmd(cmd, &txes) {
|
||||||
|
println!("Error handling command: {:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => println!("Unrecognised OSC command: {:?}", e),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
rosc::OscPacket::Bundle(bundle) => {
|
||||||
|
println!("OSC Bundle: {:?}", bundle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn osc_message_to_command(msg: rosc::OscMessage) -> StdError<Command> {
|
||||||
|
println!("OSC: {}, args: {:?}", msg.addr, msg.args);
|
||||||
|
let splitaddr: Vec<_> = msg.addr.split("/").collect();
|
||||||
|
match &splitaddr[1..] {
|
||||||
|
["monitor", idx, control] => {
|
||||||
|
println!("Monitor {}, control {}, args {:?}", idx, control, msg.args);
|
||||||
|
let command = match *control {
|
||||||
|
"brightness" => Some(MonitorCommand::Brightness(
|
||||||
|
(msg.args[0].clone().float().unwrap() * 100.0) as u8,
|
||||||
|
)),
|
||||||
|
"input" => Some(MonitorCommand::Input(
|
||||||
|
(msg.args[0].clone().int().unwrap()) as u8,
|
||||||
|
)),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
.ok_or(format!("Unrecognised monitor control: {}", *control))?;
|
||||||
|
let idx = idx.parse::<usize>().or(Err("Bad monitor index"))?;
|
||||||
|
Ok(Command::Monitor((idx, command)))
|
||||||
|
}
|
||||||
|
_ => Err("Unsupported osc address, ignoring".into()),
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue