Skip to content

Commit ae8dd90

Browse files
ericjutacodex
andcommitted
feat: sync upstream multi-agent controls
Cherry-picks and adapts upstream deb4509 and 2874286 for this branch. Co-authored-by: Codex <noreply@openai.com>
1 parent ea47d2b commit ae8dd90

16 files changed

Lines changed: 316 additions & 46 deletions

codex-rs/config/src/config_toml.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,9 @@ pub struct AgentsToml {
546546
/// Default maximum runtime in seconds for agent job workers.
547547
#[schemars(range(min = 1))]
548548
pub job_max_runtime_seconds: Option<u64>,
549+
/// Whether to record a model-visible message when an agent turn is interrupted.
550+
/// Defaults to true.
551+
pub interrupt_message: Option<bool>,
549552

550553
/// User-defined role declarations keyed by role name.
551554
///

codex-rs/core/config.schema.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@
5555
"$ref": "#/definitions/AgentRoleToml"
5656
},
5757
"properties": {
58+
"interrupt_message": {
59+
"description": "Whether to record a model-visible message when an agent turn is interrupted. Defaults to true.",
60+
"type": "boolean"
61+
},
5862
"job_max_runtime_seconds": {
5963
"description": "Default maximum runtime in seconds for agent job workers.",
6064
"format": "uint64",

codex-rs/core/src/config/config_tests.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3642,6 +3642,7 @@ async fn load_config_rejects_missing_agent_role_config_file() -> std::io::Result
36423642
max_threads: None,
36433643
max_depth: None,
36443644
job_max_runtime_seconds: None,
3645+
interrupt_message: None,
36453646
roles: BTreeMap::from([(
36463647
"researcher".to_string(),
36473648
AgentRoleToml {
@@ -4557,6 +4558,29 @@ model = "gpt-5-mini"
45574558
Ok(())
45584559
}
45594560

4561+
#[tokio::test]
4562+
async fn load_config_resolves_agent_interrupt_message() -> std::io::Result<()> {
4563+
let codex_home = TempDir::new()?;
4564+
let cfg = ConfigToml {
4565+
agents: Some(AgentsToml {
4566+
interrupt_message: Some(false),
4567+
..Default::default()
4568+
}),
4569+
..Default::default()
4570+
};
4571+
4572+
let config = Config::load_from_base_config_with_overrides(
4573+
cfg,
4574+
ConfigOverrides::default(),
4575+
codex_home.abs(),
4576+
)
4577+
.await?;
4578+
4579+
assert!(!config.agent_interrupt_message_enabled);
4580+
4581+
Ok(())
4582+
}
4583+
45604584
#[tokio::test]
45614585
async fn load_config_normalizes_agent_role_nickname_candidates() -> std::io::Result<()> {
45624586
let codex_home = TempDir::new()?;
@@ -4565,6 +4589,7 @@ async fn load_config_normalizes_agent_role_nickname_candidates() -> std::io::Res
45654589
max_threads: None,
45664590
max_depth: None,
45674591
job_max_runtime_seconds: None,
4592+
interrupt_message: None,
45684593
roles: BTreeMap::from([(
45694594
"researcher".to_string(),
45704595
AgentRoleToml {
@@ -4607,6 +4632,7 @@ async fn load_config_rejects_empty_agent_role_nickname_candidates() -> std::io::
46074632
max_threads: None,
46084633
max_depth: None,
46094634
job_max_runtime_seconds: None,
4635+
interrupt_message: None,
46104636
roles: BTreeMap::from([(
46114637
"researcher".to_string(),
46124638
AgentRoleToml {
@@ -4643,6 +4669,7 @@ async fn load_config_rejects_duplicate_agent_role_nickname_candidates() -> std::
46434669
max_threads: None,
46444670
max_depth: None,
46454671
job_max_runtime_seconds: None,
4672+
interrupt_message: None,
46464673
roles: BTreeMap::from([(
46474674
"researcher".to_string(),
46484675
AgentRoleToml {
@@ -4679,6 +4706,7 @@ async fn load_config_rejects_unsafe_agent_role_nickname_candidates() -> std::io:
46794706
max_threads: None,
46804707
max_depth: None,
46814708
job_max_runtime_seconds: None,
4709+
interrupt_message: None,
46824710
roles: BTreeMap::from([(
46834711
"researcher".to_string(),
46844712
AgentRoleToml {
@@ -4934,6 +4962,7 @@ async fn test_precedence_fixture_with_o3_profile() -> std::io::Result<()> {
49344962
agent_roles: BTreeMap::new(),
49354963
memories: MemoriesConfig::default(),
49364964
agent_job_max_runtime_seconds: DEFAULT_AGENT_JOB_MAX_RUNTIME_SECONDS,
4965+
agent_interrupt_message_enabled: true,
49374966
codex_home: fixture.codex_home(),
49384967
sqlite_home: fixture.codex_home().to_path_buf(),
49394968
log_dir: fixture.codex_home().join("log").to_path_buf(),
@@ -5086,6 +5115,7 @@ async fn test_precedence_fixture_with_gpt3_profile() -> std::io::Result<()> {
50865115
agent_roles: BTreeMap::new(),
50875116
memories: MemoriesConfig::default(),
50885117
agent_job_max_runtime_seconds: DEFAULT_AGENT_JOB_MAX_RUNTIME_SECONDS,
5118+
agent_interrupt_message_enabled: true,
50895119
codex_home: fixture.codex_home(),
50905120
sqlite_home: fixture.codex_home().to_path_buf(),
50915121
log_dir: fixture.codex_home().join("log").to_path_buf(),
@@ -5236,6 +5266,7 @@ async fn test_precedence_fixture_with_zdr_profile() -> std::io::Result<()> {
52365266
agent_roles: BTreeMap::new(),
52375267
memories: MemoriesConfig::default(),
52385268
agent_job_max_runtime_seconds: DEFAULT_AGENT_JOB_MAX_RUNTIME_SECONDS,
5269+
agent_interrupt_message_enabled: true,
52395270
codex_home: fixture.codex_home(),
52405271
sqlite_home: fixture.codex_home().to_path_buf(),
52415272
log_dir: fixture.codex_home().join("log").to_path_buf(),
@@ -5371,6 +5402,7 @@ async fn test_precedence_fixture_with_gpt5_profile() -> std::io::Result<()> {
53715402
agent_roles: BTreeMap::new(),
53725403
memories: MemoriesConfig::default(),
53735404
agent_job_max_runtime_seconds: DEFAULT_AGENT_JOB_MAX_RUNTIME_SECONDS,
5405+
agent_interrupt_message_enabled: true,
53745406
codex_home: fixture.codex_home(),
53755407
sqlite_home: fixture.codex_home().to_path_buf(),
53765408
log_dir: fixture.codex_home().join("log").to_path_buf(),

codex-rs/core/src/config/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,9 @@ pub struct Config {
421421
/// Maximum runtime in seconds for agent job workers before they are failed.
422422
pub agent_job_max_runtime_seconds: Option<u64>,
423423

424+
/// Whether to record a model-visible message when an agent turn is interrupted.
425+
pub agent_interrupt_message_enabled: bool,
426+
424427
/// Maximum nesting depth allowed for spawned agent threads.
425428
pub agent_max_depth: i32,
426429

@@ -1890,6 +1893,11 @@ impl Config {
18901893
"agents.job_max_runtime_seconds must fit within a 64-bit signed integer",
18911894
));
18921895
}
1896+
let agent_interrupt_message_enabled = cfg
1897+
.agents
1898+
.as_ref()
1899+
.and_then(|agents| agents.interrupt_message)
1900+
.unwrap_or(true);
18931901
let background_terminal_max_timeout = cfg
18941902
.background_terminal_max_timeout
18951903
.unwrap_or(DEFAULT_MAX_BACKGROUND_TERMINAL_TIMEOUT_MS)
@@ -2237,6 +2245,7 @@ impl Config {
22372245
agent_roles,
22382246
memories: cfg.memories.unwrap_or_default().into(),
22392247
agent_job_max_runtime_seconds,
2248+
agent_interrupt_message_enabled,
22402249
codex_home,
22412250
sqlite_home,
22422251
log_dir,

codex-rs/core/src/session/review.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ pub(super) async fn spawn_review_thread(
5151
.with_spawn_agent_usage_hint(config.multi_agent_v2.usage_hint_enabled)
5252
.with_spawn_agent_usage_hint_text(config.multi_agent_v2.usage_hint_text.clone())
5353
.with_hide_spawn_agent_metadata(config.multi_agent_v2.hide_spawn_agent_metadata)
54+
.with_max_concurrent_threads_per_session(config.agent_max_threads)
5455
.with_agent_type_description(crate::agent::role::spawn_tool_spec::build(
5556
&config.agent_roles,
5657
));

codex-rs/core/src/session/turn_context.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ impl TurnContext {
149149
.with_spawn_agent_usage_hint(config.multi_agent_v2.usage_hint_enabled)
150150
.with_spawn_agent_usage_hint_text(config.multi_agent_v2.usage_hint_text.clone())
151151
.with_hide_spawn_agent_metadata(config.multi_agent_v2.hide_spawn_agent_metadata)
152+
.with_max_concurrent_threads_per_session(config.agent_max_threads)
152153
.with_agent_type_description(crate::agent::role::spawn_tool_spec::build(
153154
&config.agent_roles,
154155
));
@@ -387,6 +388,7 @@ impl Session {
387388
.with_spawn_agent_usage_hint(per_turn_config.multi_agent_v2.usage_hint_enabled)
388389
.with_spawn_agent_usage_hint_text(per_turn_config.multi_agent_v2.usage_hint_text.clone())
389390
.with_hide_spawn_agent_metadata(per_turn_config.multi_agent_v2.hide_spawn_agent_metadata)
391+
.with_max_concurrent_threads_per_session(per_turn_config.agent_max_threads)
390392
.with_agent_type_description(crate::agent::role::spawn_tool_spec::build(
391393
&per_turn_config.agent_roles,
392394
));

codex-rs/core/src/tasks/mod.rs

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use tracing::info_span;
1919
use tracing::trace;
2020
use tracing::warn;
2121

22+
use crate::config::Config;
2223
use crate::contextual_user_message::TURN_ABORTED_CLOSE_TAG;
2324
use crate::contextual_user_message::TURN_ABORTED_OPEN_TAG;
2425
use crate::hook_runtime::PendingInputHookDisposition;
@@ -65,15 +66,45 @@ const GRACEFULL_INTERRUPTION_TIMEOUT_MS: u64 = 100;
6566
pub(crate) const TURN_ABORTED_INTERRUPTED_GUIDANCE: &str = "The user interrupted the previous turn on purpose. Any running unified exec processes may still be running in the background. If any tools/commands were aborted, they may have partially executed.";
6667
pub(crate) const TURN_ABORTED_INTERRUPTED_DEVELOPER_GUIDANCE: &str = "The previous turn was interrupted on purpose. Any running unified exec processes may still be running in the background. If any tools/commands were aborted, they may have partially executed.";
6768

69+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70+
pub(crate) enum InterruptedTurnHistoryMarker {
71+
Disabled,
72+
ContextualUser,
73+
Developer,
74+
}
75+
76+
impl InterruptedTurnHistoryMarker {
77+
pub(crate) fn from_config(config: &Config) -> Self {
78+
if !config.agent_interrupt_message_enabled {
79+
return Self::Disabled;
80+
}
81+
if config.features.enabled(Feature::MultiAgentV2) {
82+
Self::Developer
83+
} else {
84+
Self::ContextualUser
85+
}
86+
}
87+
}
88+
6889
/// Shared model-visible marker used by both the real interrupt path and
6990
/// interrupted fork snapshots.
70-
pub(crate) fn interrupted_turn_history_marker(multi_agent_v2_enabled: bool) -> ResponseItem {
71-
let (role, guidance) = if multi_agent_v2_enabled {
72-
("developer", TURN_ABORTED_INTERRUPTED_DEVELOPER_GUIDANCE)
73-
} else {
74-
("user", TURN_ABORTED_INTERRUPTED_GUIDANCE)
75-
};
91+
pub(crate) fn interrupted_turn_history_marker(
92+
marker: InterruptedTurnHistoryMarker,
93+
) -> Option<ResponseItem> {
94+
match marker {
95+
InterruptedTurnHistoryMarker::Disabled => None,
96+
InterruptedTurnHistoryMarker::ContextualUser => Some(interrupted_turn_message(
97+
"user",
98+
TURN_ABORTED_INTERRUPTED_GUIDANCE,
99+
)),
100+
InterruptedTurnHistoryMarker::Developer => Some(interrupted_turn_message(
101+
"developer",
102+
TURN_ABORTED_INTERRUPTED_DEVELOPER_GUIDANCE,
103+
)),
104+
}
105+
}
76106

107+
fn interrupted_turn_message(role: &str, guidance: &str) -> ResponseItem {
77108
ResponseItem::Message {
78109
id: None,
79110
role: role.to_string(),
@@ -641,15 +672,20 @@ impl Session {
641672
if reason == TurnAbortReason::Interrupted {
642673
self.cleanup_after_interrupt(&task.turn_context).await;
643674

644-
let marker = interrupted_turn_history_marker(self.enabled(Feature::MultiAgentV2));
645-
self.record_into_history(std::slice::from_ref(&marker), task.turn_context.as_ref())
646-
.await;
647-
self.persist_rollout_items(&[RolloutItem::ResponseItem(marker)])
648-
.await;
649-
// Ensure the marker is durably visible before emitting TurnAborted: some clients
650-
// synchronously re-read the rollout on receipt of the abort event.
651-
if let Err(err) = self.flush_rollout().await {
652-
warn!("failed to flush interrupted-turn marker before emitting TurnAborted: {err}");
675+
if let Some(marker) = interrupted_turn_history_marker(
676+
InterruptedTurnHistoryMarker::from_config(task.turn_context.config.as_ref()),
677+
) {
678+
self.record_into_history(std::slice::from_ref(&marker), task.turn_context.as_ref())
679+
.await;
680+
self.persist_rollout_items(&[RolloutItem::ResponseItem(marker)])
681+
.await;
682+
// Ensure the marker is durably visible before emitting TurnAborted: some clients
683+
// synchronously re-read the rollout on receipt of the abort event.
684+
if let Err(err) = self.flush_rollout().await {
685+
warn!(
686+
"failed to flush interrupted-turn marker before emitting TurnAborted: {err}"
687+
);
688+
}
653689
}
654690
}
655691

codex-rs/core/src/thread_manager.rs

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ use crate::session::INITIAL_SUBMIT_ID;
1414
use crate::shell_snapshot::ShellSnapshot;
1515
use crate::skills_watcher::SkillsWatcher;
1616
use crate::skills_watcher::SkillsWatcherEvent;
17+
use crate::tasks::InterruptedTurnHistoryMarker;
1718
use crate::tasks::interrupted_turn_history_marker;
1819
use codex_analytics::AnalyticsEventsClient;
1920
use codex_app_server_protocol::ThreadHistoryBuilder;
2021
use codex_app_server_protocol::TurnStatus;
2122
use codex_exec_server::EnvironmentManager;
22-
use codex_features::Feature;
2323
use codex_login::AuthManager;
2424
use codex_login::CodexAuth;
2525
use codex_model_provider_info::ModelProviderInfo;
@@ -686,7 +686,7 @@ impl ThreadManager {
686686
let snapshot = snapshot.into();
687687
let history = RolloutRecorder::get_rollout_history(&path).await?;
688688
let snapshot_state = snapshot_turn_state(&history);
689-
let multi_agent_v2_enabled = config.features.enabled(Feature::MultiAgentV2);
689+
let interrupted_marker = InterruptedTurnHistoryMarker::from_config(&config);
690690
let history = match snapshot {
691691
ForkSnapshot::TruncateBeforeNthUserMessage(nth_user_message) => {
692692
truncate_before_nth_user_message(history, nth_user_message, &snapshot_state)
@@ -702,7 +702,7 @@ impl ThreadManager {
702702
append_interrupted_boundary(
703703
history,
704704
snapshot_state.active_turn_id,
705-
multi_agent_v2_enabled,
705+
interrupted_marker,
706706
)
707707
} else {
708708
history
@@ -1108,7 +1108,7 @@ fn snapshot_turn_state(history: &InitialHistory) -> SnapshotTurnState {
11081108
fn append_interrupted_boundary(
11091109
history: InitialHistory,
11101110
turn_id: Option<String>,
1111-
multi_agent_v2_enabled: bool,
1111+
interrupted_marker: InterruptedTurnHistoryMarker,
11121112
) -> InitialHistory {
11131113
let aborted_event = RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
11141114
turn_id,
@@ -1118,23 +1118,25 @@ fn append_interrupted_boundary(
11181118
}));
11191119

11201120
match history {
1121-
InitialHistory::New | InitialHistory::Cleared => InitialHistory::Forked(vec![
1122-
RolloutItem::ResponseItem(interrupted_turn_history_marker(multi_agent_v2_enabled)),
1123-
aborted_event,
1124-
]),
1121+
InitialHistory::New | InitialHistory::Cleared => {
1122+
let mut history = Vec::new();
1123+
if let Some(marker) = interrupted_turn_history_marker(interrupted_marker) {
1124+
history.push(RolloutItem::ResponseItem(marker));
1125+
}
1126+
history.push(aborted_event);
1127+
InitialHistory::Forked(history)
1128+
}
11251129
InitialHistory::Forked(mut history) => {
1126-
history.push(RolloutItem::ResponseItem(interrupted_turn_history_marker(
1127-
multi_agent_v2_enabled,
1128-
)));
1130+
if let Some(marker) = interrupted_turn_history_marker(interrupted_marker) {
1131+
history.push(RolloutItem::ResponseItem(marker));
1132+
}
11291133
history.push(aborted_event);
11301134
InitialHistory::Forked(history)
11311135
}
11321136
InitialHistory::Resumed(mut resumed) => {
1133-
resumed
1134-
.history
1135-
.push(RolloutItem::ResponseItem(interrupted_turn_history_marker(
1136-
multi_agent_v2_enabled,
1137-
)));
1137+
if let Some(marker) = interrupted_turn_history_marker(interrupted_marker) {
1138+
resumed.history.push(RolloutItem::ResponseItem(marker));
1139+
}
11381140
resumed.history.push(aborted_event);
11391141
InitialHistory::Forked(resumed.history)
11401142
}

0 commit comments

Comments
 (0)