Files
gyurix 77f80dea1b
continuous-integration/drone/push Build is passing
feat: add idempotent route checks and container network routes
- Make AddRouteInContainer idempotent by checking existing routes and handling "File exists" errors
- Add loop in firewall reconciler to add routes for containers to reach other networks
- Update iptables checks to include port for better rule distinction
2026-06-16 09:42:47 +02:00

373 lines
14 KiB
Go

package docker
import (
"context"
"fmt"
"net"
"os/exec"
"regexp"
"strings"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/client"
"firewall_containers/network-go/config"
"firewall_containers/network-go/logger"
)
// DockerAPI is the set of Docker operations the reconciler relies on.
// Expressing it as an interface lets tests plug in a mock instead of *Client.
type DockerAPI interface {
	// Close releases the underlying Docker client.
	Close() error

	// Network lifecycle.
	EnsureNetwork(ctx context.Context, netCfg config.NetworkConfig) error
	RemoveNetwork(ctx context.Context, networkName string) error

	// Attaching containers to networks.
	ConnectContainer(ctx context.Context, containerName, networkName, ip string) error
	DisconnectContainer(ctx context.Context, containerName, networkName string) error
	IsConnected(ctx context.Context, containerName, networkName, expectedIP string) bool

	// Container inspection and lookup.
	InspectContainer(ctx context.Context, containerName string) (*types.ContainerJSON, error)
	WaitForContainerRunning(ctx context.Context, containerName string, timeout time.Duration) error
	GetContainerPID(ctx context.Context, containerName string) (int, error)
	FindContainerName(ctx context.Context, name, selector string) (string, error)

	// Operations inside a container's network namespace.
	AddRouteInContainer(ctx context.Context, containerName, network, gateway string) error
}
// Client is the production DockerAPI implementation. It holds one Docker
// Engine SDK client, constructed by NewClient.
type Client struct {
	cli *client.Client // SDK handle; released by Close
}

