759ed21b37
Build flock Image / build (push) Has been cancelled
Agent now watches nodeconfigs.flock.fritzlab.net via a client-go dynamic informer, filters events to its own node name, and caches the typed NodeConfig in memory (NodeConfigCache, atomic pointer). M2's IPAM will read from that cache. - pkg/agent/nodeconfig.go: informer + JSON-round-trip decode (avoids hand-written DeepCopy + scheme registration for this small a use). - pkg/agent/server.go: starts the informer goroutine; Run terminates if the informer returns. - pkg/api/v1alpha1: switch placeholder TypeMeta/ObjectMeta to metav1. - deploy/rbac: get/list/watch on nodeconfigs. - cmd/flock-agent: --kubeconfig flag for out-of-cluster runs (tests). Satisfies M1 verified-by: "kubectl apply NodeConfig; agent logs read it". Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
139 lines
4.0 KiB
Go
139 lines
4.0 KiB
Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
flockv1alpha1 "code.fritzlab.net/fritzlab/flock/pkg/api/v1alpha1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/client-go/dynamic"
|
|
"k8s.io/client-go/dynamic/dynamicinformer"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/tools/cache"
|
|
)
|
|
|
|
// nodeConfigGVR is the resource coordinate watched by the agent.
|
|
var nodeConfigGVR = schema.GroupVersionResource{
|
|
Group: flockv1alpha1.GroupName,
|
|
Version: flockv1alpha1.Version,
|
|
Resource: "nodeconfigs",
|
|
}
|
|
|
|
// NodeConfigCache holds the latest observed NodeConfig for this node, or
|
|
// nil when none exists. Read with Load(); safe for concurrent use.
|
|
type NodeConfigCache struct {
|
|
current atomic.Pointer[flockv1alpha1.NodeConfig]
|
|
}
|
|
|
|
// Load returns the currently-cached NodeConfig (nil if none).
|
|
func (c *NodeConfigCache) Load() *flockv1alpha1.NodeConfig {
|
|
return c.current.Load()
|
|
}
|
|
|
|
func (c *NodeConfigCache) store(nc *flockv1alpha1.NodeConfig) {
|
|
c.current.Store(nc)
|
|
}
|
|
|
|
// StartNodeConfigInformer runs a dynamic informer against the cluster that
|
|
// watches `nodeconfigs.flock.fritzlab.net` and caches the entry for `node`.
|
|
// It blocks until ctx is cancelled. All other NodeConfigs are ignored —
|
|
// each agent only cares about its own.
|
|
//
|
|
// The caller (agent.Server) passes in cfg so tests / out-of-cluster runs can
|
|
// supply a fake; NewServer resolves in-cluster config via rest.InClusterConfig.
|
|
func StartNodeConfigInformer(
|
|
ctx context.Context,
|
|
cfg *rest.Config,
|
|
node string,
|
|
cache_ *NodeConfigCache,
|
|
logger *slog.Logger,
|
|
) error {
|
|
dyn, err := dynamic.NewForConfig(cfg)
|
|
if err != nil {
|
|
return fmt.Errorf("dynamic client: %w", err)
|
|
}
|
|
|
|
factory := dynamicinformer.NewDynamicSharedInformerFactory(dyn, 10*time.Minute)
|
|
inf := factory.ForResource(nodeConfigGVR).Informer()
|
|
|
|
handle := func(event string, obj interface{}) {
|
|
u, ok := obj.(*unstructured.Unstructured)
|
|
if !ok {
|
|
// Informer hands us DeletedFinalStateUnknown on replays; skip.
|
|
return
|
|
}
|
|
if u.GetName() != node {
|
|
return
|
|
}
|
|
switch event {
|
|
case "delete":
|
|
logger.Info("NodeConfig removed", "node", node)
|
|
cache_.store(nil)
|
|
return
|
|
}
|
|
nc, err := decode(u)
|
|
if err != nil {
|
|
logger.Warn("NodeConfig decode failed",
|
|
"node", node, "event", event, "err", err)
|
|
return
|
|
}
|
|
cache_.store(nc)
|
|
logger.Info("NodeConfig observed",
|
|
"event", event,
|
|
"node", nc.Name,
|
|
"asn", nc.Spec.BGP.ASN,
|
|
"cidr6", nc.Spec.CIDR6,
|
|
"cidr4", nc.Spec.CIDR4,
|
|
"peers", len(nc.Spec.BGP.Peers))
|
|
}
|
|
|
|
_, err = inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: func(obj interface{}) { handle("add", obj) },
|
|
UpdateFunc: func(_, obj interface{}) { handle("update", obj) },
|
|
DeleteFunc: func(obj interface{}) {
|
|
// Unwrap DeletedFinalStateUnknown.
|
|
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
|
|
obj = d.Obj
|
|
}
|
|
handle("delete", obj)
|
|
},
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("add event handler: %w", err)
|
|
}
|
|
|
|
logger.Info("NodeConfig informer starting", "node", node, "gvr", nodeConfigGVR.String())
|
|
factory.Start(ctx.Done())
|
|
if synced := factory.WaitForCacheSync(ctx.Done()); !synced[nodeConfigGVR] {
|
|
return fmt.Errorf("informer cache failed to sync for %s", nodeConfigGVR)
|
|
}
|
|
logger.Info("NodeConfig informer synced", "node", node)
|
|
|
|
<-ctx.Done()
|
|
return nil
|
|
}
|
|
|
|
// decode turns an Unstructured NodeConfig into the typed form. We do this by
|
|
// round-tripping through JSON, which is fine for the low-frequency updates
|
|
// here and keeps pkg/api/v1alpha1 free of scheme-registration boilerplate.
|
|
func decode(u *unstructured.Unstructured) (*flockv1alpha1.NodeConfig, error) {
|
|
b, err := json.Marshal(u.Object)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var nc flockv1alpha1.NodeConfig
|
|
if err := json.Unmarshal(b, &nc); err != nil {
|
|
return nil, err
|
|
}
|
|
// metav1.ObjectMeta lives on u; copy just the name for clarity.
|
|
nc.Name = u.GetName()
|
|
nc.TypeMeta = metav1.TypeMeta{Kind: u.GetKind(), APIVersion: u.GetAPIVersion()}
|
|
return &nc, nil
|
|
}
|