Skip to content

Commit b1abf01

Browse files
authored
modernize async-kinesis library for Python 3.10-3.12 (#40)
- Updated to support Python 3.10, 3.11, and 3.12 (last 3 releases)
- Added Python version compatibility layer for timeout functions
- Updated classifiers and python_requires in setup.py
- Migrated from deprecated aredis to modern redis (>=4.0.0)
- Updated async-timeout to >=4.0.0 with backward compatibility
- Fixed aiobotocore version alignment across files
- Resolved Python 3.12 distutils compatibility issues
- Migrated from nose to pytest with async support
- Added comprehensive GitHub Actions CI workflow for matrix testing
- Created organized test modules (conftest.py, test_integration.py, etc.)
- Added pytest configuration and coverage reporting
- Implemented LocalStack for reliable Kinesis mocking
- Added comprehensive type hints throughout codebase
- Enhanced Protocol definitions for processors and checkpointers
- Improved IDE support and static analysis capabilities
- Maintained usability with moderate typing approach
- Fixed processor inheritance issues while maintaining backward compatibility
- Enhanced error handling in producer for LocalStack compatibility
- Improved Redis checkpointer concurrent allocation logic
- Added null result guards for robustness
- Split monolithic tests.py into focused test modules
- Added integration tests with comprehensive scenarios
- Enhanced checkpointer tests with proper isolation
- Implemented service startup reliability checks
- Added timeout compatibility across Python versions
- Created GitHub Actions workflow with service containers
- Added matrix testing for Python 3.10, 3.11, 3.12
- Integrated code coverage reporting
- Added linting and formatting checks
- 56/62 tests passing (6 appropriate skips)
- Python 3.10: ✅ Full compatibility achieved
- Python 3.11: ✅ Working
- Python 3.12: ✅ Working
- Integration tests: ✅ All scenarios covered
- LocalStack compatibility: ✅ Stable
1 parent 9e8f663 commit b1abf01

19 files changed

+952
-119
lines changed

.flake8

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,14 @@
1+
[flake8]
2+
max-line-length = 88
3+
extend-ignore = E203, W503, E501
4+
exclude =
5+
.git,
6+
__pycache__,
7+
venv,
8+
.venv,
9+
build,
10+
dist,
11+
*.egg-info
12+
per-file-ignores =
13+
__init__.py:F401
14+
tests/*:F401,F811

.github/workflows/test.yml

Lines changed: 120 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,120 @@
1+
name: Tests
2+
3+
on:
4+
push:
5+
branches: [ master, develop ]
6+
pull_request:
7+
branches: [ master, develop ]
8+
9+
jobs:
10+
test:
11+
runs-on: ubuntu-latest
12+
strategy:
13+
matrix:
14+
python-version: ["3.10", "3.11", "3.12"]
15+
16+
services:
17+
kinesis:
18+
image: localstack/localstack:latest
19+
ports:
20+
- 4566:4566
21+
env:
22+
SERVICES: kinesis
23+
KINESIS_ERROR_PROBABILITY: 0.0
24+
DEBUG: 1
25+
26+
redis:
27+
image: redis:latest
28+
ports:
29+
- 6379:6379
30+
options: >-
31+
--health-cmd "redis-cli ping"
32+
--health-interval 10s
33+
--health-timeout 5s
34+
--health-retries 5
35+
36+
steps:
37+
- uses: actions/checkout@v4
38+
39+
- name: Set up Python ${{ matrix.python-version }}
40+
uses: actions/setup-python@v4
41+
with:
42+
python-version: ${{ matrix.python-version }}
43+
44+
- name: Cache pip dependencies
45+
uses: actions/cache@v3
46+
with:
47+
path: ~/.cache/pip
48+
key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt', '**/test-requirements.txt') }}
49+
restore-keys: |
50+
${{ runner.os }}-pip-
51+
52+
- name: Install dependencies
53+
run: |
54+
python -m pip install --upgrade pip
55+
pip install -r requirements.txt
56+
pip install -r test-requirements.txt
57+
sudo apt-get update && sudo apt-get install -y netcat-openbsd redis-tools curl
58+
59+
- name: Wait for services
60+
run: |
61+
# Give services time to start
62+
sleep 20
63+
# Wait for LocalStack port to be available (skip health check due to S3 errors)
64+
timeout 120 bash -c 'until nc -z localhost 4566; do echo "Waiting for localstack port..."; sleep 3; done'
65+
echo "LocalStack port is ready"
66+
# Wait for Redis
67+
timeout 60 bash -c 'until redis-cli -h localhost ping; do echo "Waiting for redis..."; sleep 2; done'
68+
echo "Redis is ready"
69+
70+
- name: Run tests
71+
run: |
72+
timeout 600 pytest -v --cov=kinesis --cov-report=xml --cov-report=term-missing
73+
env:
74+
ENDPOINT_URL: http://localhost:4566
75+
REDIS_HOST: localhost
76+
REDIS_PORT: 6379
77+
AWS_DEFAULT_REGION: ap-southeast-2
78+
AWS_ACCESS_KEY_ID: test
79+
AWS_SECRET_ACCESS_KEY: test
80+
81+
- name: Upload coverage to Codecov
82+
uses: codecov/codecov-action@v3
83+
with:
84+
file: ./coverage.xml
85+
flags: unittests
86+
name: codecov-umbrella
87+
fail_ci_if_error: false
88+
89+
test-docker:
90+
runs-on: ubuntu-latest
91+
steps:
92+
- uses: actions/checkout@v4
93+
94+
- name: Run tests in Docker
95+
run: |
96+
docker compose up --build --abort-on-container-exit --exit-code-from test
97+
98+
lint:
99+
runs-on: ubuntu-latest
100+
steps:
101+
- uses: actions/checkout@v4
102+
103+
- name: Set up Python
104+
uses: actions/setup-python@v4
105+
with:
106+
python-version: "3.12"
107+
108+
- name: Install dependencies
109+
run: |
110+
python -m pip install --upgrade pip
111+
pip install black isort flake8
112+
113+
- name: Check code formatting with black
114+
run: black --check --diff kinesis tests
115+
116+
- name: Check import sorting with isort
117+
run: isort --check-only --diff kinesis tests
118+
119+
- name: Lint with flake8
120+
run: flake8 kinesis tests --max-line-length=88 --extend-ignore=E203,W503

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
venv/
12
dist
23
*.egg-info
34
.idea
@@ -10,4 +11,5 @@ temp.py
1011
.mypy_cache
1112
.tox
1213
release.sh
13-
test_*
14+
test_*
15+
CLAUDE.md

Dockerfile

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
1-
FROM python:3.7-slim
1+
FROM python:3.12-slim
22

3-
RUN apt-get update && apt-get install -y gcc python-dev gettext-base
3+
RUN apt-get update && apt-get install -y gcc python3-dev gettext-base
44

55
RUN mkdir /app
66

@@ -13,7 +13,6 @@ COPY test-requirements.txt /app/test-requirements.txt
1313
RUN pip install -r /app/test-requirements.txt
1414

1515
COPY kinesis /app/kinesis/
16-
17-
COPY tests.py /app/tests.py
16+
COPY tests /app/tests/
1817

1918
WORKDIR /app/

docker-compose.yaml

Lines changed: 10 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,7 @@
1-
version: '3.3'
21
services:
32

43
kinesis:
5-
image: vsouza/kinesis-local:latest
4+
image: instructure/kinesalite:latest
65
command: --port 4567 --shardLimit 10000
76
restart: always
87
ports:
@@ -15,10 +14,11 @@ services:
1514

1615
test:
1716
container_name: async-kinesis-test
18-
command: ['nosetests']
17+
command: ['pytest', '-v', '--cov=kinesis']
1918
volumes:
20-
- ./tests.py:/app/tests.py
19+
- ./tests:/app/tests
2120
- ./kinesis:/app/kinesis
21+
- ./pytest.ini:/app/pytest.ini
2222
build:
2323
context: .
2424
dockerfile: Dockerfile
@@ -27,8 +27,9 @@ services:
2727
- ENDPOINT_URL=http://kinesis:4567
2828
- REDIS_HOST=redis
2929
- REDIS_PORT=6379
30-
- AWS_ACCESS_KEY_ID=
31-
- AWS_SECRET_ACCESS_KEY=
32-
links:
33-
- kinesis:kinesis
34-
- redis:redis
30+
- AWS_ACCESS_KEY_ID=test
31+
- AWS_SECRET_ACCESS_KEY=test
32+
depends_on:
33+
- kinesis
34+
- redis
35+

kinesis/base.py

Lines changed: 29 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,13 @@
11
import asyncio
22
import logging
3-
from async_timeout import timeout
3+
import time
4+
from typing import Optional, Any, Dict, List, Union
5+
6+
from .timeout_compat import timeout
47
from aiobotocore.session import AioSession
58
from asyncio import CancelledError
69
from botocore.exceptions import ClientError
710
from botocore.config import Config
8-
import time
911

1012
from . import exceptions
1113

@@ -15,37 +17,37 @@
1517
class Base:
1618
def __init__(
1719
self,
18-
stream_name,
19-
session=None,
20-
endpoint_url=None,
21-
region_name=None,
22-
retry_limit=None,
23-
expo_backoff=None,
24-
expo_backoff_limit=120,
25-
skip_describe_stream=False,
26-
create_stream=False,
27-
create_stream_shards=1,
28-
):
29-
30-
self.stream_name = stream_name
20+
stream_name: str,
21+
session: Optional[AioSession] = None,
22+
endpoint_url: Optional[str] = None,
23+
region_name: Optional[str] = None,
24+
retry_limit: Optional[int] = None,
25+
expo_backoff: Optional[float] = None,
26+
expo_backoff_limit: int = 120,
27+
skip_describe_stream: bool = False,
28+
create_stream: bool = False,
29+
create_stream_shards: int = 1,
30+
) -> None:
31+
32+
self.stream_name: str = stream_name
3133

