Files
flock/pkg/agent/handlers.go
T
Donavan Fritz eb1f5e0d8d
Build flock Image / build (push) Has been cancelled
M2: netlink, IPAM/handler wiring, BIRD sidecar, CNI installer
Code (Linux build, with no-op stubs for macOS dev):
- pkg/agent/netns_linux.go: ensureVeth → host-side configure (addrgenmode
  none, fe80::1/64, proxy_arp, forwarding) → move peer to pod ns →
  configure pod side (addr, default route via fe80::1, v4 169.254.1.1
  on-link gateway) → host /128 + /32 routes. Idempotent.
- pkg/agent/hostiface.go: deterministic host iface name flock<8hex> from
  FNV-1a-32(containerID).
- pkg/agent/annotations.go: parse flock.fritzlab.net/{ipv6,ipv4,cidr6,
  cidr4,ip-algo,anycast} with design-doc defaults; ParseCNIArgs for the
  K8S_POD_* keys kubelet sets.
- pkg/agent/podinfo.go: shared informer scoped to spec.nodeName==NODE,
  WaitForPod helper for ADD-vs-informer-sync race.
- pkg/agent/handlers.go: PodHandler does
    cache lookup → annotations → IPAM → store(pending) → SetupFunc →
    store(committed) → Result. Idempotent on retry. Del symmetric.
- pkg/routing/bird/config.go: text/template render with stable ordering;
  golden tests for host001 + anycast injection + sort stability.
- pkg/agent/bird.go: writes /etc/flock/bird/bird.conf, debounces 500ms,
  execs `birdc -s /run/flock/bird.ctl configure`. Installs blackhole
  kernel routes for the node summary CIDRs so BIRD's protocol kernel
  imports them.
- pkg/agent/runtime_linux.go: at startup, waits up to 60s for the per-
  node NodeConfig, reconciles committed allocations into IPAM.used,
  garbage-collects pending entries, builds PodHandler, swaps RPC
  handlers in.
