Skip to content

Commit 3aa2272

Browse files
authored
Merge pull request #439 from nodestream-proj/kafka-consumer-factory
Allowing additional params to be sent to KafkaConsumerFactory
2 parents f0d69f4 + 9f5d8f0 commit 3aa2272

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

nodestream/pipeline/extractors/streams/kafka.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@ def build_consumer_config(
1717
security_protocol: str,
1818
group_id: str,
1919
bootstrap_servers: List[str],
20+
**kwargs: Any,
2021
):
2122
return {
2223
"bootstrap.servers": ",".join(bootstrap_servers),
2324
"auto.offset.reset": offset_reset,
2425
"security.protocol": security_protocol,
2526
"group.id": group_id,
27+
**kwargs,
2628
}
2729

2830
@classmethod
@@ -33,10 +35,15 @@ def new(
3335
bootstrap_servers: List[str],
3436
offset_reset: str,
3537
security_protocol: str,
38+
**kwargs: Any,
3639
):
3740
consumer = Consumer(
3841
cls.build_consumer_config(
39-
offset_reset, security_protocol, group_id, bootstrap_servers
42+
offset_reset,
43+
security_protocol,
44+
group_id,
45+
bootstrap_servers,
46+
**kwargs,
4047
)
4148
)
4249
consumer.subscribe([topic_name])

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "nodestream"
3-
version = "0.14.15"
3+
version = "0.14.16"
44
description = "A Fast, Declarative ETL for Graph Databases."
55
license = "GPL-3.0-only"
66
authors = [

0 commit comments

Comments
 (0)