anycast: revert to lo + add via=pod-eth0 next-hop on host route
Build flock Image / build (push) Has been cancelled
Build flock Image / build (push) Has been cancelled
Reverts the eth0-placement hack from e1e9544. The design doc's lo
placement is correct.
Real fix: the host's anycast /128 (or /32) route now uses the pod's own
eth0 unicast IP (same family) as the route's `via` next-hop. The kernel
then does NDP/ARP for that eth0 IP — which IS configured on the pod's
eth0 — so the pod responds normally with no proxy_ndp / proxy_arp
trickery on the anycast IP itself.
ip -6 route add <anycast>/128 via <pod-eth0-v6> dev flock<8hex>
ip -4 route add <anycast>/32 via <pod-eth0-v4> dev flock<8hex>
Validation: an anycast IP for which the pod has no unicast IP of the same
family is skipped with a warning (a v4 anycast on an IPv6-only pod has no
v4 next-hop to ARP-resolve this way; dual-stack is required).
Bonus cleanup: ESRCH from RouteDel is treated as success (idempotent).
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
+69
-32
@@ -4,10 +4,12 @@ package agent
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
flockv1alpha1 "code.fritzlab.net/fritzlab/flock/pkg/api/v1alpha1"
|
flockv1alpha1 "code.fritzlab.net/fritzlab/flock/pkg/api/v1alpha1"
|
||||||
@@ -36,10 +38,17 @@ type AnycastReconciler struct {
|
|||||||
Logger *slog.Logger
|
Logger *slog.Logger
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
advertised map[string]string // canonical IP → host iface name
|
advertised map[string]anycastTarget // canonical IP → install info
|
||||||
trigger chan struct{}
|
trigger chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// anycastTarget describes the kernel route shape for one advertised
|
||||||
|
// anycast IP: which veth, and which pod eth0 IP to use as next-hop.
|
||||||
|
type anycastTarget struct {
|
||||||
|
hostIface string
|
||||||
|
via net.IP
|
||||||
|
}
|
||||||
|
|
||||||
// NewAnycastReconciler returns a Reconciler ready to Run.
|
// NewAnycastReconciler returns a Reconciler ready to Run.
|
||||||
func NewAnycastReconciler(node string, store *Store, pods *PodCache, nc *NodeConfigCache, bird *BirdManager, routerID string, logger *slog.Logger) *AnycastReconciler {
|
func NewAnycastReconciler(node string, store *Store, pods *PodCache, nc *NodeConfigCache, bird *BirdManager, routerID string, logger *slog.Logger) *AnycastReconciler {
|
||||||
return &AnycastReconciler{
|
return &AnycastReconciler{
|
||||||
@@ -50,7 +59,7 @@ func NewAnycastReconciler(node string, store *Store, pods *PodCache, nc *NodeCon
|
|||||||
Bird: bird,
|
Bird: bird,
|
||||||
RouterID: routerID,
|
RouterID: routerID,
|
||||||
Logger: logger,
|
Logger: logger,
|
||||||
advertised: map[string]string{},
|
advertised: map[string]anycastTarget{},
|
||||||
trigger: make(chan struct{}, 1),
|
trigger: make(chan struct{}, 1),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -87,24 +96,25 @@ func (r *AnycastReconciler) reconcile() {
|
|||||||
|
|
||||||
desired := r.computeDesired()
|
desired := r.computeDesired()
|
||||||
|
|
||||||
// Install routes that should exist but don't.
|
// Install routes that should exist but don't (or whose target changed).
|
||||||
for ip, host := range desired {
|
for ip, t := range desired {
|
||||||
if r.advertised[ip] != host {
|
if cur, ok := r.advertised[ip]; ok && cur.hostIface == t.hostIface && cur.via.Equal(t.via) {
|
||||||
if err := installAnycastRoute(ip, host); err != nil {
|
continue
|
||||||
r.Logger.Warn("anycast install", "ip", ip, "host", host, "err", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
r.Logger.Info("anycast advertise", "ip", ip, "host", host)
|
|
||||||
r.advertised[ip] = host
|
|
||||||
}
|
}
|
||||||
|
if err := installAnycastRoute(ip, t); err != nil {
|
||||||
|
r.Logger.Warn("anycast install", "ip", ip, "host", t.hostIface, "via", t.via, "err", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
r.Logger.Info("anycast advertise", "ip", ip, "host", t.hostIface, "via", t.via)
|
||||||
|
r.advertised[ip] = t
|
||||||
}
|
}
|
||||||
// Remove routes that exist but shouldn't.
|
// Remove routes that exist but shouldn't.
|
||||||
for ip, host := range r.advertised {
|
for ip, t := range r.advertised {
|
||||||
if _, want := desired[ip]; !want {
|
if _, want := desired[ip]; !want {
|
||||||
if err := removeAnycastRoute(ip, host); err != nil {
|
if err := removeAnycastRoute(ip, t); err != nil {
|
||||||
r.Logger.Warn("anycast remove", "ip", ip, "host", host, "err", err)
|
r.Logger.Warn("anycast remove", "ip", ip, "host", t.hostIface, "err", err)
|
||||||
} else {
|
} else {
|
||||||
r.Logger.Info("anycast withdraw", "ip", ip, "host", host)
|
r.Logger.Info("anycast withdraw", "ip", ip, "host", t.hostIface)
|
||||||
}
|
}
|
||||||
delete(r.advertised, ip)
|
delete(r.advertised, ip)
|
||||||
}
|
}
|
||||||
@@ -114,10 +124,13 @@ func (r *AnycastReconciler) reconcile() {
|
|||||||
r.renderBird(desired)
|
r.renderBird(desired)
|
||||||
}
|
}
|
||||||
|
|
||||||
// computeDesired walks the Store and returns the {ip → host iface} map of
|
// computeDesired walks the Store and returns the per-ip anycastTarget for
|
||||||
// anycast advertisements that should be active right now.
|
// every anycast advertisement that should be active right now. Each target
|
||||||
func (r *AnycastReconciler) computeDesired() map[string]string {
|
// uses the pod's own eth0 IP (same family) as the route's `via` next-hop —
|
||||||
out := map[string]string{}
|
// that way kernel NDP/ARP resolves the eth0 address, which IS configured
|
||||||
|
// on the pod's eth0, so the pod responds normally without proxy_ndp.
|
||||||
|
func (r *AnycastReconciler) computeDesired() map[string]anycastTarget {
|
||||||
|
out := map[string]anycastTarget{}
|
||||||
for _, a := range r.Store.Snapshot() {
|
for _, a := range r.Store.Snapshot() {
|
||||||
if a.State != StateCommitted || len(a.Anycast) == 0 {
|
if a.State != StateCommitted || len(a.Anycast) == 0 {
|
||||||
continue
|
continue
|
||||||
@@ -127,18 +140,31 @@ func (r *AnycastReconciler) computeDesired() map[string]string {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
host := HostIfaceName(a.ContainerID)
|
host := HostIfaceName(a.ContainerID)
|
||||||
|
via6 := net.ParseIP(a.IP6)
|
||||||
|
via4 := net.ParseIP(a.IP4)
|
||||||
for _, ipStr := range a.Anycast {
|
for _, ipStr := range a.Anycast {
|
||||||
ip := net.ParseIP(ipStr)
|
ip := net.ParseIP(ipStr)
|
||||||
if ip == nil {
|
if ip == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
out[canonical(ip)] = host
|
var via net.IP
|
||||||
|
if ip.To4() != nil {
|
||||||
|
via = via4
|
||||||
|
} else {
|
||||||
|
via = via6
|
||||||
|
}
|
||||||
|
if via == nil {
|
||||||
|
r.Logger.Warn("anycast skipped: pod has no unicast IP of same family",
|
||||||
|
"pod", a.Namespace+"/"+a.PodName, "anycast", ipStr)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
out[canonical(ip)] = anycastTarget{hostIface: host, via: via}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *AnycastReconciler) renderBird(desired map[string]string) {
|
func (r *AnycastReconciler) renderBird(desired map[string]anycastTarget) {
|
||||||
nc := r.NodeConfig.Load()
|
nc := r.NodeConfig.Load()
|
||||||
if nc == nil || r.Bird == nil {
|
if nc == nil || r.Bird == nil {
|
||||||
return
|
return
|
||||||
@@ -160,16 +186,16 @@ func (r *AnycastReconciler) renderBird(desired map[string]string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// installAnycastRoute installs a host /128 (v6) or /32 (v4) pointing at
|
// installAnycastRoute installs `<ipStr>/<128|32> via t.via dev t.hostIface`.
|
||||||
// the pod's host veth. Idempotent — RouteReplace overwrites.
|
// Idempotent — RouteReplace overwrites a stale entry.
|
||||||
func installAnycastRoute(ipStr, hostIface string) error {
|
func installAnycastRoute(ipStr string, t anycastTarget) error {
|
||||||
ip := net.ParseIP(ipStr)
|
ip := net.ParseIP(ipStr)
|
||||||
if ip == nil {
|
if ip == nil {
|
||||||
return fmt.Errorf("bad ip %q", ipStr)
|
return fmt.Errorf("bad ip %q", ipStr)
|
||||||
}
|
}
|
||||||
link, err := netlink.LinkByName(hostIface)
|
link, err := netlink.LinkByName(t.hostIface)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("lookup %s: %w", hostIface, err)
|
return fmt.Errorf("lookup %s: %w", t.hostIface, err)
|
||||||
}
|
}
|
||||||
prefix := 128
|
prefix := 128
|
||||||
if ip.To4() != nil {
|
if ip.To4() != nil {
|
||||||
@@ -178,22 +204,27 @@ func installAnycastRoute(ipStr, hostIface string) error {
|
|||||||
}
|
}
|
||||||
r := &netlink.Route{
|
r := &netlink.Route{
|
||||||
LinkIndex: link.Attrs().Index,
|
LinkIndex: link.Attrs().Index,
|
||||||
Scope: netlink.SCOPE_LINK,
|
|
||||||
Dst: cidrFor(ip, prefix),
|
Dst: cidrFor(ip, prefix),
|
||||||
|
Gw: t.via,
|
||||||
|
// SCOPE_UNIVERSE — the gateway is on a different "logical" subnet
|
||||||
|
// than the local /128 route, but reachable on this veth. Linux is
|
||||||
|
// happy as long as the veth has IPv6 forwarding on (it does — set
|
||||||
|
// in configureHostSide) and the pod's eth0 has the via address
|
||||||
|
// (also true — that's the pod's IP6/IP4 we allocated).
|
||||||
}
|
}
|
||||||
return netlink.RouteReplace(r)
|
return netlink.RouteReplace(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
// removeAnycastRoute deletes the host route. Missing routes / interfaces
|
// removeAnycastRoute deletes the host route. Missing routes / interfaces
|
||||||
// are not errors.
|
// are treated as success — DEL paths can race with veth teardown.
|
||||||
func removeAnycastRoute(ipStr, hostIface string) error {
|
func removeAnycastRoute(ipStr string, t anycastTarget) error {
|
||||||
ip := net.ParseIP(ipStr)
|
ip := net.ParseIP(ipStr)
|
||||||
if ip == nil {
|
if ip == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
link, err := netlink.LinkByName(hostIface)
|
link, err := netlink.LinkByName(t.hostIface)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil // veth gone → route gone
|
return nil
|
||||||
}
|
}
|
||||||
prefix := 128
|
prefix := 128
|
||||||
if ip.To4() != nil {
|
if ip.To4() != nil {
|
||||||
@@ -203,8 +234,14 @@ func removeAnycastRoute(ipStr, hostIface string) error {
|
|||||||
r := &netlink.Route{
|
r := &netlink.Route{
|
||||||
LinkIndex: link.Attrs().Index,
|
LinkIndex: link.Attrs().Index,
|
||||||
Dst: cidrFor(ip, prefix),
|
Dst: cidrFor(ip, prefix),
|
||||||
|
Gw: t.via,
|
||||||
}
|
}
|
||||||
if err := netlink.RouteDel(r); err != nil && !linkNotFound(err) {
|
if err := netlink.RouteDel(r); err != nil {
|
||||||
|
// ESRCH ("no such process") is netlink-speak for "no such route";
|
||||||
|
// treat as success.
|
||||||
|
if errors.Is(err, syscall.ESRCH) || linkNotFound(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
+14
-13
@@ -241,18 +241,19 @@ func configurePodSide(req SetupRequest) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Anycast: assign each IP to pod eth0 (NOT lo).
|
// Anycast: assign each IP to pod lo, per design doc. NDP/ARP for
|
||||||
//
|
// the anycast IP itself never happens because the host route on
|
||||||
// The original design doc proposed lo to avoid NDP/ARP DAD
|
// the host side is `<anycast> via <pod-eth0-ip> dev flock<8hex>`.
|
||||||
// conflicts "across nodes advertising the same IP". That concern
|
// The kernel resolves <pod-eth0-ip> via NDP/ARP — and that IP IS
|
||||||
// doesn't apply to flock: each pod's veth is its own private /64,
|
// on eth0, so the pod responds normally.
|
||||||
// so DAD on eth0 only sees the veth peer (host) — no cross-node
|
|
||||||
// L2 contention. Putting the IP on eth0 instead means the pod
|
|
||||||
// kernel answers NDP solicits arriving on eth0 for that IP, which
|
|
||||||
// is what the host's /128 host route requires. With anycast on
|
|
||||||
// lo, NDP from the host side fails and the kernel drops the
|
|
||||||
// packet between routing decision and transmit.
|
|
||||||
if len(req.Anycast) > 0 {
|
if len(req.Anycast) > 0 {
|
||||||
|
lo, err := netlink.LinkByName("lo")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("lookup pod lo: %w", err)
|
||||||
|
}
|
||||||
|
if err := netlink.LinkSetUp(lo); err != nil {
|
||||||
|
return fmt.Errorf("set up pod lo: %w", err)
|
||||||
|
}
|
||||||
for _, ip := range req.Anycast {
|
for _, ip := range req.Anycast {
|
||||||
var mask net.IPMask
|
var mask net.IPMask
|
||||||
if ip.To4() != nil {
|
if ip.To4() != nil {
|
||||||
@@ -262,8 +263,8 @@ func configurePodSide(req SetupRequest) error {
|
|||||||
mask = net.CIDRMask(128, 128)
|
mask = net.CIDRMask(128, 128)
|
||||||
}
|
}
|
||||||
a := &netlink.Addr{IPNet: &net.IPNet{IP: ip, Mask: mask}, Scope: int(netlink.SCOPE_UNIVERSE)}
|
a := &netlink.Addr{IPNet: &net.IPNet{IP: ip, Mask: mask}, Scope: int(netlink.SCOPE_UNIVERSE)}
|
||||||
if err := netlink.AddrAdd(eth0, a); err != nil && !errors.Is(err, os.ErrExist) {
|
if err := netlink.AddrAdd(lo, a); err != nil && !errors.Is(err, os.ErrExist) {
|
||||||
return fmt.Errorf("pod eth0 anycast %s: %w", ip, err)
|
return fmt.Errorf("pod lo anycast %s: %w", ip, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user