|
6 | 6 | """ |
7 | 7 |
|
8 | 8 | import unittest |
| 9 | +from unittest.mock import patch |
9 | 10 | import jwt |
10 | 11 | import botocore |
11 | 12 | import requests |
|
15 | 16 | from botocore.client import Config as BotoConfig |
16 | 17 | from botocore.stub import Stubber, ANY |
17 | 18 | from datetime import datetime, timedelta, timezone |
| 19 | +from os import environ |
18 | 20 |
|
19 | 21 | from staxapp.api import Api |
20 | 22 | from staxapp.auth import StaxAuth, ApiTokenAuth, RootAuth |
@@ -156,7 +158,7 @@ def testCredsClient(self): |
156 | 158 | expected_params=expected_parameters, |
157 | 159 | ) |
158 | 160 | self.cognito_stub.activate() |
159 | | - |
| 161 | + |
160 | 162 | with self.assertRaises(InvalidCredentialsException) as e: |
161 | 163 | sa.sts_from_cognito_identity_pool(jwt_token.get("sub"), cognito_client=self.cognito_client) |
162 | 164 |
|
@@ -305,6 +307,52 @@ def testApiTokenAuth(self): |
305 | 307 | ) |
306 | 308 | self.assertIsNotNone(StaxConfig.auth) |
307 | 309 |
|
| 310 | + |
| 311 | + @patch("test_auth.StaxAuth.requests_auth") |
| 312 | + def testApiTokenAuthExpiring(self, requests_auth_mock): |
| 313 | + """ |
| 314 | + Test credentials close to expiration get refreshed |
| 315 | + """ |
| 316 | + sa = StaxAuth("ApiAuth") |
| 317 | + StaxConfig = Config |
| 318 | + ## expiration 20 minutes in the future, no need to refresh |
| 319 | + StaxConfig.expiration = datetime.now(timezone.utc) + timedelta(minutes=20) |
| 320 | + |
| 321 | + ApiTokenAuth.requests_auth( |
| 322 | + "username", |
| 323 | + "password", |
| 324 | + srp_client=self.aws_srp_client, |
| 325 | + cognito_client=self.cognito_client, |
| 326 | + ) |
| 327 | + requests_auth_mock.assert_not_called() |
| 328 | + |
| 329 | + requests_auth_mock.reset_mock() |
| 330 | + ## expiration in 5 seconds from now, refresh to avoid token becoming stale used |
| 331 | + StaxConfig.expiration = datetime.now(timezone.utc) + timedelta(seconds=5) |
| 332 | + |
| 333 | + ApiTokenAuth.requests_auth( |
| 334 | + "username", |
| 335 | + "password", |
| 336 | + srp_client=self.aws_srp_client, |
| 337 | + cognito_client=self.cognito_client, |
| 338 | + ) |
| 339 | + requests_auth_mock.assert_called_once() |
| 340 | + |
| 341 | + |
| 342 | + requests_auth_mock.reset_mock() |
| 343 | + ## expiration in 5 minutes from now, refresh to avoid token becoming stale used |
| 344 | + ## override default triggering library to not refresh |
| 345 | + environ["TOKEN_EXPIRY_THRESHOLD_IN_MINS"] = "10" |
| 346 | + StaxConfig.expiration = datetime.now(timezone.utc) + timedelta(minutes=2) |
| 347 | + |
| 348 | + ApiTokenAuth.requests_auth( |
| 349 | + "username", |
| 350 | + "password", |
| 351 | + srp_client=self.aws_srp_client, |
| 352 | + cognito_client=self.cognito_client, |
| 353 | + ) |
| 354 | + requests_auth_mock.assert_called_once() |
| 355 | + |
308 | 356 | def testRootAuthNotExpired(self): |
309 | 357 | """ |
310 | 358 | Test credentials have not expired |
@@ -359,5 +407,6 @@ def testApiAuth(self): |
359 | 407 | self.assertIsNotNone(Api._requests_auth) |
360 | 408 |
|
361 | 409 |
|
| 410 | + |
362 | 411 | if __name__ == "__main__": |
363 | 412 | unittest.main() |
0 commit comments