Agent now watches nodeconfigs.flock.fritzlab.net via a client-go dynamic informer, filters events to its own node name, and caches the typed NodeConfig in memory (NodeConfigCache, atomic pointer). M2's IPAM will read from that cache.

- pkg/agent/nodeconfig.go: informer + JSON-round-trip decode (avoids hand-written DeepCopy + scheme registration for such a small use case).
- pkg/agent/server.go: starts the informer goroutine; Run terminates if the informer returns.
- pkg/api/v1alpha1: switch placeholder TypeMeta/ObjectMeta to metav1.
- deploy/rbac: get/list/watch on nodeconfigs.
- cmd/flock-agent: --kubeconfig flag for out-of-cluster runs (tests).

Satisfies M1 verified-by: "kubectl apply NodeConfig; agent logs read it".

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,138 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
flockv1alpha1 "code.fritzlab.net/fritzlab/flock/pkg/api/v1alpha1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/dynamic/dynamicinformer"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
// nodeConfigGVR is the resource coordinate watched by the agent.
|
||||
var nodeConfigGVR = schema.GroupVersionResource{
|
||||
Group: flockv1alpha1.GroupName,
|
||||
Version: flockv1alpha1.Version,
|
||||
Resource: "nodeconfigs",
|
||||
}
|
||||
|
||||
// NodeConfigCache holds the latest observed NodeConfig for this node, or
|
||||
// nil when none exists. Read with Load(); safe for concurrent use.
|
||||
type NodeConfigCache struct {
|
||||
current atomic.Pointer[flockv1alpha1.NodeConfig]
|
||||
}
|
||||
|
||||
// Load returns the currently-cached NodeConfig (nil if none).
|
||||
func (c *NodeConfigCache) Load() *flockv1alpha1.NodeConfig {
|
||||
return c.current.Load()
|
||||
}
|
||||
|
||||
func (c *NodeConfigCache) store(nc *flockv1alpha1.NodeConfig) {
|
||||
c.current.Store(nc)
|
||||
}
|
||||
|
||||
// StartNodeConfigInformer runs a dynamic informer against the cluster that
|
||||
// watches `nodeconfigs.flock.fritzlab.net` and caches the entry for `node`.
|
||||
// It blocks until ctx is cancelled. All other NodeConfigs are ignored —
|
||||
// each agent only cares about its own.
|
||||
//
|
||||
// The caller (agent.Server) passes in cfg so tests / out-of-cluster runs can
|
||||
// supply a fake; NewServer resolves in-cluster config via rest.InClusterConfig.
|
||||
func StartNodeConfigInformer(
|
||||
ctx context.Context,
|
||||
cfg *rest.Config,
|
||||
node string,
|
||||
cache_ *NodeConfigCache,
|
||||
logger *slog.Logger,
|
||||
) error {
|
||||
dyn, err := dynamic.NewForConfig(cfg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("dynamic client: %w", err)
|
||||
}
|
||||
|
||||
factory := dynamicinformer.NewDynamicSharedInformerFactory(dyn, 10*time.Minute)
|
||||
inf := factory.ForResource(nodeConfigGVR).Informer()
|
||||
|
||||
handle := func(event string, obj interface{}) {
|
||||
u, ok := obj.(*unstructured.Unstructured)
|
||||
if !ok {
|
||||
// Informer hands us DeletedFinalStateUnknown on replays; skip.
|
||||
return
|
||||
}
|
||||
if u.GetName() != node {
|
||||
return
|
||||
}
|
||||
switch event {
|
||||
case "delete":
|
||||
logger.Info("NodeConfig removed", "node", node)
|
||||
cache_.store(nil)
|
||||
return
|
||||
}
|
||||
nc, err := decode(u)
|
||||
if err != nil {
|
||||
logger.Warn("NodeConfig decode failed",
|
||||
"node", node, "event", event, "err", err)
|
||||
return
|
||||
}
|
||||
cache_.store(nc)
|
||||
logger.Info("NodeConfig observed",
|
||||
"event", event,
|
||||
"node", nc.Name,
|
||||
"asn", nc.Spec.BGP.ASN,
|
||||
"cidr6", nc.Spec.CIDR6,
|
||||
"cidr4", nc.Spec.CIDR4,
|
||||
"peers", len(nc.Spec.BGP.Peers))
|
||||
}
|
||||
|
||||
_, err = inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) { handle("add", obj) },
|
||||
UpdateFunc: func(_, obj interface{}) { handle("update", obj) },
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
// Unwrap DeletedFinalStateUnknown.
|
||||
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
|
||||
obj = d.Obj
|
||||
}
|
||||
handle("delete", obj)
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("add event handler: %w", err)
|
||||
}
|
||||
|
||||
logger.Info("NodeConfig informer starting", "node", node, "gvr", nodeConfigGVR.String())
|
||||
factory.Start(ctx.Done())
|
||||
if synced := factory.WaitForCacheSync(ctx.Done()); !synced[nodeConfigGVR] {
|
||||
return fmt.Errorf("informer cache failed to sync for %s", nodeConfigGVR)
|
||||
}
|
||||
logger.Info("NodeConfig informer synced", "node", node)
|
||||
|
||||
<-ctx.Done()
|
||||
return nil
|
||||
}
|
||||
|
||||
// decode turns an Unstructured NodeConfig into the typed form. We do this by
|
||||
// round-tripping through JSON, which is fine for the low-frequency updates
|
||||
// here and keeps pkg/api/v1alpha1 free of scheme-registration boilerplate.
|
||||
func decode(u *unstructured.Unstructured) (*flockv1alpha1.NodeConfig, error) {
|
||||
b, err := json.Marshal(u.Object)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var nc flockv1alpha1.NodeConfig
|
||||
if err := json.Unmarshal(b, &nc); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// metav1.ObjectMeta lives on u; copy just the name for clarity.
|
||||
nc.Name = u.GetName()
|
||||
nc.TypeMeta = metav1.TypeMeta{Kind: u.GetKind(), APIVersion: u.GetAPIVersion()}
|
||||
return &nc, nil
|
||||
}
|
||||
+53
-24
@@ -7,6 +7,9 @@ import (
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
)
|
||||
|
||||
// SocketPath is the unix socket on which flock-agent serves RPCs from the
|
||||
@@ -15,22 +18,24 @@ import (
|
||||
const SocketPath = "/run/flock/flock.sock"
|
||||
|
||||
// Server is the agent's runtime container: state store, kubernetes informers,
|
||||
// netlink, BIRD, nftables. M1 wires only the state store and a placeholder
|
||||
// listener so the binary boots and exits cleanly under a context.
|
||||
// netlink, BIRD, nftables. M1.5 wires the state store, a placeholder unix
|
||||
// listener, and a NodeConfig informer.
|
||||
type Server struct {
|
||||
Node string
|
||||
Store *Store
|
||||
Logger *slog.Logger
|
||||
socket string
|
||||
closeCh chan struct{}
|
||||
Node string
|
||||
Store *Store
|
||||
NodeConfig *NodeConfigCache
|
||||
Logger *slog.Logger
|
||||
socket string
|
||||
restCfg *rest.Config
|
||||
}
|
||||
|
||||
// Config configures NewServer.
|
||||
type Config struct {
|
||||
Node string
|
||||
StatePath string // typically /var/lib/flock/allocations.json
|
||||
Socket string // typically /run/flock/flock.sock
|
||||
Logger *slog.Logger
|
||||
Node string
|
||||
StatePath string // typically /var/lib/flock/allocations.json
|
||||
Socket string // typically /run/flock/flock.sock
|
||||
Logger *slog.Logger
|
||||
Kubeconfig string // empty => in-cluster config
|
||||
}
|
||||
|
||||
// NewServer constructs a Server. It does NOT start any goroutines; call Run.
|
||||
@@ -54,18 +59,32 @@ func NewServer(cfg Config) (*Server, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open store: %w", err)
|
||||
}
|
||||
|
||||
restCfg, err := loadRestConfig(cfg.Kubeconfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("load kube config: %w", err)
|
||||
}
|
||||
|
||||
return &Server{
|
||||
Node: cfg.Node,
|
||||
Store: store,
|
||||
Logger: cfg.Logger,
|
||||
socket: cfg.Socket,
|
||||
closeCh: make(chan struct{}),
|
||||
Node: cfg.Node,
|
||||
Store: store,
|
||||
NodeConfig: &NodeConfigCache{},
|
||||
Logger: cfg.Logger,
|
||||
socket: cfg.Socket,
|
||||
restCfg: restCfg,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Run starts the agent and blocks until ctx is cancelled. M1 only opens the
|
||||
// unix listener (proving permissions/path); the RPC handler is a no-op
|
||||
// returning ENOSYS until M2.
|
||||
func loadRestConfig(kubeconfig string) (*rest.Config, error) {
|
||||
if kubeconfig != "" {
|
||||
return clientcmd.BuildConfigFromFlags("", kubeconfig)
|
||||
}
|
||||
return rest.InClusterConfig()
|
||||
}
|
||||
|
||||
// Run starts the agent and blocks until ctx is cancelled. M1.5 opens the
|
||||
// unix listener, starts the NodeConfig informer, and waits. The RPC handler
|
||||
// is still a no-op until M2.
|
||||
func (s *Server) Run(ctx context.Context) error {
|
||||
if err := os.MkdirAll(filepath.Dir(s.socket), 0o750); err != nil {
|
||||
return fmt.Errorf("mkdir socket dir: %w", err)
|
||||
@@ -83,18 +102,28 @@ func (s *Server) Run(ctx context.Context) error {
|
||||
"allocations", len(s.Store.Snapshot()),
|
||||
)
|
||||
|
||||
// Accept loop: M1 closes every accepted conn immediately. M2 will dispatch.
|
||||
// Accept loop: closes every connection immediately (M2 will dispatch).
|
||||
go func() {
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
return // listener closed
|
||||
return
|
||||
}
|
||||
_ = conn.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
<-ctx.Done()
|
||||
s.Logger.Info("flock-agent stopping")
|
||||
return nil
|
||||
// NodeConfig informer. Any error from the informer terminates Run.
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
errCh <- StartNodeConfigInformer(ctx, s.restCfg, s.Node, s.NodeConfig, s.Logger)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
s.Logger.Info("flock-agent stopping")
|
||||
return nil
|
||||
case err := <-errCh:
|
||||
return fmt.Errorf("informer: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user