diff --git a/src/main.rs b/src/main.rs index 194b8d2..aa4557c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,14 +5,15 @@ mod web; use crate::adaptors::{Adaptor, FakeAdaptor}; use crate::drivers::{Driver, FakeDriver}; use crossbeam_channel::{select, Receiver}; +use futures::sink::Sink; use simplelog::{Config, LevelFilter, TermLogger}; use std::fmt; use web::WebServer; -#[derive(Clone)] +#[derive(Clone, Copy)] pub struct Channel(Option); -#[derive(Clone)] +#[derive(Clone, Copy)] pub struct TallyState { on_air: Channel, preview: Channel, @@ -43,9 +44,10 @@ impl Server { let rx = self.adaptor.run(); let tx = self.driver.run(); let webaddr = ([127, 0, 0, 1], 3000).into(); - //let (websocket_tx, websocket_rx) = + log::info!("Webserver starting on: {}", webaddr); - let web = WebServer::run(webaddr); + let (web, ws_tx) = WebServer::run(webaddr); + let mut ws_tx_wait = ws_tx.wait(); loop { select! { recv(signal_rx) -> _ => { @@ -64,7 +66,9 @@ impl Server { break; }, recv(rx) -> state => { - tx.send(state.unwrap()).unwrap(); + let thestate = state.unwrap(); + tx.send(thestate).unwrap(); + ws_tx_wait.send(web::websocket::NewState::from(thestate)).expect("Error sending to websocket channel"); } } } diff --git a/src/web.rs b/src/web.rs index 3176d25..4418958 100644 --- a/src/web.rs +++ b/src/web.rs @@ -6,6 +6,7 @@ use std::sync::mpsc; use std::thread; use typed_html::dom::DOMTree; use typed_html::html; +pub mod websocket; fn build_template() -> DOMTree { html!( @@ -27,7 +28,9 @@ pub struct WebServer { } impl WebServer { - pub fn run(addr: std::net::SocketAddr) -> WebServer { + pub fn run( + addr: std::net::SocketAddr, + ) -> (WebServer, futures::sync::mpsc::Sender) { // todo: not have the damn js inline in the func const JS: &'static str = r#" var sock = new WebSocket("ws://" + window.location.host + "/ws"); @@ -37,10 +40,16 @@ sock.onmessage = function (e) { sock.onopen = function (e) { sock.send("Hello!"); }"#; + let (ws_tx_tx, ws_tx_rx) = mpsc::channel(); // Channel the websocket sender will be sent on let (shutdown_tx, shutdown_rx) = mpsc::channel(); let (tx, rx) = mpsc::channel(); thread::spawn(move || { let sys = actix::System::new("test"); + + let wsserver = Arbiter::start(|_| websocket::WSServer::new(ws_tx_tx)); + let ws_tx = ws_tx_rx.recv().unwrap(); // Block waiting to get the websocket update sender + log::info!("Got tx"); + let addr = server::new(|| { App::new() .resource("/", |r| { @@ -59,18 +68,21 @@ sock.onopen = function (e) { .expect("Error binding to address.") .start(); - tx.send(addr).unwrap(); + tx.send((addr, ws_tx)).unwrap(); sys.run(); shutdown_tx.send(()).unwrap(); // Notify that we have shutdown }); - let addr = rx.recv().unwrap(); + let (addr, ws_tx) = rx.recv().unwrap(); - WebServer { - addr: addr, - shutdown_rx: shutdown_rx, - } + ( + WebServer { + addr: addr, + shutdown_rx: shutdown_rx, + }, + ws_tx, + ) } pub fn shutdown(self) { diff --git a/src/web/websocket.rs b/src/web/websocket.rs new file mode 100644 index 0000000..81be542 --- /dev/null +++ b/src/web/websocket.rs @@ -0,0 +1,48 @@ +use crate::TallyState; +use actix::prelude::*; +use futures::sync::mpsc::{Receiver, Sender}; + +#[derive(Message)] +pub struct Message(pub String); + +#[derive(Message)] +pub struct Connect { + pub addr: Recipient, +} + +#[derive(Message)] +pub struct NewState(Result); + +// Basically just convenience so you don't have to construct a NewState with an Ok all the time +impl From for NewState { + fn from(state: TallyState) -> Self { + NewState(Ok(state)) + } +} + +pub struct WSServer { + state_tx_tx: std::sync::mpsc::Sender>, +} + +impl StreamHandler for WSServer { + fn handle(&mut self, item: NewState, ctx: &mut Context) { + log::info!("Weobsockety! {}", item.0.unwrap()); + } +} + +impl Actor for WSServer { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + log::info!("Websocket started"); + let (tx, rx) = futures::sync::mpsc::channel::(10); + Self::add_stream(rx, ctx); + self.state_tx_tx.send(tx).unwrap(); + } +} + +impl WSServer { + pub fn new(state_tx_tx: std::sync::mpsc::Sender>) -> WSServer { + WSServer { state_tx_tx } + } +}