From d423f5f65075807ea07a72e3d275cbb5529e51dc Mon Sep 17 00:00:00 2001 From: Sam Willcocks Date: Sat, 16 Jul 2022 22:09:02 +0100 Subject: [PATCH] Move codecs into own module --- src/codecs.rs | 62 ++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 65 +++------------------------------------------------ 2 files changed, 65 insertions(+), 62 deletions(-) create mode 100644 src/codecs.rs diff --git a/src/codecs.rs b/src/codecs.rs new file mode 100644 index 0000000..d917a29 --- /dev/null +++ b/src/codecs.rs @@ -0,0 +1,62 @@ +use bytes::{Bytes, BytesMut}; +use rsip::message::SipMessage; +use rtp::packet::Packet as RtpPacket; +use tokio_util::codec::{Decoder, Encoder}; +use tracing::{event, instrument, Level}; +use webrtc_util::marshal::Unmarshal; + +pub struct SipCodec {} + +impl Decoder for SipCodec { + type Item = SipMessage; + type Error = std::io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + if src.is_empty() { + Ok(None) + } else { + // We're assuming we get an entire sip message at once here. + match SipMessage::try_from(src.as_ref()) { + Ok(msg) => { + src.clear(); + Ok(Some(msg)) + } + Err(e) => { + src.clear(); // Still clear the buf to be ready for the next packet + event!(Level::ERROR, error = %e, "Error decoding SIP message."); + Ok(None) + } + } + } + } +} + +impl Encoder for SipCodec { + type Error = std::io::Error; + + #[instrument(level = "debug", skip(self, item, dst))] + fn encode(&mut self, item: SipMessage, dst: &mut BytesMut) -> Result<(), Self::Error> { + let stuff: Bytes = item.into(); + dst.reserve(stuff.len()); + dst.extend_from_slice(&stuff); + Ok(()) + } +} + +pub struct RtpCodec {} + +impl Decoder for RtpCodec { + type Item = RtpPacket; + type Error = std::io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + if src.is_empty() { + Ok(None) + } else { + match RtpPacket::unmarshal(src) { + Ok(p) => Ok(Some(p)), + Err(e) => Err(Self::Error::new(std::io::ErrorKind::Other, e.to_string())), + } + } + } +} diff --git a/src/main.rs b/src/main.rs index eeb981c..5e82e97 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,4 @@ -use bytes::{Bytes, BytesMut}; use futures::{sink::Sink, SinkExt}; -use rtp::packet::Packet as RtpPacket; -use webrtc_util::marshal::Unmarshal; use rsip::common::method::Method as SipMethod; use rsip::headers::header::Header as SipHeader; use rsip::message::{request::Request, response::Response, SipMessage}; @@ -17,73 +14,17 @@ use tokio::sync::mpsc; use tokio::time::sleep; use tokio_stream::StreamExt; use tokio_util::{ - codec::{Decoder, Encoder}, sync::{PollSendError, PollSender}, udp::UdpFramed, }; use tracing::{event, instrument, Level}; +mod codecs; +use codecs::{SipCodec, RtpCodec}; + const SIP_PORT: u16 = 5060; const BIND_ADDR: &str = "0.0.0.0"; // for now -struct SipCodec {} - -impl Decoder for SipCodec { - type Item = SipMessage; - type Error = std::io::Error; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - if src.is_empty() { - Ok(None) - } else { - // We're assuming we get an entire sip message at once here. - match SipMessage::try_from(src.as_ref()) { - Ok(msg) => { - src.clear(); - Ok(Some(msg)) - } - Err(e) => { - src.clear(); // Still clear the buf to be ready for the next packet - event!(Level::ERROR, error = %e, "Error decoding SIP message."); - Ok(None) - } - } - } - } -} - -impl Encoder for SipCodec { - type Error = std::io::Error; - - #[instrument( - level="debug", - skip(self, item, dst) - )] - fn encode(&mut self, item: SipMessage, dst: &mut BytesMut) -> Result<(), Self::Error> { - let stuff: Bytes = item.into(); - dst.reserve(stuff.len()); - dst.extend_from_slice(&stuff); - Ok(()) - } -} - -struct RtpCodec{} - -impl Decoder for RtpCodec { - type Item = RtpPacket; - type Error = std::io::Error; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - if src.is_empty() { - Ok(None) - } else { - match RtpPacket::unmarshal(src) { - Ok(p) =>Ok(Some(p)), - Err(e) => Err(Self::Error::new(std::io::ErrorKind::Other, e.to_string())), - } - } - } -} type StdErr = Result>;