package netpol import ( "context" "log/slog" "sync" "time" ) // LocalPodSource produces the set of local pods (with their HostIface and // IPs) the reconciler should enforce policy for. The agent's allocation // store + pod informer is the natural implementer. // // The function is called inside the reconciler under no lock, so it must // be safe for concurrent invocation. type LocalPodSource func() []Pod // Reconciler turns the World cache + LocalPodSource into nft rule // applications. One reconcile pass: // // pods + policies + namespaces → Translate → Render → Apply // // The pass runs on: // // - World.OnChange (any informer event), debounced through a single // coalescing channel, // - a periodic tick (default 30s) so we self-heal if the kernel // ruleset diverges from desired (e.g. someone manually `nft flush`d), // - and explicit Trigger() calls (the agent fires this from CNI ADD / // DEL hooks so policy lands before pod traffic flows). type Reconciler struct { World *World Local LocalPodSource Applier *Applier Logger *slog.Logger Interval time.Duration mu sync.Mutex trigger chan struct{} } // NewReconciler returns a Reconciler ready to Run. Interval defaults to // 30s if zero. func NewReconciler(world *World, local LocalPodSource, applier *Applier, logger *slog.Logger) *Reconciler { r := &Reconciler{ World: world, Local: local, Applier: applier, Logger: logger, Interval: 30 * time.Second, trigger: make(chan struct{}, 1), } world.OnChange(r.Trigger) return r } // Trigger requests one reconcile pass. Coalesces — if a pass is already // pending, the call is a no-op. func (r *Reconciler) Trigger() { select { case r.trigger <- struct{}{}: default: } } // Run blocks until ctx is cancelled. Reconciles on Trigger or every // Interval; calls Applier.Clear on shutdown. func (r *Reconciler) Run(ctx context.Context) { t := time.NewTicker(r.Interval) defer t.Stop() r.reconcile(ctx) // initial pass for { select { case <-ctx.Done(): // Best-effort: drop our table on graceful exit. If the agent // crashed without doing this, the next agent's first apply // will replace the stale table atomically anyway. _ = r.Applier.Clear(context.Background()) return case <-t.C: r.reconcile(ctx) case <-r.trigger: r.reconcile(ctx) } } } func (r *Reconciler) reconcile(ctx context.Context) { r.mu.Lock() defer r.mu.Unlock() in := Inputs{ LocalPods: r.Local(), PeerPods: r.World.snapshotPeerPods(), Namespaces: r.World.snapshotNamespaces(), Policies: r.World.snapshotPolicies(), } out, err := Translate(in, func(s string) { r.Logger.Warn(s) }) if err != nil { r.Logger.Warn("netpol translate failed", "err", err) return } script := Render(out) if err := r.Applier.Apply(ctx, script); err != nil { r.Logger.Warn("netpol apply failed", "err", err) return } if len(out.Isolated) > 0 { r.Logger.Info("netpol applied", "isolated_chains", len(out.Isolated), "rules", len(out.Rules), "local_pods", len(in.LocalPods), "policies", len(in.Policies)) } }