diff --git a/go.mod b/go.mod index 43b4eeb..34718ff 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.26 require ( github.com/blang/semver/v4 v4.0.0 github.com/distribution/distribution/v3 v3.0.0 + github.com/fsnotify/fsnotify v1.9.0 github.com/gardener/gardener v1.133.0 github.com/go-logr/logr v1.4.3 github.com/ironcore-dev/gardener-extension-provider-ironcore-metal v0.0.0-20251201164657-4e9433a44917 @@ -38,7 +39,6 @@ require ( github.com/emicklei/go-restful/v3 v3.13.0 // indirect github.com/evanphx/json-patch/v5 v5.9.11 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/fxamacker/cbor/v2 v2.9.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/zapr v1.3.0 // indirect diff --git a/main.go b/main.go index a35a49a..fcfcdfe 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ package main import ( + "context" "flag" "os" @@ -19,6 +20,7 @@ import ( "github.com/cobaltcore-dev/cloud-profile-sync/api/v1alpha1" "github.com/cobaltcore-dev/cloud-profile-sync/controllers" + "github.com/cobaltcore-dev/cloud-profile-sync/util/filewatcher" ) var ( @@ -26,6 +28,8 @@ var ( setupLog = ctrl.Log.WithName("setup") ) +const inClusterCertPath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" + func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(gardenerv1beta1.AddToScheme(scheme)) @@ -43,33 +47,35 @@ func main() { flag.Parse() ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) - restConfig := getKubeconfigOrDie(kubecontext) - setupLog.Info("loaded kubeconfig", "context", kubecontext, "host", restConfig.Host) + ctx := ctrl.SetupSignalHandler() - mgr, err := ctrl.NewManager(restConfig, ctrl.Options{ - Scheme: scheme, - LeaderElection: false, - }) - if err != nil { - setupLog.Error(err, "unable to start manager") - os.Exit(1) - } + filewatcher.RerunOnFileUpdate(ctx, inClusterCertPath, func(ctx context.Context) { + restConfig := getKubeconfigOrDie(kubecontext) + setupLog.Info("loaded kubeconfig", "context", kubecontext, "host", restConfig.Host) - ctx := ctrl.SetupSignalHandler() + mgr, err := ctrl.NewManager(restConfig, ctrl.Options{ + Scheme: scheme, + LeaderElection: false, + }) + if err != nil { + setupLog.Error(err, "unable to start manager") + os.Exit(1) + } - reconciler := controllers.Reconciler{ - Client: mgr.GetClient(), - } - if err := reconciler.SetupWithManager(mgr); err != nil { - setupLog.Error(err, "problem setting up ManagedCloudProfile reconciler") - os.Exit(1) - } + reconciler := controllers.Reconciler{ + Client: mgr.GetClient(), + } + if err := reconciler.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "problem setting up ManagedCloudProfile reconciler") + os.Exit(1) + } - setupLog.Info("starting manager") - if err := mgr.Start(ctx); err != nil { - setupLog.Error(err, "problem running manager") - os.Exit(1) - } + setupLog.Info("starting manager") + if err := mgr.Start(ctx); err != nil { + setupLog.Error(err, "problem running manager") + os.Exit(1) + } + }) setupLog.Info("received SIGTERM or SIGINT. See you later.") } diff --git a/util/filewatcher/suite_test.go b/util/filewatcher/suite_test.go new file mode 100644 index 0000000..6b343ba --- /dev/null +++ b/util/filewatcher/suite_test.go @@ -0,0 +1,16 @@ +// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company +// SPDX-License-Identifier: Apache-2.0 + +package filewatcher_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestFilewatcher(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Filewatcher Suite") +} diff --git a/util/filewatcher/watcher.go b/util/filewatcher/watcher.go new file mode 100644 index 0000000..71ac1dd --- /dev/null +++ b/util/filewatcher/watcher.go @@ -0,0 +1,108 @@ +// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company +// SPDX-License-Identifier: Apache-2.0 + +package filewatcher + +import ( + "context" + "path/filepath" + "slices" + "time" + + "github.com/fsnotify/fsnotify" + ctrl "sigs.k8s.io/controller-runtime" +) + +const fileEventsDebouncePeriod = 200 * time.Millisecond + +var log = ctrl.Log.WithName("filewatcher") + +// RerunOnFileUpdate watches the file at path and calls runFunc on each start +// and whenever the file changes. runFunc receives a child context that is +// cancelled when a file change is detected; the outer ctx cancels the loop. +func RerunOnFileUpdate(ctx context.Context, path string, runFunc func(ctx context.Context)) { + log.Info("watching file", "path", path) + + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.Error(err, "unable to create file watcher; hot-reload disabled") + runFunc(ctx) + + return + } + + defer watcher.Close() + // Watch the parent directory to handle Kubernetes atomic Secret volume + // updates which swap a "..data" symlink rather than writing the file directly. + if err := watcher.Add(filepath.Dir(path)); err != nil { + log.Error(err, "unable to watch file directory; hot-reload disabled") + runFunc(ctx) + + return + } + + reloadCh := watchFileEvents(watcher, filepath.Base(path)) + + for { + runCtx, runCancel := context.WithCancel(ctx) + + go func() { + select { + case <-runCtx.Done(): + case <-reloadCh: + log.Info("file changed, restarting") + runCancel() + } + }() + + runFunc(runCtx) + runCancel() + if ctx.Err() != nil { + return + } + + log.Info("restarting after file change", "path", path) + } +} + +// watchFileEvents returns a channel that receives a signal after a 200ms quiet +// period whenever the watched file (or the Kubernetes atomic-writer "..data" +// symlink) changes. The debounce prevents multiple rapid filesystem events +// (e.g. Write + Chmod from a single logical write) from triggering redundant +// reloads. The channel has a buffer of 1: if a reload is already pending, +// further signals are dropped — safe because the reload always reads the latest +// state of the file. +func watchFileEvents(watcher *fsnotify.Watcher, filename string) <-chan struct{} { + reloadCh := make(chan struct{}, 1) + go func() { + var debounce <-chan time.Time + for { + select { + case err, ok := <-watcher.Errors: + if !ok { + return + } + log.Error(err, "file watcher error") + case event, ok := <-watcher.Events: + if !ok { + return + } + base := filepath.Base(event.Name) + notificationEvents := []fsnotify.Op{fsnotify.Create, fsnotify.Write, fsnotify.Rename} + + if (base == filename || base == "..data") && + slices.Contains(notificationEvents, event.Op) { + debounce = time.After(fileEventsDebouncePeriod) + } + case <-debounce: + debounce = nil + select { + case reloadCh <- struct{}{}: + default: // reload already pending, drop + } + } + } + }() + + return reloadCh +} diff --git a/util/filewatcher/watcher_symlink_test.go b/util/filewatcher/watcher_symlink_test.go new file mode 100644 index 0000000..a1f9552 --- /dev/null +++ b/util/filewatcher/watcher_symlink_test.go @@ -0,0 +1,72 @@ +// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company +// SPDX-License-Identifier: Apache-2.0 + +//go:build linux + +package filewatcher_test + +import ( + "context" + "os" + "path/filepath" + "sync/atomic" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/cobaltcore-dev/cloud-profile-sync/filewatcher" +) + +var _ = Describe("RunWithFileUpdate (symlink)", func() { + It("restarts when the watched file is updated via symlink swap (Kubernetes atomic write)", func() { + // Simulate the Kubernetes atomic Secret/ConfigMap volume update pattern: + // a "..data" symlink in the watched directory is replaced atomically, + // which fsnotify (via inotify on Linux) reports as a Rename event on "..data". + // This pattern does not emit directory-level events on macOS (kqueue). + dir, err := os.MkdirTemp("", "watched-symlink-*") + Expect(err).NotTo(HaveOccurred()) + defer os.RemoveAll(dir) + + // Create two versioned data directories and a token file in each. + dataDir1 := filepath.Join(dir, "..2024_01_01") + dataDir2 := filepath.Join(dir, "..2024_01_02") + Expect(os.Mkdir(dataDir1, 0700)).To(Succeed()) + Expect(os.Mkdir(dataDir2, 0700)).To(Succeed()) + Expect(os.WriteFile(filepath.Join(dataDir1, "token"), []byte("token-v1"), 0600)).To(Succeed()) + Expect(os.WriteFile(filepath.Join(dataDir2, "token"), []byte("token-v2"), 0600)).To(Succeed()) + + // Point "..data" symlink at the first data directory, then expose + // "token" as a symlink into it — matching the Kubernetes projection layout. + dataSymlink := filepath.Join(dir, "..data") + Expect(os.Symlink(dataDir1, dataSymlink)).To(Succeed()) + Expect(os.Symlink(filepath.Join(dataSymlink, "token"), filepath.Join(dir, "token"))).To(Succeed()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var startCount atomic.Int32 + runFunc := func(runCtx context.Context) { + startCount.Add(1) + <-runCtx.Done() + } + + done := make(chan struct{}) + go func() { + defer close(done) + filewatcher.RerunOnFileUpdate(ctx, filepath.Join(dir, "token"), runFunc) + }() + + Eventually(startCount.Load, 2*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 1)) + + // Swap "..data" to the new directory atomically (rename a tmp symlink over it). + tmpSymlink := filepath.Join(dir, "..data_tmp") + Expect(os.Symlink(dataDir2, tmpSymlink)).To(Succeed()) + Expect(os.Rename(tmpSymlink, dataSymlink)).To(Succeed()) + + Eventually(startCount.Load, 2*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 2)) + + cancel() + Eventually(done, 2*time.Second).Should(BeClosed()) + }) +}) diff --git a/util/filewatcher/watcher_test.go b/util/filewatcher/watcher_test.go new file mode 100644 index 0000000..000e2fb --- /dev/null +++ b/util/filewatcher/watcher_test.go @@ -0,0 +1,57 @@ +// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company +// SPDX-License-Identifier: Apache-2.0 + +package filewatcher_test + +import ( + "context" + "os" + "sync/atomic" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/cobaltcore-dev/cloud-profile-sync/filewatcher" +) + +var _ = Describe("RunWithFileUpdate", func() { + It("restarts when the watched file is updated", func() { + f, err := os.CreateTemp("", "watched-*.yaml") + Expect(err).NotTo(HaveOccurred()) + defer os.Remove(f.Name()) + _, err = f.WriteString("version: 1") + Expect(err).NotTo(HaveOccurred()) + f.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var startCount atomic.Int32 + + // runFunc blocks until its context is cancelled, simulating a running manager. + runFunc := func(runCtx context.Context) { + startCount.Add(1) + <-runCtx.Done() + } + + done := make(chan struct{}) + go func() { + defer close(done) + filewatcher.RerunOnFileUpdate(ctx, f.Name(), runFunc) + }() + + // Wait for the first start. + Eventually(startCount.Load, 2*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 1)) + + // Update the file to trigger a reload. + Expect(os.WriteFile(f.Name(), []byte("version: 2"), 0600)).To(Succeed()) + + // The debounce period is 200ms; give it a comfortable margin. + Eventually(startCount.Load, 2*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 2)) + + // Cancel the outer context and confirm the loop exits. + cancel() + Eventually(done, 2*time.Second).Should(BeClosed()) + }) +})