diff --git a/backend/kernelCI_app/management/commands/update_db.py b/backend/kernelCI_app/management/commands/update_db.py
index 69930fca6..1edfc0d4f 100644
--- a/backend/kernelCI_app/management/commands/update_db.py
+++ b/backend/kernelCI_app/management/commands/update_db.py
@@ -1,12 +1,19 @@
+import ast
+import csv
 import json
 import logging
+import tarfile
 from datetime import datetime, timedelta
-from typing import Generator
+from io import IOBase, StringIO, TextIOWrapper
+from itertools import islice
+from pathlib import Path
+from tempfile import SpooledTemporaryFile
+from typing import Generator, Optional
 
-from django.conf import settings
 from django.core.management.base import BaseCommand, CommandError
 from django.db import connections, models
 from django.utils import timezone
+from django.utils.dateparse import parse_datetime
 
 from kernelCI_app.models import Builds, Checkouts, Incidents, Issues, Tests
 
@@ -18,6 +25,19 @@
 SELECT_BATCH_SIZE = 25000
 
+MAX_MEMORY_BUFFER_BYTES = 64 * 1024**2  # 64 MiB
+
+csv.field_size_limit(1 * 1024**2)  # 1MiB row limit
+
+
+def parse_array(array_str) -> Optional[list[str]]:
+    try:
+        array = ast.literal_eval(array_str)
+        assert isinstance(array, list)
+        return array
+    except (ValueError, SyntaxError):
+        return None
+
 
 def parse_interval(interval_str: str) -> datetime:
     parts = interval_str.split()
@@ -39,6 +59,23 @@ def parse_interval(interval_str: str) -> datetime:
     return timezone.now() - delta
 
 
+def to_human_readable(num_bytes: int) -> str:
+    """Converts bytes to a human-readable string (KiB, MiB, GiB)."""
+    for unit in ["Bytes", "KiB", "MiB", "GiB"]:
+        if num_bytes < 1024.0:
+            return f"{num_bytes:.2f} {unit}"
+        num_bytes /= 1024.0
+    return f"{num_bytes:.2f} TiB"
+
+
+def ensure_suffix(filepath: str, suffix: str) -> Path:
+    """Ensure path ends with .tar.gz without doubling when suffix already present."""
+    p = Path(filepath)
+    if p.name.lower().endswith(suffix):
+        return p
+    return p.with_suffix(suffix)
+
+
 class Command(BaseCommand):
     help = "Migrate data from default database to dashboard_db"
 
@@ -51,56 +88,94 @@ def __init__(self, *args, **kwargs):
         self.related_data_only: bool
         self.origins: list[str]
         self.origin_condition: str
-
-        if settings.USE_DASHBOARD_DB:
-            self.kcidb_connection = connections["kcidb"]
-            self.dashboard_conn_name = "default"
-        else:
-            self.kcidb_connection = connections["default"]
-            self.dashboard_conn_name = "dashboard_db"
+        self.snapshot_archive: tarfile.TarFile
 
     def add_arguments(self, parser):
-        parser.add_argument(
+
+        command_parser = parser.add_subparsers(dest="command", required=True)
+        snapshot_parser = command_parser.add_parser(
+            "snapshot", help="Save snapshot of database into file."
+        )
+
+        snapshot_parser.add_argument(
             "--start-interval",
             type=str,
             help="Start interval for filtering data ('x days' or 'x hours' format)",
             required=True,
         )
-        parser.add_argument(
+        snapshot_parser.add_argument(
            "--end-interval",
            type=str,
            help="End interval for filtering data ('x days' or 'x hours' format)",
            required=True,
        )
-        parser.add_argument(
+        snapshot_parser.add_argument(
             "--table",
             type=str,
             help="""Table name to limit the migration to
             (optional, if not provided all tables will be migrated)""",
         )
-        parser.add_argument(
+        snapshot_parser.add_argument(
             "--related-data-only",
             action="store_true",
             help="""Only retrieves data that are related to the existing data.
             This allows to follow foreign key constraints, but it almost certainly won't retrieve all data in the given interval.""",
         )
-        parser.add_argument(
+        snapshot_parser.add_argument(
             "--origins",
             type=lambda s: [origin.strip() for origin in s.split(",")],
             help="Limit database changes to specific origins (comma-separated list)."
             " If not provided, any origin will be considered",
             default=[],
         )
+        snapshot_parser.add_argument(
+            "--filepath",
+            type=str,
+            required=True,
+            help="Path to store/load the snapshot (.tar.gz) file.",
+        )
 
-    def handle(
+        restore_parser = command_parser.add_parser(
+            "restore",
+            help="Load a snapshot file into the database (will INSERT into the database).",
+        )
+        restore_parser.add_argument(
+            "--filepath",
+            type=str,
+            required=True,
+            help="Path to store/load the snapshot (.tar.gz) file.",
+        )
+
+    def _invalid_table_error(self, table: str) -> str:
+        return (
+            f"Unknown table '{table}'.\n"
+            "\tValid options are: issues, checkouts, builds, tests, incidents."
+        )
+
+    def handle(self, *args, command, **options):
+        if command == "snapshot":
+            self.handle_snapshot(**options)
+        elif command == "restore":
+            self.handle_restore(**options)
+        else:
+            raise ValueError(f"Invalid command: {command}")
+
+    def handle_restore(self, *args, filepath, **options):
+
+        self.related_data_only = False
+        filepath = ensure_suffix(filepath, ".tar.gz")
+        self.restore(filepath)
+
+    def handle_snapshot(
         self,
         *args,
         start_interval: str,
         end_interval: str,
         table: str,
         origins: list[str],
-        related_data_only,
+        related_data_only: bool,
+        filepath: str,
         **options,
     ):
         self.start_interval = start_interval
@@ -129,38 +204,59 @@ def handle(
             f"\nFiltering data between {self.start_interval} and {self.end_interval}"
         )
 
+        filepath = ensure_suffix(filepath, ".tar.gz")
+
+        self.snapshot(table, filepath)
+
+    def snapshot(self, table, snapshot_filepath: Path):
+
+        self.snapshot_archive = tarfile.open(snapshot_filepath, "w:gz")
+
         try:
             match table:
                 case None:
-                    self.migrate_issues()
-                    self.migrate_checkouts()
-                    self.migrate_builds()
-                    self.migrate_tests()
-                    self.migrate_incidents()
+                    self.snapshot_issues()
+                    self.snapshot_checkouts()
+                    self.snapshot_builds()
+                    self.snapshot_tests()
+                    self.snapshot_incidents()
                 case "issues":
-                    self.migrate_issues()
+                    self.snapshot_issues()
                 case "checkouts":
-                    self.migrate_checkouts()
+                    self.snapshot_checkouts()
                 case "builds":
-                    self.migrate_builds()
+                    self.snapshot_builds()
                 case "tests":
-                    self.migrate_tests()
+                    self.snapshot_tests()
                 case "incidents":
-                    self.migrate_incidents()
+                    self.snapshot_incidents()
                 case _:
-                    self.stdout.write(
-                        self.style.ERROR(
-                            f"""Unknown table '{table}'.
-                            Valid options are: issues, checkouts, builds, tests, incidents."""
-                        )
-                    )
+                    self.stdout.write(self._invalid_table_error(table))
+            self.stdout.write(
+                self.style.SUCCESS("Successfully saved all data to the snapshot file")
+            )
+        except Exception as e:
+            logger.error(f"Error updating database: {str(e)}")
+            raise CommandError("Command failed") from e
+        finally:
+            self.snapshot_archive.close()
 
+    def restore(self, snapshot_filepath: Path):
+        self.snapshot_archive = tarfile.open(snapshot_filepath, "r:*")
+        try:
+            self.restore_checkouts()
+            self.restore_builds()
+            self.restore_issues()
+            self.restore_tests()
+            self.restore_incidents()
             self.stdout.write(
                 self.style.SUCCESS("Successfully migrated all data to dashboard_db")
             )
         except Exception as e:
             logger.error(f"Error updating database: {str(e)}")
             raise CommandError("Command failed") from e
+        finally:
+            self.snapshot_archive.close()
 
     def get_related_data(
         self, *, model: models.Model, field_name: str, filter_timestamp: bool = True
@@ -171,7 +267,7 @@ def get_related_data(
         if self.related_data_only is False:
             return set(), ""
 
-        values = model.objects.using(self.dashboard_conn_name)
+        values = model.objects
         if filter_timestamp:
             values = values.filter(
                 field_timestamp__gte=self.start_timestamp,
@@ -203,7 +299,7 @@ def select_issues_data(self) -> list[tuple]:
             self.end_interval,
         ] + self.origins
 
-        with self.kcidb_connection.cursor() as kcidb_cursor:
+        with connections["default"].cursor() as kcidb_cursor:
             kcidb_cursor.execute(query, query_params)
             return kcidb_cursor.fetchall()
 
@@ -216,7 +312,7 @@ def insert_issues_data(self, records: list[tuple]) -> int:
                 Issues(
                     id=record[1],
                     version=record[2],
-                    field_timestamp=record[0],
+                    field_timestamp=parse_datetime(record[0]),
                     origin=record[3],
                     report_url=record[4],
                     report_subject=record[5],
@@ -225,11 +321,11 @@
                     culprit_harness=record[8],
                     comment=record[9],
                     misc=json.loads(record[10]) if record[10] else None,
-                    categories=record[11],
+                    categories=parse_array(record[11]),
                 )
             )
 
-        migrated_issues = Issues.objects.using(self.dashboard_conn_name).bulk_create(
+        migrated_issues = Issues.objects.bulk_create(
             original_issues,
             ignore_conflicts=True,
             batch_size=DEFAULT_BATCH_SIZE,
@@ -239,13 +335,23 @@
         self.stdout.write(f"Processed {total_inserted} Issues records")
         return total_inserted
 
-    def migrate_issues(self) -> None:
-        """Migrate Issues data from default to dashboard_db"""
-
-        self.stdout.write("\nMigrating Issues...")
-        records = self.select_issues_data()
-        self.insert_issues_data(records)
-        self.stdout.write("Issues migration completed")
+    def snapshot_issues(self) -> None:
+        """Migrate Issues data from dashboard_db to file"""
+        with SpooledTemporaryFile(mode="w+b", max_size=MAX_MEMORY_BUFFER_BYTES) as file:
+            self.stdout.write("\nMigrating Issues...")
+            records = self.select_issues_data()
+            self.insert_records(file, "issues", records)
+            self.add_file_to_snapshot(file, "issues")
+            self.stdout.write("Issues migration completed")
+
+    def restore_issues(self) -> None:
+        """Migrate Issues data from file to dashboard_db"""
+        with TextIOWrapper(self.snapshot_archive.extractfile("issues.csv")) as file:
+            self.stdout.write("\nMigrating Issues...")
+            reader = csv.reader(file)
+            records = self.read_records(reader)
+            self.insert_issues_data(records)
+            self.stdout.write("Issues migration completed")
 
     # CHECKOUTS ########################################
     def select_checkouts_data(self) -> list[tuple]:
@@ -267,7 +373,7 @@ def select_checkouts_data(self) -> list[tuple]:
             self.end_interval,
         ] + self.origins
 
-        with self.kcidb_connection.cursor() as kcidb_cursor:
+        with connections["default"].cursor() as kcidb_cursor:
             kcidb_cursor.execute(query, query_params)
             return kcidb_cursor.fetchall()
 
@@ -277,7 +383,7 @@ def insert_checkouts_data(self, records: list[tuple]) -> int:
         for record in records:
             original_checkouts.append(
                 Checkouts(
-                    field_timestamp=record[0],
+                    field_timestamp=parse_datetime(record[0]),
                     id=record[1],
                     origin=record[2],
                     tree_name=record[3],
@@ -289,22 +395,20 @@ def insert_checkouts_data(self, records: list[tuple]) -> int:
                     patchset_hash=record[9],
                     message_id=record[10],
                     comment=record[11],
-                    start_time=record[12],
+                    start_time=parse_datetime(record[12]),
                     log_url=record[13],
                     log_excerpt=record[14],
                     valid=record[15],
                     misc=json.loads(record[16]) if record[16] else None,
                     git_commit_message=record[17],
                     git_repository_branch_tip=record[18],
-                    git_commit_tags=record[19],
-                    origin_builds_finish_time=record[20],
-                    origin_tests_finish_time=record[21],
+                    git_commit_tags=parse_array(record[19]),
+                    origin_builds_finish_time=record[20] or None,
+                    origin_tests_finish_time=record[21] or None,
                 )
             )
 
-        migrated_checkouts = Checkouts.objects.using(
-            self.dashboard_conn_name
-        ).bulk_create(
+        migrated_checkouts = Checkouts.objects.bulk_create(
             original_checkouts,
             ignore_conflicts=True,
             batch_size=DEFAULT_BATCH_SIZE,
@@ -315,14 +419,25 @@ def insert_checkouts_data(self, records: list[tuple]) -> int:
         self.stdout.write(f"Processed {total_inserted} Checkouts records")
         return total_inserted
 
-    def migrate_checkouts(self) -> None:
+    def snapshot_checkouts(self) -> None:
+        """Migrate Checkouts data from dashboard_db to file"""
+
+        with SpooledTemporaryFile(mode="w+b", max_size=MAX_MEMORY_BUFFER_BYTES) as file:
+            self.stdout.write("\nMigrating Checkouts...")
+            records = self.select_checkouts_data()
+            self.insert_records(file, "checkouts", records)
+            self.add_file_to_snapshot(file, "checkouts")
+            self.stdout.write("Checkouts migration completed")
+
+    def restore_checkouts(self) -> None:
         """Migrate Checkouts data from default to dashboard_db"""
 
-        self.stdout.write("\nMigrating Checkouts...")
-        records = self.select_checkouts_data()
-        self.insert_checkouts_data(records)
-        self.stdout.write("Checkouts migration completed")
-        return
+        with TextIOWrapper(self.snapshot_archive.extractfile("checkouts.csv")) as file:
+            reader = csv.reader(file)
+            self.stdout.write("\nMigrating Checkouts...")
+            records = self.read_records(reader)
+            self.insert_checkouts_data(records)
+            self.stdout.write("Checkouts migration completed")
 
     # BUILDS ########################################
     def select_builds_data(self) -> list[tuple]:
@@ -353,20 +468,62 @@
             + self.origins
         )
 
-        with self.kcidb_connection.cursor() as kcidb_cursor:
+        with connections["default"].cursor() as kcidb_cursor:
             kcidb_cursor.execute(query, query_params)
             return kcidb_cursor.fetchall()
 
+    def write_to_csv_stream(self, records: list[tuple]) -> StringIO:
+        stream = StringIO()
+        writer = csv.writer(stream)
+        writer.writerows(records)
+        return stream
+
+    def read_records(
+        self, reader: csv.reader, max_rows: Optional[int] = None
+    ) -> list[tuple]:
+        records = [tuple(record) for record in islice(reader, max_rows)]
+        return records
+
+    def insert_records(self, tmp_file: IOBase, table: str, records: list[tuple]):
+        stream = self.write_to_csv_stream(records)
+        csv_bytes = stream.getvalue().encode("utf-8")
+        tmp_file.write(csv_bytes)
+        self.stdout.write(
+            f"\nProcessed {len(records)} {table} records: {to_human_readable(len(csv_bytes))}"
+        )
+
+    def snapshot_builds(self) -> None:
+        """Migrate Builds data from dashboard_db to file,
+        only selects builds that have the related checkout in the dashboard_db
+        in order to preserve the foreign key constraint"""
+        with SpooledTemporaryFile(mode="w+b", max_size=MAX_MEMORY_BUFFER_BYTES) as file:
+            self.stdout.write("\nMigrating Builds...")
+            records = self.select_builds_data()
+            self.insert_records(file, "builds", records)
+            self.add_file_to_snapshot(file, "builds")
+            self.stdout.write("Builds migration completed")
+
+    def restore_builds(self) -> None:
+        """Migrate Builds data from file to dashboard_db,
+        only inserts builds that have the related checkout in the dashboard_db
+        in order to preserve the foreign key constraint"""
+        with TextIOWrapper(self.snapshot_archive.extractfile("builds.csv")) as file:
+            self.stdout.write("\nMigrating Builds...")
+            reader = csv.reader(file)
+            records = self.read_records(reader)
+            self.insert_builds_data(records)
+            self.stdout.write("Builds migration completed")
+
     def insert_builds_data(self, records: list[tuple]) -> int:
         original_builds: list[Builds] = [
             Builds(
-                field_timestamp=record[0],
+                field_timestamp=parse_datetime(record[0]),
                 checkout_id=record[1],
                 id=record[2],
                 origin=record[3],
                 comment=record[4],
-                start_time=record[5],
-                duration=record[6],
+                start_time=parse_datetime(record[5]),
+                duration=record[6] or None,
                 architecture=record[7],
                 command=record[8],
                 compiler=record[9],
@@ -382,7 +539,7 @@ def insert_builds_data(self, records: list[tuple]) -> int:
             for record in records
         ]
 
-        migrated_builds = Builds.objects.using(self.dashboard_conn_name).bulk_create(
+        migrated_builds = Builds.objects.bulk_create(
             original_builds,
             ignore_conflicts=True,
             batch_size=BUILD_BATCH_SIZE,
@@ -392,16 +549,6 @@ def insert_builds_data(self, records: list[tuple]) -> int:
         self.stdout.write(f"Processed {total_inserted} Builds records")
         return total_inserted
 
-    def migrate_builds(self) -> None:
-        """Migrate Builds data from default to dashboard_db,
-        only inserts builds that have the related checkout in the dashboard_db
-        in order to preserve the foreign key constraint"""
-        self.stdout.write("\nMigrating Builds...")
-
-        records = self.select_builds_data()
-        self.insert_builds_data(records)
-        self.stdout.write("Builds migration completed")
-
     # TESTS ########################################
     def select_tests_data(self) -> Generator[list[tuple], None, list[tuple]]:
         related_build_ids, related_condition = self.get_related_data(
@@ -432,7 +579,7 @@
             + self.origins
         )
 
-        with self.kcidb_connection.cursor() as kcidb_cursor:
+        with connections["default"].cursor() as kcidb_cursor:
             kcidb_cursor.execute(tests_query, query_params)
             self.stdout.write("Finished fetching tests")
             while batch := kcidb_cursor.fetchmany(SELECT_BATCH_SIZE):
@@ -442,7 +589,7 @@ def insert_tests_data(self, records: list[tuple]) -> int:
         print(f"Processing {len(records)} tests")
         original_tests: list[Tests] = [
             Tests(
-                field_timestamp=record[0],
+                field_timestamp=parse_datetime(record[0]),
                 build_id=record[1],
                 id=record[2],
                 origin=record[3],
@@ -453,12 +600,12 @@ def insert_tests_data(self, records: list[tuple]) -> int:
                 log_url=record[8],
                 log_excerpt=record[9],
                 status=record[10],
-                start_time=record[11],
-                duration=record[12],
+                start_time=parse_datetime(record[11]) if record[11] else None,
+                duration=record[12] or None,
                 output_files=json.loads(record[13]) if record[13] else None,
                 misc=json.loads(record[14]) if record[14] else None,
-                number_value=record[15],
-                environment_compatible=record[16],
+                number_value=record[15] or None,
+                environment_compatible=parse_array(record[16]),
                 number_prefix=record[17],
                 number_unit=record[18],
                 input_files=json.loads(record[19]) if record[19] else None,
@@ -466,7 +613,7 @@ def insert_tests_data(self, records: list[tuple]) -> int:
             for record in records
         ]
 
-        migrated_tests = Tests.objects.using(self.dashboard_conn_name).bulk_create(
+        migrated_tests = Tests.objects.bulk_create(
             original_tests,
             ignore_conflicts=True,
             batch_size=TEST_BATCH_SIZE,
@@ -476,19 +623,35 @@ def insert_tests_data(self, records: list[tuple]) -> int:
         self.stdout.write(f"Processed {total_inserted} Tests records")
         return total_inserted
 
-    def migrate_tests(self) -> None:
+    def add_file_to_snapshot(self, file: IOBase, table: str):
+        tar_info = tarfile.TarInfo(name=f"{table}.csv")
+        file.seek(0, 2)
+        tar_info.size = file.tell()
+        file.seek(0, 0)
+        self.snapshot_archive.addfile(tar_info, file)
+
+    def snapshot_tests(self) -> None:
         """Migrate Tests data from default to dashboard_db,
         only inserts tests that have the related build in the dashboard_db
         in order to preserve the foreign key constraint"""
-
-        self.stdout.write("\nMigrating Tests...")
-        total_inserted = 0
-        for batch in self.select_tests_data():
-            inserted = self.insert_tests_data(batch)
-            total_inserted += inserted
-        self.stdout.write(
-            f"\nTests migration completed.\nInserted {total_inserted} tests in total."
-        )
+        with SpooledTemporaryFile(mode="w+b", max_size=MAX_MEMORY_BUFFER_BYTES) as file:
+            self.stdout.write("\nMigrating Tests...")
+            records = self.select_tests_data()
+            for record_batch in records:
+                self.insert_records(file, "tests", record_batch)
+            self.add_file_to_snapshot(file, "tests")
+            self.stdout.write("Tests migration completed")
+
+    def restore_tests(self) -> None:
+        """Migrate Tests data from file to dashboard_db,
+        only inserts tests that have the related build in the dashboard_db
+        in order to preserve the foreign key constraint"""
+        with TextIOWrapper(self.snapshot_archive.extractfile("tests.csv")) as file:
+            self.stdout.write("\nMigrating Tests...")
+            reader = csv.reader(file)
+            while records := self.read_records(reader, max_rows=TEST_BATCH_SIZE):
+                self.insert_tests_data(records)
+            self.stdout.write("Tests migration completed")
 
     # INCIDENTS ########################################
     def select_incidents_data(self) -> list[tuple]:
@@ -521,7 +684,7 @@
             + self.origins
         )
 
-        with self.kcidb_connection.cursor() as kcidb_cursor:
+        with connections["default"].cursor() as kcidb_cursor:
             kcidb_cursor.execute(query, query_params)
             records = kcidb_cursor.fetchall()
             print(f"Retrieved {len(records)} Incidents")
@@ -551,21 +714,21 @@
                 proposed_test_ids.add(test_id)
 
         existing_issue_ids = set(
-            Issues.objects.using(self.dashboard_conn_name)
-            .filter(id__in=[issue[0] for issue in proposed_issue_ids])
-            .values_list("id", flat=True)
+            Issues.objects.filter(
+                id__in=[issue[0] for issue in proposed_issue_ids]
+            ).values_list("id", flat=True)
         )
 
         existing_build_ids = set(
-            Builds.objects.using(self.dashboard_conn_name)
-            .filter(id__in=proposed_build_ids)
-            .values_list("id", flat=True)
+            Builds.objects.filter(id__in=proposed_build_ids).values_list(
+                "id", flat=True
+            )
         )
 
         existing_test_ids = set(
-            Tests.objects.using(self.dashboard_conn_name)
-            .filter(id__in=proposed_test_ids)
-            .values_list("id", flat=True)
+            Tests.objects.filter(id__in=proposed_test_ids).values_list(
+                "id", flat=True
+            )
         )
 
         # Incidents that don't have a related issue, build or test in the dashboard_db
@@ -601,9 +764,7 @@
                 else:
                     skipped_incidents += 1
 
-        migrated_incidents = Incidents.objects.using(
-            self.dashboard_conn_name
-        ).bulk_create(
+        migrated_incidents = Incidents.objects.bulk_create(
             original_incidents,
             ignore_conflicts=True,
             batch_size=DEFAULT_BATCH_SIZE,
@@ -615,13 +776,28 @@
         )
         return total_inserted
 
-    def migrate_incidents(self) -> None:
+    def snapshot_incidents(self) -> None:
+        """Migrate Incidents data from dashboard_db to file,
+        incidents are related to issues, builds and tests.
+        So if any of them are not null, an incident will only be inserted
+        if the related issue, build or test exists in the dashboard_db"""
+
+        with SpooledTemporaryFile(mode="w+b", max_size=MAX_MEMORY_BUFFER_BYTES) as file:
+            self.stdout.write("\nMigrating Incidents...")
+            records = self.select_incidents_data()
+            self.insert_records(file, "incidents", records)
+            self.add_file_to_snapshot(file, "incidents")
+            self.stdout.write("Incidents migration completed")
+
+    def restore_incidents(self) -> None:
         """Migrate Incidents data from default to dashboard_db,
         incidents are related to issues, builds and tests.
         So if any of them are not null, an incident will only be inserted
         if the related issue, build or test exists in the dashboard_db"""
 
-        self.stdout.write("\nMigrating Incidents...")
-        records = self.select_incidents_data()
-        self.insert_incidents_data(records)
-        self.stdout.write("Incidents migration completed")
+        with TextIOWrapper(self.snapshot_archive.extractfile("incidents.csv")) as file:
+            self.stdout.write("\nMigrating Incidents...")
+            reader = csv.reader(file)
+            records = self.read_records(reader)
+            self.insert_incidents_data(records)
+            self.stdout.write("Incidents migration completed")
diff --git a/backend/scripts/copy_db_data.sh b/backend/scripts/copy_db_data.sh
deleted file mode 100755
index d6bb4971f..000000000
--- a/backend/scripts/copy_db_data.sh
+++ /dev/null
@@ -1,46 +0,0 @@
-#!/bin/bash
-
-# Copies data from the DB_DEFAULT to the DASH_DB, through the manage.py update_db command.
-#
-# The small interval is done in order to avoid running into OOM errors.
-# There is no problem in running this script multiple times, as it will only update the database with new data.
-#
-# Usage in docker:
-# docker compose run --build --rm backend sh ./kernelCI_app/backend/scripts/copy_db_data.sh
-#
-# If OOM errors still occur, lower the INTERVAL_HOURS value
-
-START_DAYS_AGO=7
-END_DAYS_AGO=0
-INTERVAL_HOURS=8
-ORIGINS="maestro" # comma-separated list of origins. If empty will select all origins
-
-START_HOURS_AGO=$((START_DAYS_AGO * 24))
-END_HOURS_AGO=$((END_DAYS_AGO * 24))
-
-echo "Starting database update from $START_HOURS_AGO hours ago to $END_HOURS_AGO hours ago in $INTERVAL_HOURS hour intervals on origins $ORIGINS..."
-
-current_start=$START_HOURS_AGO
-origins_arg=""
-if [ $ORIGINS ]; then
-    origins_arg="--origins $ORIGINS"
-fi
-while [ $current_start -gt $END_HOURS_AGO ]; do
-    current_end=$((current_start - INTERVAL_HOURS))
-
-    if [ $current_end -lt $END_HOURS_AGO ]; then
-        current_end=$END_HOURS_AGO
-    fi
-
-    echo "Updating database from $current_start hours ago to $current_end hours ago..."
-    poetry run python3 manage.py update_db ${origins_arg} --start-interval "${current_start} hours" --end-interval "${current_end} hours"
-
-    if [ $? -ne 0 ]; then
-        echo "Error: Command failed for interval ${current_start} to ${current_end} hours ago"
-        exit 1
-    fi
-
-    current_start=$current_end
-done
-
-echo "Database update completed successfully!"
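
Note on usage: the argparse subcommands added above replace the deleted copy_db_data.sh workflow. Based only on the arguments defined in this diff, a rough sketch of the new flow would look like the commands below; the file path, interval, and origin values are illustrative and not part of the change.

    # Save a snapshot of the last 7 days of data for the "maestro" origin into a tar.gz file
    poetry run python3 manage.py update_db snapshot --origins maestro --start-interval "7 days" --end-interval "0 hours" --filepath db_snapshot.tar.gz

    # Insert the snapshot's contents into the target database (conflicting rows are ignored by bulk_create)
    poetry run python3 manage.py update_db restore --filepath db_snapshot.tar.gz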