Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 36 additions & 24 deletions internal/controller/kustomization_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand All @@ -47,7 +46,7 @@ import (
"github.com/fluxcd/cli-utils/pkg/kstatus/polling/engine"
"github.com/fluxcd/cli-utils/pkg/object"
apiacl "github.com/fluxcd/pkg/apis/acl"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
eventv1 "github.com/fluxcd/pkg/apis/event/v1"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/auth"
authutils "github.com/fluxcd/pkg/auth/utils"
Expand All @@ -59,6 +58,7 @@ import (
runtimeClient "github.com/fluxcd/pkg/runtime/client"
"github.com/fluxcd/pkg/runtime/conditions"
runtimeCtrl "github.com/fluxcd/pkg/runtime/controller"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/jitter"
"github.com/fluxcd/pkg/runtime/patch"
"github.com/fluxcd/pkg/runtime/statusreaders"
Expand All @@ -85,7 +85,7 @@ import (
// KustomizationReconciler reconciles a Kustomization object
type KustomizationReconciler struct {
client.Client
kuberecorder.EventRecorder
events.EventRecorder
runtimeCtrl.Metrics

// Kubernetes options
Expand Down Expand Up @@ -157,7 +157,8 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
time.Since(reconcileStart).String(),
obj.Spec.Interval.Duration.String())
log.Info(msg, "revision", obj.Status.LastAttemptedRevision)
r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityInfo, msg,
r.event(obj, nil, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityInfo,
eventv1.ActionReconciled, msg,
map[string]string{
kustomizev1.GroupVersion.Group + "/" + eventv1.MetaCommitStatusKey: eventv1.MetaCommitStatusUpdateValue,
})
Expand All @@ -166,7 +167,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques

// Prune managed resources if the object is under deletion.
if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
return r.finalize(ctx, obj)
return r.finalize(ctx, obj, nil)
}

// Add finalizer first if it doesn't exist to avoid the race condition
Expand All @@ -191,7 +192,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
conditions.MarkFalse(obj, meta.ReadyCondition, meta.InvalidCELExpressionReason, "%s", errMsg)
conditions.MarkStalled(obj, meta.InvalidCELExpressionReason, "%s", errMsg)
obj.Status.ObservedGeneration = obj.Generation
r.event(obj, "", "", eventv1.EventSeverityError, errMsg, nil)
r.event(obj, nil, "", "", eventv1.EventSeverityError, eventv1.ActionFailed, errMsg, nil)
return ctrl.Result{}, reconcile.TerminalError(err)
}

Expand All @@ -203,7 +204,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FeatureGateDisabledReason, msgFmt, gate)
conditions.MarkStalled(obj, meta.FeatureGateDisabledReason, msgFmt, gate)
log.Error(auth.ErrObjectLevelWorkloadIdentityNotEnabled, msg)
r.event(obj, "", "", eventv1.EventSeverityError, msg, nil)
r.event(obj, nil, "", "", eventv1.EventSeverityError, eventv1.ActionFailed, msg, nil)
return ctrl.Result{}, nil
}

Expand All @@ -221,7 +222,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
if acl.IsAccessDenied(err) {
conditions.MarkFalse(obj, meta.ReadyCondition, apiacl.AccessDeniedReason, "%s", err)
conditions.MarkStalled(obj, apiacl.AccessDeniedReason, "%s", err)
r.event(obj, "", "", eventv1.EventSeverityError, err.Error(), nil)
r.event(obj, artifactSource, "", "", eventv1.EventSeverityError, eventv1.ActionFailed, err.Error(), nil)
return ctrl.Result{}, reconcile.TerminalError(err)
}

Expand All @@ -248,15 +249,15 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
conditions.MarkFalse(obj, meta.ReadyCondition, meta.InvalidCELExpressionReason, "%s", errMsg)
conditions.MarkStalled(obj, meta.InvalidCELExpressionReason, "%s", errMsg)
obj.Status.ObservedGeneration = obj.Generation
r.event(obj, revision, originRevision, eventv1.EventSeverityError, errMsg, nil)
r.event(obj, artifactSource, revision, originRevision, eventv1.EventSeverityError, eventv1.ActionFailed, errMsg, nil)
return ctrl.Result{}, err
}

