|
11 | 11 |
|
12 | 12 | from tuf.api import exceptions |
13 | 13 | from tuf.api._payload import Root, T, Targets |
| 14 | +from tuf.api.dsse import Envelope |
14 | 15 | from tuf.api.metadata import Metadata |
15 | 16 |
|
16 | 17 | Delegator = Union[Root, Targets] |
@@ -83,3 +84,49 @@ def unwrap( |
83 | 84 | delegator.verify_delegate(role_name, md.signed_bytes, md.signatures) |
84 | 85 |
|
85 | 86 | return md.signed, md.signed_bytes, md.signatures |
| 87 | + |
| 88 | + |
| 89 | +class EnvelopeUnwrapper(Unwrapper): |
| 90 | + """Unwrapper implementation for Envelope payloads. |
| 91 | +
|
| 92 | + Order of unwrapping: |
| 93 | + 1. Deserializer wrapper only |
| 94 | + 2. Validate outer payload type |
| 95 | + 3. Verify signatures |
| 96 | + 4. Validate inner payload type |
| 97 | + 5. Deserialize payload |
| 98 | +
|
| 99 | + """ |
| 100 | + |
| 101 | + @staticmethod |
| 102 | + def _validate_envelope_payload_type(envelope: Envelope) -> None: |
| 103 | + # pylint: disable=protected-access |
| 104 | + if envelope.payload_type != Envelope._DEFAULT_PAYLOAD_TYPE: |
| 105 | + raise exceptions.RepositoryError( |
| 106 | + f"Expected '{Envelope._DEFAULT_PAYLOAD_TYPE}', " |
| 107 | + f"got '{envelope.payload_type}'" |
| 108 | + ) |
| 109 | + |
| 110 | + def unwrap( |
| 111 | + self, |
| 112 | + role_cls: Type[T], |
| 113 | + wrapper: bytes, |
| 114 | + delegator: Optional[Delegator] = None, |
| 115 | + role_name: Optional[str] = None, |
| 116 | + ) -> Tuple[T, bytes, Dict[str, Signature]]: # noqa: D102 |
| 117 | + envelope = Envelope[T].from_bytes(wrapper) |
| 118 | + |
| 119 | + # TODO: Envelope stores signatures as list, but `verify_delegate` |
| 120 | + # expects a dict. Should we change the envelope model? |
| 121 | + signatures = {sig.keyid: sig for sig in envelope.signatures} |
| 122 | + |
| 123 | + self._validate_envelope_payload_type(envelope) |
| 124 | + if delegator: |
| 125 | + if role_name is None: |
| 126 | + role_name = role_cls.type |
| 127 | + delegator.verify_delegate(role_name, envelope.pae(), signatures) |
| 128 | + |
| 129 | + signed = envelope.get_signed() |
| 130 | + self._validate_signed_type(signed, role_cls) |
| 131 | + |
| 132 | + return signed, envelope.pae(), signatures |
0 commit comments