Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions doc/command-line-flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,12 @@ List of metrics and threshold values; topping the threshold of any will cause th

Typically `gh-ost` is used to migrate tables on a master. If you wish to only perform the migration in full on a replica, connect `gh-ost` to said replica and pass `--migrate-on-replica`. `gh-ost` will briefly connect to the master but otherwise will make no changes on the master. Migration will be fully executed on the replica, while making sure to maintain a small replication lag.

### panic-on-warnings

When this flag is set, `gh-ost` will panic when SQL warnings indicating data loss are encountered when copying data. This flag helps prevent data loss scenarios with migrations touching unique keys, column collation and types, as well as `NOT NULL` constraints, where `MySQL` will silently drop inserted rows that no longer satisfy the updated constraint (also dependent on the configured `sql_mode`).

While `panic-on-warnings` is currently disabled by default, it will default to `true` in a future version of `gh-ost`.

### postpone-cut-over-flag-file

Indicate a file name, such that the final [cut-over](cut-over.md) step does not take place as long as the file exists.
Expand Down
2 changes: 2 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ type MigrationContext struct {
HooksHintOwner string
HooksHintToken string
HooksStatusIntervalSec int64
PanicOnWarnings bool

DropServeSocket bool
ServeSocketFile string
Expand Down Expand Up @@ -231,6 +232,7 @@ type MigrationContext struct {
ColumnRenameMap map[string]string
DroppedColumnsMap map[string]bool
MappedSharedColumns *sql.ColumnList
MigrationLastInsertSQLWarnings []string
MigrationRangeMinValues *sql.ColumnValues
MigrationRangeMaxValues *sql.ColumnValues
Iteration int64
Expand Down
1 change: 1 addition & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func main() {
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)")
dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-100)")
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
flag.BoolVar(&migrationContext.PanicOnWarnings, "panic-on-warnings", false, "Panic when SQL warnings are encountered when copying a batch indicating data loss")
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout) or attempting instant DDL")
niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after")

Expand Down
54 changes: 45 additions & 9 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package logic
import (
gosql "database/sql"
"fmt"
"regexp"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -662,7 +663,7 @@ func (this *Applier) ReadMigrationRangeValues() error {
// which will be used for copying the next chunk of rows. It returns "false" if there is
// no further chunk to work through, i.e. we're past the last chunk and are done with
// iterating the range (and thus done with copying row chunks)
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, expectedRowCount int64, err error) {
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
if this.migrationContext.MigrationIterationRangeMinValues == nil {
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
Expand All @@ -683,32 +684,36 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
)
if err != nil {
return hasFurtherRange, err
return hasFurtherRange, expectedRowCount, err
}

rows, err := this.db.Query(query, explodedArgs...)
if err != nil {
return hasFurtherRange, err
return hasFurtherRange, expectedRowCount, err
}
defer rows.Close()

iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len())
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len() + 1)
for rows.Next() {
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
return hasFurtherRange, err
return hasFurtherRange, expectedRowCount, err
}
hasFurtherRange = true

expectedRowCount = (*iterationRangeMaxValues.ValuesPointers[len(iterationRangeMaxValues.ValuesPointers)-1].(*interface{})).(int64)
iterationRangeMaxValues = sql.ToColumnValues(iterationRangeMaxValues.AbstractValues()[:len(iterationRangeMaxValues.AbstractValues())-1])

hasFurtherRange = expectedRowCount > 0
}
if err = rows.Err(); err != nil {
return hasFurtherRange, err
return hasFurtherRange, expectedRowCount, err
}
if hasFurtherRange {
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
return hasFurtherRange, nil
return hasFurtherRange, expectedRowCount, nil
}
}
this.migrationContext.Log.Debugf("Iteration complete: no further range to iterate")
return hasFurtherRange, nil
return hasFurtherRange, expectedRowCount, nil
}

// ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where
Expand Down Expand Up @@ -753,6 +758,37 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
if err != nil {
return nil, err
}

