Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions python/semantic_kernel/core_plugins/http_plugin.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright (c) Microsoft. All rights reserved.

import json
from typing import Annotated, Any
from typing import Annotated, Any, ClassVar
from urllib.parse import urlparse

import aiohttp
Expand Down Expand Up @@ -35,6 +35,9 @@ class HttpPlugin(KernelBaseModel):
- When ``allow_all_domains`` is True, redirects are allowed regardless of
whether ``allowed_domains`` is also set.
- Only ``http`` and ``https`` URL schemes are permitted.
- Only standard ports (80, 443) are permitted by default. Set ``allowed_ports``
to permit additional ports. Port validation is skipped when
``allow_all_domains`` is True.
"""

allowed_domains: set[str] | None = None
Expand All @@ -43,7 +46,16 @@ class HttpPlugin(KernelBaseModel):
allow_all_domains: bool = False
"""When True, requests to any domain are allowed. Must be explicitly set."""

_ALLOWED_SCHEMES: frozenset[str] = frozenset({"http", "https"})
allowed_ports: set[int] | None = None
"""Set of ports permitted for outbound requests. Defaults to ``{80, 443}`` when not set.

Ignored when ``allow_all_domains`` is True. Set explicitly to permit non-standard ports
(e.g. ``allowed_ports={443, 8443}``).
"""

_ALLOWED_SCHEMES: ClassVar[frozenset[str]] = frozenset({"http", "https"})
_DEFAULT_SCHEME_PORTS: ClassVar[dict[str, int]] = {"http": 80, "https": 443}
_DEFAULT_ALLOWED_PORTS: ClassVar[frozenset[int]] = frozenset({80, 443})

@property
def _allow_redirects(self) -> bool:
Expand Down Expand Up @@ -74,10 +86,25 @@ def _is_uri_allowed(self, url: str) -> bool:
if not host:
return False

# If allow_all_domains is set, skip domain check
# Validate that the port component is syntactically valid, regardless of
# allow_all_domains. Accessing parsed.port raises ValueError for a malformed
# or out-of-range port.
try:
port = parsed.port
except ValueError:
return False

# If allow_all_domains is set, skip the domain and port allow-list checks.
if self.allow_all_domains:
return True

# Enforce the port allow-list (deny-by-default to non-standard ports).
if port is None:
port = self._DEFAULT_SCHEME_PORTS.get(parsed.scheme.lower())
allowed_ports = self.allowed_ports if self.allowed_ports is not None else self._DEFAULT_ALLOWED_PORTS
if port not in allowed_ports:
return False

# If allowed_domains is set, check against it
if self.allowed_domains is not None:
return host.lower() in {domain.lower() for domain in self.allowed_domains}
Expand All @@ -86,14 +113,19 @@ def _is_uri_allowed(self, url: str) -> bool:
return False

def _validate_url(self, url: str) -> None:
"""Validate the URL, checking scheme, emptiness, and allowed domains.
"""Validate the URL before sending a request.

Always checks that the URL is non-empty, uses an allowed scheme, and has a
syntactically valid port. When ``allow_all_domains`` is False, additionally
enforces the port and domain allow-lists.

Args:
url: The URL to validate.

Raises:
FunctionExecutionException: If the URL is empty, uses a disallowed scheme,
or targets a domain that is not allowed.
has a malformed port, or (unless ``allow_all_domains`` is True) targets
a port or domain that is not allowed.
"""
if not url:
raise FunctionExecutionException("url cannot be `None` or empty")
Expand Down
40 changes: 37 additions & 3 deletions python/tests/unit/core_plugins/test_http_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,44 @@ async def test_allowed_domains_multiple_domains():


async def test_allowed_domains_with_port():
"""Test that domain matching works with URLs containing ports."""
"""Test that non-standard ports are rejected by default, even on an allowed domain."""
plugin = HttpPlugin(allowed_domains={"example.com"})
# Port is not part of the host/hostname in urlparse
assert plugin._is_uri_allowed("https://example.com:8080/path") is True
# Standard ports on an allowed domain are permitted.
assert plugin._is_uri_allowed("https://example.com/path") is True
assert plugin._is_uri_allowed("https://example.com:443/path") is True
assert plugin._is_uri_allowed("http://example.com:80/path") is True
# Non-standard ports are rejected even though the host is allowed (SSRF hardening).
assert plugin._is_uri_allowed("https://example.com:8080/path") is False
assert plugin._is_uri_allowed("https://example.com:9200/_cat/indices") is False
assert plugin._is_uri_allowed("https://example.com:6379/") is False


async def test_allowed_ports_custom():
"""Test that custom allowed_ports permit additional ports."""
plugin = HttpPlugin(allowed_domains={"example.com"}, allowed_ports={443, 8443})
assert plugin._is_uri_allowed("https://example.com:8443/path") is True
assert plugin._is_uri_allowed("https://example.com/path") is True
# A port not in the custom set is still rejected.
assert plugin._is_uri_allowed("https://example.com:9200/path") is False


async def test_allow_all_domains_ignores_port():
"""Test that port validation is skipped when allow_all_domains is True."""
plugin = HttpPlugin(allow_all_domains=True)
assert plugin._is_uri_allowed("https://any-domain.com:9200/path") is True
assert plugin._is_uri_allowed("https://any-domain.com:6379/path") is True


async def test_malformed_port_rejected():
"""Test that a malformed/out-of-range port is rejected."""
plugin = HttpPlugin(allowed_domains={"example.com"})
assert plugin._is_uri_allowed("https://example.com:99999/path") is False


async def test_malformed_port_rejected_with_allow_all_domains():
"""Test that a malformed/out-of-range port is rejected even when allow_all_domains is True."""
plugin = HttpPlugin(allow_all_domains=True)
assert plugin._is_uri_allowed("https://example.com:99999/path") is False


async def test_allowed_domains_subdomain_not_matched():
Expand Down
Loading