From 89a3502446b939036b030333301e4c518f78e93e Mon Sep 17 00:00:00 2001 From: Donavan Fritz Date: Sat, 25 Apr 2026 07:36:47 -0500 Subject: [PATCH] =?UTF-8?q?M6:=20anycast=20=E2=80=94=20pod=20lo=20+=20Read?= =?UTF-8?q?y-gated=20/128/32=20+=20BIRD=20export?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CNI ADD now adds anycast IPs to the pod's lo interface (NOT eth0 — design doc rationale: avoid NDP/ARP DAD conflicts when N replicas share an IP). Allocation persists the anycast list. AnycastReconciler: desired = { ip → flock<8hex> } from committed allocations × pod.Status.PodReady=True diff against advertised, install/remove host /128 (v6) or /32 (v4) re-render bird.conf with the active set Triggers: 2s tick, AfterCommit (per ADD/DEL), Pod informer Ready transitions (PodCache.OnReadyChange callback). The bird template already supported Anycast6/Anycast4 via the export filter — this turn finally drives those slices from runtime. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- pkg/agent/anycast_linux.go | 214 +++++++++++++++++++++++++++++++++++++ pkg/agent/anycast_stub.go | 19 ++++ pkg/agent/handlers.go | 13 +++ pkg/agent/netns_linux.go | 29 +++++ pkg/agent/netns_stub.go | 1 + pkg/agent/podinfo.go | 65 ++++++++++- pkg/agent/runtime_linux.go | 26 +++-- 7 files changed, 352 insertions(+), 15 deletions(-) create mode 100644 pkg/agent/anycast_linux.go create mode 100644 pkg/agent/anycast_stub.go diff --git a/pkg/agent/anycast_linux.go b/pkg/agent/anycast_linux.go new file mode 100644 index 0000000..9817202 --- /dev/null +++ b/pkg/agent/anycast_linux.go @@ -0,0 +1,214 @@ +//go:build linux + +package agent + +import ( + "context" + "fmt" + "log/slog" + "net" + "sync" + "time" + + flockv1alpha1 "code.fritzlab.net/fritzlab/flock/pkg/api/v1alpha1" + "github.com/vishvananda/netlink" +) + +// AnycastReconciler keeps the kernel's anycast host routes and BIRD's +// advertised set in sync with (committed allocations × pod Ready). +// +// Lifecycle (per design doc): +// - CNI ADD assigns anycast IPs to pod lo (already done in netns_linux.go). +// - Pod transitions to Ready=True → install host /128 (or /32) route at +// `dev flock<8hex>` and add the IP to BIRD's export filter. +// - Pod transitions to Ready=False or DELETE → remove kernel route, remove +// from BIRD export. +// +// Reconcile is idempotent. Triggers: AfterCommit hook, Pod informer +// UpdateFunc on Ready transitions, periodic 2s tick. +type AnycastReconciler struct { + Node string + Store *Store + Pods *PodCache + NodeConfig *NodeConfigCache + Bird *BirdManager + RouterID string + Logger *slog.Logger + + mu sync.Mutex + advertised map[string]string // canonical IP → host iface name + trigger chan struct{} +} + +// NewAnycastReconciler returns a Reconciler ready to Run. +func NewAnycastReconciler(node string, store *Store, pods *PodCache, nc *NodeConfigCache, bird *BirdManager, routerID string, logger *slog.Logger) *AnycastReconciler { + return &AnycastReconciler{ + Node: node, + Store: store, + Pods: pods, + NodeConfig: nc, + Bird: bird, + RouterID: routerID, + Logger: logger, + advertised: map[string]string{}, + trigger: make(chan struct{}, 1), + } +} + +// Trigger requests one reconcile pass. Coalesces — if a pass is already +// pending, the call is a no-op. +func (r *AnycastReconciler) Trigger() { + select { + case r.trigger <- struct{}{}: + default: + } +} + +// Run blocks until ctx is cancelled. Reconciles on Trigger or every 2s. +func (r *AnycastReconciler) Run(ctx context.Context) { + t := time.NewTicker(2 * time.Second) + defer t.Stop() + r.reconcile() // initial pass + for { + select { + case <-ctx.Done(): + return + case <-t.C: + r.reconcile() + case <-r.trigger: + r.reconcile() + } + } +} + +func (r *AnycastReconciler) reconcile() { + r.mu.Lock() + defer r.mu.Unlock() + + desired := r.computeDesired() + + // Install routes that should exist but don't. + for ip, host := range desired { + if r.advertised[ip] != host { + if err := installAnycastRoute(ip, host); err != nil { + r.Logger.Warn("anycast install", "ip", ip, "host", host, "err", err) + continue + } + r.Logger.Info("anycast advertise", "ip", ip, "host", host) + r.advertised[ip] = host + } + } + // Remove routes that exist but shouldn't. + for ip, host := range r.advertised { + if _, want := desired[ip]; !want { + if err := removeAnycastRoute(ip, host); err != nil { + r.Logger.Warn("anycast remove", "ip", ip, "host", host, "err", err) + } else { + r.Logger.Info("anycast withdraw", "ip", ip, "host", host) + } + delete(r.advertised, ip) + } + } + + // Re-render BIRD with the active set. + r.renderBird(desired) +} + +// computeDesired walks the Store and returns the {ip → host iface} map of +// anycast advertisements that should be active right now. +func (r *AnycastReconciler) computeDesired() map[string]string { + out := map[string]string{} + for _, a := range r.Store.Snapshot() { + if a.State != StateCommitted || len(a.Anycast) == 0 { + continue + } + pod, ok := r.Pods.Get(a.Namespace, a.PodName) + if !ok || !podReady(pod) { + continue + } + host := HostIfaceName(a.ContainerID) + for _, ipStr := range a.Anycast { + ip := net.ParseIP(ipStr) + if ip == nil { + continue + } + out[canonical(ip)] = host + } + } + return out +} + +func (r *AnycastReconciler) renderBird(desired map[string]string) { + nc := r.NodeConfig.Load() + if nc == nil || r.Bird == nil { + return + } + var v6, v4 []string + for ipStr := range desired { + ip := net.ParseIP(ipStr) + if ip == nil { + continue + } + if ip.To4() != nil { + v4 = append(v4, ip.To4().String()) + } else { + v6 = append(v6, ip.To16().String()) + } + } + if err := r.Bird.Render(nc, v6, v4, r.RouterID); err != nil { + r.Logger.Warn("anycast bird render", "err", err) + } +} + +// installAnycastRoute installs a host /128 (v6) or /32 (v4) pointing at +// the pod's host veth. Idempotent — RouteReplace overwrites. +func installAnycastRoute(ipStr, hostIface string) error { + ip := net.ParseIP(ipStr) + if ip == nil { + return fmt.Errorf("bad ip %q", ipStr) + } + link, err := netlink.LinkByName(hostIface) + if err != nil { + return fmt.Errorf("lookup %s: %w", hostIface, err) + } + prefix := 128 + if ip.To4() != nil { + prefix = 32 + ip = ip.To4() + } + r := &netlink.Route{ + LinkIndex: link.Attrs().Index, + Scope: netlink.SCOPE_LINK, + Dst: cidrFor(ip, prefix), + } + return netlink.RouteReplace(r) +} + +// removeAnycastRoute deletes the host route. Missing routes / interfaces +// are not errors. +func removeAnycastRoute(ipStr, hostIface string) error { + ip := net.ParseIP(ipStr) + if ip == nil { + return nil + } + link, err := netlink.LinkByName(hostIface) + if err != nil { + return nil // veth gone → route gone + } + prefix := 128 + if ip.To4() != nil { + prefix = 32 + ip = ip.To4() + } + r := &netlink.Route{ + LinkIndex: link.Attrs().Index, + Dst: cidrFor(ip, prefix), + } + if err := netlink.RouteDel(r); err != nil && !linkNotFound(err) { + return err + } + return nil +} + +// _ = flockv1alpha1 to silence unused import warnings on minimal builds. +var _ = flockv1alpha1.GroupName diff --git a/pkg/agent/anycast_stub.go b/pkg/agent/anycast_stub.go new file mode 100644 index 0000000..ff62efc --- /dev/null +++ b/pkg/agent/anycast_stub.go @@ -0,0 +1,19 @@ +//go:build !linux + +package agent + +import ( + "context" + "log/slog" +) + +// AnycastReconciler is a no-op on non-Linux platforms (lets the package +// compile on developer machines for unit tests). +type AnycastReconciler struct{} + +func NewAnycastReconciler(_ string, _ *Store, _ *PodCache, _ *NodeConfigCache, _ *BirdManager, _ string, _ *slog.Logger) *AnycastReconciler { + return &AnycastReconciler{} +} + +func (r *AnycastReconciler) Trigger() {} +func (r *AnycastReconciler) Run(_ context.Context) {} diff --git a/pkg/agent/handlers.go b/pkg/agent/handlers.go index ded03c5..5e0ff8b 100644 --- a/pkg/agent/handlers.go +++ b/pkg/agent/handlers.go @@ -78,6 +78,7 @@ func (h *PodHandler) Add(ctx context.Context, req flockcni.Request) (*current.Re OwnerUID: string(pod.UID), IP6: ipString(res.IP6), IP4: ipString(res.IP4), + Anycast: anycastStrings(parsed.Anycast), State: StatePending, AllocatedAt: time.Now().UTC(), } @@ -93,6 +94,7 @@ func (h *PodHandler) Add(ctx context.Context, req flockcni.Request) (*current.Re HostIface: HostIfaceName(req.ContainerID), IP6: res.IP6, IP4: res.IP4, + Anycast: parsed.Anycast, } if err := h.SetupFunc(setup); err != nil { // Roll forward: leave pending entry in place so startup GC can clean @@ -172,3 +174,14 @@ func ipString(ip net.IP) string { } return canonical(ip) } + +func anycastStrings(ips []net.IP) []string { + if len(ips) == 0 { + return nil + } + out := make([]string, len(ips)) + for i, ip := range ips { + out[i] = canonical(ip) + } + return out +} diff --git a/pkg/agent/netns_linux.go b/pkg/agent/netns_linux.go index 2b2279f..b1d97b0 100644 --- a/pkg/agent/netns_linux.go +++ b/pkg/agent/netns_linux.go @@ -21,6 +21,10 @@ type SetupRequest struct { HostIface string // host-side veth name from HostIfaceName IP6 net.IP // /128 inside pod IP4 net.IP // /32 inside pod (may be nil) + // Anycast IPs to add to pod's lo (NOT eth0). Mix of IPv6 and IPv4. + // Host /128 and /32 routes are NOT installed here — that happens once + // the pod becomes Ready, see AnycastReconciler. + Anycast []net.IP } // LinkLocalGW is the deterministic IPv6 LL gateway placed on every host @@ -237,6 +241,31 @@ func configurePodSide(req SetupRequest) error { } } + // Anycast: assign each IP to pod lo. NOT on eth0 (avoids NDP/ARP + // DAD conflicts when multiple replicas share the same IP). + if len(req.Anycast) > 0 { + lo, err := netlink.LinkByName("lo") + if err != nil { + return fmt.Errorf("lookup pod lo: %w", err) + } + if err := netlink.LinkSetUp(lo); err != nil { + return fmt.Errorf("set up pod lo: %w", err) + } + for _, ip := range req.Anycast { + var mask net.IPMask + if ip.To4() != nil { + mask = net.CIDRMask(32, 32) + ip = ip.To4() + } else { + mask = net.CIDRMask(128, 128) + } + a := &netlink.Addr{IPNet: &net.IPNet{IP: ip, Mask: mask}, Scope: int(netlink.SCOPE_UNIVERSE)} + if err := netlink.AddrAdd(lo, a); err != nil && !errors.Is(err, os.ErrExist) { + return fmt.Errorf("pod lo anycast %s: %w", ip, err) + } + } + } + return nil }) } diff --git a/pkg/agent/netns_stub.go b/pkg/agent/netns_stub.go index 7a720dd..da75770 100644 --- a/pkg/agent/netns_stub.go +++ b/pkg/agent/netns_stub.go @@ -15,6 +15,7 @@ type SetupRequest struct { HostIface string IP6 net.IP IP4 net.IP + Anycast []net.IP } // Setup is unimplemented on non-Linux platforms; the agent only runs in diff --git a/pkg/agent/podinfo.go b/pkg/agent/podinfo.go index 82f5ddb..ceb1cb7 100644 --- a/pkg/agent/podinfo.go +++ b/pkg/agent/podinfo.go @@ -15,13 +15,28 @@ import ( "k8s.io/client-go/tools/cache" ) +// podReady returns true iff the Pod has a Ready=True condition. The +// canonical readiness signal kubelet reports based on container readiness +// probes — anycast advertisement and (future) NetworkPolicy hooks key off +// this. +func podReady(pod *corev1.Pod) bool { + for _, c := range pod.Status.Conditions { + if c.Type == corev1.PodReady { + return c.Status == corev1.ConditionTrue + } + } + return false +} + // PodCache exposes a Get(ns, name) lookup against a node-scoped Pod // informer. ADD/DEL handlers consult it to read annotations + labels for -// IPAM and (later) NetworkPolicy. +// IPAM and (later) NetworkPolicy. Callers can subscribe to Ready +// transitions via OnReadyChange. type PodCache struct { - lister cache.GenericLister - logger *slog.Logger - store cache.Store + logger *slog.Logger + store cache.Store + informer cache.SharedIndexInformer + onReady []func() } // StartPodInformer launches a Pod informer filtered to spec.nodeName == @@ -39,13 +54,53 @@ func StartPodInformer(ctx context.Context, cfg *rest.Config, node string, logger informers.WithTweakListOptions(tweak)) inf := factory.Core().V1().Pods().Informer() + pc := &PodCache{store: inf.GetStore(), logger: logger, informer: inf} + + _, _ = inf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if pod, ok := obj.(*corev1.Pod); ok && podReady(pod) { + pc.fireReady() + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldP, _ := oldObj.(*corev1.Pod) + newP, _ := newObj.(*corev1.Pod) + if oldP == nil || newP == nil { + return + } + if podReady(oldP) != podReady(newP) { + pc.fireReady() + } + }, + DeleteFunc: func(obj interface{}) { + pc.fireReady() + }, + }) + logger.Info("Pod informer starting", "node", node, "field_selector", "spec.nodeName="+node) factory.Start(ctx.Done()) if !cache.WaitForCacheSync(ctx.Done(), inf.HasSynced) { return nil, fmt.Errorf("pod informer cache failed to sync") } logger.Info("Pod informer synced", "node", node, "items", len(inf.GetStore().ListKeys())) - return &PodCache{store: inf.GetStore(), logger: logger}, nil + return pc, nil +} + +// OnReadyChange registers a callback fired on every Pod readiness +// transition observed by the informer (and on Add when the pod is already +// Ready, and on Delete unconditionally). Used by the AnycastReconciler. +// +// Safe to call before Run; callbacks fire synchronously inside the +// informer's event handler so they should be cheap (the AnycastReconciler +// just sends to a coalescing channel). +func (c *PodCache) OnReadyChange(f func()) { + c.onReady = append(c.onReady, f) +} + +func (c *PodCache) fireReady() { + for _, f := range c.onReady { + f() + } } // Get looks up a Pod by namespace and name. Returns (nil, false) if absent. diff --git a/pkg/agent/runtime_linux.go b/pkg/agent/runtime_linux.go index 5954660..8d3c1ea 100644 --- a/pkg/agent/runtime_linux.go +++ b/pkg/agent/runtime_linux.go @@ -74,23 +74,31 @@ func (s *Server) configureRuntime(ctx context.Context) error { // Calico is fenced off this node (Tigera Installation CR adds a // nodeAffinity excluding flock.fritzlab.net/agent on // calicoNodeDaemonSet). flock now owns BGP from this host. - if err := bird.Render(nc, nil, nil, routerIDFromNodeIP(s.restCfg)); err != nil { + routerID := routerIDFromNodeIP(s.restCfg) + if err := bird.Render(nc, nil, nil, routerID); err != nil { s.Logger.Warn("initial bird render", "err", err) } + + // AnycastReconciler is the single owner of bird re-renders going + // forward. It runs every 2s + on Pod readiness changes + on each + // successful CNI ADD/DEL. + anycast := NewAnycastReconciler(s.Node, s.Store, pods, s.NodeConfig, bird, routerID, s.Logger) + pods.OnReadyChange(anycast.Trigger) + go anycast.Run(ctx) + + // Background tick for SummaryRoutes (idempotent) in case the kernel + // blackhole disappears for any reason. go func() { - t := time.NewTicker(15 * time.Second) + t := time.NewTicker(60 * time.Second) defer t.Stop() for { select { case <-ctx.Done(): return case <-t.C: - cur := s.NodeConfig.Load() - if cur == nil { - continue + if cur := s.NodeConfig.Load(); cur != nil { + _ = bird.SummaryRoutes(cur) } - _ = bird.SummaryRoutes(cur) - _ = bird.Render(cur, nil, nil, routerIDFromNodeIP(s.restCfg)) } } }() @@ -103,9 +111,7 @@ func (s *Server) configureRuntime(ctx context.Context) error { NodeConfig: s.NodeConfig, SetupFunc: Setup, TeardownFunc: Teardown, - AfterCommit: func() { - // Future: collect anycast IPs from store snapshot, re-render bird. - }, + AfterCommit: anycast.Trigger, } s.RPC.SetHandlers(handler.Add, handler.Del, handler.Check) s.Logger.Info("runtime ready",