Files
flock/pkg/agent/anycast_linux.go
T

251 lines
6.7 KiB
Go
Raw Normal View History

//go:build linux
package agent
import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"net"
	"slices"
	"sync"
	"syscall"
	"time"

	flockv1alpha1 "code.fritzlab.net/fritzlab/flock/pkg/api/v1alpha1"
	"github.com/vishvananda/netlink"
)
// AnycastReconciler keeps the kernel's anycast host routes and BIRD's
// advertised set in sync with (committed allocations × pod Ready).
//
// Lifecycle (per design doc):
//   - CNI ADD assigns anycast IPs to pod lo (already done in netns_linux.go).
//   - Pod transitions to Ready=True → install host /128 (or /32) route at
//     `dev flock<8hex>` and add the IP to BIRD's export filter.
//   - Pod transitions to Ready=False or DELETE → remove kernel route, remove
//     from BIRD export.
//
// When more than one Ready pod on this node binds the same anycast IP, the
// kernel route uses RTA_MULTIPATH so the kernel does per-flow ECMP across
// the contributing pods. This is the within-node companion to BGP-level
// ECMP across nodes.
//
// Reconcile is idempotent. Triggers: AfterCommit hook, Pod informer
// UpdateFunc on Ready transitions, periodic 2s tick.
type AnycastReconciler struct {
	// Node presumably names this node — NOTE(review): not read by the code
	// in this file; confirm against callers.
	Node string
	// Store supplies committed allocations via Snapshot() on every pass.
	Store *Store
	// Pods answers readiness lookups (Get + podReady) for allocation owners.
	Pods *PodCache
	// NodeConfig is loaded before each BIRD render; a nil config skips
	// rendering for that pass.
	NodeConfig *NodeConfigCache
	// Bird renders the export set; when nil, BIRD rendering is skipped.
	Bird *BirdManager
	// RouterID is passed through unchanged to Bird.Render.
	RouterID string
	// Logger receives install/withdraw/render diagnostics.
	Logger *slog.Logger

	// mu serializes reconcile passes and guards advertised.
	mu sync.Mutex
	// advertised records, per IP, the nexthop set this reconciler last
	// installed into the kernel; it is compared against the desired set
	// to decide what to (re)install or remove.
	advertised map[string]anycastTarget // canonical IP → install info
	// trigger carries coalesced reconcile requests from Trigger to Run.
	trigger chan struct{}
}
// NewAnycastReconciler builds an AnycastReconciler wired to the given
// dependencies, with an empty advertised set and a trigger channel of
// capacity one (so Trigger can coalesce pending requests). Call Run to
// start it.
func NewAnycastReconciler(node string, store *Store, pods *PodCache, nc *NodeConfigCache, bird *BirdManager, routerID string, logger *slog.Logger) *AnycastReconciler {
	rec := new(AnycastReconciler)
	rec.Node, rec.Store, rec.Pods = node, store, pods
	rec.NodeConfig, rec.Bird = nc, bird
	rec.RouterID, rec.Logger = routerID, logger
	// Internal state: nothing advertised yet; one slot for a pending pass.
	rec.advertised = make(map[string]anycastTarget)
	rec.trigger = make(chan struct{}, 1)
	return rec
}
// Trigger asks Run for one more reconcile pass without blocking. The
// trigger channel holds at most one pending request, so calls made while a
// request is already queued are dropped (coalesced).
func (r *AnycastReconciler) Trigger() {
	var signal struct{}
	select {
	case r.trigger <- signal:
		// Request queued.
	default:
		// A request is already pending; this one folds into it.
	}
}
// Run reconciles once immediately, then again after every Trigger and every
// 2s tick, returning only when ctx is cancelled.
func (r *AnycastReconciler) Run(ctx context.Context) {
	const period = 2 * time.Second
	ticker := time.NewTicker(period)
	defer ticker.Stop()

	r.reconcile() // initial pass
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		case <-r.trigger:
		}
		// Both a tick and an explicit trigger lead to the same full pass.
		r.reconcile()
	}
}
// reconcile runs one idempotent pass under r.mu. It computes the desired
// anycast set, converges the kernel host routes toward it, and re-renders
// BIRD.
//
// Bookkeeping rules:
//   - r.advertised gains or updates an entry only after its route installs
//     successfully.
//   - An entry whose removal fails stays in r.advertised, so the next pass
//     retries the withdrawal instead of leaking the kernel route forever.
//
// BIRD is rendered from the routes that are both installed and still
// desired, not from the raw desired set. Advertising an IP over BGP while
// its kernel host route failed to install would attract traffic this node
// has no route for.
func (r *AnycastReconciler) reconcile() {
	r.mu.Lock()
	defer r.mu.Unlock()
	desired := r.computeDesired()

	// Install routes that should exist but don't, or whose nexthop set
	// changed.
	for ip, t := range desired {
		if cur, ok := r.advertised[ip]; ok && cur.equal(t) {
			continue
		}
		if err := installAnycastRoute(ip, t); err != nil {
			r.Logger.Warn("anycast install", "ip", ip, "nexthops", len(t.nexthops), "err", err)
			continue
		}
		r.Logger.Info("anycast advertise", "ip", ip, "nexthops", describeNexthops(t))
		r.advertised[ip] = t
	}

	// Remove routes that exist but shouldn't. Deleting from a map while
	// ranging over it is permitted in Go.
	for ip, t := range r.advertised {
		if _, want := desired[ip]; want {
			continue
		}
		if err := removeAnycastRoute(ip, t); err != nil {
			// Keep the entry so the next pass retries the removal.
			r.Logger.Warn("anycast remove", "ip", ip, "err", err)
			continue
		}
		r.Logger.Info("anycast withdraw", "ip", ip)
		delete(r.advertised, ip)
	}

	// Re-render BIRD with the active set: installed AND still desired.
	active := make(map[string]anycastTarget, len(r.advertised))
	for ip, t := range r.advertised {
		if _, want := desired[ip]; want {
			active[ip] = t
		}
	}
	r.renderBird(active)
}
// computeDesired resolves the current allocation snapshot into per-IP
// install targets. It is a thin adapter around the pure
// resolveAnycastTargets: readiness comes from the live pod cache and
// resolver warnings go to r.Logger.
func (r *AnycastReconciler) computeDesired() map[string]anycastTarget {
	snapshot := r.Store.Snapshot()
	isReady := func(ns, name string) bool {
		pod, found := r.Pods.Get(ns, name)
		if !found {
			return false
		}
		return podReady(pod)
	}
	warn := func(msg string) { r.Logger.Warn(msg) }
	return resolveAnycastTargets(snapshot, isReady, warn)
}
// renderBird hands the IPs in desired to BIRD, split into IPv6 and IPv4
// lists. Only the map keys are used. Unparsable keys are skipped, and IPv4
// addresses are rendered in their 4-byte form.
//
// Rendering is skipped when no node config is loaded or no BirdManager is
// configured. Render errors are logged, not returned.
//
// Both lists are sorted. desired is a map, and Go randomizes map iteration
// order, so without sorting the same set would reach Render in a different
// order on every pass.
func (r *AnycastReconciler) renderBird(desired map[string]anycastTarget) {
	nc := r.NodeConfig.Load()
	if nc == nil || r.Bird == nil {
		return
	}
	var v6, v4 []string
	for ipStr := range desired {
		ip := net.ParseIP(ipStr)
		if ip == nil {
			continue
		}
		if ip4 := ip.To4(); ip4 != nil {
			v4 = append(v4, ip4.String())
		} else {
			v6 = append(v6, ip.To16().String())
		}
	}
	slices.Sort(v6)
	slices.Sort(v4)
	if err := r.Bird.Render(nc, v6, v4, r.RouterID); err != nil {
		r.Logger.Warn("anycast bird render", "err", err)
	}
}
// installAnycastRoute points the host route <ipStr>/128 (/32 for IPv4) at
// the nexthop set in t using netlink.RouteReplace, so re-running it over an
// existing or stale entry overwrites it.
//
// Route shape depends on the nexthop count:
//   - One nexthop produces a plain LinkIndex+Gw route, keeping the
//     production shape that `ip route show` renders nicely.
//   - Two or more produce a MultiPath (RTA_MULTIPATH) route, so the kernel
//     hashes flows across the contributing pods.
//
// It returns an error when ipStr does not parse, t has no nexthops, a host
// interface lookup fails, or RouteReplace fails.
func installAnycastRoute(ipStr string, t anycastTarget) error {
	addr := net.ParseIP(ipStr)
	if addr == nil {
		return fmt.Errorf("bad ip %q", ipStr)
	}
	if len(t.nexthops) == 0 {
		return fmt.Errorf("anycast %s: no nexthops", ipStr)
	}
	bits := 128
	if v4 := addr.To4(); v4 != nil {
		addr, bits = v4, 32
	}

	// Resolve every host interface first; the first failed lookup aborts
	// before the kernel is touched.
	hops := make([]*netlink.NexthopInfo, 0, len(t.nexthops))
	for _, nh := range t.nexthops {
		link, err := netlink.LinkByName(nh.hostIface)
		if err != nil {
			return fmt.Errorf("lookup %s: %w", nh.hostIface, err)
		}
		hops = append(hops, &netlink.NexthopInfo{LinkIndex: link.Attrs().Index, Gw: nh.via})
	}

	route := &netlink.Route{Dst: cidrFor(addr, bits)}
	if len(hops) == 1 {
		route.LinkIndex = hops[0].LinkIndex
		route.Gw = hops[0].Gw
	} else {
		route.MultiPath = hops
	}
	return netlink.RouteReplace(route)
}
// removeAnycastRoute deletes the host route for ipStr (/128, or /32 for
// IPv4). The route is matched by destination prefix only, so the nexthop
// set is ignored.
//
// An unparsable ipStr, a route that no longer exists (ESRCH, netlink's
// "no such route"), and a missing link all count as success, because
// deletion paths can race with veth teardown. Any other RouteDel error is
// returned.
func removeAnycastRoute(ipStr string, _ anycastTarget) error {
	addr := net.ParseIP(ipStr)
	if addr == nil {
		return nil
	}
	bits := 128
	if v4 := addr.To4(); v4 != nil {
		addr, bits = v4, 32
	}
	err := netlink.RouteDel(&netlink.Route{Dst: cidrFor(addr, bits)})
	switch {
	case err == nil, errors.Is(err, syscall.ESRCH), linkNotFound(err):
		return nil
	default:
		return err
	}
}
// describeNexthops renders t's nexthops for log output as a comma-separated
// list of "iface→via" pairs. An empty set yields "".
func describeNexthops(t anycastTarget) string {
	var out []byte
	for i, nh := range t.nexthops {
		if i != 0 {
			out = append(out, ',')
		}
		out = append(out, nh.hostIface...)
		out = append(out, "→"...)
		out = append(out, nh.via.String()...)
	}
	return string(out)
}
// Reference a symbol from flockv1alpha1 so the import stays in use. Go
// rejects unused imports at compile time (they are errors, not warnings),
// and nothing else in this file appears to reference the package.
var _ = flockv1alpha1.GroupName