Skip to content

Commit 1d10025

Browse files
authored
fix(store): fix store when pd enable ready api (#6357) (#6358)
1 parent 720c775 commit 1d10025

File tree

16 files changed

+272
-29
lines changed

16 files changed

+272
-29
lines changed

pkg/controllers/scheduler/controller.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ func Setup(mgr manager.Manager, c client.Client, pdcm pdm.PDClientManager, vm vo
5252
Owns(&corev1.Pod{}).
5353
Owns(&corev1.ConfigMap{}).
5454
Owns(&corev1.PersistentVolumeClaim{}).
55+
Watches(&v1alpha1.Cluster{}, r.ClusterEventHandler()).
5556
// WatchesRawSource(pdcm.Source(&pdv1.Member{}, r.MemberEventHandler())).
5657
WithOptions(controller.Options{RateLimiter: k8s.RateLimiter}).
5758
Complete(r)
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package scheduler
16+
17+
import (
18+
"context"
19+
"reflect"
20+
21+
"k8s.io/apimachinery/pkg/types"
22+
"k8s.io/client-go/util/workqueue"
23+
"sigs.k8s.io/controller-runtime/pkg/event"
24+
"sigs.k8s.io/controller-runtime/pkg/handler"
25+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
26+
27+
"github.com/pingcap/tidb-operator/api/v2/core/v1alpha1"
28+
"github.com/pingcap/tidb-operator/pkg/client"
29+
)
30+
31+
func (r *Reconciler) ClusterEventHandler() handler.TypedEventHandler[client.Object, reconcile.Request] {
32+
return handler.TypedFuncs[client.Object, reconcile.Request]{
33+
UpdateFunc: func(ctx context.Context, event event.TypedUpdateEvent[client.Object],
34+
queue workqueue.TypedRateLimitingInterface[reconcile.Request],
35+
) {
36+
oldObj := event.ObjectOld.(*v1alpha1.Cluster)
37+
newObj := event.ObjectNew.(*v1alpha1.Cluster)
38+
39+
if newObj.Status.PD != oldObj.Status.PD {
40+
r.Logger.Info("pd url is updating", "from", oldObj.Status.PD, "to", newObj.Status.PD)
41+
} else if !reflect.DeepEqual(oldObj.Spec.SuspendAction, newObj.Spec.SuspendAction) {
42+
r.Logger.Info("suspend action is updating", "from", oldObj.Spec.SuspendAction, "to", newObj.Spec.SuspendAction)
43+
} else if oldObj.Spec.Paused != newObj.Spec.Paused {
44+
r.Logger.Info("cluster paused is updating", "from", oldObj.Spec.Paused, "to", newObj.Spec.Paused)
45+
} else {
46+
return
47+
}
48+
49+
var sl v1alpha1.SchedulerList
50+
if err := r.Client.List(ctx, &sl, client.MatchingLabels{
51+
v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator,
52+
v1alpha1.LabelKeyCluster: newObj.Name,
53+
v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentScheduler,
54+
}, client.InNamespace(newObj.Namespace)); err != nil {
55+
r.Logger.Error(err, "cannot list all scheduler instances", "ns", newObj.Namespace, "cluster", newObj.Name)
56+
return
57+
}
58+
59+
for i := range sl.Items {
60+
scheduler := &sl.Items[i]
61+
queue.Add(reconcile.Request{
62+
NamespacedName: types.NamespacedName{
63+
Name: scheduler.Name,
64+
Namespace: scheduler.Namespace,
65+
},
66+
})
67+
}
68+
},
69+
DeleteFunc: func(ctx context.Context, event event.TypedDeleteEvent[client.Object],
70+
queue workqueue.TypedRateLimitingInterface[reconcile.Request],
71+
) {
72+
obj := event.Object.(*v1alpha1.Cluster)
73+
74+
r.Logger.Info("enqueue all scheduler instances because of the cluster is deleting", "ns", obj.Namespace, "cluster", obj.Name)
75+
76+
var sl v1alpha1.SchedulerList
77+
if err := r.Client.List(ctx, &sl, client.MatchingLabels{
78+
v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator,
79+
v1alpha1.LabelKeyCluster: obj.Name,
80+
v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentScheduler,
81+
}, client.InNamespace(obj.Namespace)); err != nil {
82+
r.Logger.Error(err, "cannot list all scheduler instances", "ns", obj.Namespace, "cluster", obj.Name)
83+
return
84+
}
85+
86+
for i := range sl.Items {
87+
scheduler := &sl.Items[i]
88+
queue.Add(reconcile.Request{
89+
NamespacedName: types.NamespacedName{
90+
Name: scheduler.Name,
91+
Namespace: scheduler.Namespace,
92+
},
93+
})
94+
}
95+
},
96+
}
97+
}

pkg/controllers/tiflash/builder.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,20 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
3636
// check whether it's paused
3737
task.IfBreak(common.CondClusterIsPaused(state)),
3838

39-
// get info from pd
40-
task.IfNot(common.CondClusterIsDeleting(state),
41-
tasks.TaskContextInfoFromPD(state, r.PDClientManager),
39+
// if the cluster is deleting, del all subresources and remove the finalizer directly
40+
task.IfBreak(common.CondClusterIsDeleting(state),
41+
tasks.TaskFinalizerDel(state, r.Client),
4242
),
4343

44-
task.IfBreak(canDeleteAllResources(state), tasks.TaskFinalizerDel(state, r.Client)),
45-
task.If(common.CondObjectIsDeleting[scope.TiFlash](state),
46-
tasks.TaskOfflineStore(state),
44+
// get info from pd
45+
tasks.TaskContextInfoFromPD(state, r.PDClientManager),
46+
47+
// if instance is deleting and store is removed
48+
task.IfBreak(ObjectIsDeletingAndStoreIsRemoved(state),
49+
tasks.TaskFinalizerDel(state, r.Client),
4750
),
4851

52+
tasks.TaskOfflineStore(state),
4953
common.TaskFinalizerAdd[scope.TiFlash](state, r.Client),
5054
// get pod and check whether the cluster is suspending
5155
common.TaskContextPod[scope.TiFlash](state, r.Client),
@@ -72,10 +76,9 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
7276
return runner
7377
}
7478

75-
// canDeleteAllResources checks if the resources can be deleted.
76-
func canDeleteAllResources(state *tasks.ReconcileContext) task.Condition {
79+
func ObjectIsDeletingAndStoreIsRemoved(state *tasks.ReconcileContext) task.Condition {
7780
return task.CondFunc(func() bool {
78-
return !state.Cluster().GetDeletionTimestamp().IsZero() ||
79-
(!state.Object().GetDeletionTimestamp().IsZero() && (state.GetStoreState() == v1alpha1.StoreStateRemoved || state.Store == nil))
81+
return !state.Object().GetDeletionTimestamp().IsZero() && state.PDSynced &&
82+
(state.GetStoreState() == v1alpha1.StoreStateRemoved || state.Store == nil)
8083
})
8184
}

pkg/controllers/tiflash/handler.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,33 @@ func (r *Reconciler) ClusterEventHandler() handler.TypedEventHandler[client.Obje
7272
})
7373
}
7474
},
75+
DeleteFunc: func(ctx context.Context, event event.TypedDeleteEvent[client.Object],
76+
queue workqueue.TypedRateLimitingInterface[reconcile.Request],
77+
) {
78+
obj := event.Object.(*v1alpha1.Cluster)
79+
80+
r.Logger.Info("enqueue all tiflash instances because of the cluster is deleting", "ns", obj.Namespace, "cluster", obj.Name)
81+
82+
var fl v1alpha1.TiFlashList
83+
if err := r.Client.List(ctx, &fl, client.MatchingLabels{
84+
v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator,
85+
v1alpha1.LabelKeyCluster: obj.Name,
86+
v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentTiFlash,
87+
}, client.InNamespace(obj.Namespace)); err != nil {
88+
r.Logger.Error(err, "cannot list all tiflash instances", "ns", obj.Namespace, "cluster", obj.Name)
89+
return
90+
}
91+
92+
for i := range fl.Items {
93+
f := &fl.Items[i]
94+
queue.Add(reconcile.Request{
95+
NamespacedName: types.NamespacedName{
96+
Name: f.Name,
97+
Namespace: f.Namespace,
98+
},
99+
})
100+
}
101+
},
75102
}
76103
}
77104