if this.migrationContext.PanicOnWarnings {
//nolint:execinquery
rows, err := tx.Query("SHOW WARNINGS")
if err != nil {
return nil, err
}
defer rows.Close()
if err = rows.Err(); err != nil {
return nil, err
}

var sqlWarnings []string
for rows.Next() {
var level, message string
var code int
if err := rows.Scan(&level, &code, &message); err != nil {
this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
continue
}
// Duplicate warnings are formatted differently across mysql versions, hence the optional table name prefix
migrationUniqueKeyExpression := fmt.Sprintf("for key '(%s\\.)?%s'", this.migrationContext.GetGhostTableName(), this.migrationContext.UniqueKey.NameInGhostTable)
matched, _ := regexp.MatchString(migrationUniqueKeyExpression, message)
if strings.Contains(message, "Duplicate entry") && matched {
continue
}
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
}
this.migrationContext.MigrationLastInsertSQLWarnings = sqlWarnings
}

if err := tx.Commit(); err != nil {
return nil, err
}
Expand Down
93 changes: 93 additions & 0 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,99 @@ func (suite *ApplierTestSuite) TestCreateGhostTable() {
suite.Require().Equal("CREATE TABLE `_testing_gho` (\n `id` int DEFAULT NULL,\n `item_id` int DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci", createDDL)
}

// TestPanicOnWarningsInApplyIterationInsertQuerySucceedsWithUniqueKeyWarningInsertedByDMLEvent
// verifies that when PanicOnWarnings is enabled, a "Duplicate entry" warning caused by a row
// that a DML event already copied into the ghost table is recognized as benign and filtered
// out of MigrationLastInsertSQLWarnings, so the iteration insert still succeeds.
func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySucceedsWithUniqueKeyWarningInsertedByDMLEvent() {
ctx := context.Background()

var err error

// Create the original and ghost tables; both carry a unique key on item_id so that
// re-inserting the same row during row copy produces a duplicate-key warning.
_, err = suite.db.ExecContext(ctx, "CREATE TABLE test.testing (id INT, item_id INT, UNIQUE KEY (item_id));")
suite.Require().NoError(err)

_, err = suite.db.ExecContext(ctx, "CREATE TABLE test._testing_gho (id INT, item_id INT, UNIQUE KEY (item_id));")
suite.Require().NoError(err)

connectionConfig, err := GetConnectionConfig(ctx, suite.mysqlContainer)
suite.Require().NoError(err)

// Minimal migration context pointing the applier at the test container's database.
migrationContext := base.NewMigrationContext()
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.DatabaseName = "test"
migrationContext.SkipPortValidation = true
migrationContext.OriginalTableName = "testing"
migrationContext.SetConnectionConfig("innodb")

// The feature under test: surface (and normally panic on) SQL warnings during row copy.
migrationContext.PanicOnWarnings = true

migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"})
// NameInGhostTable is what the warning-filter regex matches against in SHOW WARNINGS output.
migrationContext.UniqueKey = &sql.UniqueKey{
Name: "item_id",
NameInGhostTable: "item_id",
Columns: *sql.NewColumnList([]string{"item_id"}),
}

applier := NewApplier(migrationContext)
suite.Require().NoError(applier.prepareQueries())
defer applier.Teardown()

err = applier.InitDBConnections()
suite.Require().NoError(err)

// Seed one row in the original table; this is the row the chunk copy will attempt to insert.
_, err = suite.db.ExecContext(ctx, "INSERT INTO test.testing (id, item_id) VALUES (123456, 42);")
suite.Require().NoError(err)

// Apply the same row to the ghost table via a simulated binlog DML event, so the
// subsequent chunk INSERT hits the unique key and raises a duplicate-entry warning.
dmlEvents := []*binlog.BinlogDMLEvent{
{
DatabaseName: "test",
TableName: "testing",
DML: binlog.InsertDML,
NewColumnValues: sql.ToColumnValues([]interface{}{123456, 42}),
},
}
err = applier.ApplyDMLEventQueries(dmlEvents)
suite.Require().NoError(err)

err = applier.CreateChangelogTable()
suite.Require().NoError(err)
err = applier.ReadMigrationRangeValues()
suite.Require().NoError(err)

// One row exists in the range, so exactly one chunk with an expected size of 1.
hasFurtherRange, expectedRangeSize, err := applier.CalculateNextIterationRangeEndValues()
suite.Require().NoError(err)
suite.Require().True(hasFurtherRange)
suite.Require().Equal(int64(1), expectedRangeSize)

// The chunk insert affects 0 rows: the row is already present in the ghost table
// (copied by the DML event above), so the insert is skipped with a warning.
_, rowsAffected, _, err := applier.ApplyIterationInsertQuery()
suite.Require().NoError(err)
suite.Require().Equal(int64(0), rowsAffected)

// Ensure Duplicate entry '42' for key '_testing_gho.item_id' is ignored correctly
suite.Require().Empty(applier.migrationContext.MigrationLastInsertSQLWarnings)

// Check that the row was inserted
rows, err := suite.db.Query("SELECT * FROM test._testing_gho")
suite.Require().NoError(err)
defer rows.Close()

var count, id, item_id int
for rows.Next() {
err = rows.Scan(&id, &item_id)
suite.Require().NoError(err)
count += 1
}
suite.Require().NoError(rows.Err())

// Exactly the one seeded row made it to the ghost table, with its original values.
suite.Require().Equal(1, count)
suite.Require().Equal(123456, id)
suite.Require().Equal(42, item_id)

// The single simulated binlog event was counted; net rows-delta is zero since the
// chunk copy added nothing beyond what the DML event already applied.
suite.Require().
Equal(int64(1), migrationContext.TotalDMLEventsApplied)
suite.Require().
Equal(int64(0), migrationContext.RowsDeltaEstimate)
}

