8
8
"net/http"
9
9
"net/url"
10
10
"strings"
11
+ "sync"
11
12
"time"
12
13
13
14
"github.com/pingcap/errors"
@@ -33,7 +34,11 @@ type Backer struct {
33
34
addrs []string
34
35
cli * http.Client
35
36
}
36
- tikvCli tikv.Storage
37
+ tikvCli tikv.Storage
38
+ backupClis struct {
39
+ mu sync.Mutex
40
+ clis map [uint64 ]* grpc.ClientConn
41
+ }
37
42
}
38
43
39
44
// NewBacker creates a new Backer.
@@ -51,18 +56,15 @@ func NewBacker(ctx context.Context, pdAddrs string) (*Backer, error) {
51
56
return nil , errors .Trace (err )
52
57
}
53
58
54
- return & Backer {
59
+ backer := & Backer {
55
60
ctx : ctx ,
56
61
pdClient : pdClient ,
57
- pdHTTP : struct {
58
- addrs []string
59
- cli * http.Client
60
- }{
61
- addrs : addrs ,
62
- cli : & http.Client {Timeout : 30 * time .Second },
63
- },
64
- tikvCli : tikvCli .(tikv.Storage ),
65
- }, nil
62
+ tikvCli : tikvCli .(tikv.Storage ),
63
+ }
64
+ backer .pdHTTP .addrs = addrs
65
+ backer .pdHTTP .cli = & http.Client {Timeout : 30 * time .Second }
66
+ backer .backupClis .clis = make (map [uint64 ]* grpc.ClientConn )
67
+ return backer , nil
66
68
}
67
69
68
70
// GetClusterVersion returns the current cluster version.
@@ -129,8 +131,16 @@ func (backer *Backer) Context() context.Context {
129
131
return backer .ctx
130
132
}
131
133
132
- // NewBackupClient creates a new backup client.
133
- func (backer * Backer ) NewBackupClient (storeID uint64 ) (backup.BackupClient , error ) {
134
+ // GetBackupClient get or create a backup client.
135
+ func (backer * Backer ) GetBackupClient (storeID uint64 ) (backup.BackupClient , error ) {
136
+ backer .backupClis .mu .Lock ()
137
+ defer backer .backupClis .mu .Unlock ()
138
+
139
+ if conn , ok := backer .backupClis .clis [storeID ]; ok {
140
+ // Find a cached backup client.
141
+ return backup .NewBackupClient (conn ), nil
142
+ }
143
+
134
144
store , err := backer .pdClient .GetStore (backer .ctx , storeID )
135
145
if err != nil {
136
146
return nil , errors .Trace (err )
@@ -154,10 +164,27 @@ func (backer *Backer) NewBackupClient(storeID uint64) (backup.BackupClient, erro
154
164
if err != nil {
155
165
return nil , errors .WithStack (err )
156
166
}
167
+ // Cache the conn.
168
+ backer .backupClis .clis [storeID ] = conn
169
+
157
170
client := backup .NewBackupClient (conn )
158
171
return client , nil
159
172
}
160
173
174
+ // ResetBackupClient reset and close cached backup client.
175
+ func (backer * Backer ) ResetBackupClient (storeID uint64 ) error {
176
+ backer .backupClis .mu .Lock ()
177
+ defer backer .backupClis .mu .Unlock ()
178
+
179
+ if conn , ok := backer .backupClis .clis [storeID ]; ok {
180
+ delete (backer .backupClis .clis , storeID )
181
+ if err := conn .Close (); err != nil {
182
+ return errors .Trace (err )
183
+ }
184
+ }
185
+ return nil
186
+ }
187
+
161
188
// SendBackup send backup request to the given store.
162
189
// Stop receiving response if respFn returns error.
163
190
func (backer * Backer ) SendBackup (
@@ -167,7 +194,7 @@ func (backer *Backer) SendBackup(
167
194
respFn func (* backup.BackupResponse ) error ,
168
195
) error {
169
196
log .Info ("try backup" , zap .Any ("backup request" , req ))
170
- client , err := backer .NewBackupClient (storeID )
197
+ client , err := backer .GetBackupClient (storeID )
171
198
if err != nil {
172
199
log .Warn ("fail to connect store" , zap .Uint64 ("StoreID" , storeID ))
173
200
return nil
@@ -177,6 +204,11 @@ func (backer *Backer) SendBackup(
177
204
bcli , err := client .Backup (ctx , & req )
178
205
if err != nil {
179
206
log .Warn ("fail to create backup" , zap .Uint64 ("StoreID" , storeID ))
207
+ if err1 := backer .ResetBackupClient (storeID ); err1 != nil {
208
+ log .Warn ("fail to reset backup client" ,
209
+ zap .Uint64 ("StoreID" , storeID ),
210
+ zap .Error (err1 ))
211
+ }
180
212
return nil
181
213
}
182
214
for {
0 commit comments