Compare commits

..

2 Commits

Author SHA1 Message Date
Sam W c80a99310a Add discord bot framework 2022-07-17 02:23:09 +01:00
Sam W d423f5f650 Move codecs into own module 2022-07-16 22:09:02 +01:00
6 changed files with 1815 additions and 92 deletions

1688
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -6,12 +6,17 @@ edition = "2021"
[dependencies]
bytes = "1.1.0"
futures = "0.3.21"
rand = "0.8.5"
rsip = "0.4.0"
rtp = "0.6.5"
sdp-rs = "0.2.1"
songbird = { version = "0.2.2", features = ["builtin-queue"]}
tokio = { version = "1.19.2", features = ["full"] }
tokio-stream = "0.1.9"
tokio-util = { version = "0.7.3", features = ["net", "codec"] }
tracing = "0.1.35"
tracing-subscriber = "0.3.14"
twilight-gateway = "0.11.1"
twilight-http = "0.11.1"
twilight-model = "0.11.3"
webrtc-util = "0.5.4"

View File

@ -31,6 +31,7 @@
packages.default = naersk-lib.buildPackage {
pname = "discosip";
root = ./.;
buildInputs = with pkgs; [libopus pkgconfig];
};
apps.default = utils.lib.mkApp {drv = packages.default;};
@ -43,11 +44,22 @@
};
in
pkgs.devshell.mkShell {
packages = with pkgs; [
devshell.packages = with pkgs; [
(rust.override {extensions = ["rls"];})
ffmpeg
(callPackage ./pjsip {inherit (darwin.apple_sdk.frameworks) AppKit;})
opusTools
libopus
pkgconfig
];
# Devshell doesn't do any of the automagic that pkgs.mkshell (thanks to
# mkderivation) does. So we need to manually tell pkg-config where to find
# libopus so that we can `cargo build` in our devshell.
env = [
{
name = "PKG_CONFIG_PATH";
value = "${pkgs.libopus.dev}/lib/pkgconfig";
}
];
};
formatter = pkgs.alejandra;

62
src/codecs.rs Normal file
View File

@ -0,0 +1,62 @@
use bytes::{Bytes, BytesMut};
use rsip::message::SipMessage;
use rtp::packet::Packet as RtpPacket;
use tokio_util::codec::{Decoder, Encoder};
use tracing::{event, instrument, Level};
use webrtc_util::marshal::Unmarshal;
pub struct SipCodec {}
impl Decoder for SipCodec {
type Item = SipMessage;
type Error = std::io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.is_empty() {
Ok(None)
} else {
// We're assuming we get an entire sip message at once here.
match SipMessage::try_from(src.as_ref()) {
Ok(msg) => {
src.clear();
Ok(Some(msg))
}
Err(e) => {
src.clear(); // Still clear the buf to be ready for the next packet
event!(Level::ERROR, error = %e, "Error decoding SIP message.");
Ok(None)
}
}
}
}
}
impl Encoder<SipMessage> for SipCodec {
type Error = std::io::Error;
#[instrument(level = "debug", skip(self, item, dst))]
fn encode(&mut self, item: SipMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
let stuff: Bytes = item.into();
dst.reserve(stuff.len());
dst.extend_from_slice(&stuff);
Ok(())
}
}
pub struct RtpCodec {}
impl Decoder for RtpCodec {
type Item = RtpPacket;
type Error = std::io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.is_empty() {
Ok(None)
} else {
match RtpPacket::unmarshal(src) {
Ok(p) => Ok(Some(p)),
Err(e) => Err(Self::Error::new(std::io::ErrorKind::Other, e.to_string())),
}
}
}
}

65
src/discord.rs Normal file
View File

