From cc33819d352841c731740ca8f436454bd3f6fee2 Mon Sep 17 00:00:00 2001 From: Chris Huber Date: Fri, 22 May 2026 19:36:08 -0400 Subject: [PATCH 1/2] feat: queue pipeline system prompts --- inc/Abilities/Flow/QueueAbility.php | 88 ++++++++++ .../PipelineSystemPromptDirective.php | 54 ++++++- inc/Core/Steps/WorkflowConfigFactory.php | 6 + inc/Engine/AI/System/Tasks/AgentCallTask.php | 81 +++------- inc/Engine/AI/System/Tasks/SystemTask.php | 90 +++++++++++ tests/pipeline-system-prompt-queue-smoke.php | 151 ++++++++++++++++++ .../queue-consumption-consolidation-smoke.php | 65 +++++--- tests/systemtask-passthrough-smoke.php | 4 +- tests/workflow-spec-contract-smoke.php | 6 + 9 files changed, 454 insertions(+), 91 deletions(-) create mode 100644 tests/pipeline-system-prompt-queue-smoke.php diff --git a/inc/Abilities/Flow/QueueAbility.php b/inc/Abilities/Flow/QueueAbility.php index bde5cc8cd..9442d3940 100644 --- a/inc/Abilities/Flow/QueueAbility.php +++ b/inc/Abilities/Flow/QueueAbility.php @@ -33,6 +33,7 @@ namespace DataMachine\Abilities\Flow; use DataMachine\Core\Database\Flows\Flows as DB_Flows; +use DataMachine\Core\Database\Pipelines\Pipelines as DB_Pipelines; use DataMachine\Abilities\DuplicateCheck\DuplicateCheckAbility; use DataMachine\Core\Steps\FlowStepConfig; @@ -56,6 +57,16 @@ class QueueAbility { */ const SLOT_CONFIG_PATCH_QUEUE = 'config_patch_queue'; + /** + * Pipeline-step scoped queue for pipeline system prompts. + */ + const SLOT_SYSTEM_PROMPT_QUEUE = 'system_prompt_queue'; + + /** + * Pipeline-step scoped mode for system_prompt_queue. + */ + const MODE_SYSTEM_PROMPT_QUEUE = 'system_prompt_queue_mode'; + /** * Per-entry payload field name for prompt queues. */ @@ -1648,6 +1659,83 @@ public static function consumeFromQueueSlot( return null; } + /** + * Consume one item from a pipeline-step scoped queue slot per the given mode. + * + * This is the pipeline-config sibling of {@see consumeFromQueueSlot()}. It + * keeps the same drain / loop / static semantics while addressing queues that + * live on `pipeline_config[pipeline_step_id]` rather than a flow step. + * + * @param int $pipeline_id Pipeline ID. + * @param string $pipeline_step_id Pipeline step ID. + * @param string $slot Queue slot name. + * @param string $queue_mode "drain" | "loop" | "static". + * @param DB_Pipelines|null $db_pipelines Optional database instance. + * @return array|null The consumed entry, or null if the queue was empty. + * @since 0.84.0 + */ + public static function consumeFromPipelineQueueSlot( + int $pipeline_id, + string $pipeline_step_id, + string $slot, + string $queue_mode, + ?DB_Pipelines $db_pipelines = null + ): ?array { + if ( null === $db_pipelines ) { + $db_pipelines = new DB_Pipelines(); + } + + if ( ! in_array( $queue_mode, array( 'drain', 'loop', 'static' ), true ) ) { + $queue_mode = 'static'; + } + + $pipeline_config = $db_pipelines->get_pipeline_config( $pipeline_id ); + if ( ! isset( $pipeline_config[ $pipeline_step_id ] ) || ! is_array( $pipeline_config[ $pipeline_step_id ] ) ) { + return null; + } + + $queue = $pipeline_config[ $pipeline_step_id ][ $slot ] ?? array(); + if ( empty( $queue ) || ! is_array( $queue ) ) { + return null; + } + + if ( 'static' === $queue_mode ) { + return is_array( $queue[0] ?? null ) ? $queue[0] : null; + } + + $entry = array_shift( $queue ); + if ( ! is_array( $entry ) ) { + return null; + } + + if ( 'loop' === $queue_mode ) { + $queue[] = $entry; + } + + $pipeline_config[ $pipeline_step_id ][ $slot ] = $queue; + $pipeline_config[ $pipeline_step_id ]['_queue_consume_revision'] = + (int) ( $pipeline_config[ $pipeline_step_id ]['_queue_consume_revision'] ?? 0 ) + 1; + + if ( ! $db_pipelines->update_pipeline( $pipeline_id, array( 'pipeline_config' => $pipeline_config ) ) ) { + return null; + } + + do_action( + 'datamachine_log', + 'info', + 'Item consumed from pipeline queue', + array( + 'pipeline_id' => $pipeline_id, + 'pipeline_step_id' => $pipeline_step_id, + 'slot' => $slot, + 'queue_mode' => $queue_mode, + 'remaining_count' => count( $queue ), + ) + ); + + return $entry; + } + /** * Validate flow_id and flow_step_id input fields. * diff --git a/inc/Core/Steps/AI/Directives/PipelineSystemPromptDirective.php b/inc/Core/Steps/AI/Directives/PipelineSystemPromptDirective.php index 5c40223ba..3b29b8103 100644 --- a/inc/Core/Steps/AI/Directives/PipelineSystemPromptDirective.php +++ b/inc/Core/Steps/AI/Directives/PipelineSystemPromptDirective.php @@ -19,6 +19,7 @@ namespace DataMachine\Core\Steps\AI\Directives; +use DataMachine\Abilities\Flow\QueueAbility; use DataMachine\Abilities\HandlerAbilities; use DataMachine\Core\Steps\FlowStepConfig; @@ -35,7 +36,7 @@ public static function get_outputs( string $provider_name, array $tools, ?string $engine_data = datamachine_get_engine_data( $payload['job_id'] ); $pipeline_config = $engine_data['pipeline_config'] ?? array(); $step_config = $pipeline_config[ $pipeline_step_id ] ?? array(); - $system_prompt = $step_config['system_prompt'] ?? ''; + $system_prompt = self::resolveSystemPrompt( $pipeline_step_id, $step_config, $payload, $engine_data ); if ( empty( $system_prompt ) ) { return array(); @@ -158,6 +159,57 @@ private static function buildWorkflowVisualization( $pipeline_step_id, $current_ return $workflow_string; } + + /** + * Resolve the pipeline system prompt, optionally from a pipeline-step queue. + * + * Existing `system_prompt` behavior is preserved when no queue is configured. + * When a queue is configured, `system_prompt_queue_mode` controls whether the + * queue head is peeked, drained, or rotated. + * + * @param string $pipeline_step_id Pipeline step ID. + * @param array $step_config Pipeline step config. + * @param array $payload Directive payload. + * @param array $engine_data Job engine data. + * @return string Resolved system prompt. + */ + private static function resolveSystemPrompt( string $pipeline_step_id, array $step_config, array $payload, array $engine_data ): string { + $queue_present = array_key_exists( QueueAbility::SLOT_SYSTEM_PROMPT_QUEUE, $step_config ) + && is_array( $step_config[ QueueAbility::SLOT_SYSTEM_PROMPT_QUEUE ] ); + + if ( ! $queue_present ) { + return (string) ( $step_config['system_prompt'] ?? '' ); + } + + $queue_mode = $step_config[ QueueAbility::MODE_SYSTEM_PROMPT_QUEUE ] ?? 'static'; + if ( ! in_array( $queue_mode, array( 'drain', 'loop', 'static' ), true ) ) { + $queue_mode = 'static'; + } + + $pipeline_id = $payload['pipeline_id'] ?? $engine_data['pipeline_id'] ?? $engine_data['job']['pipeline_id'] ?? 0; + $entry = null; + + if ( is_numeric( $pipeline_id ) && (int) $pipeline_id > 0 ) { + $entry = QueueAbility::consumeFromPipelineQueueSlot( + (int) $pipeline_id, + $pipeline_step_id, + QueueAbility::SLOT_SYSTEM_PROMPT_QUEUE, + $queue_mode + ); + } elseif ( 'static' === $queue_mode ) { + $entry = $step_config[ QueueAbility::SLOT_SYSTEM_PROMPT_QUEUE ][0] ?? null; + } + + if ( is_array( $entry ) && ! empty( $entry[ QueueAbility::FIELD_PROMPT ] ) ) { + return (string) $entry[ QueueAbility::FIELD_PROMPT ]; + } + + if ( in_array( $queue_mode, array( 'drain', 'loop' ), true ) ) { + return ''; + } + + return (string) ( $step_config['system_prompt'] ?? '' ); + } } // Register with universal agent directive system (Priority 50 = pipeline system prompt) diff --git a/inc/Core/Steps/WorkflowConfigFactory.php b/inc/Core/Steps/WorkflowConfigFactory.php index 9c7c8f211..70e789e29 100644 --- a/inc/Core/Steps/WorkflowConfigFactory.php +++ b/inc/Core/Steps/WorkflowConfigFactory.php @@ -137,6 +137,12 @@ private static function pipelineStepFromWorkflowStep( array $step, string $pipel if ( 'ai' === $step_type ) { $pipeline_step['system_prompt'] = $step['system_prompt'] ?? ''; $pipeline_step['disabled_tools'] = is_array( $step['disabled_tools'] ?? null ) ? array_values( $step['disabled_tools'] ) : array(); + if ( is_array( $step['system_prompt_queue'] ?? null ) ) { + $pipeline_step['system_prompt_queue'] = array_values( $step['system_prompt_queue'] ); + } + if ( isset( $step['system_prompt_queue_mode'] ) && in_array( $step['system_prompt_queue_mode'], array( 'drain', 'loop', 'static' ), true ) ) { + $pipeline_step['system_prompt_queue_mode'] = $step['system_prompt_queue_mode']; + } $agent_modes = self::sanitizeAgentModes( $step['agent_modes'] ?? array() ); if ( ! empty( $agent_modes ) ) { $pipeline_step['agent_modes'] = $agent_modes; diff --git a/inc/Engine/AI/System/Tasks/AgentCallTask.php b/inc/Engine/AI/System/Tasks/AgentCallTask.php index fe6259eb8..a9c289f33 100644 --- a/inc/Engine/AI/System/Tasks/AgentCallTask.php +++ b/inc/Engine/AI/System/Tasks/AgentCallTask.php @@ -11,8 +11,6 @@ namespace DataMachine\Engine\AI\System\Tasks; -use DataMachine\Abilities\Flow\QueueAbility; - defined( 'ABSPATH' ) || exit; class AgentCallTask extends SystemTask { @@ -109,71 +107,28 @@ private function resolveQueuedInput( int $jobId, array $params ): ?array { $input = is_array( $params['input'] ?? null ) ? $params['input'] : array(); $delivery = is_array( $params['delivery'] ?? null ) ? $params['delivery'] : array(); - $flow_id = (int) ( $params['flow_id'] ?? 0 ); - $flow_step_id = $params['flow_step_id'] ?? ''; - $queue_mode = $params['queue_mode'] ?? 'static'; - if ( ! in_array( $queue_mode, array( 'drain', 'loop', 'static' ), true ) ) { - $queue_mode = 'static'; - } - - $from_queue = false; - if ( 'static' !== $queue_mode && $flow_id > 0 && ! empty( $flow_step_id ) ) { - $queued_item = QueueAbility::consumeFromQueueSlot( - $flow_id, - $flow_step_id, - QueueAbility::SLOT_PROMPT_QUEUE, - $queue_mode + $queue_prompt = $this->resolveQueueablePromptField( $jobId, $params, (string) ( $input['task'] ?? '' ) ); + if ( $queue_prompt['skipped'] ) { + do_action( + 'datamachine_log', + 'info', + 'Agent call task skipped — queue mode requires per-tick input but queue is empty, no configured fallback', + array( + 'job_id' => $jobId, + ) ); - if ( $queued_item && ! empty( $queued_item['prompt'] ) ) { - $input['task'] = $queued_item['prompt']; - $from_queue = true; - - \datamachine_merge_engine_data( - $jobId, - array( - 'queued_prompt_backup' => array( - 'slot' => QueueAbility::SLOT_PROMPT_QUEUE, - 'mode' => $queue_mode, - 'prompt' => $queued_item['prompt'], - 'flow_id' => $flow_id, - 'flow_step_id' => $flow_step_id, - 'added_at' => $queued_item['added_at'] ?? null, - ), - ) - ); - - do_action( - 'datamachine_log', - 'info', - 'Agent call task using input task from queue', - array( - 'job_id' => $jobId, - 'flow_id' => $flow_id, - 'flow_step_id' => $flow_step_id, - 'queue_mode' => $queue_mode, - ) - ); - } elseif ( empty( $input['task'] ) ) { - do_action( - 'datamachine_log', - 'info', - 'Agent call task skipped — queue mode requires per-tick input but queue is empty, no configured fallback', - array( - 'job_id' => $jobId, - 'queue_mode' => $queue_mode, - ) - ); - - $this->completeJob( $jobId, array( - 'skipped' => true, - 'reason' => sprintf( 'Queue mode "%s" but queue empty, no configured input task', $queue_mode ), - 'completed_at' => current_time( 'mysql' ), - ) ); - return null; - } + $this->completeJob( $jobId, array( + 'skipped' => true, + 'reason' => $queue_prompt['reason'], + 'completed_at' => current_time( 'mysql' ), + ) ); + return null; } + $input['task'] = $queue_prompt['prompt']; + $from_queue = $queue_prompt['from_queue']; + $context = is_array( $input['context'] ?? null ) ? $input['context'] : array(); $context = array_merge( $context, diff --git a/inc/Engine/AI/System/Tasks/SystemTask.php b/inc/Engine/AI/System/Tasks/SystemTask.php index 3ef62eaaa..43f0902f7 100644 --- a/inc/Engine/AI/System/Tasks/SystemTask.php +++ b/inc/Engine/AI/System/Tasks/SystemTask.php @@ -42,6 +42,7 @@ defined( 'ABSPATH' ) || exit; +use DataMachine\Abilities\Flow\QueueAbility; use DataMachine\Core\Database\Jobs\Jobs; use DataMachine\Core\PluginSettings; use DataMachine\Engine\AI\System\SystemTaskPromptRegistry; @@ -175,6 +176,95 @@ public function getFlowStepConfigPassthrough(): array { return array(); } + /** + * Resolve a queueable system-task prompt field from flow-step context. + * + * System tasks run from `system_task` flow steps, so queueable prompt fields + * share the existing flow-step scoped `prompt_queue` + `queue_mode` storage. + * The concrete task decides where the returned prompt lands in its task params. + * + * @param int $jobId Job ID for retry backup engine data. + * @param array $params Task params / engine data. + * @param string $fallbackPrompt Configured prompt to use when no queue item applies. + * @return array{prompt: string, from_queue: bool, skipped: bool, reason: string|null} + */ + protected function resolveQueueablePromptField( int $jobId, array $params, string $fallbackPrompt = '' ): array { + $flow_id = (int) ( $params['flow_id'] ?? 0 ); + $flow_step_id = $params['flow_step_id'] ?? ''; + $queue_mode = $params['queue_mode'] ?? 'static'; + if ( ! in_array( $queue_mode, array( 'drain', 'loop', 'static' ), true ) ) { + $queue_mode = 'static'; + } + + if ( 'static' === $queue_mode || $flow_id <= 0 || empty( $flow_step_id ) ) { + return array( + 'prompt' => $fallbackPrompt, + 'from_queue' => false, + 'skipped' => false, + 'reason' => null, + ); + } + + $queued_item = QueueAbility::consumeFromQueueSlot( + $flow_id, + (string) $flow_step_id, + QueueAbility::SLOT_PROMPT_QUEUE, + $queue_mode + ); + + if ( $queued_item && ! empty( $queued_item['prompt'] ) ) { + \datamachine_merge_engine_data( + $jobId, + array( + 'queued_prompt_backup' => array( + 'slot' => QueueAbility::SLOT_PROMPT_QUEUE, + 'mode' => $queue_mode, + 'prompt' => $queued_item['prompt'], + 'flow_id' => $flow_id, + 'flow_step_id' => (string) $flow_step_id, + 'added_at' => $queued_item['added_at'] ?? null, + ), + ) + ); + + do_action( + 'datamachine_log', + 'info', + 'System task using prompt from queue', + array( + 'job_id' => $jobId, + 'task_type' => $this->getTaskType(), + 'flow_id' => $flow_id, + 'flow_step_id' => (string) $flow_step_id, + 'queue_mode' => $queue_mode, + ) + ); + + return array( + 'prompt' => (string) $queued_item['prompt'], + 'from_queue' => true, + 'skipped' => false, + 'reason' => null, + ); + } + + if ( '' === $fallbackPrompt ) { + return array( + 'prompt' => '', + 'from_queue' => false, + 'skipped' => true, + 'reason' => sprintf( 'Queue mode "%s" but queue empty, no configured prompt fallback', $queue_mode ), + ); + } + + return array( + 'prompt' => $fallbackPrompt, + 'from_queue' => false, + 'skipped' => false, + 'reason' => null, + ); + } + // ─── Job lifecycle helpers ──────────────────────────────────────── // Used by executeTask() implementations to signal completion/failure. diff --git a/tests/pipeline-system-prompt-queue-smoke.php b/tests/pipeline-system-prompt-queue-smoke.php new file mode 100644 index 000000000..874c6eb15 --- /dev/null +++ b/tests/pipeline-system-prompt-queue-smoke.php @@ -0,0 +1,151 @@ + null, 'mutated' => false, 'remaining_count' => 0 ); + } + + $queue = $pipeline_config[ $pipeline_step_id ][ $slot ] ?? array(); + if ( empty( $queue ) || ! is_array( $queue ) ) { + return array( 'entry' => null, 'mutated' => false, 'remaining_count' => 0 ); + } + + if ( 'static' === $queue_mode ) { + return array( 'entry' => $queue[0], 'mutated' => false, 'remaining_count' => count( $queue ) ); + } + + $entry = array_shift( $queue ); + if ( 'loop' === $queue_mode ) { + $queue[] = $entry; + } + $pipeline_config[ $pipeline_step_id ][ $slot ] = $queue; + + return array( 'entry' => $entry, 'mutated' => true, 'remaining_count' => count( $queue ) ); +} + +echo "\n[semantics:1] pipeline queues preserve drain / loop / static semantics\n"; +$pc = array( + 'ai_step' => array( + 'system_prompt_queue' => array( + array( 'prompt' => 'Variant A', 'added_at' => 't0' ), + array( 'prompt' => 'Variant B', 'added_at' => 't1' ), + ), + ), +); +$result = simulate_pipeline_queue_consume( $pc, 'ai_step', 'system_prompt_queue', 'static' ); +assert_pipeline_queue( 'static peeks Variant A', 'Variant A' === ( $result['entry']['prompt'] ?? null ) ); +assert_pipeline_queue( 'static does not mutate queue', false === $result['mutated'] && 2 === count( $pc['ai_step']['system_prompt_queue'] ) ); + +$pc = array( + 'ai_step' => array( + 'system_prompt_queue' => array( + array( 'prompt' => 'Variant A', 'added_at' => 't0' ), + array( 'prompt' => 'Variant B', 'added_at' => 't1' ), + ), + ), +); +$result = simulate_pipeline_queue_consume( $pc, 'ai_step', 'system_prompt_queue', 'drain' ); +assert_pipeline_queue( 'drain pops Variant A', 'Variant A' === ( $result['entry']['prompt'] ?? null ) ); +assert_pipeline_queue( 'drain leaves Variant B as head', 1 === count( $pc['ai_step']['system_prompt_queue'] ) && 'Variant B' === $pc['ai_step']['system_prompt_queue'][0]['prompt'] ); + +$pc = array( + 'ai_step' => array( + 'system_prompt_queue' => array( + array( 'prompt' => 'Variant A', 'added_at' => 't0' ), + array( 'prompt' => 'Variant B', 'added_at' => 't1' ), + ), + ), +); +$result = simulate_pipeline_queue_consume( $pc, 'ai_step', 'system_prompt_queue', 'loop' ); +assert_pipeline_queue( 'loop returns Variant A', 'Variant A' === ( $result['entry']['prompt'] ?? null ) ); +assert_pipeline_queue( 'loop rotates Variant A to tail', 2 === count( $pc['ai_step']['system_prompt_queue'] ) && 'Variant B' === $pc['ai_step']['system_prompt_queue'][0]['prompt'] && 'Variant A' === $pc['ai_step']['system_prompt_queue'][1]['prompt'] ); + +echo "\n"; +if ( 0 === $failed ) { + echo "=== pipeline-system-prompt-queue-smoke: ALL PASS ({$total}) ===\n"; + exit( 0 ); +} +echo "=== pipeline-system-prompt-queue-smoke: {$failed} FAIL of {$total} ===\n"; +exit( 1 ); diff --git a/tests/queue-consumption-consolidation-smoke.php b/tests/queue-consumption-consolidation-smoke.php index f468aebca..8b2cc4f06 100644 --- a/tests/queue-consumption-consolidation-smoke.php +++ b/tests/queue-consumption-consolidation-smoke.php @@ -9,9 +9,8 @@ * * - AIStep uses `QueueableTrait::consumeFromPromptQueue()` * - FetchStep uses `QueueableTrait::consumeFromConfigPatchQueue()` - * - AgentCallTask called `QueueAbility::popFromQueue() / ::loopFromQueue()` - * directly — reimplementing pop logic minus the - * static-peek branch. + * - AgentCallTask called queue helpers directly — reimplementing + * prompt-field queue consumption outside SystemTask. * * The trait's `private static consumeFromQueueSlot()` and QueueAbility's * `private static popFromQueueSlot()` were near-duplicates (drain/loop @@ -24,17 +23,18 @@ * #1299 promotes the consumer-agnostic core to * `QueueAbility::consumeFromQueueSlot()` (public static), deletes the * three orphan helpers (`popFromQueue`, `loopFromQueue`, - * `popConfigPatchFromQueue` — the last had ZERO callers), and migrates - * AgentCallTask to call the new method directly. Single source of truth - * for the drain / loop / static semantics regardless of consumer. + * `popConfigPatchFromQueue` — the last had ZERO callers), and routes + * system-task prompt consumption through SystemTask's queueable prompt + * helper. Single source of truth for the drain / loop / static semantics + * regardless of consumer. * * This smoke validates: * * 1. `QueueAbility::consumeFromQueueSlot` exists with the documented * signature. * 2. Trait's wrapper methods delegate to the new public method. - * 3. AgentCallTask calls `consumeFromQueueSlot` directly (no - * `popFromQueue` / `loopFromQueue` references remain). + * 3. SystemTask owns queueable prompt-field consumption; AgentCallTask + * delegates to it (no `popFromQueue` / `loopFromQueue` references remain). * 4. The deleted helpers are truly gone from QueueAbility source. * 5. AgentCallTask now writes `queued_prompt_backup` for retry parity * with the trait consumers. @@ -125,27 +125,42 @@ function assert_consolidation( string $name, bool $condition ): void { ); // --------------------------------------------------------------- -// SECTION 3: AgentCallTask calls the new method directly. +// SECTION 3: SystemTask owns the prompt-field queue helper. // --------------------------------------------------------------- -echo "\n[agent_call:1] AgentCallTask calls QueueAbility::consumeFromQueueSlot()\n"; +echo "\n[system_task:1] SystemTask prompt-field helper calls QueueAbility::consumeFromQueueSlot()\n"; $ping_src = (string) file_get_contents( $root_dir . '/inc/Engine/AI/System/Tasks/AgentCallTask.php' ); +$system_task_src = (string) file_get_contents( + $root_dir . '/inc/Engine/AI/System/Tasks/SystemTask.php' +); assert_consolidation( - 'AgentCallTask calls QueueAbility::consumeFromQueueSlot()', - false !== strpos( $ping_src, 'QueueAbility::consumeFromQueueSlot(' ) + 'SystemTask declares resolveQueueablePromptField()', + false !== strpos( $system_task_src, 'function resolveQueueablePromptField' ) ); assert_consolidation( - 'call passes QueueAbility::SLOT_PROMPT_QUEUE constant', - false !== strpos( $ping_src, 'QueueAbility::SLOT_PROMPT_QUEUE' ) + 'SystemTask calls QueueAbility::consumeFromQueueSlot()', + false !== strpos( $system_task_src, 'QueueAbility::consumeFromQueueSlot(' ) ); assert_consolidation( - 'call threads queue_mode through (no separate loop/drain branch)', - (bool) preg_match( - '/consumeFromQueueSlot\([^)]*\$queue_mode\s*\)/s', - $ping_src - ) + 'SystemTask helper passes QueueAbility::SLOT_PROMPT_QUEUE constant', + false !== strpos( $system_task_src, 'QueueAbility::SLOT_PROMPT_QUEUE' ) +); +assert_consolidation( + 'SystemTask helper threads queue_mode through (no separate loop/drain branch)', + false !== strpos( $system_task_src, "QueueAbility::consumeFromQueueSlot(\n\t\t\t\$flow_id," ) + && false !== strpos( $system_task_src, "\n\t\t\t\$queue_mode\n\t\t);" ) +); + +echo "\n[agent_call:1] AgentCallTask delegates prompt queue consumption to SystemTask\n"; +assert_consolidation( + 'AgentCallTask calls resolveQueueablePromptField()', + false !== strpos( $ping_src, 'resolveQueueablePromptField(' ) +); +assert_consolidation( + 'AgentCallTask does NOT call QueueAbility::consumeFromQueueSlot() directly', + false === strpos( $ping_src, 'QueueAbility::consumeFromQueueSlot(' ) ); echo "\n[agent_call:2] No references to the deleted helpers in AgentCallTask\n"; @@ -222,15 +237,15 @@ function assert_consolidation( string $name, bool $condition ): void { echo "\n[parity:1] AgentCallTask writes queued_prompt_backup after a mutating consume\n"; assert_consolidation( - 'datamachine_merge_engine_data() called from AgentCallTask', - false !== strpos( $ping_src, '\\datamachine_merge_engine_data(' ) + 'datamachine_merge_engine_data() called from SystemTask prompt helper', + false !== strpos( $system_task_src, '\\datamachine_merge_engine_data(' ) ); assert_consolidation( 'queued_prompt_backup payload includes slot, mode, prompt, flow_id, flow_step_id', - false !== strpos( $ping_src, "'queued_prompt_backup'" ) - && false !== strpos( $ping_src, "'slot' => QueueAbility::SLOT_PROMPT_QUEUE" ) - && false !== strpos( $ping_src, "'mode' => \$queue_mode" ) - && false !== strpos( $ping_src, "'prompt' => \$queued_item['prompt']" ) + false !== strpos( $system_task_src, "'queued_prompt_backup'" ) + && false !== strpos( $system_task_src, "'slot' => QueueAbility::SLOT_PROMPT_QUEUE" ) + && false !== strpos( $system_task_src, "'mode' => \$queue_mode" ) + && false !== strpos( $system_task_src, "'prompt' => \$queued_item['prompt']" ) ); // --------------------------------------------------------------- diff --git a/tests/systemtask-passthrough-smoke.php b/tests/systemtask-passthrough-smoke.php index e970d494a..c1835c8d9 100644 --- a/tests/systemtask-passthrough-smoke.php +++ b/tests/systemtask-passthrough-smoke.php @@ -356,8 +356,8 @@ public function getFlowStepConfigPassthrough(): array { ); assert_passthrough( 'both base methods default to inert (false / empty array)', - false !== strpos( $base_src, "return false;\n\t}\n\n\t/**\n\t * Declare flow_step_config keys" ) - && false !== strpos( $base_src, "return array();\n\t}\n\n\t// ─── Job lifecycle helpers" ) + false !== strpos( $base_src, "public function needsPipelineContext(): bool {\n\t\treturn false;\n\t}" ) + && false !== strpos( $base_src, "public function getFlowStepConfigPassthrough(): array {\n\t\treturn array();\n\t}" ) ); echo "\n[source:3] AgentCallTask overrides match the test fixture\n"; diff --git a/tests/workflow-spec-contract-smoke.php b/tests/workflow-spec-contract-smoke.php index a47248565..039dbb1ea 100644 --- a/tests/workflow-spec-contract-smoke.php +++ b/tests/workflow-spec-contract-smoke.php @@ -164,6 +164,10 @@ function assert_workflow_spec_equals( $expected, $actual, string $name, array &$ 'label' => 'Summarize', 'agent_modes' => array( 'rl_task' ), 'system_prompt' => 'Be concise.', + 'system_prompt_queue' => array( + array( 'prompt' => 'Variant A', 'added_at' => '2026-05-22T00:00:00Z' ), + ), + 'system_prompt_queue_mode' => 'loop', 'disabled_tools' => array( 'danger_tool' ), 'completion_assertions' => array( 'required_tool_names' => array( 'publish_result' ), @@ -192,6 +196,8 @@ function assert_workflow_spec_equals( $expected, $actual, string $name, array &$ assert_workflow_spec_equals( 'Fetch Source', $pipeline_steps[0]['label'] ?? null, 'ephemeral pipeline_config preserves fetch label', $failures, $passes ); assert_workflow_spec_equals( array( 'rl_task' ), $pipeline_steps[1]['agent_modes'] ?? null, 'ephemeral pipeline_config preserves AI agent modes', $failures, $passes ); assert_workflow_spec_equals( 'Be concise.', $pipeline_steps[1]['system_prompt'] ?? null, 'ephemeral pipeline_config preserves AI system prompt', $failures, $passes ); +assert_workflow_spec_equals( array( array( 'prompt' => 'Variant A', 'added_at' => '2026-05-22T00:00:00Z' ) ), $pipeline_steps[1]['system_prompt_queue'] ?? null, 'ephemeral pipeline_config preserves AI system prompt queue', $failures, $passes ); +assert_workflow_spec_equals( 'loop', $pipeline_steps[1]['system_prompt_queue_mode'] ?? null, 'ephemeral pipeline_config preserves AI system prompt queue mode', $failures, $passes ); assert_workflow_spec_equals( array( 'danger_tool' ), $pipeline_steps[1]['disabled_tools'] ?? null, 'ephemeral pipeline_config preserves AI disabled tools', $failures, $passes ); assert_workflow_spec_equals( array( 'required_tool_names' => array( 'publish_result' ) ), $pipeline_steps[1]['completion_assertions'] ?? null, 'ephemeral pipeline_config preserves AI completion assertions', $failures, $passes ); assert_workflow_spec_equals( array( array( 'id' => 'after-worktree', 'max_calls' => 4 ) ), $pipeline_steps[1]['tool_runtime_rules'] ?? null, 'ephemeral pipeline_config preserves AI tool runtime rules', $failures, $passes ); From 48a212cd053e50aa57eb0b2642ec6d2dc3b14a90 Mon Sep 17 00:00:00 2001 From: Chris Huber Date: Sat, 23 May 2026 09:59:09 -0400 Subject: [PATCH 2/2] fix: align workflow config assignment --- inc/Core/Steps/WorkflowConfigFactory.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inc/Core/Steps/WorkflowConfigFactory.php b/inc/Core/Steps/WorkflowConfigFactory.php index 70e789e29..89dafa24b 100644 --- a/inc/Core/Steps/WorkflowConfigFactory.php +++ b/inc/Core/Steps/WorkflowConfigFactory.php @@ -143,7 +143,7 @@ private static function pipelineStepFromWorkflowStep( array $step, string $pipel if ( isset( $step['system_prompt_queue_mode'] ) && in_array( $step['system_prompt_queue_mode'], array( 'drain', 'loop', 'static' ), true ) ) { $pipeline_step['system_prompt_queue_mode'] = $step['system_prompt_queue_mode']; } - $agent_modes = self::sanitizeAgentModes( $step['agent_modes'] ?? array() ); + $agent_modes = self::sanitizeAgentModes( $step['agent_modes'] ?? array() ); if ( ! empty( $agent_modes ) ) { $pipeline_step['agent_modes'] = $agent_modes; }