diff --git a/pkg/agent/rpc.go b/pkg/agent/rpc.go
new file mode 100644
index 0000000..09b580c
--- /dev/null
+++ b/pkg/agent/rpc.go
@@ -0,0 +1,159 @@
+package agent
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"net"
+	"sync"
+
+	flockcni "code.fritzlab.net/fritzlab/flock/pkg/cni"
+	current "github.com/containernetworking/cni/pkg/types/100"
+)
+
+// AddHandler is the per-pod ADD logic — IPAM allocate, netns program,
+// state commit. M2 wires this to a real implementation; M1.5 returns a
+// "not implemented" error so the plugin path is exercised end-to-end
+// without doing anything network-shaped.
+type AddHandler func(ctx context.Context, req flockcni.Request) (*current.Result, error)
+
+// DelHandler is the per-pod DEL logic. CNI requires idempotency.
+type DelHandler func(ctx context.Context, req flockcni.Request) error
+
+// CheckHandler verifies that the live netns matches the persisted state.
+type CheckHandler func(ctx context.Context, req flockcni.Request) error
+
+// rpcServer dispatches one RPC per accepted connection. Concurrent
+// connections are dispatched in their own goroutines; each handler is
+// called serially per-pod (the agent's IPAM/Store mutex enforces this
+// further down the stack).
+type rpcServer struct {
+	logger *slog.Logger
+
+	mu    sync.RWMutex // guards add, del and check
+	add   AddHandler
+	del   DelHandler
+	check CheckHandler
+}
+
+// newRPCServer returns an rpcServer wired to the stub handlers; call
+// SetHandlers to install the real ones.
+func newRPCServer(logger *slog.Logger) *rpcServer {
+	r := &rpcServer{logger: logger}
+	r.add = notImplementedAdd
+	r.del = notImplementedDel
+	r.check = notImplementedCheck
+	return r
+}
+
+// SetHandlers replaces the ADD/DEL/CHECK handlers under the write lock, so
+// a concurrent RPC sees either the old set or the new one. A nil argument
+// leaves the corresponding handler unchanged. Used at startup once IPAM,
+// Store, and netlink are wired together.
+func (r *rpcServer) SetHandlers(add AddHandler, del DelHandler, check CheckHandler) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	if add != nil {
+		r.add = add
+	}
+	if del != nil {
+		r.del = del
+	}
+	if check != nil {
+		r.check = check
+	}
+}
+
+// handlers returns a consistent snapshot of the three handlers.
+func (r *rpcServer) handlers() (AddHandler, DelHandler, CheckHandler) {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+	return r.add, r.del, r.check
+}
+
+// serve takes ownership of l: it accepts connections until ctx is cancelled
+// or Accept fails, and closes l on the way out in either case. An Accept
+// failure that is not caused by shutdown is logged, because after it the
+// agent no longer answers CNI calls.
+func (r *rpcServer) serve(ctx context.Context, l net.Listener) {
+	stopped := make(chan struct{})
+	defer close(stopped)
+	go func() {
+		// Close l on shutdown to unblock Accept, and also when serve returns
+		// on its own: an open listener nobody accepts from would let clients
+		// connect and then wait for a reply that never comes.
+		select {
+		case <-ctx.Done():
+		case <-stopped:
+		}
+		_ = l.Close()
+	}()
+	for {
+		conn, err := l.Accept()
+		if err != nil {
+			if ctx.Err() == nil {
+				r.logger.Error("rpc accept", "err", err)
+			}
+			return
+		}
+		go r.handleConn(ctx, conn)
+	}
+}
+
+// handleConn reads exactly one Request, dispatches, and writes exactly one
+// Response. Connection-per-RPC matches the kubelet → plugin model where
+// each CNI invocation is a fresh process.
+//
+// NOTE(review): no deadline is set on conn, so a peer that connects and
+// never sends a request holds this goroutine until it disconnects.
+func (r *rpcServer) handleConn(ctx context.Context, conn net.Conn) {
+	defer conn.Close()
+	req, err := flockcni.DecodeRequest(conn)
+	if err != nil {
+		r.logger.Warn("rpc decode", "err", err)
+		// Best effort: the peer may already be gone, so the write error is ignored.
+		_ = flockcni.EncodeResponse(conn, flockcni.Response{Error: fmt.Sprintf("decode: %v", err)})
+		return
+	}
+
+	addH, delH, checkH := r.handlers()
+
+	var resp flockcni.Response
+	switch req.Op {
+	case flockcni.OpAdd:
+		result, err := addH(ctx, req)
+		if err != nil {
+			resp.Error = err.Error()
+		} else {
+			resp.Result = result
+		}
+	case flockcni.OpDel:
+		if err := delH(ctx, req); err != nil {
+			resp.Error = err.Error()
+		}
+	case flockcni.OpCheck:
+		if err := checkH(ctx, req); err != nil {
+			resp.Error = err.Error()
+		}
+	default:
+		resp.Error = fmt.Sprintf("unknown op %q", req.Op)
+	}
+
+	r.logger.Info("rpc",
+		"op", req.Op,
+		"container", req.ContainerID,
+		"netns", req.Netns,
+		"err", resp.Error)
+
+	if err := flockcni.EncodeResponse(conn, resp); err != nil {
+		r.logger.Warn("rpc encode", "err", err)
+	}
+}
+
+// notImplementedAdd is the stub ADD handler installed by newRPCServer; it
+// always returns an error and no result.
+func notImplementedAdd(_ context.Context, _ flockcni.Request) (*current.Result, error) {
+	return nil, fmt.Errorf("flock M2 not yet wired: ADD path returns no result")
+}
+func notImplementedDel(_ context.Context, _ flockcni.Request) error   { return nil } // idempotent: pretend success
+func notImplementedCheck(_ context.Context, _ flockcni.Request) error { return nil }
diff --git a/pkg/agent/server.go b/pkg/agent/server.go
index 00a64ee..5850f77 100644
--- a/pkg/agent/server.go
+++ b/pkg/agent/server.go
@@ -18,12 +18,14 @@ import (
 const SocketPath = "/run/flock/flock.sock"
 
 // Server is the agent's runtime container: state store, kubernetes informers,
-// netlink, BIRD, nftables. M1.5 wires the state store, a placeholder unix
-// listener, and a NodeConfig informer.
+// netlink, BIRD, nftables. Current state: state store, NodeConfig informer,
+// RPC dispatcher with stub ADD/DEL/CHECK handlers (will be replaced when
+// netlink + IPAM wire-up lands).
 type Server struct {
 	Node       string
 	Store      *Store
 	NodeConfig *NodeConfigCache
+	RPC        *rpcServer
 	Logger     *slog.Logger
 	socket     string
 	restCfg    *rest.Config
@@ -69,6 +71,7 @@ func NewServer(cfg Config) (*Server, error) {
 		Node:       cfg.Node,
 		Store:      store,
 		NodeConfig: &NodeConfigCache{},
+		RPC:        newRPCServer(cfg.Logger),
 		Logger:     cfg.Logger,
 		socket:     cfg.Socket,
 		restCfg:    restCfg,
@@ -102,16 +105,8 @@ func (s *Server) Run(ctx context.Context) error {
 		"allocations", len(s.Store.Snapshot()),
 	)
 
-	// Accept loop: closes every connection immediately (M2 will dispatch).
-	go func() {
-		for {
-			conn, err := l.Accept()
-			if err != nil {
-				return
-			}
-			_ = conn.Close()
-		}
-	}()
+	// RPC dispatcher takes ownership of the listener.
+	go s.RPC.serve(ctx, l)
 
 	// NodeConfig informer. Any error from the informer terminates Run.
errCh := make(chan error, 1) diff --git a/pkg/cni/plugin.go b/pkg/cni/plugin.go index 68c73bb..b3da648 100644 --- a/pkg/cni/plugin.go +++ b/pkg/cni/plugin.go @@ -4,8 +4,6 @@ package cni import ( - "errors" - "github.com/containernetworking/cni/pkg/skel" "github.com/containernetworking/cni/pkg/types" current "github.com/containernetworking/cni/pkg/types/100" @@ -14,24 +12,38 @@ import ( // SocketPath is the unix socket exposed by flock-agent. const SocketPath = "/run/flock/flock.sock" -var errNotImplemented = errors.New("flock: ADD/DEL/CHECK not implemented in M1 scaffold") - // CmdAdd is invoked by kubelet when a pod sandbox is created. func CmdAdd(args *skel.CmdArgs) error { - // M2: dial SocketPath, send ADD RPC, return CNI result. - _ = args - _ = current.ImplementedSpecVersion - return types.NewError(types.ErrInternal, "flock-add", errNotImplemented.Error()) + resp, err := call(fromArgs(OpAdd, args)) + if err != nil { + return types.NewError(types.ErrInternal, "flock-add", err.Error()) + } + if cerr := toCNIError("add", resp); cerr != nil { + return cerr + } + if resp.Result == nil { + return types.NewError(types.ErrInternal, "flock-add", "agent returned no result") + } + return types.PrintResult(resp.Result, current.ImplementedSpecVersion) } -// CmdDel is invoked by kubelet when a pod sandbox is torn down. +// CmdDel is invoked by kubelet when a pod sandbox is torn down. CNI spec: +// DEL must be idempotent. The agent treats a missing allocation as success. func CmdDel(args *skel.CmdArgs) error { - _ = args - return types.NewError(types.ErrInternal, "flock-del", errNotImplemented.Error()) + resp, err := call(fromArgs(OpDel, args)) + if err != nil { + // On dial failure during DEL, fail loudly — kubelet retries DEL, + // and the next attempt may succeed once the agent is reachable. + return types.NewError(types.ErrInternal, "flock-del", err.Error()) + } + return toCNIError("del", resp) } // CmdCheck verifies that the live netns matches the persisted allocation. 
func CmdCheck(args *skel.CmdArgs) error { - _ = args - return types.NewError(types.ErrInternal, "flock-check", errNotImplemented.Error()) + resp, err := call(fromArgs(OpCheck, args)) + if err != nil { + return types.NewError(types.ErrInternal, "flock-check", err.Error()) + } + return toCNIError("check", resp) } diff --git a/pkg/cni/rpc.go b/pkg/cni/rpc.go new file mode 100644 index 0000000..b35e60c --- /dev/null +++ b/pkg/cni/rpc.go @@ -0,0 +1,87 @@ +package cni + +import ( + "encoding/json" + "fmt" + "io" + + current "github.com/containernetworking/cni/pkg/types/100" +) + +// Op is the CNI verb the plugin asks the agent to perform. +type Op string + +const ( + OpAdd Op = "ADD" + OpDel Op = "DEL" + OpCheck Op = "CHECK" +) + +// Request is sent over the unix socket from the CNI plugin to flock-agent. +// Field names mirror the kubelet → CNI invocation env vars; the agent uses +// these to look up Pod metadata via the informer cache. +type Request struct { + Op Op `json:"op"` + ContainerID string `json:"container_id"` + Netns string `json:"netns"` // /proc//ns/net + IfName string `json:"ifname"` // typically "eth0" + Args string `json:"args"` // raw CNI_ARGS env (K=V;K=V;...) + Path string `json:"path"` // CNI_PATH (plugin search path) + StdinData []byte `json:"stdin_data"` // raw network configuration JSON +} + +// Response carries either a typed CNI Result or a single error string. +// We use a string error (not a Go error) because errors traverse a JSON +// boundary; the client converts back to a CNI types.Error. +type Response struct { + Result *current.Result `json:"result,omitempty"` + Error string `json:"error,omitempty"` +} + +// EncodeRequest writes req to w as a single JSON object followed by '\n'. +// The newline framing makes the wire-protocol simple to read incrementally +// without a length prefix or full-stream-buffering. 
+func EncodeRequest(w io.Writer, req Request) error {
+	b, err := json.Marshal(req)
+	if err != nil {
+		return fmt.Errorf("marshal request: %w", err)
+	}
+	b = append(b, '\n')
+	if _, err := w.Write(b); err != nil {
+		return fmt.Errorf("write request: %w", err)
+	}
+	return nil
+}
+
+// DecodeRequest decodes one JSON request from r. The json.Decoder may buffer bytes beyond that value, so do not read from r afterwards.
+func DecodeRequest(r io.Reader) (Request, error) {
+	var req Request
+	dec := json.NewDecoder(r)
+	if err := dec.Decode(&req); err != nil {
+		return Request{}, fmt.Errorf("decode request: %w", err)
+	}
+	return req, nil
+}
+
+// EncodeResponse writes resp to w as a single JSON object followed by '\n'.
+func EncodeResponse(w io.Writer, resp Response) error {
+	b, err := json.Marshal(resp)
+	if err != nil {
+		return fmt.Errorf("marshal response: %w", err)
+	}
+	b = append(b, '\n')
+	if _, err := w.Write(b); err != nil {
+		return fmt.Errorf("write response: %w", err)
+	}
+	return nil
+}
+
+// DecodeResponse decodes one JSON response from r. The json.Decoder may buffer bytes beyond that value, so do not read from r afterwards.
+func DecodeResponse(r io.Reader) (Response, error) {
+	var resp Response
+	dec := json.NewDecoder(r)
+	if err := dec.Decode(&resp); err != nil {
+		return Response{}, fmt.Errorf("decode response: %w", err)
+	}
+	return resp, nil
+}
diff --git a/pkg/cni/rpc_client.go b/pkg/cni/rpc_client.go
index 0857ab9..298a6eb 100644
--- a/pkg/cni/rpc_client.go
+++ b/pkg/cni/rpc_client.go
@@ -1,5 +1,59 @@
 package cni
 
-// rpc_client.go will hold the JSON-over-unix-socket client used by the CNI
-// plugin to call into flock-agent. Stub for M1; implementation lands in M2
-// alongside the agent's RPC server.
+import (
+	"fmt"
+	"net"
+	"time"
+
+	"github.com/containernetworking/cni/pkg/skel"
+	"github.com/containernetworking/cni/pkg/types"
+)
+
+// dialTimeout bounds how long the plugin waits to connect to the agent
+// socket. kubelet has its own outer timeout for the whole CNI invocation,
+// but a tighter bound here gives a clearer error if the DaemonSet pod is
+// gone or starting up.
+const dialTimeout = 5 * time.Second
+
+// rpcSocket is overridable for tests.
+var rpcSocket = SocketPath
+
+// call issues one Request and returns the Response. It dials the agent
+// unix socket (bounded by dialTimeout), then encodes the request and decodes
+// a single response with no deadline. The connection is closed on return.
+func call(req Request) (*Response, error) {
+	conn, err := net.DialTimeout("unix", rpcSocket, dialTimeout)
+	if err != nil {
+		return nil, fmt.Errorf("dial flock-agent at %s: %w", rpcSocket, err)
+	}
+	defer conn.Close()
+	if err := EncodeRequest(conn, req); err != nil {
+		return nil, err
+	}
+	resp, err := DecodeResponse(conn)
+	if err != nil {
+		return nil, err
+	}
+	return &resp, nil
+}
+
+// fromArgs builds a Request from a CNI skel.CmdArgs invocation.
+func fromArgs(op Op, args *skel.CmdArgs) Request {
+	return Request{
+		Op:          op,
+		ContainerID: args.ContainerID,
+		Netns:       args.Netns,
+		IfName:      args.IfName,
+		Args:        args.Args,
+		Path:        args.Path,
+		StdinData:   args.StdinData,
+	}
+}
+
+// toCNIError converts an RPC Response.Error into a CNI types.Error, or nil.
+func toCNIError(stage string, resp *Response) error {
+	if resp.Error == "" {
+		return nil
+	}
+	return types.NewError(types.ErrInternal, "flock-"+stage, resp.Error)
+}
diff --git a/pkg/cni/rpc_test.go b/pkg/cni/rpc_test.go
new file mode 100644
index 0000000..fd9a85c
--- /dev/null
+++ b/pkg/cni/rpc_test.go
@@ -0,0 +1,126 @@
+package cni
+
+import (
+	"bytes"
+	"net"
+	"path/filepath"
+	"strings"
+	"testing"
+	"time"
+
+	current "github.com/containernetworking/cni/pkg/types/100"
+)
+
+func TestEncodeDecode_RequestRoundtrip(t *testing.T) {
+	req := Request{
+		Op:          OpAdd,
+		ContainerID: "abc",
+		Netns:       "/proc/1234/ns/net",
+		IfName:      "eth0",
+		Args:        "K8S_POD_NAMESPACE=mail;K8S_POD_NAME=stalwart-0",
+		Path:        "/opt/cni/bin",
+		StdinData:   []byte(`{"cniVersion":"1.0.0","name":"flock"}`),
+	}
+	var buf bytes.Buffer
+	if err := EncodeRequest(&buf, req); err != nil {
+		t.Fatal(err)
+	}
+	got, err := DecodeRequest(&buf)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if got.Op != req.Op || got.ContainerID != req.ContainerID || string(got.StdinData) != string(req.StdinData) {
+		t.Fatalf("roundtrip mismatch:\n got=%+v\nwant=%+v", got, req)
+	}
+}
+
+func TestEncodeDecode_ResponseRoundtrip(t *testing.T) {
+	resp := Response{
+		Result: &current.Result{CNIVersion: current.ImplementedSpecVersion},
+	}
+	var buf bytes.Buffer
+	if err := EncodeResponse(&buf, resp); err != nil {
+		t.Fatal(err)
+	}
+	got, err := DecodeResponse(&buf)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if got.Result == nil || got.Result.CNIVersion != current.ImplementedSpecVersion {
+		t.Fatalf("response roundtrip lost CNIVersion: %+v", got)
+	}
+}
+
+// TestRPC_ClientToFakeServer wires the real client to a tiny in-process
+// server over a unix socket, exercising end-to-end framing.
+func TestRPC_ClientToFakeServer(t *testing.T) {
+	dir := t.TempDir()
+	sockPath := filepath.Join(dir, "flock.sock")
+
+	l, err := net.Listen("unix", sockPath)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer l.Close()
+
+	// Server: read one Request, write one Response.
+	done := make(chan error, 1)
+	go func() {
+		conn, err := l.Accept()
+		if err != nil {
+			done <- err
+			return
+		}
+		defer conn.Close()
+		req, err := DecodeRequest(conn)
+		if err != nil {
+			done <- err
+			return
+		}
+		var resp Response
+		switch req.Op {
+		case OpAdd:
+			resp.Result = &current.Result{CNIVersion: current.ImplementedSpecVersion}
+		case OpDel, OpCheck:
+			// no-op success
+		default:
+			resp.Error = "unknown op"
+		}
+		done <- EncodeResponse(conn, resp)
+	}()
+
+	// Point the client at our test socket.
+	prev := rpcSocket
+	rpcSocket = sockPath
+	defer func() { rpcSocket = prev }()
+
+	resp, err := call(Request{Op: OpAdd, ContainerID: "test"})
+	if err != nil {
+		t.Fatalf("call: %v", err)
+	}
+	if resp.Result == nil {
+		t.Fatalf("expected result, got %+v", resp)
+	}
+
+	select {
+	case err := <-done:
+		if err != nil {
+			t.Fatalf("server: %v", err)
+		}
+	case <-time.After(2 * time.Second):
+		t.Fatal("server did not finish")
+	}
+}
+
+func TestRPC_ServerErrorPropagatesToCNIError(t *testing.T) {
+	const msg = "no NodeConfig for host001"
+	err := toCNIError("add", &Response{Error: msg})
+	if err == nil {
+		t.Fatal("expected CNI error")
+	}
+	// types.Error wraps the message: the text must carry the agent's
+	// message without being identical to it.
+	if got := err.Error(); got == msg || !strings.Contains(got, msg) {
+		t.Fatalf("unexpected error format: %q", got)
+	}
+}