Skip to content

Commit

Permalink
batch insert
Browse files Browse the repository at this point in the history
  • Loading branch information
vigneshshettyin committed Nov 3, 2024
1 parent 0eac8a3 commit 2296b4c
Showing 1 changed file with 47 additions and 43 deletions.
90 changes: 47 additions & 43 deletions kafka-clickhouse/src/services/rabbitmq/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,62 @@ import rabbitmqConnection from './connection.js';
import UserLocationService from '../location/location.js';
import { ProduceMessage, UserLocation } from './types.js';


const exchangeName = 'exchange'; // The exchange name
const routingKey = 'eurl_click_analytics'; // The routing key
const batchSize = 10; // Set batch size for messages

const pub = rabbitmqConnection.createPublisher({
confirm: true, // Enable publish confirmations
maxAttempts: 2, // Enable retries
exchanges: [{ exchange: exchangeName, type: 'fanout', durable: true }], // Ensure the exchange exists
// Enable publish confirmations, similar to consumer acknowledgements
confirm: true,
// Enable retries
maxAttempts: 2,
// Ensure the existence of an exchange before we use it
exchanges: [{ exchange: exchangeName, type: 'fanout', durable: true }],
});

// Publish a message for testing
// pub.send({ exchange: exchangeName, routingKey: routingKey }, {
// code: '123',
// browser: 'Chrome',
// os: 'Windows',
// device: 'Desktop',
// country: 'US',
// region: 'CA',
// city: 'Los Angeles',
// })
// .then(() => console.log('Message published'))
// .catch((error) => console.error('Error publishing message:', error));

// TODO: Implement the Producer class
// TODO: Make it batch processing

let dump = [];

class Producer {
private dump: ProduceMessage[]; // Batch of messages

constructor() {
this.dump = [];
}

public async produceLogic(ip: string, browser: string, os: string, device: string, code: string): Promise<void> {
try {
const location: UserLocation = await UserLocationService.getUserLocation(ip);
const message: ProduceMessage = {
code,
browser,
os,
device,
country: location.country,
region: location.region,
city: location.city,
};

console.log(`[INFO] Adding message to batch: ${JSON.stringify(message)}`);
this.dump.push(message);

if (this.dump.length >= batchSize) {
await this.publishBatch();
}
} catch (error) {
console.error(`[ERROR] Failed to process message for IP ${ip}:`, error);
async produceLogic(ip: string, browser: string, os: string, device: string, code: string): Promise<void> {
const location: UserLocation = await UserLocationService.getUserLocation(ip);
const message: ProduceMessage = {
code,
browser,
os,
device,
country: location.country,
region: location.region,
city: location.city,
};
console.log('Producing message:', JSON.stringify(message));
dump.push(message);
if (dump.length >= 10) {
console.log('Dumping messages:', dump);
pub.send({ exchange: exchangeName, routingKey: routingKey }, dump)
.then(() => console.log(`Published ${dump.length} messages, at ${new Date().toISOString()}`))
.catch((error) => console.error('Error publishing message:', error));
dump = [];
}
}

private async publishBatch(): Promise<void> {
try {
await pub.send({ exchange: exchangeName, routingKey: routingKey }, this.dump);
console.log(`[SUCCESS] Published ${this.dump.length} messages at ${new Date().toISOString()}`);
} catch (error) {
console.error(`[ERROR] Failed to publish batch of ${this.dump.length} messages:`, error);
} finally {
this.dump = []; // Clear batch regardless of success or failure
}
}

}
}

const instance = new Producer();

export default instance;

0 comments on commit 2296b4c

Please sign in to comment.