Skip to content

Commit 9e8f663

Browse files
authored
Optionally address streams by ARN instead of name (#39)
1 parent 1148ed6 commit 9e8f663

File tree

4 files changed

+26
-5
lines changed

4 files changed

+26
-5
lines changed

README.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pip install async-kinesis
1717
- deadlock + reallocation of shards if checkpoint fails to heartbeat within "session_timeout"
1818
- processors (aggregator + serializer)
1919
- json line delimited, msgpack
20-
20+
- Address Kinesis streams by name or [ARN]
2121

2222
See [docs/design](./docs/DESIGN.md) for more details.
2323
See [docs/yetanother](docs/YETANOTHER.md) as to why reinvent the wheel.
@@ -31,6 +31,11 @@ AWS_ACCESS_KEY_ID
3131
AWS_SECRET_ACCESS_KEY
3232
```
3333

34+
## Stream addressing
35+
36+
The `stream_name` argument to consumer and producer accepts either the [StreamName]
37+
like `test`, or a full [StreamARN] like `arn:aws:kinesis:eu-central-1:842404475894:stream/test`.
38+
3439
## Producer
3540

3641
from kinesis import Producer
@@ -206,3 +211,6 @@ TESTING_USE_AWS_KINESIS=1
206211
Note you can ignore these tests if submitting PR unless core batching/processing behaviour is being changed.
207212

208213

214+
[ARN]: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html
215+
[StreamARN]: https://docs.aws.amazon.com/kinesis/latest/APIReference/API_StreamDescription.html#Streams-Type-StreamDescription-StreamARN
216+
[StreamName]: https://docs.aws.amazon.com/kinesis/latest/APIReference/API_StreamDescription.html#Streams-Type-StreamDescription-StreamName

kinesis/base.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,19 @@ async def __aexit__(self, exc_type, exc, tb):
8282
await self.close()
8383
await self.client.__aexit__(exc_type, exc, tb)
8484

85+
@property
86+
def address(self):
87+
"""
88+
Return address of stream, either as StreamName or StreamARN, when applicable.
89+
90+
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_StreamDescription.html#Streams-Type-StreamDescription-StreamName
91+
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_StreamDescription.html#Streams-Type-StreamDescription-StreamARN
92+
"""
93+
if self.stream_name.startswith("arn:"):
94+
return {"StreamARN": self.stream_name}
95+
else:
96+
return {"StreamName": self.stream_name}
97+
8598
async def get_client(self):
8699

87100
# Note: max_attempts = 0
@@ -101,7 +114,7 @@ async def get_client(self):
101114
async def get_stream_description(self):
102115

103116
try:
104-
return (await self.client.describe_stream(StreamName=self.stream_name))[
117+
return (await self.client.describe_stream(**self.address))[
105118
"StreamDescription"
106119
]
107120
except ClientError as err:
@@ -240,7 +253,7 @@ async def _create_stream(self, ignore_exists=True):
240253

241254
try:
242255
await self.client.create_stream(
243-
StreamName=self.stream_name, ShardCount=self.create_stream_shards
256+
**self.address, ShardCount=self.create_stream_shards
244257
)
245258
except ClientError as err:
246259
code = err.response["Error"]["Code"]

kinesis/consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,12 +353,12 @@ async def get_shard_iterator(self, shard_id, last_sequence_number=None):
353353
)
354354

355355
params = {
356-
"StreamName": self.stream_name,
357356
"ShardId": shard_id,
358357
"ShardIteratorType": "AFTER_SEQUENCE_NUMBER"
359358
if last_sequence_number
360359
else self.iterator_type,
361360
}
361+
params.update(self.address)
362362

363363
if last_sequence_number:
364364
params["StartingSequenceNumber"] = last_sequence_number

kinesis/producer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ async def _push_kinesis(self, items):
279279
}
280280
for item in items
281281
],
282-
StreamName=self.stream_name,
282+
**self.address,
283283
)
284284

285285
log.info(

0 commit comments

Comments
 (0)