package agent

import (
	"context"
	"fmt"
	"log/slog"
	"net"
	"os"
	"path/filepath"
	"time"

	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

// SocketPath is the default unix socket on which flock-agent serves RPCs
// from the CNI plugin. NewServer uses it when Config.Socket is empty.
const SocketPath = "/run/flock/flock.sock"

// Server orchestrates the agent runtime: store, informers, IPAM, netns,
// BIRD. Run() blocks until ctx is cancelled.
type Server struct {
	Node       string
	Store      *Store
	NodeConfig *NodeConfigCache
	RPC        *rpcServer
	Logger     *slog.Logger

	socket  string
	restCfg *rest.Config
}

// Config carries the settings NewServer needs to build a Server.
//
// Node is mandatory. Every other field may be left at its zero value:
// StatePath falls back to /var/lib/flock/allocations.json, Socket to
// SocketPath, Logger to slog.Default(), and an empty Kubeconfig selects the
// in-cluster client configuration.
type Config struct {
	Node       string
	StatePath  string
	Socket     string
	Logger     *slog.Logger
	Kubeconfig string
}

// NewServer builds a Server from cfg.
//
// It rejects an empty Node, substitutes defaults for unset fields, creates
// the directory that holds the state file, opens the store with NewStore and
// loads the Kubernetes REST configuration. Any failure is returned wrapped
// with the step that produced it, and no Server is returned in that case.
func NewServer(cfg Config) (*Server, error) {
	if cfg.Node == "" {
		return nil, fmt.Errorf("Node must be set")
	}

	// Resolve the optional settings into locals; cfg itself is a copy, so
	// the caller never observes the defaults either way.
	statePath := cfg.StatePath
	if statePath == "" {
		statePath = "/var/lib/flock/allocations.json"
	}
	socket := cfg.Socket
	if socket == "" {
		socket = SocketPath
	}
	logger := cfg.Logger
	if logger == nil {
		logger = slog.Default()
	}

	stateDir := filepath.Dir(statePath)
	if err := os.MkdirAll(stateDir, 0o750); err != nil {
		return nil, fmt.Errorf("mkdir state dir: %w", err)
	}

	store, err := NewStore(statePath, cfg.Node)
	if err != nil {
		return nil, fmt.Errorf("open store: %w", err)
	}

	restCfg, err := loadRestConfig(cfg.Kubeconfig)
	if err != nil {
		return nil, fmt.Errorf("load kube config: %w", err)
	}

	srv := &Server{
		Node:       cfg.Node,
		Store:      store,
		NodeConfig: &NodeConfigCache{},
		RPC:        newRPCServer(logger),
		Logger:     logger,
		socket:     socket,
		restCfg:    restCfg,
	}
	return srv, nil
}

// loadRestConfig returns the Kubernetes client configuration. With an empty
// kubeconfig path it uses the in-cluster configuration; otherwise it builds
// the configuration from the kubeconfig file at that path.
func loadRestConfig(kubeconfig string) (*rest.Config, error) {
	if kubeconfig == "" {
		return rest.InClusterConfig()
	}
	return clientcmd.BuildConfigFromFlags("", kubeconfig)
}

// Run blocks until ctx is cancelled.
func (s *Server) Run(ctx context.Context) error { if err := os.MkdirAll(filepath.Dir(s.socket), 0o750); err != nil { return fmt.Errorf("mkdir socket dir: %w", err) } _ = os.Remove(s.socket) l, err := net.Listen("unix", s.socket) if err != nil { return fmt.Errorf("listen %s: %w", s.socket, err) } defer l.Close() s.Logger.Info("flock-agent started", "node", s.Node, "socket", s.socket, "allocations", len(s.Store.Snapshot()), ) // RPC dispatcher takes ownership of the listener. go s.RPC.serve(ctx, l) // NodeConfig informer. errCh := make(chan error, 1) go func() { errCh <- StartNodeConfigInformer(ctx, s.restCfg, s.Node, s.NodeConfig, s.Logger) }() // Pod informer + Handlers + Bird are wired up by configureRuntime, // which is platform-specific (real on Linux, no-op stub elsewhere). go func() { if err := s.configureRuntime(ctx); err != nil { s.Logger.Error("runtime configure failed; ADD will return errors", "err", err) } }() select { case <-ctx.Done(): s.Logger.Info("flock-agent stopping") return nil case err := <-errCh: return fmt.Errorf("informer: %w", err) } } // firstAvailableNodeConfig polls the cache up to `timeout`. Used to wait // for the operator-applied NodeConfig CR before booting the IPAM. func (s *Server) firstAvailableNodeConfig(ctx context.Context, timeout time.Duration) error { deadline := time.Now().Add(timeout) for { if s.NodeConfig.Load() != nil { return nil } if time.Now().After(deadline) { return fmt.Errorf("NodeConfig %q not observed within %s", s.Node, timeout) } select { case <-ctx.Done(): return ctx.Err() case <-time.After(200 * time.Millisecond): } } }