Skip to content

Commit 63d07a9

Browse files
wk989898ti-chi-bot
authored andcommitted
This is an automated cherry-pick of #12245
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 71c4e3d commit 63d07a9

File tree

3 files changed

+59
-1
lines changed

3 files changed

+59
-1
lines changed

cdc/owner/changefeed.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,11 +513,18 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
513513

514514
c.cancel()
515515
c.cancel = func() {}
516+
// ddlPuller might still be referenced in initialize.
517+
// we have to wait it done
518+
c.wg.Wait()
516519

517520
if c.ddlPuller != nil {
518521
c.ddlPuller.Close()
522+
c.ddlPuller = nil
519523
}
524+
<<<<<<< HEAD
520525
c.ddlWg.Wait()
526+
=======
527+
>>>>>>> b949fa6674 (chann(ticdc): fix a panic that send on closed channel (#12245))
521528

522529
if c.sink != nil {
523530
canceledCtx, cancel := context.WithCancel(context.Background())

cdc/owner/changefeed_test.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"os"
2222
"path/filepath"
2323
"sync"
24+
"sync/atomic"
2425
"testing"
2526
"time"
2627

@@ -42,6 +43,7 @@ import (
4243

4344
type mockDDLPuller struct {
4445
// DDLPuller
46+
<<<<<<< HEAD
4547
resolvedTs model.Ts
4648
ddlQueue []*timodel.Job
4749
}
@@ -51,6 +53,12 @@ func (m *mockDDLPuller) FrontDDL() (uint64, *timodel.Job) {
5153
return m.ddlQueue[0].BinlogInfo.FinishedTS, m.ddlQueue[0]
5254
}
5355
return m.resolvedTs, nil
56+
=======
57+
resolvedTs model.Ts
58+
ddlQueue []*timodel.Job
59+
schemaStorage entry.SchemaStorage
60+
closed int64
61+
>>>>>>> b949fa6674 (chann(ticdc): fix a panic that send on closed channel (#12245))
5462
}
5563

5664
func (m *mockDDLPuller) PopFrontDDL() (uint64, *timodel.Job) {
@@ -62,7 +70,11 @@ func (m *mockDDLPuller) PopFrontDDL() (uint64, *timodel.Job) {
6270
return m.resolvedTs, nil
6371
}
6472

65-
func (m *mockDDLPuller) Close() {}
73+
func (m *mockDDLPuller) Close() {
74+
if !atomic.CompareAndSwapInt64(&m.closed, 0, 1) {
75+
panic("close twice!")
76+
}
77+
}
6678

6779
func (m *mockDDLPuller) Run(ctx cdcContext.Context) error {
6880
<-ctx.Done()
@@ -525,7 +537,13 @@ func testChangefeedReleaseResource(
525537
redoLogDir string,
526538
expectedInitialized bool,
527539
) {
540+
<<<<<<< HEAD
528541
cf, state, captures, tester := createChangefeed4Test(ctx, t)
542+
=======
543+
var err error
544+
cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, newMockDDLSink, t)
545+
defer cf.Close(ctx)
546+
>>>>>>> b949fa6674 (chann(ticdc): fix a panic that send on closed channel (#12245))
529547

530548
// pre check
531549
cf.Tick(ctx, state, captures)
@@ -1050,3 +1068,25 @@ func TestBarrierAdvance(t *testing.T) {
10501068
require.Equal(t, mockDDLPuller.resolvedTs, barrier)
10511069
}
10521070
}
1071+
1072+
func TestReleaseResourcesTwice(t *testing.T) {
1073+
globalVars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
1074+
ctx := context.Background()
1075+
cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, newMockDDLSink, t)
1076+
defer cf.Close(ctx)
1077+
1078+
// pre check
1079+
state.CheckCaptureAlive(globalVars.CaptureInfo.ID)
1080+
require.False(t, preflightCheck(state, captures))
1081+
tester.MustApplyPatches()
1082+
1083+
// initialize
1084+
cf.Tick(ctx, state.Info, state.Status, captures)
1085+
tester.MustApplyPatches()
1086+
require.Equal(t, cf.initialized.Load(), true)
1087+
1088+
// close twice
1089+
cf.releaseResources(ctx)
1090+
cf.isReleased = false
1091+
cf.releaseResources(ctx)
1092+
}

cdc/processor/processor.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -905,6 +905,7 @@ func (p *processor) Close() error {
905905
// when error occurs during closing the processor
906906
p.cleanupMetrics()
907907

908+
<<<<<<< HEAD
908909
for _, tbl := range p.tables {
909910
tbl.Cancel()
910911
}
@@ -914,6 +915,16 @@ func (p *processor) Close() error {
914915
p.cancel()
915916
p.wg.Wait()
916917
p.upStream.Release()
918+
=======
919+
p.sinkManager.stop()
920+
p.sinkManager.r = nil
921+
p.sourceManager.stop()
922+
p.sourceManager.r = nil
923+
p.redo.stop()
924+
p.mg.stop()
925+
p.ddlHandler.stop()
926+
p.ddlHandler.r = nil
927+
>>>>>>> b949fa6674 (chann(ticdc): fix a panic that send on closed channel (#12245))
917928

918929
if p.agent == nil {
919930
return nil

0 commit comments

Comments
 (0)