Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.26
require (
github.com/blang/semver/v4 v4.0.0
github.com/distribution/distribution/v3 v3.0.0
github.com/fsnotify/fsnotify v1.9.0
github.com/gardener/gardener v1.133.0
github.com/go-logr/logr v1.4.3
github.com/ironcore-dev/gardener-extension-provider-ironcore-metal v0.0.0-20251201164657-4e9433a44917
Expand Down Expand Up @@ -38,7 +39,6 @@ require (
github.com/emicklei/go-restful/v3 v3.13.0 // indirect
github.com/evanphx/json-patch/v5 v5.9.11 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/fxamacker/cbor/v2 v2.9.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
Expand Down
52 changes: 29 additions & 23 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package main

import (
"context"
"flag"
"os"

Expand All @@ -19,13 +20,16 @@ import (

"github.com/cobaltcore-dev/cloud-profile-sync/api/v1alpha1"
"github.com/cobaltcore-dev/cloud-profile-sync/controllers"
"github.com/cobaltcore-dev/cloud-profile-sync/util/filewatcher"
)

var (
// scheme is the runtime scheme the API types are registered into in init
// and that is handed to the controller manager via ctrl.Options.
scheme = runtime.NewScheme()
// setupLog is the logger named "setup" used for start-up and shutdown
// messages in this file.
setupLog = ctrl.Log.WithName("setup")
)

// inClusterCertPath is the path of the service account CA certificate file.
// main passes it to filewatcher.RerunOnFileUpdate so that the manager set-up
// is run again whenever this file changes.
const inClusterCertPath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(gardenerv1beta1.AddToScheme(scheme))
Expand All @@ -43,33 +47,35 @@ func main() {
flag.Parse()

ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
restConfig := getKubeconfigOrDie(kubecontext)
setupLog.Info("loaded kubeconfig", "context", kubecontext, "host", restConfig.Host)
ctx := ctrl.SetupSignalHandler()

mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
Scheme: scheme,
LeaderElection: false,
})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
filewatcher.RerunOnFileUpdate(ctx, inClusterCertPath, func(ctx context.Context) {
restConfig := getKubeconfigOrDie(kubecontext)
setupLog.Info("loaded kubeconfig", "context", kubecontext, "host", restConfig.Host)

ctx := ctrl.SetupSignalHandler()
mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
Scheme: scheme,
LeaderElection: false,
})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
Comment thread
adziauho marked this conversation as resolved.

reconciler := controllers.Reconciler{
Client: mgr.GetClient(),
}
if err := reconciler.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "problem setting up ManagedCloudProfile reconciler")
os.Exit(1)
}
reconciler := controllers.Reconciler{
Client: mgr.GetClient(),
}
if err := reconciler.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "problem setting up ManagedCloudProfile reconciler")
os.Exit(1)
}
Comment thread
adziauho marked this conversation as resolved.

setupLog.Info("starting manager")
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
setupLog.Info("starting manager")
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
Comment thread
adziauho marked this conversation as resolved.
})
setupLog.Info("received SIGTERM or SIGINT. See you later.")
}

Expand Down
16 changes: 16 additions & 0 deletions util/filewatcher/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company
// SPDX-License-Identifier: Apache-2.0

package filewatcher_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

// TestFilewatcher is the `go test` entry point for this package: it runs the
// specs registered in the package under the suite name "Filewatcher Suite".
func TestFilewatcher(t *testing.T) {
// Register Fail as the failure handler before the specs run.
RegisterFailHandler(Fail)
RunSpecs(t, "Filewatcher Suite")
}
108 changes: 108 additions & 0 deletions util/filewatcher/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company
// SPDX-License-Identifier: Apache-2.0

package filewatcher

import (
"context"
"path/filepath"
"slices"
"time"

"github.com/fsnotify/fsnotify"
ctrl "sigs.k8s.io/controller-runtime"
)

const fileEventsDebouncePeriod = 200 * time.Millisecond

var log = ctrl.Log.WithName("filewatcher")

// RerunOnFileUpdate watches the file at path and calls runFunc on each start
// and whenever the file changes. runFunc receives a child context that is
// cancelled when a file change is detected; the outer ctx cancels the loop.
func RerunOnFileUpdate(ctx context.Context, path string, runFunc func(ctx context.Context)) {
log.Info("watching file", "path", path)

watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Error(err, "unable to create file watcher; hot-reload disabled")
runFunc(ctx)

return
}

defer watcher.Close()
// Watch the parent directory to handle Kubernetes atomic Secret volume
// updates which swap a "..data" symlink rather than writing the file directly.
if err := watcher.Add(filepath.Dir(path)); err != nil {
log.Error(err, "unable to watch file directory; hot-reload disabled")
runFunc(ctx)

return
}

reloadCh := watchFileEvents(watcher, filepath.Base(path))

