diff --git a/.github/workflows/deploy.yaml b/.github/workflows/deploy.yaml index b6d633a..4d8e76e 100644 --- a/.github/workflows/deploy.yaml +++ b/.github/workflows/deploy.yaml @@ -4,9 +4,11 @@ on: push: branches: - main + - staging pull_request: branches: - main + - staging jobs: build-and-deploy: diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..17619f2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.env +*.ipynb \ No newline at end of file diff --git a/README.md b/README.md index 1d9c157..d6cac9b 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# πŸš€ Serverless Browser Agents with Playwright + Lambda + Browserbase +## πŸš€ Serverless Browser Agents with Playwright + Lambda + Browserbase *Spin up headless browsers on AWS in under a minuteβ€”no layers, no EC2, no pain.* [![Build](https://github.com/derekmeegan/browserbase-lambda-playwright/actions/workflows/deploy.yaml/badge.svg)](../../actions/workflows/deploy.yaml) @@ -6,8 +6,6 @@ > **Star ⭐ this repo if it saves you hours, and hit _Fork_ to make it yours in seconds.** ---- - ## ⚑ TL;DR Quick-Start ### Option A: Local Deployment @@ -17,106 +15,129 @@ git clone https://github.com/your-username/browserbase-lambda-playwright.git cd browserbase-lambda-playwright -# 2. Export AWS & Browserbase secrets -export AWS_ACCESS_KEY_ID=... -export AWS_SECRET_ACCESS_KEY=... -export BROWSERBASE_API_KEY=... -export BROWSERBASE_PROJECT_ID=... - -# 3. Create AWS Secrets Manager entries -aws secretsmanager create-secret \ - --name BrowserbaseLambda/BrowserbaseApiKey \ - --secret-string '{"BROWSERBASE_API_KEY":"'"$BROWSERBASE_API_KEY"'"}' - -aws secretsmanager create-secret \ - --name BrowserbaseLambda/BrowserbaseProjectId \ - --secret-string '{"BROWSERBASE_PROJECT_ID":"'"$BROWSERBASE_PROJECT_ID"'"}' - -# 4. Deploy (creates the Lambda + IAM + Secrets wiring) +# 2. Deploy infrastructure +env | grep AWS || export AWS_ACCESS_KEY_ID=... && export AWS_SECRET_ACCESS_KEY=... cd infra && pip install -r requirements.txt && cdk deploy --all --require-approval never + +# 3. Fetch API details from CloudFormation outputs +echo "export API_ENDPOINT_URL=$(aws cloudformation describe-stacks \ + --stack-name BrowserbaseLambdaStack \ + --query 'Stacks[0].Outputs[?OutputKey==`ApiEndpointUrl`].OutputValue' \ + --output text)" + +echo "export API_KEY=$(aws apigateway get-api-key \ + --api-key $(aws cloudformation describe-stacks --stack-name BrowserbaseLambdaStack \ + --query 'Stacks[0].Outputs[?OutputKey==`ApiKeyId`].OutputValue' --output text) \ + --include-value \ + --query 'value' \ + --output text)" + +# 4. Install example dependencies and run quick start +pip install -r examples/requirements.txt +python examples/quick_start.py ``` ### Option B: GitHub Actions Deployment ```bash -# 1. Create your own repository -# Either fork this repository on GitHub or create a new one and push this code +# 1. Fork or push this repo to your GitHub account +# 2. Add repository secrets under Settings β†’ Secrets & variables β†’ Actions: +# - AWS_ACCESS_KEY +# - AWS_SECRET_ACCESS_KEY +# 3. Create Browserbase secrets in AWS Secrets Manager (see infra/stack.py env names) +# 4. Push to main β†’ GitHub Actions triggers CDK deploy +``` -# 2. Add GitHub repository secrets -# Go to your repository β†’ Settings β†’ Secrets and variables β†’ Actions β†’ New repository secret -# Add these secrets: -# - AWS_ACCESS_KEY: Your AWS Access Key ID -# - AWS_SECRET_ACCESS_KEY: Your AWS Secret Access Key +--- -# 3. 
Create AWS Secrets Manager entries (same as Option A step 3) +You now have a Lambda that opens a Browserbase session and runs Playwright code from **`lambdas/scraper/scraper.py`**. +Invoke it with: -# 4. Push to main branch to trigger deployment -git push origin main +```bash +curl -X POST "$API_ENDPOINT_URL" \ + -H "Content-Type: application/json" \ + -H "x-api-key: $API_KEY" \ + -d '{"url":"https://news.ycombinator.com/"}' \ + -v + +# …then poll status: +curl -H "x-api-key: $API_KEY" "$API_ENDPOINT_URL/" ``` -You now have a Lambda that opens a Browserbase session and runs Playwright code from **`src/scraper.py`**. -Invoke it with: +**OR** -```bash -aws lambda invoke \ - --function-name \ - --payload '{"url":"https://news.ycombinator.com/"}' \ - response.json && cat response.json | jq +``` +pip install -r examples/requirements.txt +python examples/quick_start.py ``` ---- +## πŸ”„ Serverless Async Architecture + +1. **POST /scrape** returns **202 Accepted** immediately. +2. Job metadata is stored in DynamoDB (`JobStatusTable`) with status updates (PENDINGΒ β†’ RUNNINGΒ β†’ SUCCESS/FAILED). +3. **GET /scrape/{jobId}** polls DynamoDB for the latest job result. ## πŸš€ Why use this template? -* **Zero binary juggling** – Playwright lives in the Docker image; heavy Chrome lives on Browserbase. -* **Cold-start β‰ˆ 2 s** – no browser download, just connect-over-CDP. +* **Zero binary juggling** – Playwright lives in the Lambda image; Chrome runs remotely on Browserbase. +* **Cold-start β‰ˆΒ 2Β s** – no browser download, just connect-over-CDP. * **Pay-per-run** – pure Lambda pricing; scale by upgrading Browserbase, not infra. -* **Built-in CI/CD** – GitHub Actions deploys on every push to `main`. - ---- +* **Async, serverless** – fire-and-forget POST, durable job tracking via DynamoDB. +* **Built-in CI/CD** – GitHub Actions deploys on every push to `main`/`staging`. ## πŸ—οΈ High-Level Architecture -``` +```text β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” CDP (WebSocket) β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ AWS Lambda β”‚ ────────────────▢ β”‚ Browserbaseβ”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ Logs β–Ό AWS CloudWatch -``` - ---- + β”‚ + β–Ό + Amazon DynamoDB (JobStatusTable) +``` ## πŸ“¦ Project Layout ``` . -β”œβ”€β”€ .github/workflows/deploy.yaml # CI/CD pipeline -β”œβ”€β”€ infra/ # CDK IaC -β”‚ β”œβ”€β”€ app.py -β”‚ β”œβ”€β”€ stack.py +β”œβ”€β”€ .github/workflows/deploy.yaml +β”œβ”€β”€ examples/ +β”‚ β”œβ”€β”€ quick_start.py β”‚ └── requirements.txt -└── src/ - β”œβ”€β”€ Dockerfile # Lambda image - β”œβ”€β”€ scraper.py # Playwright logic - └── requirements.txt +β”œβ”€β”€ infra/ +β”‚ β”œβ”€β”€ app.py +β”‚ β”œβ”€β”€ cdk.json +β”‚ β”œβ”€β”€ requirements.txt +β”‚ └── stack.py +β”œβ”€β”€ lambdas/ +β”‚ β”œβ”€β”€ getter/ +β”‚ β”‚ β”œβ”€β”€ Dockerfile +β”‚ β”‚ β”œβ”€β”€ getter.py +β”‚ β”‚ └── requirements.txt +β”‚ └── scraper/ +β”‚ β”œβ”€β”€ Dockerfile +β”‚ β”œβ”€β”€ scraper.py +β”‚ └── requirements.txt +β”œβ”€β”€ .gitignore +β”œβ”€β”€ README.md +└── LICENSE ``` ---- -
πŸ” Full Setup & Prerequisites ### Requirements -| Tool | Version | -| --- | --- | -| AWS CLI | any 2.x | -| Docker | β‰₯ 20.10 | -| Node & npm | any LTS (for CDK) | -| Python | 3.12+ | -| Browserbase account | free tier works | +| Tool | Version | +| ------------------------- | ------------ | +| AWS CLI | any 2.x | +| Docker | β‰₯ 20.10 | +| Node & npm | any LTS | +| Python | 3.12+ | +| Browserbase account | free tier OK | ### 1. Install the AWS CLI @@ -145,7 +166,7 @@ aws secretsmanager create-secret \ --secret-string '{"BROWSERBASE_PROJECT_ID":"$BROWSERBASE_PROJECT_ID"}' ``` -### 4. Local Playwright install (optional for dev) +### 4. (Optional) Local Playwright install ```bash pip install playwright && python -m playwright install @@ -153,24 +174,19 @@ pip install playwright && python -m playwright install
---- - ## ❓ FAQ -| Question | Answer | -| --- | --- | -| **Does this work on Browserbase free tier?** | Yesβ€”1 concurrent session and rate-limited creation. | -| **Cold-starts?** | Typical < 2000 ms; browser runs remotely. | -| **How do I add extra Python libs?** | Add them to `src/requirements.txt`, rebuild, pushβ€”GitHub Actions redeploys. | - ---- +| Question | Answer | +| ------------------------------------------------- | ------------------------------------------------------------------------------------ | +| **Browserbase free tier?** | Yesβ€”1 concurrent session; creation rate‑limited. | +| **Cold‑starts?** | Typical <Β 2β€―s (CDP connect, no browser download). | +| **Add extra Python libs?** | Add to `lambdas//requirements.txt`, rebuild images, push β†’ redeploy. | +| **API returns 202 Acceptedβ€”how to track status?** | Poll `GET /scrape/{jobId}` to read status/results from DynamoDB. | ## 🀝 Contributing Pull requests are welcome! Please open an issue first if you plan a large change. ---- - ## πŸ“„ License This project is licensed under the MIT License – see the [LICENSE](LICENSE) file for details. diff --git a/examples/quick_start.py b/examples/quick_start.py new file mode 100644 index 0000000..0ecf6d1 --- /dev/null +++ b/examples/quick_start.py @@ -0,0 +1,145 @@ +import requests +import json +import uuid +import time +import os +import sys +from dotenv import load_dotenv + +# Load environment variables from .env file +load_dotenv() + +# --- Configuration --- +API_ENDPOINT_URL = os.getenv("API_ENDPOINT_URL") +API_KEY = os.getenv("API_KEY") + +# Default URL to scrape if not provided otherwise +DEFAULT_URL_TO_SCRAPE = "https://news.ycombinator.com/" + +# Polling configuration +POLL_INTERVAL_SECONDS = 2 +MAX_POLL_ATTEMPTS = 50 + +# --- Helper Functions --- +def submit_job(job_id: str, url: str, endpoint_url: str, api_key: str) -> bool: + """Submits a new scrape job to the API.""" + headers = { + 'Content-Type': 'application/json', + 'x-api-key': api_key + } + payload = { + 'jobId': job_id, + 'url': url + } + print(f"Submitting job {job_id} for URL: {url}...") + try: + response = requests.post(endpoint_url, headers=headers, json=payload) + response.raise_for_status() + + if response.status_code == 202: + print(f"Job {job_id} accepted successfully (Status Code: {response.status_code}).") + return True + else: + print(f"Unexpected success status code: {response.status_code}") + print(f"Response body: {response.text}") + return False + + except requests.exceptions.RequestException as e: + print(f"Error submitting job {job_id}: {e}") + if hasattr(e, 'response') and e.response is not None: + print(f"Response status: {e.response.status_code}") + print(f"Response body: {e.response.text}") + return False + +def get_job_status(job_id: str, endpoint_url_base: str, api_key: str) -> dict | None: + """Polls the API to get the status and results of a job.""" + endpoint = f"{endpoint_url_base}/{job_id}" + headers = { + 'x-api-key': api_key + } + status_check_timeout = 20 + + print(f"Checking status for job {job_id}...") + try: + response = requests.get(endpoint, headers=headers, timeout=status_check_timeout) + response.raise_for_status() + + if response.status_code == 200: + return response.json() + + except requests.exceptions.HTTPError as e: + if e.response.status_code == 404: + print(f"Job {job_id} not found yet (Status 404). 
Might still be processing.") + return None # Job might not have been processed yet or ID is wrong + else: + print(f"HTTP error getting status for job {job_id}: {e}") + print(f"Response body: {e.response.text}") + return {"status": "ERROR_CHECKING", "error": str(e)} # Indicate error checking status + except requests.exceptions.Timeout: + print(f"Timeout waiting for status response for job {job_id}.") + return None # Indicate timeout, can retry + except requests.exceptions.RequestException as e: + print(f"Network error getting status for job {job_id}: {e}") + return {"status": "ERROR_CHECKING", "error": str(e)} + except json.JSONDecodeError as e: + print(f"Error decoding JSON response for job {job_id}: {e}") + return {"status": "ERROR_CHECKING", "error": f"Invalid JSON response: {e}"} + +def main(url: str = None): + """Main execution logic.""" + if not API_ENDPOINT_URL or not API_KEY: + print("Error: Environment variables API_ENDPOINT_URL and API_KEY must be set.") + print("Please export them before running the script:") + print(" export API_ENDPOINT_URL='https://your-api-id.execute-api.your-region.amazonaws.com/v1/scrape'") + print(" export API_KEY='your-actual-api-key-value'") + sys.exit(1) + + if url is None: + url = DEFAULT_URL_TO_SCRAPE + + # 1. Generate a unique Job ID + job_id = str(uuid.uuid4()) + print(f"Generated Job ID: {job_id}") + + # 2. Submit the job + if submit_job(job_id, url, API_ENDPOINT_URL, API_KEY): + # 3. Poll for results + print(f"\nPolling for job completion every {POLL_INTERVAL_SECONDS} seconds...") + attempts = 0 + final_result = None + while attempts < MAX_POLL_ATTEMPTS: + attempts += 1 + # Wait *before* polling (except the first time) + if attempts > 1: + time.sleep(POLL_INTERVAL_SECONDS) + + result = get_job_status(job_id, API_ENDPOINT_URL, API_KEY) + + if result: + status = result.get('status', 'UNKNOWN') + print(f"Polling attempt {attempts}/{MAX_POLL_ATTEMPTS}: Status = {status}") + if status in ["SUCCESS", "FAILED", "ERROR_CHECKING"]: + final_result = result + break + else: + print(f"Polling attempt {attempts}/{MAX_POLL_ATTEMPTS}: No status update yet or check failed.") + + # 4. Print final result + print("\n--- Final Result ---") + if final_result: + print(json.dumps(final_result, indent=2)) + final_status = final_result.get('status') + if final_status == 'FAILED': + print("\nJob failed.") + elif final_status == 'SUCCESS': + print("\nJob completed successfully.") + else: # Handle ERROR_CHECKING or UNKNOWN + print(f"\nJob polling finished with status: {final_status}") + else: + print(f"Job {job_id} did not reach a final state (SUCCESS/FAILED) within the polling time limit ({MAX_POLL_ATTEMPTS * POLL_INTERVAL_SECONDS} seconds).") + print("It might still be running or encountered an issue. 
Check status manually later:") + print(f" curl -H \"x-api-key: {API_KEY}\" {API_ENDPOINT_URL}/{job_id}") + +if __name__ == "__main__": + url = None + main(url) diff --git a/examples/requirements.txt b/examples/requirements.txt new file mode 100644 index 0000000..2ed6683 --- /dev/null +++ b/examples/requirements.txt @@ -0,0 +1,2 @@ +requests==2.32.3 +python-dotenv==1.1.0 \ No newline at end of file diff --git a/infra/stack.py b/infra/stack.py index 43a6665..9968e31 100644 --- a/infra/stack.py +++ b/infra/stack.py @@ -2,9 +2,14 @@ Stack, aws_lambda as lambda_, aws_iam as iam, + aws_apigateway as apigateway, + aws_dynamodb as dynamodb, + aws_logs as logs, Duration, Size, aws_secretsmanager as secretsmanager, + RemovalPolicy, + CfnOutput ) from aws_cdk.aws_ecr_assets import Platform from constructs import Construct @@ -25,30 +30,218 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: "BrowserbaseLambda/BrowserbaseApiKey" ) - lambda_execution_role = iam.Role( - self, "BrowserbaseLambdaExecutionRole", + job_table = dynamodb.Table( + self, "JobStatusTable", + partition_key=dynamodb.Attribute(name="id", type=dynamodb.AttributeType.STRING), + removal_policy=RemovalPolicy.DESTROY + ) + + scraper_execution_role = iam.Role( + self, "ScraperLambdaExecutionRole", assumed_by=iam.ServicePrincipal("lambda.amazonaws.com"), managed_policies=[ iam.ManagedPolicy.from_aws_managed_policy_name("service-role/AWSLambdaBasicExecutionRole"), ] ) - browserbase_project_id_secret.grant_read(lambda_execution_role) - browserbase_api_key_secret.grant_read(lambda_execution_role) + browserbase_project_id_secret.grant_read(scraper_execution_role) + browserbase_api_key_secret.grant_read(scraper_execution_role) + job_table.grant_read_write_data(scraper_execution_role) + + getter_execution_role = iam.Role( + self, "GetterLambdaExecutionRole", + assumed_by=iam.ServicePrincipal("lambda.amazonaws.com"), + managed_policies=[ + iam.ManagedPolicy.from_aws_managed_policy_name("service-role/AWSLambdaBasicExecutionRole"), + ] + ) + job_table.grant_read_data(getter_execution_role) browserbase_lambda = lambda_.DockerImageFunction( self, "BrowserbaseLambda", code=lambda_.DockerImageCode.from_image_asset( - "../src", + "../lambdas/scraper", cmd=["scraper.lambda_handler"], platform=Platform.LINUX_ARM64 ), architecture=lambda_.Architecture.ARM_64, - role=lambda_execution_role, + role=scraper_execution_role, timeout=Duration.minutes(15), memory_size=512, ephemeral_storage_size=Size.gibibytes(1), environment={ "BROWSERBASE_PROJECT_ID_ARN": browserbase_project_id_secret.secret_arn, - "BROWSERBASE_API_KEY_SECRET_ARN": browserbase_api_key_secret.secret_arn + "BROWSERBASE_API_KEY_SECRET_ARN": browserbase_api_key_secret.secret_arn, + "JOB_STATUS_TABLE_NAME": job_table.table_name + }, + ) + + getter_lambda = lambda_.DockerImageFunction( + self, "JobStatusGetterLambda", + code=lambda_.DockerImageCode.from_image_asset( + "../lambdas/getter", + cmd=["getter.lambda_handler"], + platform=Platform.LINUX_ARM64 + ), + architecture=lambda_.Architecture.ARM_64, + role=getter_execution_role, + timeout=Duration.minutes(1), + environment={ + "JOB_STATUS_TABLE_NAME": job_table.table_name, }, + ) + + async_lambda_integration = apigateway.LambdaIntegration( + browserbase_lambda, + proxy=False, + request_parameters={ + 'integration.request.header.X-Amz-Invocation-Type': "'Event'" + }, + integration_responses=[ + apigateway.IntegrationResponse( + status_code="202", + ) + ], + ) + + getter_lambda_integration = apigateway.LambdaIntegration( + 
getter_lambda, + proxy=True + ) + + api_log_group = logs.LogGroup(self, "ApiGatewayAccessLogs", + retention=logs.RetentionDays.ONE_MONTH, + removal_policy=RemovalPolicy.DESTROY + ) + + api = apigateway.RestApi( + self, "BrowserbaseAsyncApi", + rest_api_name="Browserbase Async API", + description="API to trigger Browserbase Lambda asynchronously", + deploy_options=apigateway.StageOptions( + stage_name="v1", + access_log_destination=apigateway.LogGroupLogDestination(api_log_group), + access_log_format=apigateway.AccessLogFormat.json_with_standard_fields( + caller=True, + http_method=True, + ip=True, + protocol=True, + request_time=True, + resource_path=True, + response_length=True, + status=True, + user=True, + ), + logging_level=apigateway.MethodLoggingLevel.INFO, + data_trace_enabled=True + ), + cloud_watch_role=True + ) + + scrape_request_model = api.add_model("ScrapeRequestModel", + content_type="application/json", + model_name="ScrapeRequestModel", + schema=apigateway.JsonSchema( + schema=apigateway.JsonSchemaVersion.DRAFT4, + title="ScrapeRequest", + type=apigateway.JsonSchemaType.OBJECT, + required=["jobId", "url"], + properties={ + "jobId": apigateway.JsonSchema( + type=apigateway.JsonSchemaType.STRING, + description="Unique identifier for the scrape job provided by the caller" + ), + "url": apigateway.JsonSchema( + type=apigateway.JsonSchemaType.STRING, + format="uri", + description="The URL to scrape" + ) + } + ) + ) + + body_validator = api.add_request_validator("BodyValidator", + request_validator_name="ValidateRequestBody", + validate_request_body=True, + validate_request_parameters=False + ) + + params_validator = api.add_request_validator("ParameterValidator", + request_validator_name="ValidateRequestParameters", + validate_request_body=False, + validate_request_parameters=True + ) + + scrape_resource = api.root.add_resource("scrape") + scrape_resource.add_method( + "POST", + async_lambda_integration, + api_key_required=True, + request_validator=body_validator, + request_models={ + "application/json": scrape_request_model + }, + method_responses=[ + apigateway.MethodResponse(status_code="202"), + apigateway.MethodResponse(status_code="400") + ] + ) + + job_resource = scrape_resource.add_resource("{jobId}") + job_resource.add_method( + "GET", + getter_lambda_integration, + api_key_required=True, + request_validator=params_validator, + request_parameters={ + 'method.request.path.jobId': True + }, + method_responses=[ + apigateway.MethodResponse(status_code="200"), + apigateway.MethodResponse(status_code="404"), + apigateway.MethodResponse(status_code="400"), + apigateway.MethodResponse(status_code="500") + ] + ) + + api_key = api.add_api_key("ServerlessScraperApiKey", + api_key_name="serverless-scraper-api-key" + ) + + usage_plan = api.add_usage_plan("ServerlessScraperUsagePlan", + name="ServerlessScraperBasic", + throttle=apigateway.ThrottleSettings( + rate_limit=5, + burst_limit=2 + ), + api_stages=[apigateway.UsagePlanPerApiStage( + api=api, + stage=api.deployment_stage + )] + ) + usage_plan.add_api_key(api_key) + + CfnOutput( + self, "ApiEndpointUrl", + value=f"{api.url}scrape", + description="API Gateway Endpoint URL for POST /scrape" + ) + CfnOutput( + self, "ApiStatusEndpointBaseUrl", + value=f"{api.url}scrape/", + description="Base URL for GETting job status (append {jobId})" + ) + CfnOutput( + self, "ApiKeyId", + value=api_key.key_id, + description="API Key ID (use 'aws apigateway get-api-key --api-key --include-value' to retrieve the key value)" + ) + CfnOutput( + 
self, "JobStatusTableName", + value=job_table.table_name, + description="DynamoDB table name for job status" + ) + CfnOutput( + self, "ApiGatewayAccessLogGroupName", + value=api_log_group.log_group_name, + description="Log Group Name for API Gateway Access Logs" ) \ No newline at end of file diff --git a/lambdas/getter/Dockerfile b/lambdas/getter/Dockerfile new file mode 100644 index 0000000..5a3e34c --- /dev/null +++ b/lambdas/getter/Dockerfile @@ -0,0 +1,15 @@ +FROM public.ecr.aws/lambda/python:3.12 +ENV PYTHONUNBUFFERED=1 \ + PIP_NO_CACHE_DIR=1 + +# ── Python deps ──────────────────────────────────────────────────────────────── +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt \ + && pip install --no-cache-dir awslambdaric + +# ── Function code ────────────────────────────────────────────────────────────── +COPY . ${LAMBDA_TASK_ROOT}/ + +# ── Entrypoint ───────────────────────────────────────────────────────────────── +ENTRYPOINT ["python", "-m", "awslambdaric"] +CMD [ "getter.lambda_handler" ] \ No newline at end of file diff --git a/lambdas/getter/getter.py b/lambdas/getter/getter.py new file mode 100644 index 0000000..b449848 --- /dev/null +++ b/lambdas/getter/getter.py @@ -0,0 +1,60 @@ +import os +import json +import boto3 +import logging +from botocore.exceptions import ClientError +from decimal import Decimal + +logger = logging.getLogger() +logger.setLevel(os.environ.get("LOG_LEVEL", "INFO").upper()) + +TABLE_NAME = os.environ.get("JOB_STATUS_TABLE_NAME") +dynamodb = boto3.resource('dynamodb') +job_table = dynamodb.Table(TABLE_NAME) + +class DecimalEncoder(json.JSONEncoder): + def default(self, o): + if isinstance(o, Decimal): + if o % 1 == 0: + return int(o) + else: + return float(o) + return super(DecimalEncoder, self).default(o) + +def lambda_handler(event, context): + try: + job_id = event.get('pathParameters', {}).get('jobId') + + logger.info(f"Attempting to retrieve job status for jobId: {job_id}") + response = job_table.get_item(Key={'id': job_id}) + + if 'Item' in response: + item = response['Item'] + logger.info(f"Found item for jobId: {job_id}. 
Status: {item.get('status')}") + return { + 'statusCode': 200, + 'headers': { 'Content-Type': 'application/json' }, + 'body': json.dumps(item, cls=DecimalEncoder) + } + else: + logger.warning(f"No item found for jobId: {job_id}") + return { + 'statusCode': 404, + 'headers': { 'Content-Type': 'application/json' }, + 'body': json.dumps({'error': 'Job not found'}) + } + + except ClientError as e: + logger.error(f"DynamoDB error retrieving job {job_id}: {e.response['Error']['Message']}", exc_info=True) + return { + 'statusCode': 500, + 'headers': { 'Content-Type': 'application/json' }, + 'body': json.dumps({'error': 'Failed to retrieve job status due to database error'}) + } + except Exception as e: + logger.error(f"Unexpected error retrieving job {job_id}: {e}", exc_info=True) + return { + 'statusCode': 500, + 'headers': { 'Content-Type': 'application/json' }, + 'body': json.dumps({'error': 'An internal server error occurred'}) + } \ No newline at end of file diff --git a/lambdas/getter/requirements.txt b/lambdas/getter/requirements.txt new file mode 100644 index 0000000..3047b33 --- /dev/null +++ b/lambdas/getter/requirements.txt @@ -0,0 +1 @@ +boto3==1.36.22 \ No newline at end of file diff --git a/src/Dockerfile b/lambdas/scraper/Dockerfile similarity index 100% rename from src/Dockerfile rename to lambdas/scraper/Dockerfile diff --git a/src/requirements.txt b/lambdas/scraper/requirements.txt similarity index 100% rename from src/requirements.txt rename to lambdas/scraper/requirements.txt diff --git a/src/scraper.py b/lambdas/scraper/scraper.py similarity index 64% rename from src/scraper.py rename to lambdas/scraper/scraper.py index 98b1a41..e2acfa9 100644 --- a/src/scraper.py +++ b/lambdas/scraper/scraper.py @@ -1,12 +1,14 @@ import os +import uuid import json import boto3 import asyncio import logging +from datetime import datetime, timezone from botocore.exceptions import ClientError from browserbase import Browserbase, BrowserbaseError from playwright.async_api import async_playwright -from typing import Optional +from typing import Optional, Dict, Any # --- Logging Setup --- logger = logging.getLogger() @@ -16,9 +18,12 @@ # --- Configuration from Environment Variables --- BROWSERBASE_API_KEY_SECRET_ARN = os.environ.get("BROWSERBASE_API_KEY_SECRET_ARN") BROWSERBASE_PROJECT_ID_ARN = os.environ.get("BROWSERBASE_PROJECT_ID_ARN") +JOB_STATUS_TABLE_NAME = os.environ.get("JOB_STATUS_TABLE_NAME") # --- AWS Clients --- secrets_manager_client = boto3.client('secretsmanager') +dynamodb = boto3.resource('dynamodb') +job_table = dynamodb.Table(JOB_STATUS_TABLE_NAME) # --- Secret Retrieval Function --- def get_secret_value(secret_arn: str, expected_key: str) -> Optional[str]: @@ -45,6 +50,30 @@ def get_secret_value(secret_arn: str, expected_key: str) -> Optional[str]: logger.error(f"Error retrieving or parsing secret {secret_arn}: {e}", exc_info=True) return None +# --- DynamoDB Helper Function --- +def update_job_status(job_id: str, status: str, result_data: Optional[Dict[str, Any]] = None, error_message: Optional[str] = None): + """Updates the job status and results in DynamoDB.""" + + timestamp = datetime.now(timezone.utc).isoformat() + item = { + 'id': job_id, + 'status': status, + 'lastUpdatedAt': timestamp, + } + if result_data: + item.update(result_data) + if error_message: + item['errorMessage'] = error_message + + try: + logger.info(f"Updating DynamoDB for jobId {job_id} with status {status}") + job_table.put_item(Item=item) + logger.info(f"Successfully updated DynamoDB for jobId 
{job_id}") + except ClientError as e: + logger.error(f"Failed to update DynamoDB for jobId {job_id}: {e}", exc_info=True) + except Exception as e: + logger.error(f"An unexpected error occurred during DynamoDB update for jobId {job_id}: {e}", exc_info=True) + # --- Browserbase Session Creation (Modified for Free Tier) --- def create_browserbase_session(): """Creates a basic Browserbase session compatible with the free tier.""" @@ -77,19 +106,26 @@ def create_browserbase_session(): raise # --- Scraper Function --- -async def scrape_page(event): +async def scrape_page(payload: dict): """ AWS Lambda handler function to run a simple Playwright task via Browserbase. """ - logger.info(f"Received event: {json.dumps(event)}") + logger.info(f"Received event: {json.dumps(payload)}") + job_id = payload.get("jobId", str(uuid.uuid4())) + target_url = payload.get("url", "https://news.ycombinator.com/") - target_url = event.get("url", "https://news.ycombinator.com/") + initial_data = { + 'requestedUrl': target_url, + 'receivedAt': datetime.now(timezone.utc).isoformat() + } + update_job_status(job_id, "PENDING", result_data=initial_data) playwright = None browser = None - page_title = None session_id = None - status = "failed" + error_info = None + final_status = "failed" + results = {} try: # 1. Create Browserbase Session @@ -97,6 +133,9 @@ async def scrape_page(event): session = create_browserbase_session() connect_url = session.connect_url session_id = session.id + initial_data['sessionId'] = session_id + + update_job_status(job_id, "RUNNING", result_data=initial_data) logger.info(f"Successfully created Browserbase session: {session_id}") # 2. Connect using Playwright @@ -125,36 +164,49 @@ async def scrape_page(event): content_length = len(await page.content()) logger.info(f"Page content length: {content_length}") - status = "success" + results = { + 'pageTitle': page_title, + 'contentLength': content_length, + } + final_status = "SUCCESS" logger.info("Playwright automation task completed successfully.") except BrowserbaseError as e: - logger.error(f"Browserbase API error: {e}", exc_info=True) - return {'status': 'failed', 'error': f"Browserbase API error: {e}", 'session_id': session_id} + error_info = f"Browserbase API error: {e}" + logger.error(error_info, exc_info=True) except TimeoutError as e: - logger.error(f"Playwright timeout error: {e}", exc_info=True) - return {'status': 'failed', 'error': f"Playwright timeout: {e}", 'session_id': session_id} + error_info = f"Playwright timeout error: {e}" + logger.error(error_info, exc_info=True) except Exception as e: - logger.error(f"An unexpected error occurred: {e}", exc_info=True) - return {'status': 'failed', 'error': f"Unexpected error: {e}", 'session_id': session_id} + error_info = f"An unexpected error occurred: {e}" + logger.error(error_info, exc_info=True) finally: - # 4. Cleanup + # 4. Update DynamoDB + final_data = initial_data.copy() + final_data.update(results) + update_job_status(job_id, final_status, result_data=final_data, error_message=error_info) + + # 5. 
Cleanup if browser and browser.is_connected(): logger.info("Closing browser connection...") await browser.close() if playwright: logger.info("Stopping Playwright...") await playwright.stop() - logger.info("Lambda execution finished.") - return { - 'status': status, - 'session_id': session_id, - 'requested_url': target_url, - 'page_title': page_title, - 'content_length': content_length - } + logger.info(f"scrape_page finished for jobId: {job_id} with status: {final_status}") + + return {'jobId': job_id, 'finalStatus': final_status} # --- Lambda Handler --- def lambda_handler(event, context): - return asyncio.run(scrape_page(event)) + """ + Handles API Gateway async invocation. Parses request, triggers scrape_page, + and returns immediately (result handling is done via DynamoDB). + """ + logger.info(f"Lambda handler invoked with event: {json.dumps(event)}") + + result = asyncio.run(scrape_page(event)) + logger.info(f"Lambda handler completed for jobId: {result.get('jobId')}. Scraper status (for logs): {result.get('finalStatus')}") + + return {'status': 'accepted', 'jobId': result.get('jobId')}
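One point worth spelling out for anyone exercising the handlers outside API Gateway: the two Lambdas receive different event shapes. `POST /scrape` is wired as a non-proxy integration with no mapping template, so `scraper.lambda_handler` receives the JSON request body itself, while `GET /scrape/{jobId}` is a proxy integration, so `getter.lambda_handler` receives a standard API Gateway proxy event and only reads `pathParameters`. The snippet below is a minimal local sketch of those shapes, not part of the diff; it assumes the stack is deployed, AWS credentials and a default region are configured, and it is run from the repository root, and the table name is a placeholder to be replaced with the `JobStatusTableName` stack output.

```python
# Minimal sketch (assumptions: deployed stack, AWS credentials + region configured,
# run from the repository root). The table name below is a placeholder.
import os
import uuid

os.environ.setdefault("JOB_STATUS_TABLE_NAME", "<JobStatusTableName stack output>")

# getter.py reads JOB_STATUS_TABLE_NAME at import time, so set the env var first.
from lambdas.getter import getter

job_id = str(uuid.uuid4())

# Shape received by scraper.lambda_handler via the non-proxy POST /scrape integration.
# (Running the scraper locally would additionally require Playwright and the
# Browserbase secret-ARN environment variables, so it is shown here for reference only.)
scraper_event = {"jobId": job_id, "url": "https://news.ycombinator.com/"}

# Shape received by getter.lambda_handler via the proxy GET /scrape/{jobId} integration:
getter_event = {"pathParameters": {"jobId": job_id}}

response = getter.lambda_handler(getter_event, None)
print(response["statusCode"], response["body"])  # 404 until the scraper has written this job
```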