Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ type Config struct {
Experimental Experimental `toml:"experimental" json:"experimental"`
// SkipRegisterToDashboard tells TiDB don't register itself to the dashboard.
SkipRegisterToDashboard bool `toml:"skip-register-to-dashboard" json:"skip-register-to-dashboard"`
// EnableTelemetry enables the usage data report to PingCAP. Deprecated: Telemetry has been removed.
// EnableTelemetry enables the usage data print to log.
EnableTelemetry bool `toml:"enable-telemetry" json:"enable-telemetry"`
// Labels indicates the labels set for the tidb server. The labels describe some specific properties for the tidb
// server like `zone`/`rack`/`host`. Currently, labels won't affect the tidb server except for some special
Expand Down
3 changes: 1 addition & 2 deletions pkg/config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,8 @@ new_collations_enabled_on_first_bootstrap = true
# *If you want to start a TiDB service, NEVER enable this.*
skip-register-to-dashboard = false

# When enabled, usage data (for example, instance versions) will be reported to PingCAP periodically for user experience analytics.
# When enabled, usage data (for example, instance versions) will be reported to log periodically for user experience analytics on cloud.
# If this config is set to `false` on all TiDB servers, telemetry will be always disabled regardless of the value of the global variable `tidb_enable_telemetry`.
# See PingCAP privacy policy for details: https://pingcap.com/en/privacy-policy/
enable-telemetry = false

# deprecate-integer-display-length is used to be compatible with MySQL 8.0 in which the integer declared with display length will be returned with
Expand Down
4 changes: 4 additions & 0 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ const (
MaxCommentLength = 1024
)

var telemetryAddIndexIngestUsage = metrics.TelemetryAddIndexIngestCnt

func buildIndexColumns(ctx *metabuild.Context, columns []*model.ColumnInfo, indexPartSpecifications []*ast.IndexPartSpecification, columnarIndexType model.ColumnarIndexType) ([]*model.IndexColumn, bool, error) {
// Build offsets.
idxParts := make([]*model.IndexColumn, 0, len(indexPartSpecifications))
Expand Down Expand Up @@ -1099,6 +1101,8 @@ SwitchIndexState:
}
loadCloudStorageURI(w, job)
if reorgTp.NeedMergeProcess() {
// Increase telemetryAddIndexIngestUsage
telemetryAddIndexIngestUsage.Inc()
for _, indexInfo := range allIndexInfos {
indexInfo.BackfillState = model.BackfillStateRunning
}
Expand Down
1 change: 1 addition & 0 deletions pkg/distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
"//pkg/sessionctx/vardef",
"//pkg/store/copr",
"//pkg/tablecodec",
"//pkg/telemetry",
"//pkg/types",
"//pkg/util/chunk",
"//pkg/util/codec",
Expand Down
39 changes: 39 additions & 0 deletions pkg/distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/store/copr"
"github.com/pingcap/tidb/pkg/telemetry"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
Expand All @@ -52,6 +53,12 @@ var (
errQueryInterrupted = dbterror.ClassExecutor.NewStd(errno.ErrQueryInterrupted)
)

var (
telemetryBatchedQueryTaskCnt = metrics.TelemetryBatchedQueryTaskCnt
telemetryStoreBatchedCnt = metrics.TelemetryStoreBatchedCnt
telemetryStoreBatchedFallbackCnt = metrics.TelemetryStoreBatchedFallbackCnt
)

var (
_ SelectResult = (*selectResult)(nil)
_ SelectResult = (*serialSelectResults)(nil)
Expand Down Expand Up @@ -311,6 +318,35 @@ type selectResult struct {
}

func (r *selectResult) fetchResp(ctx context.Context) error {
defer func() {
if r.stats != nil {
// Ignore internal sql.
if !r.ctx.InRestrictedSQL && r.stats.copRespTime.Size() > 0 {
ratio := r.stats.calcCacheHit()
if ratio >= 1 {
telemetry.CurrentCoprCacheHitRatioGTE100Count.Inc()
}
if ratio >= 0.8 {
telemetry.CurrentCoprCacheHitRatioGTE80Count.Inc()
}
if ratio >= 0.4 {
telemetry.CurrentCoprCacheHitRatioGTE40Count.Inc()
}
if ratio >= 0.2 {
telemetry.CurrentCoprCacheHitRatioGTE20Count.Inc()
}
if ratio >= 0.1 {
telemetry.CurrentCoprCacheHitRatioGTE10Count.Inc()
}
if ratio >= 0.01 {
telemetry.CurrentCoprCacheHitRatioGTE1Count.Inc()
}
if ratio >= 0 {
telemetry.CurrentCoprCacheHitRatioGTE0Count.Inc()
}
}
}
}()
for {
r.respChkIdx = 0
startTime := time.Now()
Expand Down Expand Up @@ -620,6 +656,9 @@ func (r *selectResult) Close() error {
batched, fallback := ci.GetStoreBatchInfo()
if batched != 0 || fallback != 0 {
r.stats.storeBatchedNum, r.stats.storeBatchedFallbackNum = batched, fallback
telemetryStoreBatchedCnt.Add(float64(r.stats.storeBatchedNum))
telemetryStoreBatchedFallbackCnt.Add(float64(r.stats.storeBatchedFallbackNum))
telemetryBatchedQueryTaskCnt.Add(float64(r.stats.copRespTime.Size()))
}
}
r.stats.fetchRspDuration = r.fetchDuration
Expand Down
1 change: 1 addition & 0 deletions pkg/domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ go_library(
"//pkg/statistics/handle/util",
"//pkg/statistics/util",
"//pkg/store",
"//pkg/telemetry",
"//pkg/ttl/ttlworker",
"//pkg/types",
"//pkg/util",
Expand Down
38 changes: 38 additions & 0 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ import (
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
kvstore "github.com/pingcap/tidb/pkg/store"
"github.com/pingcap/tidb/pkg/telemetry"
"github.com/pingcap/tidb/pkg/ttl/ttlworker"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
Expand Down Expand Up @@ -1598,6 +1599,40 @@ func (do *Domain) globalBindHandleWorkerLoop(owner owner.Manager) {
}, "globalBindHandleWorkerLoop")
}

// TelemetryLoop create a goroutine that reports usage data in a loop, it should be called only once
// in BootstrapSession.
func (do *Domain) TelemetryLoop(ctx sessionctx.Context) {
ctx.GetSessionVars().InRestrictedSQL = true
err := telemetry.InitialRun(ctx)
if err != nil {
logutil.BgLogger().Warn("Initial telemetry run failed", zap.Error(err))
}

reportTicker := time.NewTicker(telemetry.ReportInterval)
subWindowTicker := time.NewTicker(telemetry.SubWindowSize)

do.wg.Run(func() {
defer func() {
logutil.BgLogger().Info("TelemetryReportLoop exited.")
}()
defer util.Recover(metrics.LabelDomain, "TelemetryReportLoop", nil, false)

for {
select {
case <-do.exit:
return
case <-reportTicker.C:
err := telemetry.ReportUsageData(ctx)
if err != nil {
logutil.BgLogger().Warn("TelemetryLoop retports usaged data failed", zap.Error(err))
}
case <-subWindowTicker.C:
telemetry.RotateSubWindow()
}
}
}, "TelemetryLoop")
}

// SetupPlanReplayerHandle setup plan replayer handle
func (do *Domain) SetupPlanReplayerHandle(collectorSctx sessionctx.Context, workersSctxs []sessionctx.Context) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
Expand Down Expand Up @@ -2880,6 +2915,9 @@ func (do *Domain) readTableCostWorker(wbLearningHandle *workloadlearning.Handle,

func init() {
initByLDFlagsForGlobalKill()
telemetry.GetDomainInfoSchema = func(ctx sessionctx.Context) infoschema.InfoSchema {
return GetDomain(ctx).InfoSchema()
}
}

var (
Expand Down
47 changes: 45 additions & 2 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,48 @@ func (a *recordSet) GetExecutor4Test() any {
return a.executor
}

// TelemetryInfo records some telemetry information during execution.
type TelemetryInfo struct {
UseNonRecursive bool
UseRecursive bool
UseMultiSchemaChange bool
UseExchangePartition bool
UseFlashbackToCluster bool
PartitionTelemetry *PartitionTelemetryInfo
AccountLockTelemetry *AccountLockTelemetryInfo
UseIndexMerge bool
UseTableLookUp atomic.Bool
}

// PartitionTelemetryInfo records table partition telemetry information during execution.
type PartitionTelemetryInfo struct {
UseTablePartition bool
UseTablePartitionList bool
UseTablePartitionRange bool
UseTablePartitionHash bool
UseTablePartitionRangeColumns bool
UseTablePartitionRangeColumnsGt1 bool
UseTablePartitionRangeColumnsGt2 bool
UseTablePartitionRangeColumnsGt3 bool
UseTablePartitionListColumns bool
TablePartitionMaxPartitionsNum uint64
UseCreateIntervalPartition bool
UseAddIntervalPartition bool
UseDropIntervalPartition bool
UseCompactTablePartition bool
UseReorganizePartition bool
}

// AccountLockTelemetryInfo records account lock/unlock information during execution
type AccountLockTelemetryInfo struct {
// The number of CREATE/ALTER USER statements that lock the user
LockUser int64
// The number of CREATE/ALTER USER statements that unlock the user
UnlockUser int64
// The number of CREATE/ALTER USER statements
CreateOrAlterUser int64
}

// ExecStmt implements the sqlexec.Statement interface, it builds a planner.Plan to an sqlexec.Statement.
type ExecStmt struct {
// GoCtx stores parent go context.Context for a stmt.
Expand Down Expand Up @@ -285,6 +327,7 @@ type ExecStmt struct {
// OutputNames will be set if using cached plan
OutputNames []*types.FieldName
PsStmt *plannercore.PlanCacheStmt
Ti *TelemetryInfo
}

// GetStmtNode returns the stmtNode inside Statement
Expand Down Expand Up @@ -339,7 +382,7 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
}

if executor == nil {
b := newExecutorBuilder(a.Ctx, a.InfoSchema)
b := newExecutorBuilder(a.Ctx, a.InfoSchema, a.Ti)
executor = b.build(a.Plan)
if b.err != nil {
return nil, b.err
Expand Down Expand Up @@ -1236,7 +1279,7 @@ func (a *ExecStmt) buildExecutor() (exec.Executor, error) {
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}

b := newExecutorBuilder(ctx, a.InfoSchema)
b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti)
e := b.build(a.Plan)
if b.err != nil {
return nil, errors.Trace(b.err)
Expand Down
10 changes: 5 additions & 5 deletions pkg/executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func buildHashAggExecutor(ctx sessionctx.Context, src exec.Executor, schema *exp
plan.SetSchema(schema)
plan.Init(ctx.GetPlanCtx(), nil, 0)
plan.SetChildren(nil)
b := newExecutorBuilder(ctx, nil)
b := newExecutorBuilder(ctx, nil, nil)
exec := b.build(plan)
hashAgg := exec.(*aggregate.HashAggExec)
hashAgg.SetChildren(0, src)
Expand Down Expand Up @@ -119,7 +119,7 @@ func buildStreamAggExecutor(ctx sessionctx.Context, srcExec exec.Executor, schem
plan = sg
}

b := newExecutorBuilder(ctx, nil)
b := newExecutorBuilder(ctx, nil, nil)
return b.build(plan)
}

Expand Down Expand Up @@ -352,7 +352,7 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
plan = win
}

b := newExecutorBuilder(ctx, nil)
b := newExecutorBuilder(ctx, nil, nil)
exec := b.build(plan)
return exec
}
Expand Down Expand Up @@ -1253,7 +1253,7 @@ func prepare4IndexInnerHashJoin(tc *IndexJoinTestCase, outerDS *testutil.MockDat
keyOff2IdxOff[i] = i
}

readerBuilder, err := newExecutorBuilder(tc.Ctx, nil).
readerBuilder, err := newExecutorBuilder(tc.Ctx, nil, nil).
newDataReaderBuilder(&mockPhysicalIndexReader{e: innerDS})
if err != nil {
return nil, err
Expand Down Expand Up @@ -1327,7 +1327,7 @@ func prepare4IndexMergeJoin(tc *IndexJoinTestCase, outerDS *testutil.MockDataSou
outerCompareFuncs = append(outerCompareFuncs, expression.GetCmpFunction(nil, outerJoinKeys[i], outerJoinKeys[i]))
}

readerBuilder, err := newExecutorBuilder(tc.Ctx, nil).
readerBuilder, err := newExecutorBuilder(tc.Ctx, nil, nil).
newDataReaderBuilder(&mockPhysicalIndexReader{e: innerDS})
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/brie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func TestBRIEBuilderOptions(t *testing.T) {
sctx.GetSessionVars().User = &auth.UserIdentity{Username: "test"}
is := infoschema.MockInfoSchema([]*model.TableInfo{core.MockSignedTable(), core.MockUnsignedTable()})
ResetGlobalBRIEQueueForTest()
builder := NewMockExecutorBuilderForTest(sctx, is)
builder := NewMockExecutorBuilderForTest(sctx, is, nil)
ctx := context.Background()
p := parser.New()
p.SetParserConfig(parser.ParserConfig{EnableWindowFunction: true, EnableStrictDoubleTypeCheck: true})
Expand Down
Loading