Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/timer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 5,
shard_count = 6,
deps = [
"//pkg/sessionctx",
"//pkg/testkit",
Expand Down
4 changes: 2 additions & 2 deletions pkg/timer/api/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func TestDefaultClient(t *testing.T) {
require.Empty(t, timer.EventData)
require.True(t, timer.EventStart.IsZero())
require.Equal(t, []byte("s1"), timer.SummaryData)
require.Equal(t, eventStart, timer.Watermark)
require.Equal(t, eventStart.Unix(), timer.Watermark.Unix())
require.Equal(t, EventExtra{}, timer.EventExtra)

// close event with option
Expand All @@ -267,7 +267,7 @@ func TestDefaultClient(t *testing.T) {
require.Empty(t, timer.EventData)
require.True(t, timer.EventStart.IsZero())
require.Equal(t, []byte("s2"), timer.SummaryData)
require.Equal(t, watermark, timer.Watermark)
require.Equal(t, watermark.Unix(), timer.Watermark.Unix())

// manual trigger
err = store.Update(ctx, timer.ID, &TimerUpdate{
Expand Down
21 changes: 21 additions & 0 deletions pkg/timer/api/mem_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func (s *memoryStoreCore) Create(_ context.Context, record *TimerRecord) (string
record.EventStatus = SchedEventIdle
}

normalizeTimeFields(record)

if _, ok := s.id2Timers[record.ID]; ok {
return "", errors.Trace(ErrTimerExists)
}
Expand Down Expand Up @@ -137,6 +139,7 @@ func (s *memoryStoreCore) Update(_ context.Context, timerID string, update *Time
return err
}

normalizeTimeFields(newRecord)
if err = newRecord.Validate(); err != nil {
return err
}
Expand Down Expand Up @@ -303,3 +306,21 @@ func getMemStoreTimeZoneLoc(tz string) *time.Location {

return timeutil.SystemLocation()
}

func normalizeTimeFields(record *TimerRecord) {
if record.Location == nil {
return
}

if !record.Watermark.IsZero() {
record.Watermark = record.Watermark.In(record.Location)
}

if !record.EventStart.IsZero() {
record.EventStart = record.EventStart.In(record.Location)
}

if !record.CreateTime.IsZero() {
record.CreateTime = record.CreateTime.In(record.Location)
}
}
2 changes: 1 addition & 1 deletion pkg/timer/runtime/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func prepareTimer(t *testing.T, cli api.TimerClient) *api.TimerRecord {
require.Equal(t, "1m", timer.SchedPolicyExpr)
require.Equal(t, "h1", timer.HookClass)
require.True(t, timer.Enable)
require.Equal(t, watermark, timer.Watermark)
require.Equal(t, watermark.Unix(), timer.Watermark.Unix())
require.Equal(t, []byte("summary1"), timer.SummaryData)
require.True(t, !timer.CreateTime.Before(now))
require.True(t, !timer.CreateTime.After(time.Now()))
Expand Down
136 changes: 136 additions & 0 deletions pkg/timer/store_intergartion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package timer_test

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -256,6 +257,7 @@ func runTimerStoreUpdate(ctx context.Context, t *testing.T, store *api.TimerStor
EventManualRequestID: "req2",
EventWatermark: time.Unix(456, 0),
}
tpl.CreateTime = tpl.CreateTime.In(time.UTC)
require.Equal(t, *tpl, *record)

// tags full update again
Expand Down Expand Up @@ -328,6 +330,7 @@ func runTimerStoreUpdate(ctx context.Context, t *testing.T, store *api.TimerStor
tpl.EventExtra = api.EventExtra{}
tpl.Watermark = zeroTime
tpl.SummaryData = nil
tpl.CreateTime = tpl.CreateTime.In(tpl.Location)
require.Equal(t, *tpl, *record)

// err check version
Expand Down Expand Up @@ -872,3 +875,136 @@ func TestTableStoreManualTrigger(t *testing.T) {
require.True(t, timer.ManualProcessed)
require.Equal(t, api.EventExtra{}, timer.EventExtra)
}

func TestTimerStoreWithTimeZone(t *testing.T) {
// mem store
testTimerStoreWithTimeZone(t, api.NewMemoryTimerStore(), timeutil.SystemLocation().String())

// table store
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
dbName := "test"
tblName := "timerstore"
tk.MustExec("use test")
tk.MustExec(tablestore.CreateTimerTableSQL(dbName, tblName))
tk.MustExec("set @@time_zone = 'America/Los_Angeles'")

pool := pools.NewResourcePool(func() (pools.Resource, error) {
return tk.Session(), nil
}, 1, 1, time.Second)
defer pool.Close()

timerStore := tablestore.NewTableTimerStore(1, pool, dbName, tblName, nil)
defer timerStore.Close()

testTimerStoreWithTimeZone(t, timerStore, timeutil.SystemLocation().String())
tk.MustExec("set @@global.time_zone='Asia/Tokyo'")
tk.MustExec(fmt.Sprintf("truncate table %s.%s", dbName, tblName))
testTimerStoreWithTimeZone(t, timerStore, "Asia/Tokyo")

// check time zone should be set back to the previous one.
require.Equal(t, "America/Los_Angeles", tk.Session().GetSessionVars().Location().String())
}

func testTimerStoreWithTimeZone(t *testing.T, timerStore *api.TimerStore, defaultTZ string) {
// 2024-11-03 09:30:00 UTC is 2024-11-03 01:30:00 -08:00 in `America/Los_Angeles`
// We should notice that it should not be regarded as 2024-11-03 01:30:00 -07:00
// because of DST these two times have the same format in time zone `America/Los_Angeles`.
time1, err := time.ParseInLocation(time.DateTime, "2024-11-03 09:30:00", time.UTC)
require.NoError(t, err)

time2, err := time.ParseInLocation(time.DateTime, "2024-11-03 08:30:00", time.UTC)
require.NoError(t, err)

id1, err := timerStore.Create(context.TODO(), &api.TimerRecord{
TimerSpec: api.TimerSpec{
Namespace: "default",
Key: "test1",
SchedPolicyType: api.SchedEventInterval,
SchedPolicyExpr: "1h",
Watermark: time1,
},
EventStatus: api.SchedEventTrigger,
EventStart: time2,
})
require.NoError(t, err)

id2, err := timerStore.Create(context.TODO(), &api.TimerRecord{
TimerSpec: api.TimerSpec{
Namespace: "default",
Key: "test2",
SchedPolicyType: api.SchedEventInterval,
SchedPolicyExpr: "1h",
Watermark: time2,
},
EventStatus: api.SchedEventTrigger,
EventStart: time1,
})
require.NoError(t, err)

// create case
timer1, err := timerStore.GetByID(context.TODO(), id1)
require.NoError(t, err)
require.Equal(t, time1.In(time.UTC).String(), timer1.Watermark.In(time.UTC).String())
require.Equal(t, time2.In(time.UTC).String(), timer1.EventStart.In(time.UTC).String())
checkTimerRecordLocation(t, timer1, defaultTZ)

timer2, err := timerStore.GetByID(context.TODO(), id2)
require.NoError(t, err)
require.Equal(t, time2.In(time.UTC).String(), timer2.Watermark.In(time.UTC).String())
require.Equal(t, time1.In(time.UTC).String(), timer2.EventStart.In(time.UTC).String())
checkTimerRecordLocation(t, timer2, defaultTZ)

// update time
require.NoError(t, timerStore.Update(context.TODO(), id1, &api.TimerUpdate{
Watermark: api.NewOptionalVal(time2),
EventStart: api.NewOptionalVal(time1),
}))

require.NoError(t, timerStore.Update(context.TODO(), id2, &api.TimerUpdate{
Watermark: api.NewOptionalVal(time1),
EventStart: api.NewOptionalVal(time2),
}))

timer1, err = timerStore.GetByID(context.TODO(), id1)
require.NoError(t, err)
require.Equal(t, time2.In(time.UTC).String(), timer1.Watermark.In(time.UTC).String())
require.Equal(t, time1.In(time.UTC).String(), timer1.EventStart.In(time.UTC).String())
checkTimerRecordLocation(t, timer1, defaultTZ)

timer2, err = timerStore.GetByID(context.TODO(), id2)
require.NoError(t, err)
require.Equal(t, time1.In(time.UTC).String(), timer2.Watermark.In(time.UTC).String())
require.Equal(t, time2.In(time.UTC).String(), timer2.EventStart.In(time.UTC).String())
checkTimerRecordLocation(t, timer2, defaultTZ)

// update timezone
require.NoError(t, timerStore.Update(context.TODO(), id1, &api.TimerUpdate{
TimeZone: api.NewOptionalVal("Europe/Berlin"),
}))

timer1, err = timerStore.GetByID(context.TODO(), id1)
require.NoError(t, err)
require.Equal(t, time2.In(time.UTC).String(), timer1.Watermark.In(time.UTC).String())
require.Equal(t, time1.In(time.UTC).String(), timer1.EventStart.In(time.UTC).String())
checkTimerRecordLocation(t, timer1, "Europe/Berlin")

require.NoError(t, timerStore.Update(context.TODO(), id1, &api.TimerUpdate{
TimeZone: api.NewOptionalVal(""),
}))

timer1, err = timerStore.GetByID(context.TODO(), id1)
require.NoError(t, err)
require.Equal(t, time2.In(time.UTC).String(), timer1.Watermark.In(time.UTC).String())
require.Equal(t, time1.In(time.UTC).String(), timer1.EventStart.In(time.UTC).String())
checkTimerRecordLocation(t, timer1, defaultTZ)
}

func checkTimerRecordLocation(t *testing.T, r *api.TimerRecord, tz string) {
require.Equal(t, tz, r.Location.String())
require.Same(t, r.Location, r.Watermark.Location())
require.Same(t, r.Location, r.CreateTime.Location())
if !r.EventStart.IsZero() {
require.Same(t, r.Location, r.EventStart.Location())
}
}
4 changes: 4 additions & 0 deletions pkg/timer/tablestore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,13 @@ go_test(
shard_count = 8,
deps = [
"//pkg/kv",
"//pkg/parser/ast",
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/timer/api",
"//pkg/types",
"//pkg/util/sqlexec",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
Expand Down
77 changes: 76 additions & 1 deletion pkg/timer/tablestore/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/timer/api"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -614,9 +618,56 @@ func TestTakeSession(t *testing.T) {
require.EqualError(t, err, "mockErr")
pool.AssertExpectations(t)

// Get returns a session
// init session returns error
se := &mockSession{}
pool.On("Get").Return(se, nil).Once()
se.On("ExecuteInternal", matchCtx, "ROLLBACK", []any(nil)).
Return(nil, errors.New("mockErr")).
Once()
pool.On("Put", se).Once()
r, back, err = core.takeSession()
require.Nil(t, r)
require.Nil(t, back)
require.EqualError(t, err, "mockErr")
pool.AssertExpectations(t)
se.AssertExpectations(t)

// init session returns error2
pool.On("Get").Return(se, nil).Once()
se.On("ExecuteInternal", matchCtx, "ROLLBACK", []any(nil)).
Return(nil, nil).
Once()
se.On("ExecuteInternal", matchCtx, "SELECT @@time_zone", []any(nil)).
Return(nil, errors.New("mockErr2")).
Once()
pool.On("Put", se).Once()
r, back, err = core.takeSession()
require.Nil(t, r)
require.Nil(t, back)
require.EqualError(t, err, "mockErr2")
pool.AssertExpectations(t)
se.AssertExpectations(t)

// Get returns a session
pool.On("Get").Return(se, nil).Once()
rs := &sqlexec.SimpleRecordSet{
ResultFields: []*ast.ResultField{{
Column: &model.ColumnInfo{
FieldType: *types.NewFieldType(mysql.TypeString),
},
}},
MaxChunkSize: 1,
Rows: [][]any{{"tz1"}},
}
se.On("ExecuteInternal", matchCtx, "ROLLBACK", []any(nil)).
Return(nil, nil).
Once()
se.On("ExecuteInternal", matchCtx, "SELECT @@time_zone", []any(nil)).
Return(rs, nil).
Once()
se.On("ExecuteInternal", matchCtx, "SET @@time_zone='UTC'", []any(nil)).
Return(nil, nil).
Once()
r, back, err = core.takeSession()
require.Equal(t, r, se)
require.NotNil(t, back)
Expand All @@ -633,15 +684,39 @@ func TestTakeSession(t *testing.T) {
pool.AssertExpectations(t)
se.AssertExpectations(t)

// Put session failed2
se.On("ExecuteInternal", matchCtx, "ROLLBACK", []any(nil)).
Return(nil, nil).
Once()
se.On("ExecuteInternal", matchCtx, "SET @@time_zone=%?", []any{"tz1"}).
Return(nil, errors.New("mockErr2")).
Once()
se.On("Close").Once()
back()
pool.AssertExpectations(t)
se.AssertExpectations(t)

// Put session success
pool.On("Get").Return(se, nil).Once()
se.On("ExecuteInternal", matchCtx, "ROLLBACK", []any(nil)).
Return(nil, nil).
Once()
se.On("ExecuteInternal", matchCtx, "SELECT @@time_zone", []any(nil)).
Return(rs, nil).
Once()
se.On("ExecuteInternal", matchCtx, "SET @@time_zone='UTC'", []any(nil)).
Return(nil, nil).
Once()
r, back, err = core.takeSession()
require.Equal(t, r, se)
require.NotNil(t, back)
require.Nil(t, err)
se.On("ExecuteInternal", matchCtx, "ROLLBACK", []any(nil)).
Return(nil, nil).
Once()
se.On("ExecuteInternal", matchCtx, "SET @@time_zone=%?", []any{"tz1"}).
Return(nil, nil).
Once()
pool.On("Put", se).Once()
back()
pool.AssertExpectations(t)
Expand Down
Loading