Files
flock/pkg/agent/server.go
T
Donavan Fritz eb1f5e0d8d
Build flock Image / build (push) Has been cancelled
M2: netlink, IPAM/handler wiring, BIRD sidecar, CNI installer
Code (Linux build, with no-op stubs for macOS dev):
- pkg/agent/netns_linux.go: ensureVeth → host-side configure (addrgenmode
  none, fe80::1/64, proxy_arp, forwarding) → move peer to pod ns →
  configure pod side (addr, default route via fe80::1, v4 169.254.1.1
  on-link gateway) → host /128 + /32 routes. Idempotent.
- pkg/agent/hostiface.go: deterministic host iface name flock<8hex> from
  FNV-1a-32(containerID).
- pkg/agent/annotations.go: parse flock.fritzlab.net/{ipv6,ipv4,cidr6,
  cidr4,ip-algo,anycast} with design-doc defaults; ParseCNIArgs for the
  K8S_POD_* keys kubelet sets.
- pkg/agent/podinfo.go: shared informer scoped to spec.nodeName==NODE,
  WaitForPod helper for ADD-vs-informer-sync race.
- pkg/agent/handlers.go: PodHandler does
    cache lookup → annotations → IPAM → store(pending) → SetupFunc →
    store(committed) → Result. Idempotent on retry. Del symmetric.
- pkg/routing/bird/config.go: text/template render with stable ordering;
  golden tests for host001 + anycast injection + sort stability.
- pkg/agent/bird.go: writes /etc/flock/bird/bird.conf, debounces 500ms,
  execs `birdc -s /run/flock/bird.ctl configure`. Installs blackhole
  kernel routes for the node summary CIDRs so BIRD's protocol kernel
  imports them.
- pkg/agent/runtime_linux.go: at startup, waits up to 60s for the per-
  node NodeConfig, reconciles committed allocations into IPAM.used,
  garbage-collects pending entries, builds PodHandler, swaps RPC
  handlers in.
