diff --git a/src/extensions/score_metamodel/__init__.py b/src/extensions/score_metamodel/__init__.py index 557e7b974..3dd0c7726 100644 --- a/src/extensions/score_metamodel/__init__.py +++ b/src/extensions/score_metamodel/__init__.py @@ -294,6 +294,7 @@ def postprocess_need_links(needs_types_list: list[ScoreNeedType]): def setup(app: Sphinx) -> dict[str, str | bool]: app.add_config_value("external_needs_source", "", rebuild="env") app.add_config_value("score_metamodel_yaml", "", rebuild="env") + app.add_config_value("unmutable_options", [], rebuild="env") config_setdefault(app.config, "needs_id_required", True) config_setdefault(app.config, "needs_id_regex", "^[A-Za-z0-9_-]{6,}") @@ -308,6 +309,7 @@ def setup(app: Sphinx) -> dict[str, str | bool]: app.config.needs_fields.update(metamodel.needs_fields) app.config.graph_checks = metamodel.needs_graph_check app.config.prohibited_words_checks = metamodel.prohibited_words_checks + app.config.unmutable_options = metamodel.unmutable_options # app.config.stop_words = metamodel["stop_words"] # app.config.weak_words = metamodel["weak_words"] diff --git a/src/extensions/score_metamodel/checks/check_needs_extends.py b/src/extensions/score_metamodel/checks/check_needs_extends.py new file mode 100644 index 000000000..6bfd21f1c --- /dev/null +++ b/src/extensions/score_metamodel/checks/check_needs_extends.py @@ -0,0 +1,229 @@ +# ******************************************************************************* +# Copyright (c) 2026 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* +from __future__ import annotations + +import sphinx_needs.directives.need +from docutils import nodes +from sphinx_needs.config import NeedsSphinxConfig +from sphinx_needs.data import ExtendType, NeedsExtendType, NeedsMutable +from sphinx_needs.exceptions import NeedsInvalidFilter +from sphinx_needs.filter_common import filter_needs_mutable +from sphinx_needs.logging import get_logger, log_warning +from sphinx_needs.need_item import NeedModification +from sphinx_needs.needs_schema import ( + FieldFunctionArray, + FieldLiteralValue, + LinksFunctionArray, + LinksLiteralValue, +) + + +class Needextend(nodes.General, nodes.Element): + pass + + +logger = get_logger(__name__) + + +def score_extend_needs_data_func( + all_needs: NeedsMutable, + extends: dict[str, NeedsExtendType], + needs_config: NeedsSphinxConfig, +) -> None: + """Use data gathered from needextend directives to modify fields of existing needs.""" + + # Sort by (docname, lineno) to ensure deterministic ordering, + # regardless of parallel build worker completion order. + sorted_extends = sorted(extends.values(), key=lambda x: (x["docname"], x["lineno"])) + + current_needextend: NeedsExtendType + raise RuntimeError("TESING replacement func") + for current_needextend in sorted_extends: + need_filter = current_needextend["filter"] + location = (current_needextend["docname"], current_needextend["lineno"]) + if current_needextend["filter_is_id"]: + try: + found_needs = [all_needs[need_filter]] + except KeyError: + error = f"Provided id {need_filter!r} for needextend does not exist." + if current_needextend["strict"]: + raise NeedsInvalidFilter(error) + log_warning(logger, error, "needextend", location=location) + continue + else: + try: + found_needs = filter_needs_mutable( + all_needs, + needs_config, + need_filter, + location=location, + origin_docname=current_needextend["docname"], + ) + except Exception as e: + log_warning( + logger, + f"Invalid filter {need_filter!r}: {e}", + "needextend", + location=location, + ) + continue + for found_need in found_needs: + # Work in the stored needs, not on the search result + need = all_needs[found_need["id"]] + need.add_modification( + NeedModification( + docname=current_needextend["docname"], + lineno=current_needextend["lineno"], + ) + ) + + location = ( + current_needextend["docname"], + current_needextend["lineno"], + ) + + for option_name, etype, link_value in current_needextend[ + "list_modifications" + ]: + # append link = ok?! + # replace / remove link = nope + # set option = ok + # replace / remove option = nope + match (etype, link_value): + case (ExtendType.APPEND, LinksLiteralValue()): + if (df := need._dynamic_fields.get(option_name)) is not None: + need._dynamic_fields[option_name] = LinksFunctionArray( + (*df.value, *link_value.value) + ) + need[option_name] = [] + else: + existing = need.get_links(option_name, as_str=False) + need[option_name] = [ + *existing, + *( # keep unique + v for v in link_value.value if v not in existing + ), + ] + case (ExtendType.APPEND, LinksFunctionArray()): + if (df := need._dynamic_fields.get(option_name)) is not None: + need._dynamic_fields[option_name] = LinksFunctionArray( + ( # keep unique + *df.value, + *(v for v in link_value.value if v not in df.value), + ) + ) + need[option_name] = [] + else: + existing = need.get_links(option_name, as_str=False) + need._dynamic_fields[option_name] = LinksFunctionArray( + ( + *existing, + *( # keep unique + v for v in link_value.value if v not in existing + ), + ) + ) + need[option_name] = [] + case (ExtendType.REPLACE | ExtendType.DELETE, LinksLiteralValue()): + error_msg = ( + "Replace or Delete action is not allowed via needextends." + ) + log_warning(logger, error_msg, "needextend", location=location) + raise RuntimeError(f"{location}: {error_msg}") + case (ExtendType.REPLACE | ExtendType.DELETE, LinksFunctionArray()): + error_msg = ( + "Replace or Delete action is not allowed via needextends." + ) + log_warning(logger, error_msg, "needextend", location=location) + raise RuntimeError(f"{location}: {error_msg}") + case other_link: + raise RuntimeError( + f"Unhandled case {other_link} for {option_name!r}" + ) + + for option_name, etype, field_value in current_needextend["modifications"]: + match (etype, field_value): + case (ExtendType.APPEND, FieldLiteralValue()): + if (df := need._dynamic_fields.get(option_name)) is not None: + need._dynamic_fields[option_name] = ( + FieldFunctionArray((*df.value, *field_value.value)) + if isinstance(field_value.value, list) + else FieldFunctionArray((*df.value, field_value.value)) + ) + else: + if isinstance(field_value.value, list): + need[option_name] = [ + *need[option_name], + *field_value.value, + ] + elif isinstance(field_value.value, str): + need[option_name] = ( + need[option_name] + " " + field_value.value + if need[option_name] + else field_value.value + ) + else: + raise RuntimeError( + f"Cannot append non-string/array value {field_value.value!r} to field '{option_name}'" + ) + case (ExtendType.APPEND, FieldFunctionArray()): + if (df := need._dynamic_fields.get(option_name)) is not None: + need._dynamic_fields[option_name] = FieldFunctionArray( + (*df.value, *field_value.value) + ) + else: + if isinstance(need[option_name], list): + need._dynamic_fields[option_name] = FieldFunctionArray( + (*need[option_name], *field_value.value) + ) + elif isinstance(need[option_name], str): + need._dynamic_fields[option_name] = FieldFunctionArray( + ( + need[option_name], + *field_value.value, + ) + ) + else: + raise RuntimeError( + f"Cannot append non-string/array value {field_value.value!r} to field '{option_name}'" + ) + case (ExtendType.REPLACE | ExtendType.DELETE, None): + error_msg = ( + "Replace or Delete action is not allowed via needextends." + ) + log_warning(logger, error_msg, "needextend", location=location) + raise RuntimeError(f"{location}: {error_msg}") + case (ExtendType.REPLACE | ExtendType.DELETE, FieldLiteralValue()): + error_msg = ( + "Replace or Delete action is not allowed via needextends." + ) + log_warning(logger, error_msg, "needextend", location=location) + raise RuntimeError(f"{location}: {error_msg}") + case (ExtendType.REPLACE | ExtendType.DELETE, FieldFunctionArray()): + error_msg = ( + "Replace or Delete action is not allowed via needextends." + ) + log_warning(logger, error_msg, "needextend", location=location) + raise RuntimeError(f"{location}: {error_msg}") + # TODO reset need[option_name] to something sensible? + case other_field: + raise RuntimeError( + f"Unhandled case {other_field} for {option_name!r}" + ) + + +sphinx_needs.directives.need.extends_needs_data = score_extend_needs_data_func + +print("=====================================") +print("WE HAVE REPLACED THE EXTENDS FUNC") +print("=====================================") diff --git a/src/extensions/score_metamodel/checks/check_options.py b/src/extensions/score_metamodel/checks/check_options.py index 1deec28e9..2feff3876 100644 --- a/src/extensions/score_metamodel/checks/check_options.py +++ b/src/extensions/score_metamodel/checks/check_options.py @@ -291,3 +291,18 @@ def check_validity_consistency( f"valid_from ({valid_from}) >= valid_until ({valid_until})." ) log.warning_for_need(need, msg) + + +# @local_check +# def check_needextends_forbidden_options(app: Sphinx, need: NeedItem, log: CheckLogger): +# extends_data = list(SphinxNeedsData(app.env).get_or_create_extends().values()) +# dissallowed: list[str] = app.config.unmutable_options +# for needsextends in extends_data: +# location = f"{needsextends['docname']}:{needsextends['lineno']}" +# modifications = needsextends["modifications"] +# for option, _, _ in modifications: +# if option in dissallowed: +# log.warning( +# f"Needextend in document: {needsextends['docname']} modifies {option} which is not allowed", +# location, +# ) diff --git a/src/extensions/score_metamodel/metamodel.yaml b/src/extensions/score_metamodel/metamodel.yaml index 7d82bde3b..bdbcc9a4f 100644 --- a/src/extensions/score_metamodel/metamodel.yaml +++ b/src/extensions/score_metamodel/metamodel.yaml @@ -53,6 +53,12 @@ prohibited_words_checks: - thing - absolutely +unmutable_options: + options: + - safety + - security + - status + needs_types: # See metamodel.md for how to define a new need type diff --git a/src/extensions/score_metamodel/yaml_parser.py b/src/extensions/score_metamodel/yaml_parser.py index 454a502c0..f17db9074 100644 --- a/src/extensions/score_metamodel/yaml_parser.py +++ b/src/extensions/score_metamodel/yaml_parser.py @@ -13,6 +13,7 @@ """Functionality related to reading in the SCORE metamodel.yaml""" from dataclasses import dataclass +from itertools import chain from pathlib import Path from typing import Any, cast @@ -32,6 +33,7 @@ class MetaModelData: needs_types: list[ScoreNeedType] needs_links: dict[str, dict[str, str]] needs_fields: dict[str, dict[str, Any]] + unmutable_options: list[str] prohibited_words_checks: list[ProhibitedWordCheck] needs_graph_check: dict[str, object] @@ -49,6 +51,10 @@ def _parse_prohibited_words( ] +def _parse_unmutable_options(option_dict: dict[str, list[str]]) -> list[str]: + return list(chain(*option_dict.values())) + + def default_options(): """ Helper function to get a list of all default options defined by @@ -216,6 +222,7 @@ def load_metamodel_data(yaml_path: Path | None = None) -> MetaModelData: needs_types=list(needs_types.values()), needs_links=_parse_links(data.get("needs_extra_links", {})), needs_fields=_collect_all_custom_options(needs_types), + unmutable_options=_parse_unmutable_options(data.get("unmutable_options", {})), prohibited_words_checks=prohibited_words_checks, needs_graph_check=data.get("graph_checks", {}), )