M2 plumbing: CNI ↔ agent JSON RPC over unix socket
Build flock Image / build (push) Has been cancelled
Build flock Image / build (push) Has been cancelled
Locks the wire format between /opt/cni/bin/flock and flock-agent. ADD returns a CNI Result, DEL returns success/error, CHECK returns success/error. Connection-per-RPC, newline-delimited JSON.

- pkg/cni/rpc.go: shared Op + Request + Response + framed encode/decode.
- pkg/cni/rpc_client.go: net.Dial + EncodeRequest + DecodeResponse; rpcSocket overridable for tests.
- pkg/cni/plugin.go: real implementations of CmdAdd/Del/Check that call through, mapping agent errors to types.Error.
- pkg/agent/rpc.go: rpcServer with swappable AddHandler/DelHandler/CheckHandler (defaults: not-implemented for ADD; idempotent no-op for DEL/CHECK so kubelet teardown of a never-ADDed pod doesn't fail).
- pkg/agent/server.go: replaces the M1 accept-and-close placeholder with rpcServer.serve(ctx, listener); listener closes on ctx cancel.

Tests cover: Request/Response JSON roundtrip, end-to-end client → unix-socket → fake server, agent error → CNI types.Error mapping.

ADD remains "not implemented" until netlink + IPAM wire-up — the agent returns an error and kubelet will fail pod sandbox creation IF a node were configured to use this CNI. host001's CNI plane is still 100% Calico, so this changes nothing observable on the cluster.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,135 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
flockcni "code.fritzlab.net/fritzlab/flock/pkg/cni"
|
||||
current "github.com/containernetworking/cni/pkg/types/100"
|
||||
)
|
||||
|
||||
// AddHandler is the per-pod ADD logic — IPAM allocate, netns program,
|
||||
// state commit. M2 wires this to a real implementation; M1.5 returns a
|
||||
// "not implemented" error so the plugin path is exercised end-to-end
|
||||
// without doing anything network-shaped.
|
||||
type AddHandler func(ctx context.Context, req flockcni.Request) (*current.Result, error)
|
||||
|
||||
// DelHandler is the per-pod DEL logic. CNI requires idempotency.
|
||||
type DelHandler func(ctx context.Context, req flockcni.Request) error
|
||||
|
||||
// CheckHandler verifies that the live netns matches the persisted state.
|
||||
type CheckHandler func(ctx context.Context, req flockcni.Request) error
|
||||
|
||||
// rpcServer dispatches one RPC per accepted connection. Concurrent
|
||||
// connections are dispatched in their own goroutines; each handler is
|
||||
// called serially per-pod (the agent's IPAM/Store mutex enforces this
|
||||
// further down the stack).
|
||||
type rpcServer struct {
|
||||
logger *slog.Logger
|
||||
|
||||
mu sync.RWMutex
|
||||
add AddHandler
|
||||
del DelHandler
|
||||
check CheckHandler
|
||||
}
|
||||
|
||||
func newRPCServer(logger *slog.Logger) *rpcServer {
|
||||
r := &rpcServer{logger: logger}
|
||||
r.add = notImplementedAdd
|
||||
r.del = notImplementedDel
|
||||
r.check = notImplementedCheck
|
||||
return r
|
||||
}
|
||||
|
||||
// SetHandlers atomically replaces the ADD/DEL/CHECK handlers. Used at
|
||||
// startup once IPAM, Store, and netlink are wired together.
|
||||
func (r *rpcServer) SetHandlers(add AddHandler, del DelHandler, check CheckHandler) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
if add != nil {
|
||||
r.add = add
|
||||
}
|
||||
if del != nil {
|
||||
r.del = del
|
||||
}
|
||||
if check != nil {
|
||||
r.check = check
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rpcServer) handlers() (AddHandler, DelHandler, CheckHandler) {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
return r.add, r.del, r.check
|
||||
}
|
||||
|
||||
// serve takes ownership of l, accepting connections until ctx is cancelled
|
||||
// or the listener errors.
|
||||
func (r *rpcServer) serve(ctx context.Context, l net.Listener) {
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
_ = l.Close()
|
||||
}()
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
go r.handleConn(ctx, conn)
|
||||
}
|
||||
}
|
||||
|
||||
// handleConn reads exactly one Request, dispatches, and writes exactly one
|
||||
// Response. Connection-per-RPC matches the kubelet → plugin model where
|
||||
// each CNI invocation is a fresh process.
|
||||
func (r *rpcServer) handleConn(ctx context.Context, conn net.Conn) {
|
||||
defer conn.Close()
|
||||
req, err := flockcni.DecodeRequest(conn)
|
||||
if err != nil {
|
||||
r.logger.Warn("rpc decode", "err", err)
|
||||
_ = flockcni.EncodeResponse(conn, flockcni.Response{Error: fmt.Sprintf("decode: %v", err)})
|
||||
return
|
||||
}
|
||||
|
||||
addH, delH, checkH := r.handlers()
|
||||
|
||||
var resp flockcni.Response
|
||||
switch req.Op {
|
||||
case flockcni.OpAdd:
|
||||
result, err := addH(ctx, req)
|
||||
if err != nil {
|
||||
resp.Error = err.Error()
|
||||
} else {
|
||||
resp.Result = result
|
||||
}
|
||||
case flockcni.OpDel:
|
||||
if err := delH(ctx, req); err != nil {
|
||||
resp.Error = err.Error()
|
||||
}
|
||||
case flockcni.OpCheck:
|
||||
if err := checkH(ctx, req); err != nil {
|
||||
resp.Error = err.Error()
|
||||
}
|
||||
default:
|
||||
resp.Error = fmt.Sprintf("unknown op %q", req.Op)
|
||||
}
|
||||
|
||||
r.logger.Info("rpc",
|
||||
"op", req.Op,
|
||||
"container", req.ContainerID,
|
||||
"netns", req.Netns,
|
||||
"err", resp.Error)
|
||||
|
||||
if err := flockcni.EncodeResponse(conn, resp); err != nil {
|
||||
r.logger.Warn("rpc encode", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
func notImplementedAdd(_ context.Context, _ flockcni.Request) (*current.Result, error) {
|
||||
return nil, fmt.Errorf("flock M2 not yet wired: ADD path returns no result")
|
||||
}
|
||||
// notImplementedDel is the default DEL handler. Despite the name it reports
// success: DEL has to be idempotent, so with nothing wired up the stub
// pretends the teardown succeeded rather than returning an error.
func notImplementedDel(context.Context, flockcni.Request) error {
	return nil
}
|
||||
// notImplementedCheck is the default CHECK handler. It ignores its arguments
// and always reports success.
func notImplementedCheck(context.Context, flockcni.Request) error {
	return nil
}
|
||||
+7
-12
@@ -18,12 +18,14 @@ import (
|
||||
const SocketPath = "/run/flock/flock.sock"
|
||||
|
||||
// Server is the agent's runtime container: state store, kubernetes informers,
|
||||
// netlink, BIRD, nftables. M1.5 wires the state store, a placeholder unix
|
||||
// listener, and a NodeConfig informer.
|
||||
// netlink, BIRD, nftables. Current state: state store, NodeConfig informer,
|
||||
// RPC dispatcher with stub ADD/DEL/CHECK handlers (will be replaced when
|
||||
// netlink + IPAM wire-up lands).
|
||||
type Server struct {
|
||||
Node string
|
||||
Store *Store
|
||||
NodeConfig *NodeConfigCache
|
||||
RPC *rpcServer
|
||||
Logger *slog.Logger
|
||||
socket string
|
||||
restCfg *rest.Config
|
||||
@@ -69,6 +71,7 @@ func NewServer(cfg Config) (*Server, error) {
|
||||
Node: cfg.Node,
|
||||
Store: store,
|
||||
NodeConfig: &NodeConfigCache{},
|
||||
RPC: newRPCServer(cfg.Logger),
|
||||
Logger: cfg.Logger,
|
||||
socket: cfg.Socket,
|
||||
restCfg: restCfg,
|
||||
@@ -102,16 +105,8 @@ func (s *Server) Run(ctx context.Context) error {
|
||||
"allocations", len(s.Store.Snapshot()),
|
||||
)
|
||||
|
||||
// Accept loop: closes every connection immediately (M2 will dispatch).
|
||||
go func() {
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
_ = conn.Close()
|
||||
}
|
||||
}()
|
||||
// RPC dispatcher takes ownership of the listener.
|
||||
go s.RPC.serve(ctx, l)
|
||||
|
||||
// NodeConfig informer. Any error from the informer terminates Run.
|
||||
errCh := make(chan error, 1)
|
||||
|
||||
Reference in New Issue
Block a user