Files
flock/pkg/agent/anycast_linux.go
T
Donavan Fritz c61b12204c
Build flock Image / build (push) Has been cancelled
anycast: drop pods from nexthop set on DeletionTimestamp
Previously the AnycastReconciler kept a pod in the nexthop set as long as
its PodReady condition was True. During a rolling restart that produces a
window after kubelet has accepted SIGTERM (DeletionTimestamp set, pod
still Ready until probes observe shutdown) where BGP still advertises a
path through the dying pod's veth — in-flight requests get RST'd when
the container actually exits.

Fix: introduce podAnycastEligible(pod) = !DeletionTimestamp && Ready,
swap it in at the AnycastReconciler's isReady callback, and fire the
ready-change callback when DeletionTimestamp transitions (the informer
UpdateFunc previously only fired on Ready transitions).

Result: as soon as the apiserver marks a pod for deletion, the
reconciler withdraws the local nexthop and BIRD reannounces the route
without it. Sibling replicas absorb traffic before the pod's
terminationGracePeriod elapses.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-04-25 22:24:50 -05:00

251 lines
6.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//go:build linux
package agent
import (
"context"
"errors"
"fmt"
"log/slog"
"net"
"sync"
"syscall"
"time"
flockv1alpha1 "code.fritzlab.net/fritzlab/flock/pkg/api/v1alpha1"
"github.com/vishvananda/netlink"
)
// AnycastReconciler keeps the kernel's anycast host routes and BIRD's
// advertised set in sync with (committed allocations × pod Ready).
//
// Lifecycle (per design doc):
// - CNI ADD assigns anycast IPs to pod lo (already done in netns_linux.go).
// - Pod transitions to Ready=True → install host /128 (or /32) route at
// `dev flock<8hex>` and add the IP to BIRD's export filter.
// - Pod transitions to Ready=False or DELETE → remove kernel route, remove
// from BIRD export.
//
// When more than one Ready pod on this node binds the same anycast IP, the
// kernel route uses RTA_MULTIPATH so the kernel does per-flow ECMP across
// the contributing pods. This is the within-node companion to BGP-level
// ECMP across nodes.
//
// Reconcile is idempotent. Triggers: AfterCommit hook, Pod informer
// UpdateFunc on Ready transitions, periodic 2s tick.
type AnycastReconciler struct {
Node string
Store *Store
Pods *PodCache
NodeConfig *NodeConfigCache
Bird *BirdManager
RouterID string
Logger *slog.Logger
mu sync.Mutex
advertised map[string]anycastTarget // canonical IP → install info
trigger chan struct{}
}
// NewAnycastReconciler returns a Reconciler ready to Run.
func NewAnycastReconciler(node string, store *Store, pods *PodCache, nc *NodeConfigCache, bird *BirdManager, routerID string, logger *slog.Logger) *AnycastReconciler {
return &AnycastReconciler{
Node: node,
Store: store,
Pods: pods,
NodeConfig: nc,
Bird: bird,
RouterID: routerID,
Logger: logger,
advertised: map[string]anycastTarget{},
trigger: make(chan struct{}, 1),
}
}
// Trigger requests one reconcile pass. Coalesces — if a pass is already
// pending, the call is a no-op.
func (r *AnycastReconciler) Trigger() {
select {
case r.trigger <- struct{}{}:
default:
}
}
// Run blocks until ctx is cancelled. Reconciles on Trigger or every 2s.
func (r *AnycastReconciler) Run(ctx context.Context) {
t := time.NewTicker(2 * time.Second)
defer t.Stop()
r.reconcile() // initial pass
for {
select {
case <-ctx.Done():
return
case <-t.C:
r.reconcile()
case <-r.trigger:
r.reconcile()
}
}
}
func (r *AnycastReconciler) reconcile() {
r.mu.Lock()
defer r.mu.Unlock()
desired := r.computeDesired()
// Install routes that should exist but don't, or whose nexthop set
// changed.
for ip, t := range desired {
if cur, ok := r.advertised[ip]; ok && cur.equal(t) {
continue
}
if err := installAnycastRoute(ip, t); err != nil {
r.Logger.Warn("anycast install", "ip", ip, "nexthops", len(t.nexthops), "err", err)
continue
}
r.Logger.Info("anycast advertise", "ip", ip, "nexthops", describeNexthops(t))
r.advertised[ip] = t
}
// Remove routes that exist but shouldn't.
for ip, t := range r.advertised {
if _, want := desired[ip]; !want {
if err := removeAnycastRoute(ip, t); err != nil {
r.Logger.Warn("anycast remove", "ip", ip, "err", err)
} else {
r.Logger.Info("anycast withdraw", "ip", ip)
}
delete(r.advertised, ip)
}
}
// Re-render BIRD with the active set.
r.renderBird(desired)
}
// computeDesired delegates to the pure resolveAnycastTargets and plugs in
// the live informer-based isReady callback.
func (r *AnycastReconciler) computeDesired() map[string]anycastTarget {
return resolveAnycastTargets(
r.Store.Snapshot(),
func(ns, name string) bool {
pod, ok := r.Pods.Get(ns, name)
return ok && podAnycastEligible(pod)
},
func(s string) { r.Logger.Warn(s) },
)
}
func (r *AnycastReconciler) renderBird(desired map[string]anycastTarget) {
nc := r.NodeConfig.Load()
if nc == nil || r.Bird == nil {
return
}
var v6, v4 []string
for ipStr := range desired {
ip := net.ParseIP(ipStr)
if ip == nil {
continue
}
if ip.To4() != nil {
v4 = append(v4, ip.To4().String())
} else {
v6 = append(v6, ip.To16().String())
}
}
if err := r.Bird.Render(nc, v6, v4, r.RouterID); err != nil {
r.Logger.Warn("anycast bird render", "err", err)
}
}
// installAnycastRoute installs `<ipStr>/<128|32>` pointing at the
// nexthop set in t. With one nexthop the route is a plain via-route;
// with multiple, it's a multipath route using RTA_MULTIPATH so the
// kernel hashes flows across the constituent pods.
//
// Idempotent — RouteReplace overwrites a stale entry.
func installAnycastRoute(ipStr string, t anycastTarget) error {
ip := net.ParseIP(ipStr)
if ip == nil {
return fmt.Errorf("bad ip %q", ipStr)
}
if len(t.nexthops) == 0 {
return fmt.Errorf("anycast %s: no nexthops", ipStr)
}
prefix := 128
if ip.To4() != nil {
prefix = 32
ip = ip.To4()
}
r := &netlink.Route{Dst: cidrFor(ip, prefix)}
if len(t.nexthops) == 1 {
// Single nexthop — keep the route shape identical to today's
// production form. Functionally equivalent to a 1-element
// MultiPath but `ip route show` renders nicer for operators.
nh := t.nexthops[0]
link, err := netlink.LinkByName(nh.hostIface)
if err != nil {
return fmt.Errorf("lookup %s: %w", nh.hostIface, err)
}
r.LinkIndex = link.Attrs().Index
r.Gw = nh.via
} else {
hops := make([]*netlink.NexthopInfo, 0, len(t.nexthops))
for _, nh := range t.nexthops {
link, err := netlink.LinkByName(nh.hostIface)
if err != nil {
return fmt.Errorf("lookup %s: %w", nh.hostIface, err)
}
hops = append(hops, &netlink.NexthopInfo{
LinkIndex: link.Attrs().Index,
Gw: nh.via,
Hops: 0,
})
}
r.MultiPath = hops
}
return netlink.RouteReplace(r)
}
// removeAnycastRoute deletes the host route. Missing routes / interfaces
// are treated as success — DEL paths can race with veth teardown.
//
// Kernel route deletion matches by destination prefix; we don't need to
// re-specify the nexthop set.
func removeAnycastRoute(ipStr string, _ anycastTarget) error {
ip := net.ParseIP(ipStr)
if ip == nil {
return nil
}
prefix := 128
if ip.To4() != nil {
prefix = 32
ip = ip.To4()
}
r := &netlink.Route{Dst: cidrFor(ip, prefix)}
if err := netlink.RouteDel(r); err != nil {
// ESRCH ("no such process") is netlink-speak for "no such route";
// treat as success.
if errors.Is(err, syscall.ESRCH) || linkNotFound(err) {
return nil
}
return err
}
return nil
}
// describeNexthops returns a compact string for log messages.
func describeNexthops(t anycastTarget) string {
var s string
for i, nh := range t.nexthops {
if i > 0 {
s += ","
}
s += nh.hostIface + "→" + nh.via.String()
}
return s
}
// _ = flockv1alpha1 to silence unused import warnings on minimal builds.
var _ = flockv1alpha1.GroupName