Skip to content
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
3a07c33
Initial commit for Import into adaption to fts.
OliverS929 Jun 16, 2025
fbf9e07
Modifications to include file header, tici.proto updates, and code style
OliverS929 Jun 24, 2025
6fd2b3f
Code Style fix. Not involving meta in external package.
OliverS929 Jun 24, 2025
96d2377
Add missing license.
OliverS929 Jun 24, 2025
f143800
Fix import engine error propagation.
OliverS929 Jun 24, 2025
74736bc
Add version number to the very beginning of file header. And Use
OliverS929 Jun 24, 2025
0556ac4
Log name fixes.
OliverS929 Jun 24, 2025
946109c
Use the correct package name.
OliverS929 Jun 24, 2025
f093f00
Add isclustered in TableInfo.
OliverS929 Jun 30, 2025
66229de
Adding a check to make sure we don't write to tici in index engine.
OliverS929 Jul 1, 2025
1d4bb68
Merge remote-tracking branch 'pingcap/feature/fts'
OliverS929 Jul 1, 2025
c3449f0
Regenate the proto files.
OliverS929 Jul 1, 2025
48b68a6
nogo fix.
OliverS929 Jul 1, 2025
e99f771
Rerun mockgen.
OliverS929 Jul 1, 2025
c979b1f
Regen gomock.
OliverS929 Jul 1, 2025
9e5412b
Further rerun mockgen.
OliverS929 Jul 1, 2025
c549b6e
Fix UT failures due to interface changes.
OliverS929 Jul 1, 2025
67576de
Fix UT.
OliverS929 Jul 1, 2025
bfb31af
Further fix UT.
OliverS929 Jul 1, 2025
885f1ef
cherry-pick "ddl: get scatter variable from executor session context …
River2000i May 28, 2025
3f8b2a3
Revert "cherry-pick "ddl: get scatter variable from executor session …
OliverS929 Jul 2, 2025
44d9caa
Merge remote-tracking branch 'pingcap/feature/fts' into feature/fts_i…
OliverS929 Jul 11, 2025
2d7298c
Restructure all tici related implementations into pkg/tici, and add full
OliverS929 Jul 16, 2025
2cf1dfe
Redo bazel prepare.
OliverS929 Jul 16, 2025
1cf9ccd
lint revive fix.
OliverS929 Jul 16, 2025
f154934
Make writable atomic.
OliverS929 Jul 17, 2025
124d3ca
Do atomic adaption in ut as well.
OliverS929 Jul 17, 2025
c4a0476
Add goleak ignore "goleak.IgnoreTopFunction("go.opencensus.io/stats/v…
OliverS929 Jul 17, 2025
1a11d39
Add goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).s…
OliverS929 Jul 17, 2025
688fdd3
Fix comments.
OliverS929 Jul 20, 2025
e897010
Fix comments.
OliverS929 Jul 20, 2025
ee98127
Fix comment.
OliverS929 Jul 25, 2025
55b511c
Fix comment.
OliverS929 Jul 25, 2025
b111662
Fix a comment.
OliverS929 Jul 25, 2025
f79352e
Merge remote-tracking branch 'pingcap/feature/fts' into feature/fts_i…
OliverS929 Jul 25, 2025
9c590f2
Fix compiling issues due to changes in master branch.
OliverS929 Jul 28, 2025
7fbdf13
Fix revive.
OliverS929 Jul 28, 2025
23bcc01
Fix bazel.
OliverS929 Jul 28, 2025
ae863a3
Fix comments.
OliverS929 Jul 30, 2025
ad0aa76
Add logger in unit test as well.
OliverS929 Jul 30, 2025
aea1fae
Fix bazel.
OliverS929 Jul 30, 2025
d941ffe
Pass testing to zaptest logger.
OliverS929 Jul 30, 2025
a9e7c71
Fix comments.
OliverS929 Jul 30, 2025
3dfded6
test: remove unnecessary gc setting after update pd client (#62611)
3pointer Jul 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
284 changes: 144 additions & 140 deletions br/pkg/mock/backend.go

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions lightning/pkg/importer/table_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ func (s *tableRestoreSuite) TestImportKVSuccess() {
CloseEngine(ctx, nil, engineUUID).
Return(nil)
mockBackend.EXPECT().
ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any()).
ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil)
mockBackend.EXPECT().
CleanupEngine(ctx, engineUUID).
Expand Down Expand Up @@ -916,7 +916,7 @@ func (s *tableRestoreSuite) TestImportKVFailure() {
CloseEngine(ctx, nil, engineUUID).
Return(nil)
mockBackend.EXPECT().
ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any()).
ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any(), gomock.Any()).
Return(errors.Annotate(context.Canceled, "fake import error"))

