Add beginnings of websocket server that can receive new tally states

This commit is contained in:
Sam W 2019-03-08 00:14:57 +00:00
parent 173d431400
commit 6cd569b4e8
3 changed files with 76 additions and 12 deletions

View File

@ -5,14 +5,15 @@ mod web;
use crate::adaptors::{Adaptor, FakeAdaptor}; use crate::adaptors::{Adaptor, FakeAdaptor};
use crate::drivers::{Driver, FakeDriver}; use crate::drivers::{Driver, FakeDriver};
use crossbeam_channel::{select, Receiver}; use crossbeam_channel::{select, Receiver};
use futures::sink::Sink;
use simplelog::{Config, LevelFilter, TermLogger}; use simplelog::{Config, LevelFilter, TermLogger};
use std::fmt; use std::fmt;
use web::WebServer; use web::WebServer;
#[derive(Clone)] #[derive(Clone, Copy)]
pub struct Channel(Option<u8>); pub struct Channel(Option<u8>);
#[derive(Clone)] #[derive(Clone, Copy)]
pub struct TallyState { pub struct TallyState {
on_air: Channel, on_air: Channel,
preview: Channel, preview: Channel,
@ -43,9 +44,10 @@ impl<T: Adaptor, U: Driver> Server<T, U> {
let rx = self.adaptor.run(); let rx = self.adaptor.run();
let tx = self.driver.run(); let tx = self.driver.run();
let webaddr = ([127, 0, 0, 1], 3000).into(); let webaddr = ([127, 0, 0, 1], 3000).into();
//let (websocket_tx, websocket_rx) =
log::info!("Webserver starting on: {}", webaddr); log::info!("Webserver starting on: {}", webaddr);
let web = WebServer::run(webaddr); let (web, ws_tx) = WebServer::run(webaddr);
let mut ws_tx_wait = ws_tx.wait();
loop { loop {
select! { select! {
recv(signal_rx) -> _ => { recv(signal_rx) -> _ => {
@ -64,7 +66,9 @@ impl<T: Adaptor, U: Driver> Server<T, U> {
break; break;
}, },
recv(rx) -> state => { recv(rx) -> state => {
tx.send(state.unwrap()).unwrap(); let thestate = state.unwrap();
tx.send(thestate).unwrap();
ws_tx_wait.send(web::websocket::NewState::from(thestate)).expect("Error sending to websocket channel");
} }
} }
} }

View File

@ -6,6 +6,7 @@ use std::sync::mpsc;
use std::thread; use std::thread;
use typed_html::dom::DOMTree; use typed_html::dom::DOMTree;
use typed_html::html; use typed_html::html;
pub mod websocket;
fn build_template() -> DOMTree<String> { fn build_template() -> DOMTree<String> {
html!( html!(
@ -27,7 +28,9 @@ pub struct WebServer {
} }
impl WebServer { impl WebServer {
pub fn run(addr: std::net::SocketAddr) -> WebServer { pub fn run(
addr: std::net::SocketAddr,
) -> (WebServer, futures::sync::mpsc::Sender<websocket::NewState>) {
// todo: not have the damn js inline in the func // todo: not have the damn js inline in the func
const JS: &'static str = r#" const JS: &'static str = r#"
var sock = new WebSocket("ws://" + window.location.host + "/ws"); var sock = new WebSocket("ws://" + window.location.host + "/ws");
@ -37,10 +40,16 @@ sock.onmessage = function (e) {
sock.onopen = function (e) { sock.onopen = function (e) {
sock.send("Hello!"); sock.send("Hello!");
}"#; }"#;
let (ws_tx_tx, ws_tx_rx) = mpsc::channel(); // Channel the websocket sender will be sent on
let (shutdown_tx, shutdown_rx) = mpsc::channel(); let (shutdown_tx, shutdown_rx) = mpsc::channel();
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let sys = actix::System::new("test"); let sys = actix::System::new("test");
let wsserver = Arbiter::start(|_| websocket::WSServer::new(ws_tx_tx));
let ws_tx = ws_tx_rx.recv().unwrap(); // Block waiting to get the websocket update sender
log::info!("Got tx");
let addr = server::new(|| { let addr = server::new(|| {
App::new() App::new()
.resource("/", |r| { .resource("/", |r| {
@ -59,18 +68,21 @@ sock.onopen = function (e) {
.expect("Error binding to address.") .expect("Error binding to address.")
.start(); .start();
tx.send(addr).unwrap(); tx.send((addr, ws_tx)).unwrap();
sys.run(); sys.run();
shutdown_tx.send(()).unwrap(); // Notify that we have shutdown shutdown_tx.send(()).unwrap(); // Notify that we have shutdown
}); });
let addr = rx.recv().unwrap(); let (addr, ws_tx) = rx.recv().unwrap();
WebServer { (
addr: addr, WebServer {
shutdown_rx: shutdown_rx, addr: addr,
} shutdown_rx: shutdown_rx,
},
ws_tx,
)
} }
pub fn shutdown(self) { pub fn shutdown(self) {

48
src/web/websocket.rs Normal file
View File

@ -0,0 +1,48 @@
use crate::TallyState;
use actix::prelude::*;
use futures::sync::mpsc::{Receiver, Sender};
#[derive(Message)]
pub struct Message(pub String);
#[derive(Message)]
pub struct Connect {
pub addr: Recipient<Message>,
}
#[derive(Message)]
pub struct NewState(Result<TallyState, ()>);
// Basically just convenience so you don't have to construct a NewState with an Ok all the time
impl From<TallyState> for NewState {
fn from(state: TallyState) -> Self {
NewState(Ok(state))
}
}
pub struct WSServer {
state_tx_tx: std::sync::mpsc::Sender<Sender<NewState>>,
}
impl StreamHandler<NewState, ()> for WSServer {
fn handle(&mut self, item: NewState, ctx: &mut Context<WSServer>) {
log::info!("Weobsockety! {}", item.0.unwrap());
}
}
impl Actor for WSServer {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
log::info!("Websocket started");
let (tx, rx) = futures::sync::mpsc::channel::<NewState>(10);
Self::add_stream(rx, ctx);
self.state_tx_tx.send(tx).unwrap();
}
}
impl WSServer {
pub fn new(state_tx_tx: std::sync::mpsc::Sender<Sender<NewState>>) -> WSServer {
WSServer { state_tx_tx }
}
}