Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ python_requires = >=3.10
# new major versions. This works if the required packages follow Semantic Versioning.
# For more information, check out https://semver.org/.
install_requires =
oold>=0.11.1
oold>=0.15.0
opensemantic
opensemantic.core>=0.57.4
opensemantic.base>=0.42.7
Expand Down
184 changes: 88 additions & 96 deletions src/osw/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,39 @@
import getpass
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional, Union
from warnings import warn
from typing import List, Optional, Union

import yaml
from oold.backend.auth import UserPwdCredential as _OoldUserPwdCredential
from oold.backend.auth import find_credential as _find_credential
from oold.backend.auth import load_credentials as _load_credentials
from opensemantic.v1 import OswBaseModel
from pydantic.v1 import PrivateAttr

from osw.defaults import paths as default_paths

if TYPE_CHECKING:
PossibleFilePath = Path
else:
from osw.custom_types import PossibleFilePath

def _secret_to_str(v):
"""Unwrap SecretStr to plain str, pass through otherwise."""
if hasattr(v, "get_secret_value"):
return v.get_secret_value()
return v


class CredentialManager(OswBaseModel):
"""Handles credentials"""
"""Handles credentials.

Delegates YAML loading and IRI matching to oold.backend.auth,
adding osw-specific features (default paths, .gitignore management).
Remains a v1 model because WtSiteConfig (v1) uses it as a field.
"""

cred_filepath: Optional[
Union[Union[str, PossibleFilePath], List[Union[str, PossibleFilePath]]]
]
cred_filepath: Optional[Union[Union[str, Path], List[Union[str, Path]]]] = None
"""Filepath to yaml file with credentials for osw and connected services"""
cert_filepath: Optional[
Union[Union[str, PossibleFilePath], List[Union[str, PossibleFilePath]]]
]
cert_filepath: Optional[Union[Union[str, Path], List[Union[str, Path]]]] = None
"""Filepath to the certificates for osw and connected services"""

_credentials: List[BaseCredential] = PrivateAttr([])
_credentials: List[CredentialManager.BaseCredential] = PrivateAttr([])
"""in memory credential store"""

class BaseCredential(OswBaseModel):
Expand Down Expand Up @@ -90,15 +95,47 @@ def __init__(self, **data):
if not isinstance(self.cred_filepath, list):
self.cred_filepath = [self.cred_filepath]
self.cred_filepath = [Path(fp) for fp in self.cred_filepath if fp != ""]
# Make sure to at least warn the user if they pass cred_filepath instead of
# cred_filepath
attribute_names = self.__dict__.keys()
unexpected_kwargs = [key for key in data.keys() if key not in attribute_names]
if unexpected_kwargs:
warn(f"Unexpected keyword argument(s): {', '.join(unexpected_kwargs)}")

@staticmethod
def _oold_to_osw(oold_cred) -> CredentialManager.BaseCredential:
"""Convert an oold BaseCredential to an osw credential (plain str passwords)."""
from oold.backend.auth import OAuth1Credential as _OoldOAuth1

if isinstance(oold_cred, _OoldOAuth1):
return CredentialManager.OAuth1Credential(
iri=oold_cred.iri,
consumer_token=oold_cred.consumer_token,
consumer_secret=_secret_to_str(oold_cred.consumer_secret),
access_token=oold_cred.access_token,
access_secret=_secret_to_str(oold_cred.access_secret),
)
if isinstance(oold_cred, _OoldUserPwdCredential):
return CredentialManager.UserPwdCredential(
iri=oold_cred.iri,
username=oold_cred.username,
password=_secret_to_str(oold_cred.password),
)
return CredentialManager.BaseCredential(iri=oold_cred.iri)

def _load_file_credentials(self):
"""Load credentials from YAML files using oold, return as dict."""
all_creds = {}
if self.cred_filepath:
for fp in self.cred_filepath:
fp = Path(fp)
if not fp.exists():
continue
try:
loaded = _load_credentials(fp, into_store=False)
all_creds.update(loaded)
except Exception as exc:
print(exc)
return all_creds

def get_credential(self, config: CredentialConfig) -> BaseCredential:
"""Reads credentials from a yaml file or the in memory store
"""Reads credentials from a yaml file or the in memory store.

Uses oold.backend.auth.find_credential for IRI matching.

