Skip to content

Commit ea47d2b

Browse files
ericjutacodex
andcommitted
fix: sync multi-agent interruption marker behavior
Cherry-picks and adapts upstream 120aa07 for this branch's inline turn-aborted marker implementation. Co-authored-by: Codex <noreply@openai.com>
1 parent bb25a5b commit ea47d2b

4 files changed

Lines changed: 126 additions & 23 deletions

File tree

codex-rs/core/src/tasks/mod.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,18 +62,23 @@ pub(crate) use user_shell::UserShellCommandTask;
6262
pub(crate) use user_shell::execute_user_shell_command;
6363

6464
const GRACEFULL_INTERRUPTION_TIMEOUT_MS: u64 = 100;
65-
const TURN_ABORTED_INTERRUPTED_GUIDANCE: &str = "The user interrupted the previous turn on purpose. Any running unified exec processes may still be running in the background. If any tools/commands were aborted, they may have partially executed.";
65+
pub(crate) const TURN_ABORTED_INTERRUPTED_GUIDANCE: &str = "The user interrupted the previous turn on purpose. Any running unified exec processes may still be running in the background. If any tools/commands were aborted, they may have partially executed.";
66+
pub(crate) const TURN_ABORTED_INTERRUPTED_DEVELOPER_GUIDANCE: &str = "The previous turn was interrupted on purpose. Any running unified exec processes may still be running in the background. If any tools/commands were aborted, they may have partially executed.";
6667