// Retry on transient errors.
conditions.MarkFalse(obj, meta.ReadyCondition, meta.DependencyNotReadyReason, "%s", err)
msg := fmt.Sprintf("Dependencies do not meet ready condition, retrying in %s", r.DependencyRequeueInterval.String())
log.Info(msg)
r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, msg, nil)
r.event(obj, artifactSource, revision, originRevision, eventv1.EventSeverityInfo, eventv1.ActionWaiting, msg, nil)
return ctrl.Result{RequeueAfter: r.DependencyRequeueInterval}, nil
}
log.Info("All dependencies are ready, proceeding with reconciliation")
Expand All @@ -280,7 +281,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
meta.HealthCheckCanceledReason,
"New reconciliation triggered by %s/%s/%s", qes.Kind, qes.Namespace, qes.Name)
ctrl.LoggerFrom(ctx).Info("New reconciliation triggered, canceling health checks", "trigger", qes)
r.event(obj, revision, originRevision, eventv1.EventSeverityInfo,
r.event(obj, artifactSource, revision, originRevision, eventv1.EventSeverityInfo, eventv1.ActionProgressing,
fmt.Sprintf("Health checks canceled due to new reconciliation triggered by %s/%s/%s",
qes.Kind, qes.Namespace, qes.Name), nil)

Expand All @@ -298,7 +299,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
obj.GetRetryInterval().String()),
"revision",
revision)
r.event(obj, revision, originRevision, eventv1.EventSeverityError,
r.event(obj, artifactSource, revision, originRevision, eventv1.EventSeverityError, eventv1.ActionFailed,
reconcileErr.Error(), nil)
return ctrl.Result{RequeueAfter: obj.GetRetryInterval()}, nil
}
Expand Down Expand Up @@ -468,7 +469,7 @@ func (r *KustomizationReconciler) reconcile(
}

