83 lines
3.3 KiB
Rust
83 lines
3.3 KiB
Rust
use crate::{handle_cmd, Command, MonitorBrightnessCmd, MonitorCommand, StdError};
|
|
use rumqttc::mqttbytes::v4::Packet;
|
|
use rumqttc::{Client, Event, MqttOptions, QoS};
|
|
use std::env;
|
|
use std::sync::mpsc::Sender;
|
|
use std::time::Duration;
|
|
use tracing::{event, Level};
|
|
|
|
pub fn run_mqtt(txes: &Vec<Sender<MonitorCommand>>) -> StdError<()> {
|
|
let client_id = env::var("MQTT_CLIENT_ID").unwrap_or("ddcmqtt".into());
|
|
let topic_prefix = env::var("MQTT_TOPIC_PREFIX").unwrap_or("ddcmqtt".into());
|
|
let mqtt_host = env::var("MQTT_HOST").unwrap_or("localhost".into());
|
|
let mqtt_port = env::var("MQTT_PORT")
|
|
.unwrap_or("1883".into())
|
|
.parse::<u16>()?;
|
|
let mqtt_user = env::var("MQTT_USER").ok();
|
|
let mqtt_pass = env::var("MQTT_PASS").ok();
|
|
let mut mqttoptions = MqttOptions::new(client_id, mqtt_host.clone(), mqtt_port);
|
|
// Set credentials if we have user and pass specified.
|
|
// TODO: warn if only one maybe?
|
|
if let (Some(u), Some(p)) = (mqtt_user, mqtt_pass) {
|
|
event!(Level::INFO, user = u, "Using MQTT user/pass from env");
|
|
mqttoptions.set_credentials(u, p);
|
|
}
|
|
mqttoptions.set_keep_alive(Duration::from_secs(5));
|
|
|
|
let (mut client, mut connection) = Client::new(mqttoptions, 10);
|
|
client.subscribe([&topic_prefix, "#"].join("/"), QoS::AtMostOnce)?;
|
|
|
|
event!(Level::INFO, mqtt_host, mqtt_port, "Running MQTT client");
|
|
|
|
for notification in connection.iter() {
|
|
event!(Level::INFO, ?notification, "Got notification");
|
|
if let Ok(Event::Incoming(Packet::Publish(p))) = notification {
|
|
let topic: Vec<_> = p.topic.split("/").collect();
|
|
if topic.len() != 3 {
|
|
event!(
|
|
Level::WARN,
|
|
?topic,
|
|
"Publish topic has wrong format, ignoring."
|
|
);
|
|
continue;
|
|
}
|
|
if topic[0] != topic_prefix {
|
|
event!(
|
|
Level::ERROR,
|
|
?topic,
|
|
"Got publish that we didn't subscribe to!"
|
|
);
|
|
continue;
|
|
}
|
|
let mon_idx = topic[1].parse::<u8>();
|
|
match (mon_idx, topic[2]) {
|
|
(Ok(idx), "input") => {
|
|
// TODO: don't crash on these ?s - I'm feeling lazy
|
|
let input_id = std::str::from_utf8(&p.payload)?.parse::<u8>()?;
|
|
handle_cmd(
|
|
Command::Monitor((idx.into(), MonitorCommand::Input(input_id))),
|
|
&txes,
|
|
)?;
|
|
}
|
|
(Ok(idx), "brightness") => {
|
|
let brightness = std::str::from_utf8(&p.payload)?;
|
|
let cmd = if brightness.starts_with('+') | brightness.starts_with('-') {
|
|
MonitorBrightnessCmd::Relative(brightness.parse::<i8>()?)
|
|
} else {
|
|
MonitorBrightnessCmd::Absolute(brightness.parse::<u8>()?)
|
|
};
|
|
handle_cmd(
|
|
Command::Monitor((idx.into(), MonitorCommand::Brightness(cmd))),
|
|
&txes,
|
|
)?;
|
|
}
|
|
_ => {
|
|
event!(Level::ERROR, ?topic, "Unrecognised or invalid topic");
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|