Skip to content

Commit

Permalink
fix(messaging): high spike of RPC queues for AMQP strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
JozefFlakus committed Nov 18, 2020
1 parent e24ea04 commit 614d1e7
Showing 1 changed file with 21 additions and 15 deletions.
36 changes: 21 additions & 15 deletions packages/messaging/src/transport/strategies/amqp.strategy.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { pipe } from 'fp-ts/lib/function';
import { Subject, fromEvent, merge, from } from 'rxjs';
import { map, filter, take, mapTo, first, mergeMap, share, tap } from 'rxjs/operators';
import { Channel, ConsumeMessage, Replies } from 'amqplib';
Expand Down Expand Up @@ -109,7 +110,7 @@ class AmqpStrategyConnection implements TransportLayerConnection<Transport.AMQP>
});
});

await this.channelWrapper.addSetup(async (channel: Channel) => {
const modifyChannelSetup = async (channel: Channel): Promise<void> => {
const replyQueue = await channel.assertQueue('', {
exclusive: true,
autoDelete: true,
Expand All @@ -122,26 +123,31 @@ class AmqpStrategyConnection implements TransportLayerConnection<Transport.AMQP>
{ noAck: true },
);

setTimeout(() => {
channel
.cancel(consumer.consumerTag)
.catch(error => resSubject$.error(error));
}, timeout);
setTimeout(async () => await removeChannelSetupForConsumerTag(consumer.consumerTag), timeout);

replyToSubject.next(replyQueue.queue);
});
};

const removeChannelSetupForConsumerTag = (consumerTag: string): Promise<void> =>
this.channelWrapper
.removeSetup(modifyChannelSetup, async (channel: Channel) => channel.cancel(consumerTag))
.catch(error => resSubject$.error(error));

await this.channelWrapper.addSetup(modifyChannelSetup);

return resSubject$.asObservable().pipe(
filter(raw => raw.msg.properties.correlationId === correlationId),
take(1),
mergeMap(raw => from(this.channelWrapper.addSetup((channel: Channel) => channel.cancel(raw.tag))).pipe(
mapTo(({
data: raw.msg.content,
replyTo: raw.msg.properties.replyTo,
correlationId: raw.msg.properties.correlationId,
raw,
} as TransportMessage<Buffer>)),
)),
mergeMap(raw =>
pipe(
from(removeChannelSetupForConsumerTag(raw.tag)),
mapTo(({
data: raw.msg.content,
replyTo: raw.msg.properties.replyTo,
correlationId: raw.msg.properties.correlationId,
raw,
} as TransportMessage<Buffer>)),
)),
).toPromise();
};

Expand Down

0 comments on commit 614d1e7

Please sign in to comment.