Files
flock/pkg/agent/anycast_linux.go
T

215 lines
5.3 KiB
Go
Raw Normal View History

//go:build linux
package agent
import (
	"context"
	"fmt"
	"log/slog"
	"net"
	"sort"
	"sync"
	"time"

	flockv1alpha1 "code.fritzlab.net/fritzlab/flock/pkg/api/v1alpha1"
	"github.com/vishvananda/netlink"
)
// AnycastReconciler keeps the kernel's anycast host routes and BIRD's
// advertised set in sync with (committed allocations × pod Ready).
//
// Lifecycle (per design doc):
//   - CNI ADD assigns anycast IPs to pod lo (already done in netns_linux.go).
//   - Pod transitions to Ready=True → install host /128 (or /32) route at
//     `dev flock<8hex>` and add the IP to BIRD's export filter.
//   - Pod transitions to Ready=False or DELETE → remove kernel route, remove
//     from BIRD export.
//
// Reconcile is idempotent. Triggers: AfterCommit hook, Pod informer
// UpdateFunc on Ready transitions, periodic 2s tick.
//
// Construct with NewAnycastReconciler: it allocates the advertised map and
// the trigger channel that the methods rely on.
type AnycastReconciler struct {
// Node is the value passed as node to NewAnycastReconciler.
// NOTE(review): presumably this node's name; nothing visible in this file
// reads it.
Node string
// Store supplies the allocations; its Snapshot is walked on every pass.
Store *Store
// Pods is queried by namespace and pod name to check pod readiness.
Pods *PodCache
// NodeConfig is loaded on every pass and handed to Bird.Render; while it
// yields nil, BIRD rendering is skipped.
NodeConfig *NodeConfigCache
// Bird renders the advertised set. May be nil, in which case BIRD
// rendering is skipped and only kernel routes are managed.
Bird *BirdManager
// RouterID is passed through unchanged to Bird.Render.
RouterID string
// Logger receives advertise/withdraw events and install/remove/render
// failures.
Logger *slog.Logger
// mu is held for the whole of each reconcile pass and guards advertised.
mu sync.Mutex
// advertised records the routes installed by earlier passes, so a pass
// only touches the kernel for entries that changed.
advertised map[string]string // canonical IP → host iface name
// trigger carries reconcile requests from Trigger to Run; it is created
// with capacity 1 so pending requests coalesce.
trigger chan struct{}
}
// NewAnycastReconciler wires the given dependencies into a reconciler that
// is ready for Run. It starts with an empty advertised set and a trigger
// channel of capacity one, which is what lets Trigger coalesce requests.
func NewAnycastReconciler(node string, store *Store, pods *PodCache, nc *NodeConfigCache, bird *BirdManager, routerID string, logger *slog.Logger) *AnycastReconciler {
	r := new(AnycastReconciler)

	// Caller-supplied dependencies.
	r.Node = node
	r.Store = store
	r.Pods = pods
	r.NodeConfig = nc
	r.Bird = bird
	r.RouterID = routerID
	r.Logger = logger

	// Internal state.
	r.advertised = make(map[string]string)
	r.trigger = make(chan struct{}, 1)

	return r
}
// Trigger asks for one reconcile pass and never blocks the caller. The
// request is a non-blocking send on the trigger channel: if the send cannot
// proceed (a request is already queued), the call does nothing.
func (r *AnycastReconciler) Trigger() {
	var ping struct{}
	select {
	case r.trigger <- ping:
		// Queued a pass for Run to pick up.
	default:
		// A pass is already pending; this request folds into it.
	}
}
// Run blocks until ctx is cancelled. It performs one reconcile pass
// immediately, then one more for every Trigger request or 2-second tick.
func (r *AnycastReconciler) Run(ctx context.Context) {
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	// Initial pass, so state converges without waiting for the first tick.
	r.reconcile()

	for {
		// Wait for any wake-up source; only cancellation leaves the loop.
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		case <-r.trigger:
		}
		r.reconcile()
	}
}
// reconcile runs one full pass while holding r.mu:
//
//  1. compute the desired {ip → host iface} set,
//  2. install a kernel route for every desired entry that is missing from,
//     or points at a different iface than, the advertised set,
//  3. remove the kernel route for every advertised entry that is no longer
//     desired,
//  4. re-render BIRD from the desired set.
//
// A failed install is logged and left out of the advertised set, so the
// next pass retries it. A failed removal is logged but the entry is dropped
// from the advertised set anyway, so it is not retried.
func (r *AnycastReconciler) reconcile() {
	r.mu.Lock()
	defer r.mu.Unlock()

	want := r.computeDesired()

	// Install routes that should exist but don't.
	for addr, iface := range want {
		if r.advertised[addr] == iface {
			continue
		}
		err := installAnycastRoute(addr, iface)
		if err != nil {
			r.Logger.Warn("anycast install", "ip", addr, "host", iface, "err", err)
			continue
		}
		r.Logger.Info("anycast advertise", "ip", addr, "host", iface)
		r.advertised[addr] = iface
	}

	// Remove routes that exist but shouldn't.
	for addr, iface := range r.advertised {
		if _, keep := want[addr]; keep {
			continue
		}
		if err := removeAnycastRoute(addr, iface); err == nil {
			r.Logger.Info("anycast withdraw", "ip", addr, "host", iface)
		} else {
			r.Logger.Warn("anycast remove", "ip", addr, "host", iface, "err", err)
		}
		// Forget the entry whether or not the kernel delete succeeded.
		delete(r.advertised, addr)
	}

	// BIRD is rendered from the desired set, not from what was actually
	// installed above.
	r.renderBird(want)
}
// computeDesired builds the {canonical ip → host iface} map of anycast
// addresses that should be advertised right now. An allocation contributes
// only if it is committed, carries at least one anycast address, and its pod
// is present in the pod cache and Ready. Addresses that fail to parse are
// skipped silently.
func (r *AnycastReconciler) computeDesired() map[string]string {
	want := make(map[string]string)
	for _, alloc := range r.Store.Snapshot() {
		if alloc.State != StateCommitted {
			continue
		}
		if len(alloc.Anycast) == 0 {
			continue
		}
		pod, found := r.Pods.Get(alloc.Namespace, alloc.PodName)
		if !found || !podReady(pod) {
			continue
		}
		// Every anycast address of a pod routes via that pod's host iface.
		iface := HostIfaceName(alloc.ContainerID)
		for _, raw := range alloc.Anycast {
			if parsed := net.ParseIP(raw); parsed != nil {
				want[canonical(parsed)] = iface
			}
		}
	}
	return want
}
// renderBird hands the desired anycast set to BIRD, split by address family.
//
// Keys of desired that do not parse as IPs are skipped. IPv4 addresses are
// passed in dotted-quad form, everything else in IPv6 form. Both lists are
// sorted before rendering: Go randomises map iteration order, so without the
// sort an unchanged set could reach Render in a different order on every
// pass.
//
// Rendering is skipped when no node config has been loaded or no BirdManager
// is configured. A Render failure is logged and not returned.
func (r *AnycastReconciler) renderBird(desired map[string]string) {
	nc := r.NodeConfig.Load()
	if nc == nil || r.Bird == nil {
		return
	}

	var v6, v4 []string
	for ipStr := range desired {
		ip := net.ParseIP(ipStr)
		if ip == nil {
			continue
		}
		if ip4 := ip.To4(); ip4 != nil {
			v4 = append(v4, ip4.String())
		} else {
			v6 = append(v6, ip.To16().String())
		}
	}

	// Deterministic order, independent of map iteration.
	sort.Strings(v6)
	sort.Strings(v4)

	if err := r.Bird.Render(nc, v6, v4, r.RouterID); err != nil {
		r.Logger.Warn("anycast bird render", "err", err)
	}
}
// installAnycastRoute points a link-scoped host route for ipStr (/32 for
// IPv4, /128 otherwise) at the interface named hostIface. It uses
// RouteReplace, so calling it again for the same address overwrites the
// existing route.
//
// It returns an error if ipStr is not a valid IP, if the interface lookup
// fails, or if the netlink replace fails.
func installAnycastRoute(ipStr, hostIface string) error {
	addr := net.ParseIP(ipStr)
	if addr == nil {
		return fmt.Errorf("bad ip %q", ipStr)
	}

	veth, err := netlink.LinkByName(hostIface)
	if err != nil {
		return fmt.Errorf("lookup %s: %w", hostIface, err)
	}

	// Assume IPv6 and narrow to the 4-byte form when the address is IPv4.
	bits := 128
	if v4 := addr.To4(); v4 != nil {
		bits, addr = 32, v4
	}

	return netlink.RouteReplace(&netlink.Route{
		LinkIndex: veth.Attrs().Index,
		Scope:     netlink.SCOPE_LINK,
		Dst:       cidrFor(addr, bits),
	})
}
// removeAnycastRoute deletes the host route (/32 for IPv4, /128 otherwise)
// for ipStr on the interface named hostIface.
//
// Removal is best-effort. An unparsable ipStr, a failed interface lookup
// (veth gone → route gone), and any delete error that linkNotFound accepts
// all return nil. Any other delete error is returned.
func removeAnycastRoute(ipStr, hostIface string) error {
	ip := net.ParseIP(ipStr)
	if ip == nil {
		return nil
	}
	link, err := netlink.LinkByName(hostIface)
	if err != nil {
		return nil // veth gone → route gone
	}
	prefix := 128
	if ip4 := ip.To4(); ip4 != nil {
		prefix, ip = 32, ip4
	}
	route := &netlink.Route{
		LinkIndex: link.Attrs().Index,
		// Must match the scope installAnycastRoute used. netlink sends
		// Route.Scope as-is, and the kernel's IPv4 delete path only matches
		// routes of the requested scope, so the zero value (universe) would
		// never find the link-scoped /32 that was installed.
		Scope: netlink.SCOPE_LINK,
		Dst:   cidrFor(ip, prefix),
	}
	if err := netlink.RouteDel(route); err != nil && !linkNotFound(err) {
		return err
	}
	return nil
}
// Reference flockv1alpha1 so that its import stays used: Go rejects unused
// imports at compile time, and nothing else visible in this file touches the
// package.
var _ = flockv1alpha1.GroupName