closedEngine, err := importer.UnsafeCloseEngineWithUUID(ctx, nil, "tag", engineUUID, 0)
Expand Down Expand Up @@ -987,7 +987,7 @@ func (s *tableRestoreSuite) TestTableRestoreMetrics() {
backendObj := mock.NewMockBackend(controller)
backendObj.EXPECT().OpenEngine(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
backendObj.EXPECT().CloseEngine(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
backendObj.EXPECT().ImportEngine(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
backendObj.EXPECT().ImportEngine(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
backendObj.EXPECT().CleanupEngine(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
backendObj.EXPECT().ShouldPostProcess().Return(false).AnyTimes()
backendObj.EXPECT().LocalWriter(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockEngineWriter, nil).AnyTimes()
Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/backfilling_import_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (e *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
}

_, engineUUID := backend.MakeUUID(e.ptbl.Meta().Name.L, idxID)
engineID := int32(common.IndexEngineID)

all := external.SortedKVMeta{}
for _, g := range sm.MetaGroups {
Expand Down Expand Up @@ -135,7 +136,7 @@ func (e *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
if err != nil {
return err
}
err = localBackend.ImportEngine(ctx, engineUUID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
err = localBackend.ImportEngine(ctx, engineUUID, engineID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
failpoint.Inject("mockCloudImportRunSubtaskError", func(_ failpoint.Value) {
err = context.DeadlineExceeded
})
Expand Down
12 changes: 11 additions & 1 deletion pkg/disttask/importinto/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,16 @@ func (e *writeAndIngestStepExecutor) RunSubtask(ctx context.Context, subtask *pr
}()

_, engineUUID := backend.MakeUUID("", subtask.ID)

// We specify engineID as a tag so that the callee can determine
// whether the call is from a data engine or an index engine.
var engineID int32
if sm.KVGroup == dataKVGroup {
engineID = int32(0)
} else {
engineID = int32(common.IndexEngineID)
}

localBackend := e.tableImporter.Backend()
// compatible with old version task meta
jobKeys := sm.RangeJobKeys
Expand Down Expand Up @@ -557,7 +567,7 @@ func (e *writeAndIngestStepExecutor) RunSubtask(ctx context.Context, subtask *pr
if err != nil {
return err
}
err = localBackend.ImportEngine(ctx, engineUUID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
err = localBackend.ImportEngine(ctx, engineUUID, engineID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
if err != nil {
return errors.Trace(err)
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/domain/infosync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ go_library(
"region.go",
"resource_manager_client.go",
"schedule_manager.go",
"tici_manager_client.go",
"tiflash_manager.go",
],
importpath = "github.com/pingcap/tidb/pkg/domain/infosync",
Expand Down Expand Up @@ -59,8 +58,6 @@ go_library(
"@com_github_tikv_pd_client//opt",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//credentials/insecure",
"@org_uber_go_zap//:zap",
],
)
Expand Down
35 changes: 33 additions & 2 deletions pkg/domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/pkg/resourcegroup"
"github.com/pingcap/tidb/pkg/session/cursor"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/tici"
util2 "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/engine"
Expand Down Expand Up @@ -1345,11 +1346,11 @@ func GetTiFlashStoresStat(ctx context.Context) (*pdhttp.StoresInfo, error) {

// CreateFulltextIndex create fulltext infex on TiCI.
func CreateFulltextIndex(ctx context.Context, tblInfo *model.TableInfo, indexInfo *model.IndexInfo, schemaName string) error {
ticiManager, err := NewTiCIManager("0.0.0.0", "50061")
ticiManager, err := tici.NewTiCIManager("0.0.0.0", "50061")
if err != nil {
return err
}
defer ticiManager.conn.Close()
defer ticiManager.Conn.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
defer ticiManager.Conn.Close()
defer ticiManager.Close()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This follows the original implementation in PR #60813. A related discussion is ongoing in PR #61406 regarding a potential modification to this part. I’ll keep it unchanged in this PR and leave any improvement to be handled separately once the discussion there reaches a conclusion.

return ticiManager.CreateFulltextIndex(ctx, tblInfo, indexInfo, schemaName)
}

Expand Down Expand Up @@ -1637,3 +1638,33 @@ func (is *InfoSyncer) setDynamicServerInfo(ds *DynamicServerInfo) {
}
is.info.Store(newInfo)
}

// GetCloudStoragePath requests the S3 path from TiCI Meta Service for a baseline shard upload.
func GetCloudStoragePath(
ctx context.Context,
tblInfo *model.TableInfo,
indexInfo *model.IndexInfo,
schemaName string,
lowerBound, upperBound []byte,
) (string, error) {
ticiManager, err := tici.NewTiCIManager("0.0.0.0", "50061")
if err != nil {
return "", err
}
defer ticiManager.Conn.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doitto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

return ticiManager.GetCloudStoragePath(ctx, tblInfo, indexInfo, schemaName, lowerBound, upperBound)
}

// MarkTableUploadFinished notifies TiCI Meta Service that the whole table/index upload is finished.
func MarkTableUploadFinished(
ctx context.Context,
tableID int64,
indexID int64,
) error {
ticiManager, err := tici.NewTiCIManager("0.0.0.0", "50061")
if err != nil {
return err
}
defer ticiManager.Conn.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doitto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

return ticiManager.MarkTableUploadFinished(ctx, tableID, indexID)
}
109 changes: 0 additions & 109 deletions pkg/domain/infosync/tici_manager_client.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remember move TiCI code out of infosync, they should be put into a separate pkg

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already did. Addressed in 2d7298c.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add UT for this file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PTAL 2d7298c.

This file was deleted.

2 changes: 2 additions & 0 deletions pkg/executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ func NewTableImporter(
return nil, err
}

localBackend.SetTiCIWriterGroup(ctx, e.Table.Meta(), e.DBName)

return &TableImporter{
LoadDataController: e,
id: id,
Expand Down
4 changes: 2 additions & 2 deletions pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ type Backend interface {
// ImportEngine imports engine data to the backend. If it returns ErrDuplicateDetected,
// it means there is duplicate detected. For this situation, all data in the engine must be imported.
// It's safe to reset or cleanup this engine.
ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error
ImportEngine(ctx context.Context, engineUUID uuid.UUID, engineID int32, regionSplitSize, regionSplitKeys int64) error

CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error

Expand Down Expand Up @@ -394,7 +394,7 @@ func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionS

for i := range importMaxRetryTimes {
task := engine.logger.With(zap.Int("retryCnt", i)).Begin(zap.InfoLevel, "import")
err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize, regionSplitKeys)
err = engine.backend.ImportEngine(ctx, engine.uuid, engine.id, regionSplitSize, regionSplitKeys)
if !common.IsRetryableError(err) {
if common.ErrFoundDuplicateKeys.Equal(err) {
task.End(zap.WarnLevel, err)
Expand Down
10 changes: 5 additions & 5 deletions pkg/lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestOpenCloseImportCleanUpEngine(t *testing.T) {
Return(nil).
After(openCall)
importCall := s.mockBackend.EXPECT().
ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any()).
ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).
After(closeCall)
s.mockBackend.EXPECT().
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestImportFailedNoRetry(t *testing.T) {

s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(errors.Annotate(context.Canceled, "fake unrecoverable import error"))

closedEngine, err := s.engineMgr.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
Expand All @@ -294,7 +294,7 @@ func TestImportFailedWithRetry(t *testing.T) {

s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(errors.Annotate(driver.ErrBadConn, "fake recoverable import error")).
MinTimes(2)
s.mockBackend.EXPECT().RetryImportDelay().Return(time.Duration(0)).AnyTimes()
Expand All @@ -314,10 +314,10 @@ func TestImportFailedRecovered(t *testing.T) {

s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(gmysql.ErrInvalidConn)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil)
s.mockBackend.EXPECT().RetryImportDelay().Return(time.Duration(0)).AnyTimes()

Expand Down
1 change: 1 addition & 0 deletions pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ go_library(
"//pkg/table",
"//pkg/table/tables",
"//pkg/tablecodec",
"//pkg/tici",
"//pkg/util",
"//pkg/util/codec",
"//pkg/util/compress",
Expand Down
20 changes: 20 additions & 0 deletions pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/tikv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/tici"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/engine"
Expand Down Expand Up @@ -531,6 +532,8 @@ type Backend struct {
logger log.Logger

nextgenHTTPCli *http.Client

ticiWriteGroup *tici.DataWriterGroup // TiCI writer group
}

var _ DiskUsage = (*Backend)(nil)
Expand Down Expand Up @@ -1145,6 +1148,7 @@ func checkDiskAvail(ctx context.Context, store *pdhttp.StoreInfo) error {
func (local *Backend) ImportEngine(
ctx context.Context,
engineUUID uuid.UUID,
engineID int32,
regionSplitSize, regionSplitKeys int64,
) error {
kvRegionSplitSize, kvRegionSplitKeys, err := GetRegionSplitSizeKeys(ctx, local.pdCli, local.tls)
Expand Down Expand Up @@ -1173,6 +1177,9 @@ func (local *Backend) ImportEngine(
localEngine.regionSplitKeyCnt = regionSplitKeys
e = localEngine
}

tici.SetTiCIDataWriterGroupWritable(ctx, local.ticiWriteGroup, engineUUID, engineID)

lfTotalSize, lfLength := e.KVStatistics()
if lfTotalSize == 0 {
// engine is empty, this is likes because it's a index engine but the table contains no index
Expand Down Expand Up @@ -1454,6 +1461,14 @@ func (local *Backend) doImport(
if err != nil && !common.IsContextCanceledError(err) {
log.FromContext(ctx).Error("do import meets error", zap.Error(err))
}

if err == nil && local.ticiWriteGroup != nil {
// If the import is done, we can close the write group.
if err2 := local.ticiWriteGroup.MarkTableUploadFinished(ctx); err2 != nil {
err = err2 // mark upload itself failed
}
}

return err
}

Expand Down Expand Up @@ -1750,3 +1765,8 @@ func GetRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS)
}
return 0, 0, errors.New("get region split size and keys failed")
}

// SetTiCIWriterGroup initializes the ticiWriteGroup field for the Backend using the given table info and schema.
func (local *Backend) SetTiCIWriterGroup(ctx context.Context, tblInfo *model.TableInfo, schema string) {
local.ticiWriteGroup = tici.NewTiCIDataWriterGroup(ctx, tblInfo, schema)
}
Loading