Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Readd Confluent Kafka drivers #418

Draft
wants to merge 21 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
'faust.cli',
'faust.models',
'faust.serializers',
'faust.transport.drivers.confluent',
'faust.types',
'faust.types._env',
'faust.utils',
Expand Down
9 changes: 9 additions & 0 deletions docs/includes/settingref.txt
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,15 @@ You can also pass a list of URLs:
Limitations: None


- ``confluent://``

Experimental transport using the :pypi:`confluent-kafka` client.

Limitations: Does not do sticky partition assignment (not
suitable for tables), and do not create any necessary internal
topics (you have to create them manually).


.. setting:: broker_credentials

``broker_credentials``
Expand Down
2 changes: 1 addition & 1 deletion examples/word_count.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

app = faust.App(
'word-counts',
broker='kafka://localhost:9092',
broker='confluent://localhost:9092',
store='rocksdb://',
version=1,
topic_partitions=8,
Expand Down
14 changes: 13 additions & 1 deletion extra/bandit/baseline.json
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,18 @@
"loc": 890,
"nosec": 0
},
"faust/transport/drivers/confluent.py": {
"CONFIDENCE.HIGH": 0.0,
"CONFIDENCE.LOW": 0.0,
"CONFIDENCE.MEDIUM": 0.0,
"CONFIDENCE.UNDEFINED": 0.0,
"SEVERITY.HIGH": 0.0,
"SEVERITY.LOW": 0.0,
"SEVERITY.MEDIUM": 0.0,
"SEVERITY.UNDEFINED": 0.0,
"loc": 486,
"nosec": 0
},
"faust/transport/producer.py": {
"CONFIDENCE.HIGH": 0.0,
"CONFIDENCE.LOW": 0.0,
Expand Down Expand Up @@ -2091,4 +2103,4 @@
"test_name": "blacklist"
}
]
}
}
28 changes: 12 additions & 16 deletions faust/transport/drivers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
"""Transport registry."""
from yarl import URL
from typing import Type

from .aiokafka import Transport as AIOKafkaTransport
from mode.utils.imports import FactoryMapping

__all__ = ["by_name", "by_url"]


DRIVERS = {
"aiokafka": AIOKafkaTransport,
"kafka": AIOKafkaTransport,
}


def by_name(driver_name: str):
return DRIVERS[driver_name]
from faust.types import TransportT

__all__ = ["by_name", "by_url"]

def by_url(url: URL):
scheme = url.scheme
return DRIVERS[scheme]
TRANSPORTS: FactoryMapping[Type[TransportT]] = FactoryMapping(
aiokafka="faust.transport.drivers.aiokafka:Transport",
confluent="faust.transport.drivers.confluent:Transport",
kafka="faust.transport.drivers.aiokafka:Transport",
)
TRANSPORTS.include_setuptools_namespace("faust.transports")
by_name = TRANSPORTS.by_name
by_url = TRANSPORTS.by_url
Loading