From 095a7579fc79b9a027e21a1af9f0ca073fd00629 Mon Sep 17 00:00:00 2001 From: SimonTaurus Date: Wed, 29 Apr 2026 05:37:33 +0200 Subject: [PATCH 1/3] refactor: extract reusable Prefect+OSW utils from examples - Add ConnectionSettings, connect(), WorkflowRequest, and register_flow() to osw.utils.workflow for reuse across Prefect workflows - register_flow() creates Software + PrefectFlow entities on OSW, cleans up stale deployments, and writes usage snippet to wiki page - Integrate OSW registration into deploy() via optional osw parameter - Simplify hello_world.py to use workflow utils instead of inline code - Add opensemantic.base to dependencies --- .pre-commit-config.yaml | 3 +- examples/prefect/hello_world.py | 183 ++++++++-------------- setup.cfg | 3 +- src/osw/core.py | 12 ++ src/osw/model/entity.py | 5 + src/osw/utils/workflow.py | 259 +++++++++++++++++++++++++++++++- 6 files changed, 337 insertions(+), 128 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 09dcaf51..200f2342 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -24,8 +24,7 @@ repos: hooks: - id: autoflake args: [ -# --in-place, # Use this to modify the files in place, without printing diffs, as opposed to --stdout - --stdout, + --in-place, --remove-all-unused-imports, --remove-unused-variables, ] diff --git a/examples/prefect/hello_world.py b/examples/prefect/hello_world.py index 564b8765..e693ed7b 100644 --- a/examples/prefect/hello_world.py +++ b/examples/prefect/hello_world.py @@ -1,86 +1,53 @@ -import asyncio -import uuid -from os import environ from typing import Optional from uuid import UUID, uuid4 -from prefect import flow, get_client, task -from prefect.blocks.system import Secret -from pydantic.v1 import Field +from opensemantic.base.v1 import Article +from prefect import flow, task +from pydantic.v1 import BaseModel, Field import osw.model.entity as model -from osw.auth import CredentialManager from osw.core import OSW -from osw.utils.wiki import get_full_title +from osw.utils.workflow import ( + ConnectionSettings, + DeployConfig, + DeployParam, + WorkflowRequest, + connect, + deploy, +) from osw.wtsite import WtSite - -class ConnectionSettings(model.OswBaseModel): - """Connection data for OSW""" - - osw_user_name: Optional[str] - """The login username. - Note: value of envar OSW_USER used of not given - Note: value of envar OSW_PASSWORD used for login""" - osw_domain: Optional[str] - """The domain of the instance - Note: value of envar OSW_SERVER used of not given""" +# Module-level OSW instance, set by the connect_osw task +osw: Optional[OSW] = None @task -def connect(settings: Optional[ConnectionSettings] = None): - """Initiates the connection to the OSW instance - - Parameters - ---------- - settings - see ConnectionSetttings dataclass - """ - if settings is None: - settings = ConnectionSettings() - global wtsite - # define username - if environ.get("OSW_USER") is not None and environ.get("OSW_USER") != "": - settings.osw_user_name = environ.get("OSW_USER") - if environ.get("OSW_SERVER") is not None and environ.get("OSW_SERVER") != "": - settings.osw_domain = environ.get("OSW_SERVER") - password = "" - if environ.get("OSW_PASSWORD") is not None and environ.get("OSW_PASSWORD") != "": - password = environ.get("OSW_PASSWORD") - else: - # fetch secret stored in prefect server from calculated name - password = Secret.load( - settings.osw_user_name.lower() + "-" + settings.osw_domain.replace(".", "-") - ).get() # e. g. mybot-wiki-dev-open-semantic-lab-org - cm = CredentialManager() - cm.add_credential( - CredentialManager.UserPwdCredential( - iri=settings.osw_domain, username=settings.osw_user_name, password=password - ) - ) - wtsite = WtSite(WtSite.WtSiteConfig(iri=settings.osw_domain, cred_mngr=cm)) +def connect_osw(settings: Optional[ConnectionSettings] = None): + """Initiates the connection to the OSW instance""" global osw - osw = OSW(site=wtsite) + osw = connect(settings) @task def fetch_schema(): - """this will load the current entity schema from the OSW instance.""" - # Load Article Schema on demand - if not hasattr(model, "Article"): - osw.fetch_schema( - OSW.FetchSchemaParam( - schema_title=[ - "Category:OSW77e749fc598341ac8b6d2fff21574058", # Software - "Category:OSW72eae3c8f41f4a22a94dbc01974ed404", # PrefectFlow - "Category:OSW92cc6b1a2e6b4bb7bad470dfdcfdaf26", # Article - ], - mode="replace", - ) - ) + """Fetch custom schemas not yet available in packages. + + Software, PrefectFlow, and Article are already provided by + opensemantic.base, so no fetch is needed for this example. + Uncomment and adapt the code below if your workflow uses + schemas that are only available on the OSW instance. + """ + # osw.fetch_schema( + # OSW.FetchSchemaParam( + # schema_title=[ + # "Category:OSW...", # your custom category + # ], + # mode="replace", + # ) + # ) -class Result(model.OswBaseModel): +class Result(BaseModel): """The result dataclass""" uuid: Optional[UUID] = Field(default_factory=uuid4, title="UUID") @@ -106,15 +73,21 @@ def store_and_document_result(result: Result): title = result.target_title else: title = "Item:" + osw.get_osw_id(result.uuid) - entity = osw.load_entity(title) + entity = osw.load_entity( + OSW.LoadEntityParam( + titles=[title], + autofetch_schema=False, + model_to_use=Article, + ) + ).entities[0] if entity is None: # does not exist yet - create a new one - entity = model.Article( + entity = Article( uuid=result.uuid, label=[model.Label(text="Article for dummy workflow")] ) # edit structured data - entity = entity.cast(model.Article) + entity = entity.cast(Article) entity.description = [model.Description(text="some descriptive text")] osw.store_entity(entity) @@ -127,13 +100,9 @@ def store_and_document_result(result: Result): print("FINISHED") -class Request(model.OswBaseModel): - uuid: UUID = Field(default_factory=uuid4, title="UUID") - """UUIDv4 of the request.""" - osw_domain: Optional[str] = "wiki-dev.open-semantic-lab.org" - """To domain of the OSW instance""" - subject: Optional[str] = "Item:OSW56f9439d43244fe7a83163bab9414ee1" - """Where to store the results. For testing, we use a static default value""" +class Request(WorkflowRequest): + """Request for the dummy workflow.""" + msg: Optional[str] = "test message" """The message you want to leave on the target page""" @@ -150,56 +119,24 @@ def dummy_workflow(request: Request): request see Request dataclass """ - connect(ConnectionSettings(osw_domain=request.osw_domain)) + connect_osw(ConnectionSettings(osw_domain=request.osw_domain)) fetch_schema() store_and_document_result(Result(msg=request.msg, target_title=request.subject)) -async def deploy(): - """programmatic deployment supported in newer prefect versions""" - flow = dummy_workflow - # flow_name = flow.name - deployment_name = flow.name + " Deployment" - - # create a deployment and apply it - config = await flow.to_deployment(name=deployment_name) - await config.apply() # returns the deployment_uuid - - # fetch flow uuid - async with get_client() as client: - response = await client.read_flow_by_name(flow.name) - print(response.json()) - flow_uuid = response.id - - await connect() - await fetch_schema() - # static UUIDv5 namespace for a stable UUID - namespace_uuid = uuid.UUID("0dd6c54a-b162-4552-bab9-9942ccaf4f41") - - # self-documentation / registration - this_tool = model.Software( - uuid=uuid.uuid5(namespace_uuid, flow.name), - label=[model.Label(text=flow.name)], - description=[model.Description(text=flow.description)], - ) - - prefect_domain = environ.get("PREFECT_API_URL").split("//")[-1].split("/")[0] - this_flow = model.PrefectFlow( - uuid=flow_uuid, - label=[model.Label(text=flow.name + " Prefect Flow")], - description=[model.Description(text=flow.description)], - flow_id=str(flow_uuid), - hosted_software=[get_full_title(this_tool)], - domain=prefect_domain, - ) - - osw.store_entity(osw.StoreEntityParam(entities=[this_tool, this_flow])) - - # start agent to serve deployment - await dummy_workflow.serve(name=deployment_name) - - if __name__ == "__main__": - # dummy_workflow(Request(msg="Test")) - with asyncio.Runner() as runner: - runner.run(deploy()) + # Direct run: dummy_workflow(Request(msg="Test")) + + # Deploy and serve with OSW registration + osw_instance = connect() + deploy( + DeployParam( + deployments=[ + DeployConfig( + flow=dummy_workflow, + name="Dummy Workflow Deployment", + ) + ], + osw=osw_instance, + ) + ) diff --git a/setup.cfg b/setup.cfg index 8cbf96f5..9c27d4c1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -50,7 +50,8 @@ python_requires = >=3.10 install_requires = oold>=0.11.1 opensemantic - opensemantic.core>=0.53.1 + opensemantic.core>=0.57.5 + opensemantic.base>=0.42.7 pydantic[email]>=1.10.17 datamodel-code-generator==0.51.0 black diff --git a/src/osw/core.py b/src/osw/core.py index db9bb4e1..775f8495 100644 --- a/src/osw/core.py +++ b/src/osw/core.py @@ -683,6 +683,8 @@ def _fetch_schema( use_title_as_name=True, use_schema_description=True, use_field_description=True, + # https://github.com/koxudaxi/datamodel-code-generator/issues/2447 + # use_standard_collections=data_model_type != "pydantic.BaseModel", encoding="utf-8", use_double_quotes=True, collapse_root_models=True, @@ -879,6 +881,16 @@ def _fetch_schema( "\n" ) + # import Software, PrefectWorkflow from base + if data_model_type == "pydantic.BaseModel": + header += ( + "from opensemantic.base.v1 import Software, PrefectFlow\n" + ) + else: + header += ( + "from opensemantic.base import Software, PrefectFlow\n" + ) + content = re.sub( pattern=r"(^class\s*\S*\s*\(\s*[\S\s]*?\s*\)\s*:.*\n)", repl=header + r"\n\n\n\1", diff --git a/src/osw/model/entity.py b/src/osw/model/entity.py index c1ed735b..1782ec0f 100644 --- a/src/osw/model/entity.py +++ b/src/osw/model/entity.py @@ -21,3 +21,8 @@ WikiFile, PagePackage, ) # noqa: F401, E402 + +from opensemantic.base.v1 import ( # isort:skip + Software, + PrefectFlow, +) # noqa: F401, E402 diff --git a/src/osw/utils/workflow.py b/src/osw/utils/workflow.py index a9be8dbb..acc040c8 100644 --- a/src/osw/utils/workflow.py +++ b/src/osw/utils/workflow.py @@ -3,20 +3,104 @@ import asyncio import re import sys +import uuid as uuid_module from datetime import timedelta from importlib.metadata import version from inspect import signature +from os import environ from typing import Any, Dict, Iterable, List, Optional, Union +from uuid import UUID from packaging.specifiers import SpecifierSet -from prefect import Flow, serve +from prefect import Flow, get_client, serve from prefect.blocks.notifications import MicrosoftTeamsWebhook +from prefect.blocks.system import Secret from prefect.client.schemas.objects import FlowRun from prefect.settings import PREFECT_API_URL from prefect.states import State from pydantic import SecretStr from pydantic.v1 import BaseModel +import osw.model.entity as model +from osw.auth import CredentialManager +from osw.core import OSW +from osw.utils.wiki import get_full_title +from osw.wtsite import WtSite + + +# ------------------------------ CONNECTION --------------------- +class ConnectionSettings(BaseModel): + """Connection data for OSW. + + If osw_user_name or osw_domain are not provided, they are read from + environment variables OSW_USER and OSW_SERVER respectively. + """ + + osw_user_name: Optional[str] = None + """The login username. Falls back to env var OSW_USER.""" + osw_domain: Optional[str] = None + """The domain of the instance. Falls back to env var OSW_SERVER.""" + + +def connect(settings: Optional[ConnectionSettings] = None) -> OSW: + """Connect to an OSW instance. + + Reads OSW_USER, OSW_SERVER, OSW_PASSWORD from environment variables. + If OSW_PASSWORD is not set, falls back to a Prefect Secret named + ``-`` (e.g. ``mybot-wiki-dev-open-semantic-lab-org``). + + Parameters + ---------- + settings + Optional connection settings. If None, all values come from env vars. + + Returns + ------- + OSW + A connected OSW instance. + """ + if settings is None: + settings = ConnectionSettings() + if environ.get("OSW_USER"): + settings.osw_user_name = environ["OSW_USER"] + if environ.get("OSW_SERVER"): + settings.osw_domain = environ["OSW_SERVER"] + password = environ.get("OSW_PASSWORD", "") + if not password: + # fetch secret stored in prefect server from calculated name + password = Secret.load( + settings.osw_user_name.lower() + "-" + settings.osw_domain.replace(".", "-") + ).get() + cm = CredentialManager() + cm.add_credential( + CredentialManager.UserPwdCredential( + iri=settings.osw_domain, + username=settings.osw_user_name, + password=password, + ) + ) + wtsite = WtSite(WtSite.WtSiteConfig(iri=settings.osw_domain, cred_mngr=cm)) + return OSW(site=wtsite) + + +# ------------------------------ REQUESTS --------------------- +class WorkflowRequest(BaseModel): + """Base request model for OSW Prefect workflows. + + Provides common fields for provenance tracking and targeting. + Extend this class with workflow-specific fields. + """ + + uuid: UUID = None + """UUIDv4 of the request (for provenance tracking).""" + osw_domain: Optional[str] = None + """The domain of the OSW instance. Falls back to env var OSW_SERVER.""" + subject: Optional[str] = None + """The target page title to associate results with.""" + + class Config: + arbitrary_types_allowed = True + # ------------------------------ NOTIFICATIONS --------------------- class NotifyTeamsParam(BaseModel): @@ -137,7 +221,170 @@ class DeployParam(BaseModel): # TODO: Implement remove_existing_deployments remove_existing_deployments: Optional[bool] = False """Will remove existing deployments of the specified flows/software""" - # TODO: Add parameter for OSW support in next version + osw: Optional[OSW] = None + """If provided, deployed flows are registered as Software + PrefectFlow + entities on the OSW instance.""" + namespace_uuid: Optional[UUID] = None + """Static UUIDv5 namespace for stable Software entity UUIDs. + If None, a default namespace is used.""" + + class Config: + arbitrary_types_allowed = True + + +# Default UUIDv5 namespace for flow registration +_DEFAULT_NAMESPACE_UUID = uuid_module.UUID("0dd6c54a-b162-4552-bab9-9942ccaf4f41") + + +async def register_flow( + osw_instance: OSW, + flow: Flow, + namespace_uuid: Optional[UUID] = None, +) -> None: + """Register a Prefect flow as Software + PrefectFlow entities on OSW. + + Creates or updates: + - A Software entity with a stable UUID (uuid5 from flow name) + - A PrefectFlow entity with the UUID from the Prefect API + + Parameters + ---------- + osw_instance + A connected OSW instance. + flow + The Prefect Flow object to register. + namespace_uuid + UUIDv5 namespace for generating the Software entity UUID. + If None, uses a default namespace. + """ + if namespace_uuid is None: + namespace_uuid = _DEFAULT_NAMESPACE_UUID + + # ensure required schemas are loaded (needed if entity.py is regenerated) + if not hasattr(model, "Software") or not hasattr(model, "PrefectFlow"): + osw_instance.fetch_schema( + OSW.FetchSchemaParam( + schema_title=[ + "Category:OSW77e749fc598341ac8b6d2fff21574058", # Software + "Category:OSW72eae3c8f41f4a22a94dbc01974ed404", # PrefectFlow + ], + mode="replace", + ) + ) + + # fetch flow uuid from Prefect API + async with get_client() as client: + response = await client.read_flow_by_name(flow.name) + flow_uuid = response.id + + # create Software entity + this_tool = model.Software( + uuid=uuid_module.uuid5(namespace_uuid, flow.name), + label=[model.Label(text=flow.name)], + description=[model.Description(text=flow.description or "")], + ) + + # create PrefectFlow entity + prefect_api_url = environ.get("PREFECT_API_URL", "") + prefect_domain = prefect_api_url.split("//")[-1].split("/")[0] + this_flow = model.PrefectFlow( + uuid=flow_uuid, + label=[model.Label(text=flow.name + " Prefect Flow")], + description=[model.Description(text=flow.description or "")], + flow_id=str(flow_uuid), + hosted_software=[get_full_title(this_tool)], + domain=prefect_domain, + ) + + # delete stale PrefectFlow entities that reference the same Software + # (avoids confusing the prefect.js reverse lookup) + software_title = get_full_title(this_tool) + stale_titles = osw_instance.site.semantic_search(f"[[Hosts::{software_title}]]") + new_flow_title = "Item:" + OSW.get_osw_id(flow_uuid) + stale_entities = [] + for title in stale_titles: + if title != new_flow_title: + entity = osw_instance.load_entity(title) + if entity is not None: + stale_entities.append(entity) + if stale_entities: + for e in stale_entities: + print( + f"WARNING: Deleting stale PrefectFlow entity " + f"'{get_full_title(e)}' (superseded by '{new_flow_title}')" + ) + osw_instance.delete_entity( + stale_entities, + comment="Replaced by updated PrefectFlow deployment", + ) + + osw_instance.store_entity(OSW.StoreEntityParam(entities=[this_tool, this_flow])) + + # build parameters template from flow function signature + # includes all fields so users can see and customize them + import inspect + import json + + sig = inspect.signature(flow.fn) + params_template = {} + for name, param in sig.parameters.items(): + if param.default is not inspect.Parameter.empty: + default = param.default + if hasattr(default, "dict"): + default = default.dict() + params_template[name] = default + elif param.annotation is not inspect.Parameter.empty: + ann = param.annotation + if hasattr(ann, "__fields__"): + # pydantic model — show all fields with defaults or placeholders + field_values = {} + for fname, field in ann.__fields__.items(): + if field.default_factory is not None: + field_values[fname] = f"<{fname}>" + elif field.default is not None: + field_values[fname] = field.default + else: + field_values[fname] = f"<{fname}>" + params_template[name] = field_values + else: + params_template[name] = f"<{name}>" + else: + params_template[name] = f"<{name}>" + + params_json = json.dumps(params_template, default=str) + + # replace known values with wiki/mustache template variables + wiki_var_replacements = { + '""': '"{{uuid}}"', + '""': '"{{FULLPAGENAME}}"', + '""': '"{{SERVERNAME}}"', + } + for placeholder, wiki_var in wiki_var_replacements.items(): + params_json = params_json.replace(placeholder, wiki_var) + + # write usage template snippet to the Software page's main slot + snippet = ( + '
" + ) + usage_text = ( + "== Usage ==\n" + "Insert this snippet in a page to trigger this workflow:\n" + "
\n"
+        f"{snippet}\n"
+        "
\n" + ) + print(f"DEBUG: Writing usage template to '{software_title}':\n{snippet}") + + page = osw_instance.site.get_page( + WtSite.GetPageParam(titles=[software_title]) + ).pages[0] + page.set_slot_content(slot_key="main", content=usage_text) + page.edit() async def _deploy(param: DeployParam): @@ -161,6 +408,14 @@ async def _deploy(param: DeployParam): deployments.append(config) + # Register flow on OSW if an instance is provided + if param.osw is not None: + await register_flow( + osw_instance=param.osw, + flow=flow, + namespace_uuid=param.namespace_uuid, + ) + if version("prefect") in SpecifierSet(">=3.0"): print(f"prefect version IF: {version('prefect')}") # return deployments From e4918581845eadcc7fb6ab1544bd39a7bea419d5 Mon Sep 17 00:00:00 2001 From: SimonTaurus Date: Wed, 29 Apr 2026 08:54:54 +0200 Subject: [PATCH 2/3] fix: opensemantic.core>=0.57.4 --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index 9c27d4c1..99b4e788 100644 --- a/setup.cfg +++ b/setup.cfg @@ -50,7 +50,7 @@ python_requires = >=3.10 install_requires = oold>=0.11.1 opensemantic - opensemantic.core>=0.57.5 + opensemantic.core>=0.57.4 opensemantic.base>=0.42.7 pydantic[email]>=1.10.17 datamodel-code-generator==0.51.0 From 491f8bcc023e034986d45299cc4056151e9f43ca Mon Sep 17 00:00:00 2001 From: SimonTaurus Date: Wed, 29 Apr 2026 09:39:32 +0200 Subject: [PATCH 3/3] fix: prefect>=2.20.25,<3.0 --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index 99b4e788..82e62d10 100644 --- a/setup.cfg +++ b/setup.cfg @@ -99,7 +99,7 @@ dataimport = UI = pysimplegui workflow = - prefect==2.20.0 + prefect>=2.20.25,<3.0 tutorial = %(dataimport)s all =