// TestApplier is the standard Go test entry point that hands the
// ApplierTestSuite over to testify's suite runner.
func TestApplier(t *testing.T) {
	suite.Run(t, &ApplierTestSuite{})
}
3 changes: 3 additions & 0 deletions go/logic/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,9 @@ func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [
for _, originalUniqueKey := range originalUniqueKeys {
for _, ghostUniqueKey := range ghostUniqueKeys {
if originalUniqueKey.Columns.IsSubsetOf(&ghostUniqueKey.Columns) {
// In case the unique key gets renamed in -alter, PanicOnWarnings needs to rely on the new name
// to check SQL warnings on the ghost table, so return new name here.
originalUniqueKey.NameInGhostTable = ghostUniqueKey.Name
uniqueKeys = append(uniqueKeys, originalUniqueKey)
break
}
Expand Down
16 changes: 15 additions & 1 deletion go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1238,8 +1238,9 @@ func (this *Migrator) iterateChunks() error {
// When hasFurtherRange is false, the original table might be write-locked and CalculateNextIterationRangeEndValues would hang forever

hasFurtherRange := false
expectedRangeSize := int64(0)
if err := this.retryOperation(func() (e error) {
hasFurtherRange, e = this.applier.CalculateNextIterationRangeEndValues()
hasFurtherRange, expectedRangeSize, e = this.applier.CalculateNextIterationRangeEndValues()
return e
}); err != nil {
return terminateRowIteration(err)
Expand All @@ -1265,6 +1266,19 @@ func (this *Migrator) iterateChunks() error {
if err != nil {
return err // wrapping call will retry
}

if this.migrationContext.PanicOnWarnings {
if len(this.migrationContext.MigrationLastInsertSQLWarnings) > 0 {
for _, warning := range this.migrationContext.MigrationLastInsertSQLWarnings {
this.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning)
}
if expectedRangeSize != rowsAffected {
joinedWarnings := strings.Join(this.migrationContext.MigrationLastInsertSQLWarnings, "; ")
terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings))
}
}
}

atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected)
atomic.AddInt64(&this.migrationContext.Iteration, 1)
return nil
Expand Down
69 changes: 53 additions & 16 deletions go/sql/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,36 +275,54 @@ func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string

uniqueKeyColumnNames := duplicateNames(uniqueKeyColumns.Names())
uniqueKeyColumnAscending := make([]string, len(uniqueKeyColumnNames))
uniqueKeyColumnDescending := make([]string, len(uniqueKeyColumnNames))
for i, column := range uniqueKeyColumns.Columns() {
uniqueKeyColumnNames[i] = EscapeName(uniqueKeyColumnNames[i])
if column.Type == EnumColumnType {
uniqueKeyColumnAscending[i] = fmt.Sprintf("concat(%s) asc", uniqueKeyColumnNames[i])
uniqueKeyColumnDescending[i] = fmt.Sprintf("concat(%s) desc", uniqueKeyColumnNames[i])
} else {
uniqueKeyColumnAscending[i] = fmt.Sprintf("%s asc", uniqueKeyColumnNames[i])
uniqueKeyColumnDescending[i] = fmt.Sprintf("%s desc", uniqueKeyColumnNames[i])
}
}
joinedColumnNames := strings.Join(uniqueKeyColumnNames, ", ")
result = fmt.Sprintf(`
select /* gh-ost %s.%s %s */
%s
from
%s.%s
where
%s and %s
%s,
(select count(*) from (
select
%s
from
%s.%s
where
%s and %s
limit
%d
) select_osc_chunk)
from (
select
%s
from
%s.%s
where
%s and %s
limit
%d
) select_osc_chunk
order by
%s
limit 1
offset %d`,
databaseName, tableName, hint,
strings.Join(uniqueKeyColumnNames, ", "),
joinedColumnNames, joinedColumnNames,
databaseName, tableName,
rangeStartComparison, rangeEndComparison,
rangeStartComparison, rangeEndComparison, chunkSize,
joinedColumnNames,
databaseName, tableName,
rangeStartComparison, rangeEndComparison, chunkSize,
strings.Join(uniqueKeyColumnAscending, ", "),
(chunkSize - 1),
)
return result, explodedArgs, nil
// 2x the explodedArgs for the subquery (CTE would be possible but not supported by MySQL 5)
return result, append(explodedArgs, explodedArgs...), nil
}

func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
Expand Down Expand Up @@ -342,8 +360,22 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str
uniqueKeyColumnDescending[i] = fmt.Sprintf("%s desc", uniqueKeyColumnNames[i])
}
}

