Skip to content

Commit 43e0d2c

Browse files
committed
store rows copied & dml applied
1 parent d4ac082 commit 43e0d2c

File tree

5 files changed

+21
-5
lines changed

5 files changed

+21
-5
lines changed

go/logic/applier.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,8 @@ func (this *Applier) CreateCheckpointTable() error {
435435
"`gh_ost_chk_timestamp` bigint",
436436
"`gh_ost_chk_coords` varchar(4096)",
437437
"`gh_ost_chk_iteration` bigint",
438+
"`gh_ost_rows_copied` bigint",
439+
"`gh_ost_dml_applied` bigint",
438440
}
439441
for _, col := range this.migrationContext.UniqueKey.Columns.Columns() {
440442
if col.MySQLType == "" {
@@ -623,7 +625,7 @@ func (this *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) {
623625
if err != nil {
624626
return insertId, err
625627
}
626-
args := sqlutils.Args(chk.LastTrxCoords.String(), chk.Iteration)
628+
args := sqlutils.Args(chk.LastTrxCoords.String(), chk.Iteration, chk.RowsCopied, chk.DMLApplied)
627629
args = append(args, uniqueKeyArgs...)
628630
res, err := this.db.Exec(query, args...)
629631
if err != nil {
@@ -641,7 +643,7 @@ func (this *Applier) ReadLastCheckpoint() (*Checkpoint, error) {
641643

642644
var coordStr string
643645
var timestamp int64
644-
ptrs := []interface{}{&chk.Id, &timestamp, &coordStr, &chk.Iteration}
646+
ptrs := []interface{}{&chk.Id, &timestamp, &coordStr, &chk.Iteration, &chk.RowsCopied, &chk.DMLApplied}
645647
ptrs = append(ptrs, chk.IterationRangeMin.ValuesPointers...)
646648
ptrs = append(ptrs, chk.IterationRangeMax.ValuesPointers...)
647649
err := row.Scan(ptrs...)

go/logic/applier_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,8 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() {
700700
IterationRangeMin: applier.migrationContext.MigrationRangeMinValues,
701701
IterationRangeMax: applier.migrationContext.MigrationRangeMaxValues,
702702
Iteration: 2,
703+
RowsCopied: 100000,
704+
DMLApplied: 200000,
703705
}
704706
id, err := applier.WriteCheckpoint(chk)
705707
suite.Require().NoError(err)
@@ -712,6 +714,8 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() {
712714
suite.Require().Equal(chk.LastTrxCoords.String(), gotChk.LastTrxCoords.String())
713715
suite.Require().Equal(chk.IterationRangeMin.String(), gotChk.IterationRangeMin.String())
714716
suite.Require().Equal(chk.IterationRangeMax.String(), gotChk.IterationRangeMax.String())
717+
suite.Require().Equal(chk.RowsCopied, gotChk.RowsCopied)
718+
suite.Require().Equal(chk.DMLApplied, gotChk.DMLApplied)
715719
}
716720

717721
func TestApplier(t *testing.T) {

go/logic/checkpoint.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,6 @@ type Checkpoint struct {
2626
// for the chunk copier range.
2727
IterationRangeMax *sql.ColumnValues
2828
Iteration int64
29+
RowsCopied int64
30+
DMLApplied int64
2931
}

go/logic/migrator.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,8 @@ func (this *Migrator) Migrate() (err error) {
428428
this.migrationContext.MigrationIterationRangeMinValues = lastCheckpoint.IterationRangeMin
429429
this.migrationContext.MigrationIterationRangeMaxValues = lastCheckpoint.IterationRangeMax
430430
this.migrationContext.Iteration = lastCheckpoint.Iteration
431+
this.migrationContext.TotalRowsCopied = lastCheckpoint.RowsCopied
432+
this.migrationContext.TotalDMLEventsApplied = lastCheckpoint.DMLApplied
431433
this.migrationContext.InitialStreamerCoords = lastCheckpoint.LastTrxCoords
432434
if err := this.initiateStreaming(); err != nil {
433435
return err
@@ -1397,7 +1399,7 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
13971399

13981400
// Checkpoint attempts to write a checkpoint of the Migrator's current state.
13991401
// It gets the binlog coordinates of the last received trx and waits until the
1400-
// applier reaches that trx. At that point it's safe to resume from these
1402+
// applier reaches that trx. At that point it's safe to resume from these coordinates.
14011403
func (this *Migrator) Checkpoint(ctx context.Context) (*Checkpoint, error) {
14021404
coords := this.eventsStreamer.GetCurrentBinlogCoordinates()
14031405
this.applier.LastIterationRangeMutex.Lock()
@@ -1410,6 +1412,8 @@ func (this *Migrator) Checkpoint(ctx context.Context) (*Checkpoint, error) {
14101412
IterationRangeMin: this.applier.LastIterationRangeMinValues.Clone(),
14111413
IterationRangeMax: this.applier.LastIterationRangeMaxValues.Clone(),
14121414
LastTrxCoords: coords,
1415+
RowsCopied: atomic.LoadInt64(&this.migrationContext.TotalRowsCopied),
1416+
DMLApplied: atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied),
14131417
}
14141418
this.applier.LastIterationRangeMutex.Unlock()
14151419

go/sql/builder.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,13 @@ func NewCheckpointQueryBuilder(databaseName, tableName string, uniqueKeyColumns
123123
stmt := fmt.Sprintf(`
124124
insert /* gh-ost */
125125
into %s.%s
126-
(gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, %s, %s)
126+
(gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration,
127+
gh_ost_rows_copied, gh_ost_dml_applied,
128+
%s, %s)
127129
values
128-
(unix_timestamp(now()), ?, ?, %s, %s)`,
130+
(unix_timestamp(now()), ?, ?,
131+
?, ?,
132+
%s, %s)`,
129133
databaseName, tableName,
130134
strings.Join(minUniqueColNames, ", "),
131135
strings.Join(maxUniqueColNames, ", "),

0 commit comments

Comments
 (0)