diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index c10b86f696f..22ecfe8f66a 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -150,6 +150,12 @@ struct ExecRunArgs { stderr_with_ansi: bool, } +#[derive(Debug, PartialEq, Eq)] +enum ResumeTarget { + StartNewThread, + ResumePath(PathBuf), +} + fn exec_root_span() -> tracing::Span { info_span!( "codex.exec", @@ -548,11 +554,11 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> { })?; // Handle resume subcommand by resolving a rollout path and using explicit resume API. - let (primary_thread_id, fallback_session_configured) = - if let Some(ExecCommand::Resume(args)) = command.as_ref() { - let resume_path = resolve_resume_path(&config, args).await?; - - if let Some(path) = resume_path { + let (primary_thread_id, fallback_session_configured) = if let Some(ExecCommand::Resume(args)) = + command.as_ref() + { + match resolve_resume_target(&config, args).await? { + ResumeTarget::ResumePath(path) => { let response: ThreadResumeResponse = send_request_with_response( &client, ClientRequest::ThreadResume { @@ -566,7 +572,8 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> { let session_configured = session_configured_from_thread_resume_response(&response) .map_err(anyhow::Error::msg)?; (session_configured.session_id, session_configured) - } else { + } + ResumeTarget::StartNewThread => { let response: ThreadStartResponse = send_request_with_response( &client, ClientRequest::ThreadStart { @@ -581,21 +588,22 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> { .map_err(anyhow::Error::msg)?; (session_configured.session_id, session_configured) } - } else { - let response: ThreadStartResponse = send_request_with_response( - &client, - ClientRequest::ThreadStart { - request_id: request_ids.next(), - params: thread_start_params_from_config(&config), - }, - "thread/start", - ) - .await - .map_err(anyhow::Error::msg)?; - let session_configured = session_configured_from_thread_start_response(&response) - .map_err(anyhow::Error::msg)?; - (session_configured.session_id, session_configured) - }; + } + } else { + let response: ThreadStartResponse = send_request_with_response( + &client, + ClientRequest::ThreadStart { + request_id: request_ids.next(), + params: thread_start_params_from_config(&config), + }, + "thread/start", + ) + .await + .map_err(anyhow::Error::msg)?; + let session_configured = + session_configured_from_thread_start_response(&response).map_err(anyhow::Error::msg)?; + (session_configured.session_id, session_configured) + }; let primary_thread_id_for_span = primary_thread_id.to_string(); let mut buffered_events = VecDeque::new(); @@ -1407,10 +1415,10 @@ fn local_external_chatgpt_tokens( }) } -async fn resolve_resume_path( +async fn resolve_resume_target( config: &Config, args: &crate::cli::ResumeArgs, -) -> anyhow::Result> { +) -> anyhow::Result { if args.last { let default_provider_filter = vec![config.model_provider_id.clone()]; let filter_cwd = if args.all { @@ -1430,22 +1438,26 @@ async fn resolve_resume_path( ) .await { - Ok(path) => Ok(path), + Ok(Some(path)) => Ok(ResumeTarget::ResumePath(path)), + Ok(None) => Ok(ResumeTarget::StartNewThread), Err(e) => { error!("Error listing threads: {e}"); - Ok(None) + Ok(ResumeTarget::StartNewThread) } } } else if let Some(id_str) = args.session_id.as_deref() { - if Uuid::parse_str(id_str).is_ok() { - let path = find_thread_path_by_id_str(&config.codex_home, id_str).await?; - Ok(path) + let path = if Uuid::parse_str(id_str).is_ok() { + find_thread_path_by_id_str(&config.codex_home, id_str).await? } else { - let path = find_thread_path_by_name_str(&config.codex_home, id_str).await?; - Ok(path) + find_thread_path_by_name_str(&config.codex_home, id_str).await? + }; + + match path { + Some(path) => Ok(ResumeTarget::ResumePath(path)), + None => anyhow::bail!("Session not found: {id_str}"), } } else { - Ok(None) + Ok(ResumeTarget::StartNewThread) } } @@ -1628,6 +1640,8 @@ mod tests { use opentelemetry::trace::TracerProvider as _; use opentelemetry_sdk::trace::SdkTracerProvider; use pretty_assertions::assert_eq; + use std::io::Write; + use std::path::Path; use tempfile::tempdir; use tracing_opentelemetry::OpenTelemetrySpanExt; @@ -1637,6 +1651,43 @@ mod tests { tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer)) } + fn write_minimal_rollout_with_id(codex_home: &Path, id: Uuid) -> PathBuf { + let sessions = codex_home.join("sessions/2024/01/01"); + std::fs::create_dir_all(&sessions).expect("create rollout directory"); + + let file = sessions.join(format!("rollout-2024-01-01T00-00-00-{id}.jsonl")); + let mut rollout = std::fs::File::create(&file).expect("create rollout file"); + writeln!( + rollout, + "{}", + serde_json::json!({ + "timestamp": "2024-01-01T00:00:00.000Z", + "type": "session_meta", + "payload": { + "id": id, + "timestamp": "2024-01-01T00:00:00Z", + "cwd": ".", + "originator": "test", + "cli_version": "test", + "model_provider": "test-provider" + } + }) + ) + .expect("write rollout"); + + file + } + + fn make_resume_args(session_id: Option<&str>, last: bool) -> crate::cli::ResumeArgs { + crate::cli::ResumeArgs { + session_id: session_id.map(str::to_string), + last, + all: false, + images: Vec::new(), + prompt: None, + } + } + #[test] fn exec_defaults_analytics_to_enabled() { assert_eq!(DEFAULT_ANALYTICS_ENABLED, true); @@ -1723,6 +1774,47 @@ mod tests { assert_eq!(request, expected); } + #[tokio::test] + async fn resolve_resume_target_errors_for_missing_explicit_session_id() { + let codex_home = tempdir().expect("create temp codex home"); + let cwd = tempdir().expect("create temp cwd"); + let config = ConfigBuilder::default() + .codex_home(codex_home.path().to_path_buf()) + .fallback_cwd(Some(cwd.path().to_path_buf())) + .build() + .await + .expect("build default config"); + let missing_id = "00000000-0000-0000-0000-000000000000"; + let args = make_resume_args(Some(missing_id), false); + + let err = resolve_resume_target(&config, &args) + .await + .expect_err("missing explicit session id should error"); + + assert_eq!(err.to_string(), format!("Session not found: {missing_id}")); + } + + #[tokio::test] + async fn resolve_resume_target_returns_existing_explicit_session_path() { + let codex_home = tempdir().expect("create temp codex home"); + let cwd = tempdir().expect("create temp cwd"); + let config = ConfigBuilder::default() + .codex_home(codex_home.path().to_path_buf()) + .fallback_cwd(Some(cwd.path().to_path_buf())) + .build() + .await + .expect("build default config"); + let session_id = Uuid::new_v4(); + let expected = write_minimal_rollout_with_id(codex_home.path(), session_id); + let args = make_resume_args(Some(&session_id.to_string()), false); + + let target = resolve_resume_target(&config, &args) + .await + .expect("existing explicit session id should resolve"); + + assert_eq!(target, ResumeTarget::ResumePath(expected)); + } + #[test] fn decode_prompt_bytes_strips_utf8_bom() { let input = [0xEF, 0xBB, 0xBF, b'h', b'i', b'\n']; diff --git a/codex-rs/exec/tests/suite/resume.rs b/codex-rs/exec/tests/suite/resume.rs index a85183620ed..8633051ab55 100644 --- a/codex-rs/exec/tests/suite/resume.rs +++ b/codex-rs/exec/tests/suite/resume.rs @@ -104,6 +104,37 @@ fn last_user_image_count(path: &std::path::Path) -> usize { last_count } +fn session_rollout_count(home_path: &std::path::Path) -> usize { + let sessions_dir = home_path.join("sessions"); + if !sessions_dir.exists() { + return 0; + } + + WalkDir::new(sessions_dir) + .into_iter() + .filter_map(Result::ok) + .filter(|entry| entry.file_type().is_file()) + .filter(|entry| entry.file_name().to_string_lossy().ends_with(".jsonl")) + .count() +} + +fn extract_thread_started_id_from_jsonl(stdout: &[u8]) -> Option { + let stdout = std::str::from_utf8(stdout).ok()?; + for line in stdout.lines() { + if line.trim().is_empty() { + continue; + } + let item: Value = serde_json::from_str(line).ok()?; + if item.get("type").and_then(|value| value.as_str()) == Some("thread.started") { + return item + .get("thread_id") + .and_then(|value| value.as_str()) + .map(str::to_string); + } + } + None +} + fn exec_fixture() -> anyhow::Result { Ok(find_resource!("tests/fixtures/cli_responses_fixture.sse")?) } @@ -559,3 +590,103 @@ fn exec_resume_accepts_images_after_subcommand() -> anyhow::Result<()> { Ok(()) } + +#[test] +fn exec_resume_by_missing_id_fails_without_starting_new_session() -> anyhow::Result<()> { + let test = test_codex_exec(); + let fixture = exec_fixture()?; + let repo_root = exec_repo_root()?; + let missing_id = "00000000-0000-0000-0000-000000000000"; + let marker = format!("resume-missing-id-{}", Uuid::new_v4()); + let prompt = format!("echo {marker}"); + + let output = test + .cmd() + .env("CODEX_RS_SSE_FIXTURE", &fixture) + .env("OPENAI_BASE_URL", "http://unused.local") + .arg("--skip-git-repo-check") + .arg("-C") + .arg(&repo_root) + .arg("resume") + .arg(missing_id) + .arg(&prompt) + .output() + .context("resume by missing id should return an error")?; + + assert!( + !output.status.success(), + "resume by missing id unexpectedly succeeded: {output:?}" + ); + + let stderr = String::from_utf8(output.stderr)?; + assert!( + stderr.contains(&format!("Session not found: {missing_id}")), + "stderr missing not-found message: {stderr}" + ); + assert_eq!(session_rollout_count(test.home_path()), 0); + assert!( + find_session_file_containing_marker(&test.home_path().join("sessions"), &marker).is_none(), + "resume by missing id should not create a new session rollout" + ); + + Ok(()) +} + +#[test] +fn exec_resume_by_ephemeral_id_fails_without_starting_new_session() -> anyhow::Result<()> { + let test = test_codex_exec(); + let fixture = exec_fixture()?; + let repo_root = exec_repo_root()?; + let seed_output = test + .cmd() + .env("CODEX_RS_SSE_FIXTURE", &fixture) + .env("OPENAI_BASE_URL", "http://unused.local") + .arg("--skip-git-repo-check") + .arg("--ephemeral") + .arg("--json") + .arg("-C") + .arg(&repo_root) + .arg("echo ephemeral-seed") + .output() + .context("ephemeral seed run should succeed")?; + + assert!( + seed_output.status.success(), + "ephemeral seed failed: {seed_output:?}" + ); + let ephemeral_session_id = extract_thread_started_id_from_jsonl(&seed_output.stdout) + .context("missing thread.started id in ephemeral json output")?; + + let marker = format!("resume-ephemeral-id-{}", Uuid::new_v4()); + let prompt = format!("echo {marker}"); + let output = test + .cmd() + .env("CODEX_RS_SSE_FIXTURE", &fixture) + .env("OPENAI_BASE_URL", "http://unused.local") + .arg("--skip-git-repo-check") + .arg("-C") + .arg(&repo_root) + .arg("resume") + .arg(&ephemeral_session_id) + .arg(&prompt) + .output() + .context("resume by ephemeral id should return an error")?; + + assert!( + !output.status.success(), + "resume by ephemeral id unexpectedly succeeded: {output:?}" + ); + + let stderr = String::from_utf8(output.stderr)?; + assert!( + stderr.contains(&format!("Session not found: {ephemeral_session_id}")), + "stderr missing ephemeral not-found message: {stderr}" + ); + assert_eq!(session_rollout_count(test.home_path()), 0); + assert!( + find_session_file_containing_marker(&test.home_path().join("sessions"), &marker).is_none(), + "resume by ephemeral id should not create a new session rollout" + ); + + Ok(()) +}