From 6042cc7a8239745dad51517ee4a2085b647bb221 Mon Sep 17 00:00:00 2001 From: Sam Willcocks Date: Sun, 17 Jul 2022 03:28:59 +0100 Subject: [PATCH] Add proper shutdown, discord presence --- src/discord.rs | 60 +++++++++++++++++++++++++++++++++++++++----------- src/main.rs | 33 ++++++++++++++++++++++----- 2 files changed, 74 insertions(+), 19 deletions(-) diff --git a/src/discord.rs b/src/discord.rs index bc6f7c0..9005804 100644 --- a/src/discord.rs +++ b/src/discord.rs @@ -4,12 +4,18 @@ use rand::seq::SliceRandom; use rand::{prelude::ThreadRng, thread_rng}; use std::cell::RefCell; use std::sync::Arc; +use tokio::select; +use tokio::sync::{broadcast, mpsc}; use tracing::{event, instrument, Level}; use twilight_gateway::{Cluster, Event}; use twilight_http::Client; use twilight_model::{ channel::message::Message, - gateway::Intents, + gateway::{ + payload::outgoing::update_presence::UpdatePresencePayload, + presence::{ActivityType, MinimalActivity, Status}, + Intents, + }, id::{marker::UserMarker, Id}, }; @@ -18,25 +24,53 @@ thread_local!(static RNG: RefCell = RefCell::new(thread_rng())); const GREETINGS: &'static [&'static str] = &["You rung?", "Hello there", "Right back atcha", "Yes?"]; -pub async fn run_discord(token: &str) -> StdErr<()> { +pub async fn run_discord( + token: &str, + mut shutdown: broadcast::Receiver<()>, + _done: mpsc::Sender<()>, +) -> StdErr<()> { event!(Level::INFO, "Starting..."); - let (cluster, mut events) = Cluster::new( + let (cluster, mut events) = Cluster::builder( token.to_owned(), Intents::GUILD_MESSAGES | Intents::MESSAGE_CONTENT, ) + .presence(UpdatePresencePayload::new( + vec![MinimalActivity { + kind: ActivityType::Listening, + name: "Your INVITEs".to_owned(), + url: None, + } + .into()], + false, + None, + Status::Online, + )?) + .build() .await?; - tokio::spawn(async move { - cluster.up().await; - }); + + let cluster = Arc::new(cluster); + let cluster_spawn = Arc::clone(&cluster); + tokio::spawn(async move { cluster_spawn.up().await }); let client = Arc::new(Client::new(token.to_owned())); - while let Some((shard_id, ev)) = events.next().await { - let client = client.clone(); - tokio::spawn(async move { - if let Err(e) = handle_event(ev, shard_id, client).await { - event!(Level::ERROR, err=?e, "Error handling discord event"); - } - }); + loop { + select!( + Some((shard_id, ev)) = events.next() => { + let client = client.clone(); + tokio::spawn(async move { + if let Err(e) = handle_event(ev, shard_id, client).await { + event!(Level::ERROR, err=?e, "Error handling discord event"); + } + }); + }, + _ = shutdown.recv() => { + event!(Level::INFO, "Shutting down..."); + cluster.down(); + break; + }, + + ); } + event!(Level::INFO, "Done"); Ok(()) } diff --git a/src/main.rs b/src/main.rs index 2c704bc..53b62c5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,6 @@ use futures::{sink::Sink, SinkExt}; +use tokio::sync::{broadcast, mpsc}; +use tokio::signal; use rsip::common::method::Method as SipMethod; use rsip::headers::header::Header as SipHeader; use rsip::message::{request::Request, response::Response, SipMessage}; @@ -11,7 +13,6 @@ use std::env; use std::net::IpAddr; use std::time::Duration; use tokio::net::UdpSocket; -use tokio::sync::mpsc; use tokio::time::sleep; use tokio_stream::StreamExt; use tokio_util::{ @@ -41,8 +42,8 @@ struct CurrentCall { } impl Server { - #[instrument] - async fn run_sip() -> StdErr<()> { + #[instrument(skip(shutdown, _done))] + async fn run_sip(mut shutdown: broadcast::Receiver<()>, _done: mpsc::Sender<()>) -> StdErr<()> { event!(Level::INFO, "Starting..."); let socket = UdpSocket::bind(format!("{}:{}", BIND_ADDR, SIP_PORT)).await?; let mut framed = UdpFramed::new(socket, SipCodec {}); @@ -54,6 +55,10 @@ impl Server { let (response_tx, mut response_rx) = mpsc::channel(10); loop { tokio::select! { + _ = shutdown.recv() => { + event!(Level::INFO, "Shutting down..."); + break; + } // We got a new response to send Some((res, send_to)) = response_rx.recv() => { event!(Level::INFO, remote=%send_to, "Sending response"); @@ -133,6 +138,8 @@ impl Server { } } } + event!(Level::INFO, "Done."); + Ok(()) } // Check that the media description is something we can handle, returning the selected format @@ -317,8 +324,22 @@ impl Server { async fn main() -> Result<(), Box> { tracing_subscriber::fmt::init(); let discord_token = env::var("DISCORD_TOKEN")?; - tokio::spawn(async move { - discord::run_discord(&discord_token).await; + let (shutdown_tx, shutdown_rx_1) = broadcast::channel(1); + let (done_tx, mut done_rx) = mpsc::channel(1); + let done_2 = done_tx.clone(); + tokio::spawn( async move { + discord::run_discord(&discord_token, shutdown_rx_1, done_tx.clone()).await; }); - Server::run_sip().await + let sd_2 = shutdown_tx.subscribe(); + tokio::spawn(async move { + Server::run_sip(sd_2, done_2).await; + }); + + signal::ctrl_c().await?; + event!(Level::INFO, "Got interrupt, shutting down"); + shutdown_tx.send(())?; + let _ = done_rx.recv().await; + event!(Level::INFO, "Shut down. Goodbye!"); + + Ok(()) }