joinedColumnNames := strings.Join(uniqueKeyColumnNames, ", ")
result = fmt.Sprintf(`
select /* gh-ost %s.%s %s */ %s
select /* gh-ost %s.%s %s */
%s,
(select count(*) from (
select
%s
from
%s.%s
where
%s and %s
order by
%s
limit %d
) select_osc_chunk)
from (
select
%s
Expand All @@ -353,17 +385,22 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str
%s and %s
order by
%s
limit %d) select_osc_chunk
limit %d
) select_osc_chunk
order by
%s
limit 1`,
databaseName, tableName, hint, strings.Join(uniqueKeyColumnNames, ", "),
strings.Join(uniqueKeyColumnNames, ", "), databaseName, tableName,
databaseName, tableName, hint, joinedColumnNames,
joinedColumnNames, databaseName, tableName,
rangeStartComparison, rangeEndComparison,
strings.Join(uniqueKeyColumnAscending, ", "), chunkSize,
joinedColumnNames, databaseName, tableName,
rangeStartComparison, rangeEndComparison,
strings.Join(uniqueKeyColumnAscending, ", "), chunkSize,
strings.Join(uniqueKeyColumnDescending, ", "),
)
return result, explodedArgs, nil
// 2x the explodedArgs for the subquery (CTE would be possible but not supported by MySQL 5)
return result, append(explodedArgs, explodedArgs...), nil
}

func BuildUniqueKeyMinValuesPreparedQuery(databaseName, tableName string, uniqueKey *UniqueKey) (string, error) {
Expand Down
Loading
Loading