Files

116 lines
3.1 KiB
Go
Raw Permalink Normal View History

package netpol
import (
"context"
"log/slog"
"sync"
"time"
)
// LocalPodSource produces the set of local pods (with their HostIface and
// IPs) the reconciler should enforce policy for. The agent's allocation
// store + pod informer is the natural implementer.
//
// The reconciler calls it at the start of every reconcile pass while
// holding its own pass-serialising mutex, so an implementation must not
// block on anything that itself waits for a reconcile pass to finish.
//
// NOTE(review): because passes are serialised by that mutex, the reconciler
// on its own never invokes this concurrently; keep implementations safe for
// concurrent invocation anyway unless it is confirmed that no other caller
// exists.
type LocalPodSource func() []Pod
// Reconciler turns the World cache + LocalPodSource into nft rule
// applications. One reconcile pass:
//
//	pods + policies + namespaces → Translate → Render → Apply
//
// The pass runs on:
//
//   - World.OnChange (any informer event), debounced through a single
//     coalescing channel,
//   - a periodic tick (default 30s) so we self-heal if the kernel
//     ruleset diverges from desired (e.g. someone manually `nft flush`d),
//   - and explicit Trigger() calls (the agent fires this from CNI ADD /
//     DEL hooks so policy lands before pod traffic flows).
//
// A Reconciler contains a sync.Mutex and must not be copied after first use;
// always handle it through the pointer returned by NewReconciler.
type Reconciler struct {
// World is the cache whose peer-pod, namespace and policy snapshots feed
// every reconcile pass.
World *World
// Local is called once per pass to obtain the local pods.
Local LocalPodSource
// Applier receives the rendered script on every pass (Apply) and is asked
// to Clear when Run returns.
Applier *Applier
// Logger receives a warning for each failed pass and an info line for
// applied passes. It is used without a nil check, so it must be non-nil.
Logger *slog.Logger
// Interval is the period of Run's periodic reconcile tick. NewReconciler
// sets it to 30s.
Interval time.Duration
// mu is held for the whole of each reconcile pass, serialising passes.
mu sync.Mutex
// trigger carries the "a pass is pending" signal from Trigger to Run.
// NewReconciler creates it with capacity 1, which is what makes repeated
// Trigger calls coalesce.
trigger chan struct{}
}
// NewReconciler returns a Reconciler ready to Run.
//
// world supplies the peer-pod, namespace and policy snapshots, local supplies
// the local pods, and applier receives the rendered script. The new
// Reconciler's Trigger method is registered with world.OnChange before the
// Reconciler is returned.
//
// Interval is always initialised to 30s; callers that want a different period
// can overwrite the exported field before calling Run. A nil logger is
// replaced with slog.Default() so that the logging done during reconcile
// passes cannot dereference a nil *slog.Logger.
func NewReconciler(world *World, local LocalPodSource, applier *Applier, logger *slog.Logger) *Reconciler {
	if logger == nil {
		logger = slog.Default()
	}
	r := &Reconciler{
		World:    world,
		Local:    local,
		Applier:  applier,
		Logger:   logger,
		Interval: 30 * time.Second,
		// Capacity 1 lets Trigger record "a pass is pending" without blocking
		// and makes any further Trigger calls coalesce into that one signal.
		trigger: make(chan struct{}, 1),
	}
	world.OnChange(r.Trigger)
	return r
}
// Trigger asks for one reconcile pass and never blocks. Requests coalesce:
// when a pass is already pending the call does nothing, so any number of
// Trigger calls made before the pending signal is consumed result in a
// single queued pass.
func (r *Reconciler) Trigger() {
	wake := struct{}{}
	select {
	default:
		// The pending slot is already occupied; the queued signal covers
		// this request as well.
	case r.trigger <- wake:
		// Signal queued for the receiver.
	}
}
// Run drives the reconcile loop and blocks until ctx is cancelled.
//
// It performs one pass immediately, then one more for every Trigger signal
// and for every tick of a ticker whose period is Interval. A zero or negative
// Interval falls back to 30s, because time.NewTicker panics on a
// non-positive duration.
//
// When ctx is cancelled Run calls Applier.Clear and returns. The Clear is
// best-effort: a failure is logged at Warn level and otherwise ignored.
func (r *Reconciler) Run(ctx context.Context) {
	const defaultInterval = 30 * time.Second

	interval := r.Interval
	if interval <= 0 {
		interval = defaultInterval
	}
	t := time.NewTicker(interval)
	defer t.Stop()

	r.reconcile(ctx) // initial pass
	for {
		select {
		case <-ctx.Done():
			// Best-effort: drop our table on graceful exit. If the agent
			// crashed without doing this, the next agent's first apply
			// will replace the stale table atomically anyway.
			//
			// ctx is already cancelled here, so Clear gets a fresh context.
			if err := r.Applier.Clear(context.Background()); err != nil {
				r.Logger.Warn("netpol clear failed", "err", err)
			}
			return
		case <-t.C:
			r.reconcile(ctx)
		case <-r.trigger:
			r.reconcile(ctx)
		}
	}
}
// reconcile runs one complete pass: gather inputs, Translate them, Render the
// result and hand the script to the Applier.
//
// r.mu is held for the whole pass, so passes never overlap. A Translate or
// Apply failure is logged at Warn level and ends the pass; nothing is
// returned to the caller. On success an info line is logged, but only when
// the translated output has at least one isolated chain.
func (r *Reconciler) reconcile(ctx context.Context) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Gather the inputs in a fixed order: local pods first, then the World
	// snapshots.
	var inputs Inputs
	inputs.LocalPods = r.Local()
	inputs.PeerPods = r.World.snapshotPeerPods()
	inputs.Namespaces = r.World.snapshotNamespaces()
	inputs.Policies = r.World.snapshotPolicies()

	// Translate reports non-fatal problems through this callback.
	warn := func(msg string) { r.Logger.Warn(msg) }

	plan, translateErr := Translate(inputs, warn)
	if translateErr != nil {
		r.Logger.Warn("netpol translate failed", "err", translateErr)
		return
	}

	applyErr := r.Applier.Apply(ctx, Render(plan))
	if applyErr != nil {
		r.Logger.Warn("netpol apply failed", "err", applyErr)
		return
	}

	// Stay quiet when the pass produced no isolated chains.
	if len(plan.Isolated) == 0 {
		return
	}
	r.Logger.Info("netpol applied",
		"isolated_chains", len(plan.Isolated),
		"rules", len(plan.Rules),
		"local_pods", len(inputs.LocalPods),
		"policies", len(inputs.Policies))
}