use crate::{handle_cmd, Command, MonitorCommand, StdError}; use rumqttc::mqttbytes::v4::Packet; use rumqttc::{Client, Event, MqttOptions, QoS}; use std::env; use std::sync::mpsc::Sender; use std::time::Duration; use tracing::{event, Level}; pub fn run_mqtt(txes: &Vec>) -> StdError<()> { let client_id = env::var("MQTT_CLIENT_ID").unwrap_or("ddcmqtt".into()); let mqtt_host = env::var("MQTT_HOST").unwrap_or("localhost".into()); let mqtt_port = env::var("MQTT_PORT") .unwrap_or("1883".into()) .parse::()?; let mqtt_user = env::var("MQTT_USER").ok(); let mqtt_pass = env::var("MQTT_PASS").ok(); let mut mqttoptions = MqttOptions::new(client_id, mqtt_host.clone(), mqtt_port); // Set credentials if we have user and pass specified. // TODO: warn if only one maybe? if let (Some(u), Some(p)) = (mqtt_user, mqtt_pass) { event!(Level::INFO, user = u, "Using MQTT user/pass from env"); mqttoptions.set_credentials(u, p); } mqttoptions.set_keep_alive(Duration::from_secs(5)); let (mut client, mut connection) = Client::new(mqttoptions, 10); client.subscribe("ddcmqtt/#", QoS::AtMostOnce)?; event!(Level::INFO, mqtt_host, mqtt_port, "Running MQTT client"); for notification in connection.iter() { event!(Level::INFO, ?notification, "Got notification"); if let Ok(Event::Incoming(Packet::Publish(p))) = notification { let topic: Vec<_> = p.topic.split("/").collect(); if topic.len() != 3 { event!( Level::WARN, ?topic, "Publish topic has wrong format, ignoring." ); continue; } if topic[0] != "ddcmqtt" { event!( Level::ERROR, ?topic, "Got publish that we didn't subscribe to!" ); continue; } let mon_idx = topic[1].parse::(); match (mon_idx, topic[2]) { (Ok(idx), "input") => { // TODO: don't crash on these ?s - I'm feeling lazy let input_id = std::str::from_utf8(&p.payload)?.parse::()?; handle_cmd( Command::Monitor((idx.into(), MonitorCommand::Input(input_id))), &txes, )?; } _ => { event!(Level::ERROR, ?topic, "Unrecognised or invalid topic"); continue; } } } } Ok(()) }