-
Notifications
You must be signed in to change notification settings - Fork 0
添加并支持并行执行工作流的功能,允许用户通过 --parallel 参数设置并发工作流数量 #16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
2a1f338
8102011
5a97bda
e03e950
de7d0dd
b965a47
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,7 +36,7 @@ | |
| ) | ||
| from comfy_execution.graph_utils import GraphBuilder, is_link | ||
| from comfy_execution.validation import validate_node_input | ||
| from comfy_execution.progress import get_progress_state, reset_progress_state, add_progress_handler, WebUIProgressHandler | ||
| from comfy_execution.progress import get_progress_state, reset_progress_state, add_progress_handler, remove_progress_state, WebUIProgressHandler | ||
| from comfy_execution.utils import CurrentNodeContext | ||
| from comfy_api.internal import _ComfyNodeInternal, _NodeOutputInternal, first_real_override, is_class, make_locked_method_func | ||
| from comfy_api.latest import io, _io | ||
|
|
@@ -416,15 +416,15 @@ def _is_intermediate_output(dynprompt, node_id): | |
| class_def = nodes.NODE_CLASS_MAPPINGS[class_type] | ||
| return getattr(class_def, 'HAS_INTERMEDIATE_OUTPUT', False) | ||
|
|
||
| def _send_cached_ui(server, node_id, display_node_id, cached, prompt_id, ui_outputs): | ||
| if server.client_id is None: | ||
| def _send_cached_ui(server, node_id, display_node_id, cached, prompt_id, ui_outputs, client_id): | ||
| if client_id is None: | ||
| return | ||
| cached_ui = cached.ui or {} | ||
| server.send_sync("executed", { "node": node_id, "display_node": display_node_id, "output": cached_ui.get("output", None), "prompt_id": prompt_id }, server.client_id) | ||
| server.send_sync("executed", { "node": node_id, "display_node": display_node_id, "output": cached_ui.get("output", None), "prompt_id": prompt_id }, client_id) | ||
| if cached.ui is not None: | ||
| ui_outputs[node_id] = cached.ui | ||
|
|
||
| async def execute(server, dynprompt, caches, current_item, extra_data, executed, prompt_id, execution_list, pending_subgraph_results, pending_async_nodes, ui_outputs): | ||
| async def execute(server, dynprompt, caches, current_item, extra_data, executed, prompt_id, execution_list, pending_subgraph_results, pending_async_nodes, ui_outputs, client_id=None): | ||
| unique_id = current_item | ||
| real_node_id = dynprompt.get_real_node_id(unique_id) | ||
| display_node_id = dynprompt.get_display_node_id(unique_id) | ||
|
|
@@ -434,8 +434,8 @@ async def execute(server, dynprompt, caches, current_item, extra_data, executed, | |
| class_def = nodes.NODE_CLASS_MAPPINGS[class_type] | ||
| cached = await caches.outputs.get(unique_id) | ||
| if cached is not None: | ||
| _send_cached_ui(server, unique_id, display_node_id, cached, prompt_id, ui_outputs) | ||
| get_progress_state().finish_progress(unique_id) | ||
| _send_cached_ui(server, unique_id, display_node_id, cached, prompt_id, ui_outputs, client_id) | ||
| get_progress_state(prompt_id).finish_progress(unique_id) | ||
| execution_list.cache_update(unique_id, cached) | ||
| return (ExecutionResult.SUCCESS, None, None) | ||
|
|
||
|
|
@@ -478,11 +478,11 @@ async def execute(server, dynprompt, caches, current_item, extra_data, executed, | |
| del pending_subgraph_results[unique_id] | ||
| has_subgraph = False | ||
| else: | ||
| get_progress_state().start_progress(unique_id) | ||
| get_progress_state(prompt_id).start_progress(unique_id) | ||
| input_data_all, missing_keys, v3_data = get_input_data(inputs, class_def, unique_id, execution_list, dynprompt, extra_data) | ||
| if server.client_id is not None: | ||
| if client_id is not None: | ||
| server.last_node_id = display_node_id | ||
| server.send_sync("executing", { "node": unique_id, "display_node": display_node_id, "prompt_id": prompt_id }, server.client_id) | ||
| server.send_sync("executing", { "node": unique_id, "display_node": display_node_id, "prompt_id": prompt_id }, client_id) | ||
|
|
||
| obj = await caches.objects.get(unique_id) | ||
| if obj is None: | ||
|
|
@@ -522,7 +522,7 @@ def execution_block_cb(block): | |
| "current_inputs": [], | ||
| "current_outputs": [], | ||
| } | ||
| server.send_sync("execution_error", mes, server.client_id) | ||
| server.send_sync("execution_error", mes, client_id) | ||
| return ExecutionBlocker(None) | ||
| else: | ||
| return block | ||
|
|
@@ -558,8 +558,8 @@ async def await_completion(): | |
| }, | ||
| "output": output_ui | ||
| } | ||
| if server.client_id is not None: | ||
| server.send_sync("executed", { "node": unique_id, "display_node": display_node_id, "output": output_ui, "prompt_id": prompt_id }, server.client_id) | ||
| if client_id is not None: | ||
| server.send_sync("executed", { "node": unique_id, "display_node": display_node_id, "output": output_ui, "prompt_id": prompt_id }, client_id) | ||
| if has_subgraph: | ||
| cached_outputs = [] | ||
| new_node_ids = [] | ||
|
|
@@ -640,7 +640,7 @@ async def await_completion(): | |
|
|
||
| return (ExecutionResult.FAILURE, error_details, ex) | ||
|
|
||
| get_progress_state().finish_progress(unique_id) | ||
| get_progress_state(prompt_id).finish_progress(unique_id) | ||
| executed.add(unique_id) | ||
|
|
||
| return (ExecutionResult.SUCCESS, None, None) | ||
|
|
@@ -656,15 +656,16 @@ def reset(self): | |
| self.caches = CacheSet(cache_type=self.cache_type, cache_args=self.cache_args) | ||
| self.status_messages = [] | ||
| self.success = True | ||
| self.client_id = None | ||
|
|
||
| def add_message(self, event, data: dict, broadcast: bool): | ||
| data = { | ||
| **data, | ||
| "timestamp": int(time.time() * 1000), | ||
| } | ||
| self.status_messages.append((event, data)) | ||
| if self.server.client_id is not None or broadcast: | ||
| self.server.send_sync(event, data, self.server.client_id) | ||
| if self.client_id is not None or broadcast: | ||
| self.server.send_sync(event, data, self.client_id) | ||
|
|
||
| def handle_execution_error(self, prompt_id, prompt, current_outputs, executed, error, ex): | ||
| node_id = error["node_id"] | ||
|
|
@@ -712,13 +713,11 @@ def execute(self, prompt, prompt_id, extra_data={}, execute_outputs=[]): | |
|
|
||
| async def execute_async(self, prompt, prompt_id, extra_data={}, execute_outputs=[]): | ||
| set_preview_method(extra_data.get("preview_method")) | ||
| # Register before node execution starts so targeted/global interrupts can see this prompt immediately. | ||
| comfy.model_management.register_active_prompt(prompt_id) | ||
|
|
||
| nodes.interrupt_processing(False) | ||
|
|
||
| if "client_id" in extra_data: | ||
| self.server.client_id = extra_data["client_id"] | ||
| else: | ||
| self.server.client_id = None | ||
| self.client_id = extra_data.get("client_id", None) | ||
| self.server.client_id = self.client_id | ||
|
Comment on lines
+719
to
+720
|
||
|
|
||
| self.status_messages = [] | ||
| self.add_message("execution_start", { "prompt_id": prompt_id}, broadcast=False) | ||
|
|
@@ -731,8 +730,8 @@ async def execute_async(self, prompt, prompt_id, extra_data={}, execute_outputs= | |
| try: | ||
| with torch.inference_mode(): | ||
| dynamic_prompt = DynamicPrompt(prompt) | ||
| reset_progress_state(prompt_id, dynamic_prompt) | ||
| add_progress_handler(WebUIProgressHandler(self.server)) | ||
| reset_progress_state(prompt_id, dynamic_prompt, client_id=self.client_id) | ||
| add_progress_handler(WebUIProgressHandler(self.server, client_id=self.client_id), prompt_id=prompt_id) | ||
| is_changed_cache = IsChangedCache(prompt_id, dynamic_prompt, self.caches.outputs) | ||
| for cache in self.caches.all: | ||
| await cache.set_prompt(dynamic_prompt, prompt.keys(), is_changed_cache) | ||
|
|
@@ -767,7 +766,7 @@ async def execute_async(self, prompt, prompt_id, extra_data={}, execute_outputs= | |
| break | ||
|
|
||
| assert node_id is not None, "Node ID should not be None at this point" | ||
| result, error, ex = await execute(self.server, dynamic_prompt, self.caches, node_id, extra_data, executed, prompt_id, execution_list, pending_subgraph_results, pending_async_nodes, ui_node_outputs) | ||
| result, error, ex = await execute(self.server, dynamic_prompt, self.caches, node_id, extra_data, executed, prompt_id, execution_list, pending_subgraph_results, pending_async_nodes, ui_node_outputs, client_id=self.client_id) | ||
| self.success = result != ExecutionResult.FAILURE | ||
| if result == ExecutionResult.FAILURE: | ||
| self.handle_execution_error(prompt_id, dynamic_prompt.original_prompt, current_outputs, executed, error, ex) | ||
|
|
@@ -791,7 +790,7 @@ async def execute_async(self, prompt, prompt_id, extra_data={}, execute_outputs= | |
| cached = await self.caches.outputs.get(node_id) | ||
| if cached is not None: | ||
| display_node_id = dynamic_prompt.get_display_node_id(node_id) | ||
| _send_cached_ui(self.server, node_id, display_node_id, cached, prompt_id, ui_node_outputs) | ||
| _send_cached_ui(self.server, node_id, display_node_id, cached, prompt_id, ui_node_outputs, client_id=self.client_id) | ||
| self.add_message("execution_success", { "prompt_id": prompt_id }, broadcast=False) | ||
|
|
||
| ui_outputs = {} | ||
|
|
@@ -809,6 +808,9 @@ async def execute_async(self, prompt, prompt_id, extra_data={}, execute_outputs= | |
| finally: | ||
| comfy.memory_management.set_ram_cache_release_state(None, 0) | ||
| self._notify_prompt_lifecycle("end", prompt_id) | ||
| remove_progress_state(prompt_id) | ||
| # Drop prompt-scoped interrupt state once this execution is fully finished. | ||
| comfy.model_management.unregister_active_prompt(prompt_id) | ||
|
|
||
|
|
||
| async def validate_inputs(prompt_id, prompt, item, validated): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
set_progress()derivesprompt_idfromexecuting_context, but when there is no executing context it becomesNoneand the code still callsget_progress_state(prompt_id). With the new per-prompt registry design this risks updating the wrong prompt’s registry (or a dummy/no-op registry) and makes behavior dependent on internal fallback logic. Consider requiring an executing context (raise if missing), or extending the API to accept an explicitprompt_idand using that instead ofNone.