Tear out warp, use actix instead

This commit is contained in:
Sam W 2019-02-28 15:35:35 +00:00
parent 19479263b8
commit 173d431400
6 changed files with 739 additions and 267 deletions

911
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -7,10 +7,11 @@ edition = "2018"
[dependencies] [dependencies]
rand = "0.6.5" rand = "0.6.5"
typed-html = "0.1.1" typed-html = "0.1.1"
warp = "0.1.13" actix-web = "0.7.18"
actix-net = "0.2.6"
actix = "0.7.4"
futures = "0.1.25" futures = "0.1.25"
log = "0.4.6" log = "0.4.6"
simplelog = "0.5.3" simplelog = "0.5.3"
crossbeam-channel = "0.3.8" crossbeam-channel = "0.3.8"
ctrlc = "3.1.1" ctrlc = "3.1.1"
tokio = "0.1.15"

View File

@ -26,14 +26,13 @@ impl FakeAdaptor {
} }
fn _run(numchannels: u8, sender: Sender<TallyState>) { fn _run(numchannels: u8, sender: Sender<TallyState>) {
println!("Hello, I have {} channels", numchannels);
loop { loop {
let state = TallyState { let state = TallyState {
on_air: Channel(Some(rand::thread_rng().gen_range(0, numchannels))), on_air: Channel(Some(rand::thread_rng().gen_range(0, numchannels))),
preview: Channel(Some(rand::thread_rng().gen_range(0, numchannels))), preview: Channel(Some(rand::thread_rng().gen_range(0, numchannels))),
}; };
if let Err(_) = sender.send(state) { if let Err(_) = sender.send(state) {
log::info!("Fakeadaptor shutting down!"); log::info!("Fakeadaptor shutting down");
break; break;
}; };
thread::sleep(Duration::new(rand::thread_rng().gen_range(0, 5), 0)); thread::sleep(Duration::new(rand::thread_rng().gen_range(0, 5), 0));
@ -49,7 +48,7 @@ impl Waitable for FakeAdaptor {
impl Adaptor for FakeAdaptor { impl Adaptor for FakeAdaptor {
fn run(&mut self) -> Receiver<TallyState> { fn run(&mut self) -> Receiver<TallyState> {
println!("arse"); log::info!("Fakeadaptor starting");
let (tx, rx) = crossbeam_channel::unbounded::<TallyState>(); let (tx, rx) = crossbeam_channel::unbounded::<TallyState>();
let thread_numchannels = self.numchannels.clone(); let thread_numchannels = self.numchannels.clone();
self.thread = Some(thread::spawn(move || { self.thread = Some(thread::spawn(move || {

View File

@ -21,6 +21,7 @@ impl Waitable for FakeDriver {
} }
impl Driver for FakeDriver { impl Driver for FakeDriver {
fn run(&mut self) -> Sender<TallyState> { fn run(&mut self) -> Sender<TallyState> {
log::info!("Fakedriver starting");
let (tx, rx) = crossbeam_channel::bounded::<TallyState>(1); let (tx, rx) = crossbeam_channel::bounded::<TallyState>(1);
self.thread = Some(thread::spawn(move || { self.thread = Some(thread::spawn(move || {
FakeDriver::_run(rx); FakeDriver::_run(rx);
@ -40,7 +41,7 @@ impl FakeDriver {
println!("{}", state); println!("{}", state);
} else { } else {
// channel has been disconnected, shutdown. // channel has been disconnected, shutdown.
log::info!("Fakedriver shutting down..."); log::info!("Fakedriver shutting down");
return; return;
} }
} }

0
src/script.js Normal file
View File

View File

@ -1,13 +1,11 @@
use crate::TallyState; use crate::TallyState;
use futures::stream; use actix::prelude::*;
use futures::sync::mpsc::Receiver; use actix_web::{server, App, HttpResponse};
use futures::sync::oneshot; use futures::future::Future;
use std::sync::{Arc, Mutex}; use std::sync::mpsc;
use std::thread; use std::thread;
use typed_html::dom::DOMTree; use typed_html::dom::DOMTree;
use typed_html::html; use typed_html::html;
use warp::filters::ws::Message;
use warp::{Filter, Future, Stream};
fn build_template() -> DOMTree<String> { fn build_template() -> DOMTree<String> {
html!( html!(
@ -24,12 +22,12 @@ fn build_template() -> DOMTree<String> {
} }
pub struct WebServer { pub struct WebServer {
shutdown_chan: oneshot::Sender<()>, addr: Addr<actix_net::server::Server>,
shutdown_mutex: Arc<Mutex<()>>, shutdown_rx: mpsc::Receiver<()>,
} }
impl WebServer { impl WebServer {
pub fn run(addr: std::net::SocketAddr, tally_rx: Receiver<TallyState>) -> WebServer { pub fn run(addr: std::net::SocketAddr) -> WebServer {
// todo: not have the damn js inline in the func // todo: not have the damn js inline in the func
const JS: &'static str = r#" const JS: &'static str = r#"
var sock = new WebSocket("ws://" + window.location.host + "/ws"); var sock = new WebSocket("ws://" + window.location.host + "/ws");
@ -39,46 +37,50 @@ sock.onmessage = function (e) {
sock.onopen = function (e) { sock.onopen = function (e) {
sock.send("Hello!"); sock.send("Hello!");
}"#; }"#;
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); let (shutdown_tx, shutdown_rx) = mpsc::channel();
let (tx, rx) = mpsc::channel();
let shutdown_mutex = Arc::new(Mutex::new(()));
let thread_shutdown_mutex = Arc::clone(&shutdown_mutex);
thread::spawn(move || { thread::spawn(move || {
// N.B! Despite the fact we never use the value inside the mutex (it's () after all..) we need to bind let sys = actix::System::new("test");
// it to a variable so that it lives until the end of the thread, else it would unlock immediately. let addr = server::new(|| {
let _lock = thread_shutdown_mutex.lock().unwrap(); App::new()
.resource("/", |r| {
let websocket = warp::path("ws").and(warp::ws2()).map(|ws: warp::ws::Ws2| { r.f(|_r| {
ws.on_upgrade(|websocket| { HttpResponse::Ok()
let (tx, _) = websocket.split(); .content_type("text/html")
//stream::once::<Message, _>(Ok(Message::text("Hello"))) .body(build_template().to_string())
tally_rx
.map(|state| Message::text(format!("{}", state)))
.forward(tx)
.map(|_| ())
.map_err(|e| {
log::error!("Websocket error: {}", e);
}) })
}) })
}); .resource("script.js", |r| {
let script = warp::path("script.js").map(|| JS); r.f(|_r| HttpResponse::Ok().content_type("text/javascript").body(JS))
let home = warp::path::end().map(|| warp::reply::html(build_template().to_string())); })
let routes = warp::get2().and(websocket.or(script).or(home)); //.resource("/ws", )
})
.bind(addr)
.expect("Error binding to address.")
.start();
let (_addr, server) = tx.send(addr).unwrap();
warp::serve(routes).bind_with_graceful_shutdown(addr, shutdown_rx);
tokio::run(server); sys.run();
shutdown_tx.send(()).unwrap(); // Notify that we have shutdown
}); });
let addr = rx.recv().unwrap();
WebServer { WebServer {
shutdown_chan: shutdown_tx, addr: addr,
shutdown_mutex, shutdown_rx: shutdown_rx,
} }
} }
pub fn shutdown(self) { pub fn shutdown(self) {
log::debug!("Web server shutting down..."); log::debug!("Web server shutting down...");
self.shutdown_chan.send(()).unwrap(); self.addr
let _lock = self.shutdown_mutex.lock().unwrap(); .send(actix_web::server::StopServer { graceful: true })
.wait()
.unwrap()
.unwrap();
let _ = self.shutdown_rx.recv().unwrap(); // wait for webserver shutdown
log::info!("Web server shutdown"); log::info!("Web server shutdown");
} }
} }