Move codecs into own module

This commit is contained in:
Sam W 2022-07-16 22:09:02 +01:00
parent ffcb0a86b2
commit d423f5f650
2 changed files with 65 additions and 62 deletions

62
src/codecs.rs Normal file
View File

@ -0,0 +1,62 @@
use bytes::{Bytes, BytesMut};
use rsip::message::SipMessage;
use rtp::packet::Packet as RtpPacket;
use tokio_util::codec::{Decoder, Encoder};
use tracing::{event, instrument, Level};
use webrtc_util::marshal::Unmarshal;
pub struct SipCodec {}
impl Decoder for SipCodec {
type Item = SipMessage;
type Error = std::io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.is_empty() {
Ok(None)
} else {
// We're assuming we get an entire sip message at once here.
match SipMessage::try_from(src.as_ref()) {
Ok(msg) => {
src.clear();
Ok(Some(msg))
}
Err(e) => {
src.clear(); // Still clear the buf to be ready for the next packet
event!(Level::ERROR, error = %e, "Error decoding SIP message.");
Ok(None)
}
}
}
}
}
impl Encoder<SipMessage> for SipCodec {
type Error = std::io::Error;
#[instrument(level = "debug", skip(self, item, dst))]
fn encode(&mut self, item: SipMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
let stuff: Bytes = item.into();
dst.reserve(stuff.len());
dst.extend_from_slice(&stuff);
Ok(())
}
}
pub struct RtpCodec {}
impl Decoder for RtpCodec {
type Item = RtpPacket;
type Error = std::io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.is_empty() {
Ok(None)
} else {
match RtpPacket::unmarshal(src) {
Ok(p) => Ok(Some(p)),
Err(e) => Err(Self::Error::new(std::io::ErrorKind::Other, e.to_string())),
}
}
}
}

View File

@ -1,7 +1,4 @@
use bytes::{Bytes, BytesMut};
use futures::{sink::Sink, SinkExt};
use rtp::packet::Packet as RtpPacket;
use webrtc_util::marshal::Unmarshal;
use rsip::common::method::Method as SipMethod;
use rsip::headers::header::Header as SipHeader;
use rsip::message::{request::Request, response::Response, SipMessage};
@ -17,73 +14,17 @@ use tokio::sync::mpsc;
use tokio::time::sleep;
use tokio_stream::StreamExt;
use tokio_util::{
codec::{Decoder, Encoder},
sync::{PollSendError, PollSender},
udp::UdpFramed,
};
use tracing::{event, instrument, Level};
mod codecs;
use codecs::{SipCodec, RtpCodec};
const SIP_PORT: u16 = 5060;
const BIND_ADDR: &str = "0.0.0.0"; // for now
struct SipCodec {}
impl Decoder for SipCodec {
type Item = SipMessage;
type Error = std::io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.is_empty() {
Ok(None)
} else {
// We're assuming we get an entire sip message at once here.
match SipMessage::try_from(src.as_ref()) {
Ok(msg) => {
src.clear();
Ok(Some(msg))
}
Err(e) => {
src.clear(); // Still clear the buf to be ready for the next packet
event!(Level::ERROR, error = %e, "Error decoding SIP message.");
Ok(None)
}
}
}
}
}
impl Encoder<SipMessage> for SipCodec {
type Error = std::io::Error;
#[instrument(
level="debug",
skip(self, item, dst)
)]
fn encode(&mut self, item: SipMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
let stuff: Bytes = item.into();
dst.reserve(stuff.len());
dst.extend_from_slice(&stuff);
Ok(())
}
}
struct RtpCodec{}
impl Decoder for RtpCodec {
type Item = RtpPacket;
type Error = std::io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.is_empty() {
Ok(None)
} else {
match RtpPacket::unmarshal(src) {
Ok(p) =>Ok(Some(p)),
Err(e) => Err(Self::Error::new(std::io::ErrorKind::Other, e.to_string())),
}
}
}
}
type StdErr<T> = Result<T, Box<dyn std::error::Error>>;