Skip to content
This repository has been archived by the owner on Jul 21, 2024. It is now read-only.

Commit

Permalink
update add record audio
Browse files Browse the repository at this point in the history
  • Loading branch information
thedtvn committed Dec 16, 2023
1 parent 60b37b1 commit 51cfac6
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 10 deletions.
37 changes: 35 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,20 @@ serde_json = "*"
json_comments = "*"
lazy_static = "1.4.0"
serde = { version = "1.0", features = ["derive"] }
songbird = { version = "0.4.0", features = ["driver", "gateway", "rustls"], default_features = false}
songbird = { version = "0.4.0", features = ["driver", "gateway", "rustls", "receive"], default_features = false}
async-trait = "0.1.74"
axum = {version = "0.7", features=["ws"]}
reqwest = "0.11"
sysinfo = "0.29.10"
openssl = { version = "0.10", features = ["vendored"]}
rustube = {git = "https://github.com/thedtvn/rustube" , rev = "372c5aeddb4403032f69ac9dbe621a4f61a798af"}
dashmap = "5.5.3"
chrono = "0.4.31"
audiopus = "0.2.0"
base64 = "0.21.5"
hound = "3.5.1"


[dependencies.symphonia]
version = "0.5.3"
features = ["pcm", "ogg"]
features = ["pcm", "ogg"]
128 changes: 122 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#![allow(dead_code)]
mod ffmpeg_support;
mod youtube_supprt;
mod task_support;

use core::f32;
use std::{net::SocketAddr, num::NonZeroU64};
use std::{net::SocketAddr, num::NonZeroU64, sync::atomic::AtomicBool, io::Cursor};
use ffmpeg_support::get_input;
use futures_util::{SinkExt, StreamExt};
use serde_json::{json, Value};
Expand All @@ -19,12 +20,16 @@ use axum::{
middleware::{Next, from_fn},
};
use async_trait::async_trait;
use songbird::{Driver, Config, ConnectionInfo, EventContext, id::{GuildId, UserId, ChannelId}, Event, EventHandler, tracks::TrackHandle};
use songbird::{Driver, Config, ConnectionInfo, EventContext, id::{GuildId, ChannelId}, Event, EventHandler, tracks::TrackHandle, model::{payload::Speaking, id::UserId}, CoreEvent, driver::DecodeMode};
use serde::{Deserialize, Serialize};
use songbird::id::UserId as RawUserId;
use json_comments::StripComments;
use lazy_static::lazy_static;
use youtube_supprt::youtube_modun;
use tokio::sync::Mutex;
use dashmap::DashMap;
use chrono::{DateTime, Utc};
use base64::Engine as _;