pkg/controllers/tiflash/tasks/ctx.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ type ReconcileContext struct {
3434

3535
Store *pdv1.Store
3636
StoreLabels []*metapb.StoreLabel
37+
38+
PDSynced bool
3739
}
3840

3941
func TaskContextInfoFromPD(state *ReconcileContext, cm pdm.PDClientManager) task.Task {
@@ -49,6 +51,8 @@ func TaskContextInfoFromPD(state *ReconcileContext, cm pdm.PDClientManager) task
4951
return task.Fail().With("store info is not synced, just wait for next sync")
5052
}
5153

54+
state.PDSynced = true
55+
5256
s, err := c.Stores().Get(tiflashconfig.GetServiceAddr(state.TiFlash()))
5357
if err != nil {
5458
if !errors.IsNotFound(err) {

pkg/controllers/tiflash/tasks/offline.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,16 @@ const (
2929

3030
func TaskOfflineStore(state *ReconcileContext) task.Task {
3131
return task.NameTaskFunc("OfflineStore", func(ctx context.Context) task.Result {
32-
if state.Store == nil {
33-
return task.Complete().With("store is not exists, no need to offline")
32+
if !state.PDSynced {
33+
return task.Wait().With("pd is not synced")
3434
}
35-
36-
if state.PDClient == nil {
37-
return task.Fail().With("pd client is not registered")
35+
if state.Object().GetDeletionTimestamp().IsZero() {
36+
return task.Complete().With("tikv is not deleting, no need to offline the store")
37+
}
38+
if !state.IsStoreUp() {
39+
return task.Wait().With("store has been %s, no need to offline it", state.GetStoreState())
3840
}
41+
3942
if err := state.PDClient.Underlay().DeleteStore(ctx, state.Store.ID); err != nil {
4043
return task.Fail().With("cannot delete store %s: %w", state.Store.ID, err)
4144
}

pkg/controllers/tiflash/tasks/store_labels.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
func TaskStoreLabels(state *ReconcileContext, c client.Client) task.Task {
3131
return task.NameTaskFunc("StoreLabels", func(ctx context.Context) task.Result {
3232
logger := logr.FromContextOrDiscard(ctx)
33-
if !state.IsStoreUp() || state.IsPodTerminating() || state.Pod() == nil {
33+
if !state.PDSynced || !state.IsStoreUp() || state.IsPodTerminating() || state.Pod() == nil {
3434
return task.Complete().With("skip sync store labels as the store is not serving")
3535
}
3636

pkg/controllers/tikv/builder.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
5050
tasks.TaskEndEvictLeader(state),
5151
tasks.TaskFinalizerDel(state, r.Client),
5252
),
53-
// if instance is deleting and store is not removed
54-
task.If(common.CondObjectIsDeleting[scope.TiKV](state),
55-
tasks.TaskOfflineStore(state),
56-
),
5753

54+
tasks.TaskOfflineStore(state),
5855
common.TaskFinalizerAdd[scope.TiKV](state, r.Client),
5956
// get pod and check whether the cluster is suspending
6057
common.TaskContextPod[scope.TiKV](state, r.Client),
@@ -87,7 +84,7 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
8784

8885
func ObjectIsDeletingAndStoreIsRemoved(state *tasks.ReconcileContext) task.Condition {
8986
return task.CondFunc(func() bool {
90-
return !state.Object().GetDeletionTimestamp().IsZero() &&
87+
return !state.Object().GetDeletionTimestamp().IsZero() && state.PDSynced &&
9188
(state.GetStoreState() == v1alpha1.StoreStateRemoved || state.Store == nil)
9289
})
9390
}

pkg/controllers/tikv/handler.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,33 @@ func (r *Reconciler) ClusterEventHandler() handler.TypedEventHandler[client.Obje
6262
return
6363
}
6464

65+
for i := range kvl.Items {
66+
tikv := &kvl.Items[i]
67+
queue.Add(reconcile.Request{
68+
NamespacedName: types.NamespacedName{
69+
Name: tikv.Name,
70+
Namespace: tikv.Namespace,
71+
},
72+
})
73+
}
74+
},
75+
DeleteFunc: func(ctx context.Context, event event.TypedDeleteEvent[client.Object],
76+
queue workqueue.TypedRateLimitingInterface[reconcile.Request],
77+
) {
78+
obj := event.Object.(*v1alpha1.Cluster)
79+
80+
r.Logger.Info("enqueue all tikvs because of the cluster is deleting", "ns", obj.Namespace, "cluster", obj.Name)
81+
82+
var kvl v1alpha1.TiKVList
83+
if err := r.Client.List(ctx, &kvl, client.MatchingLabels{
84+
v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator,
85+
v1alpha1.LabelKeyCluster: obj.Name,
86+
v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentTiKV,
87+
}, client.InNamespace(obj.Namespace)); err != nil {
88+
r.Logger.Error(err, "cannot list all tikv instances", "ns", obj.Namespace, "cluster", obj.Name)
89+
return
90+
}
91+
6592
for i := range kvl.Items {
6693
tikv := &kvl.Items[i]
6794
queue.Add(reconcile.Request{

pkg/controllers/tikv/tasks/ctx.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,26 +33,29 @@ type ReconcileContext struct {
3333

3434
LeaderEvicting bool
3535

36-
Store *pdv1.Store
36+
Store *pdv1.Store
37+
PDSynced bool
3738
}
3839

3940
func TaskContextInfoFromPD(state *ReconcileContext, cm pdm.PDClientManager) task.Task {
4041
return task.NameTaskFunc("ContextInfoFromPD", func(ctx context.Context) task.Result {
4142
ck := state.Cluster()
4243
c, ok := cm.Get(timanager.PrimaryKey(ck.Namespace, ck.Name))
4344
if !ok {
44-
return task.Fail().With("pd client is not registered")
45+
return task.Wait().With("pd client is not registered")
4546
}
4647
state.PDClient = c
4748

4849
if !c.HasSynced() {
49-
return task.Fail().With("store info is not synced, just wait for next sync")
50+
return task.Wait().With("store info is not synced, just wait for next sync")
5051
}
5152

53+
state.PDSynced = true
54+
5255
s, err := c.Stores().Get(coreutil.TiKVAdvertiseClientURLs(state.TiKV()))
5356
if err != nil {
5457
if !errors.IsNotFound(err) {
55-
return task.Fail().With("failed to get store info: %w", err)
58+
return task.Fail().With("failed to get store info: %v", err)
5659
}
5760
return task.Complete().With("store does not exist")
5861
}

0 commit comments

Comments
 (0)