// Source file: flock/pkg/agent/rpc.go

package agent
import (
"context"
"fmt"
"log/slog"
"net"
"sync"
flockcni "code.fritzlab.net/fritzlab/flock/pkg/cni"
current "github.com/containernetworking/cni/pkg/types/100"
)
// Handler signatures for the three CNI operations dispatched by rpcServer.
type (
	// AddHandler implements the per-pod ADD path: IPAM allocation, netns
	// programming and state commit. M2 plugs in the real implementation;
	// in M1.5 the default returns a "not implemented" error so the plugin
	// path is exercised end-to-end without touching the network.
	AddHandler func(ctx context.Context, req flockcni.Request) (*current.Result, error)

	// DelHandler implements the per-pod DEL path. CNI requires DEL to be
	// idempotent.
	DelHandler func(ctx context.Context, req flockcni.Request) error

	// CheckHandler verifies that the live netns matches the persisted
	// state.
	CheckHandler func(ctx context.Context, req flockcni.Request) error
)
// rpcServer dispatches one RPC per accepted connection. Each accepted
// connection is handled in its own goroutine (see serve), so handlers may
// run concurrently with one another.
//
// NOTE(review): the original comment states that handlers are serialized
// per-pod by the agent's IPAM/Store mutex further down the stack. Nothing
// in this file enforces that — confirm against those packages.
type rpcServer struct {
// logger receives decode/encode warnings and one Info line per dispatched RPC.
logger *slog.Logger
// mu guards add, del and check: SetHandlers replaces them under the write
// lock and handlers() snapshots them under the read lock.
mu sync.RWMutex
add AddHandler
del DelHandler
check CheckHandler
}
// newRPCServer builds an rpcServer that logs to logger and starts out with
// the placeholder handlers (notImplementedAdd/Del/Check) installed, so the
// server is usable before the real handlers are supplied via SetHandlers.
func newRPCServer(logger *slog.Logger) *rpcServer {
	return &rpcServer{
		logger: logger,
		add:    notImplementedAdd,
		del:    notImplementedDel,
		check:  notImplementedCheck,
	}
}
// SetHandlers swaps in new ADD/DEL/CHECK handlers under the write lock, so
// concurrent RPCs observe either the old set or the new set. A nil argument
// leaves the corresponding handler as it was. Used at startup once IPAM,
// Store, and netlink are wired together.
func (r *rpcServer) SetHandlers(add AddHandler, del DelHandler, check CheckHandler) {
	r.mu.Lock()

	// Only non-nil handlers overwrite the current ones; the assignments
	// below cannot panic, so an explicit Unlock is sufficient.
	if add != nil {
		r.add = add
	}
	if del != nil {
		r.del = del
	}
	if check != nil {
		r.check = check
	}

	r.mu.Unlock()
}
// handlers returns a consistent snapshot of the current ADD, DEL and CHECK
// handlers, taken under the read lock. The returned values can be invoked
// without holding the lock.
func (r *rpcServer) handlers() (AddHandler, DelHandler, CheckHandler) {
	r.mu.RLock()
	addH, delH, checkH := r.add, r.del, r.check
	r.mu.RUnlock()

	return addH, delH, checkH
}
// serve takes ownership of l: it accepts connections until ctx is cancelled
// or Accept fails, and l is closed on every exit path. Each accepted
// connection is handed to handleConn in its own goroutine; serve does not
// wait for those goroutines before returning.
//
// Accept errors caused by cancellation (the listener being closed to unblock
// Accept) are expected and not logged; any other Accept error is logged
// before serve returns.
func (r *rpcServer) serve(ctx context.Context, l net.Listener) {
	// stopped is closed when serve returns so the watcher goroutine below
	// terminates even if ctx is never cancelled.
	stopped := make(chan struct{})
	defer func() {
		close(stopped)
		// Best-effort: the watcher may already have closed l, in which
		// case this second Close just reports an error we don't care about.
		_ = l.Close()
	}()

	go func() {
		select {
		case <-ctx.Done():
			// Closing the listener is what unblocks the pending Accept.
			_ = l.Close()
		case <-stopped:
		}
	}()

	for {
		conn, err := l.Accept()
		if err != nil {
			if ctx.Err() == nil {
				// Not a shutdown: surface the failure instead of exiting silently.
				r.logger.Error("rpc accept", "err", err)
			}
			return
		}
		go r.handleConn(ctx, conn)
	}
}
// handleConn serves a single RPC on conn: it decodes exactly one Request,
// runs the matching handler, writes exactly one Response, and closes the
// connection. Connection-per-RPC matches the kubelet → plugin model where
// each CNI invocation is a fresh process.
//
// A decode failure is logged and reported back to the peer as a "decode: …"
// error. Handler errors are returned to the peer in Response.Error; an
// unrecognised op yields an "unknown op" error.
func (r *rpcServer) handleConn(ctx context.Context, conn net.Conn) {
	defer conn.Close()

	req, decodeErr := flockcni.DecodeRequest(conn)
	if decodeErr != nil {
		r.logger.Warn("rpc decode", "err", decodeErr)
		failure := flockcni.Response{Error: fmt.Sprintf("decode: %v", decodeErr)}
		// Best-effort reply: the peer may already be gone.
		_ = flockcni.EncodeResponse(conn, failure)
		return
	}

	// Snapshot the handlers once so a concurrent SetHandlers cannot change
	// them mid-dispatch.
	onAdd, onDel, onCheck := r.handlers()

	var reply flockcni.Response
	switch req.Op {
	case flockcni.OpAdd:
		// Only a successful ADD carries a result.
		if result, addErr := onAdd(ctx, req); addErr == nil {
			reply.Result = result
		} else {
			reply.Error = addErr.Error()
		}
	case flockcni.OpDel:
		delErr := onDel(ctx, req)
		if delErr != nil {
			reply.Error = delErr.Error()
		}
	case flockcni.OpCheck:
		checkErr := onCheck(ctx, req)
		if checkErr != nil {
			reply.Error = checkErr.Error()
		}
	default:
		reply.Error = fmt.Sprintf("unknown op %q", req.Op)
	}

	r.logger.Info("rpc",
		"op", req.Op,
		"container", req.ContainerID,
		"netns", req.Netns,
		"err", reply.Error)

	if encodeErr := flockcni.EncodeResponse(conn, reply); encodeErr != nil {
		r.logger.Warn("rpc encode", "err", encodeErr)
	}
}
// notImplementedAdd is the placeholder ADD handler installed by
// newRPCServer. It ignores its arguments and always fails with a fixed
// error, returning no result.
func notImplementedAdd(_ context.Context, _ flockcni.Request) (*current.Result, error) {
	err := fmt.Errorf("flock M2 not yet wired: ADD path returns no result")
	return nil, err
}
// notImplementedDel is the placeholder DEL handler installed by
// newRPCServer. It ignores its arguments and reports success, so DEL stays
// idempotent even though nothing is torn down.
func notImplementedDel(_ context.Context, _ flockcni.Request) error {
	return nil
}
// notImplementedCheck is the placeholder CHECK handler installed by
// newRPCServer. It ignores its arguments and always reports success.
func notImplementedCheck(_ context.Context, _ flockcni.Request) error {
	return nil
}