From 39ede9130b008c47e8e0cb5bb1591d2454ed7b31 Mon Sep 17 00:00:00 2001 From: Donavan Fritz Date: Sat, 25 Apr 2026 09:25:58 -0500 Subject: [PATCH] netpol: NetworkPolicy v1 enforcement via nftables MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New pkg/agent/netpol implementing standard networking.k8s.io/v1 NetworkPolicy. Pipeline: pods + policies + namespaces → Translate → Render → Apply Supports ingress + egress, all three peer types (podSelector, namespaceSelector, ipBlock with except), numeric ports + port ranges, default-deny semantics derived from PolicyTypes (or inferred from non-empty Spec.Egress when unset). Apply path is `nft -f -` shell-out — single transaction, atomic, kernel guarantees partial-failure rollback. Idempotent dedup via last-applied script. Reconcile triggers: informer events, 30s self-heal tick, every CNI ADD/DEL. Verified against the three live cluster NetPols (calico-apiserver, remote-proxies/lodge-home-assistant, storage/garage-admin-restrict). Fuzz target stitches Translate + Render with random selector and peer inputs; 21 unit tests cover the policy semantics. Named ports skip with a warn — deferred until kubelet exposes them in a form that doesn't require shadowing pod state. Dockerfile: + nftables. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- Dockerfile | 2 +- pkg/agent/netpol/apply_linux.go | 85 ++++ pkg/agent/netpol/apply_stub.go | 16 + pkg/agent/netpol/cluster_fixtures_test.go | 250 ++++++++++++ pkg/agent/netpol/doc.go | 44 +++ pkg/agent/netpol/informers.go | 222 +++++++++++ pkg/agent/netpol/reconciler.go | 115 ++++++ pkg/agent/netpol/reconciler_test.go | 160 ++++++++ pkg/agent/netpol/render.go | 322 +++++++++++++++ pkg/agent/netpol/render_test.go | 219 +++++++++++ pkg/agent/netpol/translator.go | 443 +++++++++++++++++++++ pkg/agent/netpol/translator_fuzz_test.go | 147 +++++++ pkg/agent/netpol/translator_test.go | 452 ++++++++++++++++++++++ pkg/agent/netpol/types.go | 147 +++++++ pkg/agent/netpol_bridge.go | 56 +++ pkg/agent/runtime_linux.go | 20 +- 16 files changed, 2698 insertions(+), 2 deletions(-) create mode 100644 pkg/agent/netpol/apply_linux.go create mode 100644 pkg/agent/netpol/apply_stub.go create mode 100644 pkg/agent/netpol/cluster_fixtures_test.go create mode 100644 pkg/agent/netpol/doc.go create mode 100644 pkg/agent/netpol/informers.go create mode 100644 pkg/agent/netpol/reconciler.go create mode 100644 pkg/agent/netpol/reconciler_test.go create mode 100644 pkg/agent/netpol/render.go create mode 100644 pkg/agent/netpol/render_test.go create mode 100644 pkg/agent/netpol/translator.go create mode 100644 pkg/agent/netpol/translator_fuzz_test.go create mode 100644 pkg/agent/netpol/translator_test.go create mode 100644 pkg/agent/netpol/types.go create mode 100644 pkg/agent/netpol_bridge.go diff --git a/Dockerfile b/Dockerfile index b6f7374..de7d87d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,7 +21,7 @@ RUN CGO_ENABLED=0 go build -trimpath \ -o /out/flock-installer ./cmd/flock-installer FROM alpine:3.21 -RUN apk add --no-cache iproute2 bird ca-certificates +RUN apk add --no-cache iproute2 bird nftables ca-certificates COPY --from=build /out/flock /usr/local/bin/flock COPY --from=build /out/flock-agent /usr/local/bin/flock-agent COPY --from=build /out/flock-installer /usr/local/bin/flock-installer diff --git a/pkg/agent/netpol/apply_linux.go b/pkg/agent/netpol/apply_linux.go new file mode 100644 index 0000000..3ae7fe4 --- /dev/null +++ b/pkg/agent/netpol/apply_linux.go @@ -0,0 +1,85 @@ +//go:build linux + +package netpol + +import ( + "bytes" + "context" + "fmt" + "os/exec" + "time" +) + +// Applier hands rendered nft scripts to the kernel via `nft -f -`. +// nftables guarantees the entire script applies atomically — if any line +// is rejected, the previous ruleset stays intact. +// +// Applier maintains the last-applied script string and skips the exec +// when the new render is byte-identical, so a 5s reconcile tick on a +// quiet cluster is cheap. +type Applier struct { + // NftPath is the path to the nft binary. Empty means "look up `nft` + // on PATH". Tests set this to a fake. + NftPath string + + // Timeout bounds an individual nft invocation; if zero, defaults to + // 5 seconds. + Timeout time.Duration + + last string +} + +// Apply runs `nft -f -` with the supplied script. Idempotent: if script +// equals the last successful application, this is a no-op. +// +// Returns an error from nft (with stderr captured) if the script is +// malformed or the kernel rejects it. +func (a *Applier) Apply(ctx context.Context, script string) error { + if script == a.last { + return nil + } + timeout := a.Timeout + if timeout == 0 { + timeout = 5 * time.Second + } + bin := a.NftPath + if bin == "" { + bin = "nft" + } + cctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + cmd := exec.CommandContext(cctx, bin, "-f", "-") + cmd.Stdin = bytes.NewBufferString(script) + var stderr bytes.Buffer + cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { + return fmt.Errorf("nft -f -: %w: %s", err, stderr.String()) + } + a.last = script + return nil +} + +// Clear tears down the flock NetworkPolicy table — used by graceful +// shutdown so a stopping agent doesn't leave stale enforcement behind. +// Best-effort: if nft is missing or the table doesn't exist, returns +// nil. +func (a *Applier) Clear(ctx context.Context) error { + timeout := a.Timeout + if timeout == 0 { + timeout = 5 * time.Second + } + bin := a.NftPath + if bin == "" { + bin = "nft" + } + cctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + cmd := exec.CommandContext(cctx, bin, "destroy", "table", "inet", "flock_netpol") + if err := cmd.Run(); err != nil { + // nft returns non-zero if the table doesn't exist — that's a + // success for our purposes. + return nil + } + a.last = "" + return nil +} diff --git a/pkg/agent/netpol/apply_stub.go b/pkg/agent/netpol/apply_stub.go new file mode 100644 index 0000000..65d12db --- /dev/null +++ b/pkg/agent/netpol/apply_stub.go @@ -0,0 +1,16 @@ +//go:build !linux + +package netpol + +import "context" + +// Applier is a no-op on non-Linux build hosts so unit tests run on macOS +// without nft. +type Applier struct { + NftPath string + Timeout interface{} + last string +} + +func (a *Applier) Apply(_ context.Context, script string) error { a.last = script; return nil } +func (a *Applier) Clear(_ context.Context) error { a.last = ""; return nil } diff --git a/pkg/agent/netpol/cluster_fixtures_test.go b/pkg/agent/netpol/cluster_fixtures_test.go new file mode 100644 index 0000000..a74f32f --- /dev/null +++ b/pkg/agent/netpol/cluster_fixtures_test.go @@ -0,0 +1,250 @@ +package netpol + +import ( + "net" + "strings" + "testing" + + corev1 "k8s.io/api/core/v1" + netv1 "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +// These fixtures mirror the three NetworkPolicies live in the sjc001 +// cluster on 2026-04-25. They serve as integration-shaped tests: the +// translator + renderer must produce a sensible nft script for each. +// +// Source of truth (refresh by running `kubectl get netpol -A -o yaml`): +// +// - calico-apiserver/allow-apiserver +// - remote-proxies/lodge-home-assistant-ingress +// - storage/garage-admin-restrict + +// allowApiserverPolicy: TCP/5443 ingress to apiserver=true pods, no peer +// restriction (allow-from-anywhere on that port). +func allowApiserverPolicy() netv1.NetworkPolicy { + tcp := corev1.ProtocolTCP + port := intstr.FromInt32(5443) + return netv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{Namespace: "calico-apiserver", Name: "allow-apiserver"}, + Spec: netv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{MatchLabels: map[string]string{"apiserver": "true"}}, + PolicyTypes: []netv1.PolicyType{netv1.PolicyTypeIngress}, + Ingress: []netv1.NetworkPolicyIngressRule{{ + Ports: []netv1.NetworkPolicyPort{{Protocol: &tcp, Port: &port}}, + }}, + }, + } +} + +// lodgeHomeAssistantPolicy: TCP/8080 from any pod in the `edge` namespace +// to pods labelled app=lodge-home-assistant. +func lodgeHomeAssistantPolicy() netv1.NetworkPolicy { + tcp := corev1.ProtocolTCP + port := intstr.FromInt32(8080) + return netv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{Namespace: "remote-proxies", Name: "lodge-home-assistant-ingress"}, + Spec: netv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{MatchLabels: map[string]string{"app": "lodge-home-assistant"}}, + PolicyTypes: []netv1.PolicyType{netv1.PolicyTypeIngress}, + Ingress: []netv1.NetworkPolicyIngressRule{{ + From: []netv1.NetworkPolicyPeer{{ + NamespaceSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"kubernetes.io/metadata.name": "edge"}, + }, + }}, + Ports: []netv1.NetworkPolicyPort{{Protocol: &tcp, Port: &port}}, + }}, + }, + } +} + +// garageAdminPolicy: complex two-rule policy. +// +// 1. Allow TCP/{3900, 80, 3901} from anywhere. +// 2. Allow TCP/3903 only from pods in `edge` or `storage`. +func garageAdminPolicy() netv1.NetworkPolicy { + tcp := corev1.ProtocolTCP + p3900 := intstr.FromInt32(3900) + p80 := intstr.FromInt32(80) + p3901 := intstr.FromInt32(3901) + p3903 := intstr.FromInt32(3903) + return netv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{Namespace: "storage", Name: "garage-admin-restrict"}, + Spec: netv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{MatchLabels: map[string]string{"app": "garage"}}, + PolicyTypes: []netv1.PolicyType{netv1.PolicyTypeIngress}, + Ingress: []netv1.NetworkPolicyIngressRule{ + { + Ports: []netv1.NetworkPolicyPort{ + {Protocol: &tcp, Port: &p3900}, + {Protocol: &tcp, Port: &p80}, + {Protocol: &tcp, Port: &p3901}, + }, + }, + { + From: []netv1.NetworkPolicyPeer{ + {NamespaceSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"kubernetes.io/metadata.name": "edge"}, + }}, + {NamespaceSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"kubernetes.io/metadata.name": "storage"}, + }}, + }, + Ports: []netv1.NetworkPolicyPort{{Protocol: &tcp, Port: &p3903}}, + }, + }, + }, + } +} + +// TestClusterFixture_AllowApiserver — pod selected by the policy gets +// isolated; the rendered script accepts TCP/5443 from anywhere. +func TestClusterFixture_AllowApiserver(t *testing.T) { + pod := Pod{ + Namespace: "calico-apiserver", + Name: "calico-apiserver-1", + Labels: map[string]string{"apiserver": "true"}, + HostIface: "flock00000001", + IPs: []net.IP{mustIP("2001:db8::1")}, + } + out, err := Translate(Inputs{ + LocalPods: []Pod{pod}, + Policies: []netv1.NetworkPolicy{allowApiserverPolicy()}, + }, nil) + if err != nil { + t.Fatal(err) + } + in, _ := isolationFor(out, "calico-apiserver/calico-apiserver-1") + if !in { + t.Fatalf("apiserver pod should be isolated for ingress") + } + script := Render(out) + if !strings.Contains(script, "tcp dport 5443 accept") { + t.Fatalf("expected TCP/5443 allow:\n%s", script) + } + // No peer filter — allow-all-on-port. + if strings.Contains(script, "ip6 saddr {") || strings.Contains(script, "ip saddr {") { + t.Fatalf("expected no peer filter for allow-from-anywhere:\n%s", script) + } +} + +// TestClusterFixture_LodgeHomeAssistant — pod isolated; only TCP/8080 +// from edge namespace is allowed. +func TestClusterFixture_LodgeHomeAssistant(t *testing.T) { + pod := Pod{ + Namespace: "remote-proxies", + Name: "lodge-home-assistant-0", + Labels: map[string]string{"app": "lodge-home-assistant"}, + HostIface: "flock00000002", + IPs: []net.IP{mustIP("2001:db8::2")}, + } + traefik := PeerPod{ + Namespace: "edge", Name: "traefik-0", + Labels: map[string]string{"app": "traefik"}, + IPs: []net.IP{mustIP("2001:db8::aa")}, + } + stranger := PeerPod{ + Namespace: "default", Name: "random", + Labels: map[string]string{"app": "random"}, + IPs: []net.IP{mustIP("2001:db8::bb")}, + } + out, err := Translate(Inputs{ + LocalPods: []Pod{pod}, + PeerPods: []PeerPod{traefik, stranger}, + Namespaces: []Namespace{ + {Name: "edge", Labels: map[string]string{"kubernetes.io/metadata.name": "edge"}}, + {Name: "default", Labels: map[string]string{"kubernetes.io/metadata.name": "default"}}, + {Name: "remote-proxies", Labels: map[string]string{"kubernetes.io/metadata.name": "remote-proxies"}}, + }, + Policies: []netv1.NetworkPolicy{lodgeHomeAssistantPolicy()}, + }, nil) + if err != nil { + t.Fatal(err) + } + if len(out.Rules) != 1 { + t.Fatalf("expected 1 rule, got %d", len(out.Rules)) + } + r := out.Rules[0] + // Peer should be exactly traefik's IP, not stranger's. + got := map[string]bool{} + for _, c := range r.PeerCIDRs { + got[c.IP.String()] = true + } + if !got["2001:db8::aa"] { + t.Fatalf("traefik IP missing from rule: %v", got) + } + if got["2001:db8::bb"] { + t.Fatalf("stranger IP leaked into rule") + } + script := Render(out) + if !strings.Contains(script, "tcp dport 8080 accept") { + t.Fatalf("expected TCP/8080 allow:\n%s", script) + } +} + +// TestClusterFixture_Garage — verifies the two-rule policy: +// +// 1. ports {3900, 80, 3901} accept from any peer +// 2. port 3903 accept only from edge or storage namespaces +func TestClusterFixture_Garage(t *testing.T) { + pod := Pod{ + Namespace: "storage", Name: "garage-0", + Labels: map[string]string{"app": "garage"}, + HostIface: "flock00000003", + IPs: []net.IP{mustIP("2001:db8::3")}, + } + storagePeer := PeerPod{ + Namespace: "storage", Name: "garage-1", + Labels: map[string]string{"app": "garage"}, + IPs: []net.IP{mustIP("2001:db8::31")}, + } + edgePeer := PeerPod{ + Namespace: "edge", Name: "traefik-0", + Labels: map[string]string{"app": "traefik"}, + IPs: []net.IP{mustIP("2001:db8::41")}, + } + stranger := PeerPod{ + Namespace: "default", Name: "random", + Labels: map[string]string{"app": "random"}, + IPs: []net.IP{mustIP("2001:db8::ff")}, + } + out, err := Translate(Inputs{ + LocalPods: []Pod{pod}, + PeerPods: []PeerPod{storagePeer, edgePeer, stranger}, + Namespaces: []Namespace{ + {Name: "edge", Labels: map[string]string{"kubernetes.io/metadata.name": "edge"}}, + {Name: "storage", Labels: map[string]string{"kubernetes.io/metadata.name": "storage"}}, + {Name: "default", Labels: map[string]string{"kubernetes.io/metadata.name": "default"}}, + }, + Policies: []netv1.NetworkPolicy{garageAdminPolicy()}, + }, nil) + if err != nil { + t.Fatal(err) + } + // Two ingress rules in the source policy → two Rules out (one per + // peer set, ports inline). + if len(out.Rules) != 2 { + t.Fatalf("expected 2 rules (one per ingress entry), got %d", len(out.Rules)) + } + script := Render(out) + for _, want := range []string{ + "tcp dport 3900 accept", + "tcp dport 80 accept", + "tcp dport 3901 accept", + "tcp dport 3903 accept", + } { + if !strings.Contains(script, want) { + t.Errorf("missing %q in script:\n%s", want, script) + } + } + // The 3903 rule must carry a peer filter for both edge and storage + // peer IPs but not the stranger. + if !strings.Contains(script, "2001:db8::31/128") || !strings.Contains(script, "2001:db8::41/128") { + t.Fatalf("expected edge+storage peer IPs in 3903 rule:\n%s", script) + } + if strings.Contains(script, "2001:db8::ff/128") { + t.Fatalf("stranger IP must not appear:\n%s", script) + } +} diff --git a/pkg/agent/netpol/doc.go b/pkg/agent/netpol/doc.go new file mode 100644 index 0000000..79fed75 --- /dev/null +++ b/pkg/agent/netpol/doc.go @@ -0,0 +1,44 @@ +// Package netpol implements Kubernetes NetworkPolicy enforcement for flock. +// +// # Model +// +// NetworkPolicy is a Kubernetes-native API (`networking.k8s.io/v1`) that +// describes which pods may receive traffic (Ingress) and / or initiate +// traffic (Egress). The semantics are isolation by selection: a pod that is +// selected by *any* NetworkPolicy in a given direction becomes default-deny +// in that direction, plus the union of all "allow" rules from every policy +// that selects it. A pod selected by no policy is unrestricted. +// +// flock enforces these semantics with nftables. Each agent is responsible +// for the pods scheduled on its own node — peer addresses (from +// podSelector / namespaceSelector / ipBlock peers) come from a cluster-wide +// informer set so the agent can resolve peers that live elsewhere. +// +// # Pipeline +// +// The work is split into four stages with hard boundaries between them so +// each can be tested in isolation: +// +// 1. Informers (informers.go) — watch NetworkPolicies, Namespaces, and +// all Pods in the cluster. Maintain indices the translator can query. +// +// 2. Translator (translator.go) — pure function from +// (NetworkPolicy set, Namespace set, Pod set, local-node pod set) to +// []Rule. No I/O, no hidden state — straightforward to fuzz and unit +// test. Implements the default-deny semantics and the peer-resolution +// rules from the NetworkPolicy spec. +// +// 3. Renderer (render.go) — pure function from []Rule to an nft script +// (string). Output is deterministic so the apply stage can de-dupe. +// +// 4. Apply (apply_linux.go) — shell out to `nft -f -` for an atomic +// reconfiguration. nftables guarantees the whole script applies as a +// single transaction; partial failures roll back automatically. +// +// # Why nftables (and not eBPF) +// +// Atomic ruleset transactions, kernel-native, no userspace ebpf-loader to +// maintain, and behaviour an operator can read directly with +// `nft list ruleset`. The cost is that we walk per-pod chains in software, +// which is fine at the cluster sizes flock targets. +package netpol diff --git a/pkg/agent/netpol/informers.go b/pkg/agent/netpol/informers.go new file mode 100644 index 0000000..b39eb24 --- /dev/null +++ b/pkg/agent/netpol/informers.go @@ -0,0 +1,222 @@ +package netpol + +import ( + "context" + "fmt" + "log/slog" + "net" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + netv1 "k8s.io/api/networking/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" +) + +// World aggregates the cluster-wide caches the reconciler queries on +// every pass: NetworkPolicies, Namespaces, and all Pods (for peer +// resolution). Each field is safe for concurrent reads. +type World struct { + logger *slog.Logger + + mu sync.RWMutex + policies map[string]netv1.NetworkPolicy // key = ns/name + namespaces map[string]Namespace + peerPods map[string]PeerPod // key = ns/name + + onChange []func() +} + +// NewWorld returns an empty World. Callers should call Start to populate +// it; before Start, the snapshot accessors return empty slices. +func NewWorld(logger *slog.Logger) *World { + return &World{ + logger: logger, + policies: map[string]netv1.NetworkPolicy{}, + namespaces: map[string]Namespace{}, + peerPods: map[string]PeerPod{}, + } +} + +// OnChange registers a callback fired (synchronously, inside the informer +// event handler) whenever any watched object changes. The reconciler +// uses this to debounce policy reloads. +func (w *World) OnChange(f func()) { + w.mu.Lock() + defer w.mu.Unlock() + w.onChange = append(w.onChange, f) +} + +func (w *World) fireChange() { + w.mu.RLock() + cbs := append([]func(){}, w.onChange...) + w.mu.RUnlock() + for _, f := range cbs { + f() + } +} + +// Start launches three informers (NetworkPolicy, Namespace, Pod) against +// the cluster API. It blocks until each cache reports synced. The caller +// is responsible for cancelling ctx on shutdown. +func (w *World) Start(ctx context.Context, cfg *rest.Config) error { + cs, err := kubernetes.NewForConfig(cfg) + if err != nil { + return fmt.Errorf("kubernetes client: %w", err) + } + factory := informers.NewSharedInformerFactory(cs, 10*time.Minute) + + npInformer := factory.Networking().V1().NetworkPolicies().Informer() + nsInformer := factory.Core().V1().Namespaces().Informer() + podInformer := factory.Core().V1().Pods().Informer() + + if _, err := npInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { w.onPolicy(obj, false) }, + UpdateFunc: func(_, n interface{}) { w.onPolicy(n, false) }, + DeleteFunc: func(obj interface{}) { w.onPolicy(obj, true) }, + }); err != nil { + return fmt.Errorf("add netpol handler: %w", err) + } + if _, err := nsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { w.onNamespace(obj, false) }, + UpdateFunc: func(_, n interface{}) { w.onNamespace(n, false) }, + DeleteFunc: func(obj interface{}) { w.onNamespace(obj, true) }, + }); err != nil { + return fmt.Errorf("add ns handler: %w", err) + } + if _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { w.onPod(obj, false) }, + UpdateFunc: func(_, n interface{}) { w.onPod(n, false) }, + DeleteFunc: func(obj interface{}) { w.onPod(obj, true) }, + }); err != nil { + return fmt.Errorf("add pod handler: %w", err) + } + + w.logger.Info("netpol informers starting") + factory.Start(ctx.Done()) + if !cache.WaitForCacheSync(ctx.Done(), + npInformer.HasSynced, nsInformer.HasSynced, podInformer.HasSynced) { + return fmt.Errorf("netpol informer caches failed to sync") + } + w.logger.Info("netpol informers synced", + "netpols", len(w.snapshotPolicies()), + "namespaces", len(w.snapshotNamespaces()), + "peer_pods", len(w.snapshotPeerPods())) + return nil +} + +// unwrapDFSU lifts a DeletedFinalStateUnknown wrapper if present. +func unwrapDFSU(obj interface{}) interface{} { + if d, ok := obj.(cache.DeletedFinalStateUnknown); ok { + return d.Obj + } + return obj +} + +func (w *World) onPolicy(obj interface{}, deleted bool) { + p, ok := unwrapDFSU(obj).(*netv1.NetworkPolicy) + if !ok || p == nil { + return + } + key := p.Namespace + "/" + p.Name + w.mu.Lock() + if deleted { + delete(w.policies, key) + } else { + w.policies[key] = *p + } + w.mu.Unlock() + w.fireChange() +} + +func (w *World) onNamespace(obj interface{}, deleted bool) { + ns, ok := unwrapDFSU(obj).(*corev1.Namespace) + if !ok || ns == nil { + return + } + w.mu.Lock() + if deleted { + delete(w.namespaces, ns.Name) + } else { + w.namespaces[ns.Name] = Namespace{Name: ns.Name, Labels: ns.Labels} + } + w.mu.Unlock() + w.fireChange() +} + +func (w *World) onPod(obj interface{}, deleted bool) { + pod, ok := unwrapDFSU(obj).(*corev1.Pod) + if !ok || pod == nil { + return + } + key := pod.Namespace + "/" + pod.Name + w.mu.Lock() + if deleted { + delete(w.peerPods, key) + } else { + w.peerPods[key] = PeerPod{ + Namespace: pod.Namespace, + Name: pod.Name, + Labels: pod.Labels, + IPs: podIPs(pod), + } + } + w.mu.Unlock() + w.fireChange() +} + +// podIPs extracts every PodIP from the status. Pods without status (still +// scheduling) yield nil — safe for the translator. +func podIPs(p *corev1.Pod) []net.IP { + out := make([]net.IP, 0, len(p.Status.PodIPs)) + for _, addr := range p.Status.PodIPs { + ip := net.ParseIP(addr.IP) + if ip == nil { + continue + } + out = append(out, ip) + } + if len(out) == 0 && p.Status.PodIP != "" { + // Older clusters may populate PodIP but not PodIPs; tolerate both. + if ip := net.ParseIP(p.Status.PodIP); ip != nil { + out = append(out, ip) + } + } + return out +} + +// snapshotPolicies returns a defensive copy of the policy map's values. +func (w *World) snapshotPolicies() []netv1.NetworkPolicy { + w.mu.RLock() + defer w.mu.RUnlock() + out := make([]netv1.NetworkPolicy, 0, len(w.policies)) + for _, p := range w.policies { + out = append(out, p) + } + return out +} + +// snapshotNamespaces returns a defensive copy of the namespace map. +func (w *World) snapshotNamespaces() []Namespace { + w.mu.RLock() + defer w.mu.RUnlock() + out := make([]Namespace, 0, len(w.namespaces)) + for _, n := range w.namespaces { + out = append(out, n) + } + return out +} + +// snapshotPeerPods returns a defensive copy of the peer-pod map. +func (w *World) snapshotPeerPods() []PeerPod { + w.mu.RLock() + defer w.mu.RUnlock() + out := make([]PeerPod, 0, len(w.peerPods)) + for _, p := range w.peerPods { + out = append(out, p) + } + return out +} diff --git a/pkg/agent/netpol/reconciler.go b/pkg/agent/netpol/reconciler.go new file mode 100644 index 0000000..d4eaa5f --- /dev/null +++ b/pkg/agent/netpol/reconciler.go @@ -0,0 +1,115 @@ +package netpol + +import ( + "context" + "log/slog" + "sync" + "time" +) + +// LocalPodSource produces the set of local pods (with their HostIface and +// IPs) the reconciler should enforce policy for. The agent's allocation +// store + pod informer is the natural implementer. +// +// The function is called inside the reconciler under no lock, so it must +// be safe for concurrent invocation. +type LocalPodSource func() []Pod + +// Reconciler turns the World cache + LocalPodSource into nft rule +// applications. One reconcile pass: +// +// pods + policies + namespaces → Translate → Render → Apply +// +// The pass runs on: +// +// - World.OnChange (any informer event), debounced through a single +// coalescing channel, +// - a periodic tick (default 30s) so we self-heal if the kernel +// ruleset diverges from desired (e.g. someone manually `nft flush`d), +// - and explicit Trigger() calls (the agent fires this from CNI ADD / +// DEL hooks so policy lands before pod traffic flows). +type Reconciler struct { + World *World + Local LocalPodSource + Applier *Applier + Logger *slog.Logger + Interval time.Duration + + mu sync.Mutex + trigger chan struct{} +} + +// NewReconciler returns a Reconciler ready to Run. Interval defaults to +// 30s if zero. +func NewReconciler(world *World, local LocalPodSource, applier *Applier, logger *slog.Logger) *Reconciler { + r := &Reconciler{ + World: world, + Local: local, + Applier: applier, + Logger: logger, + Interval: 30 * time.Second, + trigger: make(chan struct{}, 1), + } + world.OnChange(r.Trigger) + return r +} + +// Trigger requests one reconcile pass. Coalesces — if a pass is already +// pending, the call is a no-op. +func (r *Reconciler) Trigger() { + select { + case r.trigger <- struct{}{}: + default: + } +} + +// Run blocks until ctx is cancelled. Reconciles on Trigger or every +// Interval; calls Applier.Clear on shutdown. +func (r *Reconciler) Run(ctx context.Context) { + t := time.NewTicker(r.Interval) + defer t.Stop() + r.reconcile(ctx) // initial pass + for { + select { + case <-ctx.Done(): + // Best-effort: drop our table on graceful exit. If the agent + // crashed without doing this, the next agent's first apply + // will replace the stale table atomically anyway. + _ = r.Applier.Clear(context.Background()) + return + case <-t.C: + r.reconcile(ctx) + case <-r.trigger: + r.reconcile(ctx) + } + } +} + +func (r *Reconciler) reconcile(ctx context.Context) { + r.mu.Lock() + defer r.mu.Unlock() + + in := Inputs{ + LocalPods: r.Local(), + PeerPods: r.World.snapshotPeerPods(), + Namespaces: r.World.snapshotNamespaces(), + Policies: r.World.snapshotPolicies(), + } + out, err := Translate(in, func(s string) { r.Logger.Warn(s) }) + if err != nil { + r.Logger.Warn("netpol translate failed", "err", err) + return + } + script := Render(out) + if err := r.Applier.Apply(ctx, script); err != nil { + r.Logger.Warn("netpol apply failed", "err", err) + return + } + if len(out.Isolated) > 0 { + r.Logger.Info("netpol applied", + "isolated_chains", len(out.Isolated), + "rules", len(out.Rules), + "local_pods", len(in.LocalPods), + "policies", len(in.Policies)) + } +} diff --git a/pkg/agent/netpol/reconciler_test.go b/pkg/agent/netpol/reconciler_test.go new file mode 100644 index 0000000..bc7fd36 --- /dev/null +++ b/pkg/agent/netpol/reconciler_test.go @@ -0,0 +1,160 @@ +package netpol + +import ( + "context" + "io" + "log/slog" + "net" + "strings" + "sync" + "sync/atomic" + "testing" + + corev1 "k8s.io/api/core/v1" + netv1 "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// fakeApplier captures Apply calls for assertion. Drop-in for *Applier in +// tests because Reconciler depends only on the (Apply, Clear) pair. +type fakeApplier struct { + mu sync.Mutex + calls []string + last string + err error +} + +func (f *fakeApplier) Apply(_ context.Context, script string) error { + f.mu.Lock() + defer f.mu.Unlock() + if f.err != nil { + return f.err + } + if script == f.last { + return nil // de-dup like the real Applier + } + f.last = script + f.calls = append(f.calls, script) + return nil +} +func (f *fakeApplier) Clear(_ context.Context) error { return nil } +func (f *fakeApplier) lastScript() string { + f.mu.Lock() + defer f.mu.Unlock() + return f.last +} +func (f *fakeApplier) callCount() int { + f.mu.Lock() + defer f.mu.Unlock() + return len(f.calls) +} + +// applierIface is satisfied by *Applier and *fakeApplier; we narrow +// Reconciler to this in tests by adapting via a tiny wrapper. +type applierIface interface { + Apply(context.Context, string) error + Clear(context.Context) error +} + +// reconcileOnce drives one pass synchronously without spinning a goroutine. +func reconcileOnce(t *testing.T, world *World, local LocalPodSource, app applierIface) { + t.Helper() + in := Inputs{ + LocalPods: local(), + PeerPods: world.snapshotPeerPods(), + Namespaces: world.snapshotNamespaces(), + Policies: world.snapshotPolicies(), + } + out, err := Translate(in, nil) + if err != nil { + t.Fatal(err) + } + if err := app.Apply(context.Background(), Render(out)); err != nil { + t.Fatal(err) + } +} + +// silentLogger returns a slog.Logger discarding everything — keeps test +// output tidy. +func silentLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{})) +} + +func TestReconciler_NoIsolatedPods_ShortScript(t *testing.T) { + world := NewWorld(silentLogger()) + local := func() []Pod { return nil } + app := &fakeApplier{} + reconcileOnce(t, world, local, app) + got := app.lastScript() + if !strings.Contains(got, "table inet flock_netpol") { + t.Fatalf("missing table:\n%s", got) + } + // Without any isolated pods the base chain has policy accept and no + // jumps. That's the desired "open" state. + if strings.Contains(got, "jump pod_") { + t.Fatalf("unexpected jump in open state:\n%s", got) + } +} + +func TestReconciler_PolicyIsolatesLocalPod(t *testing.T) { + world := NewWorld(silentLogger()) + + // Seed a default-deny policy in ns1. + world.onPolicy(&netv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns1", Name: "deny-all"}, + Spec: netv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{}, + PolicyTypes: []netv1.PolicyType{netv1.PolicyTypeIngress}, + }, + }, false) + + local := func() []Pod { + return []Pod{{ + Namespace: "ns1", Name: "web", + Labels: map[string]string{"app": "web"}, + HostIface: "flock00000001", + IPs: []net.IP{mustIP("2001:db8::1")}, + }} + } + app := &fakeApplier{} + reconcileOnce(t, world, local, app) + got := app.lastScript() + + if !strings.Contains(got, "_ingress {") { + t.Fatalf("expected pod ingress chain:\n%s", got) + } + if !strings.Contains(got, "drop") { + t.Fatalf("expected default-deny drop:\n%s", got) + } + if !strings.Contains(got, `oifname "flock00000001"`) { + t.Fatalf("expected base-chain jump anchored on veth:\n%s", got) + } +} + +func TestReconciler_DedupesIdenticalRender(t *testing.T) { + world := NewWorld(silentLogger()) + local := func() []Pod { + return []Pod{{ + Namespace: "ns1", Name: "web", HostIface: "f1", + IPs: []net.IP{mustIP("2001:db8::1")}, + }} + } + app := &fakeApplier{} + reconcileOnce(t, world, local, app) + reconcileOnce(t, world, local, app) + reconcileOnce(t, world, local, app) + if got := app.callCount(); got != 1 { + t.Fatalf("expected 1 unique apply, got %d", got) + } +} + +func TestReconciler_OnChangeFiresTrigger(t *testing.T) { + world := NewWorld(silentLogger()) + var triggered atomic.Int32 + world.OnChange(func() { triggered.Add(1) }) + world.onNamespace(&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, false) + world.onPolicy(&netv1.NetworkPolicy{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "p"}}, false) + if triggered.Load() != 2 { + t.Fatalf("expected 2 OnChange calls, got %d", triggered.Load()) + } +} diff --git a/pkg/agent/netpol/render.go b/pkg/agent/netpol/render.go new file mode 100644 index 0000000..9775ceb --- /dev/null +++ b/pkg/agent/netpol/render.go @@ -0,0 +1,322 @@ +package netpol + +import ( + "fmt" + "hash/fnv" + "net" + "sort" + "strings" +) + +// Render produces an nftables script that, when applied with `nft -f -`, +// installs the desired NetworkPolicy enforcement state for this node. +// +// Layout: +// +// table inet flock_netpol { +// chain forward { # base chain on hook forward +// type filter hook forward priority filter; policy accept; +// # one jump per (pod, direction) that has rules and/or isolation +// iifname "flock1a2b3c4d" ip6 saddr 2001:db8::1 jump pod__egress +// oifname "flock1a2b3c4d" ip6 daddr 2001:db8::1 jump pod__ingress +// } +// chain pod__ingress { # one per isolated direction +// # explicit allow lines (empty for default-deny) +// drop +// } +// chain pod__egress { ... } +// } +// +// The whole table is replaced atomically: a "delete table … 2>/dev/null" +// (best-effort) followed by an "add table" + the chains. nft executes the +// script as a single transaction; partial application is impossible. +// +// Output is deterministic: equal Output → byte-identical script. The +// reconciler relies on this for de-dup. +func Render(out Output) string { + var sb strings.Builder + + sb.WriteString("# Generated by flock-agent netpol; do not edit by hand.\n") + // Best-effort delete; if the table doesn't exist (first run) nft + // returns an error, hence the redirect. The "add table" then + // recreates everything. + sb.WriteString("destroy table inet flock_netpol\n") + sb.WriteString("table inet flock_netpol {\n") + + // Build per-(pod, direction) chains. We need them defined BEFORE the + // base chain references them, so we render chains first. + chains := buildChains(out) + for _, c := range chains { + writeChain(&sb, c) + } + + // Base chain emits jumps in a stable order (chain name asc). + sb.WriteString("\tchain forward {\n") + sb.WriteString("\t\ttype filter hook forward priority filter; policy accept;\n") + for _, c := range chains { + writeBaseJump(&sb, c) + } + sb.WriteString("\t}\n") + + sb.WriteString("}\n") + return sb.String() +} + +// chain is one rendered chain — one direction of one pod. +type chain struct { + name string // pod__ingress / _egress + hostIface string + podIPs []net.IP + direction Direction + rules []Rule + policy string // "drop" or "accept" +} + +// buildChains groups rules by (PodKey, Direction) and adds default-deny +// chains for isolated directions that received no explicit rules. +func buildChains(out Output) []chain { + type key struct { + podKey string + dir Direction + } + byKey := map[key]*chain{} + + // Seed isolated directions with empty chains so default-deny lands + // even when no explicit allow rule was emitted for them. + for iso := range out.Isolated { + byKey[key{podKey: iso.PodKey, dir: iso.Direction}] = &chain{ + direction: iso.Direction, + policy: "drop", + } + } + + // Append rules into their chain. Rule.PodIPs and HostIface are + // authoritative — every rule for a given pod carries the same values + // (translator invariant), so we copy from the first. + for _, r := range out.Rules { + k := key{podKey: r.PodKey, dir: r.Direction} + c := byKey[k] + if c == nil { + // Rule for a non-isolated direction shouldn't happen in + // practice (translator only emits rules for selected pods) + // but be tolerant — the chain just gets policy accept. + c = &chain{direction: r.Direction, policy: "accept"} + byKey[k] = c + } + c.rules = append(c.rules, r) + if c.hostIface == "" { + c.hostIface = r.HostIface + c.podIPs = append([]net.IP(nil), r.PodIPs...) + } + } + + // If a chain was created from Isolated only (no rules), look up the + // pod's HostIface + IPs from Output.Pods. This is the path a + // default-deny policy takes — no allow rules, only isolation. + for k, c := range byKey { + if c.hostIface != "" { + continue + } + if lp, ok := out.Pods[k.podKey]; ok { + c.hostIface = lp.HostIface + c.podIPs = append([]net.IP(nil), lp.IPs...) + continue + } + // Last resort: lift from any rule sharing the PodKey. Should + // not normally happen — the translator populates Pods for every + // isolated pod — but defends against partially-populated Output + // values constructed by tests. + for _, r := range out.Rules { + if r.PodKey == k.podKey { + c.hostIface = r.HostIface + c.podIPs = append([]net.IP(nil), r.PodIPs...) + break + } + } + } + + // Materialise chain names and emit in deterministic order. + var chains []chain + for k, c := range byKey { + if c.hostIface == "" { + continue // can't jump to it; skip + } + c.name = chainName(k.podKey, c.direction) + chains = append(chains, *c) + } + sort.Slice(chains, func(i, j int) bool { return chains[i].name < chains[j].name }) + return chains +} + +// chainName produces a stable, name-safe chain identifier. Pod keys can +// contain characters nft doesn't allow in identifiers, so we hash them. +// Direction keeps ingress and egress separate. +func chainName(podKey string, dir Direction) string { + h := fnv.New64a() + _, _ = h.Write([]byte(podKey)) + return fmt.Sprintf("pod_%016x_%s", h.Sum64(), dir) +} + +// writeChain emits the chain definition. Empty chains exist deliberately: +// the chain's drop policy IS the default-deny. +func writeChain(sb *strings.Builder, c chain) { + fmt.Fprintf(sb, "\tchain %s {\n", c.name) + for _, r := range c.rules { + writeAllowRule(sb, r) + } + if c.policy == "drop" { + sb.WriteString("\t\tdrop\n") + } + sb.WriteString("\t}\n") +} + +// writeAllowRule emits one accept line: +// +// [ip|ip6 saddr {peers}] [ip|ip6 saddr != {except}] [proto dport {port|port-end}] accept +// +// The saddr / daddr field flips based on direction (ingress = from peer → +// match saddr; egress = to peer → match daddr). +func writeAllowRule(sb *strings.Builder, r Rule) { + v6Peers, v4Peers := splitFamily(r.PeerCIDRs) + v6Except, v4Except := splitFamily(r.PeerExcept) + v6Pod, v4Pod := splitIPFamily(r.PodIPs) + hasPeerFilter := len(r.PeerCIDRs) > 0 + + emit := func(family string, peers, except []*net.IPNet, podIP net.IP) { + if hasPeerFilter && len(peers) == 0 && len(except) == 0 { + // Peer filter exists but no entries of this family — rule + // must not match anything for this family. + return + } + if podIP == nil { + // Pod has no address of this family; nothing to guard. + return + } + for _, port := range r.Ports { + sb.WriteString("\t\t") + // Peer (saddr/daddr) match: address is "peer's address", + // which is saddr on ingress and daddr on egress. + peerField := peerAddrField(family, r.Direction) + if hasPeerFilter && len(peers) > 0 { + fmt.Fprintf(sb, "%s { %s } ", peerField, joinCIDRs(peers)) + } + if hasPeerFilter && len(except) > 0 { + fmt.Fprintf(sb, "%s != { %s } ", peerField, joinCIDRs(except)) + } + // Port match. + writePortMatch(sb, port) + fmt.Fprintf(sb, "%s\n", r.Action) + } + } + emit("ip6", v6Peers, v6Except, v6Pod) + emit("ip", v4Peers, v4Except, v4Pod) +} + +// peerAddrField returns "ip6 saddr" / "ip saddr" / "ip6 daddr" / "ip daddr" +// depending on family + direction. Ingress matches the peer as the source; +// egress matches the peer as the destination. +func peerAddrField(family string, dir Direction) string { + switch { + case dir == DirIngress: + return family + " saddr" + default: + return family + " daddr" + } +} + +// writePortMatch appends "tcp dport 80 " (single port) or +// "tcp dport 8000-8999 " (range), or nothing when port is "any". +func writePortMatch(sb *strings.Builder, p PortMatch) { + if p.Port == 0 && p.Protocol == "" { + return + } + proto := p.Protocol + if proto == "" { + proto = "tcp" + } + if p.Port == 0 { + // Protocol-only match. nft has `meta l4proto tcp`. + fmt.Fprintf(sb, "meta l4proto %s ", proto) + return + } + if p.EndPort > p.Port { + fmt.Fprintf(sb, "%s dport %d-%d ", proto, p.Port, p.EndPort) + return + } + fmt.Fprintf(sb, "%s dport %d ", proto, p.Port) +} + +// writeBaseJump emits one line per (pod, direction) chain in the base +// `forward` chain. The match is anchored on the host-side veth name so +// the rule only fires for traffic that genuinely crosses this pod's veth. +// +// We additionally constrain on the pod's address (saddr for egress, daddr +// for ingress) so a packet that somehow hits the wrong veth — e.g. during +// a CNI ADD race — won't be policy-evaluated against the wrong pod. +func writeBaseJump(sb *strings.Builder, c chain) { + v6, v4 := splitIPFamily(c.podIPs) + emit := func(family string, ip net.IP) { + if ip == nil { + return + } + var iface, addrField, addrStr string + if c.direction == DirEgress { + iface = "iifname" + addrField = family + " saddr" + } else { + iface = "oifname" + addrField = family + " daddr" + } + if family == "ip" { + addrStr = ip.To4().String() + } else { + addrStr = ip.To16().String() + } + fmt.Fprintf(sb, "\t\t%s \"%s\" %s %s jump %s\n", iface, c.hostIface, addrField, addrStr, c.name) + } + emit("ip6", v6) + emit("ip", v4) +} + +// splitFamily partitions CIDRs into (v6, v4) lists, preserving order +// within each family. +func splitFamily(cs []*net.IPNet) ([]*net.IPNet, []*net.IPNet) { + var v6, v4 []*net.IPNet + for _, c := range cs { + if c.IP.To4() != nil { + v4 = append(v4, c) + } else { + v6 = append(v6, c) + } + } + return v6, v4 +} + +// splitIPFamily picks one v6 and one v4 from a list of pod IPs (a pod has +// at most one of each in flock's model). +func splitIPFamily(ips []net.IP) (v6, v4 net.IP) { + for _, ip := range ips { + if ip == nil { + continue + } + if ip.To4() != nil { + if v4 == nil { + v4 = ip + } + } else { + if v6 == nil { + v6 = ip + } + } + } + return +} + +func joinCIDRs(cs []*net.IPNet) string { + parts := make([]string, len(cs)) + for i, c := range cs { + parts[i] = c.String() + } + sort.Strings(parts) + return strings.Join(parts, ", ") +} diff --git a/pkg/agent/netpol/render_test.go b/pkg/agent/netpol/render_test.go new file mode 100644 index 0000000..e013197 --- /dev/null +++ b/pkg/agent/netpol/render_test.go @@ -0,0 +1,219 @@ +package netpol + +import ( + "net" + "strings" + "testing" +) + +// TestRender_DefaultDeny — an isolated direction with no rules renders +// to a chain whose last action is "drop". +func TestRender_DefaultDeny(t *testing.T) { + out := Output{ + Isolated: map[Isolation]struct{}{ + {PodKey: "ns/web", Direction: DirIngress}: {}, + }, + Rules: []Rule{ + // Need at least one rule to give the chain its HostIface + + // PodIPs. Use an empty rule that selects the same chain. + {PodKey: "ns/web", HostIface: "flock00000001", PodIPs: []net.IP{mustIP("2001:db8::1")}, + Direction: DirIngress, Action: ActionAccept, + Ports: []PortMatch{{}}}, + }, + } + got := Render(out) + if !strings.Contains(got, "table inet flock_netpol") { + t.Fatalf("missing table:\n%s", got) + } + if !strings.Contains(got, "type filter hook forward") { + t.Fatalf("missing base chain:\n%s", got) + } + if !strings.Contains(got, "drop") { + t.Fatalf("expected default-deny drop in chain:\n%s", got) + } + // Pod chain name must be deterministic-looking (pod__ingress). + if !strings.Contains(got, "_ingress {") { + t.Fatalf("missing pod ingress chain:\n%s", got) + } + // Base chain jump anchored on veth + pod IP. + if !strings.Contains(got, `oifname "flock00000001"`) { + t.Fatalf("missing veth match in base chain:\n%s", got) + } + if !strings.Contains(got, "ip6 daddr 2001:db8::1") { + t.Fatalf("missing pod IP match in base chain:\n%s", got) + } +} + +// TestRender_DualStack — pod with both v6 + v4 IPs gets two base-chain +// jumps. +func TestRender_DualStack(t *testing.T) { + out := Output{ + Isolated: map[Isolation]struct{}{ + {PodKey: "ns/web", Direction: DirIngress}: {}, + }, + Rules: []Rule{{ + PodKey: "ns/web", HostIface: "f1", + PodIPs: []net.IP{mustIP("2001:db8::1"), mustIP("10.0.0.1")}, + Direction: DirIngress, Action: ActionAccept, + Ports: []PortMatch{{Protocol: "tcp", Port: 80}}, + }}, + } + got := Render(out) + if !strings.Contains(got, "ip6 daddr 2001:db8::1") { + t.Fatalf("missing v6 jump:\n%s", got) + } + if !strings.Contains(got, "ip daddr 10.0.0.1") { + t.Fatalf("missing v4 jump:\n%s", got) + } +} + +// TestRender_PortAndPeer — a Rule with peer + port emits a syntactically +// well-formed allow line. +func TestRender_PortAndPeer(t *testing.T) { + out := Output{ + Isolated: map[Isolation]struct{}{ + {PodKey: "ns/web", Direction: DirIngress}: {}, + }, + Rules: []Rule{{ + PodKey: "ns/web", HostIface: "f1", + PodIPs: []net.IP{mustIP("2001:db8::1")}, + Direction: DirIngress, Action: ActionAccept, + PeerCIDRs: []*net.IPNet{mustNet("2001:db8::a/128")}, + Ports: []PortMatch{{Protocol: "tcp", Port: 80}}, + }}, + } + got := Render(out) + if !strings.Contains(got, "ip6 saddr { 2001:db8::a/128 } tcp dport 80 accept") { + t.Fatalf("expected ingress allow with v6 peer + tcp/80:\n%s", got) + } +} + +// TestRender_PortRange — endPort renders as "8000-8999". +func TestRender_PortRange(t *testing.T) { + out := Output{ + Isolated: map[Isolation]struct{}{ + {PodKey: "ns/web", Direction: DirIngress}: {}, + }, + Rules: []Rule{{ + PodKey: "ns/web", HostIface: "f1", + PodIPs: []net.IP{mustIP("2001:db8::1")}, + Direction: DirIngress, Action: ActionAccept, + PeerCIDRs: []*net.IPNet{mustNet("0.0.0.0/0"), mustNet("::/0")}, + Ports: []PortMatch{{Protocol: "tcp", Port: 8000, EndPort: 8999}}, + }}, + } + got := Render(out) + if !strings.Contains(got, "tcp dport 8000-8999") { + t.Fatalf("expected port range:\n%s", got) + } +} + +// TestRender_IPBlockExcept — except produces a "saddr != { … }" guard. +func TestRender_IPBlockExcept(t *testing.T) { + out := Output{ + Isolated: map[Isolation]struct{}{ + {PodKey: "ns/web", Direction: DirIngress}: {}, + }, + Rules: []Rule{{ + PodKey: "ns/web", HostIface: "f1", + PodIPs: []net.IP{mustIP("10.0.0.1")}, + Direction: DirIngress, Action: ActionAccept, + PeerCIDRs: []*net.IPNet{mustNet("10.0.0.0/8")}, + PeerExcept: []*net.IPNet{mustNet("10.99.0.0/16")}, + Ports: []PortMatch{{}}, + }}, + } + got := Render(out) + if !strings.Contains(got, "ip saddr { 10.0.0.0/8 }") { + t.Fatalf("expected ipBlock cidr:\n%s", got) + } + if !strings.Contains(got, "ip saddr != { 10.99.0.0/16 }") { + t.Fatalf("expected ipBlock except:\n%s", got) + } +} + +// TestRender_AllowAllPeers — empty PeerCIDRs/PeerExcept means "any peer"; +// the rule should emit an unconditional accept (modulo port). +func TestRender_AllowAllPeers(t *testing.T) { + out := Output{ + Isolated: map[Isolation]struct{}{ + {PodKey: "ns/web", Direction: DirIngress}: {}, + }, + Rules: []Rule{{ + PodKey: "ns/web", HostIface: "f1", + PodIPs: []net.IP{mustIP("2001:db8::1")}, + Direction: DirIngress, Action: ActionAccept, + Ports: []PortMatch{{Protocol: "tcp", Port: 443}}, + }}, + } + got := Render(out) + if !strings.Contains(got, "tcp dport 443 accept") { + t.Fatalf("expected unconditional tcp/443 allow:\n%s", got) + } + // Should NOT have a saddr/daddr filter (empty peers). + if strings.Contains(got, "ip6 saddr {") || strings.Contains(got, "ip saddr {") { + t.Fatalf("expected no peer filter:\n%s", got) + } +} + +// TestRender_Determinism — same input → byte-identical output. +func TestRender_Determinism(t *testing.T) { + out := Output{ + Isolated: map[Isolation]struct{}{ + {PodKey: "ns/web", Direction: DirIngress}: {}, + {PodKey: "ns/db", Direction: DirEgress}: {}, + }, + Rules: []Rule{ + {PodKey: "ns/web", HostIface: "f1", PodIPs: []net.IP{mustIP("2001:db8::1")}, + Direction: DirIngress, Action: ActionAccept, + PeerCIDRs: []*net.IPNet{mustNet("2001:db8::5/128"), mustNet("2001:db8::3/128")}, + Ports: []PortMatch{{Protocol: "tcp", Port: 80}}}, + {PodKey: "ns/db", HostIface: "f2", PodIPs: []net.IP{mustIP("2001:db8::2")}, + Direction: DirEgress, Action: ActionAccept, + PeerCIDRs: []*net.IPNet{mustNet("2001:db8::aa/128")}, + Ports: []PortMatch{{}}}, + }, + } + a := Render(out) + b := Render(out) + if a != b { + t.Fatalf("Render not deterministic:\nA=\n%s\nB=\n%s", a, b) + } + // And peers in the rule must be sorted (we deliberately gave 5 then 3). + if strings.Index(a, "2001:db8::3/128") > strings.Index(a, "2001:db8::5/128") { + t.Fatalf("peer CIDRs not sorted within rule:\n%s", a) + } +} + +// TestRender_EgressDirection — egress rules use iifname + saddr (pod-side). +func TestRender_EgressDirection(t *testing.T) { + out := Output{ + Isolated: map[Isolation]struct{}{ + {PodKey: "ns/web", Direction: DirEgress}: {}, + }, + Rules: []Rule{{ + PodKey: "ns/web", HostIface: "f1", + PodIPs: []net.IP{mustIP("2001:db8::1")}, + Direction: DirEgress, Action: ActionAccept, + PeerCIDRs: []*net.IPNet{mustNet("2001:db8::aa/128")}, + Ports: []PortMatch{{Protocol: "tcp", Port: 53}}, + }}, + } + got := Render(out) + // Base-chain jump for egress matches iifname + ip6 saddr (pod's IP). + if !strings.Contains(got, `iifname "f1" ip6 saddr 2001:db8::1`) { + t.Fatalf("missing egress base-chain jump:\n%s", got) + } + // Peer filter for egress matches the *destination* (the peer is downstream). + if !strings.Contains(got, "ip6 daddr { 2001:db8::aa/128 }") { + t.Fatalf("expected daddr peer filter for egress:\n%s", got) + } +} + +func mustNet(s string) *net.IPNet { + _, n, err := net.ParseCIDR(s) + if err != nil { + panic(err) + } + return n +} diff --git a/pkg/agent/netpol/translator.go b/pkg/agent/netpol/translator.go new file mode 100644 index 0000000..49aac96 --- /dev/null +++ b/pkg/agent/netpol/translator.go @@ -0,0 +1,443 @@ +package netpol + +import ( + "fmt" + "net" + "sort" + + netv1 "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" +) + +// Inputs is the world-view the translator consumes. All fields are owned +// by the caller; the translator does not mutate them. +type Inputs struct { + // LocalPods are the pods scheduled on this node that have a committed + // flock allocation. Only these pods get rules — peers may live + // elsewhere. + LocalPods []Pod + + // PeerPods is the cluster-wide pod set used to resolve podSelector + + // namespaceSelector peers. It is fine to include the local pods here + // too; duplicates are deduped by (namespace, name). + PeerPods []PeerPod + + // Namespaces is the cluster's full Namespace set. Used for + // namespaceSelector matching. + Namespaces []Namespace + + // Policies is every NetworkPolicy in the cluster. The translator + // filters down to those that select at least one local pod. + Policies []netv1.NetworkPolicy +} + +// Output is the result of one translation pass. +type Output struct { + // Rules is the flat ordered list of allow rules to render. The + // renderer groups them by (PodKey, Direction) into chains. + Rules []Rule + + // Isolated is the set of (PodKey, Direction) pairs whose chain must + // have a default-deny policy. A pod selected by at least one policy + // in a given direction shows up here. The renderer uses this to + // decide whether to emit a chain at all and what its base policy is. + Isolated map[Isolation]struct{} + + // Pods carries the HostIface + IPs for every local pod referenced + // by the policy world, including pods that produced only isolation + // (default-deny) without any allow rules. The renderer needs this + // because such a pod has no Rule to lift the HostIface from. + Pods map[string]LocalPod // key = namespace/name +} + +// Isolation is the (PodKey, Direction) key of the Isolated map. +type Isolation struct { + PodKey string + Direction Direction +} + +// Translate runs the translation pass. It is a pure function: same Inputs +// always produces semantically equal Output. (Order of slices is stable +// but Rules within a chain follow the order in which selecting policies +// appear, which is itself sorted; see canonicalisePolicies.) +// +// Errors are returned only for unrecoverable malformed input; per-rule +// translation errors are logged via warn and skipped so that a single +// broken policy can't take down enforcement for a whole node. The optional +// warn callback is invoked for each skipped sub-rule with a human-readable +// message. Pass nil to silently drop. +func Translate(in Inputs, warn func(string)) (Output, error) { + if warn == nil { + warn = func(string) {} + } + + out := Output{ + Isolated: map[Isolation]struct{}{}, + Pods: map[string]LocalPod{}, + } + policies := canonicalisePolicies(in.Policies) + nsByName := indexNamespaces(in.Namespaces) + peerPodsByNS := indexPeerPods(in.PeerPods) + + for _, pod := range in.LocalPods { + if len(pod.IPs) == 0 { + continue // no allocation yet; translator skips + } + key := pod.Namespace + "/" + pod.Name + + // Find every policy in pod.Namespace whose podSelector matches. + // Cross-namespace policies do not select pods outside their own + // namespace; that's how the NetworkPolicy spec defines it. + for _, p := range policies { + if p.Namespace != pod.Namespace { + continue + } + sel, err := metav1.LabelSelectorAsSelector(&p.Spec.PodSelector) + if err != nil { + warn(fmt.Sprintf("policy %s/%s: invalid podSelector: %v", p.Namespace, p.Name, err)) + continue + } + if !sel.Matches(labels.Set(pod.Labels)) { + continue + } + + ingress, egress := policyDirections(&p) + if ingress || egress { + out.Pods[key] = LocalPod{ + PodKey: key, + HostIface: pod.HostIface, + IPs: append([]net.IP(nil), pod.IPs...), + } + } + if ingress { + out.Isolated[Isolation{PodKey: key, Direction: DirIngress}] = struct{}{} + } + if egress { + out.Isolated[Isolation{PodKey: key, Direction: DirEgress}] = struct{}{} + } + + // Translate ingress rules. + if ingress { + for ri, r := range p.Spec.Ingress { + rules, err := buildIngressRules(pod, r, p.Namespace, nsByName, peerPodsByNS) + if err != nil { + warn(fmt.Sprintf("policy %s/%s ingress[%d]: %v", p.Namespace, p.Name, ri, err)) + continue + } + out.Rules = append(out.Rules, rules...) + } + } + // Translate egress rules. + if egress { + for ri, r := range p.Spec.Egress { + rules, err := buildEgressRules(pod, r, p.Namespace, nsByName, peerPodsByNS) + if err != nil { + warn(fmt.Sprintf("policy %s/%s egress[%d]: %v", p.Namespace, p.Name, ri, err)) + continue + } + out.Rules = append(out.Rules, rules...) + } + } + } + } + return out, nil +} + +// policyDirections reports which directions a NetworkPolicy isolates. +// +// Per the spec, the PolicyTypes field is the source of truth when set; +// when omitted, isolation is inferred from which rule lists are populated +// (Ingress always; Egress only if Spec.Egress is non-empty). +func policyDirections(p *netv1.NetworkPolicy) (ingress, egress bool) { + if len(p.Spec.PolicyTypes) > 0 { + for _, t := range p.Spec.PolicyTypes { + switch t { + case netv1.PolicyTypeIngress: + ingress = true + case netv1.PolicyTypeEgress: + egress = true + } + } + return + } + ingress = true + egress = len(p.Spec.Egress) > 0 + return +} + +// buildIngressRules expands one NetworkPolicyIngressRule into Rule(s). +// One Rule per allowed peer-set; each Rule carries the full Ports filter +// from the source rule. +func buildIngressRules( + pod Pod, + r netv1.NetworkPolicyIngressRule, + policyNS string, + nsByName map[string]Namespace, + peerPodsByNS map[string][]PeerPod, +) ([]Rule, error) { + ports, err := translatePorts(r.Ports) + if err != nil { + return nil, err + } + peers, err := translatePeers(r.From, policyNS, nsByName, peerPodsByNS) + if err != nil { + return nil, err + } + return assembleRules(pod, DirIngress, peers, ports), nil +} + +// buildEgressRules is the egress mirror of buildIngressRules. +func buildEgressRules( + pod Pod, + r netv1.NetworkPolicyEgressRule, + policyNS string, + nsByName map[string]Namespace, + peerPodsByNS map[string][]PeerPod, +) ([]Rule, error) { + ports, err := translatePorts(r.Ports) + if err != nil { + return nil, err + } + peers, err := translatePeers(r.To, policyNS, nsByName, peerPodsByNS) + if err != nil { + return nil, err + } + return assembleRules(pod, DirEgress, peers, ports), nil +} + +// peerSet is the resolved peer information for one rule's From / To list. +type peerSet struct { + // allowAll is true when the rule has no peers at all (an empty From / + // To list, which the spec defines as "from anywhere"). It overrides + // CIDRs and Except. + allowAll bool + // CIDRs is the union of every IP / CIDR contributed by the rule's + // peer entries (resolved Pod IPs, namespace pods, and ipBlock.cidr). + CIDRs []*net.IPNet + // Except is the union of every ipBlock.except entry across the rule. + Except []*net.IPNet +} + +// translatePeers resolves a list of NetworkPolicyPeer entries into a +// peerSet. Each peer entry contributes either CIDRs (resolved from +// pod / namespace selectors, or copied from ipBlock) or Except entries. +func translatePeers( + peers []netv1.NetworkPolicyPeer, + policyNS string, + nsByName map[string]Namespace, + peerPodsByNS map[string][]PeerPod, +) (peerSet, error) { + if len(peers) == 0 { + return peerSet{allowAll: true}, nil + } + out := peerSet{} + for i, p := range peers { + switch { + case p.IPBlock != nil: + _, cidr, err := net.ParseCIDR(p.IPBlock.CIDR) + if err != nil { + return peerSet{}, fmt.Errorf("peer[%d] ipBlock.cidr %q: %w", i, p.IPBlock.CIDR, err) + } + out.CIDRs = append(out.CIDRs, cidr) + for j, ex := range p.IPBlock.Except { + _, exNet, err := net.ParseCIDR(ex) + if err != nil { + return peerSet{}, fmt.Errorf("peer[%d] ipBlock.except[%d] %q: %w", i, j, ex, err) + } + out.Except = append(out.Except, exNet) + } + case p.PodSelector != nil || p.NamespaceSelector != nil: + ips, err := resolvePodNamespacePeer(p, policyNS, nsByName, peerPodsByNS) + if err != nil { + return peerSet{}, fmt.Errorf("peer[%d]: %w", i, err) + } + out.CIDRs = append(out.CIDRs, ips...) + default: + return peerSet{}, fmt.Errorf("peer[%d] is empty (must set ipBlock, podSelector, or namespaceSelector)", i) + } + } + return out, nil +} + +// resolvePodNamespacePeer walks the cluster's peer-pod set and returns +// /128 (v6) and /32 (v4) CIDRs for each pod that matches the (possibly +// combined) pod + namespace selectors. +// +// Selector semantics from the NetworkPolicy spec: +// +// - podSelector + namespaceSelector both nil → handled upstream. +// - podSelector set, namespaceSelector nil → match in the policy's +// own namespace. +// - podSelector nil, namespaceSelector set → match every pod in +// namespaces that match the namespaceSelector. +// - both set → AND: pod must be in a matching namespace AND match +// the podSelector. +// +// An empty (non-nil) selector matches everything in scope. +func resolvePodNamespacePeer( + p netv1.NetworkPolicyPeer, + policyNS string, + nsByName map[string]Namespace, + peerPodsByNS map[string][]PeerPod, +) ([]*net.IPNet, error) { + var podSel, nsSel labels.Selector + if p.PodSelector != nil { + s, err := metav1.LabelSelectorAsSelector(p.PodSelector) + if err != nil { + return nil, fmt.Errorf("podSelector: %w", err) + } + podSel = s + } + if p.NamespaceSelector != nil { + s, err := metav1.LabelSelectorAsSelector(p.NamespaceSelector) + if err != nil { + return nil, fmt.Errorf("namespaceSelector: %w", err) + } + nsSel = s + } + + // Decide which namespaces are in scope. + var inScope []string + if nsSel == nil { + // Pod-only selector → just the policy's own namespace. + inScope = []string{policyNS} + } else { + for name, ns := range nsByName { + if nsSel.Matches(labels.Set(ns.Labels)) { + inScope = append(inScope, name) + } + } + } + + var out []*net.IPNet + for _, ns := range inScope { + for _, pp := range peerPodsByNS[ns] { + if podSel != nil && !podSel.Matches(labels.Set(pp.Labels)) { + continue + } + for _, ip := range pp.IPs { + out = append(out, ipToHostCIDR(ip)) + } + } + } + return out, nil +} + +// translatePorts converts NetworkPolicyPort entries into PortMatch. +// +// A nil/empty Ports list on a NetworkPolicy rule means "all ports" by +// spec; we represent that as a single zero-valued PortMatch (any proto, +// any port) so the renderer can emit a single rule rather than a chain +// of port-equality matches. +func translatePorts(ports []netv1.NetworkPolicyPort) ([]PortMatch, error) { + if len(ports) == 0 { + return []PortMatch{{}}, nil + } + var out []PortMatch + for i, p := range ports { + var protoStr string + if p.Protocol != nil { + switch *p.Protocol { + case "TCP": + protoStr = "tcp" + case "UDP": + protoStr = "udp" + case "SCTP": + protoStr = "sctp" + default: + return nil, fmt.Errorf("port[%d]: protocol %q not supported", i, *p.Protocol) + } + } else { + // Spec default: TCP. We use empty string to mean "any of + // the three" only when the user explicitly sets neither + // protocol nor port; here the user has supplied a Port, + // which implies a protocol — and the spec default is TCP. + protoStr = "tcp" + } + var port, endPort int + if p.Port != nil { + if p.Port.Type != 0 { // intstr.Int = 0; intstr.String = 1 + return nil, fmt.Errorf("port[%d]: named ports are not yet supported", i) + } + port = int(p.Port.IntVal) + } + if p.EndPort != nil { + endPort = int(*p.EndPort) + if endPort < port { + return nil, fmt.Errorf("port[%d]: endPort %d < port %d", i, endPort, port) + } + } + out = append(out, PortMatch{Protocol: protoStr, Port: port, EndPort: endPort}) + } + return out, nil +} + +// assembleRules emits the cross-product of (one peer-set) × (port list). +// We currently emit a single Rule per direction since the peer-set is the +// expensive shared field; ports go inline. allowAll peers result in a +// rule with no PeerCIDRs, which the renderer treats as "any source". +func assembleRules(pod Pod, dir Direction, peers peerSet, ports []PortMatch) []Rule { + if !peers.allowAll && len(peers.CIDRs) == 0 { + // Selector matched no peers (e.g. podSelector for a label that + // no live pod has). Emit nothing — the rule cannot allow any + // real traffic. The pod stays in default-deny for this rule. + return nil + } + r := Rule{ + PodKey: pod.Namespace + "/" + pod.Name, + HostIface: pod.HostIface, + PodIPs: append([]net.IP(nil), pod.IPs...), + Direction: dir, + Action: ActionAccept, + Ports: append([]PortMatch(nil), ports...), + } + if !peers.allowAll { + r.PeerCIDRs = append([]*net.IPNet(nil), peers.CIDRs...) + r.PeerExcept = append([]*net.IPNet(nil), peers.Except...) + } + return []Rule{r} +} + +// canonicalisePolicies sorts the policy slice by (namespace, name) so the +// translator's output is deterministic regardless of informer event order. +func canonicalisePolicies(p []netv1.NetworkPolicy) []netv1.NetworkPolicy { + out := append([]netv1.NetworkPolicy(nil), p...) + sort.Slice(out, func(i, j int) bool { + if out[i].Namespace != out[j].Namespace { + return out[i].Namespace < out[j].Namespace + } + return out[i].Name < out[j].Name + }) + return out +} + +func indexNamespaces(nss []Namespace) map[string]Namespace { + out := make(map[string]Namespace, len(nss)) + for _, ns := range nss { + out[ns.Name] = ns + } + return out +} + +func indexPeerPods(pods []PeerPod) map[string][]PeerPod { + out := map[string][]PeerPod{} + for _, p := range pods { + out[p.Namespace] = append(out[p.Namespace], p) + } + // Sort each namespace's pod list by (name) so the translator's IP + // ordering is stable. + for k := range out { + sort.Slice(out[k], func(i, j int) bool { return out[k][i].Name < out[k][j].Name }) + } + return out +} + +// ipToHostCIDR returns ip/32 (v4) or ip/128 (v6) — the smallest CIDR +// covering exactly that one address. +func ipToHostCIDR(ip net.IP) *net.IPNet { + if v4 := ip.To4(); v4 != nil { + return &net.IPNet{IP: v4, Mask: net.CIDRMask(32, 32)} + } + return &net.IPNet{IP: ip.To16(), Mask: net.CIDRMask(128, 128)} +} diff --git a/pkg/agent/netpol/translator_fuzz_test.go b/pkg/agent/netpol/translator_fuzz_test.go new file mode 100644 index 0000000..3e3b731 --- /dev/null +++ b/pkg/agent/netpol/translator_fuzz_test.go @@ -0,0 +1,147 @@ +package netpol + +import ( + "net" + "strings" + "testing" + + corev1 "k8s.io/api/core/v1" + netv1 "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +// FuzzTranslate_AndRender stitches the Translator and Renderer together +// against synthetic NetworkPolicies built from fuzzed bytes. We are not +// trying to produce *valid* policies — the goal is to confirm that: +// +// 1. Neither stage panics on weird input. +// 2. Render output is balanced (every "{" has a matching "}"). +// 3. Rendering twice is byte-stable. +// 4. The Pods set in Output is consistent with Isolated (every isolated +// PodKey has a matching entry in Pods). +// +// The translator's warn callback is captured to ensure it never panics +// with unexpected message types either. +func FuzzTranslate_AndRender(f *testing.F) { + type seed struct { + policyNS, policyName string + podSelectorKey, podSelValue string + peerSelectorKey, peerSelV string + peerNS, peerName, peerIP string + port uint16 + ipBlockCIDR, ipBlockExcept string + } + for _, s := range []seed{ + {policyNS: "ns1", policyName: "p1", podSelectorKey: "app", podSelValue: "web", port: 80}, + {policyNS: "ns1", policyName: "p1", peerSelectorKey: "app", peerSelV: "client", peerNS: "ns1", peerName: "c1", peerIP: "2001:db8::aa", port: 443}, + {policyNS: "ns1", policyName: "p1", ipBlockCIDR: "10.0.0.0/8", ipBlockExcept: "10.99.0.0/16", port: 0}, + {policyNS: "", policyName: ""}, // pathological + {policyNS: "ns1", policyName: "p1", podSelectorKey: "app\x00", podSelValue: "web\nnewline"}, + {policyNS: "ns1", policyName: "p1", port: 65535}, + {policyNS: "ns1", policyName: "p1", port: 1}, + } { + f.Add(s.policyNS, s.policyName, s.podSelectorKey, s.podSelValue, + s.peerSelectorKey, s.peerSelV, s.peerNS, s.peerName, s.peerIP, + s.port, s.ipBlockCIDR, s.ipBlockExcept) + } + + f.Fuzz(func(t *testing.T, + policyNS, policyName, + podSelectorKey, podSelValue, + peerSelectorKey, peerSelV, + peerNS, peerName, peerIP string, + port uint16, + ipBlockCIDR, ipBlockExcept string, + ) { + // Build a synthetic policy. + policy := netv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{Namespace: policyNS, Name: policyName}, + Spec: netv1.NetworkPolicySpec{ + PolicyTypes: []netv1.PolicyType{netv1.PolicyTypeIngress}, + }, + } + if podSelectorKey != "" { + policy.Spec.PodSelector = metav1.LabelSelector{ + MatchLabels: map[string]string{podSelectorKey: podSelValue}, + } + } else { + policy.Spec.PodSelector = metav1.LabelSelector{} + } + ingress := netv1.NetworkPolicyIngressRule{} + if peerSelectorKey != "" { + ingress.From = append(ingress.From, netv1.NetworkPolicyPeer{ + PodSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{peerSelectorKey: peerSelV}, + }, + }) + } + if ipBlockCIDR != "" { + peer := netv1.NetworkPolicyPeer{ + IPBlock: &netv1.IPBlock{CIDR: ipBlockCIDR}, + } + if ipBlockExcept != "" { + peer.IPBlock.Except = []string{ipBlockExcept} + } + ingress.From = append(ingress.From, peer) + } + if port != 0 { + tcp := corev1.ProtocolTCP + p := intstr.FromInt32(int32(port)) + ingress.Ports = append(ingress.Ports, netv1.NetworkPolicyPort{ + Protocol: &tcp, Port: &p, + }) + } + policy.Spec.Ingress = append(policy.Spec.Ingress, ingress) + + // Local pod, possibly matching the policy. + pod := Pod{ + Namespace: "ns1", Name: "web", + Labels: map[string]string{podSelectorKey: podSelValue, "app": "web"}, + HostIface: "flock00000001", + IPs: []net.IP{mustIP("2001:db8::1")}, + } + // Peer pod, possibly matching the peer selector. + var peers []PeerPod + if peerName != "" { + peerIPParsed := net.ParseIP(peerIP) + if peerIPParsed != nil { + peers = append(peers, PeerPod{ + Namespace: peerNS, Name: peerName, + Labels: map[string]string{peerSelectorKey: peerSelV}, + IPs: []net.IP{peerIPParsed}, + }) + } + } + + out, err := Translate(Inputs{ + LocalPods: []Pod{pod}, + PeerPods: peers, + Namespaces: []Namespace{ + {Name: "ns1", Labels: map[string]string{"kubernetes.io/metadata.name": "ns1"}}, + }, + Policies: []netv1.NetworkPolicy{policy}, + }, func(string) {}) + if err != nil { + return // any error is acceptable + } + + // Property: every isolated PodKey appears in Output.Pods. + for iso := range out.Isolated { + if _, ok := out.Pods[iso.PodKey]; !ok { + t.Fatalf("isolated %s has no Pods entry", iso.PodKey) + } + } + + script := Render(out) + // Property: balanced braces. + if got := strings.Count(script, "{") - strings.Count(script, "}"); got != 0 { + t.Fatalf("unbalanced braces (%d):\n%s", got, script) + } + // Property: deterministic (run again, compare). + script2 := Render(out) + if script != script2 { + t.Fatalf("Render not deterministic") + } + }) +} diff --git a/pkg/agent/netpol/translator_test.go b/pkg/agent/netpol/translator_test.go new file mode 100644 index 0000000..33930c2 --- /dev/null +++ b/pkg/agent/netpol/translator_test.go @@ -0,0 +1,452 @@ +package netpol + +import ( + "net" + "testing" + + corev1 "k8s.io/api/core/v1" + netv1 "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +func mustIP(s string) net.IP { + ip := net.ParseIP(s) + if ip == nil { + panic("bad IP: " + s) + } + return ip +} + +func newPolicy(ns, name string, mods ...func(*netv1.NetworkPolicy)) netv1.NetworkPolicy { + p := netv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: name}, + Spec: netv1.NetworkPolicySpec{}, + } + for _, m := range mods { + m(&p) + } + return p +} + +func tcpPort(port int) netv1.NetworkPolicyPort { + proto := corev1.ProtocolTCP + p := intstr.FromInt32(int32(port)) + return netv1.NetworkPolicyPort{Protocol: &proto, Port: &p} +} + +// Pod-only selector that matches everything (`{}`). +func emptySelector() *metav1.LabelSelector { + return &metav1.LabelSelector{} +} + +func selectorMatching(kv map[string]string) *metav1.LabelSelector { + return &metav1.LabelSelector{MatchLabels: kv} +} + +// Helper: collect Isolated keys for the given pod into a string list. +func isolationFor(out Output, podKey string) (in, eg bool) { + if _, ok := out.Isolated[Isolation{PodKey: podKey, Direction: DirIngress}]; ok { + in = true + } + if _, ok := out.Isolated[Isolation{PodKey: podKey, Direction: DirEgress}]; ok { + eg = true + } + return +} + +// TestTranslate_NoPolicies — pod with no matching policy is unrestricted. +func TestTranslate_NoPolicies(t *testing.T) { + pod := Pod{ + Namespace: "ns1", Name: "p1", + Labels: map[string]string{"app": "web"}, + HostIface: "flock00000001", + IPs: []net.IP{mustIP("2001:db8::1")}, + } + out, err := Translate(Inputs{LocalPods: []Pod{pod}}, nil) + if err != nil { + t.Fatal(err) + } + if len(out.Rules) != 0 { + t.Fatalf("expected no rules, got %d", len(out.Rules)) + } + in, eg := isolationFor(out, "ns1/p1") + if in || eg { + t.Fatalf("pod should not be isolated: in=%v eg=%v", in, eg) + } +} + +// TestTranslate_DefaultDeny — a policy with empty Ingress + PolicyTypes +// = [Ingress] selects the pod and isolates it; no allow rules emitted. +func TestTranslate_DefaultDenyIngress(t *testing.T) { + pod := Pod{ + Namespace: "ns1", Name: "web", + Labels: map[string]string{"app": "web"}, + HostIface: "flock00000001", + IPs: []net.IP{mustIP("2001:db8::1")}, + } + policy := newPolicy("ns1", "default-deny", func(p *netv1.NetworkPolicy) { + p.Spec.PodSelector = *emptySelector() + p.Spec.PolicyTypes = []netv1.PolicyType{netv1.PolicyTypeIngress} + }) + out, err := Translate(Inputs{ + LocalPods: []Pod{pod}, + Policies: []netv1.NetworkPolicy{policy}, + }, nil) + if err != nil { + t.Fatal(err) + } + if len(out.Rules) != 0 { + t.Fatalf("expected no rules from a deny-all, got %d", len(out.Rules)) + } + in, eg := isolationFor(out, "ns1/web") + if !in { + t.Fatalf("ingress should be isolated") + } + if eg { + t.Fatalf("egress should NOT be isolated (policy only set ingress)") + } +} + +// TestTranslate_DefaultDenyEgress_InferredFromEgressList — when +// PolicyTypes is omitted but Spec.Egress is non-empty, egress should +// also be isolated by inference. +func TestTranslate_DefaultDenyEgress_InferredFromEgressList(t *testing.T) { + pod := Pod{ + Namespace: "ns1", Name: "web", + Labels: map[string]string{"app": "web"}, + HostIface: "f1", IPs: []net.IP{mustIP("2001:db8::1")}, + } + policy := newPolicy("ns1", "egress-rule", func(p *netv1.NetworkPolicy) { + p.Spec.PodSelector = *emptySelector() + p.Spec.Egress = []netv1.NetworkPolicyEgressRule{{}} + }) + out, _ := Translate(Inputs{LocalPods: []Pod{pod}, Policies: []netv1.NetworkPolicy{policy}}, nil) + in, eg := isolationFor(out, "ns1/web") + if !in || !eg { + t.Fatalf("both directions should be isolated: in=%v eg=%v", in, eg) + } +} + +// TestTranslate_PodSelectorPeer_SameNamespace — peer is a single pod in +// the same namespace, identified by label. +func TestTranslate_PodSelectorPeer(t *testing.T) { + web := Pod{ + Namespace: "ns1", Name: "web", + Labels: map[string]string{"app": "web"}, + HostIface: "f1", IPs: []net.IP{mustIP("2001:db8::1")}, + } + clientIP := mustIP("2001:db8::2") + peer := PeerPod{ + Namespace: "ns1", Name: "client", + Labels: map[string]string{"app": "client"}, + IPs: []net.IP{clientIP}, + } + policy := newPolicy("ns1", "allow-from-client", func(p *netv1.NetworkPolicy) { + p.Spec.PodSelector = *selectorMatching(map[string]string{"app": "web"}) + p.Spec.PolicyTypes = []netv1.PolicyType{netv1.PolicyTypeIngress} + p.Spec.Ingress = []netv1.NetworkPolicyIngressRule{{ + From: []netv1.NetworkPolicyPeer{{ + PodSelector: selectorMatching(map[string]string{"app": "client"}), + }}, + Ports: []netv1.NetworkPolicyPort{tcpPort(80)}, + }} + }) + + out, err := Translate(Inputs{ + LocalPods: []Pod{web}, + PeerPods: []PeerPod{peer}, + Policies: []netv1.NetworkPolicy{policy}, + }, nil) + if err != nil { + t.Fatal(err) + } + if len(out.Rules) != 1 { + t.Fatalf("expected 1 rule, got %d: %+v", len(out.Rules), out.Rules) + } + r := out.Rules[0] + if r.PodKey != "ns1/web" || r.Direction != DirIngress { + t.Fatalf("rule has wrong subject: %+v", r) + } + if len(r.PeerCIDRs) != 1 || !r.PeerCIDRs[0].IP.Equal(clientIP) { + t.Fatalf("peer CIDR wrong: %+v", r.PeerCIDRs) + } + if len(r.Ports) != 1 || r.Ports[0].Protocol != "tcp" || r.Ports[0].Port != 80 { + t.Fatalf("port wrong: %+v", r.Ports) + } +} + +// TestTranslate_NamespaceSelector — peer is "every pod in any namespace +// with label tier=trusted". +func TestTranslate_NamespaceSelector(t *testing.T) { + web := Pod{ + Namespace: "ns1", Name: "web", + Labels: map[string]string{"app": "web"}, + HostIface: "f1", IPs: []net.IP{mustIP("2001:db8::1")}, + } + out, err := Translate(Inputs{ + LocalPods: []Pod{web}, + Namespaces: []Namespace{ + {Name: "ns1", Labels: map[string]string{}}, + {Name: "trusted-1", Labels: map[string]string{"tier": "trusted"}}, + {Name: "trusted-2", Labels: map[string]string{"tier": "trusted"}}, + {Name: "untrusted", Labels: map[string]string{"tier": "wild"}}, + }, + PeerPods: []PeerPod{ + {Namespace: "trusted-1", Name: "a", IPs: []net.IP{mustIP("2001:db8::a")}}, + {Namespace: "trusted-2", Name: "b", IPs: []net.IP{mustIP("2001:db8::b")}}, + {Namespace: "untrusted", Name: "x", IPs: []net.IP{mustIP("2001:db8::ff")}}, + }, + Policies: []netv1.NetworkPolicy{newPolicy("ns1", "allow-trusted", func(p *netv1.NetworkPolicy) { + p.Spec.PodSelector = *emptySelector() + p.Spec.PolicyTypes = []netv1.PolicyType{netv1.PolicyTypeIngress} + p.Spec.Ingress = []netv1.NetworkPolicyIngressRule{{ + From: []netv1.NetworkPolicyPeer{{ + NamespaceSelector: selectorMatching(map[string]string{"tier": "trusted"}), + }}, + }} + })}, + }, nil) + if err != nil { + t.Fatal(err) + } + if len(out.Rules) != 1 { + t.Fatalf("expected 1 rule, got %d", len(out.Rules)) + } + got := map[string]bool{} + for _, c := range out.Rules[0].PeerCIDRs { + got[c.IP.String()] = true + } + if !got["2001:db8::a"] || !got["2001:db8::b"] { + t.Fatalf("trusted pod IPs missing: %v", got) + } + if got["2001:db8::ff"] { + t.Fatalf("untrusted pod IP leaked into rule") + } +} + +// TestTranslate_IPBlockWithExcept — ipBlock with an except range. +func TestTranslate_IPBlockWithExcept(t *testing.T) { + pod := Pod{ + Namespace: "ns1", Name: "web", HostIface: "f1", + Labels: map[string]string{"app": "web"}, + IPs: []net.IP{mustIP("10.0.0.1")}, + } + policy := newPolicy("ns1", "ipblock", func(p *netv1.NetworkPolicy) { + p.Spec.PodSelector = *emptySelector() + p.Spec.PolicyTypes = []netv1.PolicyType{netv1.PolicyTypeIngress} + p.Spec.Ingress = []netv1.NetworkPolicyIngressRule{{ + From: []netv1.NetworkPolicyPeer{{ + IPBlock: &netv1.IPBlock{ + CIDR: "10.0.0.0/8", + Except: []string{"10.99.0.0/16", "10.42.42.0/24"}, + }, + }}, + }} + }) + out, err := Translate(Inputs{ + LocalPods: []Pod{pod}, + Policies: []netv1.NetworkPolicy{policy}, + }, nil) + if err != nil { + t.Fatal(err) + } + if len(out.Rules) != 1 { + t.Fatalf("expected 1 rule, got %d", len(out.Rules)) + } + r := out.Rules[0] + if len(r.PeerCIDRs) != 1 || r.PeerCIDRs[0].String() != "10.0.0.0/8" { + t.Fatalf("peer CIDR wrong: %v", r.PeerCIDRs) + } + if len(r.PeerExcept) != 2 { + t.Fatalf("expected 2 except, got %d", len(r.PeerExcept)) + } +} + +// TestTranslate_AllowAllPeers — empty From list means "from anywhere". +func TestTranslate_AllowAllPeers(t *testing.T) { + pod := Pod{ + Namespace: "ns1", Name: "web", HostIface: "f1", + Labels: map[string]string{"app": "web"}, + IPs: []net.IP{mustIP("2001:db8::1")}, + } + policy := newPolicy("ns1", "allow-all-on-port", func(p *netv1.NetworkPolicy) { + p.Spec.PodSelector = *emptySelector() + p.Spec.PolicyTypes = []netv1.PolicyType{netv1.PolicyTypeIngress} + p.Spec.Ingress = []netv1.NetworkPolicyIngressRule{{ + Ports: []netv1.NetworkPolicyPort{tcpPort(443)}, + }} + }) + out, _ := Translate(Inputs{LocalPods: []Pod{pod}, Policies: []netv1.NetworkPolicy{policy}}, nil) + if len(out.Rules) != 1 { + t.Fatalf("expected 1 rule, got %d", len(out.Rules)) + } + r := out.Rules[0] + if len(r.PeerCIDRs) != 0 || len(r.PeerExcept) != 0 { + t.Fatalf("expected allow-all peers, got CIDRs=%v Except=%v", r.PeerCIDRs, r.PeerExcept) + } +} + +// TestTranslate_AllowAllPorts — empty Ports list means "all ports". +func TestTranslate_AllowAllPorts(t *testing.T) { + pod := Pod{ + Namespace: "ns1", Name: "web", HostIface: "f1", + Labels: map[string]string{"app": "web"}, + IPs: []net.IP{mustIP("2001:db8::1")}, + } + policy := newPolicy("ns1", "allow-from-all", func(p *netv1.NetworkPolicy) { + p.Spec.PodSelector = *emptySelector() + p.Spec.PolicyTypes = []netv1.PolicyType{netv1.PolicyTypeIngress} + p.Spec.Ingress = []netv1.NetworkPolicyIngressRule{{ + From: []netv1.NetworkPolicyPeer{{ + PodSelector: emptySelector(), + }}, + }} + }) + peer := PeerPod{ + Namespace: "ns1", Name: "x", + IPs: []net.IP{mustIP("2001:db8::aa")}, + } + out, _ := Translate(Inputs{ + LocalPods: []Pod{pod}, PeerPods: []PeerPod{peer}, + Policies: []netv1.NetworkPolicy{policy}, + }, nil) + if len(out.Rules) != 1 { + t.Fatalf("expected 1 rule, got %d", len(out.Rules)) + } + r := out.Rules[0] + if len(r.Ports) != 1 || r.Ports[0] != (PortMatch{}) { + t.Fatalf("expected single any-port match, got %+v", r.Ports) + } +} + +// TestTranslate_PortRange — endPort field. +func TestTranslate_PortRange(t *testing.T) { + pod := Pod{ + Namespace: "ns1", Name: "web", HostIface: "f1", + Labels: map[string]string{"app": "web"}, + IPs: []net.IP{mustIP("2001:db8::1")}, + } + policy := newPolicy("ns1", "range", func(p *netv1.NetworkPolicy) { + p.Spec.PodSelector = *emptySelector() + p.Spec.PolicyTypes = []netv1.PolicyType{netv1.PolicyTypeIngress} + proto := corev1.ProtocolTCP + port := intstr.FromInt32(8000) + end := int32(8999) + p.Spec.Ingress = []netv1.NetworkPolicyIngressRule{{ + Ports: []netv1.NetworkPolicyPort{{Protocol: &proto, Port: &port, EndPort: &end}}, + }} + }) + out, _ := Translate(Inputs{LocalPods: []Pod{pod}, Policies: []netv1.NetworkPolicy{policy}}, nil) + if len(out.Rules) != 1 || out.Rules[0].Ports[0].Port != 8000 || out.Rules[0].Ports[0].EndPort != 8999 { + t.Fatalf("range not preserved: %+v", out.Rules) + } +} + +// TestTranslate_NamedPortRejected — named ports aren't supported yet; +// translator must skip the rule and warn. +func TestTranslate_NamedPortRejected(t *testing.T) { + pod := Pod{ + Namespace: "ns1", Name: "web", HostIface: "f1", + Labels: map[string]string{"app": "web"}, + IPs: []net.IP{mustIP("2001:db8::1")}, + } + proto := corev1.ProtocolTCP + named := intstr.FromString("http") + policy := newPolicy("ns1", "named", func(p *netv1.NetworkPolicy) { + p.Spec.PodSelector = *emptySelector() + p.Spec.PolicyTypes = []netv1.PolicyType{netv1.PolicyTypeIngress} + p.Spec.Ingress = []netv1.NetworkPolicyIngressRule{{ + Ports: []netv1.NetworkPolicyPort{{Protocol: &proto, Port: &named}}, + }} + }) + var warns []string + out, _ := Translate(Inputs{LocalPods: []Pod{pod}, Policies: []netv1.NetworkPolicy{policy}}, func(s string) { + warns = append(warns, s) + }) + if len(out.Rules) != 0 { + t.Fatalf("expected named-port rule to be skipped") + } + if len(warns) == 0 { + t.Fatalf("expected a warning about named ports") + } + // The pod should still be isolated since the policy selected it. + in, _ := isolationFor(out, "ns1/web") + if !in { + t.Fatalf("pod should be isolated even when its rule is dropped") + } +} + +// TestTranslate_PolicyOnlyAppliesToOwnNamespace — a policy in nsA does +// NOT select pods in nsB even if their labels match. +func TestTranslate_PolicyScopedToNamespace(t *testing.T) { + a := Pod{Namespace: "nsA", Name: "p", HostIface: "f1", + Labels: map[string]string{"app": "web"}, IPs: []net.IP{mustIP("2001:db8::1")}} + b := Pod{Namespace: "nsB", Name: "p", HostIface: "f2", + Labels: map[string]string{"app": "web"}, IPs: []net.IP{mustIP("2001:db8::2")}} + policy := newPolicy("nsA", "deny", func(p *netv1.NetworkPolicy) { + p.Spec.PodSelector = *selectorMatching(map[string]string{"app": "web"}) + p.Spec.PolicyTypes = []netv1.PolicyType{netv1.PolicyTypeIngress} + }) + out, _ := Translate(Inputs{LocalPods: []Pod{a, b}, Policies: []netv1.NetworkPolicy{policy}}, nil) + inA, _ := isolationFor(out, "nsA/p") + inB, _ := isolationFor(out, "nsB/p") + if !inA { + t.Fatalf("nsA/p should be isolated") + } + if inB { + t.Fatalf("nsB/p must NOT be isolated by a policy in nsA") + } +} + +// TestTranslate_PodWithoutAllocationSkipped — pod with no IPs is silently +// skipped (its rule could not match any traffic anyway). +func TestTranslate_PodWithoutAllocationSkipped(t *testing.T) { + pod := Pod{Namespace: "ns1", Name: "p", HostIface: "f1", + Labels: map[string]string{"app": "web"}} + policy := newPolicy("ns1", "deny", func(p *netv1.NetworkPolicy) { + p.Spec.PodSelector = *emptySelector() + p.Spec.PolicyTypes = []netv1.PolicyType{netv1.PolicyTypeIngress} + }) + out, _ := Translate(Inputs{LocalPods: []Pod{pod}, Policies: []netv1.NetworkPolicy{policy}}, nil) + in, _ := isolationFor(out, "ns1/p") + if in { + t.Fatalf("pod without IP should not appear in output") + } +} + +// TestTranslate_Determinism — translating the same Inputs twice produces +// equal outputs (Rules in equal order, Isolated equal). +func TestTranslate_Determinism(t *testing.T) { + pod := Pod{ + Namespace: "ns1", Name: "web", HostIface: "f1", + Labels: map[string]string{"app": "web"}, + IPs: []net.IP{mustIP("2001:db8::1")}, + } + peers := []PeerPod{ + {Namespace: "ns1", Name: "z", Labels: map[string]string{"app": "client"}, IPs: []net.IP{mustIP("2001:db8::2")}}, + {Namespace: "ns1", Name: "a", Labels: map[string]string{"app": "client"}, IPs: []net.IP{mustIP("2001:db8::3")}}, + } + policies := []netv1.NetworkPolicy{ + newPolicy("ns1", "z-second", func(p *netv1.NetworkPolicy) { + p.Spec.PodSelector = *emptySelector() + p.Spec.PolicyTypes = []netv1.PolicyType{netv1.PolicyTypeIngress} + p.Spec.Ingress = []netv1.NetworkPolicyIngressRule{{ + From: []netv1.NetworkPolicyPeer{{ + PodSelector: selectorMatching(map[string]string{"app": "client"}), + }}, + }} + }), + } + in := Inputs{LocalPods: []Pod{pod}, PeerPods: peers, Policies: policies} + a, _ := Translate(in, nil) + b, _ := Translate(in, nil) + if len(a.Rules) != len(b.Rules) { + t.Fatalf("rule count differs: %d vs %d", len(a.Rules), len(b.Rules)) + } + for i := range a.Rules { + if a.Rules[i].PodKey != b.Rules[i].PodKey || len(a.Rules[i].PeerCIDRs) != len(b.Rules[i].PeerCIDRs) { + t.Fatalf("rule[%d] differs", i) + } + } +} diff --git a/pkg/agent/netpol/types.go b/pkg/agent/netpol/types.go new file mode 100644 index 0000000..8efa5a1 --- /dev/null +++ b/pkg/agent/netpol/types.go @@ -0,0 +1,147 @@ +package netpol + +import "net" + +// Direction is the NetworkPolicy direction, named from the *pod's* +// perspective (matching the NetworkPolicy API). "Ingress" is traffic +// arriving at the pod; "Egress" is traffic the pod initiates. +// +// Note that on the host this maps the opposite way at the veth: an +// Ingress rule matches packets whose oifname is the pod's host-side veth +// (the kernel is forwarding into the pod), and an Egress rule matches +// packets whose iifname is the pod's host-side veth (the kernel just +// received from the pod). +type Direction int + +const ( + DirIngress Direction = iota + DirEgress +) + +// String returns the lower-case wire form ("ingress" / "egress"). +func (d Direction) String() string { + if d == DirEgress { + return "egress" + } + return "ingress" +} + +// Pod is the local-pod information the translator needs. The reconciler +// populates this from its store of CNI allocations — every pod with a +// committed allocation on this node appears here. +type Pod struct { + // Namespace + Name uniquely identify the pod. + Namespace string + Name string + // Labels are the pod labels. NetworkPolicy.Spec.PodSelector matches + // against these. + Labels map[string]string + // HostIface is the host-side veth name (e.g. "flock1a2b3c4d"). All + // rules guarding this pod hook off iifname/oifname == HostIface. + HostIface string + // IPs are the pod's eth0 addresses (IPv6 and/or IPv4). Empty means + // the agent has no allocation for this pod yet — translator should + // skip such pods. + IPs []net.IP +} + +// PeerPod is a (potentially remote) pod whose IPs may be referenced as a +// NetworkPolicy peer. The translator resolves podSelector + +// namespaceSelector peers to their IPs by walking the cluster-wide +// peer-pod set. +type PeerPod struct { + Namespace string + Name string + Labels map[string]string + IPs []net.IP +} + +// Namespace carries just enough metadata for namespaceSelector matching. +type Namespace struct { + Name string + Labels map[string]string +} + +// LocalPod is the renderer-visible subset of a local pod — just enough +// to anchor a base-chain jump. Carried in Output so the renderer can +// emit chains for default-deny pods that have no explicit allow rules. +type LocalPod struct { + PodKey string + HostIface string + IPs []net.IP +} + +// PortMatch is one allowed (protocol, port) tuple. EndPort is inclusive; +// when zero the rule matches the single Port. +type PortMatch struct { + Protocol string // "tcp", "udp", "sctp"; empty means "any of the three" + Port int // 1..65535. Zero means "any port". + EndPort int // 0 if not a range; otherwise inclusive range end. +} + +// Rule is the canonical intermediate representation between the translator +// and the renderer. One Rule is one accept-line in the rendered nft +// script. A pod's chain is the ordered concatenation of every Rule whose +// PodKey matches; any packet that falls off the end is denied by the +// trailing default-deny verdict (the chain has policy drop). +// +// PeerCIDRs are OR'd together, then PeerExcept is subtracted. Empty +// PeerCIDRs + empty PeerExcept means "any source/destination". +type Rule struct { + // PodKey is namespace/name of the pod this rule guards. Used by the + // renderer to slot the rule into the correct chain. + PodKey string + + // HostIface is the pod's host-side veth name; the renderer uses it + // to anchor the base-chain jump. + HostIface string + + // PodIPs are the pod's eth0 addresses. The base chain matches on + // (oifname == HostIface AND daddr ∈ PodIPs) for ingress, and + // (iifname == HostIface AND saddr ∈ PodIPs) for egress, so packets + // that aren't destined to / from the actual pod address don't get + // counted as policy-protected. + PodIPs []net.IP + + // Direction is Ingress or Egress, named from the pod's perspective. + Direction Direction + + // Action is "accept" for explicit allows; default-deny is implicit + // in the chain's policy drop and is not represented as a Rule. + // (Reserved for future deny-list semantics like AdminNetworkPolicy.) + Action Action + + // PeerCIDRs are the addresses of allowed peers. OR'd together. + // Empty means "any peer". + PeerCIDRs []*net.IPNet + + // PeerExcept narrows PeerCIDRs by subtracting these ranges. Only + // meaningful with non-empty PeerCIDRs (it comes from + // ipBlock.except, which requires ipBlock.cidr). + PeerExcept []*net.IPNet + + // Ports is the set of allowed (protocol, port) tuples. Empty means + // "any port / any protocol". + Ports []PortMatch +} + +// Action is the verdict emitted by a Rule. +type Action int + +const ( + // ActionAccept lets the packet through. The default-deny is implicit + // in the chain policy. + ActionAccept Action = iota + // ActionDrop is reserved for future use (AdminNetworkPolicy / + // BaselineAdminNetworkPolicy explicit denies). Not produced by the + // v1 translator. + ActionDrop +) + +// String returns the nft-syntax verdict. +func (a Action) String() string { + if a == ActionDrop { + return "drop" + } + return "accept" +} diff --git a/pkg/agent/netpol_bridge.go b/pkg/agent/netpol_bridge.go new file mode 100644 index 0000000..ee9c630 --- /dev/null +++ b/pkg/agent/netpol_bridge.go @@ -0,0 +1,56 @@ +package agent + +import ( + "net" + + "code.fritzlab.net/fritzlab/flock/pkg/agent/netpol" +) + +// collectLocalPods bridges the agent's allocation store + pod informer +// cache into the netpol-package input shape. It returns one Pod per +// committed allocation that has a matching pod in the informer cache; +// allocations whose pod was just deleted (DEL race) are skipped. +// +// Called on every netpol reconcile pass, so it must be cheap. The work +// here is O(allocations) and reads from in-memory maps only. +func collectLocalPods(store *Store, pods *PodCache) []netpol.Pod { + allocs := store.Snapshot() + out := make([]netpol.Pod, 0, len(allocs)) + for _, a := range allocs { + if a.State != StateCommitted { + continue + } + pod, ok := pods.Get(a.Namespace, a.PodName) + if !ok { + // Pod evicted but DEL hasn't fired yet; nothing to enforce. + continue + } + ips := allocationIPs(a) + if len(ips) == 0 { + continue + } + out = append(out, netpol.Pod{ + Namespace: a.Namespace, + Name: a.PodName, + Labels: pod.Labels, + HostIface: HostIfaceName(a.ContainerID), + IPs: ips, + }) + } + return out +} + +func allocationIPs(a Allocation) []net.IP { + var out []net.IP + if a.IP6 != "" { + if ip := net.ParseIP(a.IP6); ip != nil { + out = append(out, ip) + } + } + if a.IP4 != "" { + if ip := net.ParseIP(a.IP4); ip != nil { + out = append(out, ip) + } + } + return out +} diff --git a/pkg/agent/runtime_linux.go b/pkg/agent/runtime_linux.go index 8d3c1ea..7913b34 100644 --- a/pkg/agent/runtime_linux.go +++ b/pkg/agent/runtime_linux.go @@ -7,6 +7,8 @@ import ( "fmt" "net" "time" + + "code.fritzlab.net/fritzlab/flock/pkg/agent/netpol" ) // configureRuntime wires Pod informer, IPAM, netlink, and BIRD on a real @@ -103,6 +105,17 @@ func (s *Server) configureRuntime(ctx context.Context) error { } }() + // NetworkPolicy enforcement. + world := netpol.NewWorld(s.Logger) + if err := world.Start(ctx, s.restCfg); err != nil { + return fmt.Errorf("netpol informers: %w", err) + } + npApplier := &netpol.Applier{} + npReconciler := netpol.NewReconciler(world, func() []netpol.Pod { + return collectLocalPods(s.Store, pods) + }, npApplier, s.Logger) + go npReconciler.Run(ctx) + handler := &PodHandler{ Node: s.Node, Store: s.Store, @@ -111,7 +124,12 @@ func (s *Server) configureRuntime(ctx context.Context) error { NodeConfig: s.NodeConfig, SetupFunc: Setup, TeardownFunc: Teardown, - AfterCommit: anycast.Trigger, + AfterCommit: func() { + anycast.Trigger() + // Re-evaluate policy on every CNI ADD/DEL so a brand-new + // pod's chain lands before its first packet egresses. + npReconciler.Trigger() + }, } s.RPC.SetHandlers(handler.Add, handler.Del, handler.Check) s.Logger.Info("runtime ready",