Add logging, graceful shutdown, webserver with websockets

This commit is contained in:
Sam W 2019-02-18 00:29:05 +00:00
parent 50cf832c3f
commit fdf781ff02
7 changed files with 1806 additions and 31 deletions

1621
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -5,4 +5,12 @@ authors = ["Sam Willcocks <sam@wlcx.cc>"]
edition = "2018"
[dependencies]
rand = "0.6.5"
rand = "0.6.5"
typed-html = "0.1.1"
warp = "0.1.13"
futures = "0.1.25"
log = "0.4.6"
simplelog = "0.5.3"
crossbeam-channel = "0.3.8"
ctrlc = "3.1.1"
tokio = "0.1.15"

View File

@ -1,49 +1,60 @@
use crate::{TallyState, Channel};
use std::sync::mpsc;
use std::sync::mpsc::{Sender, Receiver};
use std::thread;
use std::time::Duration;
use crate::common::Waitable;
use crate::{Channel, TallyState};
use crossbeam_channel::{Receiver, Sender};
use rand::prelude::*;
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;
/// An Adaptor interfaces to a system providing tally information, e.g. a BlackMagic ATEM
pub trait Adaptor {
pub trait Adaptor: Waitable {
/// Run the adaptor, returning a channel receiver which receives TallyStates
fn run(&mut self) -> Receiver<TallyState>;
}
pub struct FakeAdaptor {
numchannels: u8,
thread: Option<thread::JoinHandle<()>>
thread: Option<thread::JoinHandle<()>>,
}
impl FakeAdaptor {
pub fn new(numchannels: u8) -> FakeAdaptor {
return FakeAdaptor{
return FakeAdaptor {
numchannels: numchannels,
thread: None
}
thread: None,
};
}
fn _run(numchannels: u8, sender: Sender<TallyState>) {
println!("Hello, I have {} channels", numchannels);
loop {
let state = TallyState{
let state = TallyState {
on_air: Channel(Some(rand::thread_rng().gen_range(0, numchannels))),
preview: Channel(Some(rand::thread_rng().gen_range(0, numchannels))),
};
sender.send(state).unwrap();
thread::sleep(Duration::new(rand::thread_rng().gen_range(0,5), 0));
if let Err(_) = sender.send(state) {
log::info!("Fakeadaptor shutting down!");
break;
};
thread::sleep(Duration::new(rand::thread_rng().gen_range(0, 5), 0));
}
}
}
impl Waitable for FakeAdaptor {
fn get_joinhandle(self) -> Option<JoinHandle<()>> {
self.thread
}
}
impl Adaptor for FakeAdaptor {
fn run(&mut self) -> Receiver<TallyState> {
println!("arse");
let (tx, rx): (Sender<TallyState>, Receiver<TallyState>) = mpsc::channel();
let (tx, rx) = crossbeam_channel::unbounded::<TallyState>();
let thread_numchannels = self.numchannels.clone();
self.thread = Some(thread::spawn(move || {
FakeAdaptor::_run(thread_numchannels, tx);
}));
return rx;
}
}
}

13
src/common.rs Normal file
View File

@ -0,0 +1,13 @@
use std::thread;
pub trait Waitable: Sized {
fn get_joinhandle(self) -> Option<thread::JoinHandle<()>>;
fn wait(self) {
if let Some(handle) = self.get_joinhandle() {
handle.join().expect("Couldn't join on thread!");
} else {
log::warn!("Tried to wait on non-existent thread!");
}
}
}

View File

@ -1,22 +1,27 @@
use crate::common::Waitable;
use crate::TallyState;
use std::sync::mpsc;
use std::sync::mpsc::{Sender, Receiver};
use crossbeam_channel::{Receiver, Sender};
use std::thread;
/// A Driver displays the Tallies on a physical tally system.
pub trait Driver {
pub trait Driver: Waitable {
/// Run the driver, returning a channel sender which takes TallyStates
fn run(&mut self) -> Sender<TallyState>;
}
pub struct FakeDriver {
thread: Option<thread::JoinHandle<()>>
thread: Option<thread::JoinHandle<()>>,
}
impl Waitable for FakeDriver {
fn get_joinhandle(self) -> Option<thread::JoinHandle<()>> {
self.thread
}
}
impl Driver for FakeDriver {
fn run(&mut self) -> Sender<TallyState> {
let (tx, rx): (Sender<TallyState>, Receiver<TallyState>) = mpsc::channel();
let (tx, rx) = crossbeam_channel::bounded::<TallyState>(1);
self.thread = Some(thread::spawn(move || {
FakeDriver::_run(rx);
}));
@ -26,15 +31,18 @@ impl Driver for FakeDriver {
impl FakeDriver {
pub fn new() -> FakeDriver {
FakeDriver{
thread: None
}
FakeDriver { thread: None }
}
fn _run(rx: Receiver<TallyState>) {
loop {
let state = rx.recv().unwrap();
println!("{}", state);
if let Ok(state) = rx.recv() {
println!("{}", state);
} else {
// channel has been disconnected, shutdown.
log::info!("Fakedriver shutting down...");
return;
}
}
}
}
}

View File

@ -1,8 +1,13 @@
mod adaptors;
mod common;
mod drivers;
mod web;
use crate::adaptors::{Adaptor, FakeAdaptor};
use crate::drivers::{Driver, FakeDriver};
use crossbeam_channel::{select, Receiver};
use simplelog::{Config, LevelFilter, TermLogger};
use std::fmt;
use web::WebServer;
#[derive(Clone)]
pub struct Channel(Option<u8>);
@ -34,20 +39,51 @@ struct Server<T: Adaptor, U: Driver> {
}
impl<T: Adaptor, U: Driver> Server<T, U> {
fn run(&mut self) {
fn run(mut self, signal_rx: Receiver<()>) {
let rx = self.adaptor.run();
let tx = self.driver.run();
let webaddr = ([127, 0, 0, 1], 3000).into();
log::info!("Webserver starting on: {}", webaddr);
let web = WebServer::run(webaddr);
loop {
tx.send(rx.recv().unwrap()).unwrap();
select! {
recv(signal_rx) -> _ => {
log::info!("Signal received, quitting!");
// signal webserver to shutdown and wait for exit..
web.shutdown();
// Drop the driver channel, and wait for it to shutdown...
drop(tx);
self.driver.wait();
// ...and do the same for the adaptor.
drop(rx);
self.adaptor.wait();
// We're done, break loop and return.
break;
},
recv(rx) -> state => {
tx.send(state.unwrap()).unwrap();
}
}
}
}
}
fn main() {
let mut s = Server {
TermLogger::init(LevelFilter::Debug, Config::default()).unwrap();
let (signal_tx, signal_rx) = crossbeam_channel::bounded::<()>(1);
ctrlc::set_handler(move || {
log::info!("Received interrupt. Shutting down...");
signal_tx.send(()).expect("Error sending to signal channel");
})
.expect("Error setting signal handler");
let s = Server {
adaptor: FakeAdaptor::new(8),
driver: FakeDriver::new(),
};
s.run();
s.run(signal_rx);
}

78
src/web.rs Normal file
View File

@ -0,0 +1,78 @@
use futures::stream;
use futures::sync::oneshot::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use typed_html::dom::DOMTree;
use typed_html::html;
use warp::filters::ws::Message;
use warp::{Filter, Future, Stream};
fn build_template() -> DOMTree<String> {
html!(
<html>
<head>
<title>"OpenTally Control"</title>
<script src="/script.js" type="text/javascript"></script>
</head>
<body>
<h1>"OpenTally Control"</h1>
</body>
</html>
)
}
pub struct WebServer {
shutdown_chan: Sender<()>,
shutdown_mutex: Arc<Mutex<()>>,
}
impl WebServer {
pub fn run(addr: std::net::SocketAddr) -> WebServer {
// todo: not have the damn js inline in the func
const JS: &'static str = r#"
var sock = new WebSocket("ws://" + window.location.host + "/ws");
sock.onmessage = function (e) {
console.log(e);
}
sock.onopen = function (e) {
sock.send("Hello!");
}"#;
let (shutdown_tx, shutdown_rx) = channel::<()>();
let shutdown_mutex = Arc::new(Mutex::new(()));
let thread_shutdown_mutex = Arc::clone(&shutdown_mutex);
thread::spawn(move || {
// N.B! Despite the fact we never use the value inside the mutex (it's () after all..) we need to bind
// it to a variable so that it lives until the end of the thread, else it would unlock immediately.
let _lock = thread_shutdown_mutex.lock().unwrap();
let websocket = warp::path("ws").and(warp::ws2()).map(|ws: warp::ws::Ws2| {
ws.on_upgrade(|websocket| {
let (tx, rx) = websocket.split();
//stream::once::<Message, _>(Ok(Message::text("Hello")))
rx.forward(tx).map(|_| ()).map_err(|e| {
log::error!("Websocket error: {}", e);
})
})
});
let script = warp::path("script.js").map(|| JS);
let home = warp::path::end().map(|| warp::reply::html(build_template().to_string()));
let routes = warp::get2().and(websocket.or(script).or(home));
let (_addr, server) =
warp::serve(routes).bind_with_graceful_shutdown(addr, shutdown_rx);
tokio::run(server);
});
WebServer {
shutdown_chan: shutdown_tx,
shutdown_mutex,
}
}
pub fn shutdown(self) {
log::debug!("Web server shutting down...");
self.shutdown_chan.send(()).unwrap();
let _lock = self.shutdown_mutex.lock().unwrap();
log::info!("Web server shutdown");
}
}