Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 76 additions & 3 deletions inc/Cli/Commands/AgentBundleCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -857,11 +857,12 @@ private function current_artifacts( array $agent, array $installed ): array {
);
}
if ( 'flow' === $type && isset( $flow_by_slug[ $id ] ) ) {
$artifacts[] = array(
$installed_payload = is_array( $record['installed_payload'] ?? null ) ? $record['installed_payload'] : null;
$artifacts[] = array(
'artifact_type' => 'flow',
'artifact_id' => $id,
'source_path' => (string) ( $record['source_path'] ?? '' ),
'payload' => $this->flow_payload( $flow_by_slug[ $id ], $id ),
'payload' => $this->flow_payload( $flow_by_slug[ $id ], $id, $installed_payload ),
);
}
}
Expand All @@ -886,7 +887,7 @@ private function pipeline_payload( array $pipeline, string $portable_slug ): arr
);
}

private function flow_payload( array $flow, string $portable_slug ): array {
private function flow_payload( array $flow, string $portable_slug, ?array $installed_payload = null ): array {
$scheduling_policy = $this->flow_scheduling_policy( is_array( $flow['scheduling_config'] ?? null ) ? $flow['scheduling_config'] : array() );

return array(
Expand All @@ -895,6 +896,7 @@ private function flow_payload( array $flow, string $portable_slug ): array {
'flow_config' => $this->flow_config_without_runtime_queues( is_array( $flow['flow_config'] ?? null ) ? $flow['flow_config'] : array() ),
'scheduling_policy' => $scheduling_policy,
'queue_policy' => 'create_seed_upgrade_preserve_existing',
'runtime_overlays' => $this->flow_runtime_overlays( $flow, $installed_payload ),
);
}

Expand All @@ -913,13 +915,84 @@ private function flow_config_without_runtime_queues( array $flow_config ): array
foreach ( $flow_config as &$step ) {
if ( is_array( $step ) ) {
unset( $step['prompt_queue'], $step['config_patch_queue'], $step['queue_mode'], $step['_queue_consume_revision'] );
unset( $step['handler_config']['max_items'] );
if ( empty( $step['handler_config'] ) ) {
unset( $step['handler_config'] );
}
if ( is_array( $step['handler_configs'] ?? null ) ) {
foreach ( $step['handler_configs'] as $handler_slug => &$handler_config ) {
if ( is_array( $handler_config ) ) {
unset( $handler_config['max_items'] );
if ( empty( $handler_config ) ) {
unset( $step['handler_configs'][ $handler_slug ] );
}
}
}
unset( $handler_config );
if ( empty( $step['handler_configs'] ) ) {
unset( $step['handler_configs'] );
}
}
}
}
unset( $step );

return $flow_config;
}

private function flow_runtime_overlays( array $flow, ?array $installed_payload = null ): array {
if ( is_array( $installed_payload['runtime_overlays'] ?? null ) ) {
return $installed_payload['runtime_overlays'];
}

$overlays = array();
$flow_config = is_array( $flow['flow_config'] ?? null ) ? $flow['flow_config'] : array();
$steps = array();

foreach ( $flow_config as $flow_step_id => $step ) {
if ( ! is_array( $step ) ) {
continue;
}

$step_overlay = array();
foreach ( array( 'prompt_queue', 'config_patch_queue', 'queue_mode', '_queue_consume_revision' ) as $field ) {
if ( array_key_exists( $field, $step ) ) {
$step_overlay[ $field ] = $step[ $field ];
}
}
if ( array_key_exists( 'max_items', $step['handler_config'] ?? array() ) ) {
$step_overlay['handler_config'] = array( 'max_items' => $step['handler_config']['max_items'] );
}
if ( is_array( $step['handler_configs'] ?? null ) ) {
foreach ( $step['handler_configs'] as $handler_slug => $handler_config ) {
if ( is_array( $handler_config ) && array_key_exists( 'max_items', $handler_config ) ) {
$step_overlay['handler_configs'][ (string) $handler_slug ] = array( 'max_items' => $handler_config['max_items'] );
}
}
}

if ( ! empty( $step_overlay ) ) {
ksort( $step_overlay, SORT_STRING );
$steps[ (string) $flow_step_id ] = $step_overlay;
}
}

if ( ! empty( $steps ) ) {
ksort( $steps, SORT_STRING );
$overlays['steps'] = $steps;
}

$scheduling = is_array( $flow['scheduling_config'] ?? null ) ? $flow['scheduling_config'] : array();
unset( $scheduling['last_run'], $scheduling['next_run'], $scheduling['run_count'], $scheduling['run_artifacts'] );
if ( ! empty( $scheduling ) ) {
ksort( $scheduling, SORT_STRING );
$overlays['scheduling_config'] = $scheduling;
}

ksort( $overlays, SORT_STRING );
return $overlays;
}

private function resolve_bundle_agent( array $bundle, string $slug = '' ): ?array {
$target = '' !== $slug ? sanitize_title( $slug ) : sanitize_title( (string) ( $bundle['agent']['agent_slug'] ?? '' ) );
if ( '' === $target ) {
Expand Down
145 changes: 134 additions & 11 deletions inc/Core/Agents/AgentBundler.php
Original file line number Diff line number Diff line change
Expand Up @@ -844,11 +844,11 @@ public function import( array $bundle, ?string $new_slug = null, int $owner_id =
&& ! $reconcile_runtime
&& $this->artifact_has_local_modifications(
$artifact_records[ $artifact_key ] ?? null,
$this->flow_artifact_payload( $existing_flow, $portable_slug )
$this->normalized_existing_flow_payload( $existing_flow, $portable_slug, (int) $new_pipeline_id, is_array( $artifact_records[ $artifact_key ] ?? null ) ? $artifact_records[ $artifact_key ] : null )
)
&& ! hash_equals(
AgentBundleArtifactHasher::hash( $payload ),
AgentBundleArtifactHasher::hash( $this->normalized_existing_flow_payload( $existing_flow, $portable_slug, (int) $new_pipeline_id ) )
AgentBundleArtifactHasher::hash( $this->normalized_existing_flow_payload( $existing_flow, $portable_slug, (int) $new_pipeline_id, is_array( $artifact_records[ $artifact_key ] ?? null ) ? $artifact_records[ $artifact_key ] : null ) )
)
) {
$conflicts[] = array(
Expand Down Expand Up @@ -1213,8 +1213,10 @@ private function pipeline_artifact_payload( array $pipeline, string $portable_sl
);
}

private function flow_artifact_payload( array $flow, string $portable_slug ): array {
$scheduling_policy = $this->bundle_scheduling_policy( is_array( $flow['scheduling_config'] ?? null ) ? $flow['scheduling_config'] : array() );
private function flow_artifact_payload( array $flow, string $portable_slug, ?array $installed_payload = null ): array {
$scheduling_policy = is_string( $installed_payload['scheduling_policy'] ?? null )
? $installed_payload['scheduling_policy']
: $this->bundle_scheduling_policy( is_array( $flow['scheduling_config'] ?? null ) ? $flow['scheduling_config'] : array() );
$run_artifacts = \DataMachine\Engine\Bundle\BundleSchema::normalize_run_artifact_egress_policy( $flow['run_artifacts'] ?? $flow['scheduling_config']['run_artifacts'] ?? array() );

$payload = array(
Expand All @@ -1223,6 +1225,7 @@ private function flow_artifact_payload( array $flow, string $portable_slug ): ar
'flow_config' => $this->flow_config_without_runtime_queues( is_array( $flow['flow_config'] ?? null ) ? $flow['flow_config'] : array() ),
'scheduling_policy' => $scheduling_policy,
'queue_policy' => 'create_seed_upgrade_preserve_existing',
'runtime_overlays' => $this->flow_runtime_overlays( $flow, $installed_payload ),
);
if ( ! empty( $run_artifacts ) ) {
$payload['run_artifacts'] = $run_artifacts;
Expand Down Expand Up @@ -1341,30 +1344,148 @@ private function preserve_runtime_queue_fields( array $incoming_flow_config, arr
if ( ! is_array( $step ) || ! is_array( $existing_flow_config[ $flow_step_id ] ?? null ) ) {
continue;
}
$existing_step = $existing_flow_config[ $flow_step_id ];
foreach ( $runtime_fields as $field ) {
if ( array_key_exists( $field, $existing_flow_config[ $flow_step_id ] ) ) {
$step[ $field ] = $existing_flow_config[ $flow_step_id ][ $field ];
if ( array_key_exists( $field, $existing_step ) ) {
$step[ $field ] = $existing_step[ $field ];
}
}
$this->preserve_handler_max_items( $step, $existing_step );
}
unset( $step );

return $incoming_flow_config;
}

private function preserve_handler_max_items( array &$step, array $existing_step ): void {
if ( array_key_exists( 'max_items', $existing_step['handler_config'] ?? array() ) ) {
if ( ! is_array( $step['handler_config'] ?? null ) ) {
$step['handler_config'] = array();
}
$step['handler_config']['max_items'] = $existing_step['handler_config']['max_items'];
}

if ( ! is_array( $existing_step['handler_configs'] ?? null ) ) {
return;
}

foreach ( $existing_step['handler_configs'] as $handler_slug => $handler_config ) {
if ( ! is_array( $handler_config ) || ! array_key_exists( 'max_items', $handler_config ) ) {
continue;
}
if ( ! array_key_exists( $handler_slug, is_array( $step['handler_configs'] ?? null ) ? $step['handler_configs'] : array() ) ) {
continue;
}
if ( ! is_array( $step['handler_configs'] ?? null ) ) {
$step['handler_configs'] = array();
}
if ( ! is_array( $step['handler_configs'][ $handler_slug ] ?? null ) ) {
$step['handler_configs'][ $handler_slug ] = array();
}
$step['handler_configs'][ $handler_slug ]['max_items'] = $handler_config['max_items'];
}
}

private function flow_config_without_runtime_queues( array $flow_config ): array {
foreach ( $flow_config as &$step ) {
$normalized = array();
foreach ( $flow_config as $flow_step_id => $step ) {
if ( ! is_array( $step ) ) {
$normalized[ (string) $flow_step_id ] = $step;
continue;
}
unset( $step['prompt_queue'], $step['config_patch_queue'], $step['queue_mode'], $step['_queue_consume_revision'] );
unset( $step['pipeline_id'], $step['flow_id'], $step['flow_step_id'] );
unset( $step['handler_config']['max_items'] );
if ( empty( $step['handler_config'] ) ) {
unset( $step['handler_config'] );
}
if ( isset( $step['pipeline_step_id'] ) ) {
$step['pipeline_step_id'] = $this->normalize_artifact_step_id( (string) $step['pipeline_step_id'] );
}
if ( is_array( $step['handler_configs'] ?? null ) ) {
foreach ( $step['handler_configs'] as $handler_slug => &$handler_config ) {
if ( is_array( $handler_config ) ) {
unset( $handler_config['max_items'] );
if ( empty( $handler_config ) ) {
unset( $step['handler_configs'][ $handler_slug ] );
}
}
}
unset( $handler_config );
if ( empty( $step['handler_configs'] ) ) {
unset( $step['handler_configs'] );
}
}

$normalized[ $this->normalize_artifact_step_id( (string) $flow_step_id ) ] = $step;
}
unset( $step );
ksort( $normalized, SORT_STRING );

return $normalized;
}

private function normalize_artifact_step_id( string $step_id ): string {
$normalized = preg_replace( '/^\d+_/', '', $step_id );
$normalized = is_string( $normalized ) ? $normalized : $step_id;
$normalized = preg_replace( '/_\d+$/', '', $normalized );

return $flow_config;
return is_string( $normalized ) && '' !== $normalized ? $normalized : $step_id;
}

private function normalized_existing_flow_payload( array $flow, string $portable_slug, int $new_pipeline_id ): array {
private function flow_runtime_overlays( array $flow, ?array $installed_payload = null ): array {
if ( is_array( $installed_payload['runtime_overlays'] ?? null ) ) {
return $installed_payload['runtime_overlays'];
}

$overlays = array();
$flow_config = is_array( $flow['flow_config'] ?? null ) ? $flow['flow_config'] : array();
$steps = array();

foreach ( $flow_config as $flow_step_id => $step ) {
if ( ! is_array( $step ) ) {
continue;
}

$step_overlay = array();
foreach ( array( 'prompt_queue', 'config_patch_queue', 'queue_mode', '_queue_consume_revision' ) as $field ) {
if ( array_key_exists( $field, $step ) ) {
$step_overlay[ $field ] = $step[ $field ];
}
}
if ( array_key_exists( 'max_items', $step['handler_config'] ?? array() ) ) {
$step_overlay['handler_config'] = array( 'max_items' => $step['handler_config']['max_items'] );
}
if ( is_array( $step['handler_configs'] ?? null ) ) {
foreach ( $step['handler_configs'] as $handler_slug => $handler_config ) {
if ( is_array( $handler_config ) && array_key_exists( 'max_items', $handler_config ) ) {
$step_overlay['handler_configs'][ (string) $handler_slug ] = array( 'max_items' => $handler_config['max_items'] );
}
}
}

if ( ! empty( $step_overlay ) ) {
ksort( $step_overlay, SORT_STRING );
$steps[ (string) $flow_step_id ] = $step_overlay;
}
}

if ( ! empty( $steps ) ) {
ksort( $steps, SORT_STRING );
$overlays['steps'] = $steps;
}

$scheduling = $this->sanitize_scheduling_config( is_array( $flow['scheduling_config'] ?? null ) ? $flow['scheduling_config'] : array() );
unset( $scheduling['run_artifacts'] );
if ( ! empty( $scheduling ) ) {
ksort( $scheduling, SORT_STRING );
$overlays['scheduling_config'] = $scheduling;
}

ksort( $overlays, SORT_STRING );
return $overlays;
}

private function normalized_existing_flow_payload( array $flow, string $portable_slug, int $new_pipeline_id, ?array $artifact_record = null ): array {
$flow_id = (int) ( $flow['flow_id'] ?? 0 );
$flow_config = is_array( $flow['flow_config'] ?? null ) ? $flow['flow_config'] : array();

Expand All @@ -1381,7 +1502,9 @@ private function normalized_existing_flow_payload( array $flow, string $portable
break;
}

return $this->flow_artifact_payload( $flow, $portable_slug );
$installed_payload = is_array( $artifact_record['installed_payload'] ?? null ) ? $artifact_record['installed_payload'] : null;

return $this->flow_artifact_payload( $flow, $portable_slug, $installed_payload );
}

/**
Expand Down
Loading
Loading