116 lines
3.1 KiB
Go
116 lines
3.1 KiB
Go
|
|
package netpol
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"log/slog"
|
||
|
|
"sync"
|
||
|
|
"time"
|
||
|
|
)
|
||
|
|
|
||
|
|
// LocalPodSource produces the set of local pods (with their HostIface and
// IPs) the reconciler should enforce policy for. The agent's allocation
// store + pod informer is the natural implementer.
//
// The reconciler calls the function once per reconcile pass while holding
// only its own pass mutex; it takes no lock on behalf of the source. An
// implementation must therefore synchronise access to whatever state it
// reads, and must be safe for concurrent invocation.
type LocalPodSource func() []Pod
|
||
|
|
|
||
|
|
// Reconciler turns the World cache + LocalPodSource into nft rule
// applications. One reconcile pass:
//
//	pods + policies + namespaces → Translate → Render → Apply
//
// The pass runs on:
//
//   - World.OnChange (any informer event), debounced through a single
//     coalescing channel,
//   - a periodic tick (default 30s) so we self-heal if the kernel
//     ruleset diverges from desired (e.g. someone manually `nft flush`d),
//   - and explicit Trigger() calls (the agent fires this from CNI ADD /
//     DEL hooks so policy lands before pod traffic flows).
//
// Construct it with NewReconciler, which creates the trigger channel and
// registers Trigger as a World change callback.
type Reconciler struct {
	// World is the cache whose peer-pod, namespace and policy snapshots
	// feed every pass.
	World *World
	// Local is called once per pass to obtain the local pods.
	Local LocalPodSource
	// Applier receives the rendered script on every pass and is asked to
	// Clear when Run shuts down.
	Applier *Applier
	// Logger receives translate/apply warnings and the per-pass summary.
	Logger *slog.Logger
	// Interval is the period of the ticker that Run uses for periodic
	// passes. NewReconciler sets it to 30s.
	Interval time.Duration

	// mu is held for the whole of a reconcile pass, so passes do not overlap.
	mu sync.Mutex
	// trigger carries reconcile requests from Trigger to Run. NewReconciler
	// creates it with capacity 1, which is what makes requests coalesce.
	trigger chan struct{}
}
|
||
|
|
|
||
|
|
// NewReconciler returns a Reconciler ready to Run. Interval defaults to
|
||
|
|
// 30s if zero.
|
||
|
|
func NewReconciler(world *World, local LocalPodSource, applier *Applier, logger *slog.Logger) *Reconciler {
|
||
|
|
r := &Reconciler{
|
||
|
|
World: world,
|
||
|
|
Local: local,
|
||
|
|
Applier: applier,
|
||
|
|
Logger: logger,
|
||
|
|
Interval: 30 * time.Second,
|
||
|
|
trigger: make(chan struct{}, 1),
|
||
|
|
}
|
||
|
|
world.OnChange(r.Trigger)
|
||
|
|
return r
|
||
|
|
}
|
||
|
|
|
||
|
|
// Trigger requests one reconcile pass. Coalesces — if a pass is already
|
||
|
|
// pending, the call is a no-op.
|
||
|
|
func (r *Reconciler) Trigger() {
|
||
|
|
select {
|
||
|
|
case r.trigger <- struct{}{}:
|
||
|
|
default:
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// Run blocks until ctx is cancelled. Reconciles on Trigger or every
|
||
|
|
// Interval; calls Applier.Clear on shutdown.
|
||
|
|
func (r *Reconciler) Run(ctx context.Context) {
|
||
|
|
t := time.NewTicker(r.Interval)
|
||
|
|
defer t.Stop()
|
||
|
|
r.reconcile(ctx) // initial pass
|
||
|
|
for {
|
||
|
|
select {
|
||
|
|
case <-ctx.Done():
|
||
|
|
// Best-effort: drop our table on graceful exit. If the agent
|
||
|
|
// crashed without doing this, the next agent's first apply
|
||
|
|
// will replace the stale table atomically anyway.
|
||
|
|
_ = r.Applier.Clear(context.Background())
|
||
|
|
return
|
||
|
|
case <-t.C:
|
||
|
|
r.reconcile(ctx)
|
||
|
|
case <-r.trigger:
|
||
|
|
r.reconcile(ctx)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (r *Reconciler) reconcile(ctx context.Context) {
|
||
|
|
r.mu.Lock()
|
||
|
|
defer r.mu.Unlock()
|
||
|
|
|
||
|
|
in := Inputs{
|
||
|
|
LocalPods: r.Local(),
|
||
|
|
PeerPods: r.World.snapshotPeerPods(),
|
||
|
|
Namespaces: r.World.snapshotNamespaces(),
|
||
|
|
Policies: r.World.snapshotPolicies(),
|
||
|
|
}
|
||
|
|
out, err := Translate(in, func(s string) { r.Logger.Warn(s) })
|
||
|
|
if err != nil {
|
||
|
|
r.Logger.Warn("netpol translate failed", "err", err)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
script := Render(out)
|
||
|
|
if err := r.Applier.Apply(ctx, script); err != nil {
|
||
|
|
r.Logger.Warn("netpol apply failed", "err", err)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
if len(out.Isolated) > 0 {
|
||
|
|
r.Logger.Info("netpol applied",
|
||
|
|
"isolated_chains", len(out.Isolated),
|
||
|
|
"rules", len(out.Rules),
|
||
|
|
"local_pods", len(in.LocalPods),
|
||
|
|
"policies", len(in.Policies))
|
||
|
|
}
|
||
|
|
}
|