diff --git a/.env.example b/.env.example index 047e2d7..caf7fd2 100644 --- a/.env.example +++ b/.env.example @@ -1,5 +1,7 @@ OPENAI_API_KEY=your_api_key LANGSMITH_API_KEY=your_langsmith_api_key #Find it here: https://smith.langchain.com +PORT=3000 +NARAKEET_API_KEY=your_narakeet_api_key #FLASK_ENV=development #Optional if you want docker to reload flask when you save your code. #LANGSMITH_API_KEY=your_api_key #optional. Let's you debug using langsmith #LANGCHAIN_PROJECT=your_project_name #pops up in langsmith dashboard @@ -7,4 +9,4 @@ LANGSMITH_API_KEY=your_langsmith_api_key #Find it here: https://smith.langchain. #NARAKEET_API_KEY=your_api_key #Needed for the AI to generate videos. The tool will be disabled if there's no api key #PERPLEXITY_API_KEY=your_api_key #Needed for the AI to generate videos. The tool will be disabled if there's no api key #GOOGLE_AUTH_KEY=your_google_auth_key #Needed for the AI to access your google calendar. The tool will be disabled if there's no api key -#GOOGLE_CALENDAR_ID=your_google_calendar_id #Needed for the AI to access your google calendar. The tool will be disabled if there's no api key \ No newline at end of file +#GOOGLE_CALENDAR_ID=your_google_calendar_id #Needed for the AI to access your google calendar. 
The tool will be disabled if there's no api key diff --git a/.gitignore b/.gitignore index e0890d0..81cae66 100644 --- a/.gitignore +++ b/.gitignore @@ -150,3 +150,7 @@ dmypy.json *.webm #calender json /core/tools/calendarjson + + +#Redis data +redis_data diff --git a/core/static/index.html b/core/static/index.html index b2e43a0..656951c 100644 --- a/core/static/index.html +++ b/core/static/index.html @@ -12,6 +12,7 @@ + diff --git a/core/static/tts.js b/core/static/tts.js new file mode 100644 index 0000000..9c73df6 --- /dev/null +++ b/core/static/tts.js @@ -0,0 +1,187 @@ +// TODO: Remove random debugging stuff +// Check if socket already exists, if not create it +const ttsSocket = (() => { + const config = { + websocketServer: 'http://localhost:5000' + }; + + return io(config.websocketServer, { + transports: ['websocket', 'polling'], + reconnection: true, + reconnectionAttempts: 5, + reconnectionDelay: 1000, + timeout: 10000, + autoConnect: true, + }); +})(); + +console.log('Attempting to connect to server...'); + +let audioContext; +let isProcessing = false; +let audioQueue = []; +let expectedSentenceId = 1; + +// Update all socket references to ttsSocket +ttsSocket.onAny((eventName, ...args) => { + console.log(`Received event: ${eventName}`, args); +}); + +ttsSocket.on('connecting', () => { + console.log('Attempting to connect...'); +}); + +ttsSocket.on('connect', () => { + console.log('Connected to remote WebSocket server:', ttsSocket.io.uri); + console.log('Connected to server with ID:', ttsSocket.id); + console.log('Transport type:', ttsSocket.io.engine.transport.name); +}); + +ttsSocket.on('connect_error', (error) => { + console.error('Connection error:', error); + console.log('Failed connecting to:', ttsSocket.io.uri); + console.log('Transport type:', ttsSocket.io.engine.transport.name); +}); + +ttsSocket.on('connect_timeout', () => { + console.error('Connection timeout'); +}); + +ttsSocket.on('reconnect_attempt', (attemptNumber) => { + 
console.log(`Reconnection attempt ${attemptNumber}`); +}); + +ttsSocket.on('disconnect', () => { + console.log('Disconnected from server'); +}); + +async function initAudioContext() { + audioContext = new (window.AudioContext || window.webkitAudioContext)(); +} + +async function processAudioChunk(audioData, sentenceId) { + try { + console.log(`Processing audio chunk for sentence ${sentenceId}`); + const arrayBuffer = new Uint8Array(audioData).buffer; + const audioBuffer = await audioContext.decodeAudioData(arrayBuffer); + + const source = audioContext.createBufferSource(); + source.buffer = audioBuffer; + source.connect(audioContext.destination); + + // Add an event listener for when the audio finishes playing + source.onended = () => { + console.log(`Finished playing sentence ${sentenceId}`); + expectedSentenceId++; + isProcessing = false; + processQueuedAudio(); // Process next chunk if available + }; + + source.start(); + isProcessing = true; + + } catch (error) { + console.error('Error processing audio chunk:', error); + isProcessing = false; + processQueuedAudio(); // Try next chunk on error + } +} + +function processQueuedAudio() { + if (isProcessing || audioQueue.length === 0) return; + + // Sort queue by sentence ID + audioQueue.sort((a, b) => a.sentenceId - b.sentenceId); + + // Process next chunk if it matches expected ID + const nextChunk = audioQueue[0]; + if (nextChunk.sentenceId === expectedSentenceId) { + audioQueue.shift(); // Remove from queue + processAudioChunk(nextChunk.audioData, nextChunk.sentenceId); + } +} + +// Socket.IO event handler +ttsSocket.on('audio_stream', async (data) => { + console.log('Received audio_stream event:', { + sentenceId: data.sentence_id, + dataLength: data.audio_data.length + }); + + if (!audioContext) { + console.log('Initializing audio context'); + await initAudioContext(); + } + + const audioData = new Uint8Array(data.audio_data); + const sentenceId = data.sentence_id; + + // Reset state if this is the start of a 
new generation + if (sentenceId === 1) { + console.log('New text generation - resetting client state'); + expectedSentenceId = 1; + audioQueue = []; + isProcessing = false; + } + + console.log(`Queueing audio chunk ${sentenceId}`); + + // Queue the audio chunk + audioQueue.push({ + audioData: audioData, + sentenceId: sentenceId, + timestamp: Date.now() + }); + + console.log(`Current queue length: ${audioQueue.length}`); + // Try to process queued audio + processQueuedAudio(); +}); + +// Initialize audio context on user interaction +document.addEventListener('click', async () => { + if (!audioContext) { + await initAudioContext(); + } + if (audioContext.state === 'suspended') { + await audioContext.resume(); + } +}); + +ttsSocket.on('test', (data) => { + console.log('Received test message:', data); +}); + +ttsSocket.on('connect', () => { + console.log('Connected to remote WebSocket server:', ttsSocket.io.uri); + console.log('Connected to server with ID:', ttsSocket.id); + console.log('Transport type:', ttsSocket.io.engine.transport.name); +}); + +ttsSocket.on('connect_error', (error) => { + console.error('Connection error:', error); + console.log('Failed connecting to:', ttsSocket.io.uri); + console.log('Transport type:', ttsSocket.io.engine.transport.name); +}); + +ttsSocket.on('connect_timeout', () => { + console.error('Connection timeout'); +}); + +ttsSocket.on('reconnect_attempt', (attemptNumber) => { + console.log(`Reconnection attempt ${attemptNumber}`); +}); + +ttsSocket.on('disconnect', () => { + console.log('Disconnected from server'); +}); + +ttsSocket.on('connect', () => { + console.log('Connected to remote WebSocket server:', ttsSocket.io.uri); + console.log('Transport type:', ttsSocket.io.engine.transport.name); +}); + +ttsSocket.on('connect_error', (error) => { + console.error('Connection error:', error); + console.log('Failed connecting to:', ttsSocket.io.uri); +}); \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 
f705a3f..3e4ba0d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -44,6 +44,38 @@ services: stop_signal: SIGINT ports: - "3001:3001" + redis: + image: redis:latest + container_name: redis_audio + ports: + - "6379:6379" + command: > + redis-server + --appendonly yes + --save 60 1 + --save 300 100 + --save 900 1000 + --maxmemory 1000mb + volumes: + - ./redis_data:/data + tts: + build: + context: ./textToSpeech + dockerfile: Dockerfile + env_file: + - .env + ports: + - "5000:5000" + environment: + - REDIS_URL=redis://redis:6379 + - NARAKEET_API_KEY=${NARAKEET_API_KEY} + depends_on: + - redis + + + + + networks: backend: diff --git a/textToSpeech/Dockerfile b/textToSpeech/Dockerfile index bf89f0f..a590a13 100644 --- a/textToSpeech/Dockerfile +++ b/textToSpeech/Dockerfile @@ -1,25 +1,8 @@ -# Use an official Python runtime as a parent image -FROM python:3.9-slim - -# Set the working directory in the container +FROM python:3.12-alpine WORKDIR /app +COPY requirements.txt . +RUN pip install -r requirements.txt -# Copy the current directory contents into the container at /app -COPY . 
/app - -# Install system dependencies -RUN apt-get update && apt-get install -y \ - espeak-ng \ - libespeak-ng1 \ - ffmpeg \ - && rm -rf /var/lib/apt/lists/* - -# Install any needed packages specified in requirements.txt -RUN pip install --no-cache-dir -r requirements.txt - -# Make port 5000 available to the world outside this container -EXPOSE 5000 - -# Run ttssend.py when the container launches -CMD ["python", "tts_server.py"] +COPY app /app +CMD ["python", "/app/app.py"] diff --git a/textToSpeech/app/app.py b/textToSpeech/app/app.py new file mode 100644 index 0000000..640fe5b --- /dev/null +++ b/textToSpeech/app/app.py @@ -0,0 +1,203 @@ +from flask import Flask, render_template, request, jsonify +from flask_socketio import SocketIO, emit +from threading import Thread, Lock +from queue import Queue +import tts +import os +from typing import List, Tuple +import logging + +# Global counter for sentence IDs +sentence_counter: int = 0 +counter_lock = Lock() + +# Add these global variables after the existing ones +recent_audio_chunks: dict[int, list] = {} # Store recent chunks +MAX_STORED_CHUNKS: int = 100 # Limit how many chunks we store + + +def split_into_sentences(text: str) -> List[str]: + """Split text into sentences using basic string splitting""" + # Split on common sentence endings + raw_sentences: List[str] = [] + for part in text.replace('!', '.').replace('?', '.').split('.'): + cleaned = part.strip() + if cleaned: # Only add non-empty sentences + raw_sentences.append(cleaned) + return raw_sentences + + +def generator_thread(input_queue: Queue, output_queue: Queue, tts_engine: tts.TTS) -> None: + # TODO port this to asyncio + while True: + item = input_queue.get() + if item is None: + break + sentence_id, sentence = item + audio_data = tts_engine.tts(sentence) + output_queue.put((sentence_id, audio_data)) + + +def streamer_thread(input_queue: Queue) -> None: + expected_id: int = 1 + buffer: dict[int, list] = {} + + while True: + item = input_queue.get() + 
if item is None: + break + + sentence_id, audio_data = item + # print(f"Got audio chunk {sentence_id}") + + # Reset expected_id if we're starting a new text generation + if sentence_id == 1: + expected_id = 1 + buffer.clear() # Clear any old buffered chunks + print("New text generation - resetting counters") + + # If this is the chunk we're waiting for, emit it + if sentence_id == expected_id: + try: + audio_list = list(audio_data) + # print(f"Emitting chunk {sentence_id} to clients") + socketio.emit('audio_stream', { + 'audio_data': audio_list, + 'sentence_id': sentence_id + }, namespace='/') + # print(f"Emitted chunk {sentence_id}") + expected_id += 1 + + # Process buffered chunks + while expected_id in buffer: + buffered_audio = buffer.pop(expected_id) + socketio.emit('audio_stream', { + 'audio_data': buffered_audio, + 'sentence_id': expected_id + }, namespace='/') + expected_id += 1 + + except Exception as e: + print(f"Error emitting audio: {str(e)}") + import traceback + traceback.print_exc() + + # If this chunk is for a future sentence, buffer it + elif sentence_id > expected_id: + buffer[sentence_id] = list(audio_data) + else: + print(f"Dropping late chunk for sentence {sentence_id}") + + +app: Flask = Flask(__name__) +app.config['SECRET_KEY'] = 'lklsa01lkJASD9012o3khj123l' +socketio: SocketIO = SocketIO( + app, + cors_allowed_origins="*", + logger=False, # Set to True to log hella verbose + engineio_logger=False, # Same with this one + ping_timeout=10, + ping_interval=5, + async_mode='threading', + reconnection=True, + reconnection_attempts=5, + reconnection_delay=1000 +) +input_queue: Queue = Queue() +output_queue: Queue = Queue() + + +@app.route('/') +def index(): + return render_template('index.html') + + +@app.route('/generate', methods=['POST']) +def generate(): + global sentence_counter + try: + data = request.get_json() + text: str = data.get('text', '') + + if not text: + return jsonify({'error': 'No text provided'}), 400 + + sentences: List[str] = 
split_into_sentences(text) + queue_items: List[Tuple[int, str]] = [] + + with counter_lock: + # Reset sentence counter if it's a new session + sentence_counter = 0 # Reset counter + for sentence in sentences: + sentence_counter += 1 + queue_item = (sentence_counter, sentence) + queue_items.append(queue_item) + input_queue.put(queue_item) + print(f"Queuing sentence {sentence_counter}: { + sentence[:30]}...") # Debug print + + return jsonify({ + 'message': 'Text queued successfully', + 'sentences': len(sentences), + 'queue_items': queue_items + }), 200 + + except Exception as e: + print(f"Error in generate endpoint: {str(e)}") + return jsonify({'error': str(e)}), 500 + + +@socketio.on('audio_data') +def handle_audio_data(data: bytes) -> None: + # Broadcast the received audio data to all connected clients + emit('audio_stream', data, broadcast=True) + + +# Add this new route to test Socket.IO +@socketio.on('connect') +def handle_connect(): + client_id = request.sid + print(f"New client connected: {client_id}") # More detailed connection log + socketio.emit('test', {'data': 'Test message'}) + + +# Add a health check route +# For docker healthchecks +@app.route('/health') +def health_check(): + return jsonify({'status': 'healthy'}), 200 + + +if __name__ == '__main__': + import logging + logging.getLogger('werkzeug').setLevel(logging.INFO) + logging.getLogger('engineio').setLevel(logging.INFO) + logging.getLogger('socketio').setLevel(logging.INFO) + + print("Starting server...") + + redis_url = os.getenv("REDIS_URL") + if not redis_url: + raise ValueError("REDIS_URL is not set") + + cache = tts.Cache(redis_url, max_size_mb=1000) + tts_narakeet = tts.Narakeet( + api_key=os.getenv("NARAKEET_API_KEY"), cache=cache) + + if os.getenv("DEBUG") == "True": + debug = True + else: + debug = False + + generator_thread = Thread(target=generator_thread, + args=(input_queue, output_queue, tts_narakeet)) + generator_thread.daemon = True # Make thread daemon + generator_thread.start() + 
+ streamer_thread = Thread(target=streamer_thread, args=(output_queue,)) + streamer_thread.daemon = True # Make thread daemon + streamer_thread.start() + + print("All threads started, running server...") + socketio.run(app, debug=debug, host='0.0.0.0', + port=5000, allow_unsafe_werkzeug=True) diff --git a/textToSpeech/app/static/js/audio.js b/textToSpeech/app/static/js/audio.js new file mode 100644 index 0000000..dfac5b6 --- /dev/null +++ b/textToSpeech/app/static/js/audio.js @@ -0,0 +1,149 @@ +const socket = io({ + transports: ['websocket', 'polling'], + reconnection: true, + reconnectionAttempts: 5, + reconnectionDelay: 1000, + timeout: 10000, + autoConnect: true +}); + +console.log('Attempting to connect to server...'); + +let audioContext; +let isProcessing = false; +let audioQueue = []; +let expectedSentenceId = 1; + +socket.onAny((eventName, ...args) => { + console.log(`Received event: ${eventName}`, args); +}); + +async function initAudioContext() { + audioContext = new (window.AudioContext || window.webkitAudioContext)(); +} + +async function processAudioChunk(audioData, sentenceId) { + try { + console.log(`Processing audio chunk for sentence ${sentenceId}`); + const arrayBuffer = new Uint8Array(audioData).buffer; + const audioBuffer = await audioContext.decodeAudioData(arrayBuffer); + + const source = audioContext.createBufferSource(); + source.buffer = audioBuffer; + source.connect(audioContext.destination); + + // Add an event listener for when the audio finishes playing + source.onended = () => { + console.log(`Finished playing sentence ${sentenceId}`); + expectedSentenceId++; + isProcessing = false; + processQueuedAudio(); // Process next chunk if available + }; + + source.start(); + isProcessing = true; + + } catch (error) { + console.error('Error processing audio chunk:', error); + isProcessing = false; + processQueuedAudio(); // Try next chunk on error + } +} + +function processQueuedAudio() { + if (isProcessing || audioQueue.length === 0) return; + 
+ // Sort queue by sentence ID + audioQueue.sort((a, b) => a.sentenceId - b.sentenceId); + + // Process next chunk if it matches expected ID + const nextChunk = audioQueue[0]; + if (nextChunk.sentenceId === expectedSentenceId) { + audioQueue.shift(); // Remove from queue + processAudioChunk(nextChunk.audioData, nextChunk.sentenceId); + } +} + +// Socket.IO event handler +socket.on('audio_stream', async (data) => { + console.log('Received audio_stream event:', { + sentenceId: data.sentence_id, + dataLength: data.audio_data.length + }); + + if (!audioContext) { + console.log('Initializing audio context'); + await initAudioContext(); + } + + const audioData = new Uint8Array(data.audio_data); + const sentenceId = data.sentence_id; + + // Reset state if this is the start of a new generation + if (sentenceId === 1) { + console.log('New text generation - resetting client state'); + expectedSentenceId = 1; + audioQueue = []; + isProcessing = false; + } + + console.log(`Queueing audio chunk ${sentenceId}`); + + // Queue the audio chunk + audioQueue.push({ + audioData: audioData, + sentenceId: sentenceId, + timestamp: Date.now() + }); + + console.log(`Current queue length: ${audioQueue.length}`); + // Try to process queued audio + processQueuedAudio(); +}); + +// Initialize audio context on user interaction +document.addEventListener('click', async () => { + if (!audioContext) { + await initAudioContext(); + } + if (audioContext.state === 'suspended') { + await audioContext.resume(); + } +}); + +socket.on('test', (data) => { + console.log('Received test message:', data); + status.textContent = "Status: Received test message"; +}); + +socket.on('connecting', () => { + console.log('Attempting to connect...'); + status.textContent = "Status: Attempting to connect..."; +}); + +socket.on('connect_error', (error) => { + console.error('Connection error:', error); + console.log('Transport type:', socket.io.engine.transport.name); + status.textContent = `Status: Connection error - 
${error.message}`; +}); + +socket.on('connect_timeout', () => { + console.error('Connection timeout'); + status.textContent = "Status: Connection timeout"; +}); + +socket.on('reconnect_attempt', (attemptNumber) => { + console.log(`Reconnection attempt ${attemptNumber}`); + status.textContent = `Status: Reconnection attempt ${attemptNumber}`; +}); + +socket.on('connect', () => { + console.log('Connected to server with ID:', socket.id); + console.log('Transport type:', socket.io.engine.transport.name); + status.textContent = "Status: Connected to server. Click anywhere to enable audio."; +}); + +socket.on('disconnect', () => { + console.log('Disconnected from server'); + status.textContent = "Status: Disconnected from server"; +}); \ No newline at end of file diff --git a/textToSpeech/app/templates/index.html b/textToSpeech/app/templates/index.html new file mode 100644 index 0000000..445476f --- /dev/null +++ b/textToSpeech/app/templates/index.html @@ -0,0 +1,73 @@ + + + + Audio Streaming + + +
+
Status: Initializing...
+ +
+ +
+ +
+ +
+ + + + + + + + \ No newline at end of file diff --git a/textToSpeech/app/tts.py b/textToSpeech/app/tts.py new file mode 100644 index 0000000..c3301a0 --- /dev/null +++ b/textToSpeech/app/tts.py @@ -0,0 +1,170 @@ +from langdetect import detect +from abc import ABC, abstractmethod +import requests +import time +import subprocess +from typing import Optional, Callable +import redis +import hashlib +from functools import wraps + + +class Cache(): + def __init__(self, redis_url: str, max_size_mb: int = 100): + self.redis = redis.Redis.from_url(redis_url) + self.enabled = True + + self.max_size_bytes = max_size_mb * 1024 * 1024 + + def generate_key(self, text: str, language: str, voice: str, speed: float): + """Generates MD5 hash to be used as key in redis""" + # Removing language because langdetect is ass + key_string = f"{text}-{voice}-{speed}" + return hashlib.sha256(key_string.encode()).hexdigest() + + def get(self, key: str): + value = self.redis.get(key) + if value: + # Add to access order + self.redis.zadd("cache_access_order", {key: time.time()}) + return value + + def set(self, key: str, value: bytes): + self.redis.set(key, value) + # zadd to keep track of access order + self.redis.zadd("cache_access_order", {key: time.time()}) + + def __get_cache_size(self): + return self.redis.info("memory")["used_memory"] + + def __cleanup_cache(self): + cache_size = self.__get_cache_size() + while cache_size > self.max_size_bytes: + # Get the oldest item + oldest_item = self.redis.zrange("cache_access_order", 0, 0)[0] + if isinstance(oldest_item, bytes): + print("Removing old items from cache") + self.redis.delete(oldest_item) + self.redis.zrem("cache_access_order", oldest_item) + else: + print(f"Oldest item is not a bytes object: { + oldest_item}. 
Something is wrong!") + + cache_size = self.__get_cache_size() + + def cache_info(self): + memory = self.redis.info("memory") + length = self.redis.zcard("cache_access_order") + return { + "memory": memory, + "length": length + } + + +def cached_tts(func: Callable) -> Callable: + @wraps(func) + def wrapper(self, text: str) -> bytes: + if not hasattr(self, 'cache') or not self.cache: + # If no cache is set, just return the result of the function + return func(self, text) + + cache_key = self.cache.generate_key( + text, self.language, self.voice, self.speed) + + cached_audio = self.cache.get(cache_key) + if cached_audio: + print(f"Cache hit for {cache_key}") + return cached_audio + + # Generate TTS if not cached + audio = func(self, text) + self.cache.set(cache_key, audio) + return audio + + return wrapper + + +class TTS(ABC): + def __init__(self, cache: Optional[Cache] = None): + self.cache = cache + + def __detect_language(self, text: str): + """Detect the language of the text, returns string like 'en' or 'nb' """ + try: + return detect(text) + except: + return "en" + + @abstractmethod + def _generate_tts(self, text: str): + pass + + @cached_tts + def tts(self, text: str) -> bytes: + if self.language == "autodetect": + print("Autodetecting language") + self.language = self.__detect_language(text) + print(f"Detected language: {self.language}") + + # TODO fix this + if self.language == "nb": + self.voice = "Aksel" + else: + self.voice = "Harry" + + return self._generate_tts(text) + + +class Narakeet(TTS): + def __init__(self, api_key: str, voice: str = "Aksel", language: str = "autodetect", speed: float = 1.0, cache: Optional[Cache] = None): + + if not api_key: + raise ValueError("API key is required") + self.api_key = api_key + self.voice = voice + self.language = language + self.speed = speed + self.cache = cache + + def _generate_tts(self, text: str) -> bytes: + url = f'https://api.narakeet.com/text-to-speech/mp3?voice={ + 
self.voice}&voice-speed={self.speed}&language={self.language}' + + options = { + 'headers': { + 'Accept': 'application/octet-stream', + 'Content-Type': 'text/plain', + 'x-api-key': self.api_key, + }, + 'data': text.encode('utf8') + } + start_time = time.time() + response = requests.post(url, **options) + if response.status_code != 200: + print(f"Failed to generate TTS: {options}") + print(f"URL: {url}") + raise ValueError(f"Failed to generate TTS: { + response.status_code} {response.text}") + end_time = time.time() + print(f"TTS generated in {end_time - start_time} seconds") + return response.content + + +class Espeak(TTS): + def __init__(self, language: str = "en", speed: float = 1.0, cache: Optional[Cache] = None): + default_speed = 175 + + self.language = language + self.speed = speed * default_speed + self.cache = cache + self.voice = "default" + + def _generate_tts(self, text: str) -> bytes: + espeak_result = subprocess.run( + ['espeak-ng', '-v', self.language, '-s', + str(self.speed), '--stdout', text], + capture_output=True, + check=True + ) + + return espeak_result.stdout diff --git a/textToSpeech/docker-compose.yml b/textToSpeech/docker-compose.yml index d484365..1c754f9 100644 --- a/textToSpeech/docker-compose.yml +++ b/textToSpeech/docker-compose.yml @@ -1,16 +1,30 @@ version: '3.8' services: - ttssend: - build: . + redis: + image: redis:latest + container_name: redis_audio + ports: + - "6379:6379" + command: > + redis-server + --appendonly yes + --save 60 1 + --save 300 100 + --save 900 1000 + --maxmemory 1000mb + volumes: + - ./redis_data:/data + tts: + build: + context: . 
+ dockerfile: Dockerfile + env_file: + - .env ports: - "5000:5000" environment: - - RECEIVER_IP=${RECEIVER_IP} - - RECEIVER_PORT=${RECEIVER_PORT} - - TTS_ENGINE=narakeet - - NARAKEET_API_KEY=${NARAKEET_API_KEY} - - CACHING_MAX_SIZE=100 - volumes: - - ./cache:/cache - + - REDIS_URL=redis://redis:6379 + - NARAKEET_API_KEY=${NARAKEET_API_KEY} + depends_on: + - redis diff --git a/textToSpeech/narakeet.py b/textToSpeech/narakeet.py deleted file mode 100644 index cd59958..0000000 --- a/textToSpeech/narakeet.py +++ /dev/null @@ -1,29 +0,0 @@ -import requests -import time -import os -from loguru import logger - - -def narakeet(text,filename,api_key,voice='harry',speed=1): - url = f'https://api.narakeet.com/text-to-speech/mp3?voice={voice}&voice-speed={speed}' - options = { - 'headers': { - 'Accept': 'application/octet-stream', - 'Content-Type': 'text/plain', - 'x-api-key': api_key, - }, - 'data': text.encode('utf8') - } - - start_time = time.time() - response = requests.post(url, **options) - if response.status_code != 200: - raise ValueError(f"Failed to generate TTS: {response.status_code} {response.text}") - end_time = time.time() - logger.info(f"TTS generated in {end_time - start_time} seconds") - - with open(filename, 'wb') as f: - f.write(response.content) - - - diff --git a/textToSpeech/requirements.txt b/textToSpeech/requirements.txt index 6ea2710..721f1f6 100644 --- a/textToSpeech/requirements.txt +++ b/textToSpeech/requirements.txt @@ -1,6 +1,8 @@ -Flask==2.1.0 -pydub==0.25.1 -requests==2.26.0 -loguru==0.6.0 -Werkzeug==2.0.3 - +Flask==3.0.3 +Flask-SocketIO==5.4.1 +redis==5.2.0 +langdetect==1.0.9 +requests==2.32.3 +python-engineio==4.10.1 +python-socketio==5.11.4 +typing-extensions>=4.0.0 diff --git a/textToSpeech/soundplayer.py b/textToSpeech/soundplayer.py deleted file mode 100644 index e6bbc13..0000000 --- a/textToSpeech/soundplayer.py +++ /dev/null @@ -1,115 +0,0 @@ -import socket -import struct -import pyaudio -import queue -import threading -import time - -# 
Network parameters -RECEIVER_IP = '0.0.0.0' -RECEIVER_PORT = 42069 -CHUNK_SIZE = 1024 - -# Audio parameters -FORMAT = pyaudio.paInt16 -CHANNELS = 2 -RATE = 44100 - -# Initialize PyAudio -p = pyaudio.PyAudio() - -# Open a stream using PulseAudio -stream = p.open(format=FORMAT, - channels=CHANNELS, - rate=RATE, - output=True, - frames_per_buffer=CHUNK_SIZE, - output_device_index=6) # Use the PulseAudio device - -# Create a TCP socket -sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -sock.bind((RECEIVER_IP, RECEIVER_PORT)) -sock.listen(1) - - -def receive_all(sock, n): - data = b'' - while len(data) < n: - try: - packet = sock.recv(n - len(data)) - if not packet: - return None - data += packet - except socket.timeout: - print("Receiving data timed out.") - return None - except socket.error as e: - print(f"Socket error: {e}") - return None - return data - -def socket_thread(sound_queue): - while True: - sock.settimeout(30) - print("Waiting for connection...") - try: - connection, client_address = sock.accept() - connection.settimeout(5) - print(f"Connected to {client_address}") - - while True: - try: - # Receive the length of the incoming data - length_data = receive_all(connection, 4) - if length_data is None: - print("Connection closed by client") - break - - length = struct.unpack('!I', length_data)[0] - audio_data = receive_all(connection, length) - - if audio_data is None: - print("Connection closed by client") - break - - sound_queue.put(audio_data) - - except socket.timeout: - print("Socket timeout while receiving data") - break - except socket.error as e: - print(f"Socket error: {e}") - break - - except socket.timeout: - print("Timeout while waiting for connection") - except socket.error as e: - print(f"Socket error while accepting connection: {e}") - - print("Disconnected. 
Waiting for new connection...") - -sound_queue = queue.Queue() -threading.Thread(target=socket_thread, args=(sound_queue,)).start() - - -try: - while True: - audio_data = sound_queue.get() - if audio_data is None: - print("No audio data") - time.sleep(0.1) - continue - - print(f"Received {len(audio_data)} bytes of audio data") - stream.write(audio_data) - -except KeyboardInterrupt: - print("Streaming stopped.") - sock.close() - -finally: - print("Stopping stream") - stream.stop_stream() - stream.close() - p.terminate() - sock.close() diff --git a/textToSpeech/tts_server.py b/textToSpeech/tts_server.py deleted file mode 100644 index 69c6620..0000000 --- a/textToSpeech/tts_server.py +++ /dev/null @@ -1,166 +0,0 @@ -import os -import socket -import struct -import hashlib -from loguru import logger -from pydub import AudioSegment -from pydub.utils import make_chunks -import subprocess -import queue -import threading -from flask import jsonify, request, Flask -import time -from narakeet import narakeet - -# Network parameters -RECEIVER_IP = os.environ.get('RECEIVER_IP') -RECEIVER_PORT = os.environ.get('RECEIVER_PORT') -TTS_ENGINE = os.environ.get('TTS_ENGINE') -NARAKEET_API_KEY = os.environ.get('NARAKEET_API_KEY') -CACHING_MAX_SIZE = os.environ.get('CACHING_MAX_SIZE') -# Validate environment variables -if not RECEIVER_IP: - raise ValueError("RECEIVER_IP environment variable is not set") - -if not RECEIVER_PORT: - raise ValueError("RECEIVER_PORT environment variable is not set") -try: - RECEIVER_PORT = int(RECEIVER_PORT) -except ValueError: - raise ValueError("RECEIVER_PORT must be a valid integer") - -if not TTS_ENGINE: - raise ValueError("TTS_ENGINE environment variable is not set") -if TTS_ENGINE not in ['narakeet', 'espeak']: - raise ValueError("TTS_ENGINE must be either 'narakeet' or 'espeak'") - -if TTS_ENGINE == 'narakeet' and not NARAKEET_API_KEY: - raise ValueError("NARAKEET_API_KEY environment variable is not set, but TTS_ENGINE is set to 'narakeet'") - 
-caching_directory = "/cache" - -if CACHING_MAX_SIZE: - try: - CACHING_MAX_SIZE = int(CACHING_MAX_SIZE) - if CACHING_MAX_SIZE <= 0: - raise ValueError("CACHING_MAX_SIZE must be a positive integer") - except ValueError: - raise ValueError("CACHING_MAX_SIZE must be a valid positive integer") - - logger.info(f"Caching enabled. Max cache size: {CACHING_MAX_SIZE} MB") -else: - logger.info("Caching disabled") - - - -def generate_audio(text: str) -> AudioSegment: - if CACHING_MAX_SIZE: - sha256_hash = hashlib.sha256(text.encode()).hexdigest() - cache_file = os.path.join(caching_directory, f"{sha256_hash}.mp3") - - if os.path.exists(cache_file): - logger.info(f"Cache hit for text: {text[:30]}...") - audio = AudioSegment.from_mp3(cache_file) - # Update the access time of the file - os.utime(cache_file, None) - return audio - - logger.info(f"Cache miss for text: {text[:30]}...") - - # Generate new audio - if TTS_ENGINE == 'espeak': - subprocess.run(['espeak-ng', '-v', 'en', '-s', '150', '-w', 'temp.mp3', text], check=True) - audio = AudioSegment.from_mp3('temp.mp3') - elif TTS_ENGINE == 'narakeet': - narakeet(text, 'temp.mp3', api_key=NARAKEET_API_KEY) - audio = AudioSegment.from_mp3('temp.mp3') - elif TTS_ENGINE == 'openai': - raise NotImplementedError("OpenAI TTS is not implemented yet") - # TODO: Implement OpenAI TTS - audio = AudioSegment.from_mp3('temp.mp3') - - - if CACHING_MAX_SIZE: - # Add the new file to the cache - audio.export(cache_file, format="mp3") - logger.info(f"Added new file to cache: {cache_file}") - - # Check cache size and remove oldest files if necessary - cache_files = [os.path.join(caching_directory, f) for f in os.listdir(caching_directory) if f.endswith('.mp3')] - cache_files.sort(key=lambda x: os.path.getatime(x)) - - total_size = sum(os.path.getsize(f) for f in cache_files) - while total_size > CACHING_MAX_SIZE * 1024 * 1024: # Convert MB to bytes - oldest_file = cache_files.pop(0) - file_size = os.path.getsize(oldest_file) - 
os.remove(oldest_file) - total_size -= file_size - logger.info(f"Removed oldest file from cache: {oldest_file} (size: {file_size / 1024 / 1024:.2f} MB)") - - logger.info(f"Current cache size: {total_size / 1024 / 1024:.2f} MB") - - return audio - - -def generate_tts_thread(input_queue, output_queue): - while True: - text = input_queue.get() - print("Generating TTS") - - audio = generate_audio(text) - - - print("TTS generated") - - audio = audio.set_channels(2).set_frame_rate(44100).set_sample_width(2) - - output_queue.put(audio) - print("TTS put in queue") - - -def audio_sender_thread(audio_queue:queue.Queue): - while True: - if not audio_queue.empty(): - audio = audio_queue.get() - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((RECEIVER_IP, RECEIVER_PORT)) - - print("Sending audio") - for chunk in make_chunks(audio, 1024): - print("Sending chunk") - raw_data = chunk.raw_data - sock.sendall(struct.pack('!I', len(raw_data))) - sock.sendall(raw_data) - - sock.close() - else: - time.sleep(0.1) - -audio_queue = queue.Queue() -tts_queue = queue.Queue() - -threading.Thread(target=generate_tts_thread, args=(tts_queue, audio_queue)).start() -threading.Thread(target=audio_sender_thread, args=(audio_queue,)).start() - - -app = Flask(__name__) - -@app.route('/tts', methods=['POST']) -def text_to_speech(): - data = request.json - if 'text' not in data: - return jsonify({"error": "No text provided"}), 400 - - print("Got input: " + data['text']) - for sentence in data['text'].split('.'): - tts_queue.put(sentence) - return jsonify({"message": "Text received and processing started"}), 202 - -if __name__ == '__main__': - logger.info("Starting TTS server") - logger.info(f"RECEIVER_IP: {RECEIVER_IP}") - logger.info(f"RECEIVER_PORT: {RECEIVER_PORT}") - logger.info(f"TTS_ENGINE: {TTS_ENGINE}") - if CACHING_MAX_SIZE: - logger.info(f"CACHING_MAX_SIZE: {CACHING_MAX_SIZE}") - app.run(debug=True, port=5000, host='0.0.0.0')