- cmd/flock-installer: init-container binary that copies /opt/cni/bin/
  flock and writes 01-flock.conflist (lex-first so kubelet picks it
  over Calico's 10-calico.conflist on flock-labeled nodes).

Deploy:
- Dockerfile: alpine + iproute2 + bird2; multi-binary image.
- deploy/daemonset.yaml: install-cni init container; bird sidecar
  sharing /etc/flock/bird + /run/flock with the agent; ConfigMap-seeded
  bootstrap bird.conf so the sidecar boots before the agent renders.
  Privileged on flock-agent + install-cni; bird sidecar uses
  NET_ADMIN/RAW only.
- RBAC: pods + networkpolicies get/list/watch (the latter is reserved
  for M8 — harmless to grant now).

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-04-24 22:33:48 -05:00

146 lines
3.4 KiB
Go

package agent
import (
"context"
"fmt"
"log/slog"
"net"
"os"
"path/filepath"
"time"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
// SocketPath is the unix socket on which flock-agent serves RPCs from the
// CNI plugin. NewServer uses it as the default when Config.Socket is empty.
const SocketPath = "/run/flock/flock.sock"
// Server orchestrates the agent runtime: store, informers, IPAM, netns,
// BIRD. Run() blocks until ctx is cancelled.
//
// Construct a Server with NewServer; it fills every field below.
type Server struct {
// Node is the node name copied from Config.Node. It is passed to the
// store and to the NodeConfig informer.
Node string
// Store is the allocation store opened by NewServer from Config.StatePath.
Store *Store
// NodeConfig is the cache handed to StartNodeConfigInformer in Run and
// polled by firstAvailableNodeConfig.
NodeConfig *NodeConfigCache
// RPC is the dispatcher that Run starts on the unix socket listener.
RPC *rpcServer
// Logger receives the agent's structured log output.
Logger *slog.Logger
// socket is the filesystem path of the unix socket Run listens on.
socket string
// restCfg is the Kubernetes client configuration loaded by NewServer and
// passed to the NodeConfig informer.
restCfg *rest.Config
}
// Config carries the settings NewServer needs to build a Server. Only Node
// is required; NewServer substitutes defaults for the other fields when they
// are left at their zero value.
type Config struct {
// Node is the node name. Required: NewServer returns an error if empty.
Node string
// StatePath is the allocation state file. Defaults to
// "/var/lib/flock/allocations.json"; its parent directory is created if
// missing.
StatePath string
// Socket is the unix socket path to serve on. Defaults to SocketPath.
Socket string
// Logger is the logger to use. Defaults to slog.Default().
Logger *slog.Logger
// Kubeconfig is the path to a kubeconfig file. When empty, the in-cluster
// configuration is used instead.
Kubeconfig string
}
// NewServer builds a Server from cfg.
//
// cfg.Node is mandatory. Empty StatePath, Socket and Logger fields fall back
// to "/var/lib/flock/allocations.json", SocketPath and slog.Default()
// respectively. The state file's parent directory is created (mode 0o750)
// before the store is opened, and the Kubernetes REST configuration is
// loaded last.
//
// It returns an error if Node is empty, the state directory cannot be
// created, the store cannot be opened, or the REST configuration cannot be
// loaded.
func NewServer(cfg Config) (*Server, error) {
	if len(cfg.Node) == 0 {
		return nil, fmt.Errorf("Node must be set")
	}

	// Resolve defaults for everything the caller left unset.
	statePath := cfg.StatePath
	if statePath == "" {
		statePath = "/var/lib/flock/allocations.json"
	}
	socketPath := cfg.Socket
	if socketPath == "" {
		socketPath = SocketPath
	}
	logger := cfg.Logger
	if logger == nil {
		logger = slog.Default()
	}

	stateDir := filepath.Dir(statePath)
	if mkErr := os.MkdirAll(stateDir, 0o750); mkErr != nil {
		return nil, fmt.Errorf("mkdir state dir: %w", mkErr)
	}

	allocStore, storeErr := NewStore(statePath, cfg.Node)
	if storeErr != nil {
		return nil, fmt.Errorf("open store: %w", storeErr)
	}

	kubeCfg, kubeErr := loadRestConfig(cfg.Kubeconfig)
	if kubeErr != nil {
		return nil, fmt.Errorf("load kube config: %w", kubeErr)
	}

	srv := &Server{
		Node:       cfg.Node,
		Store:      allocStore,
		NodeConfig: &NodeConfigCache{},
		RPC:        newRPCServer(logger),
		Logger:     logger,
		socket:     socketPath,
		restCfg:    kubeCfg,
	}
	return srv, nil
}
// loadRestConfig returns the Kubernetes client configuration. With an empty
// kubeconfig path it uses the in-cluster configuration; otherwise it builds
// the configuration from the kubeconfig file at that path.
func loadRestConfig(kubeconfig string) (*rest.Config, error) {
	if kubeconfig == "" {
		return rest.InClusterConfig()
	}
	return clientcmd.BuildConfigFromFlags("", kubeconfig)
}
// Run starts the agent and blocks until ctx is cancelled or the NodeConfig
// informer stops.
//
// It creates the socket's parent directory, removes any file already at the
// socket path, listens on the unix socket, and then starts three goroutines:
// the RPC dispatcher, the NodeConfig informer, and configureRuntime. A
// configureRuntime failure is logged but does not stop Run.
//
// Run returns nil when ctx is cancelled, including when the informer returns
// because of that cancellation. It returns an error if the socket directory
// or listener cannot be set up, or if the informer stops while ctx is still
// live.
func (s *Server) Run(ctx context.Context) error {
	if err := os.MkdirAll(filepath.Dir(s.socket), 0o750); err != nil {
		return fmt.Errorf("mkdir socket dir: %w", err)
	}

	// Best effort: clear whatever is at the socket path (e.g. a socket left
	// behind by an earlier run). The error is deliberately ignored because
	// net.Listen below reports anything that actually prevents binding.
	_ = os.Remove(s.socket)

	l, err := net.Listen("unix", s.socket)
	if err != nil {
		return fmt.Errorf("listen %s: %w", s.socket, err)
	}
	defer l.Close()

	s.Logger.Info("flock-agent started",
		"node", s.Node,
		"socket", s.socket,
		"allocations", len(s.Store.Snapshot()),
	)

	// RPC dispatcher takes ownership of the listener.
	go s.RPC.serve(ctx, l)

	// NodeConfig informer. The channel is buffered so the goroutine can
	// always deliver its result and exit, even after Run has returned.
	errCh := make(chan error, 1)
	go func() {
		errCh <- StartNodeConfigInformer(ctx, s.restCfg, s.Node, s.NodeConfig, s.Logger)
	}()

	// Pod informer + Handlers + Bird are wired up by configureRuntime,
	// which is platform-specific (real on Linux, no-op stub elsewhere).
	go func() {
		if err := s.configureRuntime(ctx); err != nil {
			s.Logger.Error("runtime configure failed; ADD will return errors", "err", err)
		}
	}()

	select {
	case <-ctx.Done():
		s.Logger.Info("flock-agent stopping")
		return nil
	case err := <-errCh:
		// When ctx is cancelled both cases can be ready and select picks one
		// at random. An informer that returned because of the cancellation
		// is a clean shutdown, not a failure.
		if ctx.Err() != nil {
			s.Logger.Info("flock-agent stopping")
			return nil
		}
		// Wrapping a nil error with %w yields a malformed message, so give
		// an unexpected clean exit its own error.
		if err == nil {
			return fmt.Errorf("informer: exited unexpectedly")
		}
		return fmt.Errorf("informer: %w", err)
	}
}
// firstAvailableNodeConfig waits for the NodeConfig cache to hold a value,
// checking it every 200ms for at most `timeout`. Used to wait for the
// operator-applied NodeConfig CR before booting the IPAM.
//
// It returns nil as soon as the cache is populated, ctx.Err() if ctx is
// cancelled while waiting, or an error naming the node once the timeout has
// elapsed.
func (s *Server) firstAvailableNodeConfig(ctx context.Context, timeout time.Duration) error {
	const pollInterval = 200 * time.Millisecond

	giveUpAt := time.Now().Add(timeout)
	for s.NodeConfig.Load() == nil {
		if time.Now().After(giveUpAt) {
			return fmt.Errorf("NodeConfig %q not observed within %s", s.Node, timeout)
		}
		// Sleep one poll interval, waking early on cancellation.
		select {
		case <-time.After(pollInterval):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}