Files
flock/pkg/agent/state.go
T

205 lines
5.6 KiB
Go
Raw Normal View History

// Package agent owns the in-process flock-agent runtime: IPAM, netns, state,
// anycast, and NetworkPolicy. This file implements the durable per-node
// allocation file at /var/lib/flock/allocations.json.
package agent
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
)
// AllocationState is the lifecycle marker for an entry in allocations.json.
// It is a plain string type; the value is written verbatim into the "state"
// JSON field of each Allocation.
//
//   - pending: IPAM picked addresses; netlink work may be incomplete.
//     On agent startup these are GC'd: addrs/routes/veth removed.
//   - committed: netlink ops finished; entry is the source of truth.
type AllocationState string
const (
// StatePending marks an entry whose kernel-side setup may be incomplete.
// Store.PendingContainerIDs reports the container IDs of entries in this state.
StatePending AllocationState = "pending"
// StateCommitted marks an entry whose setup finished.
StateCommitted AllocationState = "committed"
)
// Allocation is a single per-pod entry persisted in allocations.json.
//
// The JSON tags define the on-disk field names. Copying an Allocation by value
// copies the Anycast slice header only, so both copies share one backing array.
type Allocation struct {
// ContainerID is the key the Store uses to look up, replace, and delete
// entries.
ContainerID string `json:"container_id"`
// Namespace and PodName presumably identify the Kubernetes pod that owns
// the entry — confirm against the caller; the store never interprets them.
Namespace string `json:"namespace"`
PodName string `json:"pod_name"`
// OwnerUID is stored as an opaque string (presumably the pod UID — TODO confirm).
OwnerUID string `json:"owner_uid"`
// IP6 and IP4 are kept as strings and are not validated here; each is
// omitted from the JSON when empty.
IP6 string `json:"ip6,omitempty"`
IP4 string `json:"ip4,omitempty"`
// Anycast is omitted from the JSON when it has no elements.
Anycast []string `json:"anycast,omitempty"`
// State is the lifecycle marker (see AllocationState).
State AllocationState `json:"state"`
// AllocatedAt is serialized using time.Time's JSON encoding (RFC 3339).
// The store never sets it; the caller supplies the value.
AllocatedAt time.Time `json:"allocated_at"`
}
// State is the on-disk file shape. Version is bumped on incompatible changes.
type State struct {
// Version is checked against stateVersion when the file is loaded; a
// mismatch makes the load fail.
Version int `json:"version"`
// Node is filled from the store's node name when no file exists yet. A
// value read from an existing file is kept as-is and is not compared with
// the store's node name.
Node string `json:"node"`
// Allocations holds one entry per container. Load replaces a nil slice
// with an empty one, so the field marshals as [] rather than null.
Allocations []Allocation `json:"allocations"`
}
// stateVersion is the only file format version this code reads and the
// version stamped into a freshly created State. Loading a file with any other
// version fails.
const stateVersion = 1
// Store is the durable allocation store.
//
// All public methods are safe for concurrent use. They serialize through
// a single mutex so that the on-disk file is always consistent and so that
// the CNI ADD/DEL critical sections (which mutate kernel state alongside the
// file) can rely on snapshot semantics.
//
// A Store contains a sync.Mutex and must not be copied; use the *Store
// returned by NewStore.
type Store struct {
// mu guards data and serializes every flush to disk.
mu sync.Mutex
// path is the location of the allocations file; flushes stage their
// output in path+".tmp" before renaming it into place.
path string
// node is the node name written into a freshly created State.
node string
// data is the in-memory image of the file; flushLocked marshals it as-is.
data State
}
// NewStore returns a Store backed by the file at path, loading whatever that
// file currently holds. If the file does not exist the store starts out empty
// and tagged with node; nothing is written to disk until the first mutation.
//
// The directory containing path is never created here: it must already exist
// (in production it is /var/lib/flock, managed by the DaemonSet rather than
// the agent process).
//
// A read, parse, or version error from the existing file is returned and no
// Store is produced.
func NewStore(path, node string) (*Store, error) {
	store := new(Store)
	store.path = path
	store.node = node

	if loadErr := store.load(); loadErr != nil {
		return nil, loadErr
	}
	return store, nil
}
// load populates s.data from the file at s.path.
//
// A missing file is not an error: the store is initialised to an empty State
// carrying the current stateVersion and s.node. Any other read failure, a JSON
// parse failure, or a version other than stateVersion is returned as an error.
// A file whose "allocations" field is absent or null is normalised to an empty
// (non-nil) slice.
//
// load does not take s.mu; it is called from NewStore before the store is
// handed to anyone else.
func (s *Store) load() error {
	raw, readErr := os.ReadFile(s.path)
	switch {
	case os.IsNotExist(readErr):
		// First run on this node: start from a clean, current-version state.
		s.data = State{
			Version:     stateVersion,
			Node:        s.node,
			Allocations: []Allocation{},
		}
		return nil
	case readErr != nil:
		return fmt.Errorf("read state: %w", readErr)
	}

	if parseErr := json.Unmarshal(raw, &s.data); parseErr != nil {
		return fmt.Errorf("parse state: %w", parseErr)
	}

	if got := s.data.Version; got != stateVersion {
		return fmt.Errorf("state version %d, want %d", got, stateVersion)
	}

	// Keep the slice non-nil so it is later written as [] rather than null.
	if s.data.Allocations == nil {
		s.data.Allocations = []Allocation{}
	}
	return nil
}
// flushLocked persists the in-memory state to s.path so that a crash or power
// loss leaves either the old file or the new one, never a torn mix:
//
//  1. marshal s.data and write it to <path>.tmp (created 0600, truncated)
//  2. fsync the temp file, then close it
//  3. rename the temp file over <path>
//  4. fsync the parent directory so the rename itself is durable
//
// If anything fails after the temp file was created and up to and including
// the rename, the temp file is removed. Each error is wrapped with the name
// of the step that failed.
//
// Caller MUST hold s.mu.
func (s *Store) flushLocked() error {
	payload, marshalErr := json.MarshalIndent(s.data, "", " ")
	if marshalErr != nil {
		return fmt.Errorf("marshal state: %w", marshalErr)
	}

	tmpPath := s.path + ".tmp"
	tmpFile, openErr := os.OpenFile(tmpPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600)
	if openErr != nil {
		return fmt.Errorf("open tmp: %w", openErr)
	}

	// abandon deletes the staged file (best effort) and labels the failure
	// with the step that produced it.
	abandon := func(step string, cause error) error {
		os.Remove(tmpPath)
		return fmt.Errorf("%s: %w", step, cause)
	}

	// Write then fsync; on either failure the descriptor is still open and
	// has to be closed before the temp file is dropped.
	step := "write tmp"
	_, err := tmpFile.Write(payload)
	if err == nil {
		step = "fsync tmp"
		err = tmpFile.Sync()
	}
	if err != nil {
		tmpFile.Close()
		return abandon(step, err)
	}

	if err = tmpFile.Close(); err != nil {
		return abandon("close tmp", err)
	}
	if err = os.Rename(tmpPath, s.path); err != nil {
		return abandon("rename", err)
	}

	// The rename only becomes durable once the directory entry is synced.
	parent, err := os.Open(filepath.Dir(s.path))
	if err != nil {
		return fmt.Errorf("open parent: %w", err)
	}
	syncErr := parent.Sync()
	parent.Close()
	if syncErr != nil {
		return fmt.Errorf("fsync parent: %w", syncErr)
	}
	return nil
}
// Get returns the entry for containerID, or ok=false if there is none.
//
// The returned Allocation is fully detached from the store: the struct is
// copied and its Anycast slice is cloned, so the caller may modify any part of
// the result without affecting store state. A nil Anycast stays nil.
func (s *Store) Get(containerID string) (Allocation, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for i := range s.data.Allocations {
		if s.data.Allocations[i].ContainerID != containerID {
			continue
		}
		found := s.data.Allocations[i]
		// A struct copy alone would still share the Anycast backing array
		// with the stored entry; give the caller its own.
		if found.Anycast != nil {
			found.Anycast = append(make([]string, 0, len(found.Anycast)), found.Anycast...)
		}
		return found, true
	}
	return Allocation{}, false
}
// Upsert inserts the entry, or replaces the existing entry with the same
// a.ContainerID, and flushes the result to disk.
//
// The store keeps its own copy of a.Anycast, so the caller remains free to
// reuse or modify the slice it passed in.
//
// If the flush fails the in-memory change is rolled back and the error is
// returned, so a failed Upsert is never picked up by a later, unrelated flush.
// Depending on where the flush failed the file itself may already contain the
// change; the next successful flush rewrites it from memory.
func (s *Store) Upsert(a Allocation) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Detach from the caller's backing array; otherwise a later write by the
	// caller would change store state outside the mutex.
	if a.Anycast != nil {
		own := make([]string, len(a.Anycast))
		copy(own, a.Anycast)
		a.Anycast = own
	}

	entries := s.data.Allocations
	for i := range entries {
		if entries[i].ContainerID != a.ContainerID {
			continue
		}
		previous := entries[i]
		entries[i] = a
		if err := s.flushLocked(); err != nil {
			entries[i] = previous
			return err
		}
		return nil
	}

	s.data.Allocations = append(entries, a)
	if err := s.flushLocked(); err != nil {
		// Restoring the old slice header drops the appended element again.
		s.data.Allocations = entries
		return err
	}
	return nil
}
// Delete removes the entry for containerID and flushes. It is a no-op
// (returning nil without touching the file) when no such entry exists.
//
// If the flush fails the entry is put back in memory and the error is
// returned. This keeps a retried Delete meaningful: it finds the entry again
// and re-attempts the flush, instead of reporting success while the file still
// lists the entry.
func (s *Store) Delete(containerID string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	before := s.data.Allocations
	at := -1
	for i := range before {
		if before[i].ContainerID == containerID {
			at = i
			break
		}
	}
	if at < 0 {
		return nil
	}

	// Build the shortened list in fresh storage so `before` stays intact for
	// the rollback below. The result is non-nil even when empty, so the file
	// keeps "allocations": [] rather than null.
	after := make([]Allocation, 0, len(before)-1)
	after = append(after, before[:at]...)
	after = append(after, before[at+1:]...)

	s.data.Allocations = after
	if err := s.flushLocked(); err != nil {
		s.data.Allocations = before
		return err
	}
	return nil
}
// Snapshot returns a defensive copy of all allocations, in stored order.
//
// The copy is deep: each element's Anycast slice is cloned, so nothing in the
// result shares memory with the store. The result is never nil; an empty store
// yields an empty slice.
func (s *Store) Snapshot() []Allocation {
	s.mu.Lock()
	defer s.mu.Unlock()

	out := make([]Allocation, len(s.data.Allocations))
	copy(out, s.data.Allocations)
	for i := range out {
		// copy() duplicated only the slice headers; detach the contents too.
		if shared := out[i].Anycast; shared != nil {
			out[i].Anycast = append(make([]string, 0, len(shared)), shared...)
		}
	}
	return out
}
// PendingContainerIDs lists, in stored order, the container ID of every entry
// whose State is StatePending. The result is nil when no entry is pending.
// Used by agent startup GC.
func (s *Store) PendingContainerIDs() []string {
	s.mu.Lock()
	defer s.mu.Unlock()

	var pending []string
	for i := range s.data.Allocations {
		entry := &s.data.Allocations[i]
		if entry.State != StatePending {
			continue
		}
		pending = append(pending, entry.ContainerID)
	}
	return pending
}