Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions inc/Abilities/Flow/QueueAbility.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
namespace DataMachine\Abilities\Flow;

use DataMachine\Core\Database\Flows\Flows as DB_Flows;
use DataMachine\Core\Database\Pipelines\Pipelines as DB_Pipelines;
use DataMachine\Abilities\DuplicateCheck\DuplicateCheckAbility;
use DataMachine\Core\Steps\FlowStepConfig;

Expand All @@ -56,6 +57,16 @@ class QueueAbility {
*/
const SLOT_CONFIG_PATCH_QUEUE = 'config_patch_queue';

/**
* Pipeline-step scoped queue for pipeline system prompts.
*/
const SLOT_SYSTEM_PROMPT_QUEUE = 'system_prompt_queue';

/**
* Pipeline-step scoped mode for system_prompt_queue.
*/
const MODE_SYSTEM_PROMPT_QUEUE = 'system_prompt_queue_mode';

/**
* Per-entry payload field name for prompt queues.
*/
Expand Down Expand Up @@ -1648,6 +1659,83 @@ public static function consumeFromQueueSlot(
return null;
}

/**
* Consume one item from a pipeline-step scoped queue slot per the given mode.
*
* This is the pipeline-config sibling of {@see consumeFromQueueSlot()}. It
* keeps the same drain / loop / static semantics while addressing queues that
* live on `pipeline_config[pipeline_step_id]` rather than a flow step.
*
* @param int $pipeline_id Pipeline ID.
* @param string $pipeline_step_id Pipeline step ID.
* @param string $slot Queue slot name.
* @param string $queue_mode "drain" | "loop" | "static".
* @param DB_Pipelines|null $db_pipelines Optional database instance.
* @return array|null The consumed entry, or null if the queue was empty.
* @since 0.84.0
*/
public static function consumeFromPipelineQueueSlot(
int $pipeline_id,
string $pipeline_step_id,
string $slot,
string $queue_mode,
?DB_Pipelines $db_pipelines = null
): ?array {
if ( null === $db_pipelines ) {
$db_pipelines = new DB_Pipelines();
}

if ( ! in_array( $queue_mode, array( 'drain', 'loop', 'static' ), true ) ) {
$queue_mode = 'static';
}

$pipeline_config = $db_pipelines->get_pipeline_config( $pipeline_id );
if ( ! isset( $pipeline_config[ $pipeline_step_id ] ) || ! is_array( $pipeline_config[ $pipeline_step_id ] ) ) {
return null;
}

$queue = $pipeline_config[ $pipeline_step_id ][ $slot ] ?? array();
if ( empty( $queue ) || ! is_array( $queue ) ) {
return null;
}

if ( 'static' === $queue_mode ) {
return is_array( $queue[0] ?? null ) ? $queue[0] : null;
}

$entry = array_shift( $queue );
if ( ! is_array( $entry ) ) {
return null;
}

if ( 'loop' === $queue_mode ) {
$queue[] = $entry;
}

$pipeline_config[ $pipeline_step_id ][ $slot ] = $queue;
$pipeline_config[ $pipeline_step_id ]['_queue_consume_revision'] =
(int) ( $pipeline_config[ $pipeline_step_id ]['_queue_consume_revision'] ?? 0 ) + 1;

if ( ! $db_pipelines->update_pipeline( $pipeline_id, array( 'pipeline_config' => $pipeline_config ) ) ) {
return null;
}

do_action(
'datamachine_log',
'info',
'Item consumed from pipeline queue',
array(
'pipeline_id' => $pipeline_id,
'pipeline_step_id' => $pipeline_step_id,
'slot' => $slot,
'queue_mode' => $queue_mode,
'remaining_count' => count( $queue ),
)
);

return $entry;
}

/**
* Validate flow_id and flow_step_id input fields.
*
Expand Down
54 changes: 53 additions & 1 deletion inc/Core/Steps/AI/Directives/PipelineSystemPromptDirective.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

namespace DataMachine\Core\Steps\AI\Directives;

use DataMachine\Abilities\Flow\QueueAbility;
use DataMachine\Abilities\HandlerAbilities;
use DataMachine\Core\Steps\FlowStepConfig;

Expand All @@ -35,7 +36,7 @@ public static function get_outputs( string $provider_name, array $tools, ?string
$engine_data = datamachine_get_engine_data( $payload['job_id'] );
$pipeline_config = $engine_data['pipeline_config'] ?? array();
$step_config = $pipeline_config[ $pipeline_step_id ] ?? array();
$system_prompt = $step_config['system_prompt'] ?? '';
$system_prompt = self::resolveSystemPrompt( $pipeline_step_id, $step_config, $payload, $engine_data );

if ( empty( $system_prompt ) ) {
return array();
Expand Down Expand Up @@ -158,6 +159,57 @@ private static function buildWorkflowVisualization( $pipeline_step_id, $current_

return $workflow_string;
}

/**
* Resolve the pipeline system prompt, optionally from a pipeline-step queue.
*
* Existing `system_prompt` behavior is preserved when no queue is configured.
* When a queue is configured, `system_prompt_queue_mode` controls whether the
* queue head is peeked, drained, or rotated.
*
* @param string $pipeline_step_id Pipeline step ID.
* @param array $step_config Pipeline step config.
* @param array $payload Directive payload.
* @param array $engine_data Job engine data.
* @return string Resolved system prompt.
*/
private static function resolveSystemPrompt( string $pipeline_step_id, array $step_config, array $payload, array $engine_data ): string {
$queue_present = array_key_exists( QueueAbility::SLOT_SYSTEM_PROMPT_QUEUE, $step_config )
&& is_array( $step_config[ QueueAbility::SLOT_SYSTEM_PROMPT_QUEUE ] );

if ( ! $queue_present ) {
return (string) ( $step_config['system_prompt'] ?? '' );
}

$queue_mode = $step_config[ QueueAbility::MODE_SYSTEM_PROMPT_QUEUE ] ?? 'static';
if ( ! in_array( $queue_mode, array( 'drain', 'loop', 'static' ), true ) ) {
$queue_mode = 'static';
}

$pipeline_id = $payload['pipeline_id'] ?? $engine_data['pipeline_id'] ?? $engine_data['job']['pipeline_id'] ?? 0;
$entry = null;

if ( is_numeric( $pipeline_id ) && (int) $pipeline_id > 0 ) {
$entry = QueueAbility::consumeFromPipelineQueueSlot(
(int) $pipeline_id,
$pipeline_step_id,
QueueAbility::SLOT_SYSTEM_PROMPT_QUEUE,
$queue_mode
);
} elseif ( 'static' === $queue_mode ) {
$entry = $step_config[ QueueAbility::SLOT_SYSTEM_PROMPT_QUEUE ][0] ?? null;
}

if ( is_array( $entry ) && ! empty( $entry[ QueueAbility::FIELD_PROMPT ] ) ) {
return (string) $entry[ QueueAbility::FIELD_PROMPT ];
}

if ( in_array( $queue_mode, array( 'drain', 'loop' ), true ) ) {
return '';
}

return (string) ( $step_config['system_prompt'] ?? '' );
}
}

