Do your services talk a lot? Are your WebSocket clients flooding you with messages? Whisper your messages right to where you need them and cut the noise.
Alice Whispers is an intelligent message batching system that delivers your messages in perfect harmony, with smart batching and error classification.
- Smart Error Classification: Automatically classify and aggregate similar errors
- Queue-Based Architecture: Uses efficient FIFO queues for all append-only operations
- Flexible Processor System: Support for Telegram and custom message processors
- Intelligent Error Aggregation: Reduces noise by grouping similar errors within time windows
- Type-Safe: Written in TypeScript with full type safety
Alice Whispers uses queues as its primary data structure for several key reasons:
- Append-Only Efficiency: All message operations (batching, error tracking, pattern matching) are append-only by nature. Queues provide O(1) append and O(1) dequeue operations, making them ideal for high-throughput message processing.
- Memory Efficiency: Unlike arrays that may require resizing and copying, our queue implementation maintains a linked structure that grows efficiently with message volume.
- FIFO Guarantee: Messages are processed in the exact order they are received, which is crucial for maintaining message context and proper error aggregation.
- Iterator Support: Our Queue implementation provides standard iterator support, making it easy to process messages in sequence while maintaining clean code.
The Queue class is exported and can be used directly if needed:
import { Queue } from 'alice-whispers';
const messageQueue = new Queue<string>();
messageQueue.enqueue('Hello');
const message = messageQueue.dequeue(); // FIFO order
- 🔄 Smart message batching with configurable batch sizes and timing
- 🎯 Intelligent error classification and pattern matching
- 📊 Error aggregation with customizable time windows
- 🚀 High Performance: Uses array-based message format for minimal memory overhead
- 🔌 Extensible processor system for custom implementations
- 🎭 Elegant: Beautiful message formatting and intelligent error handling
- 💪 Written in TypeScript with full type safety
- Intelligent Message Aggregation
  - Automatically group similar messages within time windows
  - Reduce thousands of messages to meaningful summaries
  - Configurable aggregation patterns and thresholds
- Efficient Resource Usage
  - Optimized memory footprint
  - Concurrent message processing
  - Smart batching with configurable sizes
- Developer-Friendly
  - Full TypeScript support
  - Easy to configure and extend
  - Comprehensive error handling
# npm
npm install alice-whispers
# yarn
yarn add alice-whispers
import { createMessageBatcher, createTelegramProcessor } from 'alice-whispers';
// Create a Telegram processor
const telegramProcessor = createTelegramProcessor({
  botToken: process.env.TELEGRAM_BOT_TOKEN!,
  chatId: process.env.TELEGRAM_CHAT_ID!,
  development: process.env.NODE_ENV === 'development',
});
// Create a batcher with the processor
const batcher = createMessageBatcher({
  maxBatchSize: 10, // Process when 10 messages are queued
  maxWaitMs: 5000, // Or when 5 seconds have passed
  concurrentProcessors: 2, // Run up to 2 processors concurrently
  processors: [telegramProcessor],
});
// Send messages
batcher.info('Service started');
batcher.warning('High memory usage detected');
batcher.error('Database connection failed', new Error('Connection timeout'));
// Force process any remaining messages
await batcher.flush();
// Clean up when done
batcher.destroy();
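To make the two flush triggers concrete, here is a minimal, self-contained sketch of size-or-time batching. It is not the library's implementation: `MiniBatcher`, `add`, and `onFlush` are hypothetical names used only for illustration, and only `maxBatchSize` and `maxWaitMs` mirror the options shown above.

```typescript
// Hypothetical sketch of size-or-time batching; not the alice-whispers internals.
type FlushFn<T> = (batch: T[]) => void;

class MiniBatcher<T> {
  private pending: T[] = [];
  private timer: ReturnType<typeof setTimeout> | null = null;
  private readonly maxBatchSize: number;
  private readonly maxWaitMs: number;
  private readonly onFlush: FlushFn<T>;

  constructor(maxBatchSize: number, maxWaitMs: number, onFlush: FlushFn<T>) {
    this.maxBatchSize = maxBatchSize;
    this.maxWaitMs = maxWaitMs;
    this.onFlush = onFlush;
  }

  add(item: T): void {
    this.pending.push(item);
    if (this.pending.length >= this.maxBatchSize) {
      // Size limit reached: flush immediately.
      this.flush();
    } else if (this.timer === null) {
      // First item of a new batch: start the wait timer.
      this.timer = setTimeout(() => this.flush(), this.maxWaitMs);
    }
  }

  flush(): void {
    // Cancel any pending timer so the batch is delivered only once.
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    if (this.pending.length === 0) return;
    const batch = this.pending;
    this.pending = [];
    this.onFlush(batch);
  }
}

const flushed: string[][] = [];
const mini = new MiniBatcher<string>(2, 5000, (batch) => flushed.push(batch));
mini.add('a');
mini.add('b'); // reaches maxBatchSize, so ['a', 'b'] is flushed
mini.add('c'); // starts the wait timer
mini.flush(); // manual flush delivers ['c'] and clears the timer
```

Starting the timer on the first queued item, rather than on every item, bounds how long any message can wait at `maxWaitMs`.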
Handle thousands of messages with smart batching and aggregation:
import {
  createMessageBatcher,
  createTelegramProcessor,
  addErrorPatterns,
} from 'alice-whispers';
// Configure error patterns for aggregation
addErrorPatterns([
  {
    name: 'rateLimit',
    pattern: /rate limit exceeded/i,
    category: 'RATE_LIMIT',
    severity: 'high',
    aggregation: {
      windowMs: 60000, // 1 minute window
      countThreshold: 10, // Aggregate after 10 occurrences
    },
  },
]);
const telegramProcessor = createTelegramProcessor({
  botToken: process.env.TELEGRAM_BOT_TOKEN!,
  chatId: process.env.TELEGRAM_CHAT_ID!,
});
// Create processor with larger batch size
const batcher = createMessageBatcher({
  maxBatchSize: 100, // Process in batches of 100
  maxWaitMs: 30000, // Or every 30 seconds
  concurrentProcessors: 3, // Run multiple processors in parallel
  processors: [telegramProcessor],
});
// Simulate high-volume message processing
for (let i = 0; i < 1000; i++) {
  try {
    // Your application logic
    throw new Error('rate limit exceeded');
  } catch (error) {
    batcher.error(`Operation ${i} failed`, error);
  }
}
// Instead of 1000 separate messages, you'll get aggregated updates like:
// "🚨 [AGGREGATED] 127 similar RATE_LIMIT errors in last 60s"
Create your own message processors:
import {
  createMessageBatcher,
  createCustomProcessor,
  type Message,
  msgToMsgsObjects,
} from 'alice-whispers';
const myBatchProcessor = async (messages: Message[]) => {
  // doSomething is a placeholder for your own handler
  doSomething(msgToMsgsObjects(messages));
  // or use messages directly as an array (recommended)
};
const consoleProcessor = createCustomProcessor({
  name: 'my-processor',
  // here you can define what you want to do with the messages
  processBatch: myBatchProcessor,
});
// Add to batcher
const batcher = createMessageBatcher({
  processors: [consoleProcessor],
  maxBatchSize: 100,
  maxWaitMs: 1000,
});
Messages are stored internally as arrays for performance:
type Message = [
  string, // chatId
  string, // text
  NotificationLevel, // 'info' | 'warning' | 'error'
  (Error | any)? // optional error
];
The library includes built-in message classification and aggregation for any message type (info, warning, or error):
import { addErrorPatterns } from 'alice-whispers';
// Configure patterns for message classification and aggregation
addErrorPatterns([
  {
    name: 'startupInfo',
    pattern: /starting batch process/i,
    category: 'STARTUP',
    severity: 'low',
    aggregation: {
      windowMs: 60000, // 1 minute window
      countThreshold: 5, // Aggregate after 5 similar messages
    },
  },
  {
    name: 'rateLimit',
    pattern: /rate limit exceeded/i,
    category: 'RATE_LIMIT',
    severity: 'high',
    aggregation: {
      windowMs: 60000,
      countThreshold: 10,
    },
  },
  {
    name: 'progressUpdate',
    pattern: /processed \d+ items/i,
    category: 'PROGRESS',
    severity: 'low',
    aggregation: {
      windowMs: 30000, // 30 second window
      countThreshold: 3,
    },
  },
]);
// Messages will be automatically classified and aggregated
batcher.info('Starting batch process'); // Will be aggregated if 5+ similar messages in 1 min
batcher.error('Rate limit exceeded'); // Will be aggregated if 10+ similar errors in 1 min
batcher.info('Processed 100 items'); // Will be aggregated if 3+ similar messages in 30s
// Instead of many separate messages, you'll get aggregated updates like:
// "ℹ️ [AGGREGATED] 5 similar STARTUP messages in last 60s"
// "🚨 [AGGREGATED] 15 similar RATE_LIMIT errors in last 60s"
// "ℹ️ [AGGREGATED] 10 PROGRESS updates in last 30s"
Messages are classified based on pattern matching (regex or custom function) and can be aggregated based on configurable thresholds. This helps reduce noise while still maintaining visibility into system behavior.
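The classification and threshold logic described above can be sketched in a few lines. This is an illustration under stated assumptions, not the library's algorithm: `aggregateSketch`, `PatternSketch`, and `TimedMessage` are hypothetical names, only RegExp patterns are handled, and the summary text is simplified.

```typescript
// Hypothetical sketch of window-based aggregation; names are illustrative only.
interface PatternSketch {
  category: string;
  pattern: RegExp;
  windowMs: number;
  countThreshold: number;
}

interface TimedMessage {
  text: string;
  timestamp: number; // milliseconds
}

function aggregateSketch(
  messages: TimedMessage[],
  patterns: PatternSketch[],
  now: number,
): string[] {
  const output: string[] = [];
  const aggregated = new Set<TimedMessage>();

  for (const p of patterns) {
    // Collect messages that match this pattern inside its time window.
    const hits = messages.filter(
      (m) =>
        !aggregated.has(m) &&
        now - m.timestamp <= p.windowMs &&
        p.pattern.test(m.text),
    );
    // Only collapse them into a summary once the threshold is reached.
    if (hits.length >= p.countThreshold) {
      hits.forEach((m) => aggregated.add(m));
      output.push(
        `[AGGREGATED] ${hits.length} similar ${p.category} messages in last ${p.windowMs / 1000}s`,
      );
    }
  }

  // Everything that was not aggregated passes through unchanged.
  for (const m of messages) {
    if (!aggregated.has(m)) output.push(m.text);
  }
  return output;
}

const now = 100000;
const sample: TimedMessage[] = [
  ...Array.from({ length: 12 }, (_, i) => ({
    text: `Rate limit exceeded (${i})`,
    timestamp: now - i * 1000,
  })),
  { text: 'Service started', timestamp: now },
];
const summary = aggregateSketch(
  sample,
  [{ category: 'RATE_LIMIT', pattern: /rate limit exceeded/i, windowMs: 60000, countThreshold: 10 }],
  now,
);
// summary holds one aggregated RATE_LIMIT line followed by 'Service started'
```

Messages below the threshold are left untouched, which is what keeps occasional errors visible while bursts collapse into a single line.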
Messages are internally stored as tuples with the following structure:
type Message = [
  chatId: string, // Position 0: identifier for the chat/channel
  text: string, // Position 1: the message content
  level: 'info' | 'warning' | 'error', // Position 2: message level
  error?: Error | string, // Position 3: optional error details
];
- Position 0 (chatId): String identifier for the target chat or channel
- Position 1 (text): The actual message content to be sent
- Position 2 (level): Notification level indicating message importance
- Position 3 (error): Optional error object or string for error messages
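If you prefer named fields inside a custom processor, the positional layout above maps directly onto an object. The helper below is a hypothetical sketch (`tupleToObject`, `MessageTuple`, and `MessageObjectSketch` are not library exports, and the exact object shape returned by `msgToMsgsObjects` may differ):

```typescript
// Hypothetical sketch: convert a positional message tuple to a keyed object.
type NotificationLevelSketch = 'info' | 'warning' | 'error';

type MessageTuple = [
  chatId: string,
  text: string,
  level: NotificationLevelSketch,
  error?: unknown,
];

interface MessageObjectSketch {
  chatId: string;
  text: string;
  level: NotificationLevelSketch;
  error?: unknown;
}

function tupleToObject([chatId, text, level, error]: MessageTuple): MessageObjectSketch {
  // Omit the error key entirely when position 3 is absent.
  return error === undefined
    ? { chatId, text, level }
    : { chatId, text, level, error };
}

const tuple: MessageTuple = [
  'ops-channel',
  'Database connection failed',
  'error',
  new Error('Connection timeout'),
];
const asObject = tupleToObject(tuple);
```

Destructuring by position keeps the conversion allocation-light, which fits the reason the tuple format is used in the first place.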
Error patterns are internally stored as tuples with the following structure:
type ErrorPattern = readonly [
  pattern: RegExp | Function, // Position 0: pattern to match errors
  category: string, // Position 1: category name for grouping similar errors
  severity: 'low' | 'medium' | 'high' | string, // Position 2: error severity
  aggregation?: readonly [windowMs: number, countThreshold: number], // Position 3: optional aggregation settings
];
- Position 0 (pattern): Can be a RegExp or a function that returns boolean/Promise
- Position 1 (category): String identifier to group similar errors
- Position 2 (severity): Error severity level ('low', 'medium', 'high', or custom string)
- Position 3 (aggregation): Optional tuple of [windowMs, countThreshold]
  - windowMs: Time window in milliseconds for aggregation
  - countThreshold: Number of occurrences needed to trigger aggregation
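To relate this tuple layout to the object form passed to `addErrorPatterns`, here is a rough sketch of how one might be converted into the other. It is an assumption about the mapping, not the library's code: `toPatternTuple` and the type names are hypothetical, and the function-pattern signature `(text: string)` is a guess.

```typescript
// Hypothetical sketch: map an object-style pattern config to the tuple layout.
type PatternMatcher = RegExp | ((text: string) => boolean | Promise<boolean>);

type ErrorPatternTuple = readonly [
  pattern: PatternMatcher,
  category: string,
  severity: string,
  aggregation?: readonly [windowMs: number, countThreshold: number],
];

interface ErrorPatternConfigSketch {
  name: string;
  pattern: PatternMatcher;
  category: string;
  severity: string;
  aggregation?: { windowMs: number; countThreshold: number };
}

function toPatternTuple(config: ErrorPatternConfigSketch): ErrorPatternTuple {
  const { pattern, category, severity, aggregation } = config;
  // Position 3 is only present when aggregation settings were supplied.
  return aggregation
    ? [pattern, category, severity, [aggregation.windowMs, aggregation.countThreshold]]
    : [pattern, category, severity];
}

const rateLimitTuple = toPatternTuple({
  name: 'rateLimit',
  pattern: /rate limit exceeded/i,
  category: 'RATE_LIMIT',
  severity: 'high',
  aggregation: { windowMs: 60000, countThreshold: 10 },
});
```

Note that `name` has no slot in the tuple described above, so this sketch simply drops it.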
const batcher = createMessageBatcher(config);
// Add processor
batcher.addExtraProcessor(newProcessor);
// Remove processor
batcher.removeExtraProcessor('processor-name');
// Remove all extra processors
batcher.removeAllExtraProcessors();
const processor = createCustomProcessor({
  name: 'hybrid',
  // Async processing
  processBatch: async (messages) => {
    await someAsyncOperation(messages);
  },
  // Sync processing (optional)
  processBatchSync: (messages) => {
    synchronousOperation(messages);
  },
});
- Message Batching: Configure maxBatchSize and maxWaitMs based on your needs:
  - High volume: Lower maxWaitMs, higher maxBatchSize
  - Low latency: Lower maxBatchSize, lower maxWaitMs
- Error Handling: Always use the error parameter in error messages:
  try {
    await riskyOperation();
  } catch (error) {
    batcher.error('Operation failed', error);
  }
- Resource Cleanup: Always call destroy() when done:
  process.on('SIGTERM', () => {
    batcher.destroy();
  });
- Development Mode: Use development mode for testing:
  const processor = createTelegramProcessor({
    ...config,
    development: process.env.NODE_ENV !== 'production',
  });
MIT © 0xAlice
Example of message reduction through aggregation:
Input: 100,140 total messages
- 100,000 error messages
- 140 info messages
Output: Aggregated to just 4 meaningful messages:
🚨 99,862 error messages of category RATE_LIMIT occurred
ℹ️ 20 info messages of category BURST_COMPLETE occurred
ℹ️ Progress updates aggregated by category
ℹ️ Burst starts aggregated by category
The library uses a custom Queue data structure (inspired by yocto-queue) for efficient message handling. The Queue provides constant-time O(1) enqueue and dequeue, making it ideal for high-throughput message processing.
- Constant Time Operations: Both enqueue and dequeue are O(1), unlike arrays where shift() is O(n)
- Memory Efficient: Uses a linked list internally, no array resizing or copying
- FIFO Guarantee: Messages are processed in the exact order they were received
- Iterator Support: Can be used in for...of loops and spread operations
const queue = new Queue<Message>();
// Add items - O(1)
queue.enqueue(message);
// Remove and return first item - O(1)
const first = queue.dequeue();
// Get size - O(1)
const size = queue.size;
// Convert to array - O(n)
const items = queue.toArray();
// or
const spreadItems = [...queue];
// Clear all items - O(1)
queue.clear();
// Array to Queue
const numbers = [1, 2, 3];
const numberQueue = new Queue(numbers);
// Queue to Array
const backToArray = numberQueue.toArray();
// or use spread operator
const backToArraySpread = [...numberQueue];
The Queue is used internally for message batching and error collection, but when interfacing with external processors, the messages are converted to arrays for better compatibility.
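For readers curious why a linked structure gives O(1) enqueue and dequeue, here is a minimal sketch in the spirit of yocto-queue. It is not the exported `Queue` class: `SketchQueue` and `QueueNode` are hypothetical names, and the real implementation may differ in detail.

```typescript
// Hypothetical sketch of a singly linked FIFO queue; not the library's Queue.
interface QueueNode<T> {
  value: T;
  next: QueueNode<T> | null;
}

class SketchQueue<T> {
  private head: QueueNode<T> | null = null;
  private tail: QueueNode<T> | null = null;
  private count = 0;

  // O(1): link the new node after the current tail.
  enqueue(value: T): void {
    const node: QueueNode<T> = { value, next: null };
    if (this.tail) {
      this.tail.next = node;
    } else {
      this.head = node;
    }
    this.tail = node;
    this.count++;
  }

  // O(1): unlink the head node; no elements are shifted.
  dequeue(): T | undefined {
    const node = this.head;
    if (!node) return undefined;
    this.head = node.next;
    if (!this.head) this.tail = null;
    this.count--;
    return node.value;
  }

  get size(): number {
    return this.count;
  }

  // Iterating walks the links without removing anything.
  *[Symbol.iterator]() {
    for (let node = this.head; node; node = node.next) {
      yield node.value;
    }
  }
}

const q = new SketchQueue<string>();
q.enqueue('first');
q.enqueue('second');
q.enqueue('third');
const head = q.dequeue(); // the oldest item comes out first
const rest = [...q]; // remaining items, still in insertion order
```

Because `dequeue` only moves the head pointer, it avoids the element shifting that makes `Array.prototype.shift()` O(n).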