@ -0,0 +1,65 @@
use crate::StdErr;
use futures::stream::StreamExt;
use rand::seq::SliceRandom;
use rand::{prelude::ThreadRng, thread_rng};
use std::cell::RefCell;
use std::sync::Arc;
use tracing::{event, instrument, Level};
use twilight_gateway::{Cluster, Event};
use twilight_http::Client;
use twilight_model::{
channel::message::Message,
gateway::Intents,
id::{marker::UserMarker, Id},
};
thread_local!(static RNG: RefCell<ThreadRng> = RefCell::new(thread_rng()));
const GREETINGS: &'static [&'static str] =
&["You rung?", "Hello there", "Right back atcha", "Yes?"];
pub async fn run_discord(token: &str) -> StdErr<()> {
event!(Level::INFO, "Starting...");
let (cluster, mut events) = Cluster::new(
token.to_owned(),
Intents::GUILD_MESSAGES | Intents::MESSAGE_CONTENT,
)
.await?;
tokio::spawn(async move {
cluster.up().await;
});
let client = Arc::new(Client::new(token.to_owned()));
while let Some((shard_id, ev)) = events.next().await {
let client = client.clone();
tokio::spawn(async move {
if let Err(e) = handle_event(ev, shard_id, client).await {
event!(Level::ERROR, err=?e, "Error handling discord event");
}
});
}
Ok(())
}
fn mentions(msg: &Message, us: Id<UserMarker>) -> bool {
msg.mentions.iter().filter(|m| m.id == us).next().is_some()
}
#[instrument(skip(client))]
async fn handle_event(ev: Event, shard_id: u64, client: Arc<Client>) -> StdErr<()> {
let me = client.current_user().exec().await?.model().await?;
match ev {
Event::MessageCreate(msg) => {
if mentions(&msg.0, me.id) {
let greet = RNG.with(|rng| GREETINGS.choose(&mut *rng.borrow_mut()).unwrap());
client
.create_message(msg.channel_id)
.content(greet)?
.exec()
.await?;
}
}
Event::ShardConnected(_) => event!(Level::INFO, "Shard connected"),
_ => {}
}
Ok(())
}

View File

@ -1,7 +1,4 @@
use bytes::{Bytes, BytesMut};
use futures::{sink::Sink, SinkExt};
use rtp::packet::Packet as RtpPacket;
use webrtc_util::marshal::Unmarshal;
use rsip::common::method::Method as SipMethod;
use rsip::headers::header::Header as SipHeader;
use rsip::message::{request::Request, response::Response, SipMessage};
@ -10,6 +7,7 @@ use sdp_rs::lines::{attribute::Rtpmap, Attribute, Media};
use sdp_rs::{MediaDescription, SessionDescription};
use std::net::SocketAddr;
use std::str;
use std::env;
use std::net::IpAddr;
use std::time::Duration;
use tokio::net::UdpSocket;
@ -17,73 +15,20 @@ use tokio::sync::mpsc;
use tokio::time::sleep;
use tokio_stream::StreamExt;
use tokio_util::{
codec::{Decoder, Encoder},
sync::{PollSendError, PollSender},
udp::UdpFramed,
};
use tracing::{event, instrument, Level};
use songbird::driver::Driver;
mod codecs;
use codecs::{SipCodec, RtpCodec};
mod discord;
const SIP_PORT: u16 = 5060;
const BIND_ADDR: &str = "0.0.0.0"; // for now
struct SipCodec {}
impl Decoder for SipCodec {
type Item = SipMessage;
type Error = std::io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.is_empty() {
Ok(None)
} else {
// We're assuming we get an entire sip message at once here.
match SipMessage::try_from(src.as_ref()) {
Ok(msg) => {
src.clear();
Ok(Some(msg))
}
Err(e) => {
src.clear(); // Still clear the buf to be ready for the next packet
event!(Level::ERROR, error = %e, "Error decoding SIP message.");
Ok(None)
}
}
}
}
}
impl Encoder<SipMessage> for SipCodec {
type Error = std::io::Error;
#[instrument(
level="debug",
skip(self, item, dst)
)]
fn encode(&mut self, item: SipMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
let stuff: Bytes = item.into();
dst.reserve(stuff.len());
dst.extend_from_slice(&stuff);
Ok(())
}
}
struct RtpCodec{}
impl Decoder for RtpCodec {
type Item = RtpPacket;
type Error = std::io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.is_empty() {
Ok(None)
} else {
match RtpPacket::unmarshal(src) {
Ok(p) =>Ok(Some(p)),
Err(e) => Err(Self::Error::new(std::io::ErrorKind::Other, e.to_string())),
}
}
}
}
type StdErr<T> = Result<T, Box<dyn std::error::Error>>;
@ -371,5 +316,9 @@ impl Server {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
let discord_token = env::var("DISCORD_TOKEN")?;
tokio::spawn(async move {
discord::run_discord(&discord_token).await;
});
Server::run_sip().await
}