Skip to content

Commit f158c65

Browse files
authored
table: provide some binlog related methods for binlog in MutateContext (#54433)
ref #54392, ref #54397
1 parent 53dcc79 commit f158c65

File tree

12 files changed

+184
-58
lines changed

12 files changed

+184
-58
lines changed

pkg/lightning/backend/kv/session.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ func NewSession(options *encode.SessionOptions, logger log.Logger) *Session {
363363
Session: s,
364364
PlanCtxExtendedImpl: planctximpl.NewPlanCtxExtendedImpl(s),
365365
}
366-
s.tblctx = tbctximpl.NewTableContextImpl(s, s.exprCtx)
366+
s.tblctx = tbctximpl.NewTableContextImpl(s)
367367
s.txn.kvPairs = &Pairs{}
368368

369369
return s

pkg/session/bootstrap_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func TestBootstrapWithError(t *testing.T) {
156156
}
157157
se.exprctx = contextsession.NewSessionExprContext(se)
158158
se.pctx = newPlanContextImpl(se)
159-
se.tblctx = tbctximpl.NewTableContextImpl(se, se.exprctx)
159+
se.tblctx = tbctximpl.NewTableContextImpl(se)
160160
globalVarsAccessor := variable.NewMockGlobalAccessor4Tests()
161161
se.GetSessionVars().GlobalVarsAccessor = globalVarsAccessor
162162
se.txn.init()

pkg/session/session.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3701,7 +3701,7 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) {
37013701
s.sessionVars = variable.NewSessionVars(s)
37023702
s.exprctx = contextsession.NewSessionExprContext(s)
37033703
s.pctx = newPlanContextImpl(s)
3704-
s.tblctx = tbctximpl.NewTableContextImpl(s, s.exprctx)
3704+
s.tblctx = tbctximpl.NewTableContextImpl(s)
37053705

37063706
if opt != nil && opt.PreparedPlanCache != nil {
37073707
s.sessionPlanCache = opt.PreparedPlanCache
@@ -3764,7 +3764,7 @@ func CreateSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er
37643764
}
37653765
s.exprctx = contextsession.NewSessionExprContext(s)
37663766
s.pctx = newPlanContextImpl(s)
3767-
s.tblctx = tbctximpl.NewTableContextImpl(s, s.exprctx)
3767+
s.tblctx = tbctximpl.NewTableContextImpl(s)
37683768
s.mu.values = make(map[fmt.Stringer]any)
37693769
s.lockedTables = make(map[int64]model.TableLockTpInfo)
37703770
domain.BindDomain(s, dom)

pkg/table/context/buffers.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,15 @@ func (b *EncodeRowBuffer) WriteMemBufferEncoded(
8080
return memBuffer.SetWithFlags(key, encoded, flags...)
8181
}
8282

83-
// GetColDataBuffer returns the buffer for column data.
84-
// TODO: make sure the inner buffer is not used outside directly.
85-
func (b *EncodeRowBuffer) GetColDataBuffer() ([]int64, []types.Datum) {
86-
return b.colIDs, b.row
83+
// EncodeBinlogRowData encodes the row data for binlog and returns the encoded row value.
84+
// The returned slice is not referenced in the buffer, so you can cache and modify them freely.
85+
func (b *EncodeRowBuffer) EncodeBinlogRowData(loc *time.Location, ec errctx.Context) ([]byte, error) {
86+
value, err := tablecodec.EncodeOldRow(loc, b.row, b.colIDs, nil, nil)
87+
err = ec.HandleError(err)
88+
if err != nil {
89+
return nil, err
90+
}
91+
return value, nil
8792
}
8893

8994
// CheckRowBuffer is used to check row constraints

pkg/table/context/buffers_test.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package context
1717
import (
1818
"testing"
1919
"time"
20+
"unsafe"
2021

2122
"github.com/pingcap/tidb/pkg/errctx"
2223
"github.com/pingcap/tidb/pkg/kv"
@@ -97,6 +98,7 @@ func TestEncodeRow(t *testing.T) {
9798
oldFormat: true,
9899
},
99100
} {
101+
// test encode and write to mem buffer
100102
cfg := RowEncodingConfig{
101103
RowEncoder: &rowcodec.Encoder{Enable: !c.oldFormat},
102104
IsRowLevelChecksumEnabled: c.rowLevelChecksum,
@@ -115,19 +117,31 @@ func TestEncodeRow(t *testing.T) {
115117

116118
memBuffer := &mockMemBuffer{}
117119
if len(c.flags) == 0 {
118-
memBuffer.On("Set", kv.Key("key1"), expectedVal).Return(nil).Once()
120+
memBuffer.On("Set", kv.Key("key1"), expectedVal).
121+
Return(nil).Once()
119122
} else {
120-
memBuffer.On("SetWithFlags", kv.Key("key1"), expectedVal, c.flags).Return(nil).Once()
123+
memBuffer.On("SetWithFlags", kv.Key("key1"), expectedVal, c.flags).
124+
Return(nil).Once()
121125
}
122126
err = buffer.WriteMemBufferEncoded(
123127
cfg, c.loc, errctx.StrictNoWarningContext,
124128
memBuffer, kv.Key("key1"), c.flags...,
125129
)
126130
require.NoError(t, err)
127131
memBuffer.AssertExpectations(t)
128-
129132
// the encoding result should be cached as a buffer
130133
require.Equal(t, expectedVal, buffer.writeStmtBufs.RowValBuf)
134+
135+
// test encode val for binlog
136+
expectedVal, err =
137+
tablecodec.EncodeOldRow(c.loc, []types.Datum{d1, d2, d3}, []int64{1, 2, 3}, nil, nil)
138+
require.NoError(t, err)
139+
encoded, err := buffer.EncodeBinlogRowData(c.loc, errctx.StrictNoWarningContext)
140+
require.NoError(t, err)
141+
require.Equal(t, expectedVal, encoded)
142+
// the encoded should not be referenced by any inner buffer
143+
require.True(t, unsafe.SliceData(encoded) != unsafe.SliceData(buffer.writeStmtBufs.RowValBuf))
144+
require.True(t, unsafe.SliceData(encoded) != unsafe.SliceData(buffer.writeStmtBufs.IndexKeyBuf))
131145
}
132146
}
133147

@@ -158,11 +172,6 @@ func TestEncodeBufferReserve(t *testing.T) {
158172
require.Equal(t, 4, len(buffer.writeStmtBufs.AddRowValues))
159173
addRowValuesCap := cap(buffer.writeStmtBufs.AddRowValues)
160174

161-
// GetColDataBuffer should return the underlying buffer
162-
colIDs, row := buffer.GetColDataBuffer()
163-
require.Equal(t, buffer.colIDs, colIDs)
164-
require.Equal(t, buffer.row, row)
165-
166175
// reset should not shrink the capacity
167176
buffer.Reset(2)
168177
require.Equal(t, 6, cap(buffer.colIDs))

pkg/table/context/table.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,17 @@ type MutateContext interface {
4747
// If the active parameter is true, call this function will wait for the pending txn
4848
// to become valid.
4949
Txn(active bool) (kv.Transaction, error)
50-
// StmtGetMutation gets the binlog mutation for current statement.
51-
StmtGetMutation(int64) *binlog.TableMutation
50+
// BinlogEnabled returns whether the binlog is enabled.
51+
BinlogEnabled() bool
52+
// GetBinlogMutation returns a `binlog.TableMutation` object for a table.
53+
GetBinlogMutation(tblID int64) *binlog.TableMutation
5254
// GetDomainInfoSchema returns the latest information schema in domain
5355
GetDomainInfoSchema() infoschema.MetaOnlyInfoSchema
5456
// TxnRecordTempTable record the temporary table to the current transaction.
5557
// This method will be called when the temporary table is modified or should allocate id in the transaction.
5658
TxnRecordTempTable(tbl *model.TableInfo) tableutil.TempTable
59+
// InRestrictedSQL returns whether the current context is used in restricted SQL.
60+
InRestrictedSQL() bool
5761
// GetRowEncodingConfig returns the RowEncodingConfig.
5862
GetRowEncodingConfig() RowEncodingConfig
5963
// GetMutateBuffers returns the MutateBuffers,

pkg/table/contextimpl/BUILD.bazel

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
load("@io_bazel_rules_go//go:def.bzl", "go_library")
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "contextimpl",
@@ -12,5 +12,21 @@ go_library(
1212
"//pkg/sessionctx/variable",
1313
"//pkg/table/context",
1414
"//pkg/util/tableutil",
15+
"@com_github_pingcap_tipb//go-binlog",
16+
],
17+
)
18+
19+
go_test(
20+
name = "contextimpl_test",
21+
timeout = "short",
22+
srcs = ["table_test.go"],
23+
flaky = True,
24+
deps = [
25+
":contextimpl",
26+
"//pkg/sessionctx/binloginfo",
27+
"//pkg/testkit",
28+
"//pkg/util/mock",
29+
"@com_github_pingcap_tipb//go-binlog",
30+
"@com_github_stretchr_testify//require",
1531
],
1632
)

pkg/table/contextimpl/table.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/pingcap/tidb/pkg/sessionctx/variable"
2222
"github.com/pingcap/tidb/pkg/table/context"
2323
"github.com/pingcap/tidb/pkg/util/tableutil"
24+
"github.com/pingcap/tipb/go-binlog"
2425
)
2526

2627
var _ context.MutateContext = &TableContextImpl{}
@@ -29,18 +30,16 @@ var _ context.AllocatorContext = &TableContextImpl{}
2930
// TableContextImpl is used to provide context for table operations.
3031
type TableContextImpl struct {
3132
sessionctx.Context
32-
exprCtx exprctx.ExprContext
3333
// mutateBuffers is a memory pool for table related memory allocation that aims to reuse memory
3434
// and saves allocation
3535
// The buffers are supposed to be used inside AddRecord/UpdateRecord/RemoveRecord.
3636
mutateBuffers *context.MutateBuffers
3737
}
3838

3939
// NewTableContextImpl creates a new TableContextImpl.
40-
func NewTableContextImpl(sctx sessionctx.Context, exprCtx exprctx.ExprContext) *TableContextImpl {
40+
func NewTableContextImpl(sctx sessionctx.Context) *TableContextImpl {
4141
return &TableContextImpl{
4242
Context: sctx,
43-
exprCtx: exprCtx,
4443
mutateBuffers: context.NewMutateBuffers(sctx.GetSessionVars().GetWriteStmtBufs()),
4544
}
4645
}
@@ -53,7 +52,22 @@ func (ctx *TableContextImpl) TxnRecordTempTable(tbl *model.TableInfo) tableutil.
5352

5453
// GetExprCtx returns the ExprContext
5554
func (ctx *TableContextImpl) GetExprCtx() exprctx.ExprContext {
56-
return ctx.exprCtx
55+
return ctx.Context.GetExprCtx()
56+
}
57+
58+
// InRestrictedSQL returns whether the current context is used in restricted SQL.
59+
func (ctx *TableContextImpl) InRestrictedSQL() bool {
60+
return ctx.vars().StmtCtx.InRestrictedSQL
61+
}
62+
63+
// BinlogEnabled returns whether the binlog is enabled.
64+
func (ctx *TableContextImpl) BinlogEnabled() bool {
65+
return ctx.vars().BinlogClient != nil
66+
}
67+
68+
// GetBinlogMutation returns a `binlog.TableMutation` object for a table.
69+
func (ctx *TableContextImpl) GetBinlogMutation(tblID int64) *binlog.TableMutation {
70+
return ctx.Context.StmtGetMutation(tblID)
5771
}
5872

5973
// GetRowEncodingConfig returns the RowEncodingConfig.

pkg/table/contextimpl/table_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package contextimpl_test
16+
17+
import (
18+
"testing"
19+
20+
"github.com/pingcap/tidb/pkg/sessionctx/binloginfo"
21+
"github.com/pingcap/tidb/pkg/table/contextimpl"
22+
"github.com/pingcap/tidb/pkg/testkit"
23+
"github.com/pingcap/tidb/pkg/util/mock"
24+
"github.com/pingcap/tipb/go-binlog"
25+
"github.com/stretchr/testify/require"
26+
)
27+
28+
func TestMutateContextImplFields(t *testing.T) {
29+
sctx := mock.NewContext()
30+
sctx.Mutations = make(map[int64]*binlog.TableMutation)
31+
ctx := contextimpl.NewTableContextImpl(sctx)
32+
// expression
33+
require.True(t, sctx.GetExprCtx() == ctx.GetExprCtx())
34+
// binlog
35+
sctx.GetSessionVars().BinlogClient = nil
36+
require.False(t, ctx.BinlogEnabled())
37+
sctx.GetSessionVars().BinlogClient = binloginfo.MockPumpsClient(&testkit.MockPumpClient{})
38+
require.True(t, ctx.BinlogEnabled())
39+
binlogMutation := ctx.GetBinlogMutation(1234)
40+
require.NotNil(t, binlogMutation)
41+
require.Same(t, sctx.StmtGetMutation(1234), binlogMutation)
42+
// restricted SQL
43+
sctx.GetSessionVars().StmtCtx.InRestrictedSQL = false
44+
require.False(t, ctx.InRestrictedSQL())
45+
sctx.GetSessionVars().StmtCtx.InRestrictedSQL = true
46+
require.True(t, ctx.InRestrictedSQL())
47+
// encoding config
48+
sctx.GetSessionVars().EnableRowLevelChecksum = true
49+
sctx.GetSessionVars().RowEncoder.Enable = true
50+
sctx.GetSessionVars().InRestrictedSQL = false
51+
cfg := ctx.GetRowEncodingConfig()
52+
require.True(t, cfg.IsRowLevelChecksumEnabled)
53+
require.Equal(t, sctx.GetSessionVars().IsRowLevelChecksumEnabled(), cfg.IsRowLevelChecksumEnabled)
54+
require.True(t, cfg.IsRowLevelChecksumEnabled)
55+
require.Same(t, &sctx.GetSessionVars().RowEncoder, cfg.RowEncoder)
56+
sctx.GetSessionVars().RowEncoder.Enable = false
57+
cfg = ctx.GetRowEncodingConfig()
58+
require.False(t, cfg.IsRowLevelChecksumEnabled)
59+
require.Equal(t, sctx.GetSessionVars().IsRowLevelChecksumEnabled(), cfg.IsRowLevelChecksumEnabled)
60+
require.Same(t, &sctx.GetSessionVars().RowEncoder, cfg.RowEncoder)
61+
require.False(t, cfg.IsRowLevelChecksumEnabled)
62+
sctx.GetSessionVars().RowEncoder.Enable = true
63+
sctx.GetSessionVars().InRestrictedSQL = true
64+
require.Equal(t, sctx.GetSessionVars().IsRowLevelChecksumEnabled(), cfg.IsRowLevelChecksumEnabled)
65+
require.False(t, cfg.IsRowLevelChecksumEnabled)
66+
sctx.GetSessionVars().InRestrictedSQL = false
67+
sctx.GetSessionVars().EnableRowLevelChecksum = false
68+
require.Equal(t, sctx.GetSessionVars().IsRowLevelChecksumEnabled(), cfg.IsRowLevelChecksumEnabled)
69+
// mutate buffers
70+
require.NotNil(t, ctx.GetMutateBuffers())
71+
}

pkg/table/tables/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ go_library(
3434
"//pkg/sessionctx/variable",
3535
"//pkg/statistics",
3636
"//pkg/table",
37+
"//pkg/table/context",
3738
"//pkg/tablecodec",
3839
"//pkg/types",
3940
"//pkg/util",

0 commit comments

Comments
 (0)