// Register with universal agent directive system (Priority 50 = pipeline system prompt)
Expand Down
8 changes: 7 additions & 1 deletion inc/Core/Steps/WorkflowConfigFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,13 @@ private static function pipelineStepFromWorkflowStep( array $step, string $pipel
if ( 'ai' === $step_type ) {
$pipeline_step['system_prompt'] = $step['system_prompt'] ?? '';
$pipeline_step['disabled_tools'] = is_array( $step['disabled_tools'] ?? null ) ? array_values( $step['disabled_tools'] ) : array();
$agent_modes = self::sanitizeAgentModes( $step['agent_modes'] ?? array() );
if ( is_array( $step['system_prompt_queue'] ?? null ) ) {
$pipeline_step['system_prompt_queue'] = array_values( $step['system_prompt_queue'] );
}
if ( isset( $step['system_prompt_queue_mode'] ) && in_array( $step['system_prompt_queue_mode'], array( 'drain', 'loop', 'static' ), true ) ) {
$pipeline_step['system_prompt_queue_mode'] = $step['system_prompt_queue_mode'];
}
$agent_modes = self::sanitizeAgentModes( $step['agent_modes'] ?? array() );
if ( ! empty( $agent_modes ) ) {
$pipeline_step['agent_modes'] = $agent_modes;
}
Expand Down
81 changes: 18 additions & 63 deletions inc/Engine/AI/System/Tasks/AgentCallTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@

namespace DataMachine\Engine\AI\System\Tasks;

use DataMachine\Abilities\Flow\QueueAbility;

defined( 'ABSPATH' ) || exit;

class AgentCallTask extends SystemTask {
Expand Down Expand Up @@ -109,71 +107,28 @@ private function resolveQueuedInput( int $jobId, array $params ): ?array {
$input = is_array( $params['input'] ?? null ) ? $params['input'] : array();
$delivery = is_array( $params['delivery'] ?? null ) ? $params['delivery'] : array();

$flow_id = (int) ( $params['flow_id'] ?? 0 );
$flow_step_id = $params['flow_step_id'] ?? '';
$queue_mode = $params['queue_mode'] ?? 'static';
if ( ! in_array( $queue_mode, array( 'drain', 'loop', 'static' ), true ) ) {
$queue_mode = 'static';
}

$from_queue = false;
if ( 'static' !== $queue_mode && $flow_id > 0 && ! empty( $flow_step_id ) ) {
$queued_item = QueueAbility::consumeFromQueueSlot(
$flow_id,
$flow_step_id,
QueueAbility::SLOT_PROMPT_QUEUE,
$queue_mode
$queue_prompt = $this->resolveQueueablePromptField( $jobId, $params, (string) ( $input['task'] ?? '' ) );
if ( $queue_prompt['skipped'] ) {
do_action(
'datamachine_log',
'info',
'Agent call task skipped — queue mode requires per-tick input but queue is empty, no configured fallback',
array(
'job_id' => $jobId,
)
);

if ( $queued_item && ! empty( $queued_item['prompt'] ) ) {
$input['task'] = $queued_item['prompt'];
$from_queue = true;

\datamachine_merge_engine_data(
$jobId,
array(
'queued_prompt_backup' => array(
'slot' => QueueAbility::SLOT_PROMPT_QUEUE,
'mode' => $queue_mode,
'prompt' => $queued_item['prompt'],
'flow_id' => $flow_id,
'flow_step_id' => $flow_step_id,
'added_at' => $queued_item['added_at'] ?? null,
),
)
);

do_action(
'datamachine_log',
'info',
'Agent call task using input task from queue',
array(
'job_id' => $jobId,
'flow_id' => $flow_id,
'flow_step_id' => $flow_step_id,
'queue_mode' => $queue_mode,
)
);
} elseif ( empty( $input['task'] ) ) {
do_action(
'datamachine_log',
'info',
'Agent call task skipped — queue mode requires per-tick input but queue is empty, no configured fallback',
array(
'job_id' => $jobId,
'queue_mode' => $queue_mode,
)
);

$this->completeJob( $jobId, array(
'skipped' => true,
'reason' => sprintf( 'Queue mode "%s" but queue empty, no configured input task', $queue_mode ),
'completed_at' => current_time( 'mysql' ),
) );
return null;
}
$this->completeJob( $jobId, array(
'skipped' => true,
'reason' => $queue_prompt['reason'],
'completed_at' => current_time( 'mysql' ),
) );
return null;
}

$input['task'] = $queue_prompt['prompt'];
$from_queue = $queue_prompt['from_queue'];

$context = is_array( $input['context'] ?? null ) ? $input['context'] : array();
$context = array_merge(
$context,
Expand Down
Loading
Loading