Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 69 additions & 30 deletions sdks/python/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

"""Pytest configuration and custom hooks."""

import gc
import os
import sys
import threading
import time
from types import SimpleNamespace

import pytest
Expand All @@ -34,6 +37,12 @@ def pytest_addoption(parser):
'--test-pipeline-options',
help='Options to use in test pipelines. NOTE: Tests may '
'ignore some or all of these options.')
parser.addoption(
'--enable-test-cleanup',
action='store_true',
default=False,
help='Enable expensive cleanup operations. Auto-enabled in CI by default. '
'Use this flag to explicitly enable cleanup in local development.')


# See pytest.ini for main collection rules.
Expand Down Expand Up @@ -101,56 +110,86 @@ def configure_beam_rpc_timeouts():
print("Successfully configured Beam RPC timeouts")


def _running_in_ci():
"""Returns True if running in a CI environment."""
return (
os.getenv('GITHUB_ACTIONS') == 'true' or
os.getenv('CI') == 'true' or
os.getenv('CONTINUOUS_INTEGRATION') == 'true'
)


def _should_enable_test_cleanup(config):
"""Returns True if expensive cleanup operations should run.

Result is cached on config object to avoid re-computation per test.
"""
if hasattr(config, '_should_enable_test_cleanup_result'):
return config._should_enable_test_cleanup_result

if config.getoption('--enable-test-cleanup'):
result = True
reason = "enabled via --enable-test-cleanup"
else:
if _running_in_ci():
result = True
reason = "CI detected"
else:
result = False
reason = "local development"

# Log once per session
if not hasattr(config, '_cleanup_decision_logged'):
print(f"\n[Test Cleanup] Enabled: {result} ({reason})")
config._cleanup_decision_logged = True

config._should_enable_test_cleanup_result = result
return result


@pytest.fixture(autouse=True)
def ensure_clean_state():
def ensure_clean_state(request):
"""
Ensure clean state before each test
to prevent cross-test contamination.
Ensures clean state between tests to prevent contamination.

Expensive operations (sleeps, extra GC) only run in CI or when
explicitly enabled to keep local tests fast.
"""
import gc
import threading
import time
enable_cleanup = _should_enable_test_cleanup(request.config)

# Force garbage collection to clean up any lingering resources
gc.collect()
if enable_cleanup:
gc.collect()

# Log active thread count for debugging
thread_count = threading.active_count()
if thread_count > 50: # Increased threshold since we see 104 threads
if thread_count > 50:
print(f"Warning: {thread_count} active threads detected before test")

# Force a brief pause to let threads settle
time.sleep(0.5)
gc.collect()
if enable_cleanup:
time.sleep(0.5)
gc.collect()

yield

# Enhanced cleanup after test
try:
# Force more aggressive cleanup
gc.collect()

# Brief pause to let any async operations complete
time.sleep(0.1)

# Additional garbage collection
gc.collect()
if enable_cleanup:
gc.collect()
time.sleep(0.1)
gc.collect()
except Exception as e:
print(f"Warning: Cleanup error: {e}")


@pytest.fixture(autouse=True)
def enhance_mock_stability():
"""Enhance mock stability in DinD environment."""
import time
def enhance_mock_stability(request):
"""Improves mock stability in DinD environment."""
enable_cleanup = _should_enable_test_cleanup(request.config)

# Brief pause before test to ensure clean mock state
time.sleep(0.05)
if enable_cleanup:
time.sleep(0.05)

yield

# Brief pause after test to let mocks clean up
time.sleep(0.05)
if enable_cleanup:
time.sleep(0.05)


def pytest_configure(config):
Expand Down
Loading