M6: anycast — pod lo + Ready-gated /128/32 + BIRD export
Build flock Image / build (push) Has been cancelled
Build flock Image / build (push) Has been cancelled
CNI ADD now adds anycast IPs to the pod's lo interface (NOT eth0 — design
doc rationale: avoid NDP/ARP DAD conflicts when N replicas share an IP).
Allocation persists the anycast list.
AnycastReconciler:
desired = { ip → flock<8hex> } from
committed allocations × pod.Status.PodReady=True
diff against advertised, install/remove host /128 (v6) or /32 (v4)
re-render bird.conf with the active set
Triggers: 2s tick, AfterCommit (per ADD/DEL), Pod informer Ready
transitions (PodCache.OnReadyChange callback).
The bird template already supported Anycast6/Anycast4 via the export
filter — this turn finally drives those slices from runtime.
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,214 @@
|
||||
//go:build linux
|
||||
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
flockv1alpha1 "code.fritzlab.net/fritzlab/flock/pkg/api/v1alpha1"
|
||||
"github.com/vishvananda/netlink"
|
||||
)
|
||||
|
||||
// AnycastReconciler keeps the kernel's anycast host routes and BIRD's
|
||||
// advertised set in sync with (committed allocations × pod Ready).
|
||||
//
|
||||
// Lifecycle (per design doc):
|
||||
// - CNI ADD assigns anycast IPs to pod lo (already done in netns_linux.go).
|
||||
// - Pod transitions to Ready=True → install host /128 (or /32) route at
|
||||
// `dev flock<8hex>` and add the IP to BIRD's export filter.
|
||||
// - Pod transitions to Ready=False or DELETE → remove kernel route, remove
|
||||
// from BIRD export.
|
||||
//
|
||||
// Reconcile is idempotent. Triggers: AfterCommit hook, Pod informer
|
||||
// UpdateFunc on Ready transitions, periodic 2s tick.
|
||||
type AnycastReconciler struct {
|
||||
Node string
|
||||
Store *Store
|
||||
Pods *PodCache
|
||||
NodeConfig *NodeConfigCache
|
||||
Bird *BirdManager
|
||||
RouterID string
|
||||
Logger *slog.Logger
|
||||
|
||||
mu sync.Mutex
|
||||
advertised map[string]string // canonical IP → host iface name
|
||||
trigger chan struct{}
|
||||
}
|
||||
|
||||
// NewAnycastReconciler returns a Reconciler ready to Run.
|
||||
func NewAnycastReconciler(node string, store *Store, pods *PodCache, nc *NodeConfigCache, bird *BirdManager, routerID string, logger *slog.Logger) *AnycastReconciler {
|
||||
return &AnycastReconciler{
|
||||
Node: node,
|
||||
Store: store,
|
||||
Pods: pods,
|
||||
NodeConfig: nc,
|
||||
Bird: bird,
|
||||
RouterID: routerID,
|
||||
Logger: logger,
|
||||
advertised: map[string]string{},
|
||||
trigger: make(chan struct{}, 1),
|
||||
}
|
||||
}
|
||||
|
||||
// Trigger requests one reconcile pass. Coalesces — if a pass is already
|
||||
// pending, the call is a no-op.
|
||||
func (r *AnycastReconciler) Trigger() {
|
||||
select {
|
||||
case r.trigger <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Run blocks until ctx is cancelled. Reconciles on Trigger or every 2s.
|
||||
func (r *AnycastReconciler) Run(ctx context.Context) {
|
||||
t := time.NewTicker(2 * time.Second)
|
||||
defer t.Stop()
|
||||
r.reconcile() // initial pass
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
r.reconcile()
|
||||
case <-r.trigger:
|
||||
r.reconcile()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *AnycastReconciler) reconcile() {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
desired := r.computeDesired()
|
||||
|
||||
// Install routes that should exist but don't.
|
||||
for ip, host := range desired {
|
||||
if r.advertised[ip] != host {
|
||||
if err := installAnycastRoute(ip, host); err != nil {
|
||||
r.Logger.Warn("anycast install", "ip", ip, "host", host, "err", err)
|
||||
continue
|
||||
}
|
||||
r.Logger.Info("anycast advertise", "ip", ip, "host", host)
|
||||
r.advertised[ip] = host
|
||||
}
|
||||
}
|
||||
// Remove routes that exist but shouldn't.
|
||||
for ip, host := range r.advertised {
|
||||
if _, want := desired[ip]; !want {
|
||||
if err := removeAnycastRoute(ip, host); err != nil {
|
||||
r.Logger.Warn("anycast remove", "ip", ip, "host", host, "err", err)
|
||||
} else {
|
||||
r.Logger.Info("anycast withdraw", "ip", ip, "host", host)
|
||||
}
|
||||
delete(r.advertised, ip)
|
||||
}
|
||||
}
|
||||
|
||||
// Re-render BIRD with the active set.
|
||||
r.renderBird(desired)
|
||||
}
|
||||
|
||||
// computeDesired walks the Store and returns the {ip → host iface} map of
|
||||
// anycast advertisements that should be active right now.
|
||||
func (r *AnycastReconciler) computeDesired() map[string]string {
|
||||
out := map[string]string{}
|
||||
for _, a := range r.Store.Snapshot() {
|
||||
if a.State != StateCommitted || len(a.Anycast) == 0 {
|
||||
continue
|
||||
}
|
||||
pod, ok := r.Pods.Get(a.Namespace, a.PodName)
|
||||
if !ok || !podReady(pod) {
|
||||
continue
|
||||
}
|
||||
host := HostIfaceName(a.ContainerID)
|
||||
for _, ipStr := range a.Anycast {
|
||||
ip := net.ParseIP(ipStr)
|
||||
if ip == nil {
|
||||
continue
|
||||
}
|
||||
out[canonical(ip)] = host
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (r *AnycastReconciler) renderBird(desired map[string]string) {
|
||||
nc := r.NodeConfig.Load()
|
||||
if nc == nil || r.Bird == nil {
|
||||
return
|
||||
}
|
||||
var v6, v4 []string
|
||||
for ipStr := range desired {
|
||||
ip := net.ParseIP(ipStr)
|
||||
if ip == nil {
|
||||
continue
|
||||
}
|
||||
if ip.To4() != nil {
|
||||
v4 = append(v4, ip.To4().String())
|
||||
} else {
|
||||
v6 = append(v6, ip.To16().String())
|
||||
}
|
||||
}
|
||||
if err := r.Bird.Render(nc, v6, v4, r.RouterID); err != nil {
|
||||
r.Logger.Warn("anycast bird render", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// installAnycastRoute installs a host /128 (v6) or /32 (v4) pointing at
|
||||
// the pod's host veth. Idempotent — RouteReplace overwrites.
|
||||
func installAnycastRoute(ipStr, hostIface string) error {
|
||||
ip := net.ParseIP(ipStr)
|
||||
if ip == nil {
|
||||
return fmt.Errorf("bad ip %q", ipStr)
|
||||
}
|
||||
link, err := netlink.LinkByName(hostIface)
|
||||
if err != nil {
|
||||
return fmt.Errorf("lookup %s: %w", hostIface, err)
|
||||
}
|
||||
prefix := 128
|
||||
if ip.To4() != nil {
|
||||
prefix = 32
|
||||
ip = ip.To4()
|
||||
}
|
||||
r := &netlink.Route{
|
||||
LinkIndex: link.Attrs().Index,
|
||||
Scope: netlink.SCOPE_LINK,
|
||||
Dst: cidrFor(ip, prefix),
|
||||
}
|
||||
return netlink.RouteReplace(r)
|
||||
}
|
||||
|
||||
// removeAnycastRoute deletes the host route. Missing routes / interfaces
|
||||
// are not errors.
|
||||
func removeAnycastRoute(ipStr, hostIface string) error {
|
||||
ip := net.ParseIP(ipStr)
|
||||
if ip == nil {
|
||||
return nil
|
||||
}
|
||||
link, err := netlink.LinkByName(hostIface)
|
||||
if err != nil {
|
||||
return nil // veth gone → route gone
|
||||
}
|
||||
prefix := 128
|
||||
if ip.To4() != nil {
|
||||
prefix = 32
|
||||
ip = ip.To4()
|
||||
}
|
||||
r := &netlink.Route{
|
||||
LinkIndex: link.Attrs().Index,
|
||||
Dst: cidrFor(ip, prefix),
|
||||
}
|
||||
if err := netlink.RouteDel(r); err != nil && !linkNotFound(err) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// _ = flockv1alpha1 to silence unused import warnings on minimal builds.
|
||||
var _ = flockv1alpha1.GroupName
|
||||
Reference in New Issue
Block a user