Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions pkg/ddl/tests/resourcegroup/resource_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,100 @@ func TestResourceGroupRunaway(t *testing.T) {
tk.MustGetErrCode("select /*+ resource_group(rg4) */ * from t", mysql.ErrResourceGroupQueryRunawayQuarantine)
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, watch_text from mysql.tidb_runaway_watch", nil,
testkit.Rows("rg3 select /*+ resource_group(rg3) */ * from t", "rg4 select /*+ resource_group(rg4) */ * from t"), maxWaitDuration, tryInterval)
<<<<<<< HEAD:pkg/ddl/tests/resourcegroup/resource_group_test.go
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprAfterReq"))
=======
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/checkThresholds"))

tk.MustExec("create resource group rg5 BURSTABLE=UNLIMITED RU_PER_SEC=2000 QUERY_LIMIT=(PROCESSED_KEYS=10 action KILL WATCH EXACT)")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/checkThresholds", "return(true)"))
err = tk.QueryToErr("select /*+ resource_group(rg5) */ * from t")
require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query")
tk.MustGetErrCode("select /*+ resource_group(rg5) */ * from t", mysql.ErrResourceGroupQueryRunawayQuarantine)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/checkThresholds"))
}

func TestResourceGroupRunawayExceedTiDBSide(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC"))
}()
store, dom := testkit.CreateMockStoreAndDomain(t)
sv := server.CreateMockServer(t, store)
sv.SetDomain(dom)
defer sv.Close()

conn1 := server.CreateMockConn(t, sv)
tk := testkit.NewTestKitWithSession(t, store, conn1.Context().Session)

go dom.ExpensiveQueryHandle().SetSessionManager(sv).Run()
tk.MustExec("set global tidb_enable_resource_control='on'")
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "localhost"}, nil, nil, nil))

tk.MustExec("use test")
tk.MustExec("create table t(a int)")
tk.MustExec("insert into t values(1)")
tk.MustExec("create resource group rg1 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' ACTION=KILL)")

err := tk.QueryToErr("select /*+ resource_group(rg1) */ sleep(0.5) from t")
require.ErrorContains(t, err, "[executor:8253]Query execution was interrupted, identified as runaway query")

tryInterval := time.Millisecond * 100
maxWaitDuration := time.Second * 5
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, sample_sql, match_type from mysql.tidb_runaway_queries", nil,
testkit.Rows("rg1 select /*+ resource_group(rg1) */ sleep(0.5) from t identify"), maxWaitDuration, tryInterval)
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, sample_sql, start_time from mysql.tidb_runaway_queries", nil,
nil, maxWaitDuration, tryInterval)
}

func TestResourceGroupRunawayFlood(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC"))
}()
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "localhost"}, nil, nil, nil))

tk.MustExec("use test")
tk.MustExec("create table t(a int)")
tk.MustExec("insert into t values(1)")
tk.MustExec("set global tidb_enable_resource_control='on'")
tk.MustExec("create resource group rg1 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' ACTION=KILL)")
tk.MustQuery("select /*+ resource_group(rg1) */ * from t").Check(testkit.Rows("1"))

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprRequest", fmt.Sprintf("return(%d)", 60)))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprRequest"))
}()
err := tk.QueryToErr("select /*+ resource_group(rg1) */ sleep(0.1) from t")
require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query")

tryInterval := time.Millisecond * 100
maxWaitDuration := time.Second * 5
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, sample_sql, repeats, match_type from mysql.tidb_runaway_queries", nil,
testkit.Rows("rg1 select /*+ resource_group(rg1) */ sleep(0.1) from t 1 identify"), maxWaitDuration, tryInterval)
// wait for the runaway watch to be cleaned up
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, sample_sql, repeats from mysql.tidb_runaway_queries", nil,
nil, maxWaitDuration, tryInterval)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC"))

