136 lines
3.7 KiB
Go
136 lines
3.7 KiB
Go
|
|
package agent
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"fmt"
|
||
|
|
"log/slog"
|
||
|
|
"net"
|
||
|
|
"sync"
|
||
|
|
|
||
|
|
flockcni "code.fritzlab.net/fritzlab/flock/pkg/cni"
|
||
|
|
current "github.com/containernetworking/cni/pkg/types/100"
|
||
|
|
)
|
||
|
|
|
||
|
|
// AddHandler is the per-pod ADD logic — IPAM allocate, netns program,
|
||
|
|
// state commit. M2 wires this to a real implementation; M1.5 returns a
|
||
|
|
// "not implemented" error so the plugin path is exercised end-to-end
|
||
|
|
// without doing anything network-shaped.
|
||
|
|
type AddHandler func(ctx context.Context, req flockcni.Request) (*current.Result, error)
|
||
|
|
|
||
|
|
// DelHandler is the per-pod DEL logic. CNI requires idempotency.
|
||
|
|
type DelHandler func(ctx context.Context, req flockcni.Request) error
|
||
|
|
|
||
|
|
// CheckHandler verifies that the live netns matches the persisted state.
|
||
|
|
type CheckHandler func(ctx context.Context, req flockcni.Request) error
|
||
|
|
|
||
|
|
// rpcServer dispatches one RPC per accepted connection. Concurrent
|
||
|
|
// connections are dispatched in their own goroutines; each handler is
|
||
|
|
// called serially per-pod (the agent's IPAM/Store mutex enforces this
|
||
|
|
// further down the stack).
|
||
|
|
type rpcServer struct {
|
||
|
|
logger *slog.Logger
|
||
|
|
|
||
|
|
mu sync.RWMutex
|
||
|
|
add AddHandler
|
||
|
|
del DelHandler
|
||
|
|
check CheckHandler
|
||
|
|
}
|
||
|
|
|
||
|
|
func newRPCServer(logger *slog.Logger) *rpcServer {
|
||
|
|
r := &rpcServer{logger: logger}
|
||
|
|
r.add = notImplementedAdd
|
||
|
|
r.del = notImplementedDel
|
||
|
|
r.check = notImplementedCheck
|
||
|
|
return r
|
||
|
|
}
|
||
|
|
|
||
|
|
// SetHandlers atomically replaces the ADD/DEL/CHECK handlers. Used at
|
||
|
|
// startup once IPAM, Store, and netlink are wired together.
|
||
|
|
func (r *rpcServer) SetHandlers(add AddHandler, del DelHandler, check CheckHandler) {
|
||
|
|
r.mu.Lock()
|
||
|
|
defer r.mu.Unlock()
|
||
|
|
if add != nil {
|
||
|
|
r.add = add
|
||
|
|
}
|
||
|
|
if del != nil {
|
||
|
|
r.del = del
|
||
|
|
}
|
||
|
|
if check != nil {
|
||
|
|
r.check = check
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (r *rpcServer) handlers() (AddHandler, DelHandler, CheckHandler) {
|
||
|
|
r.mu.RLock()
|
||
|
|
defer r.mu.RUnlock()
|
||
|
|
return r.add, r.del, r.check
|
||
|
|
}
|
||
|
|
|
||
|
|
// serve takes ownership of l, accepting connections until ctx is cancelled
|
||
|
|
// or the listener errors.
|
||
|
|
func (r *rpcServer) serve(ctx context.Context, l net.Listener) {
|
||
|
|
go func() {
|
||
|
|
<-ctx.Done()
|
||
|
|
_ = l.Close()
|
||
|
|
}()
|
||
|
|
for {
|
||
|
|
conn, err := l.Accept()
|
||
|
|
if err != nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
go r.handleConn(ctx, conn)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// handleConn reads exactly one Request, dispatches, and writes exactly one
|
||
|
|
// Response. Connection-per-RPC matches the kubelet → plugin model where
|
||
|
|
// each CNI invocation is a fresh process.
|
||
|
|
func (r *rpcServer) handleConn(ctx context.Context, conn net.Conn) {
|
||
|
|
defer conn.Close()
|
||
|
|
req, err := flockcni.DecodeRequest(conn)
|
||
|
|
if err != nil {
|
||
|
|
r.logger.Warn("rpc decode", "err", err)
|
||
|
|
_ = flockcni.EncodeResponse(conn, flockcni.Response{Error: fmt.Sprintf("decode: %v", err)})
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
addH, delH, checkH := r.handlers()
|
||
|
|
|
||
|
|
var resp flockcni.Response
|
||
|
|
switch req.Op {
|
||
|
|
case flockcni.OpAdd:
|
||
|
|
result, err := addH(ctx, req)
|
||
|
|
if err != nil {
|
||
|
|
resp.Error = err.Error()
|
||
|
|
} else {
|
||
|
|
resp.Result = result
|
||
|
|
}
|
||
|
|
case flockcni.OpDel:
|
||
|
|
if err := delH(ctx, req); err != nil {
|
||
|
|
resp.Error = err.Error()
|
||
|
|
}
|
||
|
|
case flockcni.OpCheck:
|
||
|
|
if err := checkH(ctx, req); err != nil {
|
||
|
|
resp.Error = err.Error()
|
||
|
|
}
|
||
|
|
default:
|
||
|
|
resp.Error = fmt.Sprintf("unknown op %q", req.Op)
|
||
|
|
}
|
||
|
|
|
||
|
|
r.logger.Info("rpc",
|
||
|
|
"op", req.Op,
|
||
|
|
"container", req.ContainerID,
|
||
|
|
"netns", req.Netns,
|
||
|
|
"err", resp.Error)
|
||
|
|
|
||
|
|
if err := flockcni.EncodeResponse(conn, resp); err != nil {
|
||
|
|
r.logger.Warn("rpc encode", "err", err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func notImplementedAdd(_ context.Context, _ flockcni.Request) (*current.Result, error) {
|
||
|
|
return nil, fmt.Errorf("flock M2 not yet wired: ADD path returns no result")
|
||
|
|
}
|
||
|
|
func notImplementedDel(_ context.Context, _ flockcni.Request) error { return nil } // idempotent: pretend success
|
||
|
|
func notImplementedCheck(_ context.Context, _ flockcni.Request) error { return nil }
|