From a54c154606fac85b77d8ea1cd772db123ec015eb Mon Sep 17 00:00:00 2001 From: Sam Willcocks Date: Mon, 11 Mar 2019 19:01:06 +0000 Subject: [PATCH] Add initial websocket server, with working connect/disconnect/broadcast --- src/web.rs | 28 +++++++----- src/web/websocket.rs | 43 ++++++++++++++++-- src/web/websocketsession.rs | 90 +++++++++++++++++++++++++++++++++++++ 3 files changed, 146 insertions(+), 15 deletions(-) create mode 100644 src/web/websocketsession.rs diff --git a/src/web.rs b/src/web.rs index 4418958..554f5f4 100644 --- a/src/web.rs +++ b/src/web.rs @@ -7,6 +7,8 @@ use std::thread; use typed_html::dom::DOMTree; use typed_html::html; pub mod websocket; +mod websocketsession; +use websocketsession::ws_route; fn build_template() -> DOMTree { html!( @@ -50,19 +52,21 @@ sock.onopen = function (e) { let ws_tx = ws_tx_rx.recv().unwrap(); // Block waiting to get the websocket update sender log::info!("Got tx"); - let addr = server::new(|| { - App::new() - .resource("/", |r| { - r.f(|_r| { - HttpResponse::Ok() - .content_type("text/html") - .body(build_template().to_string()) - }) + let addr = server::new(move || { + App::with_state(websocketsession::WSSessionState { + addr: wsserver.clone(), + }) + .resource("/", |r| { + r.f(|_r| { + HttpResponse::Ok() + .content_type("text/html") + .body(build_template().to_string()) }) - .resource("script.js", |r| { - r.f(|_r| HttpResponse::Ok().content_type("text/javascript").body(JS)) - }) - //.resource("/ws", ) + }) + .resource("script.js", |r| { + r.f(|_r| HttpResponse::Ok().content_type("text/javascript").body(JS)) + }) + .resource("/ws", |r| r.route().f(ws_route)) }) .bind(addr) .expect("Error binding to address.") diff --git a/src/web/websocket.rs b/src/web/websocket.rs index 81be542..59409af 100644 --- a/src/web/websocket.rs +++ b/src/web/websocket.rs @@ -1,6 +1,7 @@ use crate::TallyState; use actix::prelude::*; -use futures::sync::mpsc::{Receiver, Sender}; +use futures::sync::mpsc::Sender; +use std::collections::HashMap; #[derive(Message)] pub struct Message(pub String); @@ -10,6 +11,11 @@ pub struct Connect { pub addr: Recipient, } +#[derive(Message)] +pub struct Disconnect { + pub addr: Recipient, +} + #[derive(Message)] pub struct NewState(Result); @@ -22,11 +28,14 @@ impl From for NewState { pub struct WSServer { state_tx_tx: std::sync::mpsc::Sender>, + clients: HashMap, ()>, } impl StreamHandler for WSServer { fn handle(&mut self, item: NewState, ctx: &mut Context) { - log::info!("Weobsockety! {}", item.0.unwrap()); + if let Ok(state) = item.0 { + self.broadcast(state.to_string()) + } } } @@ -43,6 +52,34 @@ impl Actor for WSServer { impl WSServer { pub fn new(state_tx_tx: std::sync::mpsc::Sender>) -> WSServer { - WSServer { state_tx_tx } + WSServer { + state_tx_tx, + clients: HashMap::new(), + } + } + + fn broadcast(&mut self, msg: String) { + for c in self.clients.iter_mut() { + c.0.do_send(Message(msg.to_owned())) + .unwrap_or_else(|e| log::error!("Error sending message to client: {}", e)); + } + } +} + +impl Handler for WSServer { + type Result = (); + + fn handle(&mut self, msg: Connect, _: &mut Context) -> Self::Result { + log::info!("Client connected!"); + self.clients.insert(msg.addr, ()); + } +} + +impl Handler for WSServer { + type Result = (); + + fn handle(&mut self, msg: Disconnect, _: &mut Context) -> Self::Result { + log::info!("Disconnected"); + self.clients.remove(&msg.addr); } } diff --git a/src/web/websocketsession.rs b/src/web/websocketsession.rs new file mode 100644 index 0000000..9fa8163 --- /dev/null +++ b/src/web/websocketsession.rs @@ -0,0 +1,90 @@ +use super::websocket; +use actix::*; +use actix_web::{ws, Error, HttpRequest, HttpResponse}; +use std::time::{Duration, Instant}; + +const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); +const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); + +pub struct WSSessionState { + pub addr: Addr, +} + +pub fn ws_route(req: &HttpRequest) -> Result { + ws::start(req, WSSession { hb: Instant::now() }) +} + +struct WSSession { + hb: Instant, +} + +impl Actor for WSSession { + type Context = ws::WebsocketContext; + + fn started(&mut self, ctx: &mut Self::Context) { + self.hb(ctx); + ctx.state() + .addr + .send(websocket::Connect { + addr: ctx.address().recipient(), + }) + .into_actor(self) + .then(|res, act, ctx| { + match res { + Ok(res) => (), + _ => ctx.stop(), + } + fut::ok(()) + }) + .wait(ctx); + } + + fn stopping(&mut self, ctx: &mut Self::Context) -> Running { + ctx.state().addr.do_send(websocket::Disconnect { + addr: ctx.address().recipient(), + }); + Running::Stop + } +} + +impl Handler for WSSession { + type Result = (); + + fn handle(&mut self, msg: websocket::Message, ctx: &mut Self::Context) { + ctx.text(msg.0); + } +} + +impl StreamHandler for WSSession { + fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { + match msg { + ws::Message::Ping(msg) => { + self.hb = Instant::now(); + ctx.pong(&msg); + } + ws::Message::Pong(_) => { + self.hb = Instant::now(); + } + ws::Message::Text(_) => log::info!("Unexpected txt essage from client"), + ws::Message::Binary(_) => log::info!("Unexpected binary message from client"), + ws::Message::Close(_) => ctx.stop(), + } + } +} + +impl WSSession { + fn hb(&self, ctx: &mut ws::WebsocketContext) { + ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { + if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { + log::info!("Websocket client timed out, disconnecting!"); + + ctx.state().addr.do_send(websocket::Disconnect { + addr: ctx.address().recipient(), + }); + ctx.stop(); + return; + } + ctx.ping(""); + }); + } +}