Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
Expand Down Expand Up @@ -214,7 +215,6 @@ func (b *batchChecker) initDupOldRowFromUniqueKey(ctx sessionctx.Context, t tabl
return errors.Trace(err)
}
handles = append(handles, handle)
break
}
}
}
Expand Down Expand Up @@ -249,3 +249,29 @@ func (b *batchChecker) deleteDupKeys(row toBeCheckedRow) {
delete(b.dupKVs, string(uk.newKV.key))
}
}

// getOldRow gets the table record row from storage for batch check.
func (b *batchChecker) getOldRow(ctx sessionctx.Context, t table.Table, handle int64) (types.DatumRow, error) {
oldValue, ok := b.dupOldRowValues[string(t.RecordKey(handle))]
if !ok {
return nil, errors.NotFoundf("can not be duplicated row, due to old row not found. handle %d", handle)
}
cols := t.WritableCols()
oldRow, oldRowMap, err := tables.DecodeRawRowData(ctx, t.Meta(), handle, cols, oldValue)
if err != nil {
return nil, errors.Trace(err)
}
// Fill write-only and write-reorg columns with originDefaultValue if not found in oldValue.
for _, col := range cols {
if col.State != model.StatePublic && oldRow[col.Offset].IsNull() {
_, found := oldRowMap[col.ID]
if !found {
oldRow[col.Offset], err = table.GetColOriginDefaultValue(ctx, col.ToInfo())
if err != nil {
return nil, errors.Trace(err)
}
}
}
}
return oldRow, nil
}
25 changes: 2 additions & 23 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ import (
"github.com/juju/errors"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -207,30 +205,11 @@ func (e *InsertExec) Open(ctx context.Context) error {
}

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) (err error) {
// Get the table record row from storage for update.
oldValue, ok := e.dupOldRowValues[string(e.Table.RecordKey(handle))]
if !ok {
return errors.NotFoundf("can not be duplicated row, due to old row not found. handle %d", handle)
}
cols := e.Table.WritableCols()
oldRow, oldRowMap, err := tables.DecodeRawRowData(e.ctx, e.Table.Meta(), handle, cols, oldValue)
func (e *InsertExec) updateDupRow(row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
oldRow, err := e.getOldRow(e.ctx, e.Table, handle)
if err != nil {
return errors.Trace(err)
}
// Fill write-only and write-reorg columns with originDefaultValue if not found in oldValue.
for _, col := range cols {
if col.State != model.StatePublic && oldRow[col.Offset].IsNull() {
_, found := oldRowMap[col.ID]
if !found {
oldRow[col.Offset], err = table.GetColOriginDefaultValue(e.ctx, col.ToInfo())
if err != nil {
return errors.Trace(err)
}
}
}
}

// Do update row.
updatedRow, handleChanged, newHandle, err := e.doDupRowUpdate(handle, oldRow, row.row, onDuplicate)
if e.ctx.GetSessionVars().StmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err) {
Expand Down
148 changes: 110 additions & 38 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package executor
import (
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"golang.org/x/net/context"
Expand Down Expand Up @@ -44,7 +46,56 @@ func (e *ReplaceExec) Open(ctx context.Context) error {
return nil
}

func (e *ReplaceExec) exec(rows []types.DatumRow) error {
// removeRow removes the duplicate row and cleanup its keys in the key-value map,
// but if the to-be-removed row equals to the to-be-added row, no remove or add things to do.
func (e *ReplaceExec) removeRow(handle int64, newRow types.DatumRow) (bool, error) {
oldRow, err := e.getOldRow(e.ctx, e.Table, handle)
if err != nil {
return false, errors.Trace(err)
}
rowUnchanged, err := types.EqualDatums(e.ctx.GetSessionVars().StmtCtx, oldRow, newRow)
if err != nil {
return false, errors.Trace(err)
}
if rowUnchanged {
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
return true, nil
}
err = e.Table.RemoveRecord(e.ctx, handle, oldRow)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the row is unchanged, we don't need to remove the row and insert it again.

if err != nil {
return false, errors.Trace(err)
}
e.ctx.StmtAddDirtyTableOP(DirtyTableDeleteRow, e.Table.Meta().ID, handle, nil)
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)

// Cleanup keys map, because the record was removed.
cleanupRows, err := e.getKeysNeedCheck(e.ctx, e.Table, []types.DatumRow{oldRow})
if err != nil {
return false, errors.Trace(err)
}
if len(cleanupRows) > 0 {
// The length of need-to-cleanup rows should be at most 1, due to we only input 1 row.
e.deleteDupKeys(cleanupRows[0])
}
return false, nil
}

// addRow adds a row when all the duplicate key were checked.
func (e *ReplaceExec) addRow(row types.DatumRow) (int64, error) {
// Set kv.PresumeKeyNotExists is safe here, because we've already removed all duplicated rows.
e.ctx.Txn().SetOption(kv.PresumeKeyNotExists, nil)
h, err := e.Table.AddRecord(e.ctx, row, false)
e.ctx.Txn().DelOption(kv.PresumeKeyNotExists)
if err != nil {
return 0, errors.Trace(err)
}
if !e.ctx.GetSessionVars().ImportingData {
e.ctx.StmtAddDirtyTableOP(DirtyTableAddRow, e.Table.Meta().ID, h, row)
}
return h, nil
}

func (e *ReplaceExec) exec(newRows []types.DatumRow) error {
/*
* MySQL uses the following algorithm for REPLACE (and LOAD DATA ... REPLACE):
* 1. Try to insert the new row into the table
Expand All @@ -57,46 +108,67 @@ func (e *ReplaceExec) exec(rows []types.DatumRow) error {
* because in this case, one row was inserted after the duplicate was deleted.
* See http://dev.mysql.com/doc/refman/5.7/en/mysql-affected-rows.html
*/
idx := 0
rowsLen := len(rows)
sc := e.ctx.GetSessionVars().StmtCtx
for {
if idx >= rowsLen {
err := e.batchGetInsertKeys(e.ctx, e.Table, newRows)
if err != nil {
return errors.Trace(err)
}

// Batch get the to-be-replaced rows in storage.
err = e.initDupOldRowValue(e.ctx, e.Table, newRows)
if err != nil {
return errors.Trace(err)
}

for i, r := range e.toBeCheckedRows {
for {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we extract the inner for loop into its own function.
It's hard to see where the code is going to execute after break and continute.

rowUnchanged := false
if r.handleKey != nil {
if _, found := e.dupKVs[string(r.handleKey.newKV.key)]; found {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key)
if err != nil {
return errors.Trace(err)
}
rowUnchanged, err = e.removeRow(handle, newRows[i])
if err != nil {
return errors.Trace(err)
}
if rowUnchanged {
break
}
continue
}
}

foundDupKey := false
for _, uk := range r.uniqueKeys {
if val, found := e.dupKVs[string(uk.newKV.key)]; found {
handle, err := tables.DecodeHandle(val)
if err != nil {
return errors.Trace(err)
}
rowUnchanged, err = e.removeRow(handle, newRows[i])
if err != nil {
return errors.Trace(err)
}
foundDupKey = true
break
}
}
if rowUnchanged {
break
}
if foundDupKey {
continue
}
// No duplicated rows now, insert the row and go to the next row.
newHandle, err := e.addRow(newRows[i])
if err != nil {
return errors.Trace(err)
}
e.fillBackKeys(e.Table, r, newHandle)
break
}
row := rows[idx]
h, err1 := e.Table.AddRecord(e.ctx, row, false)
if err1 == nil {
e.ctx.StmtAddDirtyTableOP(DirtyTableAddRow, e.Table.Meta().ID, h, row)
idx++
continue
}
if err1 != nil && !kv.ErrKeyExists.Equal(err1) {
return errors.Trace(err1)
}
oldRow, err1 := e.Table.Row(e.ctx, h)
if err1 != nil {
return errors.Trace(err1)
}
rowUnchanged, err1 := types.EqualDatums(sc, oldRow, row)
if err1 != nil {
return errors.Trace(err1)
}
if rowUnchanged {
// If row unchanged, we do not need to do insert.
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
idx++
continue
}
// Remove current row and try replace again.
err1 = e.Table.RemoveRecord(e.ctx, h, oldRow)
if err1 != nil {
return errors.Trace(err1)
}
e.ctx.StmtAddDirtyTableOP(DirtyTableDeleteRow, e.Table.Meta().ID, h, nil)
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
}

if e.lastInsertID != 0 {
e.ctx.GetSessionVars().SetLastInsertID(e.lastInsertID)
}
Expand Down