Skip to content

Commit 81cb9a5

Browse files
lcwangchaozeminzhou
authored andcommitted
ttl: disable paging in TTL (pingcap#58759)
close pingcap#58342
1 parent faa9582 commit 81cb9a5

File tree

3 files changed

+96
-10
lines changed

3 files changed

+96
-10
lines changed

pkg/ttl/ttlworker/scan.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,11 @@ package ttlworker
1717
import (
1818
"context"
1919
"fmt"
20-
"strconv"
2120
"sync/atomic"
2221
"time"
2322

2423
"github.com/pingcap/errors"
2524
"github.com/pingcap/tidb/pkg/parser/ast"
26-
"github.com/pingcap/tidb/pkg/parser/terror"
2725
"github.com/pingcap/tidb/pkg/sessionctx/variable"
2826
"github.com/pingcap/tidb/pkg/ttl/cache"
2927
"github.com/pingcap/tidb/pkg/ttl/metrics"
@@ -199,17 +197,12 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s
199197
))
200198
}
201199

202-
origConcurrency := rawSess.GetSessionVars().DistSQLScanConcurrency()
203-
if _, err = rawSess.ExecuteSQL(ctx, "set @@tidb_distsql_scan_concurrency=1"); err != nil {
200+
sess, restoreSession, err := NewScanSession(rawSess, t.tbl, t.ExpireTime)
201+
if err != nil {
204202
return t.result(err)
205203
}
204+
defer restoreSession()
206205

207-
defer func() {
208-
_, err = rawSess.ExecuteSQL(ctx, "set @@tidb_distsql_scan_concurrency="+strconv.Itoa(origConcurrency))
209-
terror.Log(err)
210-
}()
211-
212-
sess := newTableSession(rawSess, t.tbl, t.ExpireTime)
213206
generator, err := sqlbuilder.NewScanQueryGenerator(t.tbl, t.ExpireTime, t.ScanRangeStart, t.ScanRangeEnd)
214207
if err != nil {
215208
return t.result(err)

pkg/ttl/ttlworker/session.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,36 @@ func newTableSession(se session.Session, tbl *cache.PhysicalTable, expire time.T
208208
}
209209
}
210210

211+
// NewScanSession creates a session for scan
212+
func NewScanSession(se session.Session, tbl *cache.PhysicalTable, expire time.Time) (*ttlTableSession, func(), error) {
213+
origConcurrency := se.GetSessionVars().DistSQLScanConcurrency()
214+
origPaging := se.GetSessionVars().EnablePaging
215+
216+
restore := func() {
217+
_, err := se.ExecuteSQL(context.Background(), "set @@tidb_distsql_scan_concurrency=%?", origConcurrency)
218+
terror.Log(err)
219+
_, err = se.ExecuteSQL(context.Background(), "set @@tidb_enable_paging=%?", origPaging)
220+
terror.Log(err)
221+
}
222+
223+
// Set the distsql scan concurrency to 1 to reduce the number of cop tasks in TTL scan.
224+
if _, err := se.ExecuteSQL(context.Background(), "set @@tidb_distsql_scan_concurrency=1"); err != nil {
225+
restore()
226+
return nil, nil, err
227+
}
228+
229+
// Disable tidb_enable_paging because we have already had a `LIMIT` in the SQL to limit the result set.
230+
// If `tidb_enable_paging` is enabled, it may have multiple cop tasks even in one region that makes some extra
231+
// processed keys in TiKV side, see issue: https://github.com/pingcap/tidb/issues/58342.
232+
// Disable it to make the scan more efficient.
233+
if _, err := se.ExecuteSQL(context.Background(), "set @@tidb_enable_paging=OFF"); err != nil {
234+
restore()
235+
return nil, nil, err
236+
}
237+
238+
return newTableSession(se, tbl, expire), restore, nil
239+
}
240+
211241
type ttlTableSession struct {
212242
session.Session
213243
tbl *cache.PhysicalTable

pkg/ttl/ttlworker/session_integration_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ import (
2020
"strings"
2121
"sync/atomic"
2222
"testing"
23+
"time"
2324

2425
"github.com/ngaut/pools"
2526
"github.com/pingcap/tidb/pkg/parser/ast"
2627
"github.com/pingcap/tidb/pkg/sessionctx"
2728
"github.com/pingcap/tidb/pkg/testkit"
29+
"github.com/pingcap/tidb/pkg/ttl/cache"
2830
"github.com/pingcap/tidb/pkg/ttl/ttlworker"
2931
"github.com/pingcap/tidb/pkg/util"
3032
"github.com/pingcap/tidb/pkg/util/logutil"
@@ -185,3 +187,64 @@ func TestGetSessionWithFault(t *testing.T) {
185187
require.True(t, se != nil || err != nil)
186188
}
187189
}
190+
191+
func TestNewScanSession(t *testing.T) {
192+
_, dom := testkit.CreateMockStoreAndDomain(t)
193+
pool := newFaultSessionPool(dom.SysSessionPool())
194+
pool.setFault(newFaultWithFilter(func(s string) bool { return false }, newFaultAfterCount(0)))
195+
se, err := ttlworker.GetSessionForTest(pool)
196+
require.NoError(t, err)
197+
198+
_, err = se.ExecuteSQL(context.Background(), "set @@tidb_distsql_scan_concurrency=123")
199+
require.NoError(t, err)
200+
require.Equal(t, 123, se.GetSessionVars().DistSQLScanConcurrency())
201+
202+
_, err = se.ExecuteSQL(context.Background(), "set @@tidb_enable_paging=ON")
203+
require.NoError(t, err)
204+
require.True(t, se.GetSessionVars().EnablePaging)
205+
206+
for _, errSQL := range []string{
207+
"",
208+
"set @@tidb_distsql_scan_concurrency=1",
209+
"set @@tidb_enable_paging=OFF",
210+
} {
211+
t.Run("test err in SQL: "+errSQL, func(t *testing.T) {
212+
var faultCnt atomic.Int64
213+
pool.setFault(newFaultWithFilter(func(s string) bool {
214+
if s == errSQL && s != "" {
215+
faultCnt.Add(1)
216+
return true
217+
}
218+
return false
219+
}, newFaultAfterCount(0)))
220+
tblSe, restore, err := ttlworker.NewScanSession(se, &cache.PhysicalTable{}, time.Now())
221+
if errSQL == "" {
222+
// success case
223+
require.NoError(t, err)
224+
require.NotNil(t, tblSe)
225+
require.NotNil(t, restore)
226+
require.Same(t, se, tblSe.Session)
227+
require.Equal(t, int64(0), faultCnt.Load())
228+
229+
// NewScanSession should override @@dist_sql_scan_concurrency and @@tidb_enable_paging
230+
require.Equal(t, 1, se.GetSessionVars().DistSQLScanConcurrency())
231+
require.False(t, se.GetSessionVars().EnablePaging)
232+
233+
// restore should restore the session variables
234+
restore()
235+
require.Equal(t, 123, se.GetSessionVars().DistSQLScanConcurrency())
236+
require.True(t, se.GetSessionVars().EnablePaging)
237+
} else {
238+
// error case
239+
require.Equal(t, int64(1), faultCnt.Load())
240+
require.EqualError(t, err, "fault in test")
241+
require.Nil(t, tblSe)
242+
require.Nil(t, restore)
243+
244+
// NewScanSession should not change session state if error occurs
245+
require.Equal(t, 123, se.GetSessionVars().DistSQLScanConcurrency())
246+
require.True(t, se.GetSessionVars().EnablePaging)
247+
}
248+
})
249+
}
250+
}

0 commit comments

Comments
 (0)