Add proper shutdown, discord presence

This commit is contained in:
Sam W 2022-07-17 03:28:59 +01:00
parent c80a99310a
commit 6042cc7a82
2 changed files with 74 additions and 19 deletions

View File

@ -4,12 +4,18 @@ use rand::seq::SliceRandom;
use rand::{prelude::ThreadRng, thread_rng}; use rand::{prelude::ThreadRng, thread_rng};
use std::cell::RefCell; use std::cell::RefCell;
use std::sync::Arc; use std::sync::Arc;
use tokio::select;
use tokio::sync::{broadcast, mpsc};
use tracing::{event, instrument, Level}; use tracing::{event, instrument, Level};
use twilight_gateway::{Cluster, Event}; use twilight_gateway::{Cluster, Event};
use twilight_http::Client; use twilight_http::Client;
use twilight_model::{ use twilight_model::{
channel::message::Message, channel::message::Message,
gateway::Intents, gateway::{
payload::outgoing::update_presence::UpdatePresencePayload,
presence::{ActivityType, MinimalActivity, Status},
Intents,
},
id::{marker::UserMarker, Id}, id::{marker::UserMarker, Id},
}; };
@ -18,25 +24,53 @@ thread_local!(static RNG: RefCell<ThreadRng> = RefCell::new(thread_rng()));
const GREETINGS: &'static [&'static str] = const GREETINGS: &'static [&'static str] =
&["You rung?", "Hello there", "Right back atcha", "Yes?"]; &["You rung?", "Hello there", "Right back atcha", "Yes?"];
pub async fn run_discord(token: &str) -> StdErr<()> { pub async fn run_discord(
token: &str,
mut shutdown: broadcast::Receiver<()>,
_done: mpsc::Sender<()>,
) -> StdErr<()> {
event!(Level::INFO, "Starting..."); event!(Level::INFO, "Starting...");
let (cluster, mut events) = Cluster::new( let (cluster, mut events) = Cluster::builder(
token.to_owned(), token.to_owned(),
Intents::GUILD_MESSAGES | Intents::MESSAGE_CONTENT, Intents::GUILD_MESSAGES | Intents::MESSAGE_CONTENT,
) )
.presence(UpdatePresencePayload::new(
vec![MinimalActivity {
kind: ActivityType::Listening,
name: "Your INVITEs".to_owned(),
url: None,
}
.into()],
false,
None,
Status::Online,
)?)
.build()
.await?; .await?;
tokio::spawn(async move {
cluster.up().await; let cluster = Arc::new(cluster);
}); let cluster_spawn = Arc::clone(&cluster);
tokio::spawn(async move { cluster_spawn.up().await });
let client = Arc::new(Client::new(token.to_owned())); let client = Arc::new(Client::new(token.to_owned()));
while let Some((shard_id, ev)) = events.next().await { loop {
let client = client.clone(); select!(
tokio::spawn(async move { Some((shard_id, ev)) = events.next() => {
if let Err(e) = handle_event(ev, shard_id, client).await { let client = client.clone();
event!(Level::ERROR, err=?e, "Error handling discord event"); tokio::spawn(async move {
} if let Err(e) = handle_event(ev, shard_id, client).await {
}); event!(Level::ERROR, err=?e, "Error handling discord event");
}
});
},
_ = shutdown.recv() => {
event!(Level::INFO, "Shutting down...");
cluster.down();
break;
},
);
} }
event!(Level::INFO, "Done");
Ok(()) Ok(())
} }

View File

@ -1,4 +1,6 @@
use futures::{sink::Sink, SinkExt}; use futures::{sink::Sink, SinkExt};
use tokio::sync::{broadcast, mpsc};
use tokio::signal;
use rsip::common::method::Method as SipMethod; use rsip::common::method::Method as SipMethod;
use rsip::headers::header::Header as SipHeader; use rsip::headers::header::Header as SipHeader;
use rsip::message::{request::Request, response::Response, SipMessage}; use rsip::message::{request::Request, response::Response, SipMessage};
@ -11,7 +13,6 @@ use std::env;
use std::net::IpAddr; use std::net::IpAddr;
use std::time::Duration; use std::time::Duration;
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use tokio::sync::mpsc;
use tokio::time::sleep; use tokio::time::sleep;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tokio_util::{ use tokio_util::{
@ -41,8 +42,8 @@ struct CurrentCall {
} }
impl Server { impl Server {
#[instrument] #[instrument(skip(shutdown, _done))]
async fn run_sip() -> StdErr<()> { async fn run_sip(mut shutdown: broadcast::Receiver<()>, _done: mpsc::Sender<()>) -> StdErr<()> {
event!(Level::INFO, "Starting..."); event!(Level::INFO, "Starting...");
let socket = UdpSocket::bind(format!("{}:{}", BIND_ADDR, SIP_PORT)).await?; let socket = UdpSocket::bind(format!("{}:{}", BIND_ADDR, SIP_PORT)).await?;
let mut framed = UdpFramed::new(socket, SipCodec {}); let mut framed = UdpFramed::new(socket, SipCodec {});
@ -54,6 +55,10 @@ impl Server {
let (response_tx, mut response_rx) = mpsc::channel(10); let (response_tx, mut response_rx) = mpsc::channel(10);
loop { loop {
tokio::select! { tokio::select! {
_ = shutdown.recv() => {
event!(Level::INFO, "Shutting down...");
break;
}
// We got a new response to send // We got a new response to send
Some((res, send_to)) = response_rx.recv() => { Some((res, send_to)) = response_rx.recv() => {
event!(Level::INFO, remote=%send_to, "Sending response"); event!(Level::INFO, remote=%send_to, "Sending response");
@ -133,6 +138,8 @@ impl Server {
} }
} }
} }
event!(Level::INFO, "Done.");
Ok(())
} }
// Check that the media description is something we can handle, returning the selected format // Check that the media description is something we can handle, returning the selected format
@ -317,8 +324,22 @@ impl Server {
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
let discord_token = env::var("DISCORD_TOKEN")?; let discord_token = env::var("DISCORD_TOKEN")?;
tokio::spawn(async move { let (shutdown_tx, shutdown_rx_1) = broadcast::channel(1);
discord::run_discord(&discord_token).await; let (done_tx, mut done_rx) = mpsc::channel(1);
let done_2 = done_tx.clone();
tokio::spawn( async move {
discord::run_discord(&discord_token, shutdown_rx_1, done_tx.clone()).await;
}); });
Server::run_sip().await let sd_2 = shutdown_tx.subscribe();
tokio::spawn(async move {
Server::run_sip(sd_2, done_2).await;
});
signal::ctrl_c().await?;
event!(Level::INFO, "Got interrupt, shutting down");
shutdown_tx.send(())?;
let _ = done_rx.recv().await;
event!(Level::INFO, "Shut down. Goodbye!");
Ok(())
} }