diff --git a/setup.cfg b/setup.cfg index 82e62d1..f900004 100644 --- a/setup.cfg +++ b/setup.cfg @@ -48,7 +48,7 @@ python_requires = >=3.10 # new major versions. This works if the required packages follow Semantic Versioning. # For more information, check out https://semver.org/. install_requires = - oold>=0.11.1 + oold>=0.15.0 opensemantic opensemantic.core>=0.57.4 opensemantic.base>=0.42.7 diff --git a/src/osw/auth.py b/src/osw/auth.py index b7d69b0..04134a9 100644 --- a/src/osw/auth.py +++ b/src/osw/auth.py @@ -3,34 +3,39 @@ import getpass from enum import Enum from pathlib import Path -from typing import TYPE_CHECKING, List, Optional, Union -from warnings import warn +from typing import List, Optional, Union import yaml +from oold.backend.auth import UserPwdCredential as _OoldUserPwdCredential +from oold.backend.auth import find_credential as _find_credential +from oold.backend.auth import load_credentials as _load_credentials from opensemantic.v1 import OswBaseModel from pydantic.v1 import PrivateAttr from osw.defaults import paths as default_paths -if TYPE_CHECKING: - PossibleFilePath = Path -else: - from osw.custom_types import PossibleFilePath + +def _secret_to_str(v): + """Unwrap SecretStr to plain str, pass through otherwise.""" + if hasattr(v, "get_secret_value"): + return v.get_secret_value() + return v class CredentialManager(OswBaseModel): - """Handles credentials""" + """Handles credentials. + + Delegates YAML loading and IRI matching to oold.backend.auth, + adding osw-specific features (default paths, .gitignore management). + Remains a v1 model because WtSiteConfig (v1) uses it as a field. + """ - cred_filepath: Optional[ - Union[Union[str, PossibleFilePath], List[Union[str, PossibleFilePath]]] - ] + cred_filepath: Optional[Union[Union[str, Path], List[Union[str, Path]]]] = None """Filepath to yaml file with credentials for osw and connected services""" - cert_filepath: Optional[ - Union[Union[str, PossibleFilePath], List[Union[str, PossibleFilePath]]] - ] + cert_filepath: Optional[Union[Union[str, Path], List[Union[str, Path]]]] = None """Filepath to the certificates for osw and connected services""" - _credentials: List[BaseCredential] = PrivateAttr([]) + _credentials: List[CredentialManager.BaseCredential] = PrivateAttr([]) """in memory credential store""" class BaseCredential(OswBaseModel): @@ -90,15 +95,47 @@ def __init__(self, **data): if not isinstance(self.cred_filepath, list): self.cred_filepath = [self.cred_filepath] self.cred_filepath = [Path(fp) for fp in self.cred_filepath if fp != ""] - # Make sure to at least warn the user if they pass cred_filepath instead of - # cred_filepath - attribute_names = self.__dict__.keys() - unexpected_kwargs = [key for key in data.keys() if key not in attribute_names] - if unexpected_kwargs: - warn(f"Unexpected keyword argument(s): {', '.join(unexpected_kwargs)}") + + @staticmethod + def _oold_to_osw(oold_cred) -> CredentialManager.BaseCredential: + """Convert an oold BaseCredential to an osw credential (plain str passwords).""" + from oold.backend.auth import OAuth1Credential as _OoldOAuth1 + + if isinstance(oold_cred, _OoldOAuth1): + return CredentialManager.OAuth1Credential( + iri=oold_cred.iri, + consumer_token=oold_cred.consumer_token, + consumer_secret=_secret_to_str(oold_cred.consumer_secret), + access_token=oold_cred.access_token, + access_secret=_secret_to_str(oold_cred.access_secret), + ) + if isinstance(oold_cred, _OoldUserPwdCredential): + return CredentialManager.UserPwdCredential( + iri=oold_cred.iri, + username=oold_cred.username, + password=_secret_to_str(oold_cred.password), + ) + return CredentialManager.BaseCredential(iri=oold_cred.iri) + + def _load_file_credentials(self): + """Load credentials from YAML files using oold, return as dict.""" + all_creds = {} + if self.cred_filepath: + for fp in self.cred_filepath: + fp = Path(fp) + if not fp.exists(): + continue + try: + loaded = _load_credentials(fp, into_store=False) + all_creds.update(loaded) + except Exception as exc: + print(exc) + return all_creds def get_credential(self, config: CredentialConfig) -> BaseCredential: - """Reads credentials from a yaml file or the in memory store + """Reads credentials from a yaml file or the in memory store. + + Uses oold.backend.auth.find_credential for IRI matching. Parameters ---------- @@ -111,78 +148,36 @@ def get_credential(self, config: CredentialConfig) -> BaseCredential: Credential, contain attributes 'username' and 'password' and the matching iri. """ + oold_creds = self._load_file_credentials() - _file_credentials: List[CredentialManager.BaseCredential] = [] - if self.cred_filepath: - filepaths = self.cred_filepath - if type(filepaths) is not list: - filepaths = [filepaths] + for osw_cred in self._credentials: + oold_creds[osw_cred.iri] = osw_cred - for filepath in filepaths: - if not filepath.exists(): - continue - with open(filepath, "r", encoding="utf-8") as stream: - try: - accounts = yaml.safe_load(stream) - if accounts is None: # Catch empty file - continue - for iri in accounts.keys(): - if ( - "username" in accounts[iri] - and "password" in accounts[iri] - ): - cred = CredentialManager.UserPwdCredential( - username=accounts[iri]["username"], - password=accounts[iri]["password"], - iri=iri, - ) - _file_credentials.append(cred) - if ( - "consumer_token" in accounts[iri] - and "consumer_secret" in accounts[iri] - and "access_token" in accounts[iri] - and "access_secret" in accounts[iri] - ): - cred = CredentialManager.OAuth1Credential( - consumer_token=accounts[iri]["consumer_token"], - consumer_secret=accounts[iri]["consumer_secret"], - access_token=accounts[iri]["access_token"], - access_secret=accounts[iri]["access_secret"], - iri=iri, - ) - _file_credentials.append(cred) - except yaml.YAMLError as exc: - print(exc) - - match_iri = "" - cred = None - creds = _file_credentials + self._credentials - for _cred in creds: - iri = _cred.iri - if config.iri in iri: - if match_iri == "" or len(match_iri) > len( - iri - ): # use the less specific match - match_iri = iri - cred = _cred - - if cred is None: - if config.fallback is CredentialManager.CredentialFallback.ask: - if self.cred_filepath: - filepath_str = "', '".join([str(fp) for fp in self.cred_filepath]) - print( - f"No credentials for {config.iri} found in path '{filepath_str}'. " - f"Please use the prompt to login" - ) - username = input("Enter username: ") - password = getpass.getpass("Enter password: ") - cred = CredentialManager.UserPwdCredential( - username=username, password=password, iri=config.iri + match = _find_credential(config.iri, oold_creds) + + if match is not None: + if isinstance(match, CredentialManager.BaseCredential): + return match + return self._oold_to_osw(match) + + if config.fallback is CredentialManager.CredentialFallback.ask: + if self.cred_filepath: + filepath_str = "', '".join([str(fp) for fp in self.cred_filepath]) + print( + f"No credentials for {config.iri} found in path '{filepath_str}'. " + f"Please use the prompt to login" ) - self.add_credential(cred) - if self.cred_filepath: - self.save_credentials_to_file() - return cred + username = input("Enter username: ") + password = getpass.getpass("Enter password: ") + cred = CredentialManager.UserPwdCredential( + username=username, password=password, iri=config.iri + ) + self.add_credential(cred) + if self.cred_filepath: + self.save_credentials_to_file() + return cred + + return None def add_credential(self, cred: BaseCredential): """adds a credential to the in memory store @@ -232,7 +227,7 @@ def iri_in_file(self, iri: str) -> bool: with open(fp, "r", encoding="utf-8") as stream: try: accounts = yaml.safe_load(stream) - if accounts is None: # Catch empty file + if accounts is None: continue for iri_ in accounts.keys(): if iri_ == iri: @@ -243,7 +238,7 @@ def iri_in_file(self, iri: str) -> bool: def save_credentials_to_file( self, - filepath: Union[str, PossibleFilePath] = None, + filepath: Union[str, Path] = None, set_cred_filepath: bool = False, ): """Saves the in memory credentials to a file @@ -259,13 +254,11 @@ def save_credentials_to_file( cred_filepath of the CredentialManager is not changed. """ cred_filepaths = [filepath] - """The filepath to save the credentials to.""" if filepath is None: cred_filepaths = self.cred_filepath if self.cred_filepath is None: cred_filepaths = [default_paths.cred_filepath] if set_cred_filepath: - # Creates error if file does not exist -> Using custom FilePath self.cred_filepath = cred_filepaths for fp in cred_filepaths: file = Path(fp) @@ -275,7 +268,7 @@ def save_credentials_to_file( file_already_exists = file.exists() if file_already_exists: data = yaml.safe_load(file.read_text(encoding="utf-8")) - if data is None: # Catch empty file + if data is None: data = {} for cred in self._credentials: data[cred.iri] = cred.dict(exclude={"iri"}) @@ -316,7 +309,6 @@ def save_credentials_to_file( f"'{gitignore_fp}'." ) containing_gitignore = gitignore_fp.parent.absolute() - if containing_gitignore in default_paths.osw_files_dir.parents: # If the default_path.osw_files_dir is a subdirectory of the directory # containing the .gitignore file, add the relative path to the diff --git a/src/osw/utils/oold.py b/src/osw/utils/oold.py index 231d338..62c2cb1 100644 --- a/src/osw/utils/oold.py +++ b/src/osw/utils/oold.py @@ -1,519 +1,22 @@ -"""Contains essential functions for working with JSON, JSON-SCHEMA -and JSON-LD context objects. Python implementation of -https://github.com/OpenSemanticLab/mediawiki-extensions-MwJson/blob/main/modules/ext.MwJson.util/MwJson_util.js +"""Forwarding stubs for oold.utils.json_tools. +TODO: remove in next major release """ -import json -from copy import deepcopy -from enum import Enum -from typing import Dict, Optional, TypeVar - -from pydantic import BaseModel -from typing_extensions import deprecated - -JsonType = TypeVar("JsonType", dict, list, float, int, str, None) - - -def deep_equal(x: JsonType, y: JsonType): - """Compares two objects deeply. - - Parameters - ---------- - x - a dictionary, list or scalar value - y - another dictionary, list or scalar value - - Returns - ------- - True if the two objects are deeply equal, False otherwise - """ - - if x is not None and y is not None and isinstance(x, dict) and isinstance(y, dict): - return len(x.keys()) == len(y.keys()) and all( - deep_equal(x[key], y.get(key, None)) for key in x - ) - elif ( - x is not None and y is not None and isinstance(x, list) and isinstance(y, list) - ): - return len(x) == len(y) and all( - deep_equal(x[key], y[key]) for key in range(0, len(x)) - ) - else: - return x == y - # all(deep_equal(x[key], y.get(key)) for key in x) or x == y - - -def unique_array(array: list) -> list: - """Returns a new array with only unique elements by comparing them deeply. - - Parameters: - array: list - The array to be filtered - - Returns: - list - A new array with only unique elements - """ - result = [] - for item in array: - add = True - for added_item in result: - if deep_equal(added_item, item): - add = False - break - if add: - result.append(item) - return result - - -def is_object(obj): - """Tests if an object is a dictionary. - - Parameters - ---------- - obj - the object to be tested - - Returns - ------- - True if the object is a dictionary, False otherwise - """ - return isinstance(obj, dict) - - -def is_array(obj): - """Tests if an object is a list. - - Parameters - ---------- - obj - the object to be tested - - Returns - ------- - True if the object is a list, False otherwise - """ - return isinstance(obj, list) - - -def is_string(obj): - """Tests if an object is a string. - - Parameters - ---------- - obj - the object to be tested - - Returns - ------- - True if the object is a string, False otherwise - """ - return isinstance(obj, str) - - -def copy_deep(target: JsonType) -> JsonType: - """Copies an object deeply. - - Parameters - ---------- - target - the object which values will be copied - - Returns - ------- - the copied object - """ - return deepcopy(target) - - -@deprecated("Use merge_deep instead") -def merge_deep_objects(target: dict, source: dict) -> JsonType: - """Merges two objects deeply, does not handle lists. - If dictionaries are encountered, the values of the source object - will overwrite the target object. - Missing keys in the target object will be added. - If an array is encountered as a subelement, the arrays are - concatenated and duplicates are removed. - If literals are encountered, the source value will - overwrite the target value. - - Parameters - ---------- - target - the object which values will be potentially overwritten - source - the object which values will take precedence over the target object - - Returns - ------- - the merged object - """ - if not target: - return source - if not source: - return target - output = deepcopy(target) - if is_object(target) and is_object(source): - for key in source: - if is_array(source[key]) and is_array(target.get(key)): - if key not in target: - output[key] = source[key] - else: - output[key] = unique_array(target[key] + source[key]) - elif is_object(source[key]): - if key not in target: - output[key] = source[key] - else: - output[key] = merge_deep(target[key], source[key]) - else: - output[key] = source[key] - - return output - - -def merge_deep(target: JsonType, source: JsonType) -> JsonType: - """Merges two objects deeply. - If dictionaries are encountered, the values of the source object - will overwrite the target object. - Missing keys in the target object will be added. - If an array is encountered as a subelement, the arrays are - concatenated and duplicates are removed. - If literals are encountered, the source value will - overwrite the target value. - - Parameters - ---------- - target - the object which values will be potentially overwritten - source - the object which values will take precedence over the target object - - Returns - ------- - the merged object - """ - if not target: - return source - if not source: - return target - output = deepcopy(target) - - if is_object(target) and is_object(source): - for key in source: - output[key] = merge_deep(output.get(key, None), source[key]) - elif is_array(source) and is_array(target): - output = unique_array(target + source) - else: - output = source - return output - - -def merge_jsonld_context_object_list(context: list) -> list: - """to cleanup generated json-ld context - ["/some/remove/context", {"a": "ex:a"}, {"a": "ex:a", "b": "ex:b"}] - => ["/some/remove/context", {"a": "ex:a", "b": "ex:b"}] - - Parameters - ---------- - list - mixed list of strings and dictionaries - """ - - # interate over all elements - # if element is a string, add it to the result list - # if element is a dictionary, merge it with the last dictionary in the - # result list - - # if not a list, return immediately - if not is_array(context): - return context - - result = [] - last = None - for e in context: - if is_object(e): - if last is None: - last = e - else: - last = merge_deep(last, e) - else: - if last is not None: - result.append(last) - last = None - result.append(e) - if last is not None: - result.append(last) - return result - - -class AggregateGeneratedSchemasParamMode(str, Enum): - ROOT_LEVEL = "root_level" - """ The generated schema is merged at the root level """ - DEFINITIONS_SECTION = "definitions_section" - """ The generated schema is merged into the definitions section """ - - -class AggregateGeneratedSchemasParam(BaseModel): - target_schema: Optional[dict] = {} - """ The target schema to be merged with the generated schema """ - generated_schemas: Dict[str, dict] - """ List of JSON schemas to be aggregated """ - mode: AggregateGeneratedSchemasParamMode = ( - AggregateGeneratedSchemasParamMode.ROOT_LEVEL - ) - """ The mode to be used for aggregation """ - def_key: Optional[str] = "$defs" - """ The keyword for schema definitions. $defs is recommended""" - gen_def_key: Optional[str] = "generated" - """ The keyword to store the generated schema. - Note: Having a separate section per generated schema would lead - to many partial classes in code generation """ - generate_root_ref: Optional[bool] = False - """ If true, generate $ref: "#/def...", else allOf: [{$ref: "#/def...""}. - Root refs are not supported by json_ref_parser < 0.10 and data-model-codegen """ - gen_def_pointer: Optional[str] = None - """ The pointer to the generated schema. If None, it will be set to - "#/" + def_key + "/" + gen_def_key """ - - def __init__(self, **data): - super().__init__(**data) - if self.gen_def_pointer is None: - self.gen_def_pointer = "#/" + self.def_key + "/" + self.gen_def_key - - -class AggregateGeneratedSchemasResult(BaseModel): - aggregated_schema: dict - """ The aggregated schema """ - - -def aggregate_generated_schemas( - param: AggregateGeneratedSchemasParam, -) -> AggregateGeneratedSchemasResult: - """Applies a merge operation on two OO-LD schemas. - - Parameters - ---------- - param - see AggregateGeneratedSchemasParam - - Returns - ------- - see AggregateGeneratedSchemasResult - """ - mode = param.mode - def_key = param.def_key - gen_def_key = param.gen_def_key - gen_def_pointer = param.gen_def_pointer - generate_root_ref = param.generate_root_ref - schema = param.target_schema - - for generated_schema_id in param.generated_schemas.keys(): - generated_schema = param.generated_schemas[generated_schema_id] - if mode == AggregateGeneratedSchemasParamMode.ROOT_LEVEL: - schema = merge_deep(schema, generated_schema) - else: - # Store generated schema in #/$defs/generated (force overwrite), - # add $ref: #/$defs/generated to schema - # note: using $def with $ leads to recursion error in - # note: requires addition schema properties are allowed on the - # same level as $ref. allOf: $ref would imply a superclass - if "@context" in generated_schema: - generated_context = copy_deep(generated_schema["@context"]) - del generated_schema["@context"] - existing_context = schema.get("@context", None) - if existing_context is not None: - # case A: "" + "" => ["", ""] - # case B: "" + {} => ["", {}] - # case C: "" + [] => ["", ] - # case D: [] + {} => [, {}] - # case E: {} + {} => {} - # case F: [] + [] => [] - - if is_array(existing_context) and not is_array(generated_context): - generated_context = [generated_context] - # case C + D - elif not is_array(existing_context) and is_array(generated_context): - existing_context = [existing_context] - # case C + D - elif not is_array(existing_context) and not is_array( - generated_context - ): - if is_string(existing_context) or is_string( - existing_context - ): # case A + B - generated_context = [generated_context] - existing_context = [existing_context] - # case E + F: nothing to do - schema["@context"] = merge_deep( - {"@context": existing_context}, {"@context": generated_context} - )["@context"] - if is_array(schema["@context"]): - schema["@context"] = merge_jsonld_context_object_list( - schema["@context"] - ) - - if def_key not in schema: - schema[def_key] = {} - if gen_def_key not in schema[def_key]: - schema[def_key][gen_def_key] = { - "$comment": "Autogenerated section - do not edit. Generated from" - } - schema[def_key][gen_def_key]["$comment"] += " " + generated_schema_id - # schema[def_key][gen_def_key] = generated_schema; # full override - schema[def_key][gen_def_key] = merge_deep( - schema[def_key][gen_def_key], generated_schema - ) - # merge - - if generate_root_ref: - if "$ref" in schema and schema["$ref"] != gen_def_pointer: - print( - "Error while applying generated schema: $ref already set to " - + schema["$ref"] - ) - else: - schema["$ref"] = gen_def_pointer - else: - if "allOf" not in schema: - schema["allOf"] = [] - # check if any allOf already points to the generated schema - exists = any( - [allOf["$ref"] == gen_def_pointer for allOf in schema["allOf"]] - ) - if not exists: - schema["allOf"].append({"$ref": gen_def_pointer}) - if "title" in generated_schema: - schema["title"] = generated_schema["title"] - schema[def_key][gen_def_key]["title"] = ( - "Generated" + generated_schema["title"] - ) - schema[def_key][gen_def_key]["description"] = ( - "This is an autogenerated partial class definition of '" - + generated_schema["title"] - + "'" - ) - if "description" in generated_schema: - schema["description"] = generated_schema["description"] - - return AggregateGeneratedSchemasResult(aggregated_schema=schema) - - -def merge_generated_definitions(schema: JsonType): - """Merges the generated definitions into the schema. - Example: - { - "$defs": { - "generated": { - "title": "Generated", - "description": "Generated schema", - "description*": { - "en": "Actual description", - } - } - }, - "$ref": "#/$defs/generated", - "title": "Overwritten" - } - will be merged into the schema - { - "title": "Overwritten", - "description": "Actual description" - } - """ - - generated_content = schema.get("$defs", {}).get("generated") - - if generated_content: - # Check if "$ref": "#/$defs/generated" exists directly in the dictionary - if schema.get("$ref") == "#/$defs/generated": - - # if schema has no description use the multi-lang description - # (pref: en, else first) in the $defs section - if ( - "description" not in schema - or schema["description"] is None - or schema["description"] == "" - ): - if ( - "description*" in generated_content - and len(generated_content["description*"]) > 0 - ): - first_key = list(generated_content["description*"].keys())[0] - schema["description"] = generated_content["description*"].get( - "en", generated_content["description*"][first_key] - ) - else: - schema["description"] = generated_content.get("description") - - schema = merge_deep(deepcopy(generated_content), schema) - schema.pop("$ref", None) # Remove the reference after merging - - # Check if "$ref": "#/$defs/generated" is contained in "allOf" - if "allOf" in schema: - for _index, item in enumerate(schema["allOf"]): - if item.get("$ref") == "#/$defs/generated": - # Remove the reference after merging - for i, v in enumerate(schema["allOf"]): - if v.get("$ref") == "#/$defs/generated": - del schema["allOf"][i] - break - # if schema has no description use the multi-lang description - # (pref: en, else first) in the $defs section - if ( - "description" not in schema - or schema["description"] is None - or schema["description"] == "" - ): - if ( - "description*" in generated_content - and len(generated_content["description*"]) > 0 - ): - first_key = list(generated_content["description*"].keys())[ - 0 - ] - schema["description"] = generated_content[ - "description*" - ].get("en", generated_content["description*"][first_key]) - else: - schema["description"] = generated_content.get("description") - schema = merge_deep(deepcopy(generated_content), schema) - break - - # Remove "generated" from "$defs" - schema.get("$defs", {}).pop("generated", None) - return schema - - -def escape_json_strings(obj: JsonType) -> JsonType: - """replace double quotes `"` with escaped double quotes `\"` in - and non-standard escape-squences in strings. - If the object is a string, the escaped string is returned. - If the object is a list, the function is called recursively for each element. - If the object is a dictionary, the function is called recursively for each value. - Else the object is returned as is. - - Parameters - ---------- - obj - the object to handle - - Returns - ------- - returns the object with double quotes escaped if applicable - """ - if isinstance(obj, str): - # Escape double quotes in string - # Replace invalid backslashes outside of math environments - return json.dumps(obj)[1:-1] - elif isinstance(obj, list): - # Iterate over array elements - return [escape_json_strings(item) for item in obj] - elif isinstance(obj, dict): - # Iterate over object properties - escaped_obj = {} - for key, value in obj.items(): - escaped_obj[key] = escape_json_strings(value) - return escaped_obj - # Return the value as is for non-string, non-object types - return obj +from oold.utils.json_tools import ( # noqa: F401 + AggregateGeneratedSchemasParam, + AggregateGeneratedSchemasParamMode, + AggregateGeneratedSchemasResult, + JsonType, + aggregate_generated_schemas, + copy_deep, + deep_equal, + escape_json_strings, + is_array, + is_object, + is_string, + merge_deep, + merge_deep_objects, + merge_generated_definitions, + merge_jsonld_context_object_list, + unique_array, +)