package agent import ( "context" "fmt" "log/slog" "net" "os" "path/filepath" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ) // SocketPath is the unix socket on which flock-agent serves RPCs from the // CNI plugin. Mirrors pkg/cni.SocketPath; kept as a separate constant so the // agent package has no import-cycle on the CNI package. const SocketPath = "/run/flock/flock.sock" // Server is the agent's runtime container: state store, kubernetes informers, // netlink, BIRD, nftables. M1.5 wires the state store, a placeholder unix // listener, and a NodeConfig informer. type Server struct { Node string Store *Store NodeConfig *NodeConfigCache Logger *slog.Logger socket string restCfg *rest.Config } // Config configures NewServer. type Config struct { Node string StatePath string // typically /var/lib/flock/allocations.json Socket string // typically /run/flock/flock.sock Logger *slog.Logger Kubeconfig string // empty => in-cluster config } // NewServer constructs a Server. It does NOT start any goroutines; call Run. func NewServer(cfg Config) (*Server, error) { if cfg.Node == "" { return nil, fmt.Errorf("Node must be set") } if cfg.StatePath == "" { cfg.StatePath = "/var/lib/flock/allocations.json" } if cfg.Socket == "" { cfg.Socket = SocketPath } if cfg.Logger == nil { cfg.Logger = slog.Default() } if err := os.MkdirAll(filepath.Dir(cfg.StatePath), 0o750); err != nil { return nil, fmt.Errorf("mkdir state dir: %w", err) } store, err := NewStore(cfg.StatePath, cfg.Node) if err != nil { return nil, fmt.Errorf("open store: %w", err) } restCfg, err := loadRestConfig(cfg.Kubeconfig) if err != nil { return nil, fmt.Errorf("load kube config: %w", err) } return &Server{ Node: cfg.Node, Store: store, NodeConfig: &NodeConfigCache{}, Logger: cfg.Logger, socket: cfg.Socket, restCfg: restCfg, }, nil } func loadRestConfig(kubeconfig string) (*rest.Config, error) { if kubeconfig != "" { return clientcmd.BuildConfigFromFlags("", kubeconfig) } return rest.InClusterConfig() } // Run starts the agent and blocks until ctx is cancelled. M1.5 opens the // unix listener, starts the NodeConfig informer, and waits. The RPC handler // is still a no-op until M2. func (s *Server) Run(ctx context.Context) error { if err := os.MkdirAll(filepath.Dir(s.socket), 0o750); err != nil { return fmt.Errorf("mkdir socket dir: %w", err) } _ = os.Remove(s.socket) l, err := net.Listen("unix", s.socket) if err != nil { return fmt.Errorf("listen %s: %w", s.socket, err) } defer l.Close() s.Logger.Info("flock-agent started", "node", s.Node, "socket", s.socket, "allocations", len(s.Store.Snapshot()), ) // Accept loop: closes every connection immediately (M2 will dispatch). go func() { for { conn, err := l.Accept() if err != nil { return } _ = conn.Close() } }() // NodeConfig informer. Any error from the informer terminates Run. errCh := make(chan error, 1) go func() { errCh <- StartNodeConfigInformer(ctx, s.restCfg, s.Node, s.NodeConfig, s.Logger) }() select { case <-ctx.Done(): s.Logger.Info("flock-agent stopping") return nil case err := <-errCh: return fmt.Errorf("informer: %w", err) } }