Skip to content

Commit d21b794

Browse files
k8s: Let controller-runtime manage the pod informer
- Use a pod informer from controller-runtime. - Add a ControllerManager Implementation of the PodAccessor interface. - Add integration tests for PodAccessor methods. Signed-off-by: Michi Mutsuzaki <[email protected]>
1 parent 87e0f9f commit d21b794

File tree

7 files changed

+211
-54
lines changed

7 files changed

+211
-54
lines changed

.github/workflows/integration-test.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@ jobs:
4141
# renovate: datasource=golang-version depName=go
4242
go-version: '1.24.2'
4343

44-
- name: Install Kind and create cluster
45-
uses: helm/kind-action@a1b0e391336a6ee6713a0583f8c6240d70863de3 # v1.12.0
44+
- name: Create a Kind cluster
45+
run: |
46+
make kind
4647
4748
- name: Pull Tetragon Images
4849
uses: nick-fields/retry@7152eba30c6575329ac0576536151aca5a72780e # v3.0.0

cmd/tetragon/main.go

Lines changed: 5 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,6 @@ import (
6666
"github.com/spf13/viper"
6767
"google.golang.org/grpc"
6868
"google.golang.org/protobuf/types/known/durationpb"
69-
"k8s.io/client-go/kubernetes"
70-
"k8s.io/client-go/rest"
7169
)
7270