// Validate and apply resources in stages.
drifted, changeSet, err := r.apply(ctx, resourceManager, obj, revision, originRevision, objects)
drifted, changeSet, err := r.apply(ctx, resourceManager, obj, src, revision, originRevision, objects)
if err != nil {
obj.Status.History.Upsert(checksum, time.Now(), time.Since(reconcileStart), meta.ReconciliationFailedReason, historyMeta)
conditions.MarkFalse(obj, meta.ReadyCondition, meta.ReconciliationFailedReason, "%s", err)
Expand Down Expand Up @@ -496,7 +497,7 @@ func (r *KustomizationReconciler) reconcile(
}

// Run garbage collection for stale resources that do not have pruning disabled.
if _, err := r.prune(ctx, resourceManager, obj, revision, originRevision, staleObjects); err != nil {
if _, err := r.prune(ctx, resourceManager, obj, src, revision, originRevision, staleObjects); err != nil {
obj.Status.History.Upsert(checksum, time.Now(), time.Since(reconcileStart), meta.PruneFailedReason, historyMeta)
conditions.MarkFalse(obj, meta.ReadyCondition, meta.PruneFailedReason, "%s", err)
return err
Expand All @@ -508,6 +509,7 @@ func (r *KustomizationReconciler) reconcile(
resourceManager,
patcher,
obj,
src,
revision,
originRevision,
isNewRevision,
Expand Down Expand Up @@ -831,6 +833,7 @@ func (r *KustomizationReconciler) build(ctx context.Context,
func (r *KustomizationReconciler) apply(ctx context.Context,
manager *ssa.ResourceManager,
obj *kustomizev1.Kustomization,
src sourcev1.Source,
revision string,
originRevision string,
objects []*unstructured.Unstructured) (bool, *ssa.ChangeSet, error) {
Expand Down Expand Up @@ -956,7 +959,7 @@ func (r *KustomizationReconciler) apply(ctx context.Context,
// emit event only if the server-side apply resulted in changes
applyLog := strings.TrimSuffix(changeSetLog.String(), "\n")
if applyLog != "" {
r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, applyLog, nil)
r.event(obj, src, revision, originRevision, eventv1.EventSeverityInfo, eventv1.ActionApplied, applyLog, nil)
}

return applyLog != "", resultSet, nil
Expand All @@ -966,6 +969,7 @@ func (r *KustomizationReconciler) checkHealth(ctx context.Context,
manager *ssa.ResourceManager,
patcher *patch.SerialPatcher,
obj *kustomizev1.Kustomization,
src sourcev1.Source,
revision string,
originRevision string,
isNewRevision bool,
Expand Down Expand Up @@ -1043,7 +1047,7 @@ func (r *KustomizationReconciler) checkHealth(ctx context.Context,
// Emit recovery event if the previous health check failed.
msg := fmt.Sprintf("Health check passed in %s", time.Since(checkStart).String())
if !wasHealthy || (isNewRevision && drifted) {
r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, msg, nil)
r.event(obj, src, revision, originRevision, eventv1.EventSeverityInfo, eventv1.ActionReconciled, msg, nil)
}

conditions.MarkTrue(obj, meta.HealthyCondition, meta.SucceededReason, "%s", msg)
Expand All @@ -1057,6 +1061,7 @@ func (r *KustomizationReconciler) checkHealth(ctx context.Context,
func (r *KustomizationReconciler) prune(ctx context.Context,
manager *ssa.ResourceManager,
obj *kustomizev1.Kustomization,
src sourcev1.Source,
revision string,
originRevision string,
objects []*unstructured.Unstructured) (bool, error) {
Expand All @@ -1074,7 +1079,7 @@ func (r *KustomizationReconciler) prune(ctx context.Context,
// emit event only if the prune operation resulted in changes
if changeSet != nil && len(changeSet.Entries) > 0 {
log.Info(fmt.Sprintf("garbage collection completed: %s", changeSet.String()))
r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, changeSet.String(), nil)
r.event(obj, src, revision, originRevision, eventv1.EventSeverityInfo, eventv1.ActionDeleted, changeSet.String(), nil)
return true, nil
}

Expand Down Expand Up @@ -1112,7 +1117,7 @@ func finalizerShouldDeleteResources(obj *kustomizev1.Kustomization) bool {
// If the service account used for impersonation is no longer available or if a timeout occurs
// while waiting for resources to be terminated, an error is logged and the finalizer is removed.
func (r *KustomizationReconciler) finalize(ctx context.Context,
obj *kustomizev1.Kustomization) (ctrl.Result, error) {
obj *kustomizev1.Kustomization, src sourcev1.Source) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
if finalizerShouldDeleteResources(obj) {
objects, _ := inventory.List(obj.Status.Inventory)
Expand Down Expand Up @@ -1153,14 +1158,16 @@ func (r *KustomizationReconciler) finalize(ctx context.Context,

changeSet, err := deleteObjects(ctx, obj, resourceManager, objects)
if err != nil {
r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityError, "pruning for deleted resource failed", nil)
r.event(obj, src, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision,
eventv1.EventSeverityError, eventv1.ActionFailed, "pruning for deleted resource failed", nil)
// Return the error so we retry the failed garbage collection
return ctrl.Result{}, err
}

if changeSet != nil && len(changeSet.Entries) > 0 {
// Emit event with the resources marked for deletion.
r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityInfo, changeSet.String(), nil)
r.event(obj, src, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision,
eventv1.EventSeverityInfo, eventv1.ActionDeleted, changeSet.String(), nil)

// Wait for the resources marked for deletion to be terminated.
if obj.GetDeletionPolicy() == kustomizev1.DeletionPolicyWaitForTermination {
Expand All @@ -1171,15 +1178,17 @@ func (r *KustomizationReconciler) finalize(ctx context.Context,
// Emit an event and log the error if a timeout occurs.
msg := "failed to wait for resources termination"
log.Error(err, msg)
r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityError, msg, nil)
r.event(obj, src, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision,
eventv1.EventSeverityError, eventv1.ActionFailed, msg, nil)
}
}
}
} else {
// when the account to impersonate is gone, log the stale objects and continue with the finalization
msg := fmt.Sprintf("unable to prune objects: \n%s", ssautil.FmtUnstructuredList(objects))
log.Error(fmt.Errorf("skiping pruning, failed to find account to impersonate"), msg)
r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityError, msg, nil)
r.event(obj, src, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision,
eventv1.EventSeverityError, eventv1.ActionFailed, msg, nil)
}
}

Expand All @@ -1196,7 +1205,10 @@ func (r *KustomizationReconciler) finalize(ctx context.Context,
}

func (r *KustomizationReconciler) event(obj *kustomizev1.Kustomization,
revision, originRevision, severity, msg string,
src sourcev1.Source,
revision, originRevision, severity,
action string,
msg string,
metadata map[string]string) {
if metadata == nil {
metadata = map[string]string{}
Expand All @@ -1218,7 +1230,7 @@ func (r *KustomizationReconciler) event(obj *kustomizev1.Kustomization,
eventType = corev1.EventTypeWarning
}

r.EventRecorder.AnnotatedEventf(obj, metadata, eventType, reason, msg)
r.AnnotatedEventf(obj, src, metadata, eventType, reason, action, "%s", msg)
}

func (r *KustomizationReconciler) finalizeStatus(ctx context.Context,
Expand Down
8 changes: 4 additions & 4 deletions internal/controller/kustomization_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ import (
"testing"
"time"

"github.com/fluxcd/pkg/apis/meta"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/events"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
)

func TestKustomizationReconciler_StagedApply(t *testing.T) {
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestKustomizationReconciler_deleteBeforeFinalizer(t *testing.T) {

r := &KustomizationReconciler{
Client: k8sClient,
EventRecorder: record.NewFakeRecorder(32),
EventRecorder: events.NewFakeRecorder(32, true),
}
// NOTE: Only a real API server responds with an error in this scenario.
_, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: client.ObjectKeyFromObject(kustomization)})
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/kustomization_decryptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ func TestKustomizationReconciler_Decryptor(t *testing.T) {

events := getEvents(resultK.GetName(), map[string]string{"kustomize.toolkit.fluxcd.io/revision": revision})
g.Expect(len(events)).To(BeIdenticalTo(1))
g.Expect(events[0].Message).Should(ContainSubstring("Reconciliation finished"))
g.Expect(events[0].Message).ShouldNot(ContainSubstring("configured"))
g.Expect(events[0].Note).Should(ContainSubstring("Reconciliation finished"))
g.Expect(events[0].Note).ShouldNot(ContainSubstring("configured"))
})

t.Run("global SOPS age secret as fallback", func(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions internal/controller/kustomization_externalartifact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ stringData:
events := getEvents(resultK.GetName(), map[string]string{"kustomize.toolkit.fluxcd.io/revision": revision})
g.Expect(len(events) > 2).To(BeTrue())
g.Expect(events[0].Reason).To(BeIdenticalTo(meta.ProgressingReason))
g.Expect(events[0].Message).To(ContainSubstring("created"))
g.Expect(events[0].Note).To(ContainSubstring("created"))
g.Expect(events[1].Reason).To(BeIdenticalTo(meta.ProgressingReason))
g.Expect(events[1].Message).To(ContainSubstring("check passed"))
g.Expect(events[1].Note).To(ContainSubstring("check passed"))
g.Expect(events[2].Reason).To(BeIdenticalTo(meta.ReconciliationSucceededReason))
g.Expect(events[2].Message).To(ContainSubstring("finished"))
g.Expect(events[2].Note).To(ContainSubstring("finished"))
})

t.Run("watches for external artifact revision change", func(t *testing.T) {
Expand Down Expand Up @@ -164,7 +164,7 @@ stringData:

events := getEvents(resultK.GetName(), nil)
g.Expect(events[len(events)-1].Reason).To(BeIdenticalTo(apiacl.AccessDeniedReason))
g.Expect(events[len(events)-1].Message).To(ContainSubstring("feature gate is disabled"))
g.Expect(events[len(events)-1].Note).To(ContainSubstring("feature gate is disabled"))
})
}

Expand Down
2 changes: 1 addition & 1 deletion internal/controller/kustomization_force_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ stringData:
events := getEvents(resultK.GetName(), map[string]string{"kustomize.toolkit.fluxcd.io/revision": revision})
g.Expect(len(events) > 0).To(BeTrue())
g.Expect(events[0].Type).To(BeIdenticalTo("Warning"))
g.Expect(events[0].Message).To(ContainSubstring("field is immutable"))
g.Expect(events[0].Note).To(ContainSubstring("field is immutable"))
})
})

Expand Down
9 changes: 5 additions & 4 deletions internal/controller/kustomization_fuzzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/opencontainers/go-digest"
"github.com/ory/dockertest/v3"
corev1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -562,12 +563,12 @@ func randStringRunes(n int) string {
return string(b)
}

func getEvents(objName string, annotations map[string]string) []corev1.Event {
var result []corev1.Event
events := &corev1.EventList{}
func getEvents(objName string, annotations map[string]string) []eventsv1.Event {
var result []eventsv1.Event
events := &eventsv1.EventList{}
_ = k8sClient.List(ctx, events)
for _, event := range events.Items {
if event.InvolvedObject.Name == objName {
if event.Regarding.Name == objName {
if annotations == nil && len(annotations) == 0 {
result = append(result, event)
} else {
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/kustomization_origin_revision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"testing"
"time"

eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
eventv1 "github.com/fluxcd/pkg/apis/event/v1"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/testserver"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
Expand Down
Loading