diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 8908a67fb34..7ac167533ee 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -513,11 +513,18 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) { c.cancel() c.cancel = func() {} + // ddlPuller might still be referenced in initialize. + // we have to wait it done + c.wg.Wait() if c.ddlPuller != nil { c.ddlPuller.Close() + c.ddlPuller = nil } +<<<<<<< HEAD c.ddlWg.Wait() +======= +>>>>>>> b949fa6674 (chann(ticdc): fix a panic that send on closed channel (#12245)) if c.sink != nil { canceledCtx, cancel := context.WithCancel(context.Background()) diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index ee36526e33b..3a4134b7099 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -21,6 +21,7 @@ import ( "os" "path/filepath" "sync" + "sync/atomic" "testing" "time" @@ -42,6 +43,7 @@ import ( type mockDDLPuller struct { // DDLPuller +<<<<<<< HEAD resolvedTs model.Ts ddlQueue []*timodel.Job } @@ -51,6 +53,12 @@ func (m *mockDDLPuller) FrontDDL() (uint64, *timodel.Job) { return m.ddlQueue[0].BinlogInfo.FinishedTS, m.ddlQueue[0] } return m.resolvedTs, nil +======= + resolvedTs model.Ts + ddlQueue []*timodel.Job + schemaStorage entry.SchemaStorage + closed int64 +>>>>>>> b949fa6674 (chann(ticdc): fix a panic that send on closed channel (#12245)) } func (m *mockDDLPuller) PopFrontDDL() (uint64, *timodel.Job) { @@ -62,7 +70,11 @@ func (m *mockDDLPuller) PopFrontDDL() (uint64, *timodel.Job) { return m.resolvedTs, nil } -func (m *mockDDLPuller) Close() {} +func (m *mockDDLPuller) Close() { + if !atomic.CompareAndSwapInt64(&m.closed, 0, 1) { + panic("close twice!") + } +} func (m *mockDDLPuller) Run(ctx cdcContext.Context) error { <-ctx.Done() @@ -525,7 +537,13 @@ func testChangefeedReleaseResource( redoLogDir string, expectedInitialized bool, ) { +<<<<<<< HEAD cf, state, captures, tester := createChangefeed4Test(ctx, t) +======= + var err error + cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, newMockDDLSink, t) + defer cf.Close(ctx) +>>>>>>> b949fa6674 (chann(ticdc): fix a panic that send on closed channel (#12245)) // pre check cf.Tick(ctx, state, captures) @@ -1050,3 +1068,25 @@ func TestBarrierAdvance(t *testing.T) { require.Equal(t, mockDDLPuller.resolvedTs, barrier) } } + +func TestReleaseResourcesTwice(t *testing.T) { + globalVars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test() + ctx := context.Background() + cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, newMockDDLSink, t) + defer cf.Close(ctx) + + // pre check + state.CheckCaptureAlive(globalVars.CaptureInfo.ID) + require.False(t, preflightCheck(state, captures)) + tester.MustApplyPatches() + + // initialize + cf.Tick(ctx, state.Info, state.Status, captures) + tester.MustApplyPatches() + require.Equal(t, cf.initialized.Load(), true) + + // close twice + cf.releaseResources(ctx) + cf.isReleased = false + cf.releaseResources(ctx) +} diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 25c5b9d2420..5c952653ea4 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -905,6 +905,7 @@ func (p *processor) Close() error { // when error occurs during closing the processor p.cleanupMetrics() +<<<<<<< HEAD for _, tbl := range p.tables { tbl.Cancel() } @@ -914,6 +915,16 @@ func (p *processor) Close() error { p.cancel() p.wg.Wait() p.upStream.Release() +======= + p.sinkManager.stop() + p.sinkManager.r = nil + p.sourceManager.stop() + p.sourceManager.r = nil + p.redo.stop() + p.mg.stop() + p.ddlHandler.stop() + p.ddlHandler.r = nil +>>>>>>> b949fa6674 (chann(ticdc): fix a panic that send on closed channel (#12245)) if p.agent == nil { return nil