Skip to content
107 changes: 74 additions & 33 deletions pkg/executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
Expand Down Expand Up @@ -193,7 +194,16 @@ func (e *InsertValues) prefetchDataCache(ctx context.Context, txn kv.Transaction
}

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, _ []*expression.Assignment, dupKeyCheck table.DupKeyCheckMode, autoColIdx int) error {
func (e *InsertExec) updateDupRow(
ctx context.Context,
idxInBatch int,
txn kv.Transaction,
row toBeCheckedRow,
handle kv.Handle,
_ []*expression.Assignment,
dupKeyCheck table.DupKeyCheckMode,
autoColIdx int,
) error {
oldRow, err := getOldRow(ctx, e.Ctx(), txn, row.t, handle, e.GenExprs)
if err != nil {
return err
Expand Down Expand Up @@ -430,8 +440,15 @@ func (e *InsertExec) initEvalBuffer4Dup() {
}

// doDupRowUpdate updates the duplicate row.
func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRow []types.Datum, newRow []types.Datum,
extraCols []types.Datum, cols []*expression.Assignment, idxInBatch int, dupKeyMode table.DupKeyCheckMode, autoColIdx int) error {
func (e *InsertExec) doDupRowUpdate(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It only fixes the INSERT .. ON DUPLICATE UPDATE .. case. As I have tested, the UPDATE path also has the same bug:

CREATE TABLE cache (
  cache_key varchar(512) NOT NULL,
  updated_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  expired_at datetime GENERATED ALWAYS AS (if(expires > 0, date_add(updated_at, interval expires second), date_add(updated_at, interval 99 year))) VIRTUAL,
  expires int(11),
  PRIMARY KEY (cache_key) /*T![clustered_index] CLUSTERED */,
  KEY idx_c_on_expired_at (expired_at)
);

INSERT INTO cache(cache_key, expires) VALUES ('2001-01-01 11:11:11', 60) ON DUPLICATE KEY UPDATE expires = expires + 1;
update cache set expires = expires + 1 where cache_key = '2001-01-01 11:11:11';

Then the following two queries will have different result:

select /*+ force_index(test.cache, idx_c_on_expired_at) */ cache_key, expired_at from cache order by cache_key;
select /*+ ignore_index(test.cache, idx_c_on_expired_at) */ cache_key, expired_at from cache order by cache_key;

ctx context.Context,
handle kv.Handle,
oldRow, newRow, extraCols []types.Datum,
assigns []*expression.Assignment,
idxInBatch int,
dupKeyMode table.DupKeyCheckMode,
autoColIdx int,
) error {
assignFlag := make([]bool, len(e.Table.WritableCols()))
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values
e.curInsertVals.SetDatums(newRow...)
Expand All @@ -445,53 +462,77 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRo
e.row4Update = append(e.row4Update, extraCols...)
e.row4Update = append(e.row4Update, newRow...)

// Update old row when the key is duplicated.
e.evalBuffer4Dup.SetDatums(e.row4Update...)
sctx := e.Ctx()
evalCtx := sctx.GetExprCtx().GetEvalCtx()
sc := sctx.GetSessionVars().StmtCtx
warnCnt := int(sc.WarningCount())
for _, col := range cols {
if col.LazyErr != nil {
return col.LazyErr
}
val, err1 := col.Expr.Eval(evalCtx, e.evalBuffer4Dup.ToRow())
if err1 != nil {
return err1
}
c := col.Col.ToInfo()
c.Name = col.ColName
e.row4Update[col.Col.Index], err1 = table.CastValue(sctx, val, c, false, false)
if err1 != nil {
return err1
// Only evaluate non-generated columns here,
// other fields will be evaluated in updateRecord.
var generated, nonGenerated []*expression.Assignment
cols := e.Table.Cols()
for _, assign := range assigns {
if cols[assign.Col.Index].IsGenerated() {
generated = append(generated, assign)
} else {
nonGenerated = append(nonGenerated, assign)
}
}

warnCnt := int(e.Ctx().GetSessionVars().StmtCtx.WarningCount())
errorHandler := func(sctx sessionctx.Context, assign *expression.Assignment, val *types.Datum, err error) error {
c := assign.Col.ToInfo()
c.Name = assign.ColName
sc := sctx.GetSessionVars().StmtCtx

if newWarnings := sc.TruncateWarnings(warnCnt); len(newWarnings) > 0 {
for k := range newWarnings {
// Use `idxInBatch` here for simplicity, since the offset of the batch is unknown under the current context.
newWarnings[k].Err = completeInsertErr(c, &val, idxInBatch, newWarnings[k].Err)
newWarnings[k].Err = completeInsertErr(c, val, idxInBatch, newWarnings[k].Err)
}
sc.AppendWarnings(newWarnings)
warnCnt += len(newWarnings)
}
e.evalBuffer4Dup.SetDatum(col.Col.Index, e.row4Update[col.Col.Index])
assignFlag[col.Col.Index] = true
return err
}

newData := e.row4Update[:len(oldRow)]
if e.ignoreErr {
ignored, err := checkFKIgnoreErr(ctx, e.Ctx(), e.fkChecks, newData)
// Update old row when the key is duplicated.
e.evalBuffer4Dup.SetDatums(e.row4Update...)
sctx := e.Ctx()
evalCtx := sctx.GetExprCtx().GetEvalCtx()
for _, assign := range nonGenerated {
var val types.Datum
if assign.LazyErr != nil {
return assign.LazyErr
}
val, err := assign.Expr.Eval(evalCtx, e.evalBuffer4Dup.ToRow())
if err != nil {
return err
}

// meets an error, skip this row.
if ignored {
return nil
c := assign.Col.ToInfo()
idx := assign.Col.Index
c.Name = assign.ColName
val, err = table.CastValue(sctx, val, c, false, false)
if err != nil {
return err
}

_ = errorHandler(sctx, assign, &val, nil)
e.evalBuffer4Dup.SetDatum(idx, val)
e.row4Update[assign.Col.Index] = val
assignFlag[assign.Col.Index] = true
}
_, err := updateRecord(ctx, e.Ctx(), handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker, e.fkChecks, e.fkCascades, dupKeyMode, e.ignoreErr)

newData := e.row4Update[:len(oldRow)]
_, ignored, err := updateRecord(
ctx, e.Ctx(),
handle, oldRow, newData,
0, generated, e.evalBuffer4Dup, errorHandler,
assignFlag, e.Table,
true, e.memTracker, e.fkChecks, e.fkCascades, dupKeyMode, e.ignoreErr)

if ignored {
return nil
}

if err != nil {
return err
return errors.Trace(err)
}

if autoColIdx >= 0 {
Expand Down
18 changes: 18 additions & 0 deletions pkg/executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,24 @@ func testInsertOnDuplicateKey(t *testing.T, tk *testkit.TestKit) {
"<nil>/<nil>/x/1.2",
"<nil>/<nil>/x/1.2"))

// Test issue 56829
tk.MustExec(`
CREATE TABLE cache (
cache_key varchar(512) NOT NULL,
updated_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
expired_at datetime GENERATED ALWAYS AS (if(expires > 0, date_add(updated_at, interval expires second), date_add(updated_at, interval 99 year))) VIRTUAL,
expires int(11),
PRIMARY KEY (cache_key) /*T![clustered_index] CLUSTERED */,
KEY idx_c_on_expired_at (expired_at)
)`)
tk.MustExec("INSERT INTO cache(cache_key, expires) VALUES ('2001-01-01 11:11:11', 60) ON DUPLICATE KEY UPDATE expires = expires + 1")
tk.MustExec("select sleep(1)")
tk.MustExec("INSERT INTO cache(cache_key, expires) VALUES ('2001-01-01 11:11:11', 60) ON DUPLICATE KEY UPDATE expires = expires + 1")
tk.MustExec("admin check table cache")
rs1 := tk.MustQuery("select cache_key, expired_at from cache use index() order by cache_key")
rs2 := tk.MustQuery("select cache_key, expired_at from cache use index(idx_c_on_expired_at) order by cache_key")
require.True(t, rs1.Equal(rs2.Rows()))

// reproduce insert on duplicate key update bug under new row format.
tk.MustExec(`drop table if exists t1`)
tk.MustExec(`create table t1(c1 decimal(6,4), primary key(c1))`)
Expand Down
Loading