Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,11 +513,18 @@

c.cancel()
c.cancel = func() {}
// ddlPuller might still be referenced in initialize.
// we have to wait it done
c.wg.Wait()
Comment on lines +516 to +518

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Consider adding a comment to explain why c.wg.Wait() is needed here. It's not immediately obvious why the ddlPuller might still be referenced in initialize.

// ddlPuller might still be referenced in initialize.
// We have to wait for it to finish before proceeding.
c.wg.Wait()


if c.ddlPuller != nil {
c.ddlPuller.Close()
c.ddlPuller = nil
}
<<<<<<< HEAD

Check failure on line 524 in cdc/owner/changefeed.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected <<, expecting }
c.ddlWg.Wait()
=======
>>>>>>> b949fa6674 (chann(ticdc): fix a panic that send on closed channel (#12245))

Check failure on line 527 in cdc/owner/changefeed.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+0023 '#'
Comment on lines +524 to +527

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This file contains unresolved merge conflict markers. Please resolve them before this PR can be merged. It seems like c.ddlWg.Wait() was part of the changes but got caught in a conflict.


if c.sink != nil {
canceledCtx, cancel := context.WithCancel(context.Background())
Expand Down
42 changes: 41 additions & 1 deletion cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os"
"path/filepath"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -42,6 +43,7 @@ import (

type mockDDLPuller struct {
// DDLPuller
<<<<<<< HEAD
resolvedTs model.Ts
ddlQueue []*timodel.Job
}
Expand All @@ -51,6 +53,12 @@ func (m *mockDDLPuller) FrontDDL() (uint64, *timodel.Job) {
return m.ddlQueue[0].BinlogInfo.FinishedTS, m.ddlQueue[0]
}
return m.resolvedTs, nil
=======
resolvedTs model.Ts
ddlQueue []*timodel.Job
schemaStorage entry.SchemaStorage
closed int64
>>>>>>> b949fa6674 (chann(ticdc): fix a panic that send on closed channel (#12245))
}

func (m *mockDDLPuller) PopFrontDDL() (uint64, *timodel.Job) {
Expand All @@ -62,7 +70,11 @@ func (m *mockDDLPuller) PopFrontDDL() (uint64, *timodel.Job) {
return m.resolvedTs, nil
}

func (m *mockDDLPuller) Close() {}
func (m *mockDDLPuller) Close() {
if !atomic.CompareAndSwapInt64(&m.closed, 0, 1) {
panic("close twice!")
}
Comment on lines +74 to +76

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Consider using atomic.StoreInt64 instead of panic to handle the double close scenario. This provides a more graceful way to handle the error without abruptly terminating the program. Also, add a log to indicate the double close.

if atomic.CompareAndSwapInt64(&m.closed, 0, 1) {
		log.Warn("ddl puller close twice")
		return
	}
	panic("close twice!")

}

func (m *mockDDLPuller) Run(ctx cdcContext.Context) error {
<-ctx.Done()
Expand Down Expand Up @@ -525,7 +537,13 @@ func testChangefeedReleaseResource(
redoLogDir string,
expectedInitialized bool,
) {
<<<<<<< HEAD
cf, state, captures, tester := createChangefeed4Test(ctx, t)
=======
var err error
cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, newMockDDLSink, t)
defer cf.Close(ctx)
>>>>>>> b949fa6674 (chann(ticdc): fix a panic that send on closed channel (#12245))
Comment on lines +540 to +546

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There's another unresolved merge conflict here in testChangefeedReleaseResource. Please resolve this as well.


// pre check
cf.Tick(ctx, state, captures)
Expand Down Expand Up @@ -1050,3 +1068,25 @@ func TestBarrierAdvance(t *testing.T) {
require.Equal(t, mockDDLPuller.resolvedTs, barrier)
}
}

func TestReleaseResourcesTwice(t *testing.T) {
globalVars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
ctx := context.Background()
cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, newMockDDLSink, t)
defer cf.Close(ctx)

// pre check
state.CheckCaptureAlive(globalVars.CaptureInfo.ID)
require.False(t, preflightCheck(state, captures))
tester.MustApplyPatches()

// initialize
cf.Tick(ctx, state.Info, state.Status, captures)
tester.MustApplyPatches()
require.Equal(t, cf.initialized.Load(), true)

// close twice
cf.releaseResources(ctx)
cf.isReleased = false
cf.releaseResources(ctx)
}
11 changes: 11 additions & 0 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,7 @@
// when error occurs during closing the processor
p.cleanupMetrics()

<<<<<<< HEAD

Check failure on line 908 in cdc/processor/processor.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected <<, expecting }
for _, tbl := range p.tables {
tbl.Cancel()
}
Expand All @@ -914,6 +915,16 @@
p.cancel()
p.wg.Wait()
p.upStream.Release()
=======
p.sinkManager.stop()
p.sinkManager.r = nil
p.sourceManager.stop()
p.sourceManager.r = nil
p.redo.stop()
p.mg.stop()
p.ddlHandler.stop()
p.ddlHandler.r = nil
>>>>>>> b949fa6674 (chann(ticdc): fix a panic that send on closed channel (#12245))

Check failure on line 927 in cdc/processor/processor.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+0023 '#'
Comment on lines +918 to +927

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This file contains unresolved merge conflict markers in the Close method. The two branches of the conflict seem to have very different logic for closing resources. Please resolve this conflict carefully to ensure all necessary cleanup is performed correctly.


if p.agent == nil {
return nil
Expand Down
Loading