Files
flock/pkg/agent/netpol/informers.go
T
Donavan Fritz 39ede9130b
Build flock Image / build (push) Has been cancelled
netpol: NetworkPolicy v1 enforcement via nftables
New pkg/agent/netpol implementing standard networking.k8s.io/v1
NetworkPolicy. Pipeline:

  pods + policies + namespaces  →  Translate  →  Render  →  Apply

Supports ingress + egress, all three peer types (podSelector,
namespaceSelector, ipBlock with except), numeric ports + port ranges,
default-deny semantics derived from PolicyTypes (or inferred from
non-empty Spec.Egress when unset).

Apply path is `nft -f -` shell-out — single transaction, atomic, kernel
guarantees partial-failure rollback. Idempotent dedup via last-applied
script. Reconcile triggers: informer events, 30s self-heal tick, every
CNI ADD/DEL.

Verified against the three live cluster NetPols (calico-apiserver,
remote-proxies/lodge-home-assistant, storage/garage-admin-restrict).
Fuzz target stitches Translate + Render with random selector and peer
inputs; 21 unit tests cover the policy semantics.

Named ports skip with a warn — deferred until kubelet exposes them in a
form that doesn't require shadowing pod state.

Dockerfile: + nftables.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-04-25 09:25:58 -05:00

223 lines
6.0 KiB
Go