// Compile-time check that *Client provides every DockerAPI method.
var _ DockerAPI = (*Client)(nil)
// NewClient builds a Docker SDK client configured from the environment
// (client.FromEnv, e.g. DOCKER_HOST) with API version negotiation enabled.
//
// After construction it pings the daemon purely for diagnostics: a failed ping
// is logged as a warning and the client is still returned. The ping is bounded
// by a timeout so an unreachable or unresponsive DOCKER_HOST cannot block the
// caller indefinitely.
func NewClient() (*Client, error) {
	logger.Info("DOCKER: creating Docker client (using DOCKER_HOST env)")
	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, fmt.Errorf("failed to create Docker client: %w", err)
	}

	// Best-effort connectivity probe; keep it short-lived.
	const pingTimeout = 10 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
	defer cancel()

	ping, err := cli.Ping(ctx)
	if err != nil {
		logger.Warn("DOCKER: Docker daemon ping failed: %v", err)
	} else {
		logger.Info("DOCKER: connected to Docker daemon (API version=%s, OS=%s)", ping.APIVersion, ping.OSType)
	}
	return &Client{cli: cli}, nil
}
// Close shuts down the wrapped Docker SDK client, passing through whatever
// error it reports.
func (c *Client) Close() error {
	logger.Debug("DOCKER: closing Docker client")
	if err := c.cli.Close(); err != nil {
		return err
	}
	return nil
}
// EnsureNetwork guarantees that a network named netCfg.NetworkName exists.
//
// If a network with exactly that name is already present, it returns nil
// without comparing its settings to netCfg. Otherwise it creates an
// attachable bridge network using netCfg.Subnet and netCfg.Gateway, labelled
// managed-by=firewall-network-go. Subnet/gateway parse errors are reported
// only when creation is actually needed.
func (c *Client) EnsureNetwork(ctx context.Context, netCfg config.NetworkConfig) error {
	name := netCfg.NetworkName
	logger.Debug("DOCKER: checking if network %q exists", name)

	// The name filter may also return networks whose names merely contain
	// the requested one, so insist on an exact match.
	nameFilter := filters.NewArgs(filters.Arg("name", name))
	found, err := c.cli.NetworkList(ctx, network.ListOptions{Filters: nameFilter})
	if err != nil {
		return fmt.Errorf("failed to list networks: %w", err)
	}
	for i := range found {
		if found[i].Name != name {
			continue
		}
		logger.Debug("DOCKER: network %q already exists", name)
		return nil
	}

	_, subnet, err := net.ParseCIDR(netCfg.Subnet)
	if err != nil {
		return fmt.Errorf("failed to parse subnet %s: %w", netCfg.Subnet, err)
	}
	gw := net.ParseIP(netCfg.Gateway)
	if gw == nil {
		return fmt.Errorf("failed to parse gateway IP %s", netCfg.Gateway)
	}

	pool := network.IPAMConfig{
		Subnet:  subnet.String(),
		Gateway: gw.String(),
	}
	opts := network.CreateOptions{
		Driver:     "bridge",
		IPAM:       &network.IPAM{Config: []network.IPAMConfig{pool}},
		Labels:     map[string]string{"managed-by": "firewall-network-go"},
		Attachable: true,
	}

	logger.Info("DOCKER: creating network %q (subnet=%s, gateway=%s)", name, netCfg.Subnet, netCfg.Gateway)
	created, err := c.cli.NetworkCreate(ctx, name, opts)
	if err != nil {
		return fmt.Errorf("failed to create network %s: %w", name, err)
	}
	logger.Info("DOCKER: network %q created (ID=%s)", name, created.ID)
	return nil
}
// RemoveNetwork deletes the Docker network called networkName. Any error
// from the daemon is wrapped and returned.
func (c *Client) RemoveNetwork(ctx context.Context, networkName string) error {
	logger.Info("DOCKER: removing network %q", networkName)
	if err := c.cli.NetworkRemove(ctx, networkName); err != nil {
		return fmt.Errorf("failed to remove network %s: %w", networkName, err)
	}
	logger.Info("DOCKER: network %q removed", networkName)
	return nil
}
// ConnectContainer attaches containerName to networkName and requests ip as
// its static IPv4 address.
//
// A daemon error whose text contains "already exists" is treated as success,
// so repeated reconciliation passes do not fail. Note that this path does not
// verify that the existing attachment actually uses ip.
func (c *Client) ConnectContainer(ctx context.Context, containerName, networkName, ip string) error {
	logger.Info("DOCKER: connecting container %q to network %q with IP %s", containerName, networkName, ip)

	settings := &network.EndpointSettings{
		IPAMConfig: &network.EndpointIPAMConfig{IPv4Address: ip},
	}
	err := c.cli.NetworkConnect(ctx, networkName, containerName, settings)

	switch {
	case err == nil:
		logger.Info("DOCKER: container %q connected to network %q with IP %s", containerName, networkName, ip)
		return nil
	case strings.Contains(err.Error(), "already exists"):
		// e.g. "endpoint with name ... already exists" on re-reconciliation.
		logger.Info("DOCKER: container %q already connected to network %q", containerName, networkName)
		return nil
	default:
		return fmt.Errorf("failed to connect container %s to network %s: %w", containerName, networkName, err)
	}
}
// DisconnectContainer detaches containerName from networkName. The request is
// sent with the force flag set; any daemon error is wrapped and returned.
func (c *Client) DisconnectContainer(ctx context.Context, containerName, networkName string) error {
	logger.Info("DOCKER: disconnecting container %q from network %q", containerName, networkName)
	const force = true
	if err := c.cli.NetworkDisconnect(ctx, networkName, containerName, force); err != nil {
		return fmt.Errorf("failed to disconnect container %s from network %s: %w", containerName, networkName, err)
	}
	logger.Info("DOCKER: container %q disconnected from network %q", containerName, networkName)
	return nil
}
// InspectContainer fetches the daemon's inspect record for containerName and
// returns a pointer to it. Inspect failures are wrapped and returned.
func (c *Client) InspectContainer(ctx context.Context, containerName string) (*types.ContainerJSON, error) {
	logger.Debug("DOCKER: inspecting container %q", containerName)
	// Named "info" rather than "container" to avoid shadowing the imported
	// container package.
	info, err := c.cli.ContainerInspect(ctx, containerName)
	if err != nil {
		return nil, fmt.Errorf("failed to inspect container %s: %w", containerName, err)
	}
	// State is a pointer; guard it like GetContainerPID and
	// WaitForContainerRunning do, so a missing State cannot panic this log.
	if info.State != nil {
		logger.Debug("DOCKER: container %q inspected (status=%s, PID=%d)",
			containerName, info.State.Status, info.State.Pid)
	} else {
		logger.Debug("DOCKER: container %q inspected (no state reported)", containerName)
	}
	return &info, nil
}
// WaitForContainerRunning polls containerName every 500ms until its inspect
// State reports Running. It gives up once timeout elapses or ctx is done,
// whichever happens first.
//
// Inspect errors and a missing State are treated as "not ready yet" and
// retried. The first check is made immediately, so an already-running
// container returns without waiting for a tick. The error returned on expiry
// wraps ctx.Err(), so callers can tell context.DeadlineExceeded from
// context.Canceled with errors.Is.
func (c *Client) WaitForContainerRunning(ctx context.Context, containerName string, timeout time.Duration) error {
	logger.Info("DOCKER: waiting for container %q to be running (timeout=%s)", containerName, timeout)
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	for {
		info, err := c.cli.ContainerInspect(ctx, containerName)
		switch {
		case err != nil:
			logger.Debug("DOCKER: container %q inspect failed (not ready yet): %v", containerName, err)
		case info.State == nil:
			// Previously this case fell through to a State.Status read and panicked.
			logger.Debug("DOCKER: container %q reports no state, not running yet", containerName)
		case info.State.Running:
			logger.Info("DOCKER: container %q is now running", containerName)
			return nil
		default:
			logger.Debug("DOCKER: container %q status=%s, not running yet", containerName, info.State.Status)
		}

		select {
		case <-ctx.Done():
			return fmt.Errorf("timeout waiting for container %s to be running: %w", containerName, ctx.Err())
		case <-ticker.C:
		}
	}
}
// GetContainerPID returns the PID reported in containerName's inspect State.
// That PID is used as the nsenter target. It fails if the container cannot be
// inspected, or if it has no State or is not running.
func (c *Client) GetContainerPID(ctx context.Context, containerName string) (int, error) {
	logger.Info("DOCKER: getting PID for container %q", containerName)
	info, err := c.cli.ContainerInspect(ctx, containerName)
	if err != nil {
		return 0, fmt.Errorf("failed to inspect container %s: %w", containerName, err)
	}

	state := info.State
	running := state != nil && state.Running
	if !running {
		return 0, fmt.Errorf("container %s is not running", containerName)
	}

	pid := state.Pid
	logger.Info("DOCKER: container %q PID=%d", containerName, pid)
	return pid, nil
}
// AddRouteInContainer ensures that, inside containerName's network namespace,
// there is a route to network (an `ip route` destination such as a CIDR) via
// gateway. It runs `ip route` through `nsenter -t <pid> -n`.
//
// Idempotency:
//   - If `ip route show <network>` already lists gateway as a "via" next hop,
//     nothing is done.
//   - A "File exists" failure from `ip route add` is treated as success. This
//     covers a route added concurrently, and also a route to the same
//     destination with a different next hop. The latter is left unchanged
//     and logged as a warning.
//
// Both commands are bound to ctx, so cancelling ctx kills a hung nsenter.
func (c *Client) AddRouteInContainer(ctx context.Context, containerName, network, gateway string) error {
	pid, err := c.GetContainerPID(ctx, containerName)
	if err != nil {
		return fmt.Errorf("failed to get PID for container %s: %w", containerName, err)
	}

	// nsenterIP builds `nsenter -t <pid> -n -- ip <ipArgs...>`.
	pidArg := fmt.Sprintf("%d", pid)
	nsenterIP := func(ipArgs ...string) *exec.Cmd {
		args := append([]string{"-t", pidArg, "-n", "--", "ip"}, ipArgs...)
		return exec.CommandContext(ctx, "nsenter", args...)
	}

	// Look for an existing route first. Only stdout is parsed, so nsenter/ip
	// diagnostics are never mistaken for route text. A failed check is not
	// fatal; we fall through to the add, which reports real failures.
	existing := ""
	if out, checkErr := nsenterIP("route", "show", network).Output(); checkErr != nil {
		logger.Debug("DOCKER: route check in container %q failed (will try to add): %v", containerName, checkErr)
	} else {
		existing = strings.TrimSpace(string(out))
	}

	if routeOutputHasGateway(existing, gateway) {
		logger.Debug("DOCKER: route %s via %s already exists in container %q, skipping", network, gateway, containerName)
		return nil
	}
	if existing != "" {
		// A route to this destination exists but not via gateway. `ip route add`
		// will refuse to overwrite it, so make the divergence visible.
		logger.Warn("DOCKER: container %q already has a route to %s not via %s (left unchanged): %s",
			containerName, network, gateway, existing)
	}

	logger.Info("DOCKER: adding route in container %q (PID=%d): %s via %s", containerName, pid, network, gateway)
	output, err := nsenterIP("route", "add", network, "via", gateway).CombinedOutput()
	if err != nil {
		// "File exists": a route to this destination is already present, either
		// added concurrently or pointing at another next hop (warned above).
		if strings.Contains(string(output), "File exists") {
			logger.Debug("DOCKER: route %s via %s already exists in container %q (File exists), skipping", network, gateway, containerName)
			return nil
		}
		return fmt.Errorf("failed to add route in container %s: %w\noutput: %s", containerName, err, string(output))
	}
	logger.Info("DOCKER: route added in container %q: %s via %s", containerName, network, gateway)
	logger.Debug("DOCKER: nsenter output: %s", strings.TrimSpace(string(output)))
	return nil
}

