diff --git a/python/semantic_kernel/core_plugins/http_plugin.py b/python/semantic_kernel/core_plugins/http_plugin.py index 8a554410fb44..c461deccbf8c 100644 --- a/python/semantic_kernel/core_plugins/http_plugin.py +++ b/python/semantic_kernel/core_plugins/http_plugin.py @@ -1,7 +1,7 @@ # Copyright (c) Microsoft. All rights reserved. import json -from typing import Annotated, Any +from typing import Annotated, Any, ClassVar from urllib.parse import urlparse import aiohttp @@ -35,6 +35,9 @@ class HttpPlugin(KernelBaseModel): - When ``allow_all_domains`` is True, redirects are allowed regardless of whether ``allowed_domains`` is also set. - Only ``http`` and ``https`` URL schemes are permitted. + - Only standard ports (80, 443) are permitted by default. Set ``allowed_ports`` + to permit additional ports. Port validation is skipped when + ``allow_all_domains`` is True. """ allowed_domains: set[str] | None = None @@ -43,7 +46,16 @@ class HttpPlugin(KernelBaseModel): allow_all_domains: bool = False """When True, requests to any domain are allowed. Must be explicitly set.""" - _ALLOWED_SCHEMES: frozenset[str] = frozenset({"http", "https"}) + allowed_ports: set[int] | None = None + """Set of ports permitted for outbound requests. Defaults to ``{80, 443}`` when not set. + + Ignored when ``allow_all_domains`` is True. Set explicitly to permit non-standard ports + (e.g. ``allowed_ports={443, 8443}``). + """ + + _ALLOWED_SCHEMES: ClassVar[frozenset[str]] = frozenset({"http", "https"}) + _DEFAULT_SCHEME_PORTS: ClassVar[dict[str, int]] = {"http": 80, "https": 443} + _DEFAULT_ALLOWED_PORTS: ClassVar[frozenset[int]] = frozenset({80, 443}) @property def _allow_redirects(self) -> bool: @@ -74,10 +86,25 @@ def _is_uri_allowed(self, url: str) -> bool: if not host: return False - # If allow_all_domains is set, skip domain check + # Validate that the port component is syntactically valid, regardless of + # allow_all_domains. Accessing parsed.port raises ValueError for a malformed + # or out-of-range port. + try: + port = parsed.port + except ValueError: + return False + + # If allow_all_domains is set, skip the domain and port allow-list checks. if self.allow_all_domains: return True + # Enforce the port allow-list (deny-by-default to non-standard ports). + if port is None: + port = self._DEFAULT_SCHEME_PORTS.get(parsed.scheme.lower()) + allowed_ports = self.allowed_ports if self.allowed_ports is not None else self._DEFAULT_ALLOWED_PORTS + if port not in allowed_ports: + return False + # If allowed_domains is set, check against it if self.allowed_domains is not None: return host.lower() in {domain.lower() for domain in self.allowed_domains} @@ -86,14 +113,19 @@ def _is_uri_allowed(self, url: str) -> bool: return False def _validate_url(self, url: str) -> None: - """Validate the URL, checking scheme, emptiness, and allowed domains. + """Validate the URL before sending a request. + + Always checks that the URL is non-empty, uses an allowed scheme, and has a + syntactically valid port. When ``allow_all_domains`` is False, additionally + enforces the port and domain allow-lists. Args: url: The URL to validate. Raises: FunctionExecutionException: If the URL is empty, uses a disallowed scheme, - or targets a domain that is not allowed. + has a malformed port, or (unless ``allow_all_domains`` is True) targets + a port or domain that is not allowed. """ if not url: raise FunctionExecutionException("url cannot be `None` or empty") diff --git a/python/tests/unit/core_plugins/test_http_plugin.py b/python/tests/unit/core_plugins/test_http_plugin.py index 216967ffaee9..c0c8211b1d78 100644 --- a/python/tests/unit/core_plugins/test_http_plugin.py +++ b/python/tests/unit/core_plugins/test_http_plugin.py @@ -195,10 +195,44 @@ async def test_allowed_domains_multiple_domains(): async def test_allowed_domains_with_port(): - """Test that domain matching works with URLs containing ports.""" + """Test that non-standard ports are rejected by default, even on an allowed domain.""" plugin = HttpPlugin(allowed_domains={"example.com"}) - # Port is not part of the host/hostname in urlparse - assert plugin._is_uri_allowed("https://example.com:8080/path") is True + # Standard ports on an allowed domain are permitted. + assert plugin._is_uri_allowed("https://example.com/path") is True + assert plugin._is_uri_allowed("https://example.com:443/path") is True + assert plugin._is_uri_allowed("http://example.com:80/path") is True + # Non-standard ports are rejected even though the host is allowed (SSRF hardening). + assert plugin._is_uri_allowed("https://example.com:8080/path") is False + assert plugin._is_uri_allowed("https://example.com:9200/_cat/indices") is False + assert plugin._is_uri_allowed("https://example.com:6379/") is False + + +async def test_allowed_ports_custom(): + """Test that custom allowed_ports permit additional ports.""" + plugin = HttpPlugin(allowed_domains={"example.com"}, allowed_ports={443, 8443}) + assert plugin._is_uri_allowed("https://example.com:8443/path") is True + assert plugin._is_uri_allowed("https://example.com/path") is True + # A port not in the custom set is still rejected. + assert plugin._is_uri_allowed("https://example.com:9200/path") is False + + +async def test_allow_all_domains_ignores_port(): + """Test that port validation is skipped when allow_all_domains is True.""" + plugin = HttpPlugin(allow_all_domains=True) + assert plugin._is_uri_allowed("https://any-domain.com:9200/path") is True + assert plugin._is_uri_allowed("https://any-domain.com:6379/path") is True + + +async def test_malformed_port_rejected(): + """Test that a malformed/out-of-range port is rejected.""" + plugin = HttpPlugin(allowed_domains={"example.com"}) + assert plugin._is_uri_allowed("https://example.com:99999/path") is False + + +async def test_malformed_port_rejected_with_allow_all_domains(): + """Test that a malformed/out-of-range port is rejected even when allow_all_domains is True.""" + plugin = HttpPlugin(allow_all_domains=True) + assert plugin._is_uri_allowed("https://example.com:99999/path") is False async def test_allowed_domains_subdomain_not_matched():