Add initial websocket server, with working connect/disconnect/broadcast

This commit is contained in:
Sam W 2019-03-11 19:01:06 +00:00
parent 6cd569b4e8
commit a54c154606
3 changed files with 146 additions and 15 deletions

View File

@ -7,6 +7,8 @@ use std::thread;
use typed_html::dom::DOMTree; use typed_html::dom::DOMTree;
use typed_html::html; use typed_html::html;
pub mod websocket; pub mod websocket;
mod websocketsession;
use websocketsession::ws_route;
fn build_template() -> DOMTree<String> { fn build_template() -> DOMTree<String> {
html!( html!(
@ -50,19 +52,21 @@ sock.onopen = function (e) {
let ws_tx = ws_tx_rx.recv().unwrap(); // Block waiting to get the websocket update sender let ws_tx = ws_tx_rx.recv().unwrap(); // Block waiting to get the websocket update sender
log::info!("Got tx"); log::info!("Got tx");
let addr = server::new(|| { let addr = server::new(move || {
App::new() App::with_state(websocketsession::WSSessionState {
.resource("/", |r| { addr: wsserver.clone(),
r.f(|_r| { })
HttpResponse::Ok() .resource("/", |r| {
.content_type("text/html") r.f(|_r| {
.body(build_template().to_string()) HttpResponse::Ok()
}) .content_type("text/html")
.body(build_template().to_string())
}) })
.resource("script.js", |r| { })
r.f(|_r| HttpResponse::Ok().content_type("text/javascript").body(JS)) .resource("script.js", |r| {
}) r.f(|_r| HttpResponse::Ok().content_type("text/javascript").body(JS))
//.resource("/ws", ) })
.resource("/ws", |r| r.route().f(ws_route))
}) })
.bind(addr) .bind(addr)
.expect("Error binding to address.") .expect("Error binding to address.")

View File

@ -1,6 +1,7 @@
use crate::TallyState; use crate::TallyState;
use actix::prelude::*; use actix::prelude::*;
use futures::sync::mpsc::{Receiver, Sender}; use futures::sync::mpsc::Sender;
use std::collections::HashMap;
#[derive(Message)] #[derive(Message)]
pub struct Message(pub String); pub struct Message(pub String);
@ -10,6 +11,11 @@ pub struct Connect {
pub addr: Recipient<Message>, pub addr: Recipient<Message>,
} }
#[derive(Message)]
pub struct Disconnect {
pub addr: Recipient<Message>,
}
#[derive(Message)] #[derive(Message)]
pub struct NewState(Result<TallyState, ()>); pub struct NewState(Result<TallyState, ()>);
@ -22,11 +28,14 @@ impl From<TallyState> for NewState {
pub struct WSServer { pub struct WSServer {
state_tx_tx: std::sync::mpsc::Sender<Sender<NewState>>, state_tx_tx: std::sync::mpsc::Sender<Sender<NewState>>,
clients: HashMap<Recipient<Message>, ()>,
} }
impl StreamHandler<NewState, ()> for WSServer { impl StreamHandler<NewState, ()> for WSServer {
fn handle(&mut self, item: NewState, ctx: &mut Context<WSServer>) { fn handle(&mut self, item: NewState, ctx: &mut Context<WSServer>) {
log::info!("Weobsockety! {}", item.0.unwrap()); if let Ok(state) = item.0 {
self.broadcast(state.to_string())
}
} }
} }
@ -43,6 +52,34 @@ impl Actor for WSServer {
impl WSServer { impl WSServer {
pub fn new(state_tx_tx: std::sync::mpsc::Sender<Sender<NewState>>) -> WSServer { pub fn new(state_tx_tx: std::sync::mpsc::Sender<Sender<NewState>>) -> WSServer {
WSServer { state_tx_tx } WSServer {
state_tx_tx,
clients: HashMap::new(),
}
}
fn broadcast(&mut self, msg: String) {
for c in self.clients.iter_mut() {
c.0.do_send(Message(msg.to_owned()))
.unwrap_or_else(|e| log::error!("Error sending message to client: {}", e));
}
}
}
impl Handler<Connect> for WSServer {
type Result = ();
fn handle(&mut self, msg: Connect, _: &mut Context<Self>) -> Self::Result {
log::info!("Client connected!");
self.clients.insert(msg.addr, ());
}
}
impl Handler<Disconnect> for WSServer {
type Result = ();
fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) -> Self::Result {
log::info!("Disconnected");
self.clients.remove(&msg.addr);
} }
} }

View File

@ -0,0 +1,90 @@
use super::websocket;
use actix::*;
use actix_web::{ws, Error, HttpRequest, HttpResponse};
use std::time::{Duration, Instant};
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
pub struct WSSessionState {
pub addr: Addr<websocket::WSServer>,
}
pub fn ws_route(req: &HttpRequest<WSSessionState>) -> Result<HttpResponse, Error> {
ws::start(req, WSSession { hb: Instant::now() })
}
struct WSSession {
hb: Instant,
}
impl Actor for WSSession {
type Context = ws::WebsocketContext<Self, WSSessionState>;
fn started(&mut self, ctx: &mut Self::Context) {
self.hb(ctx);
ctx.state()
.addr
.send(websocket::Connect {
addr: ctx.address().recipient(),
})
.into_actor(self)
.then(|res, act, ctx| {
match res {
Ok(res) => (),
_ => ctx.stop(),
}
fut::ok(())
})
.wait(ctx);
}
fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
ctx.state().addr.do_send(websocket::Disconnect {
addr: ctx.address().recipient(),
});
Running::Stop
}
}
impl Handler<websocket::Message> for WSSession {
type Result = ();
fn handle(&mut self, msg: websocket::Message, ctx: &mut Self::Context) {
ctx.text(msg.0);
}
}
impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
match msg {
ws::Message::Ping(msg) => {
self.hb = Instant::now();
ctx.pong(&msg);
}
ws::Message::Pong(_) => {
self.hb = Instant::now();
}
ws::Message::Text(_) => log::info!("Unexpected txt essage from client"),
ws::Message::Binary(_) => log::info!("Unexpected binary message from client"),
ws::Message::Close(_) => ctx.stop(),
}
}
}
impl WSSession {
fn hb(&self, ctx: &mut ws::WebsocketContext<Self, WSSessionState>) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
log::info!("Websocket client timed out, disconnecting!");
ctx.state().addr.do_send(websocket::Disconnect {
addr: ctx.address().recipient(),
});
ctx.stop();
return;
}
ctx.ping("");
});
}
}