Skip to content

Commit 52fa73a

Browse files
author
Lukas Pühringer
authored
Merge pull request #2436 from lukpueh/add-dsse
Add basic DSSE equivalent for Metadata API and configurable DSSE support in ngclient
2 parents 3077932 + 4005e76 commit 52fa73a

10 files changed

Lines changed: 2353 additions & 1929 deletions

File tree

examples/uploader/_localrepo.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,10 @@ def open(self, role: str) -> Metadata:
6363
# if there is a metadata version fetched from remote, use that
6464
# HACK: access Updater internals
6565
if role in self.updater._trusted_set:
66-
return copy.deepcopy(self.updater._trusted_set[role])
66+
# NOTE: The original signature wrapper (Metadata) was verified and
67+
# discarded upon inclusion in the trusted set. It is safe to use
68+
# a fresh wrapper. `close` will override existing signatures anyway.
69+
return Metadata(copy.deepcopy(self.updater._trusted_set[role]))
6770

6871
# otherwise we're creating metadata from scratch
6972
md = Metadata(Targets())

tests/test_api.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import unittest
1616
from copy import copy, deepcopy
1717
from datetime import datetime, timedelta
18+
from pathlib import Path
1819
from typing import Any, ClassVar, Dict, Optional
1920

2021
from securesystemslib import exceptions as sslib_exceptions
@@ -33,6 +34,7 @@
3334

3435
from tests import utils
3536
from tuf.api import exceptions
37+
from tuf.api.dsse import SimpleEnvelope
3638
from tuf.api.metadata import (
3739
TOP_LEVEL_ROLE_NAMES,
3840
DelegatedRole,
@@ -1144,6 +1146,95 @@ def test_delegations_get_delegated_role(self) -> None:
11441146
)
11451147

11461148

1149+
class TestSimpleEnvelope(unittest.TestCase):
1150+
"""Tests for public API in 'tuf/api/dsse.py'."""
1151+
1152+
@classmethod
1153+
def setUpClass(cls) -> None:
1154+
repo_data_dir = Path(utils.TESTS_DIR) / "repository_data"
1155+
cls.metadata_dir = repo_data_dir / "repository" / "metadata"
1156+
cls.signer_store = {}
1157+
for role in [Snapshot, Targets, Timestamp]:
1158+
key_path = repo_data_dir / "keystore" / f"{role.type}_key"
1159+
key = import_ed25519_privatekey_from_file(
1160+
str(key_path),
1161+
password="password",
1162+
)
1163+
cls.signer_store[role.type] = SSlibSigner(key)
1164+
1165+
def test_serialization(self) -> None:
1166+
"""Basic de/serialization test.
1167+
1168+
1. Load test metadata for each role
1169+
2. Wrap metadata payloads in envelope serializing the payload
1170+
3. Serialize envelope
1171+
4. De-serialize envelope
1172+
5. De-serialize payload
1173+
1174+
"""
1175+
for role in [Root, Timestamp, Snapshot, Targets]:
1176+
metadata_path = self.metadata_dir / f"{role.type}.json"
1177+
metadata = Metadata.from_file(str(metadata_path))
1178+
self.assertIsInstance(metadata.signed, role)
1179+
1180+
envelope = SimpleEnvelope.from_signed(metadata.signed)
1181+
envelope_bytes = envelope.to_bytes()
1182+
1183+
envelope2 = SimpleEnvelope.from_bytes(envelope_bytes)
1184+
payload = envelope2.get_signed()
1185+
self.assertEqual(metadata.signed, payload)
1186+
1187+
def test_fail_envelope_serialization(self) -> None:
1188+
envelope = SimpleEnvelope(b"foo", "bar", ["baz"])
1189+
with self.assertRaises(SerializationError):
1190+
envelope.to_bytes()
1191+
1192+
def test_fail_envelope_deserialization(self) -> None:
1193+
with self.assertRaises(DeserializationError):
1194+
SimpleEnvelope.from_bytes(b"[")
1195+
1196+
def test_fail_payload_serialization(self) -> None:
1197+
with self.assertRaises(SerializationError):
1198+
SimpleEnvelope.from_signed("foo") # type: ignore
1199+
1200+
def test_fail_payload_deserialization(self) -> None:
1201+
payloads = [b"[", b'{"_type": "foo"}']
1202+
for payload in payloads:
1203+
envelope = SimpleEnvelope(payload, "bar", [])
1204+
with self.assertRaises(DeserializationError):
1205+
envelope.get_signed()
1206+
1207+
def test_verify_delegate(self) -> None:
1208+
"""Basic verification test.
1209+
1210+
1. Load test metadata for each role
1211+
2. Wrap non-root payloads in envelope serializing the payload
1212+
3. Sign with correct delegated key
1213+
4. Verify delegate with root
1214+
1215+
"""
1216+
root_path = self.metadata_dir / "root.json"
1217+
root = Metadata[Root].from_file(str(root_path)).signed
1218+
1219+
for role in [Timestamp, Snapshot, Targets]:
1220+
metadata_path = self.metadata_dir / f"{role.type}.json"
1221+
metadata = Metadata.from_file(str(metadata_path))
1222+
self.assertIsInstance(metadata.signed, role)
1223+
1224+
signer = self.signer_store[role.type]
1225+
self.assertIn(
1226+
signer.key_dict["keyid"], root.roles[role.type].keyids
1227+
)
1228+
1229+
envelope = SimpleEnvelope.from_signed(metadata.signed)
1230+
envelope.sign(signer)
1231+
self.assertTrue(len(envelope.signatures) == 1)
1232+
1233+
root.verify_delegate(
1234+
role.type, envelope.pae(), envelope.signatures_dict
1235+
)
1236+
1237+
11471238
# Run unit test.
11481239
if __name__ == "__main__":
11491240
utils.configure_test_logging(sys.argv)

