@@ -88,12 +88,14 @@ type KCLRunReconciler struct {
8888 DefaultServiceAccount string
8989 DisallowedFieldManagers []string
9090 artifactFetcher * fetch.ArchiveFetcher
91+ requeueDependency time.Duration
9192
9293 statusManager string
9394}
9495
9596type KCLRunReconcilerOptions struct {
96- HTTPRetry int
97+ DependencyRequeueInterval time.Duration
98+ HTTPRetry int
9799}
98100
99101// SetupWithManager sets up the controller with the Manager.
@@ -105,6 +107,7 @@ func (r *KCLRunReconciler) SetupWithManager(mgr ctrl.Manager, opts KCLRunReconci
105107 fetch .WithUntar (tar .WithMaxUntarSize (tar .UnlimitedUntarSize )),
106108 fetch .WithHostnameOverwrite (os .Getenv ("SOURCE_CONTROLLER_LOCALHOST" )),
107109 )
110+ r .requeueDependency = opts .DependencyRequeueInterval
108111 r .statusManager = "gotk-flux-kcl-controller"
109112 // New controller
110113 return ctrl .NewControllerManagedBy (mgr ).
@@ -214,6 +217,27 @@ func (r *KCLRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
214217 return ctrl.Result {}, err
215218 }
216219 artifact := source .GetArtifact ()
220+
221+ // Requeue the reconciliation if the source artifact is not found.
222+ if artifact == nil {
223+ msg := fmt .Sprintf ("Source artifact not found, retrying in %s" , r .requeueDependency .String ())
224+ conditions .MarkFalse (& obj , meta .ReadyCondition , meta .ArtifactFailedReason , "%s" , msg )
225+ log .Info (msg )
226+ return ctrl.Result {RequeueAfter : r .requeueDependency }, nil
227+ }
228+
229+ // Check dependencies and requeue the reconciliation if the check fails.
230+ if len (obj .Spec .DependsOn ) > 0 {
231+ if err := r .checkDependencies (ctx , & obj , source ); err != nil {
232+ conditions .MarkFalse (& obj , meta .ReadyCondition , meta .DependencyNotReadyReason , "%s" , err )
233+ msg := fmt .Sprintf ("Dependencies do not meet ready condition, retrying in %s" , r .requeueDependency .String ())
234+ log .Info (msg )
235+ r .event (& obj , artifact .Revision , eventv1 .EventSeverityInfo , msg , nil )
236+ return ctrl.Result {RequeueAfter : r .requeueDependency }, nil
237+ }
238+ log .Info ("All dependencies are ready, proceeding with reconciliation" )
239+ }
240+
217241 progressingMsg := fmt .Sprintf ("new revision detected %s" , artifact .Revision )
218242 log .Info (progressingMsg )
219243 conditions .MarkUnknown (& obj , meta .ReadyCondition , meta .ProgressingReason , "Reconciliation in progress" )
@@ -368,6 +392,51 @@ func (r *KCLRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
368392 return ctrl.Result {}, nil
369393}
370394
395+ func (r * KCLRunReconciler ) checkDependencies (ctx context.Context ,
396+ obj * v1alpha1.KCLRun ,
397+ source sourcev1.Source ) error {
398+ for _ , d := range obj .Spec .DependsOn {
399+ if d .Namespace == "" {
400+ d .Namespace = obj .GetNamespace ()
401+ }
402+ dName := types.NamespacedName {
403+ Namespace : d .Namespace ,
404+ Name : d .Name ,
405+ }
406+ var k v1alpha1.KCLRun
407+ err := r .Get (ctx , dName , & k )
408+ if err != nil {
409+ return fmt .Errorf ("dependency '%s' not found: %w" , dName , err )
410+ }
411+
412+ if len (k .Status .Conditions ) == 0 || k .Generation != k .Status .ObservedGeneration {
413+ return fmt .Errorf ("dependency '%s' is not ready" , dName )
414+ }
415+
416+ if ! apimeta .IsStatusConditionTrue (k .Status .Conditions , meta .ReadyCondition ) {
417+ return fmt .Errorf ("dependency '%s' is not ready" , dName )
418+ }
419+
420+ srcNamespace := k .Spec .SourceRef .Namespace
421+ if srcNamespace == "" {
422+ srcNamespace = k .GetNamespace ()
423+ }
424+ dSrcNamespace := obj .Spec .SourceRef .Namespace
425+ if dSrcNamespace == "" {
426+ dSrcNamespace = obj .GetNamespace ()
427+ }
428+
429+ if k .Spec .SourceRef .Name == obj .Spec .SourceRef .Name &&
430+ srcNamespace == dSrcNamespace &&
431+ k .Spec .SourceRef .Kind == obj .Spec .SourceRef .Kind &&
432+ ! source .GetArtifact ().HasRevision (k .Status .LastAppliedRevision ) {
433+ return fmt .Errorf ("dependency '%s' revision is not up to date" , dName )
434+ }
435+ }
436+
437+ return nil
438+ }
439+
371440func (r * KCLRunReconciler ) getSource (ctx context.Context ,
372441 obj * v1alpha1.KCLRun ) (sourcev1.Source , error ) {
373442 var src sourcev1.Source
0 commit comments