Files
Donavan Fritz 31fcae2a97
Build flock Image / build (push) Has been cancelled
M2 plumbing: CNI ↔ agent JSON RPC over unix socket
Locks the wire format between /opt/cni/bin/flock and flock-agent. ADD
returns a CNI Result, DEL returns success/error, CHECK returns
success/error. Connection-per-RPC, newline-delimited JSON.

- pkg/cni/rpc.go: shared Op + Request + Response + framed encode/decode.
- pkg/cni/rpc_client.go: net.Dial + EncodeRequest + DecodeResponse;
  rpcSocket overridable for tests.
- pkg/cni/plugin.go: real implementations of CmdAdd/Del/Check that call
  through, mapping agent errors to types.Error.
- pkg/agent/rpc.go: rpcServer with swappable AddHandler/DelHandler/
  CheckHandler (defaults: not-implemented for ADD; idempotent-no-op for
  DEL/CHECK so kubelet teardown of a never-ADDed pod doesn't fail).
- pkg/agent/server.go: replaces the M1 accept-and-close placeholder
  with rpcServer.serve(ctx, listener); listener closes on ctx cancel.

Tests cover: Request/Response JSON roundtrip, end-to-end client →
unix-socket → fake server, agent error → CNI types.Error mapping.

ADD remains "not implemented" until netlink + IPAM wire-up — the agent
returns an error, so kubelet would fail pod sandbox creation IF a node
were configured to use this CNI. host001's CNI plane is still 100%
Calico, so this changes nothing observable on the cluster.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-04-24 22:21:33 -05:00

136 lines
3.7 KiB
Go

package agent
import (
"context"
"fmt"
"log/slog"
"net"
"sync"
flockcni "code.fritzlab.net/fritzlab/flock/pkg/cni"
current "github.com/containernetworking/cni/pkg/types/100"
)
// The handler types below are the pluggable per-pod operations the RPC
// server dispatches to. newRPCServer installs placeholder implementations;
// SetHandlers swaps in real ones.
type (
	// AddHandler performs the per-pod ADD work (IPAM allocation, netns
	// programming, state commit) and returns the CNI result handed back to
	// the plugin. Until a real implementation is installed, the server uses a
	// placeholder that always returns a "not implemented" error, which lets
	// the plugin → agent path be exercised end-to-end without touching the
	// network.
	AddHandler func(ctx context.Context, req flockcni.Request) (*current.Result, error)

	// DelHandler performs the per-pod DEL work. The CNI spec requires DEL to
	// be idempotent, so implementations must tolerate repeated calls.
	DelHandler func(ctx context.Context, req flockcni.Request) error

	// CheckHandler verifies that the live netns still matches the persisted
	// state for the container.
	CheckHandler func(ctx context.Context, req flockcni.Request) error
)
// rpcServer answers CNI plugin RPCs, exactly one request per accepted
// connection. Every connection is handled on its own goroutine, so handlers
// may run concurrently; this type adds no per-pod serialization of its own.
//
// NOTE(review): per-pod ordering is expected to come from the agent's
// IPAM/Store locking further down the stack — confirm before relying on it.
type rpcServer struct {
	// logger is used for per-RPC and error logging.
	logger *slog.Logger

	// mu guards the three handler fields that follow. Readers should go
	// through handlers() to obtain a consistent snapshot of all three.
	mu    sync.RWMutex
	add   AddHandler
	del   DelHandler
	check CheckHandler
}
// newRPCServer returns an rpcServer with placeholder handlers installed.
// The ADD placeholder fails with a "not implemented" error. The DEL and
// CHECK placeholders succeed as no-ops, so teardown of a pod that was never
// ADDed does not fail. Call SetHandlers to install real implementations.
//
// A nil logger falls back to slog.Default(). Every handled connection logs
// through r.logger, so a nil logger would otherwise panic on the first RPC
// inside a connection goroutine.
func newRPCServer(logger *slog.Logger) *rpcServer {
	if logger == nil {
		logger = slog.Default()
	}
	return &rpcServer{
		logger: logger,
		add:    notImplementedAdd,
		del:    notImplementedDel,
		check:  notImplementedCheck,
	}
}
// SetHandlers installs new ADD/DEL/CHECK handlers while holding the write
// lock. A concurrent handlers() call therefore observes either the complete
// old set or the complete new set. A nil argument leaves that handler as it
// is. Intended for startup, once IPAM, Store, and netlink are wired together.
func (r *rpcServer) SetHandlers(add AddHandler, del DelHandler, check CheckHandler) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Start from what is installed now and override only the non-nil inputs.
	nextAdd, nextDel, nextCheck := r.add, r.del, r.check
	if add != nil {
		nextAdd = add
	}
	if del != nil {
		nextDel = del
	}
	if check != nil {
		nextCheck = check
	}
	r.add, r.del, r.check = nextAdd, nextDel, nextCheck
}
// handlers returns a snapshot of the currently installed ADD, DEL, and CHECK
// handlers, read together under the read lock.
func (r *rpcServer) handlers() (add AddHandler, del DelHandler, check CheckHandler) {
	r.mu.RLock()
	add, del, check = r.add, r.del, r.check
	r.mu.RUnlock()
	return add, del, check
}
// serve takes ownership of l and accepts connections until ctx is cancelled
// or Accept fails. Each accepted connection is handed to handleConn on its
// own goroutine. The listener is closed on every exit path.
//
// An Accept failure while ctx is still live is unexpected (serve owns l, so
// nobody else should close it). It is logged before serve returns, so the
// agent does not stop answering RPCs without a trace.
func (r *rpcServer) serve(ctx context.Context, l net.Listener) {
	// stop lets the closer goroutine exit when serve returns for a reason
	// other than ctx cancellation. Without it, that goroutine would block on
	// ctx.Done() for as long as ctx lives.
	stop := make(chan struct{})
	defer close(stop)
	// A second Close after the ctx path has already closed l returns an
	// error that is deliberately ignored.
	defer func() { _ = l.Close() }()

	go func() {
		select {
		case <-ctx.Done():
			// Closing the listener unblocks the pending Accept below.
			_ = l.Close()
		case <-stop:
		}
	}()

	for {
		conn, err := l.Accept()
		if err != nil {
			if ctx.Err() == nil {
				r.logger.Warn("rpc accept", "err", err)
			}
			return
		}
		go r.handleConn(ctx, conn)
	}
}
// handleConn serves exactly one RPC on conn. It decodes a single Request,
// dispatches it to the currently installed handler for its Op, writes a
// single Response, and closes the connection. Connection-per-RPC matches the
// kubelet → plugin model, where every CNI invocation is a fresh process.
//
// All failures are reported in-band through Response.Error:
//   - decode errors
//   - handler errors
//   - unknown ops
//   - an ADD handler that returns neither a result nor an error
//   - handler panics, which are recovered so that one bad RPC cannot take
//     down the whole agent
//
// NOTE(review): no read deadline is set on conn. A client that connects and
// never sends a request keeps this goroutine and its fd alive indefinitely;
// consider bounding DecodeRequest with conn.SetReadDeadline.
func (r *rpcServer) handleConn(ctx context.Context, conn net.Conn) {
	defer conn.Close()

	req, err := flockcni.DecodeRequest(conn)
	if err != nil {
		r.logger.Warn("rpc decode", "err", err)
		// Best effort: the peer may already be gone, and there is nothing
		// further to do if this write fails.
		_ = flockcni.EncodeResponse(conn, flockcni.Response{Error: fmt.Sprintf("decode: %v", err)})
		return
	}

	addH, delH, checkH := r.handlers()

	resp := func() (out flockcni.Response) {
		defer func() {
			if p := recover(); p != nil {
				r.logger.Error("rpc handler panic",
					"op", req.Op,
					"container", req.ContainerID,
					"panic", p)
				out = flockcni.Response{Error: fmt.Sprintf("%v handler panicked: %v", req.Op, p)}
			}
		}()

		switch req.Op {
		case flockcni.OpAdd:
			result, err := addH(ctx, req)
			switch {
			case err != nil:
				out.Error = err.Error()
			case result == nil:
				// CNI ADD must produce a result. Reporting success with no
				// result would leave the plugin with nothing to print.
				out.Error = "ADD handler returned neither a result nor an error"
			default:
				out.Result = result
			}
		case flockcni.OpDel:
			if err := delH(ctx, req); err != nil {
				out.Error = err.Error()
			}
		case flockcni.OpCheck:
			if err := checkH(ctx, req); err != nil {
				out.Error = err.Error()
			}
		default:
			out.Error = fmt.Sprintf("unknown op %q", req.Op)
		}
		return out
	}()

	r.logger.Info("rpc",
		"op", req.Op,
		"container", req.ContainerID,
		"netns", req.Netns,
		"err", resp.Error)
	if err := flockcni.EncodeResponse(conn, resp); err != nil {
		r.logger.Warn("rpc encode", "err", err)
	}
}
// notImplementedAdd is the placeholder ADD handler installed by
// newRPCServer. It always fails and never returns a result, so pod sandbox
// creation fails visibly instead of proceeding without networking.
func notImplementedAdd(context.Context, flockcni.Request) (*current.Result, error) {
	const msg = "flock M2 not yet wired: ADD path returns no result"
	return nil, fmt.Errorf(msg)
}
// notImplementedDel is the placeholder DEL handler. It reports success
// unconditionally. DEL must be idempotent, and failing here would break
// kubelet teardown of pods that were never ADDed.
func notImplementedDel(context.Context, flockcni.Request) error {
	return nil
}
// notImplementedCheck is the placeholder CHECK handler. It reports success
// unconditionally because it has no persisted state to compare against.
func notImplementedCheck(context.Context, flockcni.Request) error {
	return nil
}