From 7e906dcb9168e250845f105498e366693eea2035 Mon Sep 17 00:00:00 2001 From: Chris Huber Date: Fri, 22 May 2026 19:49:48 -0400 Subject: [PATCH 1/2] fix: separate flow runtime overlay drift --- inc/Cli/Commands/AgentBundleCommand.php | 79 +++++++++++- inc/Core/Agents/AgentBundler.php | 119 ++++++++++++++++-- inc/Engine/Bundle/AgentBundleRuntimeDrift.php | 54 +++++++- .../Core/Agents/AgentBundlerImportTest.php | 85 +++++++++++++ tests/agent-bundle-runtime-drift-smoke.php | 11 ++ tests/agent-bundle-upgrade-planner-smoke.php | 66 +++++++++- 6 files changed, 398 insertions(+), 16 deletions(-) diff --git a/inc/Cli/Commands/AgentBundleCommand.php b/inc/Cli/Commands/AgentBundleCommand.php index c4d5a2d44..710abe337 100644 --- a/inc/Cli/Commands/AgentBundleCommand.php +++ b/inc/Cli/Commands/AgentBundleCommand.php @@ -857,11 +857,12 @@ private function current_artifacts( array $agent, array $installed ): array { ); } if ( 'flow' === $type && isset( $flow_by_slug[ $id ] ) ) { - $artifacts[] = array( + $installed_payload = is_array( $record['installed_payload'] ?? null ) ? $record['installed_payload'] : null; + $artifacts[] = array( 'artifact_type' => 'flow', 'artifact_id' => $id, 'source_path' => (string) ( $record['source_path'] ?? '' ), - 'payload' => $this->flow_payload( $flow_by_slug[ $id ], $id ), + 'payload' => $this->flow_payload( $flow_by_slug[ $id ], $id, $installed_payload ), ); } } @@ -886,7 +887,7 @@ private function pipeline_payload( array $pipeline, string $portable_slug ): arr ); } - private function flow_payload( array $flow, string $portable_slug ): array { + private function flow_payload( array $flow, string $portable_slug, ?array $installed_payload = null ): array { $scheduling_policy = $this->flow_scheduling_policy( is_array( $flow['scheduling_config'] ?? null ) ? $flow['scheduling_config'] : array() ); return array( @@ -895,6 +896,7 @@ private function flow_payload( array $flow, string $portable_slug ): array { 'flow_config' => $this->flow_config_without_runtime_queues( is_array( $flow['flow_config'] ?? null ) ? $flow['flow_config'] : array() ), 'scheduling_policy' => $scheduling_policy, 'queue_policy' => 'create_seed_upgrade_preserve_existing', + 'runtime_overlays' => $this->flow_runtime_overlays( $flow, $installed_payload ), ); } @@ -913,6 +915,24 @@ private function flow_config_without_runtime_queues( array $flow_config ): array foreach ( $flow_config as &$step ) { if ( is_array( $step ) ) { unset( $step['prompt_queue'], $step['config_patch_queue'], $step['queue_mode'], $step['_queue_consume_revision'] ); + unset( $step['handler_config']['max_items'] ); + if ( empty( $step['handler_config'] ) ) { + unset( $step['handler_config'] ); + } + if ( is_array( $step['handler_configs'] ?? null ) ) { + foreach ( $step['handler_configs'] as $handler_slug => &$handler_config ) { + if ( is_array( $handler_config ) ) { + unset( $handler_config['max_items'] ); + if ( empty( $handler_config ) ) { + unset( $step['handler_configs'][ $handler_slug ] ); + } + } + } + unset( $handler_config ); + if ( empty( $step['handler_configs'] ) ) { + unset( $step['handler_configs'] ); + } + } } } unset( $step ); @@ -920,6 +940,59 @@ private function flow_config_without_runtime_queues( array $flow_config ): array return $flow_config; } + private function flow_runtime_overlays( array $flow, ?array $installed_payload = null ): array { + if ( is_array( $installed_payload['runtime_overlays'] ?? null ) ) { + return $installed_payload['runtime_overlays']; + } + + $overlays = array(); + $flow_config = is_array( $flow['flow_config'] ?? null ) ? $flow['flow_config'] : array(); + $steps = array(); + + foreach ( $flow_config as $flow_step_id => $step ) { + if ( ! is_array( $step ) ) { + continue; + } + + $step_overlay = array(); + foreach ( array( 'prompt_queue', 'config_patch_queue', 'queue_mode', '_queue_consume_revision' ) as $field ) { + if ( array_key_exists( $field, $step ) ) { + $step_overlay[ $field ] = $step[ $field ]; + } + } + if ( array_key_exists( 'max_items', $step['handler_config'] ?? array() ) ) { + $step_overlay['handler_config'] = array( 'max_items' => $step['handler_config']['max_items'] ); + } + if ( is_array( $step['handler_configs'] ?? null ) ) { + foreach ( $step['handler_configs'] as $handler_slug => $handler_config ) { + if ( is_array( $handler_config ) && array_key_exists( 'max_items', $handler_config ) ) { + $step_overlay['handler_configs'][ (string) $handler_slug ] = array( 'max_items' => $handler_config['max_items'] ); + } + } + } + + if ( ! empty( $step_overlay ) ) { + ksort( $step_overlay, SORT_STRING ); + $steps[ (string) $flow_step_id ] = $step_overlay; + } + } + + if ( ! empty( $steps ) ) { + ksort( $steps, SORT_STRING ); + $overlays['steps'] = $steps; + } + + $scheduling = is_array( $flow['scheduling_config'] ?? null ) ? $flow['scheduling_config'] : array(); + unset( $scheduling['last_run'], $scheduling['next_run'], $scheduling['run_count'], $scheduling['run_artifacts'] ); + if ( ! empty( $scheduling ) ) { + ksort( $scheduling, SORT_STRING ); + $overlays['scheduling_config'] = $scheduling; + } + + ksort( $overlays, SORT_STRING ); + return $overlays; + } + private function resolve_bundle_agent( array $bundle, string $slug = '' ): ?array { $target = '' !== $slug ? sanitize_title( $slug ) : sanitize_title( (string) ( $bundle['agent']['agent_slug'] ?? '' ) ); if ( '' === $target ) { diff --git a/inc/Core/Agents/AgentBundler.php b/inc/Core/Agents/AgentBundler.php index b7c2177ac..81b836378 100644 --- a/inc/Core/Agents/AgentBundler.php +++ b/inc/Core/Agents/AgentBundler.php @@ -844,11 +844,11 @@ public function import( array $bundle, ?string $new_slug = null, int $owner_id = && ! $reconcile_runtime && $this->artifact_has_local_modifications( $artifact_records[ $artifact_key ] ?? null, - $this->flow_artifact_payload( $existing_flow, $portable_slug ) + $this->normalized_existing_flow_payload( $existing_flow, $portable_slug, (int) $new_pipeline_id, is_array( $artifact_records[ $artifact_key ] ?? null ) ? $artifact_records[ $artifact_key ] : null ) ) && ! hash_equals( AgentBundleArtifactHasher::hash( $payload ), - AgentBundleArtifactHasher::hash( $this->normalized_existing_flow_payload( $existing_flow, $portable_slug, (int) $new_pipeline_id ) ) + AgentBundleArtifactHasher::hash( $this->normalized_existing_flow_payload( $existing_flow, $portable_slug, (int) $new_pipeline_id, is_array( $artifact_records[ $artifact_key ] ?? null ) ? $artifact_records[ $artifact_key ] : null ) ) ) ) { $conflicts[] = array( @@ -1213,7 +1213,7 @@ private function pipeline_artifact_payload( array $pipeline, string $portable_sl ); } - private function flow_artifact_payload( array $flow, string $portable_slug ): array { + private function flow_artifact_payload( array $flow, string $portable_slug, ?array $installed_payload = null ): array { $scheduling_policy = $this->bundle_scheduling_policy( is_array( $flow['scheduling_config'] ?? null ) ? $flow['scheduling_config'] : array() ); $run_artifacts = \DataMachine\Engine\Bundle\BundleSchema::normalize_run_artifact_egress_policy( $flow['run_artifacts'] ?? $flow['scheduling_config']['run_artifacts'] ?? array() ); @@ -1223,6 +1223,7 @@ private function flow_artifact_payload( array $flow, string $portable_slug ): ar 'flow_config' => $this->flow_config_without_runtime_queues( is_array( $flow['flow_config'] ?? null ) ? $flow['flow_config'] : array() ), 'scheduling_policy' => $scheduling_policy, 'queue_policy' => 'create_seed_upgrade_preserve_existing', + 'runtime_overlays' => $this->flow_runtime_overlays( $flow, $installed_payload ), ); if ( ! empty( $run_artifacts ) ) { $payload['run_artifacts'] = $run_artifacts; @@ -1341,30 +1342,132 @@ private function preserve_runtime_queue_fields( array $incoming_flow_config, arr if ( ! is_array( $step ) || ! is_array( $existing_flow_config[ $flow_step_id ] ?? null ) ) { continue; } + $existing_step = $existing_flow_config[ $flow_step_id ]; foreach ( $runtime_fields as $field ) { - if ( array_key_exists( $field, $existing_flow_config[ $flow_step_id ] ) ) { - $step[ $field ] = $existing_flow_config[ $flow_step_id ][ $field ]; + if ( array_key_exists( $field, $existing_step ) ) { + $step[ $field ] = $existing_step[ $field ]; } } + $this->preserve_handler_max_items( $step, $existing_step ); } unset( $step ); return $incoming_flow_config; } + private function preserve_handler_max_items( array &$step, array $existing_step ): void { + if ( array_key_exists( 'max_items', $existing_step['handler_config'] ?? array() ) ) { + if ( ! is_array( $step['handler_config'] ?? null ) ) { + $step['handler_config'] = array(); + } + $step['handler_config']['max_items'] = $existing_step['handler_config']['max_items']; + } + + if ( ! is_array( $existing_step['handler_configs'] ?? null ) ) { + return; + } + + foreach ( $existing_step['handler_configs'] as $handler_slug => $handler_config ) { + if ( ! is_array( $handler_config ) || ! array_key_exists( 'max_items', $handler_config ) ) { + continue; + } + if ( ! array_key_exists( $handler_slug, is_array( $step['handler_configs'] ?? null ) ? $step['handler_configs'] : array() ) ) { + continue; + } + if ( ! is_array( $step['handler_configs'] ?? null ) ) { + $step['handler_configs'] = array(); + } + if ( ! is_array( $step['handler_configs'][ $handler_slug ] ?? null ) ) { + $step['handler_configs'][ $handler_slug ] = array(); + } + $step['handler_configs'][ $handler_slug ]['max_items'] = $handler_config['max_items']; + } + } + private function flow_config_without_runtime_queues( array $flow_config ): array { foreach ( $flow_config as &$step ) { if ( ! is_array( $step ) ) { continue; } unset( $step['prompt_queue'], $step['config_patch_queue'], $step['queue_mode'], $step['_queue_consume_revision'] ); + unset( $step['handler_config']['max_items'] ); + if ( empty( $step['handler_config'] ) ) { + unset( $step['handler_config'] ); + } + if ( is_array( $step['handler_configs'] ?? null ) ) { + foreach ( $step['handler_configs'] as $handler_slug => &$handler_config ) { + if ( is_array( $handler_config ) ) { + unset( $handler_config['max_items'] ); + if ( empty( $handler_config ) ) { + unset( $step['handler_configs'][ $handler_slug ] ); + } + } + } + unset( $handler_config ); + if ( empty( $step['handler_configs'] ) ) { + unset( $step['handler_configs'] ); + } + } } unset( $step ); return $flow_config; } - private function normalized_existing_flow_payload( array $flow, string $portable_slug, int $new_pipeline_id ): array { + private function flow_runtime_overlays( array $flow, ?array $installed_payload = null ): array { + if ( is_array( $installed_payload['runtime_overlays'] ?? null ) ) { + return $installed_payload['runtime_overlays']; + } + + $overlays = array(); + $flow_config = is_array( $flow['flow_config'] ?? null ) ? $flow['flow_config'] : array(); + $steps = array(); + + foreach ( $flow_config as $flow_step_id => $step ) { + if ( ! is_array( $step ) ) { + continue; + } + + $step_overlay = array(); + foreach ( array( 'prompt_queue', 'config_patch_queue', 'queue_mode', '_queue_consume_revision' ) as $field ) { + if ( array_key_exists( $field, $step ) ) { + $step_overlay[ $field ] = $step[ $field ]; + } + } + if ( array_key_exists( 'max_items', $step['handler_config'] ?? array() ) ) { + $step_overlay['handler_config'] = array( 'max_items' => $step['handler_config']['max_items'] ); + } + if ( is_array( $step['handler_configs'] ?? null ) ) { + foreach ( $step['handler_configs'] as $handler_slug => $handler_config ) { + if ( is_array( $handler_config ) && array_key_exists( 'max_items', $handler_config ) ) { + $step_overlay['handler_configs'][ (string) $handler_slug ] = array( 'max_items' => $handler_config['max_items'] ); + } + } + } + + if ( ! empty( $step_overlay ) ) { + ksort( $step_overlay, SORT_STRING ); + $steps[ (string) $flow_step_id ] = $step_overlay; + } + } + + if ( ! empty( $steps ) ) { + ksort( $steps, SORT_STRING ); + $overlays['steps'] = $steps; + } + + $scheduling = $this->sanitize_scheduling_config( is_array( $flow['scheduling_config'] ?? null ) ? $flow['scheduling_config'] : array() ); + unset( $scheduling['run_artifacts'] ); + if ( ! empty( $scheduling ) ) { + ksort( $scheduling, SORT_STRING ); + $overlays['scheduling_config'] = $scheduling; + } + + ksort( $overlays, SORT_STRING ); + return $overlays; + } + + private function normalized_existing_flow_payload( array $flow, string $portable_slug, int $new_pipeline_id, ?array $artifact_record = null ): array { $flow_id = (int) ( $flow['flow_id'] ?? 0 ); $flow_config = is_array( $flow['flow_config'] ?? null ) ? $flow['flow_config'] : array(); @@ -1381,7 +1484,9 @@ private function normalized_existing_flow_payload( array $flow, string $portable break; } - return $this->flow_artifact_payload( $flow, $portable_slug ); + $installed_payload = is_array( $artifact_record['installed_payload'] ?? null ) ? $artifact_record['installed_payload'] : null; + + return $this->flow_artifact_payload( $flow, $portable_slug, $installed_payload ); } /** diff --git a/inc/Engine/Bundle/AgentBundleRuntimeDrift.php b/inc/Engine/Bundle/AgentBundleRuntimeDrift.php index 84610e736..423888f6c 100644 --- a/inc/Engine/Bundle/AgentBundleRuntimeDrift.php +++ b/inc/Engine/Bundle/AgentBundleRuntimeDrift.php @@ -63,13 +63,15 @@ public static function replace_runtime_queue_fields( array $incoming_flow_config if ( ! is_array( $step ) || ! is_array( $target_flow_config[ $flow_step_id ] ?? null ) ) { continue; } + $target_step = $target_flow_config[ $flow_step_id ]; foreach ( self::QUEUE_FIELDS as $field ) { - if ( array_key_exists( $field, $target_flow_config[ $flow_step_id ] ) ) { - $step[ $field ] = $target_flow_config[ $flow_step_id ][ $field ]; + if ( array_key_exists( $field, $target_step ) ) { + $step[ $field ] = $target_step[ $field ]; } else { unset( $step[ $field ] ); } } + self::replace_handler_max_items( $step, $target_step ); unset( $step['_queue_consume_revision'] ); } unset( $step ); @@ -77,6 +79,37 @@ public static function replace_runtime_queue_fields( array $incoming_flow_config return $incoming_flow_config; } + private static function replace_handler_max_items( array &$step, array $target_step ): void { + if ( array_key_exists( 'max_items', $target_step['handler_config'] ?? array() ) ) { + if ( ! is_array( $step['handler_config'] ?? null ) ) { + $step['handler_config'] = array(); + } + $step['handler_config']['max_items'] = $target_step['handler_config']['max_items']; + } elseif ( isset( $step['handler_config'] ) && is_array( $step['handler_config'] ) ) { + unset( $step['handler_config']['max_items'] ); + } + + if ( ! is_array( $target_step['handler_configs'] ?? null ) && ! is_array( $step['handler_configs'] ?? null ) ) { + return; + } + + $handler_slugs = array_unique( array_merge( array_keys( is_array( $step['handler_configs'] ?? null ) ? $step['handler_configs'] : array() ), array_keys( is_array( $target_step['handler_configs'] ?? null ) ? $target_step['handler_configs'] : array() ) ) ); + foreach ( $handler_slugs as $handler_slug ) { + $target_config = is_array( $target_step['handler_configs'][ $handler_slug ] ?? null ) ? $target_step['handler_configs'][ $handler_slug ] : array(); + if ( array_key_exists( 'max_items', $target_config ) ) { + if ( ! is_array( $step['handler_configs'] ?? null ) ) { + $step['handler_configs'] = array(); + } + if ( ! is_array( $step['handler_configs'][ $handler_slug ] ?? null ) ) { + $step['handler_configs'][ $handler_slug ] = array(); + } + $step['handler_configs'][ $handler_slug ]['max_items'] = $target_config['max_items']; + } elseif ( isset( $step['handler_configs'][ $handler_slug ] ) && is_array( $step['handler_configs'][ $handler_slug ] ) ) { + unset( $step['handler_configs'][ $handler_slug ]['max_items'] ); + } + } + } + private static function step_diffs( array $current_flow_config, array $target_flow_config ): array { $diffs = array(); $ids = array_unique( array_merge( array_keys( $current_flow_config ), array_keys( $target_flow_config ) ) ); @@ -107,9 +140,26 @@ private static function step_runtime_preview( array $step ): array { 'queue_mode' => (string) ( $step['queue_mode'] ?? '' ), 'prompt_queue_depth' => self::list_depth( $step['prompt_queue'] ?? array() ), 'config_patch_queue_depth' => self::list_depth( $step['config_patch_queue'] ?? array() ), + 'handler_max_items' => self::handler_max_items( $step ), ); } + private static function handler_max_items( array $step ): array { + $max_items = array(); + if ( array_key_exists( 'max_items', $step['handler_config'] ?? array() ) ) { + $max_items['handler_config'] = $step['handler_config']['max_items']; + } + if ( is_array( $step['handler_configs'] ?? null ) ) { + foreach ( $step['handler_configs'] as $handler_slug => $handler_config ) { + if ( is_array( $handler_config ) && array_key_exists( 'max_items', $handler_config ) ) { + $max_items[ (string) $handler_slug ] = $handler_config['max_items']; + } + } + } + ksort( $max_items, SORT_STRING ); + return $max_items; + } + private static function queue_depth( array $flow_config ): array { $depth = array( 'prompt_queue' => 0, diff --git a/tests/Unit/Core/Agents/AgentBundlerImportTest.php b/tests/Unit/Core/Agents/AgentBundlerImportTest.php index b1daf5c0e..f879a2b41 100644 --- a/tests/Unit/Core/Agents/AgentBundlerImportTest.php +++ b/tests/Unit/Core/Agents/AgentBundlerImportTest.php @@ -510,6 +510,91 @@ public function test_reconcile_runtime_replaces_local_modified_flow_queue_and_sc $this->assertSame( array( 'mcp' => 50 ), $updated_flow['scheduling_config']['max_items'] ?? null, 'Bundle schedule max items replace stale runtime bounds.' ); } + public function test_upgrade_preserves_runtime_overlays_without_hiding_bundle_seed_drift(): void { + $bundle = $this->fixture_bundle( 'runtime-overlay-agent' ); + $bundle['flows'][0]['flow_config']['1_step-uuid_1'] = array_merge( + $bundle['flows'][0]['flow_config']['1_step-uuid_1'], + array( + 'step_type' => 'fetch', + 'handler_configs' => array( + 'mcp' => array( + 'provider' => 'mgs', + 'max_items' => 5, + ), + ), + 'config_patch_queue' => array( array( 'patch' => array( 'query' => 'seed' ) ) ), + 'queue_mode' => 'loop', + ) + ); + $bundle['flows'][0]['scheduling_config'] = array( + 'enabled' => false, + 'interval' => 'manual', + 'max_items' => array( 'mcp' => 5 ), + ); + + $first = $this->bundler->import( $bundle, null, $this->owner_id ); + $this->assertTrue( (bool) $first['success'], 'Initial install succeeds.' ); + + $agent = $this->agents_repo->get_by_slug( 'runtime-overlay-agent' ); + $pipeline = $this->pipelines_repo->get_by_portable_slug( (int) $agent['agent_id'], 'static-site-pipeline' ); + $flow = $this->flows_repo->get_by_portable_slug( (int) $pipeline['pipeline_id'], 'static-site-flow' ); + $config = $flow['flow_config']; + $step_id = array_key_first( $config ); + + $config[ $step_id ]['config_patch_queue'] = array( + array( 'patch' => array( 'query' => 'live-a' ) ), + array( 'patch' => array( 'query' => 'live-b' ) ), + ); + $config[ $step_id ]['queue_mode'] = 'drain'; + $config[ $step_id ]['_queue_consume_revision'] = 'live-rev'; + $config[ $step_id ]['handler_configs']['mcp']['max_items'] = 1; + $this->flows_repo->update_flow( + (int) $flow['flow_id'], + array( + 'flow_config' => $config, + 'scheduling_config' => array( + 'enabled' => true, + 'interval' => 'hourly', + 'max_items' => array( 'mcp' => 1 ), + ), + ) + ); + + $upgrade = $bundle; + $upgrade['flows'][0]['flow_config']['1_step-uuid_1']['config_patch_queue'] = array( + array( 'patch' => array( 'query' => 'target-a' ) ), + array( 'patch' => array( 'query' => 'target-b' ) ), + ); + $upgrade['flows'][0]['flow_config']['1_step-uuid_1']['queue_mode'] = 'loop'; + $upgrade['flows'][0]['flow_config']['1_step-uuid_1']['handler_configs']['mcp']['max_items'] = 50; + $upgrade['flows'][0]['scheduling_config'] = array( + 'enabled' => false, + 'interval' => 'manual', + 'max_items' => array( 'mcp' => 50 ), + ); + + $second = $this->bundler->import( + $upgrade, + null, + $this->owner_id, + false, + array( 'is_upgrade' => true ) + ); + + $this->assertTrue( (bool) $second['success'], 'Upgrade succeeds when only runtime overlays changed locally.' ); + $this->assertSame( array(), $second['summary']['conflicts'] ?? array(), 'Runtime overlay drift is not reported as package source drift.' ); + + $updated_flow = $this->flows_repo->get_by_portable_slug( (int) $pipeline['pipeline_id'], 'static-site-flow' ); + $updated_step = $updated_flow['flow_config'][ $step_id ]; + $this->assertCount( 2, $updated_step['config_patch_queue'], 'Local queue entries are preserved.' ); + $this->assertSame( 'live-a', $updated_step['config_patch_queue'][0]['patch']['query'] ?? null, 'Local queue content is preserved.' ); + $this->assertSame( 'drain', $updated_step['queue_mode'] ?? null, 'Local queue mode is preserved.' ); + $this->assertSame( 'live-rev', $updated_step['_queue_consume_revision'] ?? null, 'Local consume revision is preserved.' ); + $this->assertSame( 1, $updated_step['handler_configs']['mcp']['max_items'] ?? null, 'Local burn-in max_items is preserved.' ); + $this->assertSame( 'hourly', $updated_flow['scheduling_config']['interval'] ?? null, 'Local schedule interval is preserved.' ); + $this->assertSame( array( 'mcp' => 1 ), $updated_flow['scheduling_config']['max_items'] ?? null, 'Local schedule max_items are preserved.' ); + } + /** * The silent-partial-success regression in #1801: a failure after the agent row was claimed used * to return `success: true` with a populated agent_id summary, while the row was rolled back at diff --git a/tests/agent-bundle-runtime-drift-smoke.php b/tests/agent-bundle-runtime-drift-smoke.php index a4dedb9ec..aec1008e2 100644 --- a/tests/agent-bundle-runtime-drift-smoke.php +++ b/tests/agent-bundle-runtime-drift-smoke.php @@ -37,6 +37,10 @@ function agent_bundle_runtime_drift_assert( bool $condition, string $name, array 'flow_config' => array( '88_fetch_2' => array( 'queue_mode' => 'drain', + '_queue_consume_revision' => 'live-rev-1', + 'handler_configs' => array( + 'mcp' => array( 'max_items' => 50 ), + ), 'config_patch_queue' => array( array( 'patch' => array( 'max_items' => 50, 'state' => 'stale-a' ) ), array( 'patch' => array( 'max_items' => 50, 'state' => 'stale-b' ) ), @@ -54,6 +58,9 @@ function agent_bundle_runtime_drift_assert( bool $condition, string $name, array 'flow_config' => array( '88_fetch_2' => array( 'queue_mode' => 'loop', + 'handler_configs' => array( + 'mcp' => array( 'max_items' => 5 ), + ), 'config_patch_queue' => array( array( 'patch' => array( 'max_items' => 5, 'state' => 'reviewed' ) ), ), @@ -74,6 +81,8 @@ function agent_bundle_runtime_drift_assert( bool $condition, string $name, array agent_bundle_runtime_drift_assert( 1 === ( $preview['queue_depth']['target']['config_patch_queue'] ?? null ), 'target queue depth is reported', $failures, $passes ); agent_bundle_runtime_drift_assert( 'drain' === ( $preview['queue_mode']['current']['88_fetch_2'] ?? null ), 'current queue mode is reported', $failures, $passes ); agent_bundle_runtime_drift_assert( 'loop' === ( $preview['queue_mode']['target']['88_fetch_2'] ?? null ), 'target queue mode is reported', $failures, $passes ); +agent_bundle_runtime_drift_assert( array( 'mcp' => 50 ) === ( $preview['steps'][0]['current']['handler_max_items'] ?? null ), 'current burn-in max_items is reported', $failures, $passes ); +agent_bundle_runtime_drift_assert( array( 'mcp' => 5 ) === ( $preview['steps'][0]['target']['handler_max_items'] ?? null ), 'target burn-in max_items is reported', $failures, $passes ); agent_bundle_runtime_drift_assert( true === ( $preview['scheduling']['changed'] ?? null ), 'scheduling drift is reported', $failures, $passes ); agent_bundle_runtime_drift_assert( array( 'fetch' => 50 ) === ( $preview['scheduling']['current']['max_items'] ?? null ), 'current scheduling max_items is reported', $failures, $passes ); agent_bundle_runtime_drift_assert( array( 'fetch' => 5 ) === ( $preview['scheduling']['target']['max_items'] ?? null ), 'target scheduling max_items is reported', $failures, $passes ); @@ -84,6 +93,8 @@ function agent_bundle_runtime_drift_assert( bool $condition, string $name, array $reconciled_config = AgentBundleRuntimeDrift::replace_runtime_queue_fields( $current_flow['flow_config'], $target_flow['flow_config'] ); agent_bundle_runtime_drift_assert( $target_flow['flow_config']['88_fetch_2']['config_patch_queue'] === $reconciled_config['88_fetch_2']['config_patch_queue'], 'explicit reconcile applies bundle seed queue', $failures, $passes ); agent_bundle_runtime_drift_assert( 'loop' === ( $reconciled_config['88_fetch_2']['queue_mode'] ?? null ), 'explicit reconcile applies bundle seed queue mode', $failures, $passes ); +agent_bundle_runtime_drift_assert( 5 === ( $reconciled_config['88_fetch_2']['handler_configs']['mcp']['max_items'] ?? null ), 'explicit reconcile applies bundle seed max_items', $failures, $passes ); +agent_bundle_runtime_drift_assert( ! array_key_exists( '_queue_consume_revision', $reconciled_config['88_fetch_2'] ), 'explicit reconcile clears consume revision', $failures, $passes ); $clean_preview = AgentBundleRuntimeDrift::preview( 'wordpress-com-digest', diff --git a/tests/agent-bundle-upgrade-planner-smoke.php b/tests/agent-bundle-upgrade-planner-smoke.php index f4e36ac99..5625c19be 100644 --- a/tests/agent-bundle-upgrade-planner-smoke.php +++ b/tests/agent-bundle-upgrade-planner-smoke.php @@ -131,6 +131,7 @@ public function get_error_message() { } } } +require_once dirname( __DIR__ ) . '/vendor/autoload.php'; if ( ! class_exists( 'AgentsAPI\\Core\\Workspace\\WP_Agent_Workspace_Scope' ) ) { eval( ' namespace AgentsAPI\\Core\\Workspace; @@ -207,8 +208,6 @@ public function to_array(): array { return $this->data; } } ' ); } - -require_once dirname( __DIR__ ) . '/vendor/autoload.php'; require_once dirname( __DIR__ ) . '/inc/Engine/Bundle/AgentBundleUpgradeActionHandlers.php'; use DataMachine\Engine\AI\Actions\ResolvePendingActionAbility; @@ -355,6 +354,64 @@ function installed_row( array $artifact ): array { assert_upgrade_plan_equals( 'authorization header is redacted', '[redacted]', $config_conflict['diff']['before']['intelligence']['context_servers']['wporg']['headers']['Authorization'] ?? null ); assert_upgrade_plan( 'raw local bearer is absent from config preview', false === strpos( (string) json_encode( $config_plan ), 'local-token' ) ); +echo "\n[2c] Flow runtime overlays are planned separately from source shape\n"; +$installed_runtime_flow = upgrade_artifact( + 'flow', + 'queued-fetch', + array( + 'portable_slug' => 'queued-fetch', + 'flow_name' => 'Queued Fetch', + 'flow_config' => array( + '1_fetch_1' => array( + 'handler_configs' => array( 'mcp' => array( 'provider' => 'mgs' ) ), + ), + ), + 'scheduling_policy' => 'create_paused_upgrade_preserve_existing', + 'queue_policy' => 'create_seed_upgrade_preserve_existing', + 'runtime_overlays' => array( + 'scheduling_config' => array( + 'enabled' => false, + 'interval' => 'manual', + 'max_items' => array( 'mcp' => 5 ), + ), + 'steps' => array( + '1_fetch_1' => array( + 'config_patch_queue' => array( array( 'patch' => array( 'query' => 'seed' ) ) ), + 'queue_mode' => 'loop', + '_queue_consume_revision' => 'seed-rev', + 'handler_configs' => array( 'mcp' => array( 'max_items' => 5 ) ), + ), + ), + ), + ), + 'flows/queued-fetch.json' +); +$current_runtime_flow = $installed_runtime_flow; +$target_runtime_flow = $installed_runtime_flow; +$target_runtime_flow['payload']['runtime_overlays']['scheduling_config'] = array( + 'enabled' => true, + 'interval' => 'hourly', + 'max_items' => array( 'mcp' => 50 ), +); +$target_runtime_flow['payload']['runtime_overlays']['steps']['1_fetch_1']['config_patch_queue'] = array( + array( 'patch' => array( 'query' => 'target-a' ) ), + array( 'patch' => array( 'query' => 'target-b' ) ), +); +$target_runtime_flow['payload']['runtime_overlays']['steps']['1_fetch_1']['queue_mode'] = 'drain'; +$target_runtime_flow['payload']['runtime_overlays']['steps']['1_fetch_1']['handler_configs']['mcp']['max_items'] = 50; +$runtime_plan = AgentBundleUpgradePlanner::plan( + array( installed_row( $installed_runtime_flow ) ), + array( $current_runtime_flow ), + array( $target_runtime_flow ), + array( 'bundle_slug' => 'runtime-overlay-brain' ) +)->to_array(); +$runtime_update = $runtime_plan['auto_apply'][0] ?? array(); +assert_upgrade_plan_equals( 'bundle seed overlay change auto-applies when source shape is clean', 'flow:queued-fetch', $runtime_update['artifact_key'] ?? null ); +assert_upgrade_plan_equals( 'runtime overlay seed change is not hidden', 2, count( $runtime_update['diff']['after']['runtime_overlays']['steps']['1_fetch_1']['config_patch_queue'] ?? array() ) ); +assert_upgrade_plan_equals( 'manual paused seed is visible in before diff', false, $runtime_update['diff']['before']['runtime_overlays']['scheduling_config']['enabled'] ?? null ); +assert_upgrade_plan_equals( 'scheduled seed is visible in after diff', 'hourly', $runtime_update['diff']['after']['runtime_overlays']['scheduling_config']['interval'] ?? null ); +assert_upgrade_plan_equals( 'burn-in max_items seed drift remains visible', 50, $runtime_update['diff']['after']['runtime_overlays']['steps']['1_fetch_1']['handler_configs']['mcp']['max_items'] ?? null ); + echo "\n[3] PendingAction stages bundle-upgrade previews\n"; $staged = AgentBundleUpgradePendingAction::stage( $plan, @@ -366,8 +423,9 @@ function installed_row( array $artifact ): array { ) ); assert_upgrade_plan( 'pending action staged', true === ( $staged['staged'] ?? false ) ); -assert_upgrade_plan_equals( 'pending action kind is bundle_upgrade', 'bundle_upgrade', $staged['kind'] ?? null ); -assert_upgrade_plan_equals( 'preview carries approval count', 1, $staged['preview']['counts']['needs_approval'] ?? null ); +$staged_pending = $staged['payload']['pending_action'] ?? $staged; +assert_upgrade_plan_equals( 'pending action kind is bundle_upgrade', 'bundle_upgrade', $staged_pending['kind'] ?? null ); +assert_upgrade_plan_equals( 'preview carries approval count', 1, $staged_pending['preview']['counts']['needs_approval'] ?? null ); echo "\n[4] Resolve applies approved artifacts only\n"; $applied_keys = array(); From 69c9e7d1f574d6d1fdcd355d467af1b3602bc211 Mon Sep 17 00:00:00 2001 From: Chris Huber Date: Sat, 23 May 2026 10:13:26 -0400 Subject: [PATCH 2/2] fix: normalize flow artifact runtime state --- inc/Cli/Commands/AgentBundleCommand.php | 2 +- inc/Core/Agents/AgentBundler.php | 26 +++++++++++++++++++++---- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/inc/Cli/Commands/AgentBundleCommand.php b/inc/Cli/Commands/AgentBundleCommand.php index 710abe337..d9bba463e 100644 --- a/inc/Cli/Commands/AgentBundleCommand.php +++ b/inc/Cli/Commands/AgentBundleCommand.php @@ -858,7 +858,7 @@ private function current_artifacts( array $agent, array $installed ): array { } if ( 'flow' === $type && isset( $flow_by_slug[ $id ] ) ) { $installed_payload = is_array( $record['installed_payload'] ?? null ) ? $record['installed_payload'] : null; - $artifacts[] = array( + $artifacts[] = array( 'artifact_type' => 'flow', 'artifact_id' => $id, 'source_path' => (string) ( $record['source_path'] ?? '' ), diff --git a/inc/Core/Agents/AgentBundler.php b/inc/Core/Agents/AgentBundler.php index 81b836378..2bc551542 100644 --- a/inc/Core/Agents/AgentBundler.php +++ b/inc/Core/Agents/AgentBundler.php @@ -1214,7 +1214,9 @@ private function pipeline_artifact_payload( array $pipeline, string $portable_sl } private function flow_artifact_payload( array $flow, string $portable_slug, ?array $installed_payload = null ): array { - $scheduling_policy = $this->bundle_scheduling_policy( is_array( $flow['scheduling_config'] ?? null ) ? $flow['scheduling_config'] : array() ); + $scheduling_policy = is_string( $installed_payload['scheduling_policy'] ?? null ) + ? $installed_payload['scheduling_policy'] + : $this->bundle_scheduling_policy( is_array( $flow['scheduling_config'] ?? null ) ? $flow['scheduling_config'] : array() ); $run_artifacts = \DataMachine\Engine\Bundle\BundleSchema::normalize_run_artifact_egress_policy( $flow['run_artifacts'] ?? $flow['scheduling_config']['run_artifacts'] ?? array() ); $payload = array( @@ -1385,15 +1387,21 @@ private function preserve_handler_max_items( array &$step, array $existing_step } private function flow_config_without_runtime_queues( array $flow_config ): array { - foreach ( $flow_config as &$step ) { + $normalized = array(); + foreach ( $flow_config as $flow_step_id => $step ) { if ( ! is_array( $step ) ) { + $normalized[ (string) $flow_step_id ] = $step; continue; } unset( $step['prompt_queue'], $step['config_patch_queue'], $step['queue_mode'], $step['_queue_consume_revision'] ); + unset( $step['pipeline_id'], $step['flow_id'], $step['flow_step_id'] ); unset( $step['handler_config']['max_items'] ); if ( empty( $step['handler_config'] ) ) { unset( $step['handler_config'] ); } + if ( isset( $step['pipeline_step_id'] ) ) { + $step['pipeline_step_id'] = $this->normalize_artifact_step_id( (string) $step['pipeline_step_id'] ); + } if ( is_array( $step['handler_configs'] ?? null ) ) { foreach ( $step['handler_configs'] as $handler_slug => &$handler_config ) { if ( is_array( $handler_config ) ) { @@ -1408,10 +1416,20 @@ private function flow_config_without_runtime_queues( array $flow_config ): array unset( $step['handler_configs'] ); } } + + $normalized[ $this->normalize_artifact_step_id( (string) $flow_step_id ) ] = $step; } - unset( $step ); + ksort( $normalized, SORT_STRING ); + + return $normalized; + } + + private function normalize_artifact_step_id( string $step_id ): string { + $normalized = preg_replace( '/^\d+_/', '', $step_id ); + $normalized = is_string( $normalized ) ? $normalized : $step_id; + $normalized = preg_replace( '/_\d+$/', '', $normalized ); - return $flow_config; + return is_string( $normalized ) && '' !== $normalized ? $normalized : $step_id; } private function flow_runtime_overlays( array $flow, ?array $installed_payload = null ): array {