// routeOutputHasGateway reports whether `ip route show` output names gateway
// as a next hop, i.e. contains a "via <gateway>" token pair on any line. This
// also covers multipath "nexthop via ..." lines. Unlike a substring test,
// 10.0.0.1 does not match "via 10.0.0.10". Addresses are compared as parsed
// IPs when possible, so equivalent IPv6 spellings compare equal.
func routeOutputHasGateway(routes, gateway string) bool {
	want := net.ParseIP(gateway)
	fields := strings.Fields(routes)
	for i := 0; i+1 < len(fields); i++ {
		if fields[i] != "via" {
			continue
		}
		hop := fields[i+1]
		// iproute2 may prefix the address family, e.g. "via inet6 fe80::1".
		if (hop == "inet" || hop == "inet6") && i+2 < len(fields) {
			hop = fields[i+2]
		}
		if hop == gateway {
			return true
		}
		if want != nil {
			if got := net.ParseIP(hop); got != nil && got.Equal(want) {
				return true
			}
		}
	}
	return false
}
// FindContainerName resolves the name of a running container from a
// configured name and an optional selector. It tries, in order:
//  1. a running container whose name is exactly name;
//  2. a running container whose name is exactly selector (if non-empty and
//     different from name);
//  3. prefix matching, mirroring the old shell script's `grep $D"-"`. For
//     name, then selector, it takes the text before the first "-" and
//     returns the first running container whose name matches "<prefix>-".
//
// Docker's "name" list filter is a regular-expression match, not an exact
// one. Steps 1 and 2 therefore quote the name and anchor it as ^/?...$
// (names may be reported with a leading "/"). Step 3 deliberately stays an
// unanchored match, like grep.
//
// List errors in any step are logged and the next step is tried. An error is
// returned only when nothing matched.
func (c *Client) FindContainerName(ctx context.Context, name, selector string) (string, error) {
	logger.Info("DOCKER: finding container: name=%q selector=%q", name, selector)

	// 1. Exact name match.
	cName, err := c.firstRunningName(ctx, "^/?"+regexp.QuoteMeta(name)+"$")
	if err != nil {
		logger.Debug("DOCKER: exact name list failed: %v", err)
	} else if cName != "" {
		logger.Info("DOCKER: found container by exact name match: %q", cName)
		return cName, nil
	}

	// 2. Exact selector match.
	if selector != "" && selector != name {
		logger.Debug("DOCKER: trying selector match: %q", selector)
		cName, err = c.firstRunningName(ctx, "^/?"+regexp.QuoteMeta(selector)+"$")
		if err != nil {
			logger.Debug("DOCKER: selector list failed: %v", err)
		} else if cName != "" {
			logger.Info("DOCKER: found container by selector match: %q", cName)
			return cName, nil
		}
	}

	// 3. Prefix match on name, then selector (shell script behavior: grep $D"-").
	for _, candidate := range []string{name, selector} {
		if candidate == "" {
			continue
		}
		prefix := candidate
		if i := strings.Index(candidate, "-"); i >= 0 {
			prefix = candidate[:i]
		}
		if prefix == "" {
			// A leading "-" would leave the bare pattern "-", which matches any
			// hyphenated container. Docker container names must start with an
			// alphanumeric character, so nothing meaningful can match.
			logger.Debug("DOCKER: skipping prefix match for candidate=%q (empty prefix)", candidate)
			continue
		}
		logger.Debug("DOCKER: trying prefix match: candidate=%q prefix=%q", candidate, prefix)
		cName, err = c.firstRunningName(ctx, prefix+"-")
		if err != nil {
			logger.Debug("DOCKER: prefix list failed: %v", err)
			continue
		}
		if cName != "" {
			logger.Info("DOCKER: found container by prefix match %q-: %q", prefix, cName)
			return cName, nil
		}
	}

	return "", fmt.Errorf("no running container found matching name=%q selector=%q", name, selector)
}

