
Commit 6e49a0d

maxxgx and Copilot authored
feat(inspect): show dataset snapshot (#3168)
* add snapshot download endpoint
* fix snapshot timestamp and test
* remove camera macos workaround
* address comments
* Update application/backend/src/api/dependencies.py
* rename to item
* fix test

Signed-off-by: Ma, Xiangxiang <[email protected]>
Signed-off-by: Max Xiang <[email protected]>
Co-authored-by: Copilot <[email protected]>
1 parent: efd0710

15 files changed: +200, -99 lines

application/backend/pyproject.toml

Lines changed: 2 additions & 1 deletion
@@ -23,7 +23,8 @@ dependencies = [
     "uvloop==0.21.0",
     "loguru==0.7.3",
     "trackio~=0.6.0",
-    "cv2-enumerate-cameras==1.3.0",
+    "sse-starlette~=3.0",
+    "cv2-enumerate-cameras==1.3.1",
     "pyarrow~=21.0.0",
 ]

application/backend/src/alembic/versions/7a213a27d666_initial_schema.py

Lines changed: 1 addition & 0 deletions
@@ -103,6 +103,7 @@ def upgrade() -> None:
         sa.Column("id", sa.Text(), nullable=False),
         sa.Column("project_id", sa.String(), nullable=False),
         sa.Column("filename", sa.Text(), nullable=False),
+        sa.Column("count", sa.Integer(), nullable=False),
         sa.Column("created_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
         sa.PrimaryKeyConstraint("id"),
     )
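Note that this hunk adds the `count` column to the initial schema revision itself, so databases already stamped at this revision will not re-run it on upgrade; existing installs would need a follow-up migration. A minimal sketch of what that could look like, assuming the table is named "dataset_snapshots" and a server default of 0 is acceptable for pre-existing rows (both assumptions, not part of this commit):

# Hypothetical follow-up migration, NOT part of this commit.
# Assumes table name "dataset_snapshots" and that 0 is a safe default.
import sqlalchemy as sa
from alembic import op

revision = "b1f2c3d4e5f6"  # placeholder revision id
down_revision = "7a213a27d666"


def upgrade() -> None:
    # server_default lets the NOT NULL constraint apply to existing rows
    op.add_column(
        "dataset_snapshots",
        sa.Column("count", sa.Integer(), nullable=False, server_default="0"),
    )


def downgrade() -> None:
    op.drop_column("dataset_snapshots", "count")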

application/backend/src/api/dependencies.py

Lines changed: 5 additions & 0 deletions
@@ -154,6 +154,11 @@ async def get_job_id(job_id: str) -> UUID:
     return get_uuid(job_id, "job")
 
 
+async def get_snapshot_id(snapshot_id: str) -> UUID:
+    """Initializes and validates a Snapshot ID"""
+    return get_uuid(snapshot_id, "snapshot")
+
+
 async def get_webrtc_manager(request: Request) -> WebRTCManager:
     """Provides the global WebRTCManager instance from FastAPI application's state."""
     return request.app.state.webrtc_manager
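`get_snapshot_id` follows the same pattern as the existing `get_job_id` above. The `get_uuid` helper's body is not shown in this diff; a plausible sketch of what such a validator does, purely as an assumption based on the call sites:

# Hypothetical sketch of get_uuid; the real implementation is not in this diff.
from uuid import UUID

from fastapi import HTTPException, status


def get_uuid(value: str, entity_name: str) -> UUID:
    """Parse a path parameter into a UUID, rejecting malformed input."""
    try:
        return UUID(value)
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid {entity_name} ID: {value}",
        ) from e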
application/backend/src/api/endpoints/snapshot_endpoints.py

Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
+# Copyright (C) 2025 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+from typing import Annotated
+from uuid import UUID
+
+from fastapi import APIRouter, Depends, HTTPException, status
+from fastapi.responses import FileResponse
+
+from api.dependencies import get_project_id, get_snapshot_id
+from api.endpoints import API_PREFIX
+from pydantic_models.dataset_snapshot import DatasetSnapshot, DatasetSnapshotList
+from repositories.binary_repo import DatasetSnapshotBinaryRepository
+from services.dataset_snapshot_service import DatasetSnapshotService
+
+router = APIRouter(
+    prefix=API_PREFIX + "/projects/{project_id}",
+    tags=["snapshots"],
+)
+
+
+@router.get(
+    "/snapshots",
+    responses={
+        status.HTTP_200_OK: {"description": "List of available dataset snapshots"},
+    },
+    response_model_exclude_none=True,
+)
+async def get_snapshot_list(
+    project_id: Annotated[UUID, Depends(get_project_id)],
+) -> DatasetSnapshotList:
+    """Endpoint to get list of all snapshots"""
+    snapshots = await DatasetSnapshotService.list_snapshots(project_id=project_id)
+    return DatasetSnapshotList(snapshots=snapshots)
+
+
+@router.get(
+    "/snapshots/{snapshot_id}",
+    response_model_exclude_none=True,
+)
+async def get_snapshot(
+    project_id: Annotated[UUID, Depends(get_project_id)],
+    snapshot_id: UUID,
+) -> DatasetSnapshot:
+    """Endpoint to get snapshot file by ID"""
+    return await DatasetSnapshotService.get_snapshot(project_id=project_id, snapshot_id=snapshot_id)
+
+
+@router.get(
+    "/snapshots/{snapshot_id}/parquet",
+    response_model_exclude_none=True,
+    responses={
+        status.HTTP_200_OK: {"description": "Snapshot Parquet file retrieved successfully"},
+        status.HTTP_404_NOT_FOUND: {"description": "Snapshot not found"},
+    },
+)
+async def get_snapshot_file(
+    project_id: Annotated[UUID, Depends(get_project_id)],
+    snapshot_id: Annotated[UUID, Depends(get_snapshot_id)],
+) -> FileResponse:
+    """Endpoint to get snapshot file by ID"""
+    try:
+        snapshot_bin_repo = DatasetSnapshotBinaryRepository(project_id=project_id)
+        snapshot_path = snapshot_bin_repo.get_snapshot_path(snapshot_id)
+        return FileResponse(snapshot_path)
+    except FileNotFoundError as e:
+        raise HTTPException(
+            status_code=status.HTTP_404_NOT_FOUND,
+            detail=f"Snapshot with ID {snapshot_id} not found",
+        ) from e
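Taken together, the three routes give a list / metadata / download flow. A client-side sketch using httpx and pyarrow (the latter is already a backend dependency); the host, port, and the assumption that API_PREFIX resolves to "/api/v1" are illustrative only:

# Client-side sketch; host/port and the "/api/v1" prefix are assumptions.
import httpx
import pyarrow.parquet as pq

BASE = "http://localhost:8000/api/v1"
project_id = "16e07d18-196e-4e33-bf98-ac1d35dcaaaa"

with httpx.Client() as client:
    # 1. List available snapshots for the project
    snapshots = client.get(f"{BASE}/projects/{project_id}/snapshots").json()["snapshots"]

    if snapshots:
        snapshot_id = snapshots[0]["id"]
        # 2. Download the Parquet file served via FileResponse
        resp = client.get(f"{BASE}/projects/{project_id}/snapshots/{snapshot_id}/parquet")
        resp.raise_for_status()
        with open("snapshot.parquet", "wb") as f:
            f.write(resp.content)

        # 3. Read it back with pyarrow
        table = pq.read_table("snapshot.parquet")
        print(table.num_rows, "rows in snapshot")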

application/backend/src/db/schema.py

Lines changed: 1 addition & 0 deletions
@@ -45,6 +45,7 @@ class DatasetSnapshotDB(Base):
     id: Mapped[str] = mapped_column(primary_key=True, default=lambda: str(uuid4()))
     project_id: Mapped[str] = mapped_column(ForeignKey("projects.id"))
     filename: Mapped[str] = mapped_column(Text, nullable=False)
+    count: Mapped[int] = mapped_column(nullable=False)
     created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.current_timestamp())
application/backend/src/main.py

Lines changed: 2 additions & 0 deletions
@@ -16,6 +16,7 @@
 from api.endpoints.pipeline_endpoints import router as pipeline_router
 from api.endpoints.project_endpoints import project_router
 from api.endpoints.sink_endpoints import router as sink_router
+from api.endpoints.snapshot_endpoints import router as snapshot_router
 from api.endpoints.source_endpoints import router as source_router
 from api.endpoints.trainable_models_endpoints import router as trainable_model_router
 from api.endpoints.webrtc import router as webrtc_router
@@ -58,6 +59,7 @@
 app.include_router(trainable_model_router)
 app.include_router(device_router)
 app.include_router(capture_router)
+app.include_router(snapshot_router)
 
 
 if __name__ == "__main__":

application/backend/src/pydantic_models/dataset_snapshot.py

Lines changed: 8 additions & 2 deletions
@@ -4,7 +4,7 @@
 from datetime import datetime
 from uuid import UUID
 
-from pydantic import Field
+from pydantic import BaseModel, Field
 
 from pydantic_models.base import BaseIDModel
 
@@ -16,15 +16,21 @@ class DatasetSnapshot(BaseIDModel):
 
     project_id: UUID
     filename: str
-    created_at: datetime = Field(default_factory=datetime.now)
+    count: int
+    created_at: datetime | None = Field(default=None)
 
     model_config = {
         "json_schema_extra": {
             "example": {
                 "id": "76e07d18-196e-4e33-bf98-ac1d35dca4cb",
                 "project_id": "16e07d18-196e-4e33-bf98-ac1d35dcaaaa",
                 "filename": "dataset_snapshot_123.parquet",
+                "count": 42,
                 "created_at": "2025-01-01T12:00:00",
             }
         }
     }
+
+
+class DatasetSnapshotList(BaseModel):
+    snapshots: list[DatasetSnapshot]
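With `created_at` now optional and the endpoints declaring `response_model_exclude_none=True`, a snapshot whose timestamp is unset simply omits the field from the JSON payload. A quick illustration, assuming `BaseIDModel` contributes the `id` field:

# Illustration only; assumes BaseIDModel supplies an `id: UUID` field.
from uuid import UUID

from pydantic_models.dataset_snapshot import DatasetSnapshot, DatasetSnapshotList

snap = DatasetSnapshot(
    id=UUID("76e07d18-196e-4e33-bf98-ac1d35dca4cb"),
    project_id=UUID("16e07d18-196e-4e33-bf98-ac1d35dcaaaa"),
    filename="dataset_snapshot_123.parquet",
    count=42,
    # created_at deliberately left unset
)

# exclude_none mirrors response_model_exclude_none=True on the endpoints:
print(snap.model_dump(mode="json", exclude_none=True))
# -> no "created_at" key in the output

payload = DatasetSnapshotList(snapshots=[snap])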

application/backend/src/repositories/mappers/dataset_snapshot_mapper.py

Lines changed: 2 additions & 0 deletions
@@ -16,6 +16,7 @@ def to_schema(model: DatasetSnapshot) -> DatasetSnapshotDB:
         id=str(model.id),
         project_id=str(model.project_id),
         filename=model.filename,
+        count=model.count,
         created_at=model.created_at,
     )
 
@@ -25,5 +26,6 @@ def from_schema(schema: DatasetSnapshotDB) -> DatasetSnapshot:
         id=UUID(schema.id),
         project_id=UUID(schema.project_id),
         filename=schema.filename,
+        count=schema.count,
         created_at=schema.created_at,
     )
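The mapper keeps the UUID-to-string conversion at the persistence boundary, so the new `count` field passes through unchanged in both directions. A round-trip sketch; the enclosing class name `DatasetSnapshotMapper` is an assumption, since only the two function bodies appear in this diff:

# Round-trip sketch; "DatasetSnapshotMapper" as the class name is assumed,
# and "snapshot" is any DatasetSnapshot instance.
row = DatasetSnapshotMapper.to_schema(snapshot)    # DatasetSnapshot -> DatasetSnapshotDB (str IDs)
restored = DatasetSnapshotMapper.from_schema(row)  # DatasetSnapshotDB -> DatasetSnapshot (UUIDs)
assert restored.count == snapshot.count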

application/backend/src/repositories/project_repo.py

Lines changed: 2 additions & 0 deletions
@@ -5,6 +5,7 @@
 from uuid import UUID
 
 import sqlalchemy as sa
+from loguru import logger
 from sqlalchemy.ext.asyncio.session import AsyncSession
 
 from db.schema import PipelineDB, ProjectDB
@@ -47,6 +48,7 @@ async def update_dataset_timestamp(self, project_id: str | UUID) -> None:
                 updated_at=sa.func.current_timestamp(),
             )
         )
+        logger.info(f"Updated dataset timestamp for project {project_id} to current time.")
         await self.db.commit()
 
     async def get_dataset_timestamp(self, project_id: str | UUID) -> datetime:

application/backend/src/services/active_pipeline_service.py

Lines changed: 3 additions & 4 deletions
@@ -73,15 +73,15 @@ async def _initialize(
         await self._load_app_config()
 
         # For child processes with config_changed_condition, start a daemon to monitor configuration changes
-        if start_daemon is not None and self.config_changed_condition is not None:
+        if start_daemon and self.config_changed_condition is not None:
             # Store the current event loop for the daemon thread to use
             self._event_loop = asyncio.get_running_loop()
 
             self._config_reload_daemon = Thread(
                 target=self._reload_config_daemon_routine, name="Config reloader", daemon=True
             )
             self._config_reload_daemon.start()
-        elif start_daemon is not None and self.config_changed_condition is None:
+        elif self.config_changed_condition is None:
             # This is a child process but no condition provided - this is likely an API process
             # that doesn't need the daemon thread, so we just log and continue
             logger.debug("Child process detected but no config_changed_condition provided - skipping daemon thread")
@@ -102,7 +102,6 @@ async def _load_app_config(self) -> None:
         This method loads the active pipeline configuration and updates the
         internal source and sink configurations accordingly.
         """
-        logger.info("Loading configuration from database")
         async with get_async_db_session_ctx() as db:
             repo = PipelineRepository(db)
 
@@ -141,7 +140,7 @@ def _reload_config_daemon_routine(self) -> None:
             notified = self.config_changed_condition.wait(timeout=3)
             if not notified:  # awakened because of timeout
                 continue
-            logger.debug(f"Configuration changes detected. Process: {mp.current_process().name}")
+            logger.info(f"Configuration changes detected. Process: {mp.current_process().name}")
             # Schedule the async reload in the event loop using the stored loop reference
             asyncio.run(self.reload())
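The first hunk is a behavioral fix rather than cleanup: under the old `start_daemon is not None` test, an explicit `start_daemon=False` still evaluated truthy and could start the daemon thread. A minimal illustration of the difference:

# Why `if start_daemon` differs from `if start_daemon is not None`:
for start_daemon in (True, False, None):
    old = start_daemon is not None  # True for both True and False
    new = bool(start_daemon)        # True only for True
    print(f"start_daemon={start_daemon!r}: old={old}, new={new}")

# start_daemon=True:  old=True,  new=True
# start_daemon=False: old=True,  new=False  <- daemon no longer started
# start_daemon=None:  old=False, new=False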
