c61b12204c
Build flock Image / build (push) Has been cancelled
Previously the AnycastReconciler kept a pod in the nexthop set as long as its PodReady condition was True. During a rolling restart this leaves a window after the pod has been marked for deletion and kubelet has sent SIGTERM (DeletionTimestamp set, pod still Ready until its probes observe the shutdown) in which BGP still advertises a path through the dying pod's veth — in-flight requests get RST'd when the container actually exits. Fix: introduce podAnycastEligible(pod) = DeletionTimestamp == nil && Ready, use it as the AnycastReconciler's isReady callback, and fire the ready-change callback whenever a pod's anycast eligibility flips, which now includes DeletionTimestamp being set on a Ready pod (the informer UpdateFunc previously fired only on Ready transitions). Result: as soon as the apiserver marks a pod for deletion, the reconciler withdraws the local nexthop and BIRD reannounces the route without it, so sibling replicas absorb the traffic before the pod's terminationGracePeriod elapses. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
152 lines
5.0 KiB
Go
152 lines
5.0 KiB
Go
package agent
|
|
|
|
import (
	"context"
	"fmt"
	"log/slog"
	"sync"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)
|
|
|
|
// podReady returns true iff the Pod has a Ready=True condition. The
|
|
// canonical readiness signal kubelet reports based on container readiness
|
|
// probes — anycast advertisement and (future) NetworkPolicy hooks key off
|
|
// this.
|
|
func podReady(pod *corev1.Pod) bool {
|
|
for _, c := range pod.Status.Conditions {
|
|
if c.Type == corev1.PodReady {
|
|
return c.Status == corev1.ConditionTrue
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// podAnycastEligible reports whether a pod should contribute its IP as a
|
|
// nexthop for its anycast IPs. A pod is eligible when it is Ready AND not
|
|
// being deleted. Once the apiserver sets DeletionTimestamp, kubelet has
|
|
// started teardown — kube-proxy will keep routing for terminationGracePeriod
|
|
// but the pod is on the way out; we should withdraw the nexthop immediately
|
|
// so BGP shifts traffic to a sibling before the pod actually exits.
|
|
func podAnycastEligible(pod *corev1.Pod) bool {
|
|
return pod.DeletionTimestamp == nil && podReady(pod)
|
|
}
|
|
|
|
// PodCache exposes a Get(ns, name) lookup against a node-scoped Pod
|
|
// informer. ADD/DEL handlers consult it to read annotations + labels for
|
|
// IPAM and (later) NetworkPolicy. Callers can subscribe to Ready
|
|
// transitions via OnReadyChange.
|
|
type PodCache struct {
|
|
logger *slog.Logger
|
|
store cache.Store
|
|
informer cache.SharedIndexInformer
|
|
onReady []func()
|
|
}
|
|
|
|
// StartPodInformer launches a Pod informer filtered to spec.nodeName ==
|
|
// node. Returns a PodCache once the cache is synced. Blocks on initial
|
|
// list/watch sync.
|
|
func StartPodInformer(ctx context.Context, cfg *rest.Config, node string, logger *slog.Logger) (*PodCache, error) {
|
|
cs, err := kubernetes.NewForConfig(cfg)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("kubernetes client: %w", err)
|
|
}
|
|
tweak := func(opts *metav1.ListOptions) {
|
|
opts.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", node).String()
|
|
}
|
|
factory := informers.NewSharedInformerFactoryWithOptions(cs, 10*time.Minute,
|
|
informers.WithTweakListOptions(tweak))
|
|
inf := factory.Core().V1().Pods().Informer()
|
|
|
|
pc := &PodCache{store: inf.GetStore(), logger: logger, informer: inf}
|
|
|
|
_, _ = inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: func(obj interface{}) {
|
|
if pod, ok := obj.(*corev1.Pod); ok && podAnycastEligible(pod) {
|
|
pc.fireReady()
|
|
}
|
|
},
|
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
|
oldP, _ := oldObj.(*corev1.Pod)
|
|
newP, _ := newObj.(*corev1.Pod)
|
|
if oldP == nil || newP == nil {
|
|
return
|
|
}
|
|
// Fire on Ready transition OR DeletionTimestamp transition.
|
|
// The latter catches "pod was Ready, now being deleted" so the
|
|
// reconciler withdraws the nexthop before the pod actually exits.
|
|
if podAnycastEligible(oldP) != podAnycastEligible(newP) {
|
|
pc.fireReady()
|
|
}
|
|
},
|
|
DeleteFunc: func(obj interface{}) {
|
|
pc.fireReady()
|
|
},
|
|
})
|
|
|
|
logger.Info("Pod informer starting", "node", node, "field_selector", "spec.nodeName="+node)
|
|
factory.Start(ctx.Done())
|
|
if !cache.WaitForCacheSync(ctx.Done(), inf.HasSynced) {
|
|
return nil, fmt.Errorf("pod informer cache failed to sync")
|
|
}
|
|
logger.Info("Pod informer synced", "node", node, "items", len(inf.GetStore().ListKeys()))
|
|
return pc, nil
|
|
}
|
|
|
|
// OnReadyChange registers a callback fired on every Pod readiness
|
|
// transition observed by the informer (and on Add when the pod is already
|
|
// Ready, and on Delete unconditionally). Used by the AnycastReconciler.
|
|
//
|
|
// Safe to call before Run; callbacks fire synchronously inside the
|
|
// informer's event handler so they should be cheap (the AnycastReconciler
|
|
// just sends to a coalescing channel).
|
|
func (c *PodCache) OnReadyChange(f func()) {
|
|
c.onReady = append(c.onReady, f)
|
|
}
|
|
|
|
func (c *PodCache) fireReady() {
|
|
for _, f := range c.onReady {
|
|
f()
|
|
}
|
|
}
|
|
|
|
// Get looks up a Pod by namespace and name. Returns (nil, false) if absent.
|
|
func (c *PodCache) Get(namespace, name string) (*corev1.Pod, bool) {
|
|
key := namespace + "/" + name
|
|
obj, ok, err := c.store.GetByKey(key)
|
|
if err != nil || !ok || obj == nil {
|
|
return nil, false
|
|
}
|
|
pod, ok := obj.(*corev1.Pod)
|
|
if !ok {
|
|
return nil, false
|
|
}
|
|
return pod, true
|
|
}
|
|
|
|
// WaitForPod polls the cache for up to `timeout` for a pod to appear.
|
|
// kubelet may invoke CNI ADD slightly before the informer has observed the
|
|
// PodSpec, so this helper smooths the race.
|
|
func (c *PodCache) WaitForPod(ctx context.Context, namespace, name string, timeout time.Duration) (*corev1.Pod, error) {
|
|
deadline := time.Now().Add(timeout)
|
|
for {
|
|
if pod, ok := c.Get(namespace, name); ok {
|
|
return pod, nil
|
|
}
|
|
if time.Now().After(deadline) {
|
|
return nil, fmt.Errorf("pod %s/%s not found in informer cache after %s", namespace, name, timeout)
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
case <-time.After(50 * time.Millisecond):
|
|
}
|
|
}
|
|
}
|