Skip to content

Commit 8445821

Browse files
authored
server: make clientConn() thread-safe (#49073) (#49104)
ref #48224
1 parent 78d203e commit 8445821

File tree

2 files changed

+68
-14
lines changed

2 files changed

+68
-14
lines changed

server/conn.go

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ type clientConn struct {
207207
lastActive time.Time // last active time
208208
authPlugin string // default authentication plugin
209209
isUnixSocket bool // connection is Unix Socket file
210+
closeOnce sync.Once // closeOnce is used to make sure clientConn closes only once
210211
rsEncoder *resultEncoder // rsEncoder is used to encode the string result to different charsets.
211212
inputDecoder *inputDecoder // inputDecoder is used to decode the different charsets of incoming strings to utf-8.
212213
socketCredUID uint32 // UID from the other end of the Unix Socket
@@ -346,21 +347,33 @@ func (cc *clientConn) Close() error {
346347
}
347348

348349
func closeConn(cc *clientConn, connections int) error {
349-
metrics.ConnGauge.Set(float64(connections))
350-
if cc.bufReadConn != nil {
351-
err := cc.bufReadConn.Close()
352-
if err != nil {
353-
// We need to expect connection might have already disconnected.
354-
// This is because closeConn() might be called after a connection read-timeout.
355-
logutil.Logger(context.Background()).Debug("could not close connection", zap.Error(err))
350+
var err error
351+
cc.closeOnce.Do(func() {
352+
metrics.ConnGauge.Set(float64(connections))
353+
354+
if cc.bufReadConn != nil {
355+
err = cc.bufReadConn.Close()
356+
if err != nil {
357+
// We need to expect connection might have already disconnected.
358+
// This is because closeConn() might be called after a connection read-timeout.
359+
logutil.Logger(context.Background()).Debug("could not close connection", zap.Error(err))
360+
}
361+
if cc.bufReadConn != nil {
362+
err = cc.bufReadConn.Close()
363+
if err != nil {
364+
// We need to expect connection might have already disconnected.
365+
// This is because closeConn() might be called after a connection read-timeout.
366+
logutil.Logger(context.Background()).Debug("could not close connection", zap.Error(err))
367+
}
368+
}
369+
// Close statements and session
370+
// This will release advisory locks, row locks, etc.
371+
if ctx := cc.getCtx(); ctx != nil {
372+
err = ctx.Close()
373+
}
356374
}
357-
}
358-
// Close statements and session
359-
// This will release advisory locks, row locks, etc.
360-
if ctx := cc.getCtx(); ctx != nil {
361-
return ctx.Close()
362-
}
363-
return nil
375+
})
376+
return err
364377
}
365378

366379
func (cc *clientConn) closeWithoutLock() error {

server/conn_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"io"
2626
"path/filepath"
2727
"strings"
28+
"sync"
2829
"sync/atomic"
2930
"testing"
3031
"time"
@@ -1830,3 +1831,43 @@ func TestProcessInfoForExecuteCommand(t *testing.T) {
18301831
0x0A, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}))
18311832
require.Equal(t, cc.ctx.Session.ShowProcess().Info, "select sum(col1) from t where col1 < ? and col1 > 100")
18321833
}
1834+
func TestCloseConn(t *testing.T) {
1835+
var outBuffer bytes.Buffer
1836+
1837+
store, _ := testkit.CreateMockStoreAndDomain(t)
1838+
cfg := newTestConfig()
1839+
cfg.Port = 0
1840+
cfg.Status.StatusPort = 0
1841+
drv := NewTiDBDriver(store)
1842+
server, err := NewServer(cfg, drv)
1843+
require.NoError(t, err)
1844+
1845+
cc := &clientConn{
1846+
connectionID: 0,
1847+
salt: []byte{
1848+
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A,
1849+
0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14,
1850+
},
1851+
server: server,
1852+
pkt: &packetIO{
1853+
bufWriter: bufio.NewWriter(&outBuffer),
1854+
},
1855+
collation: mysql.DefaultCollationID,
1856+
peerHost: "localhost",
1857+
alloc: arena.NewAllocator(512),
1858+
chunkAlloc: chunk.NewAllocator(),
1859+
capability: mysql.ClientProtocol41,
1860+
}
1861+
1862+
var wg sync.WaitGroup
1863+
const numGoroutines = 10
1864+
wg.Add(numGoroutines)
1865+
for i := 0; i < numGoroutines; i++ {
1866+
go func() {
1867+
defer wg.Done()
1868+
err := closeConn(cc, 1)
1869+
require.NoError(t, err)
1870+
}()
1871+
}
1872+
wg.Wait()
1873+
}

0 commit comments

Comments
 (0)