// Package agent owns the in-process flock-agent runtime: IPAM, netns, state, // anycast, and NetworkPolicy. This file implements the durable per-node // allocation file at /var/lib/flock/allocations.json. package agent import ( "encoding/json" "fmt" "os" "path/filepath" "sync" "time" ) // AllocationState is the lifecycle marker for an entry in allocations.json. // // pending — IPAM picked addresses; netlink work may be incomplete. // On agent startup these are GC'd: addrs/routes/veth removed. // committed — netlink ops finished; entry is the source of truth. type AllocationState string const ( StatePending AllocationState = "pending" StateCommitted AllocationState = "committed" ) // Allocation is a single per-pod entry persisted in allocations.json. type Allocation struct { ContainerID string `json:"container_id"` Namespace string `json:"namespace"` PodName string `json:"pod_name"` OwnerUID string `json:"owner_uid"` IP6 string `json:"ip6,omitempty"` IP4 string `json:"ip4,omitempty"` Anycast []string `json:"anycast,omitempty"` State AllocationState `json:"state"` AllocatedAt time.Time `json:"allocated_at"` } // State is the on-disk file shape. Version is bumped on incompatible changes. type State struct { Version int `json:"version"` Node string `json:"node"` Allocations []Allocation `json:"allocations"` } const stateVersion = 1 // Store is the durable allocation store. // // All public methods are safe for concurrent use. They serialize through // a single mutex so that the on-disk file is always consistent and so that // the CNI ADD/DEL critical sections (which mutate kernel state alongside the // file) can rely on snapshot semantics. type Store struct { mu sync.Mutex path string node string data State } // NewStore opens (or creates) a per-node store. The directory containing // `path` must already exist; we do not create it because in production it is // /var/lib/flock managed by the DaemonSet, not the agent process. func NewStore(path, node string) (*Store, error) { s := &Store{path: path, node: node} if err := s.load(); err != nil { return nil, err } return s, nil } func (s *Store) load() error { b, err := os.ReadFile(s.path) if os.IsNotExist(err) { s.data = State{Version: stateVersion, Node: s.node, Allocations: []Allocation{}} return nil } if err != nil { return fmt.Errorf("read state: %w", err) } if err := json.Unmarshal(b, &s.data); err != nil { return fmt.Errorf("parse state: %w", err) } if s.data.Version != stateVersion { return fmt.Errorf("state version %d, want %d", s.data.Version, stateVersion) } if s.data.Allocations == nil { s.data.Allocations = []Allocation{} } return nil } // flushLocked writes the in-memory state to disk durably: // // 1. write to .tmp // 2. fsync(tmpfd) // 3. rename to // 4. fsync(parent dir) — required so the rename survives power loss. // // Caller MUST hold s.mu. func (s *Store) flushLocked() error { b, err := json.MarshalIndent(s.data, "", " ") if err != nil { return fmt.Errorf("marshal state: %w", err) } tmp := s.path + ".tmp" f, err := os.OpenFile(tmp, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600) if err != nil { return fmt.Errorf("open tmp: %w", err) } if _, err := f.Write(b); err != nil { f.Close() os.Remove(tmp) return fmt.Errorf("write tmp: %w", err) } if err := f.Sync(); err != nil { f.Close() os.Remove(tmp) return fmt.Errorf("fsync tmp: %w", err) } if err := f.Close(); err != nil { os.Remove(tmp) return fmt.Errorf("close tmp: %w", err) } if err := os.Rename(tmp, s.path); err != nil { os.Remove(tmp) return fmt.Errorf("rename: %w", err) } dir, err := os.Open(filepath.Dir(s.path)) if err != nil { return fmt.Errorf("open parent: %w", err) } defer dir.Close() if err := dir.Sync(); err != nil { return fmt.Errorf("fsync parent: %w", err) } return nil } // Get returns the entry for containerID, ok=false if absent. Returned value // is a copy; mutations do not affect store state. func (s *Store) Get(containerID string) (Allocation, bool) { s.mu.Lock() defer s.mu.Unlock() for _, a := range s.data.Allocations { if a.ContainerID == containerID { return a, true } } return Allocation{}, false } // Upsert inserts or replaces the entry for a.ContainerID and flushes. func (s *Store) Upsert(a Allocation) error { s.mu.Lock() defer s.mu.Unlock() for i, ex := range s.data.Allocations { if ex.ContainerID == a.ContainerID { s.data.Allocations[i] = a return s.flushLocked() } } s.data.Allocations = append(s.data.Allocations, a) return s.flushLocked() } // Delete removes the entry for containerID (no-op if absent) and flushes. func (s *Store) Delete(containerID string) error { s.mu.Lock() defer s.mu.Unlock() for i, a := range s.data.Allocations { if a.ContainerID == containerID { s.data.Allocations = append(s.data.Allocations[:i], s.data.Allocations[i+1:]...) return s.flushLocked() } } return nil } // Snapshot returns a defensive copy of all allocations. func (s *Store) Snapshot() []Allocation { s.mu.Lock() defer s.mu.Unlock() out := make([]Allocation, len(s.data.Allocations)) copy(out, s.data.Allocations) return out } // PendingContainerIDs returns container IDs whose entries are State==pending. // Used by agent startup GC. func (s *Store) PendingContainerIDs() []string { s.mu.Lock() defer s.mu.Unlock() var out []string for _, a := range s.data.Allocations { if a.State == StatePending { out = append(out, a.ContainerID) } } return out }