tests/test_trusted_metadata_set.py

Lines changed: 93 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,22 @@
1515

1616
from tests import utils
1717
from tuf.api import exceptions
18+
from tuf.api.dsse import SimpleEnvelope
1819
from tuf.api.metadata import (
1920
Metadata,
2021
MetaFile,
2122
Root,
23+
Signed,
2224
Snapshot,
2325
Targets,
2426
Timestamp,
2527
)
2628
from tuf.api.serialization.json import JSONSerializer
27-
from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet
29+
from tuf.ngclient._internal.trusted_metadata_set import (
30+
TrustedMetadataSet,
31+
_load_from_simple_envelope,
32+
)
33+
from tuf.ngclient.config import EnvelopeType
2834

2935
logger = logging.getLogger(__name__)
3036

@@ -93,7 +99,9 @@ def hashes_length_modifier(timestamp: Timestamp) -> None:
9399
)
94100

95101
def setUp(self) -> None:
96-
self.trusted_set = TrustedMetadataSet(self.metadata[Root.type])
102+
self.trusted_set = TrustedMetadataSet(
103+
self.metadata[Root.type], EnvelopeType.METADATA
104+
)
97105

98106
def _update_all_besides_targets(
99107
self,
@@ -132,7 +140,7 @@ def test_update(self) -> None:
132140

133141
count = 0
134142
for md in self.trusted_set:
135-
self.assertIsInstance(md, Metadata)
143+
self.assertIsInstance(md, Signed)
136144
count += 1
137145

138146
self.assertTrue(count, 6)
@@ -149,11 +157,11 @@ def test_update_metadata_output(self) -> None:
149157
delegeted_targets_2 = self.trusted_set.update_delegated_targets(
150158
self.metadata["role2"], "role2", "role1"
151159
)
152-
self.assertIsInstance(timestamp.signed, Timestamp)
153-
self.assertIsInstance(snapshot.signed, Snapshot)
154-
self.assertIsInstance(targets.signed, Targets)
155-
self.assertIsInstance(delegeted_targets_1.signed, Targets)
156-
self.assertIsInstance(delegeted_targets_2.signed, Targets)
160+
self.assertIsInstance(timestamp, Timestamp)
161+
self.assertIsInstance(snapshot, Snapshot)
162+
self.assertIsInstance(targets, Targets)
163+
self.assertIsInstance(delegeted_targets_1, Targets)
164+
self.assertIsInstance(delegeted_targets_2, Targets)
157165

158166
def test_out_of_order_ops(self) -> None:
159167
# Update snapshot before timestamp
@@ -192,25 +200,40 @@ def test_out_of_order_ops(self) -> None:
192200
self.metadata["role1"], "role1", Targets.type
193201
)
194202

195-
def test_root_with_invalid_json(self) -> None:
196-
# Test loading initial root and root update
197-
for test_func in [TrustedMetadataSet, self.trusted_set.update_root]:
198-
# root is not json
199-
with self.assertRaises(exceptions.RepositoryError):
200-
test_func(b"")
203+
def test_bad_initial_root(self) -> None:
204+
# root is not json
205+
with self.assertRaises(exceptions.RepositoryError):
206+
TrustedMetadataSet(b"", EnvelopeType.METADATA)
201207

202-
# root is invalid
203-
root = Metadata.from_bytes(self.metadata[Root.type])
204-
root.signed.version += 1
205-
with self.assertRaises(exceptions.UnsignedMetadataError):
206-
test_func(root.to_bytes())
208+
# root is invalid
209+
root = Metadata.from_bytes(self.metadata[Root.type])
210+
root.signed.version += 1
211+
with self.assertRaises(exceptions.UnsignedMetadataError):
212+
TrustedMetadataSet(root.to_bytes(), EnvelopeType.METADATA)
207213

