Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
Expand Down Expand Up @@ -214,7 +215,6 @@ func (b *batchChecker) initDupOldRowFromUniqueKey(ctx sessionctx.Context, t tabl
return errors.Trace(err)
}
handles = append(handles, handle)
break
}
}
}
Expand Down Expand Up @@ -249,3 +249,29 @@ func (b *batchChecker) deleteDupKeys(row toBeCheckedRow) {
delete(b.dupKVs, string(uk.newKV.key))
}
}

// getOldRow returns the stored row for handle, used by the batch check.
//
// The raw value is looked up in b.dupOldRowValues under the table's record key
// for handle and decoded against the table's writable columns. A NotFound
// error is returned when the map holds no entry for that key; decoding and
// default-value errors are returned traced.
func (b *batchChecker) getOldRow(ctx sessionctx.Context, t table.Table, handle int64) (types.DatumRow, error) {
	rawValue, exists := b.dupOldRowValues[string(t.RecordKey(handle))]
	if !exists {
		return nil, errors.NotFoundf("can not be duplicated row, due to old row not found. handle %d", handle)
	}

	writableCols := t.WritableCols()
	decoded, decodedByID, err := tables.DecodeRawRowData(ctx, t.Meta(), handle, writableCols, rawValue)
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Columns that are not yet public (write-only / write-reorg) and are absent
	// from the stored value get their origin default value instead of NULL.
	for _, c := range writableCols {
		if c.State == model.StatePublic || !decoded[c.Offset].IsNull() {
			continue
		}
		if _, stored := decodedByID[c.ID]; stored {
			// The column was really stored as NULL; keep it.
			continue
		}
		decoded[c.Offset], err = table.GetColOriginDefaultValue(ctx, c.ToInfo())
		if err != nil {
			return nil, errors.Trace(err)
		}
	}
	return decoded, nil
}
25 changes: 2 additions & 23 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ import (
"github.com/juju/errors"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -207,30 +205,11 @@ func (e *InsertExec) Open(ctx context.Context) error {
}

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) (err error) {
// Get the table record row from storage for update.
oldValue, ok := e.dupOldRowValues[string(e.Table.RecordKey(handle))]
if !ok {
return errors.NotFoundf("can not be duplicated row, due to old row not found. handle %d", handle)
}
cols := e.Table.WritableCols()
oldRow, oldRowMap, err := tables.DecodeRawRowData(e.ctx, e.Table.Meta(), handle, cols, oldValue)
func (e *InsertExec) updateDupRow(row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
oldRow, err := e.getOldRow(e.ctx, e.Table, handle)
if err != nil {
return errors.Trace(err)
}
// Fill write-only and write-reorg columns with originDefaultValue if not found in oldValue.
for _, col := range cols {
if col.State != model.StatePublic && oldRow[col.Offset].IsNull() {
_, found := oldRowMap[col.ID]
if !found {
oldRow[col.Offset], err = table.GetColOriginDefaultValue(e.ctx, col.ToInfo())
if err != nil {
return errors.Trace(err)
}
}
}
}

// Do update row.
updatedRow, handleChanged, newHandle, err := e.doDupRowUpdate(handle, oldRow, row.row, onDuplicate)
if e.ctx.GetSessionVars().StmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err) {
Expand Down
172 changes: 133 additions & 39 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package executor
import (
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"golang.org/x/net/context"
Expand Down Expand Up @@ -44,7 +46,122 @@ func (e *ReplaceExec) Open(ctx context.Context) error {
return nil
}

func (e *ReplaceExec) exec(rows []types.DatumRow) error {
// removeRow removes the duplicate row identified by handle and cleans up its keys
// in the key-value map. If the to-be-removed row equals the to-be-added row (newRow),
// nothing is removed and there is nothing to add either.
//
// It returns true when the stored row is unchanged compared to newRow (only the
// affected-row counter is incremented), and false when the stored row was removed
// or an error occurred.
func (e *ReplaceExec) removeRow(handle int64, newRow types.DatumRow) (bool, error) {
oldRow, err := e.getOldRow(e.ctx, e.Table, handle)
if err != nil {
return false, errors.Trace(err)
}
// Compare the stored row with the replacement row before touching storage.
rowUnchanged, err := types.EqualDatums(e.ctx.GetSessionVars().StmtCtx, oldRow, newRow)
if err != nil {
return false, errors.Trace(err)
}
if rowUnchanged {
// An identical row is left in place but still counted as one affected row.
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
return true, nil
}
err = e.Table.RemoveRecord(e.ctx, handle, oldRow)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the row is unchanged, we don't need to remove the row and insert it again.

if err != nil {
return false, errors.Trace(err)
}
e.ctx.StmtAddDirtyTableOP(DirtyTableDeleteRow, e.Table.Meta().ID, handle, nil)
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)

// Cleanup keys map, because the record was removed.
cleanupRows, err := e.getKeysNeedCheck(e.ctx, e.Table, []types.DatumRow{oldRow})
if err != nil {
return false, errors.Trace(err)
}
if len(cleanupRows) > 0 {
// The length of need-to-cleanup rows should be at most 1, due to we only input 1 row.
e.deleteDupKeys(cleanupRows[0])
}
return false, nil
}

// addRow inserts row into the table once every duplicate key has been checked
// and returns the handle of the new record.
//
// Unless the session is importing data, the insert is also recorded as a
// dirty-table add operation for the current statement.
func (e *ReplaceExec) addRow(row types.DatumRow) (int64, error) {
	// All duplicated rows have already been removed, so it is safe to presume
	// the key does not exist for the duration of this insert only.
	e.ctx.Txn().SetOption(kv.PresumeKeyNotExists, nil)
	newHandle, addErr := e.Table.AddRecord(e.ctx, row, false)
	e.ctx.Txn().DelOption(kv.PresumeKeyNotExists)
	if addErr != nil {
		return 0, errors.Trace(addErr)
	}

	if e.ctx.GetSessionVars().ImportingData {
		return newHandle, nil
	}
	e.ctx.StmtAddDirtyTableOP(DirtyTableAddRow, e.Table.Meta().ID, newHandle, row)
	return newHandle, nil
}

// replaceRow removes all duplicate rows for one row, then inserts it.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/remove/removes
s/insert/inserts

// replaceRow removes every stored row that conflicts with r, first by handle
// key and then by unique keys, and finally inserts r.
//
// If a conflicting stored row turns out to be identical to r, nothing is
// removed or inserted and nil is returned immediately.
func (e *ReplaceExec) replaceRow(r toBeCheckedRow) error {
	// Loop until a full pass finds no remaining duplicate.
	for conflict := true; conflict; {
		if r.handleKey != nil {
			if _, dup := e.dupKVs[string(r.handleKey.newKV.key)]; dup {
				h, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key)
				if err != nil {
					return errors.Trace(err)
				}
				same, err := e.removeRow(h, r.row)
				if err != nil {
					return errors.Trace(err)
				}
				if same {
					return nil
				}
				// The handle conflict is gone; re-check from the top.
				continue
			}
		}

		same, removed, err := e.removeIndexRow(r)
		if err != nil {
			return errors.Trace(err)
		}
		if same {
			return nil
		}
		// Another pass is needed only if a duplicated unique key was found.
		conflict = removed
	}

	// No duplicated rows remain, so insert the row and fill back its keys.
	insertedHandle, err := e.addRow(r.row)
	if err != nil {
		return errors.Trace(err)
	}
	e.fillBackKeys(e.Table, r, insertedHandle)
	return nil
}

// removeIndexRow handles the first unique key of r that is present in
// e.dupKVs by removing the row that owns it.
//
// Return values:
//  1. bool: true when the stored row is unchanged, so there is no need to
//     remove it and then add the row again.
//  2. bool: true when a duplicated key was found. This only reports that a
//     duplicate was seen (and handled); it is also true on the error paths
//     that follow the lookup.
//  3. error: any error from decoding the handle or removing the row.
func (e *ReplaceExec) removeIndexRow(r toBeCheckedRow) (bool, bool, error) {
	for _, uniq := range r.uniqueKeys {
		encodedHandle, dup := e.dupKVs[string(uniq.newKV.key)]
		if !dup {
			continue
		}

		h, err := tables.DecodeHandle(encodedHandle)
		if err != nil {
			return false, true, errors.Trace(err)
		}
		unchanged, err := e.removeRow(h, r.row)
		if err != nil {
			return false, true, errors.Trace(err)
		}
		// Only one duplicate is processed per call; the caller loops.
		return unchanged, true, nil
	}
	return false, false, nil
}

func (e *ReplaceExec) exec(newRows []types.DatumRow) error {
/*
* MySQL uses the following algorithm for REPLACE (and LOAD DATA ... REPLACE):
* 1. Try to insert the new row into the table
Expand All @@ -57,46 +174,23 @@ func (e *ReplaceExec) exec(rows []types.DatumRow) error {
* because in this case, one row was inserted after the duplicate was deleted.
* See http://dev.mysql.com/doc/refman/5.7/en/mysql-affected-rows.html
*/
idx := 0
rowsLen := len(rows)
sc := e.ctx.GetSessionVars().StmtCtx
for {
if idx >= rowsLen {
break
}
row := rows[idx]
h, err1 := e.Table.AddRecord(e.ctx, row, false)
if err1 == nil {
e.ctx.StmtAddDirtyTableOP(DirtyTableAddRow, e.Table.Meta().ID, h, row)
idx++
continue
}
if err1 != nil && !kv.ErrKeyExists.Equal(err1) {
return errors.Trace(err1)
}
oldRow, err1 := e.Table.Row(e.ctx, h)
if err1 != nil {
return errors.Trace(err1)
}
rowUnchanged, err1 := types.EqualDatums(sc, oldRow, row)
if err1 != nil {
return errors.Trace(err1)
}
if rowUnchanged {
// If row unchanged, we do not need to do insert.
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
idx++
continue
}
// Remove current row and try replace again.
err1 = e.Table.RemoveRecord(e.ctx, h, oldRow)
if err1 != nil {
return errors.Trace(err1)
}
e.ctx.StmtAddDirtyTableOP(DirtyTableDeleteRow, e.Table.Meta().ID, h, nil)
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
err := e.batchGetInsertKeys(e.ctx, e.Table, newRows)
if err != nil {
return errors.Trace(err)
}

// Batch get the to-be-replaced rows in storage.
err = e.initDupOldRowValue(e.ctx, e.Table, newRows)
if err != nil {
return errors.Trace(err)
}

for _, r := range e.toBeCheckedRows {
err = e.replaceRow(r)
if err != nil {
return errors.Trace(err)
}
}
if e.lastInsertID != 0 {
e.ctx.GetSessionVars().SetLastInsertID(e.lastInsertID)
}
Expand Down