Parameters
----------
Expand All @@ -111,78 +148,36 @@ def get_credential(self, config: CredentialConfig) -> BaseCredential:
Credential, contain attributes 'username' and 'password' and
the matching iri.
"""
oold_creds = self._load_file_credentials()

_file_credentials: List[CredentialManager.BaseCredential] = []
if self.cred_filepath:
filepaths = self.cred_filepath
if type(filepaths) is not list:
filepaths = [filepaths]
for osw_cred in self._credentials:
oold_creds[osw_cred.iri] = osw_cred

for filepath in filepaths:
if not filepath.exists():
continue
with open(filepath, "r", encoding="utf-8") as stream:
try:
accounts = yaml.safe_load(stream)
if accounts is None: # Catch empty file
continue
for iri in accounts.keys():
if (
"username" in accounts[iri]
and "password" in accounts[iri]
):
cred = CredentialManager.UserPwdCredential(
username=accounts[iri]["username"],
password=accounts[iri]["password"],
iri=iri,
)
_file_credentials.append(cred)
if (
"consumer_token" in accounts[iri]
and "consumer_secret" in accounts[iri]
and "access_token" in accounts[iri]
and "access_secret" in accounts[iri]
):
cred = CredentialManager.OAuth1Credential(
consumer_token=accounts[iri]["consumer_token"],
consumer_secret=accounts[iri]["consumer_secret"],
access_token=accounts[iri]["access_token"],
access_secret=accounts[iri]["access_secret"],
iri=iri,
)
_file_credentials.append(cred)
except yaml.YAMLError as exc:
print(exc)

match_iri = ""
cred = None
creds = _file_credentials + self._credentials
for _cred in creds:
iri = _cred.iri
if config.iri in iri:
if match_iri == "" or len(match_iri) > len(
iri
): # use the less specific match
match_iri = iri
cred = _cred

if cred is None:
if config.fallback is CredentialManager.CredentialFallback.ask:
if self.cred_filepath:
filepath_str = "', '".join([str(fp) for fp in self.cred_filepath])
print(
f"No credentials for {config.iri} found in path '{filepath_str}'. "
f"Please use the prompt to login"
)
username = input("Enter username: ")
password = getpass.getpass("Enter password: ")
cred = CredentialManager.UserPwdCredential(
username=username, password=password, iri=config.iri
match = _find_credential(config.iri, oold_creds)

if match is not None:
if isinstance(match, CredentialManager.BaseCredential):
return match
return self._oold_to_osw(match)

if config.fallback is CredentialManager.CredentialFallback.ask:
if self.cred_filepath:
filepath_str = "', '".join([str(fp) for fp in self.cred_filepath])
print(
f"No credentials for {config.iri} found in path '{filepath_str}'. "
f"Please use the prompt to login"
)
self.add_credential(cred)
if self.cred_filepath:
self.save_credentials_to_file()
return cred
username = input("Enter username: ")
password = getpass.getpass("Enter password: ")
cred = CredentialManager.UserPwdCredential(
username=username, password=password, iri=config.iri
)
self.add_credential(cred)
if self.cred_filepath:
self.save_credentials_to_file()
return cred

return None

def add_credential(self, cred: BaseCredential):
"""adds a credential to the in memory store
Expand Down Expand Up @@ -232,7 +227,7 @@ def iri_in_file(self, iri: str) -> bool:
with open(fp, "r", encoding="utf-8") as stream:
try:
accounts = yaml.safe_load(stream)
if accounts is None: # Catch empty file
if accounts is None:
continue
for iri_ in accounts.keys():
if iri_ == iri:
Expand All @@ -243,7 +238,7 @@ def iri_in_file(self, iri: str) -> bool:

def save_credentials_to_file(
self,
filepath: Union[str, PossibleFilePath] = None,
filepath: Union[str, Path] = None,
set_cred_filepath: bool = False,
):
"""Saves the in memory credentials to a file
Expand All @@ -259,13 +254,11 @@ def save_credentials_to_file(
cred_filepath of the CredentialManager is not changed.
"""
cred_filepaths = [filepath]
"""The filepath to save the credentials to."""
if filepath is None:
cred_filepaths = self.cred_filepath
if self.cred_filepath is None:
cred_filepaths = [default_paths.cred_filepath]
if set_cred_filepath:
# Creates error if file does not exist -> Using custom FilePath
self.cred_filepath = cred_filepaths
for fp in cred_filepaths:
file = Path(fp)
Expand All @@ -275,7 +268,7 @@ def save_credentials_to_file(
file_already_exists = file.exists()
if file_already_exists:
data = yaml.safe_load(file.read_text(encoding="utf-8"))
if data is None: # Catch empty file
if data is None:
data = {}
for cred in self._credentials:
data[cred.iri] = cred.dict(exclude={"iri"})
Expand Down Expand Up @@ -316,7 +309,6 @@ def save_credentials_to_file(
f"'{gitignore_fp}'."
)
containing_gitignore = gitignore_fp.parent.absolute()

if containing_gitignore in default_paths.osw_files_dir.parents:
# If the default_path.osw_files_dir is a subdirectory of the directory
# containing the .gitignore file, add the relative path to the
Expand Down
Loading
Loading