2026-04-25 07:36:47 -05:00
|
|
|
|
//go:build linux
|
|
|
|
|
|
|
|
|
|
|
|
package agent
|
|
|
|
|
|
|
|
|
|
|
|
import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"net"
	"sort"
	"sync"
	"syscall"
	"time"

	flockv1alpha1 "code.fritzlab.net/fritzlab/flock/pkg/api/v1alpha1"
	"github.com/vishvananda/netlink"
)
|
|
|
|
|
|
|
|
|
|
|
|
// AnycastReconciler keeps the kernel's anycast host routes and BIRD's
// advertised set in sync with (committed allocations × pod eligibility),
// where eligibility is decided per pod by podAnycastEligible.
//
// Lifecycle (per design doc):
//   - CNI ADD assigns anycast IPs to pod lo (already done in netns_linux.go).
//   - Pod transitions to Ready=True → install a host /128 (or /32) route via
//     the pod's `flock<8hex>` host interface and add the IP to BIRD's export
//     filter.
//   - Pod transitions to Ready=False or DELETE → remove the kernel route and
//     drop the IP from BIRD's export filter.
//
// When more than one eligible pod on this node binds the same anycast IP, the
// kernel route uses RTA_MULTIPATH so the kernel does per-flow ECMP across the
// contributing pods. This is the within-node companion to BGP-level ECMP
// across nodes.
//
// Each reconcile pass is idempotent. Triggers: AfterCommit hook, Pod informer
// UpdateFunc on Ready transitions, periodic 2s tick (see Run).
type AnycastReconciler struct {
	// Node is recorded from NewAnycastReconciler's node argument.
	Node string
	// Store supplies the allocations; it is snapshotted on every pass.
	Store *Store
	// Pods is used to look up each allocation's pod when deciding eligibility.
	Pods *PodCache
	// NodeConfig supplies this node's NodeConfig. While it loads as nil, BIRD
	// rendering is skipped.
	NodeConfig *NodeConfigCache
	// Bird renders BIRD's export set. A nil Bird disables rendering.
	Bird *BirdManager
	// RouterID is passed through unchanged to Bird.Render.
	RouterID string
	// Logger receives install/withdraw/render diagnostics.
	Logger *slog.Logger

	// mu is held for an entire reconcile pass, serializing passes and
	// guarding advertised.
	mu sync.Mutex
	// advertised maps a canonical IP to the nexthop set last successfully
	// installed for it in the kernel.
	advertised map[string]anycastTarget // canonical IP → install info
	// trigger wakes Run for an extra pass. NewAnycastReconciler gives it a
	// buffer of one so that Trigger calls coalesce.
	trigger chan struct{}
}
|
|
|
|
|
|
|
|
|
|
|
|
// NewAnycastReconciler returns a Reconciler ready to Run.
|
|
|
|
|
|
func NewAnycastReconciler(node string, store *Store, pods *PodCache, nc *NodeConfigCache, bird *BirdManager, routerID string, logger *slog.Logger) *AnycastReconciler {
|
|
|
|
|
|
return &AnycastReconciler{
|
|
|
|
|
|
Node: node,
|
|
|
|
|
|
Store: store,
|
|
|
|
|
|
Pods: pods,
|
|
|
|
|
|
NodeConfig: nc,
|
|
|
|
|
|
Bird: bird,
|
|
|
|
|
|
RouterID: routerID,
|
|
|
|
|
|
Logger: logger,
|
2026-04-25 08:02:51 -05:00
|
|
|
|
advertised: map[string]anycastTarget{},
|
2026-04-25 07:36:47 -05:00
|
|
|
|
trigger: make(chan struct{}, 1),
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Trigger requests one reconcile pass. Coalesces — if a pass is already
|
|
|
|
|
|
// pending, the call is a no-op.
|
|
|
|
|
|
func (r *AnycastReconciler) Trigger() {
|
|
|
|
|
|
select {
|
|
|
|
|
|
case r.trigger <- struct{}{}:
|
|
|
|
|
|
default:
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Run blocks until ctx is cancelled. Reconciles on Trigger or every 2s.
|
|
|
|
|
|
func (r *AnycastReconciler) Run(ctx context.Context) {
|
|
|
|
|
|
t := time.NewTicker(2 * time.Second)
|
|
|
|
|
|
defer t.Stop()
|
|
|
|
|
|
r.reconcile() // initial pass
|
|
|
|
|
|
for {
|
|
|
|
|
|
select {
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
|
return
|
|
|
|
|
|
case <-t.C:
|
|
|
|
|
|
r.reconcile()
|
|
|
|
|
|
case <-r.trigger:
|
|
|
|
|
|
r.reconcile()
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (r *AnycastReconciler) reconcile() {
|
|
|
|
|
|
r.mu.Lock()
|
|
|
|
|
|
defer r.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
desired := r.computeDesired()
|
|
|
|
|
|
|
2026-04-25 09:57:32 -05:00
|
|
|
|
// Install routes that should exist but don't, or whose nexthop set
|
|
|
|
|
|
// changed.
|
2026-04-25 08:02:51 -05:00
|
|
|
|
for ip, t := range desired {
|
2026-04-25 09:57:32 -05:00
|
|
|
|
if cur, ok := r.advertised[ip]; ok && cur.equal(t) {
|
2026-04-25 08:02:51 -05:00
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
if err := installAnycastRoute(ip, t); err != nil {
|
2026-04-25 09:57:32 -05:00
|
|
|
|
r.Logger.Warn("anycast install", "ip", ip, "nexthops", len(t.nexthops), "err", err)
|
2026-04-25 08:02:51 -05:00
|
|
|
|
continue
|
2026-04-25 07:36:47 -05:00
|
|
|
|
}
|
2026-04-25 09:57:32 -05:00
|
|
|
|
r.Logger.Info("anycast advertise", "ip", ip, "nexthops", describeNexthops(t))
|
2026-04-25 08:02:51 -05:00
|
|
|
|
r.advertised[ip] = t
|
2026-04-25 07:36:47 -05:00
|
|
|
|
}
|
|
|
|
|
|
// Remove routes that exist but shouldn't.
|
2026-04-25 08:02:51 -05:00
|
|
|
|
for ip, t := range r.advertised {
|
2026-04-25 07:36:47 -05:00
|
|
|
|
if _, want := desired[ip]; !want {
|
2026-04-25 08:02:51 -05:00
|
|
|
|
if err := removeAnycastRoute(ip, t); err != nil {
|
2026-04-25 09:57:32 -05:00
|
|
|
|
r.Logger.Warn("anycast remove", "ip", ip, "err", err)
|
2026-04-25 07:36:47 -05:00
|
|
|
|
} else {
|
2026-04-25 09:57:32 -05:00
|
|
|
|
r.Logger.Info("anycast withdraw", "ip", ip)
|
2026-04-25 07:36:47 -05:00
|
|
|
|
}
|
|
|
|
|
|
delete(r.advertised, ip)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Re-render BIRD with the active set.
|
|
|
|
|
|
r.renderBird(desired)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-25 09:57:32 -05:00
|
|
|
|
// computeDesired delegates to the pure resolveAnycastTargets and plugs in
|
|
|
|
|
|
// the live informer-based isReady callback.
|
2026-04-25 08:02:51 -05:00
|
|
|
|
func (r *AnycastReconciler) computeDesired() map[string]anycastTarget {
|
2026-04-25 09:57:32 -05:00
|
|
|
|
return resolveAnycastTargets(
|
|
|
|
|
|
r.Store.Snapshot(),
|
|
|
|
|
|
func(ns, name string) bool {
|
|
|
|
|
|
pod, ok := r.Pods.Get(ns, name)
|
2026-04-25 22:24:50 -05:00
|
|
|
|
return ok && podAnycastEligible(pod)
|
2026-04-25 09:57:32 -05:00
|
|
|
|
},
|
|
|
|
|
|
func(s string) { r.Logger.Warn(s) },
|
|
|
|
|
|
)
|
2026-04-25 07:36:47 -05:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-25 08:02:51 -05:00
|
|
|
|
func (r *AnycastReconciler) renderBird(desired map[string]anycastTarget) {
|
2026-04-25 07:36:47 -05:00
|
|
|
|
nc := r.NodeConfig.Load()
|
|
|
|
|
|
if nc == nil || r.Bird == nil {
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
var v6, v4 []string
|
2026-04-29 09:46:48 -05:00
|
|
|
|
seen := map[string]struct{}{}
|
|
|
|
|
|
add := func(ip net.IP) {
|
|
|
|
|
|
key := canonical(ip)
|
|
|
|
|
|
if _, dup := seen[key]; dup {
|
|
|
|
|
|
return
|
2026-04-25 07:36:47 -05:00
|
|
|
|
}
|
2026-04-29 09:46:48 -05:00
|
|
|
|
seen[key] = struct{}{}
|
2026-04-25 07:36:47 -05:00
|
|
|
|
if ip.To4() != nil {
|
|
|
|
|
|
v4 = append(v4, ip.To4().String())
|
|
|
|
|
|
} else {
|
|
|
|
|
|
v6 = append(v6, ip.To16().String())
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2026-04-29 09:46:48 -05:00
|
|
|
|
for ipStr := range desired {
|
|
|
|
|
|
if ip := net.ParseIP(ipStr); ip != nil {
|
|
|
|
|
|
add(ip)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
// A pod IP that lives outside the node's BGP aggregate (e.g. an
|
|
|
|
|
|
// addresses-annotation IP promoted to be the pod's primary v4 — Plex's
|
|
|
|
|
|
// 142.202.202.166 against host004's 172.25.214.0/24) is not naturally
|
|
|
|
|
|
// covered by the aggregate, so it must be advertised individually as a
|
|
|
|
|
|
// /32 or /128. Anycast and addresses extras are already covered by the
|
|
|
|
|
|
// `desired` loop above; this sweep is for promoted-primary IPs which do
|
|
|
|
|
|
// not flow through the AnycastReconciler.
|
|
|
|
|
|
nodeV6, nodeV4 := parseNodeCIDRs(nc)
|
|
|
|
|
|
for _, a := range r.Store.Snapshot() {
|
|
|
|
|
|
if a.State != StateCommitted {
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
if ip := net.ParseIP(a.IP6); ip != nil && !ipInAny(ip, nodeV6) {
|
|
|
|
|
|
add(ip)
|
|
|
|
|
|
}
|
|
|
|
|
|
if ip := net.ParseIP(a.IP4); ip != nil && !ipInAny(ip, nodeV4) {
|
|
|
|
|
|
add(ip)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2026-04-25 07:36:47 -05:00
|
|
|
|
if err := r.Bird.Render(nc, v6, v4, r.RouterID); err != nil {
|
|
|
|
|
|
r.Logger.Warn("anycast bird render", "err", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-29 09:46:48 -05:00
|
|
|
|
// parseNodeCIDRs parses NodeConfig.Spec.CIDR6/4 strings into IPNets,
|
|
|
|
|
|
// silently dropping malformed entries (admission-time validation should
|
|
|
|
|
|
// have rejected them long before this point).
|
|
|
|
|
|
func parseNodeCIDRs(nc *flockv1alpha1.NodeConfig) (v6, v4 []*net.IPNet) {
|
|
|
|
|
|
for _, s := range nc.Spec.CIDR6 {
|
|
|
|
|
|
if _, n, err := net.ParseCIDR(s); err == nil {
|
|
|
|
|
|
v6 = append(v6, n)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
for _, s := range nc.Spec.CIDR4 {
|
|
|
|
|
|
if _, n, err := net.ParseCIDR(s); err == nil {
|
|
|
|
|
|
v4 = append(v4, n)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// ipInAny reports whether ip is contained in at least one of nets. It stops
// at the first match. An empty or nil nets yields false.
func ipInAny(ip net.IP, nets []*net.IPNet) bool {
	found := false
	for i := 0; i < len(nets) && !found; i++ {
		found = nets[i].Contains(ip)
	}
	return found
}
|
|
|
|
|
|
|
2026-04-25 09:57:32 -05:00
|
|
|
|
// installAnycastRoute installs `<ipStr>/<128|32>` pointing at the
|
|
|
|
|
|
// nexthop set in t. With one nexthop the route is a plain via-route;
|
|
|
|
|
|
// with multiple, it's a multipath route using RTA_MULTIPATH so the
|
|
|
|
|
|
// kernel hashes flows across the constituent pods.
|
|
|
|
|
|
//
|
2026-04-25 08:02:51 -05:00
|
|
|
|
// Idempotent — RouteReplace overwrites a stale entry.
|
|
|
|
|
|
func installAnycastRoute(ipStr string, t anycastTarget) error {
|
2026-04-25 07:36:47 -05:00
|
|
|
|
ip := net.ParseIP(ipStr)
|
|
|
|
|
|
if ip == nil {
|
|
|
|
|
|
return fmt.Errorf("bad ip %q", ipStr)
|
|
|
|
|
|
}
|
2026-04-25 09:57:32 -05:00
|
|
|
|
if len(t.nexthops) == 0 {
|
|
|
|
|
|
return fmt.Errorf("anycast %s: no nexthops", ipStr)
|
2026-04-25 07:36:47 -05:00
|
|
|
|
}
|
|
|
|
|
|
prefix := 128
|
|
|
|
|
|
if ip.To4() != nil {
|
|
|
|
|
|
prefix = 32
|
|
|
|
|
|
ip = ip.To4()
|
|
|
|
|
|
}
|
2026-04-25 09:57:32 -05:00
|
|
|
|
r := &netlink.Route{Dst: cidrFor(ip, prefix)}
|
|
|
|
|
|
if len(t.nexthops) == 1 {
|
|
|
|
|
|
// Single nexthop — keep the route shape identical to today's
|
|
|
|
|
|
// production form. Functionally equivalent to a 1-element
|
|
|
|
|
|
// MultiPath but `ip route show` renders nicer for operators.
|
|
|
|
|
|
nh := t.nexthops[0]
|
|
|
|
|
|
link, err := netlink.LinkByName(nh.hostIface)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return fmt.Errorf("lookup %s: %w", nh.hostIface, err)
|
|
|
|
|
|
}
|
|
|
|
|
|
r.LinkIndex = link.Attrs().Index
|
|
|
|
|
|
r.Gw = nh.via
|
|
|
|
|
|
} else {
|
|
|
|
|
|
hops := make([]*netlink.NexthopInfo, 0, len(t.nexthops))
|
|
|
|
|
|
for _, nh := range t.nexthops {
|
|
|
|
|
|
link, err := netlink.LinkByName(nh.hostIface)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return fmt.Errorf("lookup %s: %w", nh.hostIface, err)
|
|
|
|
|
|
}
|
|
|
|
|
|
hops = append(hops, &netlink.NexthopInfo{
|
|
|
|
|
|
LinkIndex: link.Attrs().Index,
|
|
|
|
|
|
Gw: nh.via,
|
|
|
|
|
|
Hops: 0,
|
|
|
|
|
|
})
|
|
|
|
|
|
}
|
|
|
|
|
|
r.MultiPath = hops
|
2026-04-25 07:36:47 -05:00
|
|
|
|
}
|
|
|
|
|
|
return netlink.RouteReplace(r)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// removeAnycastRoute deletes the host route. Missing routes / interfaces
|
2026-04-25 08:02:51 -05:00
|
|
|
|
// are treated as success — DEL paths can race with veth teardown.
|
2026-04-25 09:57:32 -05:00
|
|
|
|
//
|
|
|
|
|
|
// Kernel route deletion matches by destination prefix; we don't need to
|
|
|
|
|
|
// re-specify the nexthop set.
|
|
|
|
|
|
func removeAnycastRoute(ipStr string, _ anycastTarget) error {
|
2026-04-25 07:36:47 -05:00
|
|
|
|
ip := net.ParseIP(ipStr)
|
|
|
|
|
|
if ip == nil {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
prefix := 128
|
|
|
|
|
|
if ip.To4() != nil {
|
|
|
|
|
|
prefix = 32
|
|
|
|
|
|
ip = ip.To4()
|
|
|
|
|
|
}
|
2026-04-25 09:57:32 -05:00
|
|
|
|
r := &netlink.Route{Dst: cidrFor(ip, prefix)}
|
2026-04-25 08:02:51 -05:00
|
|
|
|
if err := netlink.RouteDel(r); err != nil {
|
|
|
|
|
|
// ESRCH ("no such process") is netlink-speak for "no such route";
|
|
|
|
|
|
// treat as success.
|
|
|
|
|
|
if errors.Is(err, syscall.ESRCH) || linkNotFound(err) {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
2026-04-25 07:36:47 -05:00
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-25 09:57:32 -05:00
|
|
|
|
// describeNexthops returns a compact string for log messages.
|
|
|
|
|
|
func describeNexthops(t anycastTarget) string {
|
|
|
|
|
|
var s string
|
|
|
|
|
|
for i, nh := range t.nexthops {
|
|
|
|
|
|
if i > 0 {
|
|
|
|
|
|
s += ","
|
|
|
|
|
|
}
|
|
|
|
|
|
s += nh.hostIface + "→" + nh.via.String()
|
|
|
|
|
|
}
|
|
|
|
|
|
return s
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-25 07:36:47 -05:00
|
|
|
|
// _ = flockv1alpha1 to silence unused import warnings on minimal builds.
|
|
|
|
|
|
var _ = flockv1alpha1.GroupName
|