
Commit 20e2783

Merge pull request #28 from sendbird/feature/auto-detect-pk-column
Auto detect PK column
2 parents 38d9d9a + 57b67c7

13 files changed: +113 −79 lines changed

doc/operation-class.md

Lines changed: 12 additions & 11 deletions

@@ -27,16 +27,17 @@ class MessageRetentionOperation(BaseOperation):
             INSERT INTO {self.source_db}.{self.destination_table}({self.source_columns})
             SELECT {self.source_columns}
             FROM {self.source_db}.{self.source_table} AS source
-            WHERE source.id BETWEEN {start_pk} AND {end_pk}
+            WHERE source.{self.pk_column} BETWEEN {start_pk} AND {end_pk}
             AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
         """
     def _get_not_imported_pks_query(self, start_pk, end_pk):
         return f'''
-            SELECT source.id FROM {self.source_db}.{self.source_table} AS source
-            LEFT JOIN {self.source_db}.{self.destination_table} AS dest ON source.id = dest.id
-            WHERE source.id BETWEEN {start_pk} AND {end_pk}
+            SELECT source.{self.pk_column} FROM {self.source_db}.{self.source_table} AS source
+            LEFT JOIN {self.source_db}.{self.destination_table} AS dest
+                ON source.{self.pk_column} = dest.{self.pk_column}
+            WHERE source.{self.pk_column} BETWEEN {start_pk} AND {end_pk}
             AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
-            AND dest.id IS NULL
+            AND dest.{self.pk_column} IS NULL
         '''
 ```

@@ -48,20 +49,20 @@ class CrossClusterMessageRetentionOperation(CrossClusterBaseOperation):
     def _select_batch_query(self, start_pk, end_pk):
         return f'''
             SELECT {self.source_columns} FROM {self.source_db}.{self.source_table}
-            WHERE id BETWEEN {start_pk} AND {end_pk}
+            WHERE {self.pk_column} BETWEEN {start_pk} AND {end_pk}
             AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
         '''

     def get_not_imported_pks(self, source_cursor, dest_cursor, start_pk, end_pk):
         source_cursor.execute(f'''
-            SELECT id FROM {self.source_db}.{self.source_table}
-            WHERE id BETWEEN {start_pk} AND {end_pk}
+            SELECT {self.pk_column} FROM {self.source_db}.{self.source_table}
+            WHERE {self.pk_column} BETWEEN {start_pk} AND {end_pk}
             AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
         ''')
         source_pks = [row[0] for row in source_cursor.fetchall()]
         dest_cursor.execute(f'''
-            SELECT id FROM {self.destination_db}.{self.destination_table}
-            WHERE id BETWEEN {start_pk} AND {end_pk}
+            SELECT {self.pk_column} FROM {self.destination_db}.{self.destination_table}
+            WHERE {self.pk_column} BETWEEN {start_pk} AND {end_pk}
             AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
         ''')
         dest_pks = [row[0] for row in dest_cursor.fetchall()]

@@ -89,7 +90,7 @@ class MessageRetentionOperation(BaseOperation):
             INSERT INTO {self.source_db}.{self.destination_table}({self.source_columns})
             SELECT {self.source_columns}
             FROM {self.source_db}.{self.source_table} AS source
-            WHERE source.id BETWEEN {start_pk} AND {end_pk}
+            WHERE source.{self.pk_column} BETWEEN {start_pk} AND {end_pk}
             AND source.ts > DATE_SUB(NOW(), INTERVAL {self.operation_config.retention_days} DAY)
         """
 ```
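With this change, operation classes reference `self.pk_column` instead of a hard-coded `id`, so custom operations work on tables whose primary key has another name. A minimal sketch of a custom operation written against the new convention (the class and method names here are illustrative, not from the source; `created_at` is a hypothetical column; the `BaseOperation` attributes are those used in the diff above):

```python
class ArchiveRecentRowsOperation(BaseOperation):
    """Hypothetical operation: copy only rows from the last 90 days."""

    def _insert_batch_query(self, start_pk, end_pk):
        # self.pk_column is auto-detected by the initializer, so this
        # query no longer assumes the primary key is named `id`.
        return f"""
            INSERT INTO {self.source_db}.{self.destination_table}({self.source_columns})
            SELECT {self.source_columns}
            FROM {self.source_db}.{self.source_table} AS source
            WHERE source.{self.pk_column} BETWEEN {start_pk} AND {end_pk}
            AND source.created_at > DATE_SUB(NOW(), INTERVAL 90 DAY)
        """
```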

src/modules/redis/schema.py

Lines changed: 2 additions & 1 deletion

@@ -28,7 +28,8 @@ class Metadata(Hash):
     destination_db: str
     destination_table: str
     source_columns: str
-    max_id: int
+    pk_column: str
+    max_pk: int
     start_datetime: datetime

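Note that `pk_column` is stored already backtick-quoted (see the initializer diff below), so consumers can interpolate it into SQL directly. A small sketch of a downstream read, assuming a populated `Metadata` hash bound to an initialized `redis_data` object:

```python
# Sketch: reading the new fields from the Redis-backed metadata.
metadata = redis_data.metadata
print(metadata.pk_column)  # e.g. "`message_id`" (backtick-quoted)
print(metadata.max_pk)     # upper bound of the PK range, used for chunking

# Because pk_column carries its own backticks, interpolation is direct:
query = f"SELECT MAX({metadata.pk_column}) FROM {metadata.source_db}.{metadata.source_table}"
```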
src/sbosc/controller/controller.py

Lines changed: 6 additions & 6 deletions

@@ -63,14 +63,14 @@ def create_bulk_import_chunks(self):
         self.redis_data.remove_all_chunks()

         metadata = self.redis_data.metadata
-        max_id = metadata.max_id
+        max_pk = metadata.max_pk

         # chunk_count is determined by min_chunk_size and max_chunk_count
         # Each chunk will have min_chunk_size rows and the number of chunks should not exceed max_chunk_count
         min_chunk_size = config.MIN_CHUNK_SIZE
         max_chunk_count = config.MAX_CHUNK_COUNT  # Number of chunks means max number of worker threads
-        chunk_count = min(max_id // min_chunk_size, max_chunk_count)
-        chunk_size = max_id // chunk_count
+        chunk_count = min(max_pk // min_chunk_size, max_chunk_count)
+        chunk_size = max_pk // chunk_count

         # Create chunks
         # Each chunk will have a range of primary key values [start_pk, end_pk]

@@ -79,7 +79,7 @@ def create_bulk_import_chunks(self):
             start_pk = i * chunk_size + 1
             end_pk = (i + 1) * chunk_size
             if i == chunk_count - 1:
-                end_pk = max_id
+                end_pk = max_pk

             chunk_id = f"{self.migration_id}-{i}"
             chunk_info = self.redis_data.get_chunk_info(chunk_id)

@@ -112,7 +112,7 @@ def create_bulk_import_chunks(self):
         self.redis_data.set_current_stage(Stage.BULK_IMPORT)
         self.slack.send_message(
             subtitle="Bulk import started",
-            message=f"Max id: {max_id}\n"
+            message=f"Max PK: {max_pk}\n"
                     f"Chunk count: {chunk_count}\n"
                     f"Chunk size: {chunk_size}\n"
                     f"Batch size: {config.MIN_BATCH_SIZE}\n"

@@ -343,7 +343,7 @@ def swap_tables(self):
         old_source_table = f"{metadata.source_db}.{self.redis_data.old_source_table}"
         cursor.execute(f"RENAME TABLE {source_table} TO {old_source_table}")
         after_rename_table_timestamp = time.time()
-        cursor.execute(f"SELECT MAX(id) FROM {old_source_table}")
+        cursor.execute(f"SELECT MAX({metadata.pk_column}) FROM {old_source_table}")
         final_max_id = cursor.fetchone()[0]

         with self.validator.migration_operation.override_source_table(self.redis_data.old_source_table):
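The chunking above divides the primary key range `[1, max_pk]` rather than counting rows, which is why `max_pk` replaces `max_id` throughout. A self-contained sketch of the same arithmetic, with a worked example (the function name and returned tuple format are mine, not from the source; it assumes `max_pk >= min_chunk_size`, as the source code does):

```python
def plan_chunks(max_pk: int, min_chunk_size: int, max_chunk_count: int):
    """Split [1, max_pk] into contiguous PK ranges, mirroring the
    create_bulk_import_chunks logic in the diff above."""
    chunk_count = min(max_pk // min_chunk_size, max_chunk_count)
    chunk_size = max_pk // chunk_count
    chunks = []
    for i in range(chunk_count):
        start_pk = i * chunk_size + 1
        end_pk = (i + 1) * chunk_size
        if i == chunk_count - 1:
            end_pk = max_pk  # last chunk absorbs the division remainder
        chunks.append((start_pk, end_pk))
    return chunks

# e.g. plan_chunks(1_000_003, 100_000, 8) -> 8 chunks of 125_000 PKs;
# the last chunk is (875_001, 1_000_003), picking up the remainder.
```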

src/sbosc/controller/initializer.py

Lines changed: 18 additions & 3 deletions

@@ -174,10 +174,25 @@ def fetch_metadata(self, redis_data):
         metadata.source_columns = cursor.fetchone()[0]
         self.logger.info("Saved source column schema to Redis")

+        # Get pk column
+        cursor.execute(f'''
+            SELECT COLUMN_NAME FROM information_schema.COLUMNS
+            WHERE TABLE_SCHEMA = '{metadata.source_db}' AND TABLE_NAME = '{metadata.source_table}'
+            AND COLUMN_KEY = 'PRI' AND EXTRA LIKE '%auto_increment%'
+        ''')
+        if cursor.rowcount == 0:
+            raise Exception("Auto increment primary key column not found")
+        metadata.pk_column = f"`{cursor.fetchone()[0]}`"
+        self.logger.info("Saved primary key column to Redis")
+
         # Get max id
-        cursor.execute("SELECT MAX(id) FROM %s.%s" % (metadata.source_db, metadata.source_table))
-        max_id = cursor.fetchone()[0]
-        metadata.max_id = max_id
+        cursor.execute('''
+            SELECT MAX(%s) FROM %s.%s
+        ''' % (metadata.pk_column, metadata.source_db, metadata.source_table))
+        max_pk = cursor.fetchone()[0]
+        if max_pk is None:
+            raise Exception("No data in source table")
+        metadata.max_pk = max_pk
         self.logger.info("Saved total rows to Redis")

         metadata.start_datetime = datetime.now()
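The detection keys on `COLUMN_KEY = 'PRI'` together with `auto_increment` in `EXTRA`, so tables with composite or non-auto-increment primary keys fail fast instead of migrating incorrectly. A standalone sketch of the same lookup, assuming a DB-API cursor with format-style placeholders (e.g. PyMySQL); the function name is illustrative and, unlike the diff, it passes the schema and table names as bind parameters (note the `%%` escaping required for a literal `%` in the LIKE pattern):

```python
def find_auto_increment_pk(cursor, db: str, table: str) -> str:
    """Return the backtick-quoted auto-increment PK column, or raise."""
    cursor.execute(
        """
        SELECT COLUMN_NAME FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
          AND COLUMN_KEY = 'PRI' AND EXTRA LIKE '%%auto_increment%%'
        """,
        (db, table),
    )
    row = cursor.fetchone()
    if row is None:
        raise Exception("Auto increment primary key column not found")
    return f"`{row[0]}`"  # backtick-quoted, as the initializer stores it
```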

src/sbosc/controller/validator.py

Lines changed: 6 additions & 4 deletions

@@ -83,8 +83,8 @@ def bulk_import_validation(self):
         metadata = self.redis_data.metadata
         range_queue = Queue()
         start_pk = 0
-        while start_pk <= metadata.max_id:
-            range_queue.put((start_pk, min(start_pk + self.bulk_import_batch_size, metadata.max_id)))
+        while start_pk <= metadata.max_pk:
+            range_queue.put((start_pk, min(start_pk + self.bulk_import_batch_size, metadata.max_pk)))
             start_pk += self.bulk_import_batch_size + 1
         failed_pks = []

@@ -153,13 +153,15 @@ def __execute_apply_dml_events_validation_query(
         if event_pks:
             event_pks_str = ','.join([str(pk) for pk in event_pks])
             dest_cursor.execute(f'''
-                SELECT id FROM {metadata.destination_db}.{metadata.destination_table} WHERE id IN ({event_pks_str})
+                SELECT {metadata.pk_column} FROM {metadata.destination_db}.{metadata.destination_table}
+                WHERE {metadata.pk_column} IN ({event_pks_str})
             ''')
             not_deleted_pks = set([row[0] for row in dest_cursor.fetchall()])
             if dest_cursor.rowcount > 0:
                 # Check if deleted pks are reinserted
                 source_cursor.execute(f'''
-                    SELECT id FROM {metadata.source_db}.{metadata.source_table} WHERE id IN ({event_pks_str})
+                    SELECT {metadata.pk_column} FROM {metadata.source_db}.{metadata.source_table}
+                    WHERE {metadata.pk_column} IN ({event_pks_str})
                 ''')
                 reinserted_pks = set([row[0] for row in source_cursor.fetchall()])
                 if reinserted_pks:
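The validation loop walks the PK space in inclusive, non-overlapping ranges; the `+ 1` in the step is what keeps adjacent batches from sharing a boundary PK. A small sketch that makes the boundary behavior explicit (names are illustrative):

```python
def validation_ranges(max_pk: int, batch_size: int):
    """Yield inclusive [start, end] PK ranges exactly as the loop
    in bulk_import_validation builds them."""
    start_pk = 0
    while start_pk <= max_pk:
        yield start_pk, min(start_pk + batch_size, max_pk)
        start_pk += batch_size + 1

# e.g. list(validation_ranges(10, 4)) -> [(0, 4), (5, 9), (10, 10)]
```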

src/sbosc/monitor/monitor.py

Lines changed: 2 additions & 2 deletions

@@ -306,8 +306,8 @@ def check_migration_status(self):
         if last_pk_inserted and last_pk_inserted >= chunk_info.start_pk:
             inserted_rows += last_pk_inserted - chunk_info.start_pk

-        if self.redis_data.metadata.max_id:
-            bulk_import_progress = inserted_rows / self.redis_data.metadata.max_id * 100
+        if self.redis_data.metadata.max_pk:
+            bulk_import_progress = inserted_rows / self.redis_data.metadata.max_pk * 100
             self.metric_sender.submit('sb_osc_bulk_import_progress', bulk_import_progress)

         self.submit_event_handler_timestamps()
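One caveat: `max_pk` is the top of the PK range, not a row count, so when the auto-increment sequence has gaps the reported percentage is an approximation. A minimal sketch of the same calculation (names are illustrative):

```python
def bulk_import_progress(inserted_rows, max_pk):
    """Percent of the PK range covered so far, or None if max_pk is unset.

    Gaps in the auto-increment sequence make this an approximation of
    true row progress, which is acceptable for a monitoring metric.
    """
    if not max_pk:
        return None
    return inserted_rows / max_pk * 100

# e.g. bulk_import_progress(250_000, 1_000_000) -> 25.0
```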
