Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2206,13 +2206,6 @@ def go_deps():
sum = "h1:0Vihzu20St42/UDsvZGdNE6jak7oi/UOeMzwMPHkgFY=",
version = "v3.2.0+incompatible",
)
go_repository(
name = "com_github_jarcoal_httpmock",
build_file_proto_mode = "disable",
importpath = "github.com/jarcoal/httpmock",
sum = "h1:gSvTxxFR/MEMfsGrvRbdfpRUMBStovlSRLw0Ep1bwwc=",
version = "v1.2.0",
)

go_repository(
name = "com_github_jcmturner_aescts_v2",
Expand Down Expand Up @@ -4123,8 +4116,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:XpdZrei86oIrRjXbqvlQh23TdHXVtSxWmsxxwy/Zgc0=",
version = "v2.0.7-0.20230328084104-ea13e9700259",
sum = "h1:lqlizij6n/v4jx1Ph2rLF0E/gRJUg7kz3VmO6P5Y1e0=",
version = "v2.0.7-0.20230406064257-1ec0ff5bf443",
)
go_repository(
name = "com_github_tikv_pd",
Expand Down
1 change: 0 additions & 1 deletion executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,6 @@ go_test(
"//util/topsql/state",
"@com_github_golang_protobuf//proto",
"@com_github_gorilla_mux//:mux",
"@com_github_jarcoal_httpmock//:httpmock",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_fn//:fn",
Expand Down
106 changes: 30 additions & 76 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/tidb/ddl/label"
"github.com/pingcap/tidb/ddl/placement"
Expand All @@ -43,7 +42,6 @@ import (
"github.com/pingcap/tidb/parser/charset"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/privilege/privileges"
Expand Down Expand Up @@ -73,6 +71,8 @@ import (
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/syncutil"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnlock"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -2989,7 +2989,7 @@ type TiFlashSystemTableRetriever struct {
outputCols []*model.ColumnInfo
instanceCount int
instanceIdx int
instanceInfos []tiflashInstanceInfo
instanceIds []string
rowIdx int
retrieved bool
initialized bool
Expand All @@ -3012,7 +3012,7 @@ func (e *TiFlashSystemTableRetriever) retrieve(ctx context.Context, sctx session
}

for {
rows, err := e.dataForTiFlashSystemTables(sctx, e.extractor.TiDBDatabases, e.extractor.TiDBTables)
rows, err := e.dataForTiFlashSystemTables(ctx, sctx, e.extractor.TiDBDatabases, e.extractor.TiDBTables)
if err != nil {
return nil, err
}
Expand All @@ -3022,11 +3022,6 @@ func (e *TiFlashSystemTableRetriever) retrieve(ctx context.Context, sctx session
}
}

type tiflashInstanceInfo struct {
id string
url string
}

func (e *TiFlashSystemTableRetriever) initialize(sctx sessionctx.Context, tiflashInstances set.StringSet) error {
storeInfo, err := infoschema.GetStoreServerInfo(sctx)
if err != nil {
Expand All @@ -3045,53 +3040,8 @@ func (e *TiFlashSystemTableRetriever) initialize(sctx sessionctx.Context, tiflas
if len(hostAndStatusPort) != 2 {
return errors.Errorf("node status addr: %s format illegal", info.StatusAddr)
}
// fetch tiflash config
configURL := fmt.Sprintf("%s://%s/config", util.InternalHTTPSchema(), info.StatusAddr)
resp, err := util.InternalHTTPClient().Get(configURL)
if err != nil {
sctx.GetSessionVars().StmtCtx.AppendWarning(err)
continue
}
if resp.StatusCode != http.StatusOK {
return errors.Errorf("request %s failed: %s", configURL, resp.Status)
}
// parse http_port or https_port from the fetched config
var nestedConfig map[string]interface{}
if err = json.NewDecoder(resp.Body).Decode(&nestedConfig); err != nil {
return err
}
if engineStoreConfig, ok := nestedConfig["engine-store"]; ok {
foundPort := false
var port interface{}
portProtocol := ""
if httpPort, ok := engineStoreConfig.(map[string]interface{})["http_port"]; ok {
foundPort = true
port = httpPort
portProtocol = "http"
} else if httpsPort, ok := engineStoreConfig.(map[string]interface{})["https_port"]; ok {
foundPort = true
port = httpsPort
portProtocol = "https"
}
if !foundPort {
return errors.Errorf("engine-store.http_port/https_port not found in server %s", info.Address)
}
switch portValue := port.(type) {
case float64:
e.instanceInfos = append(e.instanceInfos, tiflashInstanceInfo{
id: info.Address,
url: fmt.Sprintf("%s://%s:%d", portProtocol, hostAndStatusPort[0], int(portValue)),
})
e.instanceCount += 1
default:
return errors.Errorf("engine-store.http_port value(%p) unexpected in server %s", port, info.Address)
}
} else {
return errors.Errorf("engine-store config not found in server %s", info.Address)
}
if err = resp.Body.Close(); err != nil {
return err
}
e.instanceIds = append(e.instanceIds, info.Address)
e.instanceCount += 1
}
e.initialized = true
return nil
Expand All @@ -3107,7 +3057,7 @@ type tiFlashSQLExecuteResponse struct {
Data [][]interface{} `json:"data"`
}

func (e *TiFlashSystemTableRetriever) dataForTiFlashSystemTables(ctx sessionctx.Context, tidbDatabases string, tidbTables string) ([][]types.Datum, error) {
func (e *TiFlashSystemTableRetriever) dataForTiFlashSystemTables(ctx context.Context, sctx sessionctx.Context, tidbDatabases string, tidbTables string) ([][]types.Datum, error) {
maxCount := 1024
targetTable := strings.ToLower(strings.Replace(e.table.Name.O, "TIFLASH", "DT", 1))
var filters []string
Expand All @@ -3122,29 +3072,33 @@ func (e *TiFlashSystemTableRetriever) dataForTiFlashSystemTables(ctx sessionctx.
sql = fmt.Sprintf("%s WHERE %s", sql, strings.Join(filters, " AND "))
}
sql = fmt.Sprintf("%s LIMIT %d, %d", sql, e.rowIdx, maxCount)
instanceInfo := e.instanceInfos[e.instanceIdx]
url := instanceInfo.url
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, errors.Trace(err)
request := tikvrpc.Request{
Type: tikvrpc.CmdGetTiFlashSystemTable,
StoreTp: tikvrpc.TiFlash,
Req: &kvrpcpb.TiFlashSystemTableRequest{
Sql: sql,
},
}
q := req.URL.Query()
q.Add("query", sql)
q.Add("default_format", "JSONCompact")
req.URL.RawQuery = q.Encode()
resp, err := util.InternalHTTPClient().Do(req)
if err != nil {
return nil, errors.Trace(err)

store := sctx.GetStore()
tikvStore, ok := store.(tikv.Storage)
if !ok {
return nil, errors.New("Get tiflash system tables can only run with tikv compatible storage")
}
body, err := io.ReadAll(resp.Body)
terror.Log(resp.Body.Close())
// send request to tiflash, timeout is 1s
instanceID := e.instanceIds[e.instanceIdx]
resp, err := tikvStore.GetTiKVClient().SendRequest(ctx, instanceID, &request, time.Second)
if err != nil {
return nil, errors.Trace(err)
}
var result tiFlashSQLExecuteResponse
err = json.Unmarshal(body, &result)
if err != nil {
return nil, errors.Wrapf(err, "Failed to decode JSON from TiFlash")
if tiflashResp, ok := resp.Resp.(*kvrpcpb.TiFlashSystemTableResponse); ok {
err = json.Unmarshal(tiflashResp.Data, &result)
if err != nil {
return nil, errors.Wrapf(err, "Failed to decode JSON from TiFlash")
}
} else {
return nil, errors.Errorf("Unexpected response type: %T", resp.Resp)
}

// Map result columns back to our columns. It is possible that some columns cannot be
Expand Down Expand Up @@ -3194,7 +3148,7 @@ func (e *TiFlashSystemTableRetriever) dataForTiFlashSystemTables(ctx sessionctx.
return nil, errors.Errorf("Meet column of unknown type %v", column)
}
}
outputRow[len(e.outputCols)-1].SetString(instanceInfo.id, mysql.DefaultCollationName)
outputRow[len(e.outputCols)-1].SetString(instanceID, mysql.DefaultCollationName)
outputRows = append(outputRows, outputRow)
}
e.rowIdx += len(outputRows)
Expand Down
Loading