39ede9130b
Build flock Image / build (push) Has been cancelled
New pkg/agent/netpol implementing standard networking.k8s.io/v1 NetworkPolicy. Pipeline: pods + policies + namespaces → Translate → Render → Apply. Supports ingress and egress, all three peer types (podSelector, namespaceSelector, ipBlock with except), numeric ports and port ranges, and default-deny semantics derived from PolicyTypes (or inferred from a non-empty Spec.Egress when PolicyTypes is unset). The apply path shells out to `nft -f -` — a single atomic transaction, with the kernel guaranteeing rollback on partial failure. Idempotent dedup via the last-applied script. Reconcile triggers: informer events, a 30s self-heal tick, and every CNI ADD/DEL. Verified against the three live cluster NetPols (calico-apiserver, remote-proxies/lodge-home-assistant, storage/garage-admin-restrict). A fuzz target stitches Translate + Render together with random selector and peer inputs; 21 unit tests cover the policy semantics. Named ports are skipped with a warning — deferred until kubelet exposes them in a form that doesn't require shadowing pod state. Dockerfile: add nftables. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
223 lines
6.0 KiB
Go
223 lines
6.0 KiB
Go
package netpol
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
netv1 "k8s.io/api/networking/v1"
|
|
"k8s.io/client-go/informers"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/tools/cache"
|
|
)
|
|
|
|
// World aggregates the cluster-wide caches the reconciler queries on
// every pass: NetworkPolicies, Namespaces, and all Pods (for peer
// resolution).
//
// Every map and the callback list below are guarded by mu. Readers should
// go through the snapshot* accessors, which take the read lock. The maps
// are written by the informer event handlers (onPolicy, onNamespace,
// onPod), and onChange is appended to by OnChange.
type World struct {
	logger *slog.Logger

	// mu guards policies, namespaces, peerPods and onChange.
	mu         sync.RWMutex
	policies   map[string]netv1.NetworkPolicy // key = ns/name
	namespaces map[string]Namespace           // key = namespace name
	peerPods   map[string]PeerPod             // key = ns/name

	// onChange holds the callbacks registered via OnChange. fireChange
	// invokes them after every cache mutation.
	onChange []func()
}
|
|
|
|
// NewWorld builds a World with empty (non-nil) caches and no change
// callbacks. Nothing is populated until Start runs the informers; until
// then every snapshot accessor yields an empty slice.
//
// logger is used by Start for lifecycle messages and must be non-nil by
// the time Start is called.
func NewWorld(logger *slog.Logger) *World {
	w := new(World)
	w.logger = logger
	w.policies = make(map[string]netv1.NetworkPolicy)
	w.namespaces = make(map[string]Namespace)
	w.peerPods = make(map[string]PeerPod)
	return w
}
|
|
|
|
// OnChange registers a callback fired (synchronously, inside the informer
|
|
// event handler) whenever any watched object changes. The reconciler
|
|
// uses this to debounce policy reloads.
|
|
func (w *World) OnChange(f func()) {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
w.onChange = append(w.onChange, f)
|
|
}
|
|
|
|
func (w *World) fireChange() {
|
|
w.mu.RLock()
|
|
cbs := append([]func(){}, w.onChange...)
|
|
w.mu.RUnlock()
|
|
for _, f := range cbs {
|
|
f()
|
|
}
|
|
}
|
|
|
|
// Start launches three informers (NetworkPolicy, Namespace, Pod) against
// the cluster API and blocks until every cache reports synced. The event
// handlers are registered before the factory is started, so the initial
// listing populates the World through onPolicy/onNamespace/onPod just like
// any later event.
//
// The informers run until ctx is cancelled. The caller owns ctx and is
// responsible for cancelling it on shutdown. If ctx ends before the caches
// sync, the returned error wraps ctx.Err(), so callers can match
// context.Canceled or context.DeadlineExceeded with errors.Is.
//
// It returns an error if the clientset cannot be built from cfg, if any
// event handler fails to register, or if the caches do not sync.
func (w *World) Start(ctx context.Context, cfg *rest.Config) error {
	cs, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return fmt.Errorf("kubernetes client: %w", err)
	}
	// Non-zero resync period: client-go periodically re-delivers cached
	// objects to the Update handlers, which re-fires the OnChange callbacks.
	factory := informers.NewSharedInformerFactory(cs, 10*time.Minute)

	npInformer := factory.Networking().V1().NetworkPolicies().Informer()
	nsInformer := factory.Core().V1().Namespaces().Informer()
	podInformer := factory.Core().V1().Pods().Informer()

	if _, err := npInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { w.onPolicy(obj, false) },
		UpdateFunc: func(_, n interface{}) { w.onPolicy(n, false) },
		DeleteFunc: func(obj interface{}) { w.onPolicy(obj, true) },
	}); err != nil {
		return fmt.Errorf("add netpol handler: %w", err)
	}
	if _, err := nsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { w.onNamespace(obj, false) },
		UpdateFunc: func(_, n interface{}) { w.onNamespace(n, false) },
		DeleteFunc: func(obj interface{}) { w.onNamespace(obj, true) },
	}); err != nil {
		return fmt.Errorf("add ns handler: %w", err)
	}
	if _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { w.onPod(obj, false) },
		UpdateFunc: func(_, n interface{}) { w.onPod(n, false) },
		DeleteFunc: func(obj interface{}) { w.onPod(obj, true) },
	}); err != nil {
		return fmt.Errorf("add pod handler: %w", err)
	}

	w.logger.Info("netpol informers starting")
	factory.Start(ctx.Done())
	if !cache.WaitForCacheSync(ctx.Done(),
		npInformer.HasSynced, nsInformer.HasSynced, podInformer.HasSynced) {
		// WaitForCacheSync only gives up once its stop channel is closed,
		// i.e. once ctx is done, so ctx.Err() names the cause.
		return fmt.Errorf("netpol informer caches failed to sync: %w", ctx.Err())
	}

	// Read the sizes under the lock instead of materialising three full
	// snapshot copies just to measure their length.
	w.mu.RLock()
	nPolicies, nNamespaces, nPods := len(w.policies), len(w.namespaces), len(w.peerPods)
	w.mu.RUnlock()

	w.logger.Info("netpol informers synced",
		"netpols", nPolicies,
		"namespaces", nNamespaces,
		"peer_pods", nPods)
	return nil
}
|
|
|
|
// unwrapDFSU lifts a DeletedFinalStateUnknown wrapper if present.
|
|
func unwrapDFSU(obj interface{}) interface{} {
|
|
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
|
|
return d.Obj
|
|
}
|
|
return obj
|
|
}
|
|
|
|
func (w *World) onPolicy(obj interface{}, deleted bool) {
|
|
p, ok := unwrapDFSU(obj).(*netv1.NetworkPolicy)
|
|
if !ok || p == nil {
|
|
return
|
|
}
|
|
key := p.Namespace + "/" + p.Name
|
|
w.mu.Lock()
|
|
if deleted {
|
|
delete(w.policies, key)
|
|
} else {
|
|
w.policies[key] = *p
|
|
}
|
|
w.mu.Unlock()
|
|
w.fireChange()
|
|
}
|
|
|
|
func (w *World) onNamespace(obj interface{}, deleted bool) {
|
|
ns, ok := unwrapDFSU(obj).(*corev1.Namespace)
|
|
if !ok || ns == nil {
|
|
return
|
|
}
|
|
w.mu.Lock()
|
|
if deleted {
|
|
delete(w.namespaces, ns.Name)
|
|
} else {
|
|
w.namespaces[ns.Name] = Namespace{Name: ns.Name, Labels: ns.Labels}
|
|
}
|
|
w.mu.Unlock()
|
|
w.fireChange()
|
|
}
|
|
|
|
func (w *World) onPod(obj interface{}, deleted bool) {
|
|
pod, ok := unwrapDFSU(obj).(*corev1.Pod)
|
|
if !ok || pod == nil {
|
|
return
|
|
}
|
|
key := pod.Namespace + "/" + pod.Name
|
|
w.mu.Lock()
|
|
if deleted {
|
|
delete(w.peerPods, key)
|
|
} else {
|
|
w.peerPods[key] = PeerPod{
|
|
Namespace: pod.Namespace,
|
|
Name: pod.Name,
|
|
Labels: pod.Labels,
|
|
IPs: podIPs(pod),
|
|
}
|
|
}
|
|
w.mu.Unlock()
|
|
w.fireChange()
|
|
}
|
|
|
|
// podIPs returns the parsed addresses listed in p.Status.PodIPs, silently
// skipping entries that do not parse. If none of those entries yields an
// address, it falls back to the singular p.Status.PodIP field, which older
// clusters may populate without PodIPs.
//
// A pod with no addresses in its status (e.g. one not yet scheduled) yields
// an empty, non-nil slice.
func podIPs(p *corev1.Pod) []net.IP {
	ips := make([]net.IP, 0, len(p.Status.PodIPs))
	for i := range p.Status.PodIPs {
		if parsed := net.ParseIP(p.Status.PodIPs[i].IP); parsed != nil {
			ips = append(ips, parsed)
		}
	}
	if len(ips) > 0 || p.Status.PodIP == "" {
		return ips
	}
	if legacy := net.ParseIP(p.Status.PodIP); legacy != nil {
		ips = append(ips, legacy)
	}
	return ips
}
|
|
|
|
// snapshotPolicies returns a defensive copy of the policy map's values.
|
|
func (w *World) snapshotPolicies() []netv1.NetworkPolicy {
|
|
w.mu.RLock()
|
|
defer w.mu.RUnlock()
|
|
out := make([]netv1.NetworkPolicy, 0, len(w.policies))
|
|
for _, p := range w.policies {
|
|
out = append(out, p)
|
|
}
|
|
return out
|
|
}
|
|
|
|
// snapshotNamespaces returns a defensive copy of the namespace map.
|
|
func (w *World) snapshotNamespaces() []Namespace {
|
|
w.mu.RLock()
|
|
defer w.mu.RUnlock()
|
|
out := make([]Namespace, 0, len(w.namespaces))
|
|
for _, n := range w.namespaces {
|
|
out = append(out, n)
|
|
}
|
|
return out
|
|
}
|
|
|
|
// snapshotPeerPods returns a defensive copy of the peer-pod map.
|
|
func (w *World) snapshotPeerPods() []PeerPod {
|
|
w.mu.RLock()
|
|
defer w.mu.RUnlock()
|
|
out := make([]PeerPod, 0, len(w.peerPods))
|
|
for _, p := range w.peerPods {
|
|
out = append(out, p)
|
|
}
|
|
return out
|
|
}
|