package netpol import ( "context" "fmt" "log/slog" "net" "sync" "time" corev1 "k8s.io/api/core/v1" netv1 "k8s.io/api/networking/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" ) // World aggregates the cluster-wide caches the reconciler queries on // every pass: NetworkPolicies, Namespaces, and all Pods (for peer // resolution). Each field is safe for concurrent reads. type World struct { logger *slog.Logger mu sync.RWMutex policies map[string]netv1.NetworkPolicy // key = ns/name namespaces map[string]Namespace peerPods map[string]PeerPod // key = ns/name onChange []func() } // NewWorld returns an empty World. Callers should call Start to populate // it; before Start, the snapshot accessors return empty slices. func NewWorld(logger *slog.Logger) *World { return &World{ logger: logger, policies: map[string]netv1.NetworkPolicy{}, namespaces: map[string]Namespace{}, peerPods: map[string]PeerPod{}, } } // OnChange registers a callback fired (synchronously, inside the informer // event handler) whenever any watched object changes. The reconciler // uses this to debounce policy reloads. func (w *World) OnChange(f func()) { w.mu.Lock() defer w.mu.Unlock() w.onChange = append(w.onChange, f) } func (w *World) fireChange() { w.mu.RLock() cbs := append([]func(){}, w.onChange...) w.mu.RUnlock() for _, f := range cbs { f() } } // Start launches three informers (NetworkPolicy, Namespace, Pod) against // the cluster API. It blocks until each cache reports synced. The caller // is responsible for cancelling ctx on shutdown. func (w *World) Start(ctx context.Context, cfg *rest.Config) error { cs, err := kubernetes.NewForConfig(cfg) if err != nil { return fmt.Errorf("kubernetes client: %w", err) } factory := informers.NewSharedInformerFactory(cs, 10*time.Minute) npInformer := factory.Networking().V1().NetworkPolicies().Informer() nsInformer := factory.Core().V1().Namespaces().Informer() podInformer := factory.Core().V1().Pods().Informer() if _, err := npInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { w.onPolicy(obj, false) }, UpdateFunc: func(_, n interface{}) { w.onPolicy(n, false) }, DeleteFunc: func(obj interface{}) { w.onPolicy(obj, true) }, }); err != nil { return fmt.Errorf("add netpol handler: %w", err) } if _, err := nsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { w.onNamespace(obj, false) }, UpdateFunc: func(_, n interface{}) { w.onNamespace(n, false) }, DeleteFunc: func(obj interface{}) { w.onNamespace(obj, true) }, }); err != nil { return fmt.Errorf("add ns handler: %w", err) } if _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { w.onPod(obj, false) }, UpdateFunc: func(_, n interface{}) { w.onPod(n, false) }, DeleteFunc: func(obj interface{}) { w.onPod(obj, true) }, }); err != nil { return fmt.Errorf("add pod handler: %w", err) } w.logger.Info("netpol informers starting") factory.Start(ctx.Done()) if !cache.WaitForCacheSync(ctx.Done(), npInformer.HasSynced, nsInformer.HasSynced, podInformer.HasSynced) { return fmt.Errorf("netpol informer caches failed to sync") } w.logger.Info("netpol informers synced", "netpols", len(w.snapshotPolicies()), "namespaces", len(w.snapshotNamespaces()), "peer_pods", len(w.snapshotPeerPods())) return nil } // unwrapDFSU lifts a DeletedFinalStateUnknown wrapper if present. func unwrapDFSU(obj interface{}) interface{} { if d, ok := obj.(cache.DeletedFinalStateUnknown); ok { return d.Obj } return obj } func (w *World) onPolicy(obj interface{}, deleted bool) { p, ok := unwrapDFSU(obj).(*netv1.NetworkPolicy) if !ok || p == nil { return } key := p.Namespace + "/" + p.Name w.mu.Lock() if deleted { delete(w.policies, key) } else { w.policies[key] = *p } w.mu.Unlock() w.fireChange() } func (w *World) onNamespace(obj interface{}, deleted bool) { ns, ok := unwrapDFSU(obj).(*corev1.Namespace) if !ok || ns == nil { return } w.mu.Lock() if deleted { delete(w.namespaces, ns.Name) } else { w.namespaces[ns.Name] = Namespace{Name: ns.Name, Labels: ns.Labels} } w.mu.Unlock() w.fireChange() } func (w *World) onPod(obj interface{}, deleted bool) { pod, ok := unwrapDFSU(obj).(*corev1.Pod) if !ok || pod == nil { return } key := pod.Namespace + "/" + pod.Name w.mu.Lock() if deleted { delete(w.peerPods, key) } else { w.peerPods[key] = PeerPod{ Namespace: pod.Namespace, Name: pod.Name, Labels: pod.Labels, IPs: podIPs(pod), } } w.mu.Unlock() w.fireChange() } // podIPs extracts every PodIP from the status. Pods without status (still // scheduling) yield nil — safe for the translator. func podIPs(p *corev1.Pod) []net.IP { out := make([]net.IP, 0, len(p.Status.PodIPs)) for _, addr := range p.Status.PodIPs { ip := net.ParseIP(addr.IP) if ip == nil { continue } out = append(out, ip) } if len(out) == 0 && p.Status.PodIP != "" { // Older clusters may populate PodIP but not PodIPs; tolerate both. if ip := net.ParseIP(p.Status.PodIP); ip != nil { out = append(out, ip) } } return out } // snapshotPolicies returns a defensive copy of the policy map's values. func (w *World) snapshotPolicies() []netv1.NetworkPolicy { w.mu.RLock() defer w.mu.RUnlock() out := make([]netv1.NetworkPolicy, 0, len(w.policies)) for _, p := range w.policies { out = append(out, p) } return out } // snapshotNamespaces returns a defensive copy of the namespace map. func (w *World) snapshotNamespaces() []Namespace { w.mu.RLock() defer w.mu.RUnlock() out := make([]Namespace, 0, len(w.namespaces)) for _, n := range w.namespaces { out = append(out, n) } return out } // snapshotPeerPods returns a defensive copy of the peer-pod map. func (w *World) snapshotPeerPods() []PeerPod { w.mu.RLock() defer w.mu.RUnlock() out := make([]PeerPod, 0, len(w.peerPods)) for _, p := range w.peerPods { out = append(out, p) } return out }