2 changes: 1 addition & 1 deletion domain/plan_replayer.go
@@ -384,7 +384,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
return true
}

file, fileName, err := replayer.GeneratePlanReplayerFile()
file, fileName, err := replayer.GeneratePlanReplayerFile(task.IsContinuesCapture)
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task file failed",
zap.String("sqlDigest", taskKey.SQLDigest),
18 changes: 10 additions & 8 deletions executor/compiler.go
@@ -158,14 +158,16 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
}
}
if c.Ctx.GetSessionVars().IsPlanReplayerCaptureEnabled() && !c.Ctx.GetSessionVars().InRestrictedSQL {
startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS()
if err != nil {
return nil, err
}
if c.Ctx.GetSessionVars().EnablePlanReplayedContinuesCapture {
checkPlanReplayerContinuesCapture(c.Ctx, stmtNode, startTS)
} else {
checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS)
if _, ok := stmtNode.(*ast.SelectStmt); ok {
startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS()
if err != nil {
return nil, err
}
if c.Ctx.GetSessionVars().EnablePlanReplayedContinuesCapture {
checkPlanReplayerContinuesCapture(c.Ctx, stmtNode, startTS)
} else {
checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS)
}
}
}
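The guard added here limits plan replayer capture to plain SELECT statements, so other statement kinds skip the read-TS lookup and the capture checks entirely. A minimal sketch of the same gate in isolation; the helper name is illustrative and not part of this change:

// isCaptureCandidate reports whether a parsed statement should be
// considered for plan replayer capture under the guard above:
// only *ast.SelectStmt qualifies.
// Assumes: import "github.com/pingcap/tidb/parser/ast".
func isCaptureCandidate(stmtNode ast.StmtNode) bool {
	_, ok := stmtNode.(*ast.SelectStmt)
	return ok
}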

2 changes: 1 addition & 1 deletion executor/plan_replayer.go
@@ -125,7 +125,7 @@ func (e *PlanReplayerExec) registerCaptureTask(ctx context.Context) error {

func (e *PlanReplayerExec) createFile() error {
var err error
e.DumpInfo.File, e.DumpInfo.FileName, err = replayer.GeneratePlanReplayerFile()
e.DumpInfo.File, e.DumpInfo.FileName, err = replayer.GeneratePlanReplayerFile(false)
if err != nil {
return err
}
2 changes: 2 additions & 0 deletions server/BUILD.bazel
@@ -58,6 +58,7 @@ go_library(
"//sessionctx/stmtctx",
"//sessionctx/variable",
"//sessiontxn",
"//statistics/handle",
"//store",
"//store/driver/error",
"//store/gcworker",
@@ -91,6 +92,7 @@ go_library(
"//util/topsql/stmtstats",
"//util/versioninfo",
"@com_github_blacktear23_go_proxyprotocol//:go-proxyprotocol",
"@com_github_burntsushi_toml//:toml",
"@com_github_gorilla_mux//:mux",
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@com_github_pingcap_errors//:errors",
196 changes: 193 additions & 3 deletions server/plan_replayer.go
@@ -15,15 +15,26 @@
package server

import (
"archive/zip"
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"

"github.com/BurntSushi/toml"
"github.com/gorilla/mux"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/replayer"
@@ -32,9 +43,11 @@ import (

// PlanReplayerHandler is the handler for dumping plan replayer file.
type PlanReplayerHandler struct {
infoGetter *infosync.InfoSyncer
address string
statusPort uint
is infoschema.InfoSchema
statsHandle *handle.Handle
infoGetter *infosync.InfoSyncer
address string
statusPort uint
}

func (s *Server) newPlanReplayerHandler() *PlanReplayerHandler {
@@ -46,6 +59,12 @@ func (s *Server) newPlanReplayerHandler() *PlanReplayerHandler {
if s.dom != nil && s.dom.InfoSyncer() != nil {
prh.infoGetter = s.dom.InfoSyncer()
}
if s.dom != nil && s.dom.InfoSchema() != nil {
prh.is = s.dom.InfoSchema()
}
if s.dom != nil && s.dom.StatsHandle() != nil {
prh.statsHandle = s.dom.StatsHandle()
}
return prh
}

@@ -61,6 +80,8 @@ func (prh PlanReplayerHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques
urlPath: fmt.Sprintf("plan_replayer/dump/%s", name),
downloadedFilename: "plan_replayer",
scheme: util.InternalHTTPSchema(),
statsHandle: prh.statsHandle,
is: prh.is,
}
handleDownloadFile(handler, w, req)
}
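With the info schema and stats handle wired into the handler, dumps are still served at plan_replayer/dump/<name> over the status port (10080 by default). A hedged client-side sketch of downloading an archive; host, port, and file name are placeholders:

// fetchPlanReplayerDump downloads a plan replayer archive from the TiDB
// status port and writes it to dest. Placeholder host/port; requires
// "io", "net/http", "os".
func fetchPlanReplayerDump(name, dest string) error {
	resp, err := http.Get("http://127.0.0.1:10080/plan_replayer/dump/" + name)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	out, err := os.Create(dest)
	if err != nil {
		return err
	}
	defer out.Close()
	_, err = io.Copy(out, resp.Body)
	return err
}

An equivalent curl against the same URL fetches the same rewritten archive.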
@@ -93,6 +114,13 @@ func handleDownloadFile(handler downloadFileHandler, w http.ResponseWriter, req
writeError(w, err)
return
}
if handler.downloadedFilename == "plan_replayer" {
content, err = handlePlanReplayerContinuesCaptureFile(content, path, handler)
if err != nil {
writeError(w, err)
return
}
}
_, err = w.Write(content)
if err != nil {
writeError(w, err)
@@ -175,6 +203,9 @@ type downloadFileHandler struct {
statusPort uint
urlPath string
downloadedFilename string

statsHandle *handle.Handle
is infoschema.InfoSchema
}

func isExists(path string) (bool, error) {
@@ -187,3 +218,162 @@ func isExists(path string) (bool, error) {
}
return true, nil
}

func handlePlanReplayerContinuesCaptureFile(content []byte, path string, handler downloadFileHandler) ([]byte, error) {
if !strings.Contains(handler.filePath, "continues_replayer") {
return content, nil
}
b := bytes.NewReader(content)
zr, err := zip.NewReader(b, int64(len(content)))
if err != nil {
return nil, err
}
startTS, err := loadSQLMetaFile(zr)
if err != nil {
return nil, err
}
if startTS == 0 {
return content, nil
}
tbls, err := loadSchemaMeta(zr, handler.is)
if err != nil {
return nil, err
}
for _, tbl := range tbls {
jsonStats, err := handler.statsHandle.DumpHistoricalStatsBySnapshot(tbl.dbName, tbl.info, startTS)
if err != nil {
return nil, err
}
tbl.jsonStats = jsonStats
}
newPath, err := dumpJSONStatsIntoZip(tbls, content, path)
if err != nil {
return nil, err
}
//nolint: gosec
file, err := os.Open(newPath)
if err != nil {
return nil, err
}
content, err = io.ReadAll(file)
if err != nil {
return nil, err
}
err = file.Close()
if err != nil {
return nil, err
}
return content, nil
}
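handlePlanReplayerContinuesCaptureFile injects one stats/<db>.<table>.json entry per referenced table before the archive is returned to the client. A small verification sketch, not part of this change, that lists those entries in a downloaded archive:

// listStatsEntries prints the JSON stats entries appended to a plan
// replayer archive. Requires "archive/zip", "fmt", "strings".
func listStatsEntries(path string) error {
	zr, err := zip.OpenReader(path)
	if err != nil {
		return err
	}
	defer zr.Close()
	for _, f := range zr.File {
		if strings.HasPrefix(f.Name, "stats/") {
			fmt.Println(f.Name)
		}
	}
	return nil
}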

func loadSQLMetaFile(z *zip.Reader) (uint64, error) {
for _, zipFile := range z.File {
if zipFile.Name == domain.PlanReplayerSQLMetaFile {
varMap := make(map[string]string)
v, err := zipFile.Open()
if err != nil {
return 0, errors.AddStack(err)
}
//nolint: errcheck,all_revive
defer v.Close()
_, err = toml.DecodeReader(v, &varMap)
if err != nil {
return 0, errors.AddStack(err)
}
startTS, err := strconv.ParseUint(varMap[domain.PlanReplayerSQLMetaStartTS], 10, 64)
if err != nil {
return 0, err
}
return startTS, nil
}
}
return 0, nil
}

func loadSchemaMeta(z *zip.Reader, is infoschema.InfoSchema) (map[int64]*tblInfo, error) {
r := make(map[int64]*tblInfo, 0)
for _, zipFile := range z.File {
if zipFile.Name == fmt.Sprintf("schema/%v", domain.PlanReplayerSchemaMetaFile) {
v, err := zipFile.Open()
if err != nil {
return nil, errors.AddStack(err)
}
//nolint: errcheck,all_revive
defer v.Close()
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(v)
if err != nil {
return nil, errors.AddStack(err)
}
rows := strings.Split(buf.String(), "\n")
for _, row := range rows {
s := strings.Split(row, ";")
databaseName := s[0]
tableName := s[1]
t, err := is.TableByName(model.NewCIStr(databaseName), model.NewCIStr(tableName))
if err != nil {
return nil, err
}
r[t.Meta().ID] = &tblInfo{
info: t.Meta(),
dbName: databaseName,
tblName: tableName,
}
}
break
}
}
return r, nil
}
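loadSchemaMeta expects each row of the schema meta file to look like <db>;<table>. A hedged single-row parser; the guard against blank or malformed rows is illustrative and not part of the function above:

// parseSchemaMetaRow splits one "db;table" row from the schema meta file.
// Requires "strings".
func parseSchemaMetaRow(row string) (db, tbl string, ok bool) {
	parts := strings.Split(strings.TrimSpace(row), ";")
	if len(parts) != 2 {
		return "", "", false
	}
	return parts[0], parts[1], true
}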

func dumpJSONStatsIntoZip(tbls map[int64]*tblInfo, content []byte, path string) (string, error) {
zr, err := zip.NewReader(bytes.NewReader(content), int64(len(content)))
if err != nil {
return "", err
}
newPath := fmt.Sprintf("copy_%v.zip", path[0:len(path)-4])
zf, err := os.Create(newPath)
if err != nil {
return "", err
}
zw := zip.NewWriter(zf)
for _, f := range zr.File {
err = zw.Copy(f)
if err != nil {
logutil.BgLogger().Error("copy plan replayer zip file failed", zap.Error(err))
return "", err
}
}
for _, tbl := range tbls {
w, err := zw.Create(fmt.Sprintf("stats/%v.%v.json", tbl.dbName, tbl.tblName))
if err != nil {
return "", err
}
data, err := json.Marshal(tbl.jsonStats)
if err != nil {
return "", err
}
_, err = w.Write(data)
if err != nil {
return "", err
}
}
err = zw.Close()
if err != nil {
logutil.BgLogger().Error("Closing file failed", zap.Error(err))
return "", err
}
err = zf.Close()
if err != nil {
logutil.BgLogger().Error("Closing file failed", zap.Error(err))
return "", err
}
return newPath, nil
}

type tblInfo struct {
info *model.TableInfo
jsonStats *handle.JSONTable
dbName string
tblName string
}
1 change: 1 addition & 0 deletions statistics/handle/gc.go
@@ -160,6 +160,7 @@ func (h *Handle) ClearOutdatedHistoryStats() error {
}
sql = "delete from mysql.stats_history where NOW() - create_time >= %? "
_, err = exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds())
logutil.BgLogger().Info("clear outdated historical stats")
return err
}

9 changes: 6 additions & 3 deletions util/replayer/replayer.go
@@ -33,13 +33,13 @@ type PlanReplayerTaskKey struct {
}

// GeneratePlanReplayerFile generates plan replayer file
func GeneratePlanReplayerFile() (*os.File, string, error) {
func GeneratePlanReplayerFile(isContinues bool) (*os.File, string, error) {
path := GetPlanReplayerDirName()
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
return nil, "", errors.AddStack(err)
}
fileName, err := generatePlanReplayerFileName()
fileName, err := generatePlanReplayerFileName(isContinues)
if err != nil {
return nil, "", errors.AddStack(err)
}
@@ -50,7 +50,7 @@ func GeneratePlanReplayerFile() (*os.File, string, error) {
return zf, fileName, err
}

func generatePlanReplayerFileName() (string, error) {
func generatePlanReplayerFileName(isContinues bool) (string, error) {
// Generate key and create zip file
time := time.Now().UnixNano()
b := make([]byte, 16)
@@ -60,6 +60,9 @@ func generatePlanReplayerFileName() (string, error) {
return "", err
}
key := base64.URLEncoding.EncodeToString(b)
if isContinues {
return fmt.Sprintf("continues_replayer_%v_%v.zip", key, time), nil
}
return fmt.Sprintf("replayer_%v_%v.zip", key, time), nil
}

Expand Down