2026-04-25 09:25:45 -05:00
|
|
|
// This file implements the durable per-node allocation file at
// /var/lib/flock/allocations.json. The package-level doc lives in doc.go.
|
|
|
|
|
|
2026-04-24 21:17:42 -05:00
|
|
|
package agent
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"fmt"
|
|
|
|
|
"os"
|
|
|
|
|
"path/filepath"
|
|
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// AllocationState marks where an allocations.json entry is in its
// lifecycle. The recognized values are StatePending and StateCommitted.
type AllocationState string

// StatePending means IPAM picked addresses but the netlink work may be
// incomplete. On agent startup such entries are GC'd: addrs/routes/veth
// are removed.
const StatePending AllocationState = "pending"

// StateCommitted means the netlink ops finished; the entry is the source
// of truth.
const StateCommitted AllocationState = "committed"
|
|
|
|
|
|
|
|
|
|
// Allocation is a single per-pod entry persisted in allocations.json.
|
|
|
|
|
type Allocation struct {
|
|
|
|
|
ContainerID string `json:"container_id"`
|
|
|
|
|
Namespace string `json:"namespace"`
|
|
|
|
|
PodName string `json:"pod_name"`
|
|
|
|
|
OwnerUID string `json:"owner_uid"`
|
|
|
|
|
IP6 string `json:"ip6,omitempty"`
|
|
|
|
|
IP4 string `json:"ip4,omitempty"`
|
|
|
|
|
Anycast []string `json:"anycast,omitempty"`
|
|
|
|
|
State AllocationState `json:"state"`
|
|
|
|
|
AllocatedAt time.Time `json:"allocated_at"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// State is the on-disk file shape. Version is bumped on incompatible changes.
|
|
|
|
|
type State struct {
|
|
|
|
|
Version int `json:"version"`
|
|
|
|
|
Node string `json:"node"`
|
|
|
|
|
Allocations []Allocation `json:"allocations"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const stateVersion = 1
|
|
|
|
|
|
|
|
|
|
// Store is the durable allocation store.
|
|
|
|
|
//
|
|
|
|
|
// All public methods are safe for concurrent use. They serialize through
|
|
|
|
|
// a single mutex so that the on-disk file is always consistent and so that
|
|
|
|
|
// the CNI ADD/DEL critical sections (which mutate kernel state alongside the
|
|
|
|
|
// file) can rely on snapshot semantics.
|
|
|
|
|
type Store struct {
|
|
|
|
|
mu sync.Mutex
|
|
|
|
|
path string
|
|
|
|
|
node string
|
|
|
|
|
data State
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewStore opens (or creates) a per-node store. The directory containing
|
|
|
|
|
// `path` must already exist; we do not create it because in production it is
|
|
|
|
|
// /var/lib/flock managed by the DaemonSet, not the agent process.
|
|
|
|
|
func NewStore(path, node string) (*Store, error) {
|
|
|
|
|
s := &Store{path: path, node: node}
|
|
|
|
|
if err := s.load(); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return s, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Store) load() error {
|
|
|
|
|
b, err := os.ReadFile(s.path)
|
|
|
|
|
if os.IsNotExist(err) {
|
|
|
|
|
s.data = State{Version: stateVersion, Node: s.node, Allocations: []Allocation{}}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("read state: %w", err)
|
|
|
|
|
}
|
|
|
|
|
if err := json.Unmarshal(b, &s.data); err != nil {
|
|
|
|
|
return fmt.Errorf("parse state: %w", err)
|
|
|
|
|
}
|
|
|
|
|
if s.data.Version != stateVersion {
|
|
|
|
|
return fmt.Errorf("state version %d, want %d", s.data.Version, stateVersion)
|
|
|
|
|
}
|
|
|
|
|
if s.data.Allocations == nil {
|
|
|
|
|
s.data.Allocations = []Allocation{}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// flushLocked writes the in-memory state to disk durably:
|
|
|
|
|
//
|
|
|
|
|
// 1. write to <path>.tmp
|
|
|
|
|
// 2. fsync(tmpfd)
|
|
|
|
|
// 3. rename to <path>
|
|
|
|
|
// 4. fsync(parent dir) — required so the rename survives power loss.
|
|
|
|
|
//
|
|
|
|
|
// Caller MUST hold s.mu.
|
|
|
|
|
func (s *Store) flushLocked() error {
|
|
|
|
|
b, err := json.MarshalIndent(s.data, "", " ")
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("marshal state: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tmp := s.path + ".tmp"
|
|
|
|
|
f, err := os.OpenFile(tmp, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("open tmp: %w", err)
|
|
|
|
|
}
|
|
|
|
|
if _, err := f.Write(b); err != nil {
|
|
|
|
|
f.Close()
|
|
|
|
|
os.Remove(tmp)
|
|
|
|
|
return fmt.Errorf("write tmp: %w", err)
|
|
|
|
|
}
|
|
|
|
|
if err := f.Sync(); err != nil {
|
|
|
|
|
f.Close()
|
|
|
|
|
os.Remove(tmp)
|
|
|
|
|
return fmt.Errorf("fsync tmp: %w", err)
|
|
|
|
|
}
|
|
|
|
|
if err := f.Close(); err != nil {
|
|
|
|
|
os.Remove(tmp)
|
|
|
|
|
return fmt.Errorf("close tmp: %w", err)
|
|
|
|
|
}
|
|
|
|
|
if err := os.Rename(tmp, s.path); err != nil {
|
|
|
|
|
os.Remove(tmp)
|
|
|
|
|
return fmt.Errorf("rename: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
dir, err := os.Open(filepath.Dir(s.path))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("open parent: %w", err)
|
|
|
|
|
}
|
|
|
|
|
defer dir.Close()
|
|
|
|
|
if err := dir.Sync(); err != nil {
|
|
|
|
|
return fmt.Errorf("fsync parent: %w", err)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Get returns the entry for containerID, ok=false if absent. Returned value
|
|
|
|
|
// is a copy; mutations do not affect store state.
|
|
|
|
|
func (s *Store) Get(containerID string) (Allocation, bool) {
|
|
|
|
|
s.mu.Lock()
|
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
for _, a := range s.data.Allocations {
|
|
|
|
|
if a.ContainerID == containerID {
|
|
|
|
|
return a, true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return Allocation{}, false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Upsert inserts or replaces the entry for a.ContainerID and flushes.
|
|
|
|
|
func (s *Store) Upsert(a Allocation) error {
|
|
|
|
|
s.mu.Lock()
|
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
for i, ex := range s.data.Allocations {
|
|
|
|
|
if ex.ContainerID == a.ContainerID {
|
|
|
|
|
s.data.Allocations[i] = a
|
|
|
|
|
return s.flushLocked()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
s.data.Allocations = append(s.data.Allocations, a)
|
|
|
|
|
return s.flushLocked()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Delete removes the entry for containerID (no-op if absent) and flushes.
|
|
|
|
|
func (s *Store) Delete(containerID string) error {
|
|
|
|
|
s.mu.Lock()
|
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
for i, a := range s.data.Allocations {
|
|
|
|
|
if a.ContainerID == containerID {
|
|
|
|
|
s.data.Allocations = append(s.data.Allocations[:i], s.data.Allocations[i+1:]...)
|
|
|
|
|
return s.flushLocked()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Snapshot returns a defensive copy of all allocations.
|
|
|
|
|
func (s *Store) Snapshot() []Allocation {
|
|
|
|
|
s.mu.Lock()
|
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
out := make([]Allocation, len(s.data.Allocations))
|
|
|
|
|
copy(out, s.data.Allocations)
|
|
|
|
|
return out
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// PendingContainerIDs returns container IDs whose entries are State==pending.
|
|
|
|
|
// Used by agent startup GC.
|
|
|
|
|
func (s *Store) PendingContainerIDs() []string {
|
|
|
|
|
s.mu.Lock()
|
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
var out []string
|
|
|
|
|
for _, a := range s.data.Allocations {
|
|
|
|
|
if a.State == StatePending {
|
|
|
|
|
out = append(out, a.ContainerID)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return out
|
|
|
|
|
}
|