From 70e10febc9615dac794e639011016cb72f208ed9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cle=CC=81ment=20Doumouro?=
Date: Thu, 21 May 2026 11:01:20 +0200
Subject: [PATCH] feature(datashare-python): add activity contextual workdirs

---
 datashare-python/datashare_python/utils.py  | 40 ++++++++++++++++++++++++++++++++++++++-
 workers/asr-worker/asr_worker/activities.py | 13 +++----
 workers/asr-worker/asr_worker/workflows.py  |  7 +++++-
 3 files changed, 50 insertions(+), 10 deletions(-)

diff --git a/datashare-python/datashare_python/utils.py b/datashare-python/datashare_python/utils.py
index e827c747..88f4ea6b 100644
--- a/datashare-python/datashare_python/utils.py
+++ b/datashare-python/datashare_python/utils.py
@@ -502,7 +502,7 @@ def debuggable_name(
     return f"{uuid}-{'--'.join(displayable_file_name)}"
 
 
-def activity_contextual_id(
+def contextual_id(
     *, wf_context: bool = True, act_context: bool = False, run_context: bool = False
 ) -> str:
     contextual_id = []
@@ -519,3 +519,41 @@ def activity_contextual_id(
         if run_context:
             contextual_id.append(act_info.activity_run_id)
     return "-".join(contextual_id)
+
+
+# TODO: deprecated, remove me at the next breaking
+activity_contextual_id = contextual_id
+
+
+def _contextual_path(
+    *, wf_context: bool = True, act_context: bool = True, run_context: bool = False
+) -> Path:
+    act_info = activity.info()
+    path = []
+    if not wf_context and not act_context:
+        raise ValueError("at least one of wf_context and act_context must be True")
+    if wf_context:
+        path = [act_info.workflow_type]
+        path.append(act_info.workflow_id)
+        if run_context:
+            path.append(act_info.workflow_run_id)
+    if act_context:
+        path.append(act_info.activity_type)
+        path.append(act_info.activity_id)
+        if run_context:
+            path.append(act_info.activity_run_id)
+    return Path(*path)
+
+
+def activity_workdir(
+    workdir: Path,
+    project: str,
+    *,
+    wf_context: bool = True,
+    act_context: bool = True,
+    run_context: bool = False,
+) -> Path:
+    ctx_path = _contextual_path(
+        wf_context=wf_context, act_context=act_context, run_context=run_context
+    )
+    return workdir.joinpath(project, ctx_path)
diff --git a/workers/asr-worker/asr_worker/activities.py b/workers/asr-worker/asr_worker/activities.py
index 97c656e0..c0251177 100644
--- a/workers/asr-worker/asr_worker/activities.py
+++ b/workers/asr-worker/asr_worker/activities.py
@@ -26,8 +26,8 @@
 from datashare_python.types_ import ProgressRateHandler, RawProgressHandler
 from datashare_python.utils import (
     ActivityWithProgress,
-    activity_contextual_id,
     activity_defn,
+    activity_workdir,
     debuggable_name,
     safe_dir,
     to_raw_progress,
@@ -83,9 +83,8 @@ async def search_audio_paths(
     ) -> list[Path]:
         es_client = lifespan_es_client()
         worker_config = cast(ASRWorkerConfig, lifespan_worker_config())
-        batch_dir_name = activity_contextual_id()
         workdir = worker_config.workdir
-        output_dir = workdir / batch_dir_name
+        output_dir = activity_workdir(workdir, project)
         output_dir.mkdir(parents=True, exist_ok=True)
         batch_paths = [
             p.relative_to(workdir)
@@ -102,15 +101,14 @@ async def search_audio_paths(
 
     @activity_defn(name=PREPROCESS_ACTIVITY, progress_weight=_PREPROCESS_WEIGHT)
     def preprocess(
-        self, audio_batch: Path, config: ParakeetPreprocessorConfig
+        self, audio_batch: Path, project: str, config: ParakeetPreprocessorConfig
     ) -> list[Path]:
         # TODO: this shouldn't be necessary, fix this bug
         worker_config = cast(ASRWorkerConfig, lifespan_worker_config())
         workdir = worker_config.workdir
         # TODO: implement caching
         preprocessor = Preprocessor.from_config(config)
-        contextual_id = activity_contextual_id()
-        output_dir = workdir / contextual_id
+        output_dir = activity_workdir(workdir, project)
         output_dir.mkdir(parents=True, exist_ok=True)
         audio_batch = workdir / audio_batch
         with preprocessor:
@@ -136,8 +134,7 @@ async def infer(
         config = _INFERENCE_CONFIG_TYPE_ADAPTER.validate_python(config)
         worker_config = cast(ASRWorkerConfig, lifespan_worker_config())
         workdir = worker_config.workdir
-        contextual_id = activity_contextual_id()
-        output_dir = workdir / project / contextual_id
+        output_dir = activity_workdir(workdir, project)
         output_dir.mkdir(parents=True, exist_ok=True)
         preprocessed_inputs = _LIST_OF_PATH_ADAPTER.validate_python(preprocessed_inputs)
         preprocessed_inputs = [workdir / p for p in preprocessed_inputs]
diff --git a/workers/asr-worker/asr_worker/workflows.py b/workers/asr-worker/asr_worker/workflows.py
index 60b02220..f6403a1d 100644
--- a/workers/asr-worker/asr_worker/workflows.py
+++ b/workers/asr-worker/asr_worker/workflows.py
@@ -44,7 +44,12 @@ async def run(self, args: ASRArgs) -> ASRResponse:
             task_queue=TaskQueues.IO,
         )
         # Preprocessing
-        preprocess_args = zip(batch_paths, repeat(config.preprocessing), strict=False)
+        preprocess_args = zip(
+            batch_paths,
+            repeat(args.project),
+            repeat(config.preprocessing),
+            strict=False,
+        )
         preprocessing_acts = (
             execute_activity(
                 ASRActivities.preprocess,