// check thrice to make sure the runaway query be regarded as a repeated query.
err = tk.QueryToErr("select /*+ resource_group(rg1) */ sleep(0.2) from t")
require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query")
err = tk.QueryToErr("select /*+ resource_group(rg1) */ sleep(0.3) from t")
require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query")
// using FastRunawayGC to trigger flush
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC", `return(true)`))
err = tk.QueryToErr("select /*+ resource_group(rg1) */ sleep(0.4) from t")
require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query")
// only have one runaway query
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, sample_sql, repeats, match_type from mysql.tidb_runaway_queries", nil,
testkit.Rows("rg1 select /*+ resource_group(rg1) */ sleep(0.2) from t 3 identify"), maxWaitDuration, tryInterval)
// wait for the runaway watch to be cleaned up
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, sample_sql, repeats from mysql.tidb_runaway_queries", nil,
nil, maxWaitDuration, tryInterval)
>>>>>>> 2d0da8ee789 (fix(runaway): resolve the dead channel in UpdateNewAndDoneWatch (#61795)):pkg/resourcegroup/tests/resource_group_test.go
}

func TestAlreadyExistsDefaultResourceGroup(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,8 @@ func (do *Domain) Init(
do.wg.Run(do.topNSlowQueryLoop, "topNSlowQueryLoop")
do.wg.Run(do.infoSyncerKeeper, "infoSyncerKeeper")
do.wg.Run(do.globalConfigSyncerKeeper, "globalConfigSyncerKeeper")
do.wg.Run(do.runawayStartLoop, "runawayStartLoop")
do.wg.Run(do.runawayManager.RunawayRecordFlushLoop, "runawayRecordFlushLoop")
do.wg.Run(do.runawayManager.RunawayWatchSyncLoop, "runawayWatchSyncLoop")
do.wg.Run(do.requestUnitsWriterLoop, "requestUnitsWriterLoop")
if !skipRegisterToDashboard {
do.wg.Run(do.topologySyncerKeeper, "topologySyncerKeeper")
Expand Down
13 changes: 13 additions & 0 deletions pkg/domain/runaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@ import (
"context"
"net"
"strconv"
<<<<<<< HEAD
"strings"
"sync"
"time"
=======
>>>>>>> 2d0da8ee789 (fix(runaway): resolve the dead channel in UpdateNewAndDoneWatch (#61795))

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/tidb/pkg/domain/infosync"
<<<<<<< HEAD
"github.com/pingcap/tidb/pkg/domain/resourcegroup"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
Expand All @@ -37,11 +41,15 @@ import (
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
=======
"github.com/pingcap/tidb/pkg/resourcegroup/runaway"
>>>>>>> 2d0da8ee789 (fix(runaway): resolve the dead channel in UpdateNewAndDoneWatch (#61795))
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
rmclient "github.com/tikv/pd/client/resource_group/controller"
<<<<<<< HEAD
"go.uber.org/zap"
)

Expand All @@ -56,6 +64,8 @@ const (

maxIDRetries = 3
runawayLoopLogErrorIntervalCount = 1800
=======
>>>>>>> 2d0da8ee789 (fix(runaway): resolve the dead channel in UpdateNewAndDoneWatch (#61795))
)

var systemSchemaCIStr = model.NewCIStr("mysql")
Expand Down Expand Up @@ -140,6 +150,7 @@ func (do *Domain) deleteExpiredRows(tableName, colName string, expiredDuration t
}
}
}
<<<<<<< HEAD

func (do *Domain) runawayStartLoop() {
defer util.Recover(metrics.LabelDomain, "runawayStartLoop", nil, false)
Expand Down Expand Up @@ -657,3 +668,5 @@ func (*SystemTableReader) Read(exec sqlexec.RestrictedSQLExecutor, genFn func()
)
return rows, err
}
=======
>>>>>>> 2d0da8ee789 (fix(runaway): resolve the dead channel in UpdateNewAndDoneWatch (#61795))
16 changes: 12 additions & 4 deletions pkg/executor/internal/querywatch/query_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ import (
)

func TestQueryWatch(t *testing.T) {
<<<<<<< HEAD
=======
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC"))
}()
>>>>>>> 2d0da8ee789 (fix(runaway): resolve the dead channel in UpdateNewAndDoneWatch (#61795))
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand All @@ -36,6 +43,10 @@ func TestQueryWatch(t *testing.T) {
tk.MustExec("insert into t2 values(1)")
tk.MustExec("create table t3(a int)")
tk.MustExec("insert into t3 values(1)")
<<<<<<< HEAD
=======

>>>>>>> 2d0da8ee789 (fix(runaway): resolve the dead channel in UpdateNewAndDoneWatch (#61795))
err := tk.QueryToErr("query watch add sql text exact to 'select * from test.t1'")
require.ErrorContains(t, err, "must set runaway config for resource group `default`")
err = tk.QueryToErr("query watch add resource group rg2 action DRYRUN sql text exact to 'select * from test.t1'")
Expand Down Expand Up @@ -139,12 +150,9 @@ func TestQueryWatchIssue56897(t *testing.T) {
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC"))
}()
store, dom := testkit.CreateMockStoreAndDomain(t)
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
require.Eventually(t, func() bool {
return dom.RunawayManager().IsSyncerInitialized()
}, 20*time.Second, 300*time.Millisecond)
tk.MustQuery("QUERY WATCH ADD ACTION KILL SQL TEXT SIMILAR TO 'use test';").Check((testkit.Rows("1")))
time.Sleep(1 * time.Second)
_, err := tk.Exec("use test")
Expand Down
Loading