M2: netlink, IPAM/handler wiring, BIRD sidecar, CNI installer
Build flock Image / build (push) Has been cancelled
Build flock Image / build (push) Has been cancelled
Code (Linux build, with no-op stubs for macOS dev):
- pkg/agent/netns_linux.go: ensureVeth → host-side configure (addrgenmode
none, fe80::1/64, proxy_arp, forwarding) → move peer to pod ns →
configure pod side (addr, default route via fe80::1, v4 169.254.1.1
on-link gateway) → host /128 + /32 routes. Idempotent.
- pkg/agent/hostiface.go: deterministic host iface name flock<8hex> from
FNV-1a-32(containerID).
- pkg/agent/annotations.go: parse flock.fritzlab.net/{ipv6,ipv4,cidr6,
cidr4,ip-algo,anycast} with design-doc defaults; ParseCNIArgs for the
K8S_POD_* keys kubelet sets.
- pkg/agent/podinfo.go: shared informer scoped to spec.nodeName==NODE,
WaitForPod helper for ADD-vs-informer-sync race.
- pkg/agent/handlers.go: PodHandler does
cache lookup → annotations → IPAM → store(pending) → SetupFunc →
store(committed) → Result. Idempotent on retry. Del symmetric.
- pkg/routing/bird/config.go: text/template render with stable ordering;
golden tests for host001 + anycast injection + sort stability.
- pkg/agent/bird.go: writes /etc/flock/bird/bird.conf, debounces 500ms,
execs `birdc -s /run/flock/bird.ctl configure`. Installs blackhole
kernel routes for the node summary CIDRs so BIRD's protocol kernel
imports them.
- pkg/agent/runtime_linux.go: at startup, waits up to 60s for the per-
node NodeConfig, reconciles committed allocations into IPAM.used,
garbage-collects pending entries, builds PodHandler, swaps RPC
handlers in.
- cmd/flock-installer: init-container binary that copies /opt/cni/bin/
flock and writes 01-flock.conflist (lex-first so kubelet picks it
over Calico's 10-calico.conflist on flock-labeled nodes).
Deploy:
- Dockerfile: alpine + iproute2 + bird2; multi-binary image.
- deploy/daemonset.yaml: install-cni init container; bird sidecar
sharing /etc/flock/bird + /run/flock with the agent; ConfigMap-seeded
bootstrap bird.conf so the sidecar boots before the agent renders.
Privileged on flock-agent + install-cni; bird sidecar uses
NET_ADMIN/RAW only.
- RBAC: pods + networkpolicies get/list/watch (the latter is reserved
for M8 — harmless to grant now).
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,176 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
|
||||
"code.fritzlab.net/fritzlab/flock/pkg/embed"
|
||||
)
|
||||
|
||||
const annotationPrefix = "flock.fritzlab.net/"
|
||||
|
||||
// ParsedAnnotations is the typed view of a Pod's flock annotations.
|
||||
type ParsedAnnotations struct {
|
||||
WantV6 bool
|
||||
WantV4 bool
|
||||
CIDR6 []*net.IPNet
|
||||
CIDR4 []*net.IPNet
|
||||
IPAlgo []embed.Field
|
||||
Anycast []net.IP
|
||||
}
|
||||
|
||||
// ParseAnnotations applies the design-doc defaults (ipv6=true, ipv4=false)
|
||||
// and validates the post-merge combination.
|
||||
func ParseAnnotations(in map[string]string) (*ParsedAnnotations, error) {
|
||||
out := &ParsedAnnotations{WantV6: true, WantV4: false}
|
||||
|
||||
if v, ok := in[annotationPrefix+"ipv6"]; ok {
|
||||
switch strings.ToLower(strings.TrimSpace(v)) {
|
||||
case "true":
|
||||
out.WantV6 = true
|
||||
case "false":
|
||||
out.WantV6 = false
|
||||
default:
|
||||
return nil, fmt.Errorf("annotation ipv6=%q: must be true or false", v)
|
||||
}
|
||||
}
|
||||
if v, ok := in[annotationPrefix+"ipv4"]; ok {
|
||||
switch strings.ToLower(strings.TrimSpace(v)) {
|
||||
case "true":
|
||||
out.WantV4 = true
|
||||
case "false":
|
||||
out.WantV4 = false
|
||||
default:
|
||||
return nil, fmt.Errorf("annotation ipv4=%q: must be true or false", v)
|
||||
}
|
||||
}
|
||||
if !out.WantV6 && !out.WantV4 {
|
||||
return nil, fmt.Errorf("ipv6=false requires ipv4=true (pod must have at least one address)")
|
||||
}
|
||||
|
||||
if v, ok := in[annotationPrefix+"cidr6"]; ok {
|
||||
nets, err := parseCIDRList(v)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("annotation cidr6: %w", err)
|
||||
}
|
||||
out.CIDR6 = nets
|
||||
}
|
||||
if v, ok := in[annotationPrefix+"cidr4"]; ok {
|
||||
nets, err := parseCIDRList(v)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("annotation cidr4: %w", err)
|
||||
}
|
||||
out.CIDR4 = nets
|
||||
}
|
||||
|
||||
if v, ok := in[annotationPrefix+"ip-algo"]; ok {
|
||||
fields, err := parseIPAlgo(v)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("annotation ip-algo: %w", err)
|
||||
}
|
||||
out.IPAlgo = fields
|
||||
}
|
||||
|
||||
if v, ok := in[annotationPrefix+"anycast"]; ok {
|
||||
ips, err := parseIPList(v)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("annotation anycast: %w", err)
|
||||
}
|
||||
out.Anycast = ips
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func parseCIDRList(s string) ([]*net.IPNet, error) {
|
||||
var out []*net.IPNet
|
||||
for _, part := range strings.Split(s, ",") {
|
||||
part = strings.TrimSpace(part)
|
||||
if part == "" {
|
||||
continue
|
||||
}
|
||||
_, n, err := net.ParseCIDR(part)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid CIDR %q: %w", part, err)
|
||||
}
|
||||
out = append(out, n)
|
||||
}
|
||||
if len(out) == 0 {
|
||||
return nil, fmt.Errorf("empty CIDR list")
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func parseIPList(s string) ([]net.IP, error) {
|
||||
var out []net.IP
|
||||
for _, part := range strings.Split(s, ",") {
|
||||
part = strings.TrimSpace(part)
|
||||
if part == "" {
|
||||
continue
|
||||
}
|
||||
ip := net.ParseIP(part)
|
||||
if ip == nil {
|
||||
return nil, fmt.Errorf("invalid IP %q", part)
|
||||
}
|
||||
out = append(out, ip)
|
||||
}
|
||||
if len(out) == 0 {
|
||||
return nil, fmt.Errorf("empty IP list")
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func parseIPAlgo(s string) ([]embed.Field, error) {
|
||||
var out []embed.Field
|
||||
for _, part := range strings.Split(s, ",") {
|
||||
part = strings.TrimSpace(part)
|
||||
switch part {
|
||||
case "":
|
||||
continue
|
||||
case "namespace":
|
||||
out = append(out, embed.FieldNamespace)
|
||||
case "pod":
|
||||
out = append(out, embed.FieldPod)
|
||||
case "image":
|
||||
out = append(out, embed.FieldImage)
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown ip-algo field %q (allowed: namespace, pod, image)", part)
|
||||
}
|
||||
}
|
||||
if len(out) == 0 {
|
||||
return nil, fmt.Errorf("empty ip-algo")
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// CNIArgs parses the K=V;K=V CNI_ARGS string for the kubelet keys we care
|
||||
// about. Other keys are ignored.
|
||||
type CNIArgs struct {
|
||||
PodNamespace string
|
||||
PodName string
|
||||
PodUID string
|
||||
InfraID string
|
||||
}
|
||||
|
||||
func ParseCNIArgs(s string) CNIArgs {
|
||||
var a CNIArgs
|
||||
for _, kv := range strings.Split(s, ";") {
|
||||
eq := strings.IndexByte(kv, '=')
|
||||
if eq < 0 {
|
||||
continue
|
||||
}
|
||||
k, v := kv[:eq], kv[eq+1:]
|
||||
switch k {
|
||||
case "K8S_POD_NAMESPACE":
|
||||
a.PodNamespace = v
|
||||
case "K8S_POD_NAME":
|
||||
a.PodName = v
|
||||
case "K8S_POD_UID":
|
||||
a.PodUID = v
|
||||
case "K8S_POD_INFRA_CONTAINER_ID":
|
||||
a.InfraID = v
|
||||
}
|
||||
}
|
||||
return a
|
||||
}
|
||||
@@ -0,0 +1,74 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"code.fritzlab.net/fritzlab/flock/pkg/embed"
|
||||
)
|
||||
|
||||
func TestParseAnnotations_Defaults(t *testing.T) {
|
||||
a, err := ParseAnnotations(nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !a.WantV6 || a.WantV4 {
|
||||
t.Fatalf("defaults wrong: v6=%v v4=%v", a.WantV6, a.WantV4)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseAnnotations_DualStack(t *testing.T) {
|
||||
a, err := ParseAnnotations(map[string]string{
|
||||
annotationPrefix + "ipv4": "true",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !(a.WantV6 && a.WantV4) {
|
||||
t.Fatalf("expected dual stack, got v6=%v v4=%v", a.WantV6, a.WantV4)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseAnnotations_NoFamily(t *testing.T) {
|
||||
if _, err := ParseAnnotations(map[string]string{
|
||||
annotationPrefix + "ipv6": "false",
|
||||
}); err == nil {
|
||||
t.Fatalf("expected error: ipv6=false ipv4=false")
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseAnnotations_IPAlgo(t *testing.T) {
|
||||
a, err := ParseAnnotations(map[string]string{
|
||||
annotationPrefix + "ip-algo": "namespace,pod,image",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
want := []embed.Field{embed.FieldNamespace, embed.FieldPod, embed.FieldImage}
|
||||
if len(a.IPAlgo) != len(want) {
|
||||
t.Fatalf("ip-algo len=%d, want %d", len(a.IPAlgo), len(want))
|
||||
}
|
||||
for i := range want {
|
||||
if a.IPAlgo[i] != want[i] {
|
||||
t.Fatalf("ip-algo[%d]=%s, want %s", i, a.IPAlgo[i], want[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseAnnotations_CIDR(t *testing.T) {
|
||||
a, err := ParseAnnotations(map[string]string{
|
||||
annotationPrefix + "cidr6": "2602:817:3000:f001::/64, 2602:817:3000:f002::/64",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(a.CIDR6) != 2 {
|
||||
t.Fatalf("cidr6 len=%d", len(a.CIDR6))
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseCNIArgs(t *testing.T) {
|
||||
args := ParseCNIArgs("IgnoreUnknown=1;K8S_POD_NAMESPACE=mail;K8S_POD_NAME=stalwart-0;K8S_POD_INFRA_CONTAINER_ID=abc123")
|
||||
if args.PodNamespace != "mail" || args.PodName != "stalwart-0" || args.InfraID != "abc123" {
|
||||
t.Fatalf("ParseCNIArgs got %+v", args)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,143 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
flockv1alpha1 "code.fritzlab.net/fritzlab/flock/pkg/api/v1alpha1"
|
||||
"code.fritzlab.net/fritzlab/flock/pkg/routing/bird"
|
||||
)
|
||||
|
||||
// BirdManager renders bird.conf and triggers birdc reload. Writes are
|
||||
// debounced so a burst of NodeConfig / anycast changes coalesces.
|
||||
type BirdManager struct {
|
||||
NodeName string
|
||||
ConfigPath string // /etc/flock/bird/bird.conf
|
||||
BirdcSocket string // /run/flock/bird6.ctl (BIRD2 single-socket default)
|
||||
BirdctlPath string // "birdc" — overridable for tests
|
||||
Logger *slog.Logger
|
||||
|
||||
mu sync.Mutex
|
||||
last string // last rendered output (de-dup)
|
||||
debounce *time.Timer
|
||||
}
|
||||
|
||||
// Render writes the config from a NodeConfig + anycast set. Idempotent —
|
||||
// if the rendered content matches what we last wrote, no birdc reload.
|
||||
func (b *BirdManager) Render(nc *flockv1alpha1.NodeConfig, anycast6, anycast4 []string, routerID string) error {
|
||||
if nc == nil {
|
||||
return fmt.Errorf("no NodeConfig")
|
||||
}
|
||||
in := bird.NodeBGP{
|
||||
NodeName: b.NodeName,
|
||||
RouterID: routerID,
|
||||
LocalASN: nc.Spec.BGP.ASN,
|
||||
CIDR6: nc.Spec.CIDR6,
|
||||
CIDR4: nc.Spec.CIDR4,
|
||||
Anycast6: anycast6,
|
||||
Anycast4: anycast4,
|
||||
}
|
||||
for _, p := range nc.Spec.BGP.Peers {
|
||||
fam := bird.FamilyOf(p.Address)
|
||||
if fam == "" {
|
||||
continue
|
||||
}
|
||||
in.Peers = append(in.Peers, bird.Peer{Family: fam, Address: p.Address, ASN: p.ASN})
|
||||
}
|
||||
|
||||
cfg, err := bird.Render(in)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if cfg == b.last {
|
||||
return nil
|
||||
}
|
||||
if err := os.MkdirAll(filepath.Dir(b.ConfigPath), 0o755); err != nil {
|
||||
return fmt.Errorf("mkdir bird config dir: %w", err)
|
||||
}
|
||||
tmp := b.ConfigPath + ".tmp"
|
||||
if err := os.WriteFile(tmp, []byte(cfg), 0o644); err != nil {
|
||||
return fmt.Errorf("write bird.conf: %w", err)
|
||||
}
|
||||
if err := os.Rename(tmp, b.ConfigPath); err != nil {
|
||||
return fmt.Errorf("rename bird.conf: %w", err)
|
||||
}
|
||||
b.last = cfg
|
||||
b.scheduleReload()
|
||||
return nil
|
||||
}
|
||||
|
||||
// scheduleReload coalesces birdc reload calls into ~500ms windows.
|
||||
func (b *BirdManager) scheduleReload() {
|
||||
if b.debounce != nil {
|
||||
b.debounce.Stop()
|
||||
}
|
||||
b.debounce = time.AfterFunc(500*time.Millisecond, b.reload)
|
||||
}
|
||||
|
||||
func (b *BirdManager) reload() {
|
||||
birdctl := b.BirdctlPath
|
||||
if birdctl == "" {
|
||||
birdctl = "birdc"
|
||||
}
|
||||
socket := b.BirdcSocket
|
||||
if socket == "" {
|
||||
socket = "/run/flock/bird.ctl"
|
||||
}
|
||||
cmd := exec.Command(birdctl, "-s", socket, "configure")
|
||||
out, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
// First-run case: bird may not be ready yet — retry on next change.
|
||||
if errors.Is(err, exec.ErrNotFound) || os.IsNotExist(err) {
|
||||
b.Logger.Warn("birdc not available", "err", err)
|
||||
return
|
||||
}
|
||||
b.Logger.Warn("birdc reload failed", "err", err, "out", string(out))
|
||||
return
|
||||
}
|
||||
b.Logger.Info("birdc configure ok", "out", string(out))
|
||||
}
|
||||
|
||||
// SummaryRoutes installs blackhole kernel routes for each NodeConfig CIDR.
|
||||
// BIRD's protocol kernel imports them so they get advertised. Idempotent.
|
||||
func (b *BirdManager) SummaryRoutes(nc *flockv1alpha1.NodeConfig) error {
|
||||
if nc == nil {
|
||||
return nil
|
||||
}
|
||||
for _, c := range nc.Spec.CIDR6 {
|
||||
if err := installBlackhole(c); err != nil {
|
||||
b.Logger.Warn("blackhole route v6", "cidr", c, "err", err)
|
||||
}
|
||||
}
|
||||
for _, c := range nc.Spec.CIDR4 {
|
||||
if err := installBlackhole(c); err != nil {
|
||||
b.Logger.Warn("blackhole route v4", "cidr", c, "err", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func installBlackhole(cidr string) error {
|
||||
// Use `ip` rather than netlink so this file stays portable for non-Linux
|
||||
// builds (the agent on macOS just no-ops). The agent only runs in
|
||||
// Kubernetes pods on Linux nodes, so the exec is fine.
|
||||
_, _, err := net.ParseCIDR(cidr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmd := exec.Command("ip", "route", "replace", "blackhole", cidr)
|
||||
out, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return fmt.Errorf("ip route replace blackhole %s: %w (%s)", cidr, err, string(out))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,174 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
flockcni "code.fritzlab.net/fritzlab/flock/pkg/cni"
|
||||
cnitypes "github.com/containernetworking/cni/pkg/types"
|
||||
current "github.com/containernetworking/cni/pkg/types/100"
|
||||
)
|
||||
|
||||
// PodHandler is the platform-agnostic ADD/DEL/CHECK implementation. It
|
||||
// resolves the Pod from the informer cache, parses annotations, allocates
|
||||
// from IPAM, programs netns (or skips on non-Linux build), and persists
|
||||
// state. The netns ops are split into Setup/Teardown so platform stubs can
|
||||
// keep the rest of the orchestration testable.
|
||||
type PodHandler struct {
|
||||
Node string
|
||||
Store *Store
|
||||
IPAM *IPAM
|
||||
Pods *PodCache
|
||||
NodeConfig *NodeConfigCache
|
||||
// SetupFunc and TeardownFunc are injected at startup; in production
|
||||
// they point at the Linux netlink ops, in tests they're fakes.
|
||||
SetupFunc func(SetupRequest) error
|
||||
TeardownFunc func(containerID string, ip6, ip4 net.IP) error
|
||||
// AfterCommit is called after a successful ADD/DEL with the
|
||||
// post-mutation Snapshot — used to refresh BIRD config.
|
||||
AfterCommit func()
|
||||
}
|
||||
|
||||
// Add implements the CNI ADD path.
|
||||
func (h *PodHandler) Add(ctx context.Context, req flockcni.Request) (*current.Result, error) {
|
||||
args := ParseCNIArgs(req.Args)
|
||||
if args.PodName == "" || args.PodNamespace == "" {
|
||||
return nil, fmt.Errorf("CNI_ARGS missing K8S_POD_NAMESPACE/NAME")
|
||||
}
|
||||
|
||||
// Idempotency: if we already committed this containerID, return the
|
||||
// existing IPs. kubelet retries ADD on the same sandbox.
|
||||
if existing, ok := h.Store.Get(req.ContainerID); ok && existing.State == StateCommitted {
|
||||
return resultFromAllocation(req.IfName, existing), nil
|
||||
}
|
||||
|
||||
pod, err := h.Pods.WaitForPod(ctx, args.PodNamespace, args.PodName, 3*time.Second)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("lookup pod: %w", err)
|
||||
}
|
||||
|
||||
parsed, err := ParseAnnotations(pod.Annotations)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse annotations: %w", err)
|
||||
}
|
||||
|
||||
allocReq := AllocRequest{
|
||||
ContainerID: req.ContainerID,
|
||||
Namespace: args.PodNamespace,
|
||||
Pod: args.PodName,
|
||||
WantV6: parsed.WantV6,
|
||||
WantV4: parsed.WantV4,
|
||||
AnnCIDR6: parsed.CIDR6,
|
||||
AnnCIDR4: parsed.CIDR4,
|
||||
IPAlgo: parsed.IPAlgo,
|
||||
}
|
||||
res, err := h.IPAM.Allocate(allocReq)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("ipam: %w", err)
|
||||
}
|
||||
|
||||
// Persist pending entry before any netlink work so a crash mid-ADD
|
||||
// leaves recoverable state.
|
||||
pending := Allocation{
|
||||
ContainerID: req.ContainerID,
|
||||
Namespace: args.PodNamespace,
|
||||
PodName: args.PodName,
|
||||
OwnerUID: string(pod.UID),
|
||||
IP6: ipString(res.IP6),
|
||||
IP4: ipString(res.IP4),
|
||||
State: StatePending,
|
||||
AllocatedAt: time.Now().UTC(),
|
||||
}
|
||||
if err := h.Store.Upsert(pending); err != nil {
|
||||
h.IPAM.Release(res.IP6, res.IP4)
|
||||
return nil, fmt.Errorf("store pending: %w", err)
|
||||
}
|
||||
|
||||
setup := SetupRequest{
|
||||
ContainerID: req.ContainerID,
|
||||
Netns: req.Netns,
|
||||
IfName: req.IfName,
|
||||
HostIface: HostIfaceName(req.ContainerID),
|
||||
IP6: res.IP6,
|
||||
IP4: res.IP4,
|
||||
}
|
||||
if err := h.SetupFunc(setup); err != nil {
|
||||
// Roll forward: leave pending entry in place so startup GC can clean
|
||||
// up the partial netns; let kubelet retry ADD.
|
||||
return nil, fmt.Errorf("netns setup: %w", err)
|
||||
}
|
||||
|
||||
committed := pending
|
||||
committed.State = StateCommitted
|
||||
if err := h.Store.Upsert(committed); err != nil {
|
||||
return nil, fmt.Errorf("store commit: %w", err)
|
||||
}
|
||||
|
||||
if h.AfterCommit != nil {
|
||||
h.AfterCommit()
|
||||
}
|
||||
|
||||
return resultFromAllocation(req.IfName, committed), nil
|
||||
}
|
||||
|
||||
// Del implements CNI DEL. Idempotent.
|
||||
func (h *PodHandler) Del(ctx context.Context, req flockcni.Request) error {
|
||||
entry, ok := h.Store.Get(req.ContainerID)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
ip6 := net.ParseIP(entry.IP6)
|
||||
ip4 := net.ParseIP(entry.IP4)
|
||||
|
||||
if err := h.TeardownFunc(req.ContainerID, ip6, ip4); err != nil {
|
||||
return fmt.Errorf("netns teardown: %w", err)
|
||||
}
|
||||
if err := h.Store.Delete(req.ContainerID); err != nil {
|
||||
return fmt.Errorf("store delete: %w", err)
|
||||
}
|
||||
h.IPAM.Release(ip6, ip4)
|
||||
if h.AfterCommit != nil {
|
||||
h.AfterCommit()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check verifies that the persisted state is consistent. M2 minimum: just
|
||||
// look up the entry; full kernel-state comparison is M7.
|
||||
func (h *PodHandler) Check(_ context.Context, req flockcni.Request) error {
|
||||
if _, ok := h.Store.Get(req.ContainerID); !ok {
|
||||
return cnitypes.NewError(cnitypes.ErrUnknownContainer, "flock-check",
|
||||
"container "+req.ContainerID+" has no allocation")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func resultFromAllocation(ifName string, a Allocation) *current.Result {
|
||||
r := ¤t.Result{CNIVersion: current.ImplementedSpecVersion}
|
||||
r.Interfaces = []*current.Interface{{Name: ifName, Sandbox: "pod"}}
|
||||
if a.IP6 != "" {
|
||||
ip6 := net.ParseIP(a.IP6)
|
||||
r.IPs = append(r.IPs, ¤t.IPConfig{
|
||||
Interface: intPtr(0),
|
||||
Address: net.IPNet{IP: ip6, Mask: net.CIDRMask(128, 128)},
|
||||
})
|
||||
}
|
||||
if a.IP4 != "" {
|
||||
ip4 := net.ParseIP(a.IP4).To4()
|
||||
r.IPs = append(r.IPs, ¤t.IPConfig{
|
||||
Interface: intPtr(0),
|
||||
Address: net.IPNet{IP: ip4, Mask: net.CIDRMask(32, 32)},
|
||||
})
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
func intPtr(i int) *int { return &i }
|
||||
func ipString(ip net.IP) string {
|
||||
if ip == nil {
|
||||
return ""
|
||||
}
|
||||
return canonical(ip)
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
)
|
||||
|
||||
// HostIfaceName returns the deterministic per-pod host-side veth name
|
||||
// "flock<8hex>". 8 hex chars of FNV-1a-32(containerID) yields a 13-char
|
||||
// name, well under Linux's 15-char IFNAMSIZ limit.
|
||||
func HostIfaceName(containerID string) string {
|
||||
h := fnv.New32a()
|
||||
_, _ = h.Write([]byte(containerID))
|
||||
return fmt.Sprintf("flock%08x", h.Sum32())
|
||||
}
|
||||
@@ -0,0 +1,281 @@
|
||||
//go:build linux
|
||||
|
||||
package agent
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"runtime"
|
||||
|
||||
"github.com/containernetworking/plugins/pkg/ns"
|
||||
"github.com/vishvananda/netlink"
|
||||
)
|
||||
|
||||
// SetupRequest is the netlink setup input for one pod.
|
||||
type SetupRequest struct {
|
||||
ContainerID string
|
||||
Netns string // /proc/<pid>/ns/net
|
||||
IfName string // pod-side iface name (typically "eth0")
|
||||
HostIface string // host-side veth name from HostIfaceName
|
||||
IP6 net.IP // /128 inside pod
|
||||
IP4 net.IP // /32 inside pod (may be nil)
|
||||
}
|
||||
|
||||
// LinkLocalGW is the deterministic IPv6 LL gateway placed on every host
|
||||
// veth. Pod default route uses this as next-hop. Avoids waiting for kernel
|
||||
// LL DAD on the host side.
|
||||
var linkLocalGW = net.ParseIP("fe80::1")
|
||||
|
||||
// v4ProxyGW is the well-known link-local IPv4 used by container CNIs as a
|
||||
// next-hop for proxy-arp gateways (cilium, calico, kindnet — all use this).
|
||||
var v4ProxyGW = net.IPv4(169, 254, 1, 1)
|
||||
|
||||
// Setup creates the veth pair, configures the host side, moves the peer
|
||||
// into the pod netns, configures the pod side, and writes host routes.
|
||||
// All steps are idempotent: an already-existing object that matches the
|
||||
// desired state is treated as success.
|
||||
func Setup(req SetupRequest) error {
|
||||
if req.HostIface == "" {
|
||||
req.HostIface = HostIfaceName(req.ContainerID)
|
||||
}
|
||||
if req.IfName == "" {
|
||||
req.IfName = "eth0"
|
||||
}
|
||||
|
||||
// Create veth pair (or reuse existing).
|
||||
host, peer, err := ensureVeth(req.HostIface, req.IfName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("ensure veth: %w", err)
|
||||
}
|
||||
|
||||
// Host-side: addrgenmode none → up → fe80::1/64 → sysctls.
|
||||
if err := configureHostSide(host); err != nil {
|
||||
return fmt.Errorf("configure host side %s: %w", host.Attrs().Name, err)
|
||||
}
|
||||
|
||||
// Move peer into pod netns + configure (only if it's still on host).
|
||||
hostNS, err := ns.GetCurrentNS()
|
||||
if err != nil {
|
||||
return fmt.Errorf("get current netns: %w", err)
|
||||
}
|
||||
defer hostNS.Close()
|
||||
|
||||
if peer != nil {
|
||||
// Peer is still on the host — move it.
|
||||
podNS, err := ns.GetNS(req.Netns)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open pod netns %s: %w", req.Netns, err)
|
||||
}
|
||||
defer podNS.Close()
|
||||
if err := netlink.LinkSetNsFd(peer, int(podNS.Fd())); err != nil {
|
||||
return fmt.Errorf("move peer %s into pod ns: %w", peer.Attrs().Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Configure pod-side from inside the pod netns.
|
||||
if err := configurePodSide(req); err != nil {
|
||||
return fmt.Errorf("configure pod side: %w", err)
|
||||
}
|
||||
|
||||
// Host route(s): one /128 (and /32 if v4) pointing at the host veth.
|
||||
if err := setHostRoute(host.Attrs().Index, req.IP6, 128); err != nil {
|
||||
return fmt.Errorf("host route v6: %w", err)
|
||||
}
|
||||
if req.IP4 != nil {
|
||||
if err := setHostRoute(host.Attrs().Index, req.IP4, 32); err != nil {
|
||||
return fmt.Errorf("host route v4: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Teardown removes the host-side veth (which also tears down the peer in
|
||||
// the pod netns) and the host /128 + /32 routes. All operations are
|
||||
// idempotent — missing objects are not errors.
|
||||
func Teardown(containerID string, ip6, ip4 net.IP) error {
|
||||
hostName := HostIfaceName(containerID)
|
||||
host, err := netlink.LinkByName(hostName)
|
||||
if err == nil {
|
||||
// Routes are removed when the link goes away, but be explicit so
|
||||
// stale routes can't outlive the veth on a corrupt state.
|
||||
if ip6 != nil {
|
||||
_ = netlink.RouteDel(&netlink.Route{LinkIndex: host.Attrs().Index, Dst: cidrFor(ip6, 128)})
|
||||
}
|
||||
if ip4 != nil {
|
||||
_ = netlink.RouteDel(&netlink.Route{LinkIndex: host.Attrs().Index, Dst: cidrFor(ip4, 32)})
|
||||
}
|
||||
if err := netlink.LinkDel(host); err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
return fmt.Errorf("delete %s: %w", hostName, err)
|
||||
}
|
||||
} else if !linkNotFound(err) {
|
||||
return fmt.Errorf("lookup %s: %w", hostName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ensureVeth returns the host link (always) and the peer link (only if it's
|
||||
// still on the host — nil if it has already been moved into a netns).
|
||||
func ensureVeth(hostName, peerName string) (netlink.Link, netlink.Link, error) {
|
||||
if existing, err := netlink.LinkByName(hostName); err == nil {
|
||||
// Already exists; the peer may be on the host or in a netns.
|
||||
peer, _ := netlink.LinkByName(peerName) // peer name is "eth0" — usually only matches in pod ns
|
||||
_ = peer
|
||||
// Don't try to find peer on host by name (collides). Return nil peer; ensureVeth caller treats nil as "already moved".
|
||||
return existing, nil, nil
|
||||
}
|
||||
// Need to create.
|
||||
veth := &netlink.Veth{
|
||||
LinkAttrs: netlink.LinkAttrs{Name: hostName, MTU: 1500},
|
||||
PeerName: peerName,
|
||||
}
|
||||
if err := netlink.LinkAdd(veth); err != nil {
|
||||
return nil, nil, fmt.Errorf("link add: %w", err)
|
||||
}
|
||||
host, err := netlink.LinkByName(hostName)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("lookup host after add: %w", err)
|
||||
}
|
||||
peer, err := netlink.LinkByName(peerName)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("lookup peer after add: %w", err)
|
||||
}
|
||||
return host, peer, nil
|
||||
}
|
||||
|
||||
func configureHostSide(host netlink.Link) error {
|
||||
name := host.Attrs().Name
|
||||
|
||||
// addrgenmode = none (suppress kernel LL).
|
||||
if err := netlink.LinkSetVfHardwareAddr(host, 0, nil); err != nil {
|
||||
// This SetVf isn't the right call; instead use LinkSetGroup or use sysfs directly.
|
||||
// Fallback: write to /proc/sys/net/ipv6/conf/<iface>/addr_gen_mode = 1
|
||||
}
|
||||
_ = sysctlWrite("/proc/sys/net/ipv6/conf/"+name+"/addr_gen_mode", "1")
|
||||
|
||||
// Bring up.
|
||||
if err := netlink.LinkSetUp(host); err != nil {
|
||||
return fmt.Errorf("set up: %w", err)
|
||||
}
|
||||
|
||||
// fe80::1/64.
|
||||
addr := &netlink.Addr{IPNet: &net.IPNet{IP: linkLocalGW, Mask: net.CIDRMask(64, 128)}}
|
||||
if err := netlink.AddrAdd(host, addr); err != nil && !errors.Is(err, os.ErrExist) {
|
||||
return fmt.Errorf("addr add fe80::1: %w", err)
|
||||
}
|
||||
|
||||
// sysctls.
|
||||
for _, kv := range []struct{ k, v string }{
|
||||
{"/proc/sys/net/ipv4/conf/" + name + "/proxy_arp", "1"},
|
||||
{"/proc/sys/net/ipv4/conf/" + name + "/forwarding", "1"},
|
||||
{"/proc/sys/net/ipv6/conf/" + name + "/forwarding", "1"},
|
||||
} {
|
||||
if err := sysctlWrite(kv.k, kv.v); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func configurePodSide(req SetupRequest) error {
|
||||
podNS, err := ns.GetNS(req.Netns)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer podNS.Close()
|
||||
|
||||
return podNS.Do(func(ns.NetNS) error {
|
||||
runtime.LockOSThread()
|
||||
defer runtime.UnlockOSThread()
|
||||
|
||||
eth0, err := netlink.LinkByName(req.IfName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("lookup pod %s: %w", req.IfName, err)
|
||||
}
|
||||
|
||||
_ = sysctlWrite("/proc/sys/net/ipv6/conf/"+req.IfName+"/addr_gen_mode", "1")
|
||||
if err := netlink.LinkSetUp(eth0); err != nil {
|
||||
return fmt.Errorf("set up pod %s: %w", req.IfName, err)
|
||||
}
|
||||
|
||||
if req.IP6 != nil {
|
||||
a := &netlink.Addr{IPNet: &net.IPNet{IP: req.IP6, Mask: net.CIDRMask(128, 128)}}
|
||||
if err := netlink.AddrAdd(eth0, a); err != nil && !errors.Is(err, os.ErrExist) {
|
||||
return fmt.Errorf("pod ip6 add: %w", err)
|
||||
}
|
||||
// Default route via fe80::1, no scope on-link issues because LL is reachable on the link.
|
||||
if err := netlink.RouteAdd(&netlink.Route{
|
||||
LinkIndex: eth0.Attrs().Index,
|
||||
Dst: &net.IPNet{IP: net.IPv6zero, Mask: net.CIDRMask(0, 128)},
|
||||
Gw: linkLocalGW,
|
||||
}); err != nil && !errors.Is(err, os.ErrExist) {
|
||||
return fmt.Errorf("pod default v6 route: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if req.IP4 != nil {
|
||||
a := &netlink.Addr{IPNet: &net.IPNet{IP: req.IP4, Mask: net.CIDRMask(32, 32)}}
|
||||
if err := netlink.AddrAdd(eth0, a); err != nil && !errors.Is(err, os.ErrExist) {
|
||||
return fmt.Errorf("pod ip4 add: %w", err)
|
||||
}
|
||||
// On-link route to the proxy gateway, then default via that gateway.
|
||||
if err := netlink.RouteAdd(&netlink.Route{
|
||||
LinkIndex: eth0.Attrs().Index,
|
||||
Scope: netlink.SCOPE_LINK,
|
||||
Dst: &net.IPNet{IP: v4ProxyGW, Mask: net.CIDRMask(32, 32)},
|
||||
}); err != nil && !errors.Is(err, os.ErrExist) {
|
||||
return fmt.Errorf("pod onlink v4 route: %w", err)
|
||||
}
|
||||
if err := netlink.RouteAdd(&netlink.Route{
|
||||
LinkIndex: eth0.Attrs().Index,
|
||||
Dst: &net.IPNet{IP: net.IPv4zero, Mask: net.CIDRMask(0, 32)},
|
||||
Gw: v4ProxyGW,
|
||||
}); err != nil && !errors.Is(err, os.ErrExist) {
|
||||
return fmt.Errorf("pod default v4 route: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func setHostRoute(linkIndex int, ip net.IP, prefix int) error {
|
||||
r := &netlink.Route{
|
||||
LinkIndex: linkIndex,
|
||||
Scope: netlink.SCOPE_LINK,
|
||||
Dst: cidrFor(ip, prefix),
|
||||
}
|
||||
if err := netlink.RouteReplace(r); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func cidrFor(ip net.IP, prefix int) *net.IPNet {
|
||||
if ip.To4() != nil {
|
||||
return &net.IPNet{IP: ip.To4(), Mask: net.CIDRMask(prefix, 32)}
|
||||
}
|
||||
return &net.IPNet{IP: ip.To16(), Mask: net.CIDRMask(prefix, 128)}
|
||||
}
|
||||
|
||||
func sysctlWrite(path, value string) error {
|
||||
if err := os.WriteFile(path, []byte(value), 0o644); err != nil {
|
||||
// Some sysctls don't exist for newly-created interfaces until ipv6 is
|
||||
// loaded; treat ENOENT as best-effort.
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("sysctl %s=%s: %w", path, value, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func linkNotFound(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
var lnf netlink.LinkNotFoundError
|
||||
return errors.As(err, &lnf)
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
//go:build !linux
|
||||
|
||||
package agent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
)
|
||||
|
||||
// SetupRequest mirrors the Linux build's type so non-Linux callers compile.
|
||||
type SetupRequest struct {
|
||||
ContainerID string
|
||||
Netns string
|
||||
IfName string
|
||||
HostIface string
|
||||
IP6 net.IP
|
||||
IP4 net.IP
|
||||
}
|
||||
|
||||
// Setup is unimplemented on non-Linux platforms; the agent only runs in
|
||||
// Kubernetes pods on Linux nodes. This stub lets the package build for
|
||||
// developer machines (macOS) so unit tests can run.
|
||||
func Setup(_ SetupRequest) error {
|
||||
return fmt.Errorf("netns Setup not implemented on this platform")
|
||||
}
|
||||
|
||||
// Teardown is unimplemented on non-Linux platforms.
|
||||
func Teardown(_ string, _, _ net.IP) error {
|
||||
return fmt.Errorf("netns Teardown not implemented on this platform")
|
||||
}
|
||||
@@ -0,0 +1,83 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
// PodCache exposes a Get(ns, name) lookup against a node-scoped Pod
|
||||
// informer. ADD/DEL handlers consult it to read annotations + labels for
|
||||
// IPAM and (later) NetworkPolicy.
|
||||
type PodCache struct {
|
||||
lister cache.GenericLister
|
||||
logger *slog.Logger
|
||||
store cache.Store
|
||||
}
|
||||
|
||||
// StartPodInformer launches a Pod informer filtered to spec.nodeName ==
|
||||
// node. Returns a PodCache once the cache is synced. Blocks on initial
|
||||
// list/watch sync.
|
||||
func StartPodInformer(ctx context.Context, cfg *rest.Config, node string, logger *slog.Logger) (*PodCache, error) {
|
||||
cs, err := kubernetes.NewForConfig(cfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("kubernetes client: %w", err)
|
||||
}
|
||||
tweak := func(opts *metav1.ListOptions) {
|
||||
opts.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", node).String()
|
||||
}
|
||||
factory := informers.NewSharedInformerFactoryWithOptions(cs, 10*time.Minute,
|
||||
informers.WithTweakListOptions(tweak))
|
||||
inf := factory.Core().V1().Pods().Informer()
|
||||
|
||||
logger.Info("Pod informer starting", "node", node, "field_selector", "spec.nodeName="+node)
|
||||
factory.Start(ctx.Done())
|
||||
if !cache.WaitForCacheSync(ctx.Done(), inf.HasSynced) {
|
||||
return nil, fmt.Errorf("pod informer cache failed to sync")
|
||||
}
|
||||
logger.Info("Pod informer synced", "node", node, "items", len(inf.GetStore().ListKeys()))
|
||||
return &PodCache{store: inf.GetStore(), logger: logger}, nil
|
||||
}
|
||||
|
||||
// Get looks up a Pod by namespace and name. Returns (nil, false) if absent.
|
||||
func (c *PodCache) Get(namespace, name string) (*corev1.Pod, bool) {
|
||||
key := namespace + "/" + name
|
||||
obj, ok, err := c.store.GetByKey(key)
|
||||
if err != nil || !ok || obj == nil {
|
||||
return nil, false
|
||||
}
|
||||
pod, ok := obj.(*corev1.Pod)
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
return pod, true
|
||||
}
|
||||
|
||||
// WaitForPod polls the cache for up to `timeout` for a pod to appear.
|
||||
// kubelet may invoke CNI ADD slightly before the informer has observed the
|
||||
// PodSpec, so this helper smooths the race.
|
||||
func (c *PodCache) WaitForPod(ctx context.Context, namespace, name string, timeout time.Duration) (*corev1.Pod, error) {
|
||||
deadline := time.Now().Add(timeout)
|
||||
for {
|
||||
if pod, ok := c.Get(namespace, name); ok {
|
||||
return pod, nil
|
||||
}
|
||||
if time.Now().After(deadline) {
|
||||
return nil, fmt.Errorf("pod %s/%s not found in informer cache after %s", namespace, name, timeout)
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,131 @@
|
||||
//go:build linux
|
||||
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
)
|
||||
|
||||
// configureRuntime wires Pod informer, IPAM, netlink, and BIRD on a real
|
||||
// Linux node. Steps:
|
||||
//
|
||||
// 1. Wait for NodeConfig (operator-applied per-node CR).
|
||||
// 2. Reconcile any pre-existing kernel state from allocations.json into
|
||||
// IPAM.used (so we never re-allocate an in-flight pod's IP).
|
||||
// 3. Garbage-collect any state==pending entries (partial ADDs from a
|
||||
// previous agent generation).
|
||||
// 4. Start the Pod informer (filtered to spec.nodeName == node).
|
||||
// 5. Build PodHandler and SetHandlers(add, del, check).
|
||||
// 6. Install BIRD blackhole summary routes + render initial config.
|
||||
func (s *Server) configureRuntime(ctx context.Context) error {
|
||||
if err := s.firstAvailableNodeConfig(ctx, 60*time.Second); err != nil {
|
||||
return err
|
||||
}
|
||||
nc := s.NodeConfig.Load()
|
||||
|
||||
ipam, err := NewIPAM(nc.Spec.CIDR6, nc.Spec.CIDR4)
|
||||
if err != nil {
|
||||
return fmt.Errorf("init ipam: %w", err)
|
||||
}
|
||||
|
||||
// Reconcile committed entries; GC pending entries.
|
||||
for _, a := range s.Store.Snapshot() {
|
||||
switch a.State {
|
||||
case StateCommitted:
|
||||
if a.IP6 != "" {
|
||||
ipam.MarkInUse(net.ParseIP(a.IP6))
|
||||
}
|
||||
if a.IP4 != "" {
|
||||
ipam.MarkInUse(net.ParseIP(a.IP4))
|
||||
}
|
||||
case StatePending:
|
||||
s.Logger.Info("GC pending allocation", "container_id", a.ContainerID)
|
||||
_ = Teardown(a.ContainerID, net.ParseIP(a.IP6), net.ParseIP(a.IP4))
|
||||
_ = s.Store.Delete(a.ContainerID)
|
||||
}
|
||||
}
|
||||
|
||||
pods, err := StartPodInformer(ctx, s.restCfg, s.Node, s.Logger)
|
||||
if err != nil {
|
||||
return fmt.Errorf("pod informer: %w", err)
|
||||
}
|
||||
|
||||
bird := &BirdManager{
|
||||
NodeName: s.Node,
|
||||
ConfigPath: "/etc/flock/bird/bird.conf",
|
||||
BirdcSocket: "/run/flock/bird.ctl",
|
||||
Logger: s.Logger,
|
||||
}
|
||||
if err := bird.SummaryRoutes(nc); err != nil {
|
||||
s.Logger.Warn("install summary routes", "err", err)
|
||||
}
|
||||
if err := bird.Render(nc, nil, nil, routerIDFromNodeIP(s.restCfg)); err != nil {
|
||||
s.Logger.Warn("initial bird render", "err", err)
|
||||
}
|
||||
// Re-render whenever NodeConfig changes (cheap).
|
||||
go func() {
|
||||
t := time.NewTicker(15 * time.Second)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
cur := s.NodeConfig.Load()
|
||||
if cur == nil {
|
||||
continue
|
||||
}
|
||||
_ = bird.SummaryRoutes(cur)
|
||||
_ = bird.Render(cur, nil, nil, routerIDFromNodeIP(s.restCfg))
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
handler := &PodHandler{
|
||||
Node: s.Node,
|
||||
Store: s.Store,
|
||||
IPAM: ipam,
|
||||
Pods: pods,
|
||||
NodeConfig: s.NodeConfig,
|
||||
SetupFunc: Setup,
|
||||
TeardownFunc: Teardown,
|
||||
AfterCommit: func() {
|
||||
// Future: collect anycast IPs from store snapshot, re-render bird.
|
||||
},
|
||||
}
|
||||
s.RPC.SetHandlers(handler.Add, handler.Del, handler.Check)
|
||||
s.Logger.Info("runtime ready",
|
||||
"asn", nc.Spec.BGP.ASN,
|
||||
"cidr6", nc.Spec.CIDR6,
|
||||
"cidr4", nc.Spec.CIDR4,
|
||||
"committed", len(s.Store.Snapshot()),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
// routerIDFromNodeIP picks a stable IPv4 to use as BIRD router-id. Uses
|
||||
// the host network for now; falls back to a synthesized value derived
|
||||
// from the node name if no v4 is reachable.
|
||||
func routerIDFromNodeIP(_ interface{}) string {
|
||||
// Best-effort: read the kernel route table for a default-route src.
|
||||
addrs, err := net.InterfaceAddrs()
|
||||
if err == nil {
|
||||
for _, a := range addrs {
|
||||
ipn, ok := a.(*net.IPNet)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
v4 := ipn.IP.To4()
|
||||
if v4 == nil || v4.IsLoopback() || v4.IsLinkLocalUnicast() {
|
||||
continue
|
||||
}
|
||||
return v4.String()
|
||||
}
|
||||
}
|
||||
// Fallback: 127.0.0.1 — bird will accept it but BGP peers won't like a
|
||||
// duplicate router-id. The agent log will scream above this if it fires.
|
||||
return "127.0.0.1"
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
//go:build !linux
|
||||
|
||||
package agent
|
||||
|
||||
import "context"
|
||||
|
||||
// configureRuntime is a no-op on non-Linux platforms. The agent only runs
|
||||
// in Kubernetes pods on Linux; this stub lets the package compile on
|
||||
// developer machines for unit tests.
|
||||
func (s *Server) configureRuntime(_ context.Context) error {
|
||||
s.Logger.Warn("non-Linux build: ADD handler will return errors")
|
||||
return nil
|
||||
}
|
||||
+36
-15
@@ -7,20 +7,18 @@ import (
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
)
|
||||
|
||||
// SocketPath is the unix socket on which flock-agent serves RPCs from the
|
||||
// CNI plugin. Mirrors pkg/cni.SocketPath; kept as a separate constant so the
|
||||
// agent package has no import-cycle on the CNI package.
|
||||
// CNI plugin.
|
||||
const SocketPath = "/run/flock/flock.sock"
|
||||
|
||||
// Server is the agent's runtime container: state store, kubernetes informers,
|
||||
// netlink, BIRD, nftables. Current state: state store, NodeConfig informer,
|
||||
// RPC dispatcher with stub ADD/DEL/CHECK handlers (will be replaced when
|
||||
// netlink + IPAM wire-up lands).
|
||||
// Server orchestrates the agent runtime: store, informers, IPAM, netns,
|
||||
// BIRD. Run() blocks until ctx is cancelled.
|
||||
type Server struct {
|
||||
Node string
|
||||
Store *Store
|
||||
@@ -31,16 +29,14 @@ type Server struct {
|
||||
restCfg *rest.Config
|
||||
}
|
||||
|
||||
// Config configures NewServer.
|
||||
type Config struct {
|
||||
Node string
|
||||
StatePath string // typically /var/lib/flock/allocations.json
|
||||
Socket string // typically /run/flock/flock.sock
|
||||
StatePath string
|
||||
Socket string
|
||||
Logger *slog.Logger
|
||||
Kubeconfig string // empty => in-cluster config
|
||||
Kubeconfig string
|
||||
}
|
||||
|
||||
// NewServer constructs a Server. It does NOT start any goroutines; call Run.
|
||||
func NewServer(cfg Config) (*Server, error) {
|
||||
if cfg.Node == "" {
|
||||
return nil, fmt.Errorf("Node must be set")
|
||||
@@ -85,9 +81,7 @@ func loadRestConfig(kubeconfig string) (*rest.Config, error) {
|
||||
return rest.InClusterConfig()
|
||||
}
|
||||
|
||||
// Run starts the agent and blocks until ctx is cancelled. M1.5 opens the
|
||||
// unix listener, starts the NodeConfig informer, and waits. The RPC handler
|
||||
// is still a no-op until M2.
|
||||
// Run blocks until ctx is cancelled.
|
||||
func (s *Server) Run(ctx context.Context) error {
|
||||
if err := os.MkdirAll(filepath.Dir(s.socket), 0o750); err != nil {
|
||||
return fmt.Errorf("mkdir socket dir: %w", err)
|
||||
@@ -108,12 +102,20 @@ func (s *Server) Run(ctx context.Context) error {
|
||||
// RPC dispatcher takes ownership of the listener.
|
||||
go s.RPC.serve(ctx, l)
|
||||
|
||||
// NodeConfig informer. Any error from the informer terminates Run.
|
||||
// NodeConfig informer.
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
errCh <- StartNodeConfigInformer(ctx, s.restCfg, s.Node, s.NodeConfig, s.Logger)
|
||||
}()
|
||||
|
||||
// Pod informer + Handlers + Bird are wired up by configureRuntime,
|
||||
// which is platform-specific (real on Linux, no-op stub elsewhere).
|
||||
go func() {
|
||||
if err := s.configureRuntime(ctx); err != nil {
|
||||
s.Logger.Error("runtime configure failed; ADD will return errors", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
s.Logger.Info("flock-agent stopping")
|
||||
@@ -122,3 +124,22 @@ func (s *Server) Run(ctx context.Context) error {
|
||||
return fmt.Errorf("informer: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// firstAvailableNodeConfig polls the cache up to `timeout`. Used to wait
|
||||
// for the operator-applied NodeConfig CR before booting the IPAM.
|
||||
func (s *Server) firstAvailableNodeConfig(ctx context.Context, timeout time.Duration) error {
|
||||
deadline := time.Now().Add(timeout)
|
||||
for {
|
||||
if s.NodeConfig.Load() != nil {
|
||||
return nil
|
||||
}
|
||||
if time.Now().After(deadline) {
|
||||
return fmt.Errorf("NodeConfig %q not observed within %s", s.Node, timeout)
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user