package netpol
import (
"context"
"fmt"
"log/slog"
"net"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
netv1 "k8s.io/api/networking/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
// World aggregates the cluster-wide caches the reconciler queries on
// every pass: NetworkPolicies, Namespaces, and all Pods (for peer
// resolution). The maps and the callback list are plain Go values guarded
// by mu; read them through the snapshot accessors (or while holding mu),
// never directly. Construct a World with NewWorld: the zero value has nil
// maps, and the informer handlers would panic writing into them.
type World struct {
// logger receives informer lifecycle messages (see Start).
logger *slog.Logger
// mu guards policies, namespaces, peerPods and onChange.
mu sync.RWMutex
policies map[string]netv1.NetworkPolicy // key = ns/name
namespaces map[string]Namespace // key = namespace name
peerPods map[string]PeerPod // key = ns/name
// onChange holds the callbacks registered via OnChange; fireChange runs them.
onChange []func()
}
// NewWorld returns an empty World with its caches allocated. Callers should
// call Start to populate it; before Start, the snapshot accessors return
// empty slices.
//
// A nil logger falls back to slog.Default(). Start logs unconditionally,
// and calling a method on a nil *slog.Logger would panic.
func NewWorld(logger *slog.Logger) *World {
	if logger == nil {
		logger = slog.Default()
	}
	return &World{
		logger:     logger,
		policies:   map[string]netv1.NetworkPolicy{},
		namespaces: map[string]Namespace{},
		peerPods:   map[string]PeerPod{},
	}
}
// OnChange registers f to run after every informer event that World applies
// to its caches (policy, namespace or pod add/update/delete).
//
// Callbacks are invoked synchronously from the informer event handlers, in
// registration order. The reconciler uses this hook to debounce policy
// reloads. Callbacks should therefore return quickly. Because each informer
// delivers events on its own goroutine, a callback may also be invoked
// concurrently with itself.
func (w *World) OnChange(f func()) {
	w.mu.Lock()
	w.onChange = append(w.onChange, f)
	w.mu.Unlock()
}
// fireChange invokes every registered OnChange callback in registration
// order. The callback list is copied while holding the read lock, and the
// lock is released before any callback runs. A callback may therefore call
// OnChange or the snapshot accessors without deadlocking.
func (w *World) fireChange() {
	w.mu.RLock()
	listeners := make([]func(), len(w.onChange))
	copy(listeners, w.onChange)
	w.mu.RUnlock()

	for i := range listeners {
		listeners[i]()
	}
}
// Start launches three informers (NetworkPolicy, Namespace, Pod) against the
// cluster API described by cfg. It blocks until each cache reports synced.
// Every add/update/delete event is mirrored into w and then fans out to the
// OnChange callbacks.
//
// The informers run until ctx is cancelled; the caller is responsible for
// cancelling ctx on shutdown. If ctx is cancelled before the caches sync,
// the returned error wraps ctx.Err(), so callers can tell shutdown from a
// genuine failure with errors.Is(err, context.Canceled).
func (w *World) Start(ctx context.Context, cfg *rest.Config) error {
	cs, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return fmt.Errorf("kubernetes client: %w", err)
	}
	factory := informers.NewSharedInformerFactory(cs, 10*time.Minute)
	npInformer := factory.Networking().V1().NetworkPolicies().Informer()
	nsInformer := factory.Core().V1().Namespaces().Informer()
	podInformer := factory.Core().V1().Pods().Informer()

	// handlers adapts one of the World's (obj, deleted) callbacks to
	// client-go's handler shape. Updates only need the new object, because
	// every callback overwrites its cache entry wholesale.
	handlers := func(on func(obj interface{}, deleted bool)) cache.ResourceEventHandlerFuncs {
		return cache.ResourceEventHandlerFuncs{
			AddFunc:    func(obj interface{}) { on(obj, false) },
			UpdateFunc: func(_, newObj interface{}) { on(newObj, false) },
			DeleteFunc: func(obj interface{}) { on(obj, true) },
		}
	}
	if _, err := npInformer.AddEventHandler(handlers(w.onPolicy)); err != nil {
		return fmt.Errorf("add netpol handler: %w", err)
	}
	if _, err := nsInformer.AddEventHandler(handlers(w.onNamespace)); err != nil {
		return fmt.Errorf("add ns handler: %w", err)
	}
	if _, err := podInformer.AddEventHandler(handlers(w.onPod)); err != nil {
		return fmt.Errorf("add pod handler: %w", err)
	}

	w.logger.Info("netpol informers starting")
	factory.Start(ctx.Done())
	if !cache.WaitForCacheSync(ctx.Done(),
		npInformer.HasSynced, nsInformer.HasSynced, podInformer.HasSynced) {
		// WaitForCacheSync gives up only when its stop channel closes, so
		// surface the context's reason instead of discarding it.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return fmt.Errorf("netpol informer caches failed to sync: %w", ctxErr)
		}
		return fmt.Errorf("netpol informer caches failed to sync")
	}
	w.logger.Info("netpol informers synced",
		"netpols", len(w.snapshotPolicies()),
		"namespaces", len(w.snapshotNamespaces()),
		"peer_pods", len(w.snapshotPeerPods()))
	return nil
}
// unwrapDFSU returns the object carried by a cache.DeletedFinalStateUnknown
// tombstone, or obj itself when it is not a tombstone. Delete handlers
// receive tombstones when the informer missed the actual delete event.
func unwrapDFSU(obj interface{}) interface{} {
	tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
	if !isTombstone {
		return obj
	}
	return tombstone.Obj
}
// onPolicy applies one NetworkPolicy informer event to w.policies and then
// notifies OnChange subscribers.
//
// Deletions (including tombstones) drop the "namespace/name" entry. Adds
// and updates store a value copy of the object. Payloads that are not a
// non-nil *netv1.NetworkPolicy are ignored, and no callbacks fire for them.
func (w *World) onPolicy(obj interface{}, deleted bool) {
	policy, isPolicy := unwrapDFSU(obj).(*netv1.NetworkPolicy)
	if !isPolicy || policy == nil {
		return
	}

	w.mu.Lock()
	switch key := policy.Namespace + "/" + policy.Name; {
	case deleted:
		delete(w.policies, key)
	default:
		w.policies[key] = *policy
	}
	w.mu.Unlock()

	w.fireChange()
}
// onNamespace applies one Namespace informer event to w.namespaces and then
// notifies OnChange subscribers.
//
// Only the name and labels are kept, because that is what namespaceSelector
// peers match on. The labels map is shared with the informer's cached
// object, not copied. Payloads that are not a non-nil *corev1.Namespace are
// ignored, and no callbacks fire for them.
func (w *World) onNamespace(obj interface{}, deleted bool) {
	namespace, isNamespace := unwrapDFSU(obj).(*corev1.Namespace)
	if !isNamespace || namespace == nil {
		return
	}
	name := namespace.Name

	w.mu.Lock()
	if !deleted {
		w.namespaces[name] = Namespace{Name: name, Labels: namespace.Labels}
	} else {
		delete(w.namespaces, name)
	}
	w.mu.Unlock()

	w.fireChange()
}
// onPod applies one Pod informer event to w.peerPods and then notifies
// OnChange subscribers. Entries are keyed by "namespace/name" and record
// the pod's labels (shared with the informer cache) and its parsed status
// IPs for peer resolution. Payloads that are not a non-nil *corev1.Pod are
// ignored, and no callbacks fire for them.
//
// Pods in a terminal phase (Succeeded or Failed, e.g. completed Job pods or
// evicted pods) are removed exactly as if they had been deleted. Such pods
// keep their IP in Status even though the address may already have been
// released and handed to an unrelated pod. Keeping them would let a
// selector that matches the finished pod admit whatever pod now owns the
// address.
func (w *World) onPod(obj interface{}, deleted bool) {
	pod, ok := unwrapDFSU(obj).(*corev1.Pod)
	if !ok || pod == nil {
		return
	}
	key := pod.Namespace + "/" + pod.Name
	finished := pod.Status.Phase == corev1.PodSucceeded ||
		pod.Status.Phase == corev1.PodFailed

	w.mu.Lock()
	if deleted || finished {
		delete(w.peerPods, key)
	} else {
		w.peerPods[key] = PeerPod{
			Namespace: pod.Namespace,
			Name:      pod.Name,
			Labels:    pod.Labels,
			IPs:       podIPs(pod),
		}
	}
	w.mu.Unlock()

	w.fireChange()
}
// podIPs parses every address in p.Status.PodIPs, silently skipping entries
// that are not valid IPs. If none parse, it falls back to the legacy
// single-valued p.Status.PodIP, which older clusters may populate without
// PodIPs.
//
// A pod with no usable address (for example, one still scheduling) yields
// an empty, non-nil slice.
func podIPs(p *corev1.Pod) []net.IP {
	ips := make([]net.IP, 0, len(p.Status.PodIPs))
	for i := range p.Status.PodIPs {
		if parsed := net.ParseIP(p.Status.PodIPs[i].IP); parsed != nil {
			ips = append(ips, parsed)
		}
	}
	if len(ips) > 0 || p.Status.PodIP == "" {
		return ips
	}
	if legacy := net.ParseIP(p.Status.PodIP); legacy != nil {
		ips = append(ips, legacy)
	}
	return ips
}
// snapshotPolicies returns every cached NetworkPolicy in a freshly
// allocated slice, read under the read lock. The order is unspecified
// because it follows map iteration.
//
// The copy is shallow. Nested slices and maps inside each policy are still
// shared with the informer cache and must be treated as read-only.
func (w *World) snapshotPolicies() []netv1.NetworkPolicy {
	w.mu.RLock()
	defer w.mu.RUnlock()

	policies := make([]netv1.NetworkPolicy, len(w.policies))
	i := 0
	for _, policy := range w.policies {
		policies[i] = policy
		i++
	}
	return policies
}
// snapshotNamespaces returns every cached Namespace in a freshly allocated
// slice, read under the read lock. The order is unspecified because it
// follows map iteration.
//
// Each element's Labels map is shared with the informer cache, not copied,
// and must not be mutated.
func (w *World) snapshotNamespaces() []Namespace {
	w.mu.RLock()
	defer w.mu.RUnlock()

	namespaces := make([]Namespace, len(w.namespaces))
	idx := 0
	for _, ns := range w.namespaces {
		namespaces[idx] = ns
		idx++
	}
	return namespaces
}
// snapshotPeerPods returns every cached PeerPod in a freshly allocated
// slice, read under the read lock. The order is unspecified because it
// follows map iteration.
//
// Each element's Labels map is shared with the informer cache and its IPs
// slice with the cached entry. Treat both as read-only.
func (w *World) snapshotPeerPods() []PeerPod {
	w.mu.RLock()
	defer w.mu.RUnlock()

	pods := make([]PeerPod, len(w.peerPods))
	n := 0
	for _, peer := range w.peerPods {
		pods[n] = peer
		n++
	}
	return pods
}