208-
# metadata is of wrong type
209-
with self.assertRaises(exceptions.RepositoryError):
210-
test_func(self.metadata[Snapshot.type])
214+
# metadata is of wrong type
215+
with self.assertRaises(exceptions.RepositoryError):
216+
TrustedMetadataSet(
217+
self.metadata[Snapshot.type], EnvelopeType.METADATA
218+
)
219+
220+
def test_bad_root_update(self) -> None:
221+
# root is not json
222+
with self.assertRaises(exceptions.RepositoryError):
223+
self.trusted_set.update_root(b"")
224+
225+
# root is invalid
226+
root = Metadata.from_bytes(self.metadata[Root.type])
227+
root.signed.version += 1
228+
with self.assertRaises(exceptions.UnsignedMetadataError):
229+
self.trusted_set.update_root(root.to_bytes())
230+
231+
# metadata is of wrong type
232+
with self.assertRaises(exceptions.RepositoryError):
233+
self.trusted_set.update_root(self.metadata[Snapshot.type])
211234

212235
def test_top_level_md_with_invalid_json(self) -> None:
213-
top_level_md: List[Tuple[bytes, Callable[[bytes], Metadata]]] = [
236+
top_level_md: List[Tuple[bytes, Callable[[bytes], Signed]]] = [
214237
(self.metadata[Timestamp.type], self.trusted_set.update_timestamp),
215238
(self.metadata[Snapshot.type], self.trusted_set.update_snapshot),
216239
(self.metadata[Targets.type], self.trusted_set.update_targets),
@@ -260,7 +283,7 @@ def root_expired_modifier(root: Root) -> None:
260283

261284
# intermediate root can be expired
262285
root = self.modify_metadata(Root.type, root_expired_modifier)
263-
tmp_trusted_set = TrustedMetadataSet(root)
286+
tmp_trusted_set = TrustedMetadataSet(root, EnvelopeType.METADATA)
264287
# update timestamp to trigger final root expiry check
265288
with self.assertRaises(exceptions.ExpiredMetadataError):
266289
tmp_trusted_set.update_timestamp(self.metadata[Timestamp.type])
@@ -471,6 +494,52 @@ def target_expired_modifier(target: Targets) -> None:
471494

472495
# TODO test updating over initial metadata (new keys, newer timestamp, etc)
473496

497+
def test_load_from_simple_envelope(self) -> None:
498+
"""Basic unit test for ``_load_from_simple_envelope`` helper.
499+
500+
TODO: Test via trusted metadata set tests like for traditional metadata
501+
"""
502+
metadata = Metadata.from_bytes(self.metadata[Root.type])
503+
root = metadata.signed
504+
envelope = SimpleEnvelope.from_signed(root)
505+
506+
# Unwrap unsigned envelope without verification
507+
envelope_bytes = envelope.to_bytes()
508+
payload_obj, signed_bytes, signatures = _load_from_simple_envelope(
509+
Root, envelope_bytes
510+
)
511+
512+
self.assertEqual(payload_obj, root)
513+
self.assertEqual(signed_bytes, envelope.pae())
514+
self.assertDictEqual(signatures, {})
515+
516+
# Unwrap correctly signed envelope (use default role name)
517+
sig = envelope.sign(self.keystore[Root.type])
518+
envelope_bytes = envelope.to_bytes()
519+
_, _, signatures = _load_from_simple_envelope(
520+
Root, envelope_bytes, root
521+
)
522+
self.assertDictEqual(signatures, {sig.keyid: sig})
523+
524+
# Load correctly signed envelope (with explicit role name)
525+
_, _, signatures = _load_from_simple_envelope(
526+
Root, envelope.to_bytes(), root, Root.type
527+
)
528+
self.assertDictEqual(signatures, {sig.keyid: sig})
529+
530+
# Fail load envelope with unexpected 'payload_type'
531+
envelope_bad_type = SimpleEnvelope.from_signed(root)
532+
envelope_bad_type.payload_type = "foo"
533+
envelope_bad_type_bytes = envelope_bad_type.to_bytes()
534+
with self.assertRaises(exceptions.RepositoryError):
535+
_load_from_simple_envelope(Root, envelope_bad_type_bytes)
536+
537+
# Fail load envelope with unexpected payload type
538+
envelope_bad_signed = SimpleEnvelope.from_signed(root)
539+
envelope_bad_signed_bytes = envelope_bad_signed.to_bytes()
540+
with self.assertRaises(exceptions.RepositoryError):
541+
_load_from_simple_envelope(Targets, envelope_bad_signed_bytes)
542+
474543

475544
if __name__ == "__main__":
476545
utils.configure_test_logging(sys.argv)

tests/test_updater_ng.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ def test_updating_root(self) -> None:
285285
# Bump root version, resign and refresh
286286
self._modify_repository_root(lambda root: None, bump_version=True)
287287
self.updater.refresh()
288-
self.assertEqual(self.updater._trusted_set.root.signed.version, 2)
288+
self.assertEqual(self.updater._trusted_set.root.version, 2)
289289

290290
def test_missing_targetinfo(self) -> None:
291291
self.updater.refresh()

0 commit comments

Comments
 (0)