package agent

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"sync/atomic"
	"time"

	flockv1alpha1 "code.fritzlab.net/fritzlab/flock/pkg/api/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

// nodeConfigGVR identifies the NodeConfig resource ("nodeconfigs" in the
// flock API group/version) that the agent's informer watches.
var nodeConfigGVR = schema.GroupVersion{
	Group:   flockv1alpha1.GroupName,
	Version: flockv1alpha1.Version,
}.WithResource("nodeconfigs")

// NodeConfigCache is a single-slot cache for the most recently observed
// NodeConfig of this node. The slot holds nil while no NodeConfig exists.
// Readers use Load; the slot is an atomic pointer, so concurrent use is safe.
type NodeConfigCache struct {
	current atomic.Pointer[flockv1alpha1.NodeConfig]
}

// Load returns the NodeConfig currently held by the cache, or nil when the
// cache is empty.
func (c *NodeConfigCache) Load() *flockv1alpha1.NodeConfig {
	latest := c.current.Load()
	return latest
}

// store replaces the cached NodeConfig with latest; passing nil empties the
// cache.
func (c *NodeConfigCache) store(latest *flockv1alpha1.NodeConfig) {
	c.current.Store(latest)
}

// StartNodeConfigInformer runs a dynamic informer against the cluster that
// watches `nodeconfigs.flock.fritzlab.net` and keeps the entry named `node`
// in the supplied cache. NodeConfigs belonging to other nodes are ignored —
// each agent only cares about its own. The call blocks until ctx is
// cancelled.
//
// cfg is injected by the caller (agent.Server) so that tests and
// out-of-cluster runs can supply a fake; NewServer resolves the in-cluster
// config via rest.InClusterConfig.
func StartNodeConfigInformer( ctx context.Context, cfg *rest.Config, node string, cache_ *NodeConfigCache, logger *slog.Logger, ) error { dyn, err := dynamic.NewForConfig(cfg) if err != nil { return fmt.Errorf("dynamic client: %w", err) } factory := dynamicinformer.NewDynamicSharedInformerFactory(dyn, 10*time.Minute) inf := factory.ForResource(nodeConfigGVR).Informer() handle := func(event string, obj interface{}) { u, ok := obj.(*unstructured.Unstructured) if !ok { // Informer hands us DeletedFinalStateUnknown on replays; skip. return } if u.GetName() != node { return } switch event { case "delete": logger.Info("NodeConfig removed", "node", node) cache_.store(nil) return } nc, err := decode(u) if err != nil { logger.Warn("NodeConfig decode failed", "node", node, "event", event, "err", err) return } cache_.store(nc) logger.Info("NodeConfig observed", "event", event, "node", nc.Name, "asn", nc.Spec.BGP.ASN, "cidr6", nc.Spec.CIDR6, "cidr4", nc.Spec.CIDR4, "peers", len(nc.Spec.BGP.Peers)) } _, err = inf.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { handle("add", obj) }, UpdateFunc: func(_, obj interface{}) { handle("update", obj) }, DeleteFunc: func(obj interface{}) { // Unwrap DeletedFinalStateUnknown. if d, ok := obj.(cache.DeletedFinalStateUnknown); ok { obj = d.Obj } handle("delete", obj) }, }) if err != nil { return fmt.Errorf("add event handler: %w", err) } logger.Info("NodeConfig informer starting", "node", node, "gvr", nodeConfigGVR.String()) factory.Start(ctx.Done()) if synced := factory.WaitForCacheSync(ctx.Done()); !synced[nodeConfigGVR] { return fmt.Errorf("informer cache failed to sync for %s", nodeConfigGVR) } logger.Info("NodeConfig informer synced", "node", node) <-ctx.Done() return nil } // decode turns an Unstructured NodeConfig into the typed form. We do this by // round-tripping through JSON, which is fine for the low-frequency updates // here and keeps pkg/api/v1alpha1 free of scheme-registration boilerplate. 
func decode(u *unstructured.Unstructured) (*flockv1alpha1.NodeConfig, error) { b, err := json.Marshal(u.Object) if err != nil { return nil, err } var nc flockv1alpha1.NodeConfig if err := json.Unmarshal(b, &nc); err != nil { return nil, err } // metav1.ObjectMeta lives on u; copy just the name for clarity. nc.Name = u.GetName() nc.TypeMeta = metav1.TypeMeta{Kind: u.GetKind(), APIVersion: u.GetAPIVersion()} return &nc, nil }