for {
runCtx, runCancel := context.WithCancel(ctx)

go func() {
select {
case <-runCtx.Done():
case <-reloadCh:
log.Info("file changed, restarting")
runCancel()
}
}()

runFunc(runCtx)
runCancel()
if ctx.Err() != nil {
return
}

log.Info("restarting after file change", "path", path)
}
Comment on lines +46 to +65
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Only restart when a reload actually fired.

Lines 51-58 currently loop on any runFunc return, not just file-change cancellations. If runFunc exits because startup failed or it shut down normally, this code immediately relaunches it and logs it as a file-change restart, which can turn a real failure into a tight restart loop.

Suggested fix
 	for {
 		runCtx, runCancel := context.WithCancel(ctx)
+		restartedByReload := make(chan struct{}, 1)
 		go func() {
 			select {
 			case <-runCtx.Done():
 			case <-reloadCh:
 				log.Info("file changed, restarting")
+				restartedByReload <- struct{}{}
 				runCancel()
 			}
 		}()
 
 		runFunc(runCtx)
 		runCancel()
 
 		if ctx.Err() != nil {
 			return
 		}
+
+		select {
+		case <-restartedByReload:
+		default:
+			return
+		}
+
 		log.Info("restarting after file change", "path", path)
 	}
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for {
runCtx, runCancel := context.WithCancel(ctx)
go func() {
select {
case <-runCtx.Done():
case <-reloadCh:
log.Info("file changed, restarting")
runCancel()
}
}()
runFunc(runCtx)
runCancel()
if ctx.Err() != nil {
return
}
log.Info("restarting after file change", "path", path)
}
for {
runCtx, runCancel := context.WithCancel(ctx)
restartedByReload := make(chan struct{}, 1)
go func() {
select {
case <-runCtx.Done():
case <-reloadCh:
log.Info("file changed, restarting")
restartedByReload <- struct{}{}
runCancel()
}
}()
runFunc(runCtx)
runCancel()
if ctx.Err() != nil {
return
}
select {
case <-restartedByReload:
default:
return
}
log.Info("restarting after file change", "path", path)
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@filewatcher/watcher.go` around lines 40 - 58, The loop currently restarts
unconditionally after runFunc returns; change it to only restart when the
reloadCh actually fired. Inside the loop, create a fresh one-shot channel or
flag (e.g., reloadFired channel) and have the goroutine that listens to reloadCh
close that channel (or set the flag) just before calling runCancel(); after
runFunc(runCtx) returns, check the reloadFired channel non-blockingly (or the
flag) and only continue the loop and log the "restarting after file change" when
that reload indicator was triggered; otherwise return to avoid relaunching on
normal exit or startup failure. Use the existing symbols runFunc, runCtx,
reloadCh, runCancel, ctx, and path to implement this.

}

// watchFileEvents returns a channel that receives a signal after a quiet
// period of fileEventsDebouncePeriod whenever the watched file (or the
// Kubernetes atomic-writer "..data" symlink) is created, written or renamed.
// The debounce prevents multiple rapid filesystem events (e.g. Write + Chmod
// from a single logical write) from triggering redundant reloads. The channel
// has a buffer of 1: if a reload is already pending, further signals are
// dropped — safe because the reload always reads the latest state of the file.
//
// The background goroutine started here exits when the watcher's Events or
// Errors channel is closed, i.e. when the caller closes the watcher.
func watchFileEvents(watcher *fsnotify.Watcher, filename string) <-chan struct{} {
	reloadCh := make(chan struct{}, 1)

	// Operations that indicate the file content may have changed.
	notificationOps := []fsnotify.Op{fsnotify.Create, fsnotify.Write, fsnotify.Rename}

	go func() {
		// debounce is nil while no change is pending; receiving from a nil
		// channel blocks forever, which disables that select case.
		var debounce <-chan time.Time

		for {
			select {
			case err, ok := <-watcher.Errors:
				if !ok {
					return
				}
				log.Error(err, "file watcher error")
			case event, ok := <-watcher.Events:
				if !ok {
					return
				}

				base := filepath.Base(event.Name)
				if base != filename && base != "..data" {
					continue
				}

				// event.Op is a bitmask and may carry several operations at
				// once, so test each bit with Has instead of comparing for
				// equality.
				if slices.ContainsFunc(notificationOps, event.Has) {
					// Every relevant event restarts the quiet period.
					debounce = time.After(fileEventsDebouncePeriod)
				}
			case <-debounce:
				debounce = nil
				select {
				case reloadCh <- struct{}{}:
				default: // reload already pending, drop
				}
			}
		}
	}()

	return reloadCh
}
72 changes: 72 additions & 0 deletions util/filewatcher/watcher_symlink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company
// SPDX-License-Identifier: Apache-2.0

//go:build linux

package filewatcher_test

import (
"context"
"os"
"path/filepath"
"sync/atomic"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/cobaltcore-dev/cloud-profile-sync/filewatcher"
)

// Spec for RerunOnFileUpdate when the watched file is replaced through a
// "..data" symlink swap instead of being written in place.
var _ = Describe("RerunOnFileUpdate (symlink)", func() {
	It("restarts when the watched file is updated via symlink swap (Kubernetes atomic write)", func() {
		// Simulate the Kubernetes atomic Secret/ConfigMap volume update pattern:
		// a "..data" symlink in the watched directory is replaced atomically,
		// which fsnotify (via inotify on Linux) reports as a Rename event on "..data".
		// This pattern does not emit directory-level events on macOS (kqueue).
		dir, err := os.MkdirTemp("", "watched-symlink-*")
		Expect(err).NotTo(HaveOccurred())
		defer os.RemoveAll(dir)

		// Create two versioned data directories and a token file in each.
		dataDir1 := filepath.Join(dir, "..2024_01_01")
		dataDir2 := filepath.Join(dir, "..2024_01_02")
		Expect(os.Mkdir(dataDir1, 0700)).To(Succeed())
		Expect(os.Mkdir(dataDir2, 0700)).To(Succeed())
		Expect(os.WriteFile(filepath.Join(dataDir1, "token"), []byte("token-v1"), 0600)).To(Succeed())
		Expect(os.WriteFile(filepath.Join(dataDir2, "token"), []byte("token-v2"), 0600)).To(Succeed())

		// Point "..data" symlink at the first data directory, then expose
		// "token" as a symlink into it — matching the Kubernetes projection layout.
		dataSymlink := filepath.Join(dir, "..data")
		Expect(os.Symlink(dataDir1, dataSymlink)).To(Succeed())
		Expect(os.Symlink(filepath.Join(dataSymlink, "token"), filepath.Join(dir, "token"))).To(Succeed())

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		// runFunc counts its invocations and blocks until its context is
		// cancelled, simulating a long-running manager.
		var startCount atomic.Int32
		runFunc := func(runCtx context.Context) {
			startCount.Add(1)
			<-runCtx.Done()
		}

		// done is closed once RerunOnFileUpdate has returned.
		done := make(chan struct{})
		go func() {
			defer close(done)
			filewatcher.RerunOnFileUpdate(ctx, filepath.Join(dir, "token"), runFunc)
		}()

		// Wait for the first start.
		Eventually(startCount.Load, 2*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 1))

		// Swap "..data" to the new directory atomically (rename a tmp symlink over it).
		tmpSymlink := filepath.Join(dir, "..data_tmp")
		Expect(os.Symlink(dataDir2, tmpSymlink)).To(Succeed())
		Expect(os.Rename(tmpSymlink, dataSymlink)).To(Succeed())

		// The swap must trigger exactly one restart.
		Eventually(startCount.Load, 2*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 2))

		// Cancel the outer context and confirm the loop exits.
		cancel()
		Eventually(done, 2*time.Second).Should(BeClosed())
	})
})
57 changes: 57 additions & 0 deletions util/filewatcher/watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company
// SPDX-License-Identifier: Apache-2.0

package filewatcher_test

import (
"context"
"os"
"sync/atomic"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/cobaltcore-dev/cloud-profile-sync/filewatcher"
)

// Spec for RerunOnFileUpdate when the watched file is rewritten in place.
var _ = Describe("RerunOnFileUpdate", func() {
	It("restarts when the watched file is updated", func() {
		f, err := os.CreateTemp("", "watched-*.yaml")
		Expect(err).NotTo(HaveOccurred())
		defer os.Remove(f.Name())
		_, err = f.WriteString("version: 1")
		Expect(err).NotTo(HaveOccurred())
		// Fail the spec if the initial content could not be flushed and closed.
		Expect(f.Close()).To(Succeed())

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		var startCount atomic.Int32

		// runFunc blocks until its context is cancelled, simulating a running manager.
		runFunc := func(runCtx context.Context) {
			startCount.Add(1)
			<-runCtx.Done()
		}

		// done is closed once RerunOnFileUpdate has returned.
		done := make(chan struct{})
		go func() {
			defer close(done)
			filewatcher.RerunOnFileUpdate(ctx, f.Name(), runFunc)
		}()

		// Wait for the first start.
		Eventually(startCount.Load, 2*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 1))

		// Update the file to trigger a reload.
		Expect(os.WriteFile(f.Name(), []byte("version: 2"), 0600)).To(Succeed())

		// The debounce period is 200ms; give it a comfortable margin.
		Eventually(startCount.Load, 2*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 2))

		// Cancel the outer context and confirm the loop exits.
		cancel()
		Eventually(done, 2*time.Second).Should(BeClosed())
	})
})