7371
var (
@@ -389,9 +387,8 @@ func tetragonExecuteCtx(ctx context.Context, cancel context.CancelFunc, ready fu
389387
// Initialize a k8s watcher used to retrieve process metadata. This should
390388
// happen before the sensors are loaded, otherwise events will be stuck
391389
// waiting for metadata.
392-
var k8sClient *kubernetes.Clientset
393390
var controllerManager *manager.ControllerManager
394-
var k8sWatcher watcher.K8sResourceWatcher
391+
var podAccessor watcher.PodAccessor
395392
if option.Config.EnableK8s {
396393
log.Info("Enabling Kubernetes API")
397394
// Start controller-runtime manager.
@@ -411,37 +408,18 @@ func tetragonExecuteCtx(ctx context.Context, cancel context.CancelFunc, ready fu
411408
return err
412409
}
413410
}
414-
// retrieve k8s clients
415-
k8sClient, _, err = watcher.GetK8sClients(func(_ *rest.Config) error { return nil })
416-
if err != nil {
417-
return err
418-
}
419-
420-
// create k8s watcher
421-
k8sWatcher = watcher.NewK8sWatcher(k8sClient, nil, 60*time.Second)
422-
423-
// add informers for all resources
424-
// NB(anna): To add a pod informer, we need to pass the underlying
425-
// struct, not just the interface, because the function initializes
426-
// a cache inside this struct. This can be refactored if needed.
427-
realK8sWatcher := k8sWatcher.(*watcher.K8sWatcher)
428-
err = watcher.AddPodInformer(realK8sWatcher, true)
429-
if err != nil {
430-
return err
431-
}
411+
podAccessor = controllerManager
432412
} else {
433413
log.Info("Disabling Kubernetes API")
434-
k8sWatcher = watcher.NewFakeK8sWatcher(nil)
414+
podAccessor = watcher.NewFakeK8sWatcher(nil)
435415
}
436-
// start k8s watcher
437-
k8sWatcher.Start()
438416

439417
pcGCInterval := option.Config.ProcessCacheGCInterval
440418
if pcGCInterval <= 0 {
441419
pcGCInterval = defaults.DefaultProcessCacheGCInterval
442420
}
443421

444-
if err := process.InitCache(k8sWatcher, option.Config.ProcessCacheSize, pcGCInterval); err != nil {
422+
if err := process.InitCache(podAccessor, option.Config.ProcessCacheSize, pcGCInterval); err != nil {
445423
return err
446424
}
447425

@@ -460,7 +438,7 @@ func tetragonExecuteCtx(ctx context.Context, cancel context.CancelFunc, ready fu
460438
ctx, cancel2 := context.WithCancel(ctx)
461439
defer cancel2()
462440

463-
hookRunner := rthooks.GlobalRunner().WithWatcher(k8sWatcher)
441+
hookRunner := rthooks.GlobalRunner().WithWatcher(podAccessor)
464442

465443
err = setRedactionFilters()
466444
if err != nil {

pkg/manager/manager.go

Lines changed: 91 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,53 +5,91 @@ package manager
55

66
import (
77
"context"
8+
"fmt"
89
"sync"
910

1011
"github.com/bombsimon/logrusr/v4"
1112
"github.com/cilium/tetragon/pkg/k8s/apis/cilium.io/v1alpha1"
1213
"github.com/cilium/tetragon/pkg/logger"
14+
"github.com/cilium/tetragon/pkg/podhooks"
15+
"github.com/cilium/tetragon/pkg/reader/node"
16+
"github.com/cilium/tetragon/pkg/watcher"
1317
corev1 "k8s.io/api/core/v1"
1418
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
19+
"k8s.io/apimachinery/pkg/fields"
1520
"k8s.io/apimachinery/pkg/runtime"
1621
"k8s.io/apimachinery/pkg/types"
1722
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
1823
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
1924
"k8s.io/client-go/tools/cache"
2025
ctrl "sigs.k8s.io/controller-runtime"
26+
cmCache "sigs.k8s.io/controller-runtime/pkg/cache"
2127
"sigs.k8s.io/controller-runtime/pkg/client"
2228
ctrlManager "sigs.k8s.io/controller-runtime/pkg/manager"
29+
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
2330
)
2431

2532
var (
2633
initOnce, startOnce sync.Once
2734
manager *ControllerManager
35+
_ watcher.PodAccessor = (*ControllerManager)(nil)
2836
)
2937

3038
// ControllerManager is responsible for running controller-runtime controllers,
3139
// and interacting with Kubernetes API server in general. If you need to interact
3240
// with Kubernetes API server, this is the place to start.
3341
type ControllerManager struct {
34-
Manager ctrlManager.Manager
42+
Manager ctrlManager.Manager
43+
deletedPodCache *watcher.DeletedPodCache
44+
podInformer cache.SharedIndexInformer
3545
}
3646

3747
func Get() *ControllerManager {
48+
var err error
3849
initOnce.Do(func() {
39-
ctrl.SetLogger(logrusr.New(logger.GetLogger()))
40-
scheme := runtime.NewScheme()
41-
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
42-
utilruntime.Must(v1alpha1.AddToScheme(scheme))
43-
utilruntime.Must(apiextensionsv1.AddToScheme(scheme))
44-
controllerManager, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
50+
manager, err = newControllerManager(true)
4551
if err != nil {
4652
panic(err)
4753
}
48-
manager = &ControllerManager{
49-
Manager: controllerManager,
50-
}
5154
})
5255
return manager
5356
}
5457

58+
// newControllerManager creates a new controller manager. The enableMetrics flag
59+
// is for unit tests so that we can instantiate multiple ControllerManager instances
60+
// without them trying to bind to the same port 8080.
61+
func newControllerManager(enableMetrics bool) (*ControllerManager, error) {
62+
ctrl.SetLogger(logrusr.New(logger.GetLogger()))
63+
scheme := runtime.NewScheme()
64+
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
65+
utilruntime.Must(v1alpha1.AddToScheme(scheme))
66+
utilruntime.Must(apiextensionsv1.AddToScheme(scheme))
67+
cacheOptions := cmCache.Options{
68+
ByObject: map[client.Object]cmCache.ByObject{
69+
&corev1.Pod{}: {
70+
Field: fields.OneTermEqualSelector("spec.nodeName", node.GetKubernetesNodeName()),
71+
},
72+
},
73+
}
74+
metricsOptions := metricsserver.Options{}
75+
if !enableMetrics {
76+
metricsOptions.BindAddress = "0"
77+
}
78+
controllerOptions := ctrl.Options{Scheme: scheme, Cache: cacheOptions, Metrics: metricsOptions}
79+
controllerManager, err := ctrl.NewManager(ctrl.GetConfigOrDie(), controllerOptions)
80+
if err != nil {
81+
return nil, err
82+
}
83+
manager = &ControllerManager{
84+
Manager: controllerManager,
85+
}
86+
err = manager.addPodInformer()
87+
if err != nil {
88+
return nil, err
89+
}
90+
return manager, nil
91+
}
92+
5593
func (cm *ControllerManager) Start(ctx context.Context) {
5694
startOnce.Do(func() {
5795
go func() {
@@ -116,3 +154,46 @@ func (cm *ControllerManager) WaitCRDs(ctx context.Context, crds map[string]struc
116154
}
117155
return nil
118156
}
157+
158+
func (cm *ControllerManager) addPodInformer() error {
159+
// initialize deleted pod cache
160+
var err error
161+
deletedPodCache, err := watcher.NewDeletedPodCache()
162+
if err != nil {
163+
return fmt.Errorf("failed to initialize deleted pod cache: %w", err)
164+
}
165+
cm.deletedPodCache = deletedPodCache
166+
167+
// Initialize a pod informer.
168+
podInformer, err := cm.Manager.GetCache().GetInformer(context.Background(), &corev1.Pod{})
169+
if err != nil {
170+
return err
171+
}
172+
cm.podInformer = podInformer.(cache.SharedIndexInformer)
173+
err = cm.podInformer.AddIndexers(cache.Indexers{
174+
watcher.ContainerIdx: watcher.ContainerIndexFunc,
175+
watcher.PodIdx: watcher.PodIndexFunc,
176+
})
177+
if err != nil {
178+
return err
179+
}
180+
// add event handlers to the informer
181+
_, err = cm.podInformer.AddEventHandler(cm.deletedPodCache.EventHandler())
182+
if err != nil {
183+
return nil
184+
}
185+
podhooks.InstallHooks(cm.podInformer)
186+
return nil
187+
}
188+
189+
func (cm *ControllerManager) FindContainer(containerID string) (*corev1.Pod, *corev1.ContainerStatus, bool) {
190+
return watcher.FindContainer(containerID, cm.podInformer, cm.deletedPodCache)
191+
}
192+
193+
func (cm *ControllerManager) FindPod(podID string) (*corev1.Pod, error) {
194+
return watcher.FindPod(podID, cm.podInformer)
195+
}
196+
197+
func (cm *ControllerManager) FindMirrorPod(hash string) (*corev1.Pod, error) {
198+
return watcher.FindMirrorPod(hash, cm.podInformer)
199+
}

pkg/manager/manager_integration_test.go

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,43 @@ package manager
77

88
import (
99
"context"
10+
"os"
1011
"testing"
12+
"time"
1113

14+
"github.com/cilium/tetragon/pkg/reader/node"
15+
"github.com/cilium/tetragon/pkg/watcher"
1216
"github.com/stretchr/testify/assert"
1317
"github.com/stretchr/testify/suite"
18+
corev1 "k8s.io/api/core/v1"
19+
"k8s.io/apimachinery/pkg/api/errors"
20+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21+
"sigs.k8s.io/controller-runtime/pkg/client"
1422
"sigs.k8s.io/controller-runtime/pkg/envtest"
1523
)
1624

25+
const (
26+
nodeName = "tetragon-dev-control-plane"
27+
)
28+
29+
// ManagerTestSuite is a test suite for the ControllerManager. It assumes that
30+
// a Kind cluster created with "make kind" is running. More specifically, it
31+
// assumes that the cluster has a single node named "tetragon-dev-control-plane".
1732
type ManagerTestSuite struct {
1833
suite.Suite
1934
testEnv *envtest.Environment
2035
manager *ControllerManager
2136
}
2237

2338
func (suite *ManagerTestSuite) SetupSuite() {
39+
err := os.Setenv("NODE_NAME", nodeName)
40+
assert.NoError(suite.T(), err)
41+
node.SetKubernetesNodeName()
2442
useExistingCluster := true
2543
suite.testEnv = &envtest.Environment{
2644
UseExistingCluster: &useExistingCluster,
2745
}
28-
_, err := suite.testEnv.Start()
46+
_, err = suite.testEnv.Start()
2947
assert.NoError(suite.T(), err)
3048
suite.manager = Get()
3149
suite.manager.Start(context.Background())
@@ -43,6 +61,85 @@ func (suite *ManagerTestSuite) TestListNamespaces() {
4361
assert.Equal(suite.T(), namespaces[0].Name, namespace.Name)
4462
}
4563

64+
func (suite *ManagerTestSuite) TestFindPod() {
65+
var pods corev1.PodList
66+
err := suite.manager.Manager.GetCache().List(context.Background(), &pods, client.InNamespace("kube-system"))
67+
assert.NoError(suite.T(), err)
68+
assert.NotEmpty(suite.T(), pods.Items)
69+
pod, err := suite.manager.FindPod(string(pods.Items[0].UID))
70+
assert.NoError(suite.T(), err)
71+
assert.Equal(suite.T(), pods.Items[0].UID, pod.UID)
72+
}
73+
74+
func (suite *ManagerTestSuite) TestFindContainer() {
75+
// Create a pod.
76+
pod := &corev1.Pod{
77+
ObjectMeta: metav1.ObjectMeta{
78+
Name: "nginx",
79+
Namespace: "kube-system",
80+
},
81+
Spec: corev1.PodSpec{
82+
Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}},
83+
},
84+
}
85+
k8sClient := suite.manager.Manager.GetClient()
86+
_ = k8sClient.Create(context.Background(), pod)
87+
88+
// Get the container ID of the pod.
89+
podFromClient := corev1.Pod{}
90+
containerID := ""
91+
assert.Eventually(suite.T(), func() bool {
92+
err := k8sClient.Get(context.Background(), client.ObjectKey{Namespace: pod.Namespace, Name: pod.Name}, &podFromClient)
93+
if err != nil {
94+
return false
95+
}
96+
containerID, err = watcher.ContainerIDKey(podFromClient.Status.ContainerStatuses[0].ContainerID)
97+
if err != nil {
98+
return false
99+
}
100+
return true
101+
}, 10*time.Second, 1*time.Second)
102+
103+
// FindContainer should return the pod and container.
104+
podFromCache, container, found := suite.manager.FindContainer(containerID)
105+
assert.True(suite.T(), found)
106+
assert.Equal(suite.T(), pod.Name, podFromCache.Name)
107+
assert.Equal(suite.T(), pod.Spec.Containers[0].Name, container.Name)
108+
109+
// Delete the pod.
110+
err := k8sClient.Delete(context.Background(), pod)
111+
assert.NoError(suite.T(), err)
112+
assert.Eventually(suite.T(), func() bool {
113+
err = k8sClient.Get(context.Background(), client.ObjectKey{Namespace: pod.Namespace, Name: pod.Name}, &podFromClient)
114+
return errors.IsNotFound(err)
115+
}, 10*time.Second, 1*time.Second)
116+
117+
// FindContainer should still return the pod and container from the deleted pod cache.
118+
podFromCache, container, found = suite.manager.FindContainer(containerID)
119+
assert.True(suite.T(), found)
120+
assert.Equal(suite.T(), pod.Name, podFromCache.Name)
121+
assert.Equal(suite.T(), pod.Spec.Containers[0].Name, container.Name)
122+
}
123+
124+
func (suite *ManagerTestSuite) TestLocalPods() {
125+
ctx, cancel := context.WithCancel(context.Background())
126+
defer cancel()
127+
err := os.Setenv("NODE_NAME", "nonexistent-node")
128+
assert.NoError(suite.T(), err)
129+
node.SetKubernetesNodeName()
130+
controllerManager, err := newControllerManager(false)
131+
assert.NoError(suite.T(), err)
132+
go func() {
133+
assert.NoError(suite.T(), controllerManager.Manager.Start(ctx))
134+
}()
135+
controllerManager.Manager.GetCache().WaitForCacheSync(ctx)
136+
pods := corev1.PodList{}
137+
err = controllerManager.Manager.GetCache().List(context.Background(), &pods)
138+
assert.NoError(suite.T(), err)
139+
// Pod cache should be empty because the node name is set to a nonexistent node.
140+
assert.Empty(suite.T(), pods.Items)
141+
}
142+
46143
func (suite *ManagerTestSuite) TearDownSuite() {
47144
assert.NoError(suite.T(), suite.testEnv.Stop())
48145
}

pkg/watcher/deleted_pod_cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (c *DeletedPodCache) EventHandler() cache.ResourceEventHandler {
5555
continue
5656
}
5757

58-
key, err := containerIDKey(contID)
58+
key, err := ContainerIDKey(contID)
5959
if err != nil {
6060
logger.GetLogger().Warn("failed to crate container key for id '%s': %w", contID, err)
6161
continue

0 commit comments

Comments
 (0)