// firstRunningName lists running containers matching the Docker "name" filter
// pattern. It returns the first reported name, with any leading "/" removed.
// When nothing matches, or matches carry no names, it returns "" and a nil
// error.
func (c *Client) firstRunningName(ctx context.Context, pattern string) (string, error) {
	containers, err := c.cli.ContainerList(ctx, container.ListOptions{
		Filters: filters.NewArgs(
			filters.Arg("name", pattern),
			filters.Arg("status", "running"),
		),
	})
	if err != nil {
		return "", err
	}
	for _, ctr := range containers {
		// Guard against an empty Names slice instead of indexing [0] blindly.
		if len(ctr.Names) > 0 {
			return strings.TrimPrefix(ctr.Names[0], "/"), nil
		}
	}
	return "", nil
}
// IsConnected reports whether containerName is a running container attached
// to networkName with IP address expectedIP. The IP is compared as a string
// against the endpoint's IPAddress.
//
// Any lookup failure (list error, no running container with that exact name,
// inspect error, missing endpoint) yields false. Callers then treat the
// connection as not yet established.
func (c *Client) IsConnected(ctx context.Context, containerName, networkName, expectedIP string) bool {
	logger.Debug("DOCKER: checking if container %q is connected to %q with IP %s", containerName, networkName, expectedIP)

	// The name filter is a regex match, so anchor the quoted name for an exact,
	// running-only lookup.
	containers, err := c.cli.ContainerList(ctx, container.ListOptions{
		Filters: filters.NewArgs(
			filters.Arg("name", "^/?"+regexp.QuoteMeta(containerName)+"$"),
			filters.Arg("status", "running"),
		),
	})
	if err != nil || len(containers) == 0 {
		logger.Debug("DOCKER: container %q not found or error: %v", containerName, err)
		return false
	}

	// Check the network settings for the expected IP.
	inspect, err := c.cli.ContainerInspect(ctx, containerName)
	if err != nil {
		logger.Debug("DOCKER: inspect failed for %q: %v", containerName, err)
		return false
	}

	if inspect.NetworkSettings != nil {
		// Indexing a nil map is safe and yields nil. A present-but-null entry
		// is nil as well, so one nil check covers both before dereferencing.
		if endpoint := inspect.NetworkSettings.Networks[networkName]; endpoint != nil {
			currentIP := endpoint.IPAddress
			logger.Debug("DOCKER: container %q on network %q has IP=%s (expected=%s)", containerName, networkName, currentIP, expectedIP)
			if currentIP == expectedIP {
				logger.Debug("DOCKER: container %q already correctly connected to %q with IP %s", containerName, networkName, expectedIP)
				return true
			}
		}
	}

	logger.Debug("DOCKER: container %q not yet correctly connected to %q", containerName, networkName)
	return false
}