Skip to content

Commit d0e59d7

Browse files
executor: return TiFlash system table by grpc rather than http (#42720)
close #42857
1 parent 7e0c146 commit d0e59d7

File tree

6 files changed

+120
-152
lines changed

6 files changed

+120
-152
lines changed

DEPS.bzl

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2206,13 +2206,6 @@ def go_deps():
22062206
sum = "h1:0Vihzu20St42/UDsvZGdNE6jak7oi/UOeMzwMPHkgFY=",
22072207
version = "v3.2.0+incompatible",
22082208
)
2209-
go_repository(
2210-
name = "com_github_jarcoal_httpmock",
2211-
build_file_proto_mode = "disable",
2212-
importpath = "github.com/jarcoal/httpmock",
2213-
sum = "h1:gSvTxxFR/MEMfsGrvRbdfpRUMBStovlSRLw0Ep1bwwc=",
2214-
version = "v1.2.0",
2215-
)
22162209

22172210
go_repository(
22182211
name = "com_github_jcmturner_aescts_v2",
@@ -4123,8 +4116,8 @@ def go_deps():
41234116
name = "com_github_tikv_client_go_v2",
41244117
build_file_proto_mode = "disable_global",
41254118
importpath = "github.com/tikv/client-go/v2",
4126-
sum = "h1:XpdZrei86oIrRjXbqvlQh23TdHXVtSxWmsxxwy/Zgc0=",
4127-
version = "v2.0.7-0.20230328084104-ea13e9700259",
4119+
sum = "h1:lqlizij6n/v4jx1Ph2rLF0E/gRJUg7kz3VmO6P5Y1e0=",
4120+
version = "v2.0.7-0.20230406064257-1ec0ff5bf443",
41284121
)
41294122
go_repository(
41304123
name = "com_github_tikv_pd",

executor/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,6 @@ go_test(
441441
"//util/topsql/state",
442442
"@com_github_golang_protobuf//proto",
443443
"@com_github_gorilla_mux//:mux",
444-
"@com_github_jarcoal_httpmock//:httpmock",
445444
"@com_github_pingcap_errors//:errors",
446445
"@com_github_pingcap_failpoint//:failpoint",
447446
"@com_github_pingcap_fn//:fn",

executor/infoschema_reader.go

Lines changed: 30 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,15 @@ import (
2020
"encoding/hex"
2121
"encoding/json"
2222
"fmt"
23-
"io"
2423
"math"
25-
"net/http"
2624
"strconv"
2725
"strings"
2826
"time"
2927

3028
"github.com/pingcap/errors"
3129
"github.com/pingcap/failpoint"
3230
"github.com/pingcap/kvproto/pkg/deadlock"
31+
"github.com/pingcap/kvproto/pkg/kvrpcpb"
3332
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
3433
"github.com/pingcap/tidb/ddl/label"
3534
"github.com/pingcap/tidb/ddl/placement"
@@ -43,7 +42,6 @@ import (
4342
"github.com/pingcap/tidb/parser/charset"
4443
"github.com/pingcap/tidb/parser/model"
4544
"github.com/pingcap/tidb/parser/mysql"
46-
"github.com/pingcap/tidb/parser/terror"
4745
plannercore "github.com/pingcap/tidb/planner/core"
4846
"github.com/pingcap/tidb/privilege"
4947
"github.com/pingcap/tidb/privilege/privileges"
@@ -73,6 +71,8 @@ import (
7371
"github.com/pingcap/tidb/util/sqlexec"
7472
"github.com/pingcap/tidb/util/stringutil"
7573
"github.com/pingcap/tidb/util/syncutil"
74+
"github.com/tikv/client-go/v2/tikv"
75+
"github.com/tikv/client-go/v2/tikvrpc"
7676
"github.com/tikv/client-go/v2/txnkv/txnlock"
7777
"go.uber.org/zap"
7878
"golang.org/x/exp/slices"
@@ -3001,7 +3001,7 @@ type TiFlashSystemTableRetriever struct {
30013001
outputCols []*model.ColumnInfo
30023002
instanceCount int
30033003
instanceIdx int
3004-
instanceInfos []tiflashInstanceInfo
3004+
instanceIds []string
30053005
rowIdx int
30063006
retrieved bool
30073007
initialized bool
@@ -3024,7 +3024,7 @@ func (e *TiFlashSystemTableRetriever) retrieve(ctx context.Context, sctx session
30243024
}
30253025

30263026
for {
3027-
rows, err := e.dataForTiFlashSystemTables(sctx, e.extractor.TiDBDatabases, e.extractor.TiDBTables)
3027+
rows, err := e.dataForTiFlashSystemTables(ctx, sctx, e.extractor.TiDBDatabases, e.extractor.TiDBTables)
30283028
if err != nil {
30293029
return nil, err
30303030
}
@@ -3034,11 +3034,6 @@ func (e *TiFlashSystemTableRetriever) retrieve(ctx context.Context, sctx session
30343034
}
30353035
}
30363036

3037-
type tiflashInstanceInfo struct {
3038-
id string
3039-
url string
3040-
}
3041-
30423037
func (e *TiFlashSystemTableRetriever) initialize(sctx sessionctx.Context, tiflashInstances set.StringSet) error {
30433038
storeInfo, err := infoschema.GetStoreServerInfo(sctx)
30443039
if err != nil {
@@ -3057,53 +3052,8 @@ func (e *TiFlashSystemTableRetriever) initialize(sctx sessionctx.Context, tiflas
30573052
if len(hostAndStatusPort) != 2 {
30583053
return errors.Errorf("node status addr: %s format illegal", info.StatusAddr)
30593054
}
3060-
// fetch tiflash config
3061-
configURL := fmt.Sprintf("%s://%s/config", util.InternalHTTPSchema(), info.StatusAddr)
3062-
resp, err := util.InternalHTTPClient().Get(configURL)
3063-
if err != nil {
3064-
sctx.GetSessionVars().StmtCtx.AppendWarning(err)
3065-
continue
3066-
}
3067-
if resp.StatusCode != http.StatusOK {
3068-
return errors.Errorf("request %s failed: %s", configURL, resp.Status)
3069-
}
3070-
// parse http_port or https_port from the fetched config
3071-
var nestedConfig map[string]interface{}
3072-
if err = json.NewDecoder(resp.Body).Decode(&nestedConfig); err != nil {
3073-
return err
3074-
}
3075-
if engineStoreConfig, ok := nestedConfig["engine-store"]; ok {
3076-
foundPort := false
3077-
var port interface{}
3078-
portProtocol := ""
3079-
if httpPort, ok := engineStoreConfig.(map[string]interface{})["http_port"]; ok {
3080-
foundPort = true
3081-
port = httpPort
3082-
portProtocol = "http"
3083-
} else if httpsPort, ok := engineStoreConfig.(map[string]interface{})["https_port"]; ok {
3084-
foundPort = true
3085-
port = httpsPort
3086-
portProtocol = "https"
3087-
}
3088-
if !foundPort {
3089-
return errors.Errorf("engine-store.http_port/https_port not found in server %s", info.Address)
3090-
}
3091-
switch portValue := port.(type) {
3092-
case float64:
3093-
e.instanceInfos = append(e.instanceInfos, tiflashInstanceInfo{
3094-
id: info.Address,
3095-
url: fmt.Sprintf("%s://%s:%d", portProtocol, hostAndStatusPort[0], int(portValue)),
3096-
})
3097-
e.instanceCount += 1
3098-
default:
3099-
return errors.Errorf("engine-store.http_port value(%p) unexpected in server %s", port, info.Address)
3100-
}
3101-
} else {
3102-
return errors.Errorf("engine-store config not found in server %s", info.Address)
3103-
}
3104-
if err = resp.Body.Close(); err != nil {
3105-
return err
3106-
}
3055+
e.instanceIds = append(e.instanceIds, info.Address)
3056+
e.instanceCount += 1
31073057
}
31083058
e.initialized = true
31093059
return nil
@@ -3119,7 +3069,7 @@ type tiFlashSQLExecuteResponse struct {
31193069
Data [][]interface{} `json:"data"`
31203070
}
31213071

3122-
func (e *TiFlashSystemTableRetriever) dataForTiFlashSystemTables(ctx sessionctx.Context, tidbDatabases string, tidbTables string) ([][]types.Datum, error) {
3072+
func (e *TiFlashSystemTableRetriever) dataForTiFlashSystemTables(ctx context.Context, sctx sessionctx.Context, tidbDatabases string, tidbTables string) ([][]types.Datum, error) {
31233073
maxCount := 1024
31243074
targetTable := strings.ToLower(strings.Replace(e.table.Name.O, "TIFLASH", "DT", 1))
31253075
var filters []string
@@ -3134,29 +3084,33 @@ func (e *TiFlashSystemTableRetriever) dataForTiFlashSystemTables(ctx sessionctx.
31343084
sql = fmt.Sprintf("%s WHERE %s", sql, strings.Join(filters, " AND "))
31353085
}
31363086
sql = fmt.Sprintf("%s LIMIT %d, %d", sql, e.rowIdx, maxCount)
3137-
instanceInfo := e.instanceInfos[e.instanceIdx]
3138-
url := instanceInfo.url
3139-
req, err := http.NewRequest(http.MethodGet, url, nil)
3140-
if err != nil {
3141-
return nil, errors.Trace(err)
3087+
request := tikvrpc.Request{
3088+
Type: tikvrpc.CmdGetTiFlashSystemTable,
3089+
StoreTp: tikvrpc.TiFlash,
3090+
Req: &kvrpcpb.TiFlashSystemTableRequest{
3091+
Sql: sql,
3092+
},
31423093
}
3143-
q := req.URL.Query()
3144-
q.Add("query", sql)
3145-
q.Add("default_format", "JSONCompact")
3146-
req.URL.RawQuery = q.Encode()
3147-
resp, err := util.InternalHTTPClient().Do(req)
3148-
if err != nil {
3149-
return nil, errors.Trace(err)
3094+
3095+
store := sctx.GetStore()
3096+
tikvStore, ok := store.(tikv.Storage)
3097+
if !ok {
3098+
return nil, errors.New("Get tiflash system tables can only run with tikv compatible storage")
31503099
}
3151-
body, err := io.ReadAll(resp.Body)
3152-
terror.Log(resp.Body.Close())
3100+
// send request to tiflash, timeout is 1s
3101+
instanceID := e.instanceIds[e.instanceIdx]
3102+
resp, err := tikvStore.GetTiKVClient().SendRequest(ctx, instanceID, &request, time.Second)
31533103
if err != nil {
31543104
return nil, errors.Trace(err)
31553105
}
31563106
var result tiFlashSQLExecuteResponse
3157-
err = json.Unmarshal(body, &result)
3158-
if err != nil {
3159-
return nil, errors.Wrapf(err, "Failed to decode JSON from TiFlash")
3107+
if tiflashResp, ok := resp.Resp.(*kvrpcpb.TiFlashSystemTableResponse); ok {
3108+
err = json.Unmarshal(tiflashResp.Data, &result)
3109+
if err != nil {
3110+
return nil, errors.Wrapf(err, "Failed to decode JSON from TiFlash")
3111+
}
3112+
} else {
3113+
return nil, errors.Errorf("Unexpected response type: %T", resp.Resp)
31603114
}
31613115

31623116
// Map result columns back to our columns. It is possible that some columns cannot be
@@ -3206,7 +3160,7 @@ func (e *TiFlashSystemTableRetriever) dataForTiFlashSystemTables(ctx sessionctx.
32063160
return nil, errors.Errorf("Meet column of unknown type %v", column)
32073161
}
32083162
}
3209-
outputRow[len(e.outputCols)-1].SetString(instanceInfo.id, mysql.DefaultCollationName)
3163+
outputRow[len(e.outputCols)-1].SetString(instanceID, mysql.DefaultCollationName)
32103164
outputRows = append(outputRows, outputRow)
32113165
}
32123166
e.rowIdx += len(outputRows)

0 commit comments

Comments
 (0)