6768
/// Shared model-visible marker used by both the real interrupt path and
6869
/// interrupted fork snapshots.
69-
pub(crate) fn interrupted_turn_history_marker() -> ResponseItem {
70+
pub(crate) fn interrupted_turn_history_marker(multi_agent_v2_enabled: bool) -> ResponseItem {
71+
let (role, guidance) = if multi_agent_v2_enabled {
72+
("developer", TURN_ABORTED_INTERRUPTED_DEVELOPER_GUIDANCE)
73+
} else {
74+
("user", TURN_ABORTED_INTERRUPTED_GUIDANCE)
75+
};
76+
7077
ResponseItem::Message {
7178
id: None,
72-
role: "user".to_string(),
79+
role: role.to_string(),
7380
content: vec![ContentItem::InputText {
74-
text: format!(
75-
"{TURN_ABORTED_OPEN_TAG}\n{TURN_ABORTED_INTERRUPTED_GUIDANCE}\n{TURN_ABORTED_CLOSE_TAG}"
76-
),
81+
text: format!("{TURN_ABORTED_OPEN_TAG}\n{guidance}\n{TURN_ABORTED_CLOSE_TAG}"),
7782
}],
7883
end_turn: None,
7984
phase: None,
@@ -636,7 +641,7 @@ impl Session {
636641
if reason == TurnAbortReason::Interrupted {
637642
self.cleanup_after_interrupt(&task.turn_context).await;
638643

639-
let marker = interrupted_turn_history_marker();
644+
let marker = interrupted_turn_history_marker(self.enabled(Feature::MultiAgentV2));
640645
self.record_into_history(std::slice::from_ref(&marker), task.turn_context.as_ref())
641646
.await;
642647
self.persist_rollout_items(&[RolloutItem::ResponseItem(marker)])

codex-rs/core/src/thread_manager.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use codex_analytics::AnalyticsEventsClient;
1919
use codex_app_server_protocol::ThreadHistoryBuilder;
2020
use codex_app_server_protocol::TurnStatus;
2121
use codex_exec_server::EnvironmentManager;
22+
use codex_features::Feature;
2223
use codex_login::AuthManager;
2324
use codex_login::CodexAuth;
2425
use codex_model_provider_info::ModelProviderInfo;
@@ -685,6 +686,7 @@ impl ThreadManager {
685686
let snapshot = snapshot.into();
686687
let history = RolloutRecorder::get_rollout_history(&path).await?;
687688
let snapshot_state = snapshot_turn_state(&history);
689+
let multi_agent_v2_enabled = config.features.enabled(Feature::MultiAgentV2);
688690
let history = match snapshot {
689691
ForkSnapshot::TruncateBeforeNthUserMessage(nth_user_message) => {
690692
truncate_before_nth_user_message(history, nth_user_message, &snapshot_state)
@@ -697,7 +699,11 @@ impl ThreadManager {
697699
InitialHistory::Resumed(resumed) => InitialHistory::Forked(resumed.history),
698700
};
699701
if snapshot_state.ends_mid_turn {
700-
append_interrupted_boundary(history, snapshot_state.active_turn_id)
702+
append_interrupted_boundary(
703+
history,
704+
snapshot_state.active_turn_id,
705+
multi_agent_v2_enabled,
706+
)
701707
} else {
702708
history
703709
}
@@ -1099,7 +1105,11 @@ fn snapshot_turn_state(history: &InitialHistory) -> SnapshotTurnState {
10991105
/// Append the same persisted interrupt boundary used by the live interrupt path
11001106
/// to an existing fork snapshot after the source thread has been confirmed to
11011107
/// be mid-turn.
1102-
fn append_interrupted_boundary(history: InitialHistory, turn_id: Option<String>) -> InitialHistory {
1108+
fn append_interrupted_boundary(
1109+
history: InitialHistory,
1110+
turn_id: Option<String>,
1111+
multi_agent_v2_enabled: bool,
1112+
) -> InitialHistory {
11031113
let aborted_event = RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
11041114
turn_id,
11051115
reason: TurnAbortReason::Interrupted,
@@ -1109,18 +1119,22 @@ fn append_interrupted_boundary(history: InitialHistory, turn_id: Option<String>)
11091119

11101120
match history {
11111121
InitialHistory::New | InitialHistory::Cleared => InitialHistory::Forked(vec![
1112-
RolloutItem::ResponseItem(interrupted_turn_history_marker()),
1122+
RolloutItem::ResponseItem(interrupted_turn_history_marker(multi_agent_v2_enabled)),
11131123
aborted_event,
11141124
]),
11151125
InitialHistory::Forked(mut history) => {
1116-
history.push(RolloutItem::ResponseItem(interrupted_turn_history_marker()));
1126+
history.push(RolloutItem::ResponseItem(interrupted_turn_history_marker(
1127+
multi_agent_v2_enabled,
1128+
)));
11171129
history.push(aborted_event);
11181130
InitialHistory::Forked(history)
11191131
}
11201132
InitialHistory::Resumed(mut resumed) => {
11211133
resumed
11221134
.history
1123-
.push(RolloutItem::ResponseItem(interrupted_turn_history_marker()));
1135+
.push(RolloutItem::ResponseItem(interrupted_turn_history_marker(
1136+
multi_agent_v2_enabled,
1137+
)));
11241138
resumed.history.push(aborted_event);
11251139
InitialHistory::Forked(resumed.history)
11261140
}

codex-rs/core/src/thread_manager_tests.rs

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -314,12 +314,19 @@ fn interrupted_fork_snapshot_appends_interrupt_boundary() {
314314

315315
assert_eq!(
316316
serde_json::to_value(
317-
append_interrupted_boundary(committed_history, /*turn_id*/ None).get_rollout_items()
317+
append_interrupted_boundary(
318+
committed_history,
319+
/*turn_id*/ None,
320+
/*multi_agent_v2_enabled*/ false,
321+
)
322+
.get_rollout_items()
318323
)
319324
.expect("serialize interrupted fork history"),
320325
serde_json::to_value(vec![
321326
RolloutItem::ResponseItem(user_msg("hello")),
322-
RolloutItem::ResponseItem(interrupted_turn_history_marker()),
327+
RolloutItem::ResponseItem(interrupted_turn_history_marker(
328+
/*multi_agent_v2_enabled*/ false,
329+
)),
323330
RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
324331
turn_id: None,
325332
reason: TurnAbortReason::Interrupted,
@@ -331,11 +338,18 @@ fn interrupted_fork_snapshot_appends_interrupt_boundary() {
331338
);
332339
assert_eq!(
333340
serde_json::to_value(
334-
append_interrupted_boundary(InitialHistory::New, /*turn_id*/ None).get_rollout_items()
341+
append_interrupted_boundary(
342+
InitialHistory::New,
343+
/*turn_id*/ None,
344+
/*multi_agent_v2_enabled*/ false,
345+
)
346+
.get_rollout_items()
335347
)
336348
.expect("serialize interrupted empty fork history"),
337349
serde_json::to_value(vec![
338-
RolloutItem::ResponseItem(interrupted_turn_history_marker()),
350+
RolloutItem::ResponseItem(interrupted_turn_history_marker(
351+
/*multi_agent_v2_enabled*/ false,
352+
)),
339353
RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
340354
turn_id: None,
341355
reason: TurnAbortReason::Interrupted,
@@ -352,7 +366,9 @@ fn interrupted_snapshot_is_not_mid_turn() {
352366
let interrupted_history = InitialHistory::Forked(vec![
353367
RolloutItem::ResponseItem(user_msg("hello")),
354368
RolloutItem::ResponseItem(assistant_msg("partial")),
355-
RolloutItem::ResponseItem(interrupted_turn_history_marker()),
369+
RolloutItem::ResponseItem(interrupted_turn_history_marker(
370+
/*multi_agent_v2_enabled*/ false,
371+
)),
356372
RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
357373
turn_id: Some("turn-1".to_string()),
358374
reason: TurnAbortReason::Interrupted,
@@ -371,6 +387,24 @@ fn interrupted_snapshot_is_not_mid_turn() {
371387
);
372388
}
373389

390+
#[test]
391+
fn multi_agent_v2_interrupted_marker_uses_developer_input_message() {
392+
let marker = interrupted_turn_history_marker(/*multi_agent_v2_enabled*/ true);
393+
394+
let ResponseItem::Message { role, content, .. } = marker else {
395+
panic!("expected interrupted marker to be a message");
396+
};
397+
assert_eq!(role, "developer");
398+
assert!(
399+
matches!(
400+
content.as_slice(),
401+
[ContentItem::InputText { text }]
402+
if text.contains(crate::tasks::TURN_ABORTED_INTERRUPTED_DEVELOPER_GUIDANCE)
403+
),
404+
"expected interrupted marker to use developer InputText content"
405+
);
406+
}
407+
374408
#[test]
375409
fn completed_legacy_event_history_is_not_mid_turn() {
376410
let completed_history = InitialHistory::Forked(vec![
@@ -488,9 +522,10 @@ async fn interrupted_fork_snapshot_does_not_synthesize_turn_id_for_legacy_histor
488522
.into_iter()
489523
.filter(|item| !matches!(item, RolloutItem::SessionMeta(_)))
490524
.collect();
491-
let interrupted_marker_json =
492-
serde_json::to_value(RolloutItem::ResponseItem(interrupted_turn_history_marker()))
493-
.expect("serialize interrupted marker");
525+
let interrupted_marker_json = serde_json::to_value(RolloutItem::ResponseItem(
526+
interrupted_turn_history_marker(/*multi_agent_v2_enabled*/ false),
527+
))
528+
.expect("serialize interrupted marker");
494529
let interrupted_abort_json = serde_json::to_value(RolloutItem::EventMsg(
495530
EventMsg::TurnAborted(TurnAbortedEvent {
496531
turn_id: expected_turn_id,
@@ -683,9 +718,10 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_
683718
.into_iter()
684719
.filter(|item| !matches!(item, RolloutItem::SessionMeta(_)))
685720
.collect();
686-
let interrupted_marker_json =
687-
serde_json::to_value(RolloutItem::ResponseItem(interrupted_turn_history_marker()))
688-
.expect("serialize interrupted marker");
721+
let interrupted_marker_json = serde_json::to_value(RolloutItem::ResponseItem(
722+
interrupted_turn_history_marker(/*multi_agent_v2_enabled*/ false),
723+
))
724+
.expect("serialize interrupted marker");
689725
assert_eq!(
690726
forked_rollout_items
691727
.iter()

codex-rs/core/src/tools/handlers/multi_agents_tests.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use crate::session_prefix::format_subagent_notification_message;
99
use crate::state::TaskKind;
1010
use crate::tasks::SessionTask;
1111
use crate::tasks::SessionTaskContext;
12+
use crate::tasks::TURN_ABORTED_INTERRUPTED_DEVELOPER_GUIDANCE;
13+
use crate::tasks::TURN_ABORTED_INTERRUPTED_GUIDANCE;
1214
use crate::tools::context::ToolOutput;
1315
use crate::tools::handlers::multi_agents_v2::CloseAgentHandler as CloseAgentHandlerV2;
1416
use crate::tools::handlers::multi_agents_v2::FollowupTaskHandler as FollowupTaskHandlerV2;
@@ -1564,6 +1566,52 @@ async fn multi_agent_v2_followup_task_interrupts_busy_child_without_losing_messa
15641566
}));
15651567

15661568
wait_for_turn_aborted(&thread, &interrupted_turn_id, TurnAbortReason::Interrupted).await;
1569+
let history_items = thread
1570+
.codex
1571+
.session
1572+
.clone_history()
1573+
.await
1574+
.raw_items()
1575+
.to_vec();
1576+
assert!(
1577+
history_items.iter().any(|item| matches!(
1578+
item,
1579+
ResponseItem::Message { role, content, .. }
1580+
if role == "developer"
1581+
&& content.iter().any(|content_item| matches!(
1582+
content_item,
1583+
ContentItem::InputText { text }
1584+
if text.contains(TURN_ABORTED_INTERRUPTED_DEVELOPER_GUIDANCE)
1585+
))
1586+
)),
1587+
"v2 interrupted-turn marker should be recorded as a developer input message"
1588+
);
1589+
assert!(
1590+
!history_items.iter().any(|item| matches!(
1591+
item,
1592+
ResponseItem::Message { role, content, .. }
1593+
if role == "user"
1594+
&& content.iter().any(|content_item| matches!(
1595+
content_item,
1596+
ContentItem::InputText { text } | ContentItem::OutputText { text }
1597+
if text.contains(TURN_ABORTED_INTERRUPTED_GUIDANCE)
1598+
))
1599+
)),
1600+
"v2 interrupted-turn marker should not be recorded as a user message"
1601+
);
1602+
assert!(
1603+
!history_items.iter().any(|item| matches!(
1604+
item,
1605+
ResponseItem::Message { role, content, .. }
1606+
if role == "assistant"
1607+
&& content.iter().any(|content_item| matches!(
1608+
content_item,
1609+
ContentItem::InputText { text } | ContentItem::OutputText { text }
1610+
if text.contains(TURN_ABORTED_INTERRUPTED_DEVELOPER_GUIDANCE)
1611+
))
1612+
)),
1613+
"v2 interrupted-turn marker should not be recorded as an assistant message"
1614+
);
15671615
wait_for_redirected_envelope_in_history(
15681616
&thread,
15691617
&InterAgentCommunication::new(

0 commit comments

Comments
 (0)