- cmd/flock-installer: init-container binary that copies /opt/cni/bin/
  flock and writes 01-flock.conflist (lex-first so kubelet picks it
  over Calico's 10-calico.conflist on flock-labeled nodes).

Deploy:
- Dockerfile: alpine + iproute2 + bird2; multi-binary image.
- deploy/daemonset.yaml: install-cni init container; bird sidecar
  sharing /etc/flock/bird + /run/flock with the agent; ConfigMap-seeded
  bootstrap bird.conf so the sidecar boots before the agent renders.
  Privileged on flock-agent + install-cni; bird sidecar uses
  NET_ADMIN/RAW only.
- RBAC: pods + networkpolicies get/list/watch (the latter is reserved
  for M8 — harmless to grant now).

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-04-24 22:33:48 -05:00

175 lines
5.1 KiB
Go

package agent
import (
"context"
"fmt"
"net"
"time"
flockcni "code.fritzlab.net/fritzlab/flock/pkg/cni"
cnitypes "github.com/containernetworking/cni/pkg/types"
current "github.com/containernetworking/cni/pkg/types/100"
)
// PodHandler is the platform-agnostic ADD/DEL/CHECK implementation. It
// resolves the Pod from the informer cache, parses annotations, allocates
// from IPAM, programs netns (or skips on non-Linux build), and persists
// state. The netns ops are split into Setup/Teardown so platform stubs can
// keep the rest of the orchestration testable.
//
// Add calls SetupFunc and Del calls TeardownFunc without a nil check, so
// both must be set before the handler serves requests. AfterCommit is
// optional and is skipped when nil.
type PodHandler struct {
// Node is presumably the name of the node this agent serves; it is not
// read by the handler methods visible here — TODO confirm usage.
Node string
// Store holds the per-container Allocation entries. The handlers use it
// through Get, Upsert and Delete, keyed by container ID.
Store *Store
// IPAM hands out pod addresses (Allocate) and takes them back (Release).
IPAM *IPAM
// Pods is the pod cache; Add uses WaitForPod on it to resolve the Pod
// named in CNI_ARGS.
Pods *PodCache
// NodeConfig is not read by the handler methods visible here; presumably
// it exposes the per-node configuration to other components — TODO confirm.
NodeConfig *NodeConfigCache
// SetupFunc and TeardownFunc are injected at startup; in production
// they point at the Linux netlink ops, in tests they're fakes.
// SetupFunc runs during Add after the pending entry has been stored.
SetupFunc func(SetupRequest) error
// TeardownFunc runs during Del with the container ID and the addresses
// parsed from the stored entry (either may be nil when the entry has no
// address of that family recorded).
TeardownFunc func(containerID string, ip6, ip4 net.IP) error
// AfterCommit, when non-nil, is called with no arguments after a
// successful ADD or DEL — used to refresh BIRD config. It receives no
// snapshot; the callee has to read whatever state it needs itself.
AfterCommit func()
}
// Add implements the CNI ADD path for one pod sandbox.
//
// Steps, in order:
//  1. Parse CNI_ARGS; both the pod namespace and the pod name are required.
//  2. If the store already holds a committed entry for req.ContainerID,
//     return a Result built from it without touching IPAM or the netns.
//  3. Wait up to 3s for the Pod to show up in the cache and parse its
//     annotations.
//  4. Allocate addresses from IPAM and persist a pending entry.
//  5. Run SetupFunc, rewrite the entry as committed, call AfterCommit.
//
// The returned Result reports the pod interface with Sandbox set to
// req.Netns. The CNI spec asks for the CNI_NETNS value there for interfaces
// created inside the container; the placeholder produced by
// resultFromAllocation is only kept when req.Netns is empty.
//
// Failure handling: when the pending write fails, the freshly allocated
// addresses are released again. When SetupFunc or the commit write fails,
// the pending entry and its addresses are intentionally left in place and
// the error is returned so the caller can retry.
//
// Every error is wrapped with the name of the step that failed.
func (h *PodHandler) Add(ctx context.Context, req flockcni.Request) (*current.Result, error) {
	args := ParseCNIArgs(req.Args)
	if args.PodName == "" || args.PodNamespace == "" {
		return nil, fmt.Errorf("CNI_ARGS missing K8S_POD_NAMESPACE/NAME")
	}

	// buildResult renders the CNI result for an allocation and points the
	// reported interface(s) at the real network namespace of this request.
	buildResult := func(a Allocation) *current.Result {
		r := resultFromAllocation(req.IfName, a)
		if req.Netns == "" {
			return r
		}
		for _, iface := range r.Interfaces {
			if iface != nil {
				iface.Sandbox = req.Netns
			}
		}
		return r
	}

	// Idempotency: if we already committed this containerID, return the
	// existing IPs. kubelet retries ADD on the same sandbox.
	if existing, ok := h.Store.Get(req.ContainerID); ok && existing.State == StateCommitted {
		return buildResult(existing), nil
	}

	// NOTE(review): a retry that finds a *pending* entry falls through and
	// calls IPAM.Allocate again. Whether that hands back the addresses of
	// the earlier attempt depends on IPAM, which is not visible here —
	// confirm that repeated failed ADDs cannot strand addresses.
	pod, err := h.Pods.WaitForPod(ctx, args.PodNamespace, args.PodName, 3*time.Second)
	if err != nil {
		return nil, fmt.Errorf("lookup pod: %w", err)
	}
	parsed, err := ParseAnnotations(pod.Annotations)
	if err != nil {
		return nil, fmt.Errorf("parse annotations: %w", err)
	}

	allocReq := AllocRequest{
		ContainerID: req.ContainerID,
		Namespace:   args.PodNamespace,
		Pod:         args.PodName,
		WantV6:      parsed.WantV6,
		WantV4:      parsed.WantV4,
		AnnCIDR6:    parsed.CIDR6,
		AnnCIDR4:    parsed.CIDR4,
		IPAlgo:      parsed.IPAlgo,
	}
	res, err := h.IPAM.Allocate(allocReq)
	if err != nil {
		return nil, fmt.Errorf("ipam: %w", err)
	}

	// Persist pending entry before any netlink work so a crash mid-ADD
	// leaves recoverable state.
	pending := Allocation{
		ContainerID: req.ContainerID,
		Namespace:   args.PodNamespace,
		PodName:     args.PodName,
		OwnerUID:    string(pod.UID),
		IP6:         ipString(res.IP6),
		IP4:         ipString(res.IP4),
		State:       StatePending,
		AllocatedAt: time.Now().UTC(),
	}
	if err := h.Store.Upsert(pending); err != nil {
		// Nothing was recorded, so hand the addresses straight back.
		h.IPAM.Release(res.IP6, res.IP4)
		return nil, fmt.Errorf("store pending: %w", err)
	}

	setup := SetupRequest{
		ContainerID: req.ContainerID,
		Netns:       req.Netns,
		IfName:      req.IfName,
		HostIface:   HostIfaceName(req.ContainerID),
		IP6:         res.IP6,
		IP4:         res.IP4,
	}
	if err := h.SetupFunc(setup); err != nil {
		// Roll forward: leave pending entry in place so startup GC can clean
		// up the partial netns; let kubelet retry ADD.
		return nil, fmt.Errorf("netns setup: %w", err)
	}

	// Same record as the pending one, only the state changes.
	committed := pending
	committed.State = StateCommitted
	if err := h.Store.Upsert(committed); err != nil {
		return nil, fmt.Errorf("store commit: %w", err)
	}
	if h.AfterCommit != nil {
		h.AfterCommit()
	}
	return buildResult(committed), nil
}
// Del implements the CNI DEL path.
//
// A container ID with no stored entry is treated as already deleted and
// yields nil, which makes repeated DELs harmless. Otherwise the entry's
// recorded addresses are parsed, TeardownFunc is run, the entry is removed
// from the store, the addresses are released back to IPAM and AfterCommit
// (if set) is invoked — strictly in that order. A failure in teardown or in
// the store delete aborts the sequence and is returned wrapped; the later
// steps are then not executed. The entry's state is not inspected.
func (h *PodHandler) Del(ctx context.Context, req flockcni.Request) error {
	alloc, found := h.Store.Get(req.ContainerID)
	if !found {
		return nil
	}

	// An empty address string parses to nil, which is passed through as-is.
	v6, v4 := net.ParseIP(alloc.IP6), net.ParseIP(alloc.IP4)

	teardownErr := h.TeardownFunc(req.ContainerID, v6, v4)
	if teardownErr != nil {
		return fmt.Errorf("netns teardown: %w", teardownErr)
	}

	deleteErr := h.Store.Delete(req.ContainerID)
	if deleteErr != nil {
		return fmt.Errorf("store delete: %w", deleteErr)
	}

	// Only give the addresses back once the entry is gone from the store.
	h.IPAM.Release(v6, v4)

	if notify := h.AfterCommit; notify != nil {
		notify()
	}
	return nil
}
// Check implements CNI CHECK. For M2 it only confirms that the store has an
// entry for the container; comparing against actual kernel state is left
// for M7. The entry's state is not inspected. A missing entry is reported as
// a CNI ErrUnknownContainer error.
func (h *PodHandler) Check(_ context.Context, req flockcni.Request) error {
	_, found := h.Store.Get(req.ContainerID)
	if found {
		return nil
	}
	details := "container " + req.ContainerID + " has no allocation"
	return cnitypes.NewError(cnitypes.ErrUnknownContainer, "flock-check", details)
}
// resultFromAllocation converts a stored Allocation into a CNI Result.
//
// The Result lists a single interface named ifName and one IPConfig per
// address recorded on the allocation: the IPv6 address as a /128 first, then
// the IPv4 address (in 4-byte form) as a /32. Both IPConfigs reference
// interface index 0. An empty address string means "no address of that
// family" and is skipped; a Result without any IPs has a nil IPs slice.
//
// NOTE(review): the interface's Sandbox is the fixed string "pod" because
// this helper has no access to the netns path.
func resultFromAllocation(ifName string, a Allocation) *current.Result {
	var addrs []*current.IPConfig

	if a.IP6 != "" {
		addrs = append(addrs, &current.IPConfig{
			Interface: intPtr(0),
			Address: net.IPNet{
				IP:   net.ParseIP(a.IP6),
				Mask: net.CIDRMask(128, 128),
			},
		})
	}

	if a.IP4 != "" {
		addrs = append(addrs, &current.IPConfig{
			Interface: intPtr(0),
			Address: net.IPNet{
				IP:   net.ParseIP(a.IP4).To4(),
				Mask: net.CIDRMask(32, 32),
			},
		})
	}

	return &current.Result{
		CNIVersion: current.ImplementedSpecVersion,
		Interfaces: []*current.Interface{{Name: ifName, Sandbox: "pod"}},
		IPs:        addrs,
	}
}
// intPtr returns a pointer to a fresh int holding i. Each call yields a
// distinct pointer, so callers never share storage.
func intPtr(i int) *int {
	p := new(int)
	*p = i
	return p
}
// ipString renders ip for storage: a nil IP becomes the empty string, any
// other value is formatted by canonical.
func ipString(ip net.IP) string {
	if ip != nil {
		return canonical(ip)
	}
	return ""
}