diff --git a/Cargo.lock b/Cargo.lock index 3151fab..90dcf2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -84,13 +84,32 @@ dependencies = [ "syn 2.0.39", ] +[[package]] +name = "audiopus" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3743519567e9135cf6f9f1a509851cb0c8e4cb9d66feb286668afb1923bec458" +dependencies = [ + "audiopus_sys 0.1.8", +] + [[package]] name = "audiopus" version = "0.3.0-rc.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab55eb0e56d7c6de3d59f544e5db122d7725ec33be6a276ee8241f3be6473955" dependencies = [ - "audiopus_sys", + "audiopus_sys 0.2.2", +] + +[[package]] +name = "audiopus_sys" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "927791de46f70facea982dbfaf19719a41ce6064443403be631a85de6a58fff9" +dependencies = [ + "log", + "pkg-config", ] [[package]] @@ -261,8 +280,10 @@ checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-targets 0.48.5", ] @@ -837,6 +858,12 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" +[[package]] +name = "hound" +version = "3.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62adaabb884c94955b19907d60019f4e145d091c75345379e70d1ee696f7854f" + [[package]] name = "http" version = "0.2.11" @@ -2281,8 +2308,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b905d2cdd4becf0e643a4aa5491cdfe8f193d5676b5bae7c0460e0e3c6358d63" dependencies = [ "async-trait", - "audiopus", + "audiopus 0.3.0-rc.0", "byteorder", + "bytes", "crypto_secretbox", "dashmap", "derivative", @@ -2323,8 +2351,13 @@ name = "songbird_node" version = "0.0.3" dependencies = [ "async-trait", + "audiopus 0.2.0", "axum", + "base64 0.21.5", + "chrono", + "dashmap", "futures-util", + "hound", "json_comments", "lazy_static", "openssl", diff --git a/Cargo.toml b/Cargo.toml index 86a9c40..734792e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,15 +13,20 @@ serde_json = "*" json_comments = "*" lazy_static = "1.4.0" serde = { version = "1.0", features = ["derive"] } -songbird = { version = "0.4.0", features = ["driver", "gateway", "rustls"], default_features = false} +songbird = { version = "0.4.0", features = ["driver", "gateway", "rustls", "receive"], default_features = false} async-trait = "0.1.74" axum = {version = "0.7", features=["ws"]} reqwest = "0.11" sysinfo = "0.29.10" openssl = { version = "0.10", features = ["vendored"]} rustube = {git = "https://github.com/thedtvn/rustube" , rev = "372c5aeddb4403032f69ac9dbe621a4f61a798af"} +dashmap = "5.5.3" +chrono = "0.4.31" +audiopus = "0.2.0" +base64 = "0.21.5" +hound = "3.5.1" [dependencies.symphonia] version = "0.5.3" -features = ["pcm", "ogg"] \ No newline at end of file +features = ["pcm", "ogg"] diff --git a/src/main.rs b/src/main.rs index d4a8ac5..43b75cb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,10 @@ +#![allow(dead_code)] mod ffmpeg_support; mod youtube_supprt; mod task_support; use core::f32; -use std::{net::SocketAddr, num::NonZeroU64}; +use std::{net::SocketAddr, num::NonZeroU64, sync::atomic::AtomicBool, io::Cursor}; use ffmpeg_support::get_input; use futures_util::{SinkExt, StreamExt}; use serde_json::{json, Value}; @@ -19,12 +20,16 @@ use axum::{ middleware::{Next, from_fn}, }; use async_trait::async_trait; -use songbird::{Driver, Config, ConnectionInfo, EventContext, id::{GuildId, UserId, ChannelId}, Event, EventHandler, tracks::TrackHandle}; +use songbird::{Driver, Config, ConnectionInfo, EventContext, id::{GuildId, ChannelId}, Event, EventHandler, tracks::TrackHandle, model::{payload::Speaking, id::UserId}, CoreEvent, driver::DecodeMode}; use serde::{Deserialize, Serialize}; +use songbird::id::UserId as RawUserId; use json_comments::StripComments; use lazy_static::lazy_static; use youtube_supprt::youtube_modun; use tokio::sync::Mutex; +use dashmap::DashMap; +use chrono::{DateTime, Utc}; +use base64::Engine as _; lazy_static! { static ref ROOT_CONFIG: ConfigFile = { @@ -123,6 +128,25 @@ async fn handler_ws(ws: WebSocketUpgrade) -> Response { ws.on_upgrade(accept_connection) } +fn to_wav(pcm_samples: &[i16], buffer: &mut Vec) -> Result<(), hound::Error> { + let spec = hound::WavSpec { + channels: 2, + sample_rate: 48000, + bits_per_sample: 16, + sample_format: hound::SampleFormat::Int, + }; + + let cursor = Cursor::new(buffer); + + let mut writer = hound::WavWriter::new(cursor, spec)?; + + for &sample in pcm_samples { + writer.write_sample(sample)?; + } + + Ok(()) +} + struct Callback { ws: UnboundedSender, data: Value, @@ -150,6 +174,96 @@ impl EventHandler for Callback { } + +#[derive(Clone, Debug)] +struct Snippet { + date: DateTime, + mapping: Option, +} + +#[derive(PartialEq, Copy, Clone, Debug)] +struct UserInfo { + user_id: u64 +} + +struct InnerReceiver { + last_tick_was_empty: AtomicBool, + known_ssrcs: DashMap, +} + +#[derive(Clone)] +struct CallbackR { + ws: UnboundedSender, + pub accumulator: DashMap +} + + +impl CallbackR { + pub fn new(ws: UnboundedSender) -> Self { + // You can manage state here, such as a buffer of audio packet bytes so + // you can later store them in intervals. + Self { + ws, + accumulator: DashMap::default() + } + } +} + +#[async_trait] +impl EventHandler for CallbackR { + #[allow(unused_variables)] + async fn act(&self, ctx: &EventContext<'_>) -> Option { + use EventContext as Ctx; + match ctx { + Ctx::SpeakingStateUpdate(Speaking { + speaking, + ssrc, + user_id, + .. + }) => { + + if user_id.is_none() { + return None; + } + let jdata = json!({ + "t": "SSRC_UPDATE", + "d": { + "ssrc": ssrc, + "user": user_id.unwrap().0 + } + }); + self.ws.send(Message::Text(jdata.to_string())).unwrap(); + + }, + Ctx::VoiceTick(packet) => { + for i in &packet.speaking { + let data_out = &i.1.decoded_voice; + let old_data: Option> = self.accumulator.get_mut(i.0); + if data_out.is_some() { + let data = data_out.as_ref().unwrap(); + let mut data_u8 = Vec::new(); + to_wav(data, &mut data_u8).unwrap(); + let b64_data = base64::engine::general_purpose::URL_SAFE.encode(data_u8); + let jdata = json!({ + "t": "VOICE_PACKET", + "d": { + "ssrc": i.0, + "data": b64_data + } + }); + self.ws.send(Message::Text(jdata.to_string())).unwrap(); + } + } + }, + _ => { + // We won't be registering this struct for any more event classes. + unimplemented!() + }, + } + None + } +} + async fn accept_connection(ws_stream: WebSocket) { let (mut write, mut read) = ws_stream.split(); let (send_s, mut send_r) = tokio::sync::mpsc::unbounded_channel(); @@ -167,7 +281,7 @@ async fn accept_connection(ws_stream: WebSocket) { let mut user_id = 1; let mut session_id = "".to_string(); let mut channel_id = 1; - let mut dr = Driver::new(Config::default()); + let mut dr = Driver::new(Config::default().decode_mode(DecodeMode::Decode)); let jdata = json!({ "t": "STOP" }); @@ -175,9 +289,11 @@ async fn accept_connection(ws_stream: WebSocket) { "t": "STOP_ERROR" }); let mut controler: Option = None; - dr.add_global_event(Event::Track(songbird::TrackEvent::End), Callback {ws: send_s.clone(), data: jdata, data_err: jdata_err}); - + let evt_receiver = CallbackR::new(send_s.clone()); + dr.add_global_event(Event::Track(songbird::TrackEvent::End), Callback {ws: send_s.clone(), data: jdata.clone(), data_err: jdata_err.clone()}); + dr.add_global_event(CoreEvent::SpeakingStateUpdate.into(), evt_receiver.clone()); + dr.add_global_event(CoreEvent::VoiceTick.into(), evt_receiver.clone()); let mut volume = 100; while let Some(msg) = read.next().await { if msg.is_err() { @@ -221,7 +337,7 @@ async fn accept_connection(ws_stream: WebSocket) { let guild_id = msg.get("guild_id").unwrap().as_str().unwrap().to_string().parse::().unwrap(); let endpoint = msg.get("endpoint").unwrap().as_str().unwrap(); dr.leave(); - dr.connect(ConnectionInfo {channel_id: Some(ChannelId(NonZeroU64::new(channel_id).unwrap())), endpoint: endpoint.to_string(), guild_id: GuildId(NonZeroU64::new(guild_id).unwrap()), session_id: session_id.clone(), token, user_id: UserId(NonZeroU64::new(user_id).unwrap())}).await.unwrap(); + dr.connect(ConnectionInfo {channel_id: Some(ChannelId(NonZeroU64::new(channel_id).unwrap())), endpoint: endpoint.to_string(), guild_id: GuildId(NonZeroU64::new(guild_id).unwrap()), session_id: session_id.clone(), token, user_id: RawUserId(NonZeroU64::new(user_id).unwrap())}).await.unwrap(); } else if data_out == "PLAY" { let dataout = data["url"].as_str().unwrap().to_string(); let stop_op = controler.as_mut();