Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 70 additions & 24 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -186,7 +187,15 @@
}

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, onDuplicate []*expression.Assignment, autoColIdx int) error {
func (e *InsertExec) updateDupRow(
ctx context.Context,
idxInBatch int,
txn kv.Transaction,
row toBeCheckedRow,
handle kv.Handle,
_ []*expression.Assignment,
autoColIdx int,
) error {
oldRow, err := getOldRow(ctx, e.ctx, txn, row.t, handle, e.GenExprs)
if err != nil {
return err
Expand Down Expand Up @@ -384,8 +393,14 @@
}

// doDupRowUpdate updates the duplicate row.
func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRow []types.Datum, newRow []types.Datum,
extraCols []types.Datum, cols []*expression.Assignment, idxInBatch int, autoColIdx int) error {
func (e *InsertExec) doDupRowUpdate(
ctx context.Context,
handle kv.Handle,
oldRow, newRow, extraCols []types.Datum,
assigns []*expression.Assignment,
idxInBatch int,
autoColIdx int,
) error {
assignFlag := make([]bool, len(e.Table.WritableCols()))
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values
e.curInsertVals.SetDatums(newRow...)
Expand All @@ -399,38 +414,69 @@
e.row4Update = append(e.row4Update, extraCols...)
e.row4Update = append(e.row4Update, newRow...)

// Update old row when the key is duplicated.
e.evalBuffer4Dup.SetDatums(e.row4Update...)
sc := e.ctx.GetSessionVars().StmtCtx
warnCnt := int(sc.WarningCount())
for _, col := range cols {
if col.LazyErr != nil {
return col.LazyErr
}
val, err1 := col.Expr.Eval(e.evalBuffer4Dup.ToRow())
if err1 != nil {
return err1
}
c := col.Col.ToInfo()
c.Name = col.ColName
e.row4Update[col.Col.Index], err1 = table.CastValue(e.ctx, val, c, false, false)
if err1 != nil {
return err1
// Only evaluate non-generated columns here,
// other fields will be evaluated in updateRecord.
var generated, nonGenerated []*expression.Assignment
cols := e.Table.Cols()
for _, assign := range assigns {
if cols[assign.Col.Index].IsGenerated() {
generated = append(generated, assign)
} else {
nonGenerated = append(nonGenerated, assign)
}
}

warnCnt := int(e.ctx.GetSessionVars().StmtCtx.WarningCount())
errorHandler := func(sctx sessionctx.Context, assign *expression.Assignment, val *types.Datum, err error) error {
c := assign.Col.ToInfo()
c.Name = assign.ColName
sc := sctx.GetSessionVars().StmtCtx

if newWarnings := sc.TruncateWarnings(warnCnt); len(newWarnings) > 0 {
for k := range newWarnings {
// Use `idxInBatch` here for simplicity, since the offset of the batch is unknown under the current context.
newWarnings[k].Err = completeInsertErr(c, &val, idxInBatch, newWarnings[k].Err)
newWarnings[k].Err = completeInsertErr(c, val, idxInBatch, newWarnings[k].Err)
}
sc.AppendWarnings(newWarnings)
warnCnt += len(newWarnings)
}
e.evalBuffer4Dup.SetDatum(col.Col.Index, e.row4Update[col.Col.Index])
assignFlag[col.Col.Index] = true
return err
}

// Update old row when the key is duplicated.
e.evalBuffer4Dup.SetDatums(e.row4Update...)
for _, assign := range nonGenerated {
var val types.Datum
if assign.LazyErr != nil {
return assign.LazyErr
}
val, err := assign.Expr.Eval(e.evalBuffer4Dup.ToRow())
if err != nil {
return err
}

Check warning on line 456 in executor/insert.go

View check run for this annotation

Codecov / codecov/patch

executor/insert.go#L455-L456

Added lines #L455 - L456 were not covered by tests

c := assign.Col.ToInfo()
idx := assign.Col.Index
c.Name = assign.ColName
val, err = table.CastValue(e.ctx, val, c, false, false)
if err != nil {
return err
}

Check warning on line 464 in executor/insert.go

View check run for this annotation

Codecov / codecov/patch

executor/insert.go#L463-L464

Added lines #L463 - L464 were not covered by tests

_ = errorHandler(e.ctx, assign, &val, nil)
e.evalBuffer4Dup.SetDatum(idx, val)
e.row4Update[assign.Col.Index] = val
assignFlag[assign.Col.Index] = true
}

newData := e.row4Update[:len(oldRow)]
_, err := updateRecord(ctx, e.ctx, handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker, e.fkChecks, e.fkCascades)
_, err := updateRecord(
ctx, e.ctx,
handle, oldRow, newData,
0, generated, e.evalBuffer4Dup, errorHandler,
assignFlag, e.Table,
true, e.memTracker, e.fkChecks, e.fkCascades)

if err != nil {
return err
}
Expand Down
18 changes: 18 additions & 0 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,24 @@ func testInsertOnDuplicateKey(t *testing.T, tk *testkit.TestKit) {
tk.MustExec(`insert into t1 set c1 = 0.1`)
tk.MustExec(`insert into t1 set c1 = 0.1 on duplicate key update c1 = 1`)
tk.MustQuery(`select * from t1 use index(primary)`).Check(testkit.Rows(`1.0000`))

// Test issue 56829
tk.MustExec(`
CREATE TABLE cache (
cache_key varchar(512) NOT NULL,
updated_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
expired_at datetime GENERATED ALWAYS AS (if(expires > 0, date_add(updated_at, interval expires second), date_add(updated_at, interval 99 year))) VIRTUAL,
expires int(11),
PRIMARY KEY (cache_key) /*T![clustered_index] CLUSTERED */,
KEY idx_c_on_expired_at (expired_at)
)`)
tk.MustExec("INSERT INTO cache(cache_key, expires) VALUES ('2001-01-01 11:11:11', 60) ON DUPLICATE KEY UPDATE expires = expires + 1")
tk.MustExec("select sleep(1)")
tk.MustExec("INSERT INTO cache(cache_key, expires) VALUES ('2001-01-01 11:11:11', 60) ON DUPLICATE KEY UPDATE expires = expires + 1")
tk.MustExec("admin check table cache")
rs1 := tk.MustQuery("select cache_key, expired_at from cache use index() order by cache_key")
rs2 := tk.MustQuery("select cache_key, expired_at from cache use index(idx_c_on_expired_at) order by cache_key")
rs1.Check(rs2.Rows())
}

func TestClusterIndexInsertOnDuplicateKey(t *testing.T) {
Expand Down
Loading