Files
flock/pkg/agent/podinfo.go
T

139 lines
4.2 KiB
Go
Raw Normal View History

package agent
import (
"context"
"fmt"
"log/slog"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
// podReady returns true iff the Pod has a Ready=True condition. The
// canonical readiness signal kubelet reports based on container readiness
// probes — anycast advertisement and (future) NetworkPolicy hooks key off
// this.
func podReady(pod *corev1.Pod) bool {
for _, c := range pod.Status.Conditions {
if c.Type == corev1.PodReady {
return c.Status == corev1.ConditionTrue
}
}
return false
}
// PodCache exposes a Get(ns, name) lookup against a node-scoped Pod
// informer. ADD/DEL handlers consult it to read annotations + labels for
// IPAM and (later) NetworkPolicy. Callers can subscribe to Ready
// transitions via OnReadyChange.
type PodCache struct {
logger *slog.Logger
store cache.Store
informer cache.SharedIndexInformer
onReady []func()
}
// StartPodInformer launches a Pod informer filtered to spec.nodeName ==
// node. Returns a PodCache once the cache is synced. Blocks on initial
// list/watch sync.
func StartPodInformer(ctx context.Context, cfg *rest.Config, node string, logger *slog.Logger) (*PodCache, error) {
cs, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, fmt.Errorf("kubernetes client: %w", err)
}
tweak := func(opts *metav1.ListOptions) {
opts.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", node).String()
}
factory := informers.NewSharedInformerFactoryWithOptions(cs, 10*time.Minute,
informers.WithTweakListOptions(tweak))
inf := factory.Core().V1().Pods().Informer()
pc := &PodCache{store: inf.GetStore(), logger: logger, informer: inf}
_, _ = inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if pod, ok := obj.(*corev1.Pod); ok && podReady(pod) {
pc.fireReady()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldP, _ := oldObj.(*corev1.Pod)
newP, _ := newObj.(*corev1.Pod)
if oldP == nil || newP == nil {
return
}
if podReady(oldP) != podReady(newP) {
pc.fireReady()
}
},
DeleteFunc: func(obj interface{}) {
pc.fireReady()
},
})
logger.Info("Pod informer starting", "node", node, "field_selector", "spec.nodeName="+node)
factory.Start(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), inf.HasSynced) {
return nil, fmt.Errorf("pod informer cache failed to sync")
}
logger.Info("Pod informer synced", "node", node, "items", len(inf.GetStore().ListKeys()))
return pc, nil
}
// OnReadyChange registers a callback fired on every Pod readiness
// transition observed by the informer (and on Add when the pod is already
// Ready, and on Delete unconditionally). Used by the AnycastReconciler.
//
// Safe to call before Run; callbacks fire synchronously inside the
// informer's event handler so they should be cheap (the AnycastReconciler
// just sends to a coalescing channel).
func (c *PodCache) OnReadyChange(f func()) {
c.onReady = append(c.onReady, f)
}
func (c *PodCache) fireReady() {
for _, f := range c.onReady {
f()
}
}
// Get looks up a Pod by namespace and name. Returns (nil, false) if absent.
func (c *PodCache) Get(namespace, name string) (*corev1.Pod, bool) {
key := namespace + "/" + name
obj, ok, err := c.store.GetByKey(key)
if err != nil || !ok || obj == nil {
return nil, false
}
pod, ok := obj.(*corev1.Pod)
if !ok {
return nil, false
}
return pod, true
}
// WaitForPod polls the cache for up to `timeout` for a pod to appear.
// kubelet may invoke CNI ADD slightly before the informer has observed the
// PodSpec, so this helper smooths the race.
func (c *PodCache) WaitForPod(ctx context.Context, namespace, name string, timeout time.Duration) (*corev1.Pod, error) {
deadline := time.Now().Add(timeout)
for {
if pod, ok := c.Get(namespace, name); ok {
return pod, nil
}
if time.Now().After(deadline) {
return nil, fmt.Errorf("pod %s/%s not found in informer cache after %s", namespace, name, timeout)
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(50 * time.Millisecond):
}
}
}