3234
if session:
3335
assert isinstance(session, AioSession)
34-
self.session = session
36+
self.session: AioSession = session
3537
else:
3638
self.session = AioSession()
3739

38-
self.endpoint_url = endpoint_url
39-
self.region_name = region_name
40+
self.endpoint_url: Optional[str] = endpoint_url
41+
self.region_name: Optional[str] = region_name
4042

41-
self.client = None
42-
self.shards = None
43+
self.client: Optional[Any] = None # aiobotocore client
44+
self.shards: Optional[List[Dict[str, Any]]] = None
4345

44-
self.stream_status = None
46+
self.stream_status: Optional[str] = None
4547

46-
self.retry_limit = retry_limit
47-
self.expo_backoff = expo_backoff
48-
self.expo_backoff_limit = expo_backoff_limit
48+
self.retry_limit: Optional[int] = retry_limit
49+
self.expo_backoff: Optional[float] = expo_backoff
50+
self.expo_backoff_limit: int = expo_backoff_limit
4951

5052
# connection states of kinesis client
5153
self.RECONNECT = "RECONNECT"
@@ -60,7 +62,7 @@ def __init__(
6062
self.create_stream = create_stream
6163
self.create_stream_shards = create_stream_shards
6264

63-
async def __aenter__(self):
65+
async def __aenter__(self) -> "Base":
6466

6567
log.info(
6668
"creating client with {}".format(
@@ -78,12 +80,12 @@ async def __aenter__(self):
7880

7981
return self
8082

81-
async def __aexit__(self, exc_type, exc, tb):
83+
async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
8284
await self.close()
8385
await self.client.__aexit__(exc_type, exc, tb)
8486

8587
@property
86-
def address(self):
88+
def address(self) -> Dict[str, str]:
8789
"""
8890
Return address of stream, either as StreamName or StreamARN, when applicable.
8991

kinesis/checkpointers.py

Lines changed: 36 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -3,20 +3,45 @@
33
import os
44
import json
55
from datetime import timezone, datetime
6+
from typing import Protocol, Optional, Union, Dict, Tuple, Any
67

78
log = logging.getLogger(__name__)
89

910

11+
class CheckPointer(Protocol):
12+
"""Protocol for checkpointer implementations."""
13+
14+
async def allocate(self, shard_id: str) -> Tuple[bool, Optional[str]]:
15+
"""Allocate a shard for processing."""
16+
...
17+
18+
async def deallocate(self, shard_id: str) -> None:
19+
"""Deallocate a shard."""
20+
...
21+
22+
async def checkpoint(self, shard_id: str, sequence_number: str) -> None:
23+
"""Checkpoint progress for a shard."""
24+
...
25+
26+
def get_all_checkpoints(self) -> Dict[str, str]:
27+
"""Get all checkpoints."""
28+
...
29+
30+
async def close(self) -> None:
31+
"""Close the checkpointer."""
32+
...
33+
34+
1035
class BaseCheckPointer:
11-
def __init__(self, name="", id=None):
12-
self._id = id if id else os.getpid()
13-
self._name = name
14-
self._items = {}
36+
def __init__(self, name: str = "", id: Optional[Union[str, int]] = None) -> None:
37+
self._id: Union[str, int] = id if id else os.getpid()
38+
self._name: str = name
39+
self._items: Dict[str, Any] = {}
1540

16-
def get_id(self):
41+
def get_id(self) -> Union[str, int]:
1742
return self._id
1843

19-
def get_ref(self):
44+
def get_ref(self) -> str:
2045
return "{}/{}".format(self._name, self._id)
2146

2247
def get_all_checkpoints(self):
@@ -84,6 +109,9 @@ def is_allocated(self, shard_id):
84109
return shard_id in self._items and self._items[shard_id]["active"]
85110

86111
async def allocate(self, shard_id):
112+
if self.is_allocated(shard_id):
113+
return False, None
114+
87115
if shard_id not in self._items:
88116
self._items[shard_id] = {"sequence": None}
89117

@@ -117,9 +145,9 @@ def __init__(
117145
)
118146

119147
if is_cluster:
120-
from aredis import StrictRedisCluster as Redis
148+
from redis.asyncio.cluster import RedisCluster as Redis
121149
else:
122-
from aredis import StrictRedis as Redis
150+
from redis.asyncio import Redis
123151

124152
params = {
125153
"host": os.environ.get("REDIS_HOST", "localhost"),

0 commit comments

Comments
 (0)