Skip to content

Commit

Permalink
simple batch for rmq
Browse files Browse the repository at this point in the history
  • Loading branch information
vigneshshettyin committed Nov 3, 2024
1 parent eb5008c commit 2f2c155
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
2 changes: 1 addition & 1 deletion kafka-clickhouse/src/fixtures/rabbitmq_clickhouse.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ CREATE TABLE IF NOT EXISTS eurl_data.click_analytics_rq (
`city` String -- The city where the user is located
) ENGINE = RabbitMQ -- Use RabbitMQ as the table engine for real-time data ingestion
SETTINGS
rabbitmq_host_port = '172.17.0.5:5672', -- RabbitMQ broker address
rabbitmq_host_port = '172.18.0.9:5672', -- RabbitMQ broker address
rabbitmq_routing_key_list = 'eurl_click_analytics', -- RabbitMQ routing key for message filtering
rabbitmq_exchange_name = 'exchange', -- The RabbitMQ exchange name where messages are published
rabbitmq_format = 'JSONEachRow'; -- Format for incoming messages from RabbitMQ (JSON format, one message per row)
Expand Down
20 changes: 11 additions & 9 deletions kafka-clickhouse/src/services/rabbitmq/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const pub = rabbitmqConnection.createPublisher({
// TODO: Implement the Producer class
// TODO: Make it batch processing

let dump = [];

class Producer {
async produceLogic(ip: string, browser: string, os: string, device: string, code: string): Promise<void> {
const location: UserLocation = await UserLocationService.getUserLocation(ip);
Expand All @@ -43,15 +45,15 @@ class Producer {
region: location.region,
city: location.city,
};
console.log('Producing message:', message);
pub.send(
{exchange: exchangeName, routingKey: routingKey}, // metadata
message
).catch((error) => {
console.error('Error producing message:', error);
}).finally(() => {
console.log('Message produced');
});
console.log('Producing message:', JSON.stringify(message));
dump.push(message);
if (dump.length >= 10) {
pub.send({ exchange: exchangeName, routingKey: routingKey }, dump)
.then(() => console.log(`Published ${dump.length} messages, at ${new Date().toISOString()}`))
.catch((error) => console.error('Error publishing message:', error));
dump = [];
}

}
}

Expand Down

0 comments on commit 2f2c155

Please sign in to comment.