Skip to content

Commit

Permalink
Processors use shared queues, means there can be more than process on…
Browse files Browse the repository at this point in the history
… a queue to share load (#265)
  • Loading branch information
cybermaggedon authored Jan 11, 2025
1 parent c603caa commit cd9a208
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 0 deletions.
2 changes: 2 additions & 0 deletions trustgraph-base/trustgraph/base/consumer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

from pulsar.schema import JsonSchema
import pulsar
from prometheus_client import Histogram, Info, Counter, Enum
import time

Expand Down Expand Up @@ -51,6 +52,7 @@ def __init__(self, **params):

self.consumer = self.client.subscribe(
input_queue, subscriber,
consumer_type=pulsar.ConsumerType.Shared,
schema=JsonSchema(input_schema),
)

Expand Down
2 changes: 2 additions & 0 deletions trustgraph-base/trustgraph/base/consumer_producer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

from pulsar.schema import JsonSchema
import pulsar
from prometheus_client import Histogram, Info, Counter, Enum
import time

Expand Down Expand Up @@ -71,6 +72,7 @@ def __init__(self, **params):

self.consumer = self.client.subscribe(
input_queue, subscriber,
consumer_type=pulsar.ConsumerType.Shared,
schema=JsonSchema(input_schema),
)

Expand Down

0 comments on commit cd9a208

Please sign in to comment.