lazy_static! {
static ref ROOT_CONFIG: ConfigFile = {
Expand Down Expand Up @@ -123,6 +128,25 @@ async fn handler_ws(ws: WebSocketUpgrade) -> Response {
ws.on_upgrade(accept_connection)
}

fn to_wav(pcm_samples: &[i16], buffer: &mut Vec<u8>) -> Result<(), hound::Error> {
let spec = hound::WavSpec {
channels: 2,
sample_rate: 48000,
bits_per_sample: 16,
sample_format: hound::SampleFormat::Int,
};

let cursor = Cursor::new(buffer);

let mut writer = hound::WavWriter::new(cursor, spec)?;

for &sample in pcm_samples {
writer.write_sample(sample)?;
}

Ok(())
}

struct Callback {
ws: UnboundedSender<Message>,
data: Value,
Expand Down Expand Up @@ -150,6 +174,96 @@ impl EventHandler for Callback {
}



#[derive(Clone, Debug)]
struct Snippet {
date: DateTime<Utc>,
mapping: Option<UserInfo>,
}

#[derive(PartialEq, Copy, Clone, Debug)]
struct UserInfo {
user_id: u64
}

struct InnerReceiver {
last_tick_was_empty: AtomicBool,
known_ssrcs: DashMap<u32, UserId>,
}

#[derive(Clone)]
struct CallbackR {
ws: UnboundedSender<Message>,
pub accumulator: DashMap<u32, Snippet>
}


impl CallbackR {
pub fn new(ws: UnboundedSender<Message>) -> Self {
// You can manage state here, such as a buffer of audio packet bytes so
// you can later store them in intervals.
Self {
ws,
accumulator: DashMap::default()
}
}
}

#[async_trait]
impl EventHandler for CallbackR {
#[allow(unused_variables)]
async fn act(&self, ctx: &EventContext<'_>) -> Option<Event> {
use EventContext as Ctx;
match ctx {
Ctx::SpeakingStateUpdate(Speaking {
speaking,
ssrc,
user_id,
..
}) => {

if user_id.is_none() {
return None;
}
let jdata = json!({
"t": "SSRC_UPDATE",
"d": {
"ssrc": ssrc,
"user": user_id.unwrap().0
}
});
self.ws.send(Message::Text(jdata.to_string())).unwrap();

},
Ctx::VoiceTick(packet) => {
for i in &packet.speaking {
let data_out = &i.1.decoded_voice;
let old_data: Option<dashmap::mapref::one::RefMut<'_, u32, Snippet>> = self.accumulator.get_mut(i.0);
if data_out.is_some() {
let data = data_out.as_ref().unwrap();
let mut data_u8 = Vec::new();
to_wav(data, &mut data_u8).unwrap();
let b64_data = base64::engine::general_purpose::URL_SAFE.encode(data_u8);
let jdata = json!({
"t": "VOICE_PACKET",
"d": {
"ssrc": i.0,
"data": b64_data
}
});
self.ws.send(Message::Text(jdata.to_string())).unwrap();
}
}
},
_ => {
// We won't be registering this struct for any more event classes.
unimplemented!()
},
}
None
}
}

async fn accept_connection(ws_stream: WebSocket) {
let (mut write, mut read) = ws_stream.split();
let (send_s, mut send_r) = tokio::sync::mpsc::unbounded_channel();
Expand All @@ -167,17 +281,19 @@ async fn accept_connection(ws_stream: WebSocket) {
let mut user_id = 1;
let mut session_id = "".to_string();
let mut channel_id = 1;
let mut dr = Driver::new(Config::default());
let mut dr = Driver::new(Config::default().decode_mode(DecodeMode::Decode));
let jdata = json!({
"t": "STOP"
});
let jdata_err = json!({
"t": "STOP_ERROR"
});
let mut controler: Option<TrackHandle> = None;
dr.add_global_event(Event::Track(songbird::TrackEvent::End), Callback {ws: send_s.clone(), data: jdata, data_err: jdata_err});

let evt_receiver = CallbackR::new(send_s.clone());

dr.add_global_event(Event::Track(songbird::TrackEvent::End), Callback {ws: send_s.clone(), data: jdata.clone(), data_err: jdata_err.clone()});
dr.add_global_event(CoreEvent::SpeakingStateUpdate.into(), evt_receiver.clone());
dr.add_global_event(CoreEvent::VoiceTick.into(), evt_receiver.clone());
let mut volume = 100;
while let Some(msg) = read.next().await {
if msg.is_err() {
Expand Down Expand Up @@ -221,7 +337,7 @@ async fn accept_connection(ws_stream: WebSocket) {
let guild_id = msg.get("guild_id").unwrap().as_str().unwrap().to_string().parse::<u64>().unwrap();
let endpoint = msg.get("endpoint").unwrap().as_str().unwrap();
dr.leave();
dr.connect(ConnectionInfo {channel_id: Some(ChannelId(NonZeroU64::new(channel_id).unwrap())), endpoint: endpoint.to_string(), guild_id: GuildId(NonZeroU64::new(guild_id).unwrap()), session_id: session_id.clone(), token, user_id: UserId(NonZeroU64::new(user_id).unwrap())}).await.unwrap();
dr.connect(ConnectionInfo {channel_id: Some(ChannelId(NonZeroU64::new(channel_id).unwrap())), endpoint: endpoint.to_string(), guild_id: GuildId(NonZeroU64::new(guild_id).unwrap()), session_id: session_id.clone(), token, user_id: RawUserId(NonZeroU64::new(user_id).unwrap())}).await.unwrap();
} else if data_out == "PLAY" {
let dataout = data["url"].as_str().unwrap().to_string();
let stop_op = controler.as_mut();
Expand Down

0 comments on commit 51cfac6

Please sign in to comment.