Skip to content

Commit 8eb6f08

Browse files
author
Lukas Puehringer
committed
ngclient: replace internal wrapping interface
The internal wrapping interface to case handle deserialization and verification of traditional metadata vs. simple envelopes inside trusted metadata set might be a more complicated solution than necessary. This removes the interface and instead adds the methods of the interface implementations as helpers to trusted metadata set, and updates it to to call one or the other function based on the envelope type configuration flag. Signed-off-by: Lukas Puehringer <lukas.puehringer@nyu.edu>
1 parent 1897f9a commit 8eb6f08

2 files changed

Lines changed: 87 additions & 149 deletions

File tree

tuf/ngclient/_internal/trusted_metadata_set.py

Lines changed: 87 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -64,19 +64,27 @@
6464
import datetime
6565
import logging
6666
from collections import abc
67-
from typing import Dict, Iterator, Optional, Union, cast
67+
from typing import Dict, Iterator, Optional, Tuple, Type, Union, cast
68+
69+
from securesystemslib.signer import Signature
6870

6971
from tuf.api import exceptions
70-
from tuf.api.metadata import Root, Signed, Snapshot, Targets, Timestamp
71-
from tuf.ngclient._internal.wrapping import (
72-
EnvelopeUnwrapper,
73-
MetadataUnwrapper,
74-
Unwrapper,
72+
from tuf.api.dsse import SimpleEnvelope
73+
from tuf.api.metadata import (
74+
Metadata,
75+
Root,
76+
Signed,
77+
Snapshot,
78+
T,
79+
Targets,
80+
Timestamp,
7581
)
7682
from tuf.ngclient.config import EnvelopeType
7783

7884
logger = logging.getLogger(__name__)
7985

86+
Delegator = Union[Root, Targets]
87+
8088

8189
class TrustedMetadataSet(abc.Mapping):
8290
"""Internal class to keep track of trusted metadata in ``Updater``.
@@ -101,17 +109,14 @@ def __init__(self, root_data: bytes, envelope_type: EnvelopeType):
101109
RepositoryError: Metadata failed to load or verify. The actual
102110
error type and content will contain more details.
103111
"""
104-
unwrapper: Unwrapper
105-
if envelope_type is EnvelopeType.SIMPLE:
106-
unwrapper = EnvelopeUnwrapper()
107-
else:
108-
unwrapper = MetadataUnwrapper()
109-
110-
self._unwrapper = unwrapper
111-
112112
self._trusted_set: Dict[str, Signed] = {}
113113
self.reference_time = datetime.datetime.utcnow()
114114

115+
if envelope_type is EnvelopeType.SIMPLE:
116+
self._load_data = _load_from_simple_envelope
117+
else:
118+
self._load_data = _load_from_metadata
119+
115120
# Load and validate the local root metadata. Valid initial trusted root
116121
# metadata is required
117122
logger.debug("Updating initial trusted root")
@@ -174,7 +179,7 @@ def update_root(self, data: bytes) -> Root:
174179
raise RuntimeError("Cannot update root after timestamp")
175180
logger.debug("Updating root")
176181

177-
new_root, new_root_bytes, new_root_signatures = self._unwrapper.unwrap(
182+
new_root, new_root_bytes, new_root_signatures = self._load_data(
178183
Root, data, self.root
179184
)
180185
if new_root.version != self.root.version + 1:
@@ -222,7 +227,7 @@ def update_timestamp(self, data: bytes) -> Timestamp:
222227
# No need to check for 5.3.11 (fast forward attack recovery):
223228
# timestamp/snapshot can not yet be loaded at this point
224229

225-
new_timestamp, _, _ = self._unwrapper.unwrap(Timestamp, data, self.root)
230+
new_timestamp, _, _ = self._load_data(Timestamp, data, self.root)
226231

227232
# If an existing trusted timestamp is updated,
228233
# check for a rollback attack
@@ -310,7 +315,7 @@ def update_snapshot(
310315
if not trusted:
311316
snapshot_meta.verify_length_and_hashes(data)
312317

313-
new_snapshot, _, _ = self._unwrapper.unwrap(Snapshot, data, self.root)
318+
new_snapshot, _, _ = self._load_data(Snapshot, data, self.root)
314319

315320
# version not checked against meta version to allow old snapshot to be
316321
# used in rollback protection: it is checked when targets is updated
@@ -411,7 +416,7 @@ def update_delegated_targets(
411416

412417
meta.verify_length_and_hashes(data)
413418

414-
new_delegate, _, _ = self._unwrapper.unwrap(
419+
new_delegate, _, _ = self._load_data(
415420
Targets, data, delegator, role_name
416421
)
417422

@@ -435,10 +440,73 @@ def _load_trusted_root(self, data: bytes) -> None:
435440
Note that an expired initial root is considered valid: expiry is
436441
only checked for the final root in ``update_timestamp()``.
437442
"""
438-
new_root, new_root_bytes, new_root_signatures = self._unwrapper.unwrap(
443+
new_root, new_root_bytes, new_root_signatures = self._load_data(
439444
Root, data
440445
)
441446
new_root.verify_delegate(Root.type, new_root_bytes, new_root_signatures)
442447

443448
self._trusted_set[Root.type] = new_root
444449
logger.debug("Loaded trusted root v%d", new_root.version)
450+
451+
452+
def _load_from_metadata(
453+
role: Type[T],
454+
data: bytes,
455+
delegator: Optional[Delegator] = None,
456+
role_name: Optional[str] = None,
457+
) -> Tuple[T, bytes, Dict[str, Signature]]: # noqa: D102
458+
"""Load traditional metadata bytes, and extract and verify payload.
459+
460+
If no delegator is passed, verification is skipped. Returns a tuple of
461+
deserialized payload, signed payload bytes, and signatures.
462+
"""
463+
md = Metadata[T].from_bytes(data)
464+
465+
if md.signed.type != role.type:
466+
raise exceptions.RepositoryError(
467+
f"Expected '{role.type}', got '{md.signed.type}'"
468+
)
469+
470+
if delegator:
471+
if role_name is None:
472+
role_name = role.type
473+
474+
delegator.verify_delegate(role_name, md.signed_bytes, md.signatures)
475+
476+
return md.signed, md.signed_bytes, md.signatures
477+
478+
479+
def _load_from_simple_envelope(
480+
role: Type[T],
481+
data: bytes,
482+
delegator: Optional[Delegator] = None,
483+
role_name: Optional[str] = None,
484+
) -> Tuple[T, bytes, Dict[str, Signature]]: # noqa: D102
485+
"""Load simple envelope bytes, and extract and verify payload.
486+
487+
If no delegator is passed, verification is skipped. Returns a tuple of
488+
deserialized payload, signed payload bytes, and signatures.
489+
"""
490+
491+
envelope = SimpleEnvelope[T].from_bytes(data)
492+
493+
if envelope.payload_type != SimpleEnvelope._DEFAULT_PAYLOAD_TYPE:
494+
raise exceptions.RepositoryError(
495+
f"Expected '{SimpleEnvelope._DEFAULT_PAYLOAD_TYPE}', "
496+
f"got '{envelope.payload_type}'"
497+
)
498+
499+
if delegator:
500+
if role_name is None:
501+
role_name = role.type
502+
delegator.verify_delegate(
503+
role_name, envelope.pae(), envelope.signatures_dict
504+
)
505+
506+
signed = envelope.get_signed()
507+
if signed.type != role.type:
508+
raise exceptions.RepositoryError(
509+
f"Expected '{role.type}', got '{signed.type}'"
510+
)
511+
512+
return signed, envelope.pae(), envelope.signatures_dict

tuf/ngclient/_internal/wrapping.py

Lines changed: 0 additions & 130 deletions
This file was deleted.

0 commit comments

Comments
 (0)