223 lines
6.0 KiB
Go
223 lines
6.0 KiB
Go
|
|
package netpol
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"fmt"
|
||
|
|
"log/slog"
|
||
|
|
"net"
|
||
|
|
"sync"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
corev1 "k8s.io/api/core/v1"
|
||
|
|
netv1 "k8s.io/api/networking/v1"
|
||
|
|
"k8s.io/client-go/informers"
|
||
|
|
"k8s.io/client-go/kubernetes"
|
||
|
|
"k8s.io/client-go/rest"
|
||
|
|
"k8s.io/client-go/tools/cache"
|
||
|
|
)
|
||
|
|
|
||
|
|
// World aggregates the cluster-wide caches the reconciler queries on
|
||
|
|
// every pass: NetworkPolicies, Namespaces, and all Pods (for peer
|
||
|
|
// resolution). Each field is safe for concurrent reads.
|
||
|
|
type World struct {
|
||
|
|
logger *slog.Logger
|
||
|
|
|
||
|
|
mu sync.RWMutex
|
||
|
|
policies map[string]netv1.NetworkPolicy // key = ns/name
|
||
|
|
namespaces map[string]Namespace
|
||
|
|
peerPods map[string]PeerPod // key = ns/name
|
||
|
|
|
||
|
|
onChange []func()
|
||
|
|
}
|
||
|
|
|
||
|
|
// NewWorld returns an empty World. Callers should call Start to populate
|
||
|
|
// it; before Start, the snapshot accessors return empty slices.
|
||
|
|
func NewWorld(logger *slog.Logger) *World {
|
||
|
|
return &World{
|
||
|
|
logger: logger,
|
||
|
|
policies: map[string]netv1.NetworkPolicy{},
|
||
|
|
namespaces: map[string]Namespace{},
|
||
|
|
peerPods: map[string]PeerPod{},
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// OnChange registers a callback fired (synchronously, inside the informer
|
||
|
|
// event handler) whenever any watched object changes. The reconciler
|
||
|
|
// uses this to debounce policy reloads.
|
||
|
|
func (w *World) OnChange(f func()) {
|
||
|
|
w.mu.Lock()
|
||
|
|
defer w.mu.Unlock()
|
||
|
|
w.onChange = append(w.onChange, f)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (w *World) fireChange() {
|
||
|
|
w.mu.RLock()
|
||
|
|
cbs := append([]func(){}, w.onChange...)
|
||
|
|
w.mu.RUnlock()
|
||
|
|
for _, f := range cbs {
|
||
|
|
f()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// Start launches three informers (NetworkPolicy, Namespace, Pod) against
|
||
|
|
// the cluster API. It blocks until each cache reports synced. The caller
|
||
|
|
// is responsible for cancelling ctx on shutdown.
|
||
|
|
func (w *World) Start(ctx context.Context, cfg *rest.Config) error {
|
||
|
|
cs, err := kubernetes.NewForConfig(cfg)
|
||
|
|
if err != nil {
|
||
|
|
return fmt.Errorf("kubernetes client: %w", err)
|
||
|
|
}
|
||
|
|
factory := informers.NewSharedInformerFactory(cs, 10*time.Minute)
|
||
|
|
|
||
|
|
npInformer := factory.Networking().V1().NetworkPolicies().Informer()
|
||
|
|
nsInformer := factory.Core().V1().Namespaces().Informer()
|
||
|
|
podInformer := factory.Core().V1().Pods().Informer()
|
||
|
|
|
||
|
|
if _, err := npInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||
|
|
AddFunc: func(obj interface{}) { w.onPolicy(obj, false) },
|
||
|
|
UpdateFunc: func(_, n interface{}) { w.onPolicy(n, false) },
|
||
|
|
DeleteFunc: func(obj interface{}) { w.onPolicy(obj, true) },
|
||
|
|
}); err != nil {
|
||
|
|
return fmt.Errorf("add netpol handler: %w", err)
|
||
|
|
}
|
||
|
|
if _, err := nsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||
|
|
AddFunc: func(obj interface{}) { w.onNamespace(obj, false) },
|
||
|
|
UpdateFunc: func(_, n interface{}) { w.onNamespace(n, false) },
|
||
|
|
DeleteFunc: func(obj interface{}) { w.onNamespace(obj, true) },
|
||
|
|
}); err != nil {
|
||
|
|
return fmt.Errorf("add ns handler: %w", err)
|
||
|
|
}
|
||
|
|
if _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||
|
|
AddFunc: func(obj interface{}) { w.onPod(obj, false) },
|
||
|
|
UpdateFunc: func(_, n interface{}) { w.onPod(n, false) },
|
||
|
|
DeleteFunc: func(obj interface{}) { w.onPod(obj, true) },
|
||
|
|
}); err != nil {
|
||
|
|
return fmt.Errorf("add pod handler: %w", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
w.logger.Info("netpol informers starting")
|
||
|
|
factory.Start(ctx.Done())
|
||
|
|
if !cache.WaitForCacheSync(ctx.Done(),
|
||
|
|
npInformer.HasSynced, nsInformer.HasSynced, podInformer.HasSynced) {
|
||
|
|
return fmt.Errorf("netpol informer caches failed to sync")
|
||
|
|
}
|
||
|
|
w.logger.Info("netpol informers synced",
|
||
|
|
"netpols", len(w.snapshotPolicies()),
|
||
|
|
"namespaces", len(w.snapshotNamespaces()),
|
||
|
|
"peer_pods", len(w.snapshotPeerPods()))
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// unwrapDFSU lifts a DeletedFinalStateUnknown wrapper if present.
|
||
|
|
func unwrapDFSU(obj interface{}) interface{} {
|
||
|
|
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
|
||
|
|
return d.Obj
|
||
|
|
}
|
||
|
|
return obj
|
||
|
|
}
|
||
|
|
|
||
|
|
func (w *World) onPolicy(obj interface{}, deleted bool) {
|
||
|
|
p, ok := unwrapDFSU(obj).(*netv1.NetworkPolicy)
|
||
|
|
if !ok || p == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
key := p.Namespace + "/" + p.Name
|
||
|
|
w.mu.Lock()
|
||
|
|
if deleted {
|
||
|
|
delete(w.policies, key)
|
||
|
|
} else {
|
||
|
|
w.policies[key] = *p
|
||
|
|
}
|
||
|
|
w.mu.Unlock()
|
||
|
|
w.fireChange()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (w *World) onNamespace(obj interface{}, deleted bool) {
|
||
|
|
ns, ok := unwrapDFSU(obj).(*corev1.Namespace)
|
||
|
|
if !ok || ns == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
w.mu.Lock()
|
||
|
|
if deleted {
|
||
|
|
delete(w.namespaces, ns.Name)
|
||
|
|
} else {
|
||
|
|
w.namespaces[ns.Name] = Namespace{Name: ns.Name, Labels: ns.Labels}
|
||
|
|
}
|
||
|
|
w.mu.Unlock()
|
||
|
|
w.fireChange()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (w *World) onPod(obj interface{}, deleted bool) {
|
||
|
|
pod, ok := unwrapDFSU(obj).(*corev1.Pod)
|
||
|
|
if !ok || pod == nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
key := pod.Namespace + "/" + pod.Name
|
||
|
|
w.mu.Lock()
|
||
|
|
if deleted {
|
||
|
|
delete(w.peerPods, key)
|
||
|
|
} else {
|
||
|
|
w.peerPods[key] = PeerPod{
|
||
|
|
Namespace: pod.Namespace,
|
||
|
|
Name: pod.Name,
|
||
|
|
Labels: pod.Labels,
|
||
|
|
IPs: podIPs(pod),
|
||
|
|
}
|
||
|
|
}
|
||
|
|
w.mu.Unlock()
|
||
|
|
w.fireChange()
|
||
|
|
}
|
||
|
|
|
||
|
|
// podIPs extracts every PodIP from the status. Pods without status (still
|
||
|
|
// scheduling) yield nil — safe for the translator.
|
||
|
|
func podIPs(p *corev1.Pod) []net.IP {
|
||
|
|
out := make([]net.IP, 0, len(p.Status.PodIPs))
|
||
|
|
for _, addr := range p.Status.PodIPs {
|
||
|
|
ip := net.ParseIP(addr.IP)
|
||
|
|
if ip == nil {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
out = append(out, ip)
|
||
|
|
}
|
||
|
|
if len(out) == 0 && p.Status.PodIP != "" {
|
||
|
|
// Older clusters may populate PodIP but not PodIPs; tolerate both.
|
||
|
|
if ip := net.ParseIP(p.Status.PodIP); ip != nil {
|
||
|
|
out = append(out, ip)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return out
|
||
|
|
}
|
||
|
|
|
||
|
|
// snapshotPolicies returns a defensive copy of the policy map's values.
|
||
|
|
func (w *World) snapshotPolicies() []netv1.NetworkPolicy {
|
||
|
|
w.mu.RLock()
|
||
|
|
defer w.mu.RUnlock()
|
||
|
|
out := make([]netv1.NetworkPolicy, 0, len(w.policies))
|
||
|
|
for _, p := range w.policies {
|
||
|
|
out = append(out, p)
|
||
|
|
}
|
||
|
|
return out
|
||
|
|
}
|
||
|
|
|
||
|
|
// snapshotNamespaces returns a defensive copy of the namespace map.
|
||
|
|
func (w *World) snapshotNamespaces() []Namespace {
|
||
|
|
w.mu.RLock()
|
||
|
|
defer w.mu.RUnlock()
|
||
|
|
out := make([]Namespace, 0, len(w.namespaces))
|
||
|
|
for _, n := range w.namespaces {
|
||
|
|
out = append(out, n)
|
||
|
|
}
|
||
|
|
return out
|
||
|
|
}
|
||
|
|
|
||
|
|
// snapshotPeerPods returns a defensive copy of the peer-pod map.
|
||
|
|
func (w *World) snapshotPeerPods() []PeerPod {
|
||
|
|
w.mu.RLock()
|
||
|
|
defer w.mu.RUnlock()
|
||
|
|
out := make([]PeerPod, 0, len(w.peerPods))
|
||
|
|
for _, p := range w.peerPods {
|
||
|
|
out = append(out, p)
|
||
|
|
}
|
||
|
|
return out
|
||
|
|
}
|