Skip to content

Commit 957aa8c

Browse files
Maffoochclaude
andcommitted
Add OSS subscriber for Open Source Messaging banner
Fetches a markdown message from the DaaS-published GCS bucket, renders the bleached headline and optional expanded section through the existing additional_banners template loop. Cached for 1h; any fetch/parse failure silently yields no banner. No Django settings introduced — disabling the banner requires forking. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 9d661d7 commit 957aa8c

6 files changed

Lines changed: 359 additions & 2 deletions

File tree

dojo/announcements/__init__.py

Whitespace-only changes.

dojo/announcements/os_message.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import logging
2+
3+
import bleach
4+
import markdown
5+
import requests
6+
from django.core.cache import cache
7+
8+
logger = logging.getLogger(__name__)
9+
10+
BUCKET_URL = "https://storage.googleapis.com/defectdojo-os-messages-prod/open_source_message.md"
11+
CACHE_SECONDS = 3600
12+
HTTP_TIMEOUT_SECONDS = 2
13+
CACHE_KEY = "os_message:v1"
14+
15+
INLINE_TAGS = ["strong", "em", "a"]
16+
INLINE_ATTRS = {"a": ["href", "title"]}
17+
18+
# Keep BLOCK_TAGS / BLOCK_ATTRS in sync with the DaaS publisher's
19+
# MARKDOWNIFY["default"]["WHITELIST_TAGS"] / WHITELIST_ATTRS so previews
20+
# on DaaS and rendering in OSS stay byte-identical.
21+
BLOCK_TAGS = [
22+
"p", "ul", "ol", "li", "a", "strong", "em", "code", "pre",
23+
"blockquote", "h2", "h3", "h4", "hr", "br", "b", "i",
24+
"abbr", "acronym",
25+
]
26+
BLOCK_ATTRS = {
27+
"a": ["href", "title"],
28+
"abbr": ["title"],
29+
"acronym": ["title"],
30+
}
31+
32+
_MISS = object()
33+
34+
35+
def fetch_os_message():
36+
cached = cache.get(CACHE_KEY, default=_MISS)
37+
if cached is not _MISS:
38+
return cached
39+
40+
try:
41+
response = requests.get(BUCKET_URL, timeout=HTTP_TIMEOUT_SECONDS)
42+
except Exception:
43+
logger.debug("os_message: fetch failed", exc_info=True)
44+
cache.set(CACHE_KEY, None, CACHE_SECONDS)
45+
return None
46+
47+
if response.status_code != 200 or not response.text.strip():
48+
cache.set(CACHE_KEY, None, CACHE_SECONDS)
49+
return None
50+
51+
cache.set(CACHE_KEY, response.text, CACHE_SECONDS)
52+
return response.text
53+
54+
55+
def _strip_outer_p(html):
56+
stripped = html.strip()
57+
if stripped.startswith("<p>") and stripped.endswith("</p>"):
58+
return stripped[3:-4]
59+
return stripped
60+
61+
62+
def parse_os_message(text):
63+
lines = text.splitlines()
64+
65+
headline_source = None
66+
body_start = None
67+
for index, line in enumerate(lines):
68+
if line.startswith("# "):
69+
headline_source = line[2:].strip()
70+
body_start = index + 1
71+
break
72+
73+
if not headline_source:
74+
return None
75+
76+
headline_source = headline_source[:100]
77+
headline_rendered = markdown.markdown(headline_source)
78+
headline_cleaned = bleach.clean(
79+
headline_rendered,
80+
tags=INLINE_TAGS,
81+
attributes=INLINE_ATTRS,
82+
strip=True,
83+
)
84+
headline_html = _strip_outer_p(headline_cleaned)
85+
86+
expanded_html = None
87+
expanded_marker = "## Expanded Message"
88+
expanded_body_lines = None
89+
for offset, line in enumerate(lines[body_start:], start=body_start):
90+
if line.strip() == expanded_marker:
91+
expanded_body_lines = lines[offset + 1:]
92+
break
93+
94+
if expanded_body_lines is not None:
95+
expanded_source = "\n".join(expanded_body_lines).strip()
96+
if expanded_source:
97+
expanded_rendered = markdown.markdown(
98+
expanded_source,
99+
extensions=["extra", "fenced_code"],
100+
)
101+
expanded_html = bleach.clean(
102+
expanded_rendered,
103+
tags=BLOCK_TAGS,
104+
attributes=BLOCK_ATTRS,
105+
strip=True,
106+
)
107+
108+
return {"message": headline_html, "expanded_html": expanded_html}
109+
110+
111+
def get_os_banner():
112+
try:
113+
text = fetch_os_message()
114+
if not text:
115+
return None
116+
return parse_os_message(text)
117+
except Exception:
118+
logger.debug("os_message: get_os_banner failed", exc_info=True)
119+
return None

dojo/context_processors.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@
55
from django.conf import settings
66
from django.contrib import messages
77

8+
from dojo.announcements.os_message import get_os_banner
89
from dojo.labels import get_labels
910
from dojo.models import Alerts, System_Settings, UserAnnouncement
1011

1112

1213
def globalize_vars(request):
1314
# return the value you want as a dictionnary. you may add multiple values in there.
14-
return {
15+
context = {
1516
"SHOW_LOGIN_FORM": settings.SHOW_LOGIN_FORM,
1617
"FORGOT_PASSWORD": settings.FORGOT_PASSWORD,
1718
"FORGOT_USERNAME": settings.FORGOT_USERNAME,
@@ -39,6 +40,16 @@ def globalize_vars(request):
3940
# V3 Feature Flags
4041
"V3_FEATURE_LOCATIONS": settings.V3_FEATURE_LOCATIONS,
4142
}
43+
os_banner = get_os_banner()
44+
if os_banner is not None:
45+
context["additional_banners"] = [{
46+
"message": os_banner["message"],
47+
"style": "info",
48+
"url": "",
49+
"link_text": "",
50+
"expanded_html": os_banner["expanded_html"],
51+
}]
52+
return context
4253

4354

4455
def bind_system_settings(request):

dojo/static/dojo/css/dojo.css

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,6 +1126,25 @@ div.custom-search-form {
11261126
border-radius: 0px 0px 4px 4px;
11271127
}
11281128

1129+
.os-banner-toggle {
1130+
margin-left: 6px;
1131+
cursor: pointer;
1132+
color: inherit;
1133+
text-decoration: none;
1134+
}
1135+
1136+
.os-banner-toggle .fa-caret-down {
1137+
transition: transform 0.15s ease-in-out;
1138+
}
1139+
1140+
.os-banner-toggle:not(.collapsed) .fa-caret-down {
1141+
transform: rotate(180deg);
1142+
}
1143+
1144+
.os-banner-expanded {
1145+
margin-top: 8px;
1146+
}
1147+
11291148
@media (min-width: 795px) {
11301149
div.custom-search-form {
11311150
height: 21px;

dojo/templates/base.html

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,17 @@
672672
{% endif %}
673673
{% for banner in additional_banners %}
674674
<div role="alert" class="announcement-banner alert alert-{{ banner.style }} show">
675-
{{ banner.message }} <a href="{{ banner.url }}">{{ banner.link_text }}</a>
675+
{{ banner.message|safe }}{% if banner.url %} <a href="{{ banner.url }}">{{ banner.link_text }}</a>{% endif %}
676+
{% if banner.expanded_html %}
677+
<a role="button" data-toggle="collapse" href="#os-banner-expanded-{{ forloop.counter }}"
678+
aria-expanded="false" aria-controls="os-banner-expanded-{{ forloop.counter }}"
679+
class="os-banner-toggle collapsed">
680+
<i class="fa-solid fa-caret-down"></i>
681+
</a>
682+
<div id="os-banner-expanded-{{ forloop.counter }}" class="collapse os-banner-expanded">
683+
{{ banner.expanded_html|safe }}
684+
</div>
685+
{% endif %}
676686
</div>
677687
{% endfor %}
678688
<div class="container-fluid">

unittests/test_os_message.py

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
from unittest.mock import patch
2+
3+
import requests
4+
from django.core.cache import cache
5+
from django.template import Context, Template
6+
from django.test import RequestFactory, SimpleTestCase, override_settings
7+
8+
from dojo import context_processors
9+
from dojo.announcements import os_message
10+
11+
12+
class _Resp:
13+
def __init__(self, status_code=200, text=""):
14+
self.status_code = status_code
15+
self.text = text
16+
17+
18+
@override_settings(CACHES={"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}})
19+
class TestParseOsMessage(SimpleTestCase):
20+
21+
def setUp(self):
22+
cache.clear()
23+
24+
def test_valid_doc_with_expanded(self):
25+
text = (
26+
"# DefectDojo v3.0 is available\n"
27+
"\n"
28+
"## Expanded Message\n"
29+
"\n"
30+
"- Major feature A\n"
31+
"- Major feature B\n"
32+
)
33+
result = os_message.parse_os_message(text)
34+
self.assertEqual(result["message"], "DefectDojo v3.0 is available")
35+
self.assertIn("<li>Major feature A</li>", result["expanded_html"])
36+
self.assertIn("<li>Major feature B</li>", result["expanded_html"])
37+
38+
def test_missing_headline_returns_none(self):
39+
text = "No headline here\n## Expanded Message\nbody\n"
40+
self.assertIsNone(os_message.parse_os_message(text))
41+
42+
def test_headline_inline_markdown(self):
43+
text = "# Read the **release notes** at [link](https://example.com)\n"
44+
result = os_message.parse_os_message(text)
45+
self.assertIn("<strong>release notes</strong>", result["message"])
46+
self.assertIn('<a href="https://example.com">link</a>', result["message"])
47+
self.assertIsNone(result["expanded_html"])
48+
49+
def test_headline_strips_disallowed_html(self):
50+
text = "# Headline <script>alert(1)</script> tail\n"
51+
result = os_message.parse_os_message(text)
52+
self.assertNotIn("<script", result["message"])
53+
self.assertNotIn("</script>", result["message"])
54+
self.assertIn("Headline", result["message"])
55+
56+
def test_missing_expanded_section(self):
57+
text = "# Just a headline\n"
58+
result = os_message.parse_os_message(text)
59+
self.assertEqual(result["message"], "Just a headline")
60+
self.assertIsNone(result["expanded_html"])
61+
62+
def test_expanded_with_fenced_code(self):
63+
text = (
64+
"# Headline\n"
65+
"## Expanded Message\n"
66+
"```python\n"
67+
"print('hi')\n"
68+
"```\n"
69+
)
70+
result = os_message.parse_os_message(text)
71+
self.assertIn("<pre>", result["expanded_html"])
72+
self.assertIn("<code>", result["expanded_html"])
73+
self.assertIn("print('hi')", result["expanded_html"])
74+
75+
def test_expanded_strips_script_tag(self):
76+
text = (
77+
"# Headline\n"
78+
"## Expanded Message\n"
79+
"<script>alert(1)</script>\n"
80+
"Body paragraph\n"
81+
)
82+
result = os_message.parse_os_message(text)
83+
self.assertNotIn("<script", result["expanded_html"])
84+
self.assertNotIn("</script>", result["expanded_html"])
85+
self.assertIn("Body paragraph", result["expanded_html"])
86+
87+
def test_headline_outer_p_is_stripped(self):
88+
text = "# Plain headline\n"
89+
result = os_message.parse_os_message(text)
90+
self.assertFalse(result["message"].startswith("<p>"))
91+
self.assertFalse(result["message"].endswith("</p>"))
92+
93+
def test_headline_truncated_to_100_chars(self):
94+
long_headline = "x" * 200
95+
text = f"# {long_headline}\n"
96+
result = os_message.parse_os_message(text)
97+
self.assertLessEqual(len(result["message"]), 100)
98+
99+
100+
@override_settings(CACHES={"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}})
101+
class TestFetchOsMessage(SimpleTestCase):
102+
103+
def setUp(self):
104+
cache.clear()
105+
106+
def test_200_with_body_caches_body(self):
107+
body = "# headline\n"
108+
with patch("dojo.announcements.os_message.requests.get", return_value=_Resp(200, body)) as mock_get:
109+
result = os_message.fetch_os_message()
110+
self.assertEqual(result, body)
111+
self.assertEqual(cache.get(os_message.CACHE_KEY), body)
112+
mock_get.assert_called_once()
113+
114+
def test_404_caches_none(self):
115+
with patch("dojo.announcements.os_message.requests.get", return_value=_Resp(404, "not found")):
116+
result = os_message.fetch_os_message()
117+
self.assertIsNone(result)
118+
self.assertIsNone(cache.get(os_message.CACHE_KEY, default="sentinel"))
119+
120+
def test_timeout_caches_none(self):
121+
with patch("dojo.announcements.os_message.requests.get", side_effect=requests.exceptions.Timeout):
122+
result = os_message.fetch_os_message()
123+
self.assertIsNone(result)
124+
self.assertIsNone(cache.get(os_message.CACHE_KEY, default="sentinel"))
125+
126+
def test_connection_error_caches_none(self):
127+
with patch("dojo.announcements.os_message.requests.get", side_effect=requests.exceptions.ConnectionError):
128+
result = os_message.fetch_os_message()
129+
self.assertIsNone(result)
130+
self.assertIsNone(cache.get(os_message.CACHE_KEY, default="sentinel"))
131+
132+
def test_empty_body_caches_none(self):
133+
with patch("dojo.announcements.os_message.requests.get", return_value=_Resp(200, " \n\n")):
134+
result = os_message.fetch_os_message()
135+
self.assertIsNone(result)
136+
self.assertIsNone(cache.get(os_message.CACHE_KEY, default="sentinel"))
137+
138+
def test_second_call_hits_cache(self):
139+
with patch("dojo.announcements.os_message.requests.get", return_value=_Resp(200, "# h\n")) as mock_get:
140+
os_message.fetch_os_message()
141+
os_message.fetch_os_message()
142+
self.assertEqual(mock_get.call_count, 1)
143+
144+
def test_second_call_after_failure_also_hits_cache(self):
145+
with patch("dojo.announcements.os_message.requests.get", side_effect=requests.exceptions.Timeout) as mock_get:
146+
os_message.fetch_os_message()
147+
os_message.fetch_os_message()
148+
self.assertEqual(mock_get.call_count, 1)
149+
150+
151+
@override_settings(CACHES={"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}})
152+
class TestGetOsBanner(SimpleTestCase):
153+
154+
def setUp(self):
155+
cache.clear()
156+
157+
def test_returns_none_when_fetch_returns_none(self):
158+
with patch("dojo.announcements.os_message.fetch_os_message", return_value=None):
159+
self.assertIsNone(os_message.get_os_banner())
160+
161+
def test_swallows_parse_exception(self):
162+
with patch("dojo.announcements.os_message.fetch_os_message", return_value="# ok\n"), \
163+
patch("dojo.announcements.os_message.parse_os_message", side_effect=RuntimeError("boom")):
164+
self.assertIsNone(os_message.get_os_banner())
165+
166+
167+
@override_settings(CACHES={"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}})
168+
class TestGlobalizeVarsOsBanner(SimpleTestCase):
169+
170+
def setUp(self):
171+
cache.clear()
172+
self.request = RequestFactory().get("/")
173+
174+
def test_additional_banners_populated_when_banner_present(self):
175+
banner = {"message": "<strong>Hi</strong>", "expanded_html": "<p>body</p>"}
176+
with patch.object(context_processors, "get_os_banner", return_value=banner):
177+
result = context_processors.globalize_vars(self.request)
178+
self.assertIn("additional_banners", result)
179+
entry = result["additional_banners"][0]
180+
self.assertEqual(entry["message"], "<strong>Hi</strong>")
181+
self.assertEqual(entry["expanded_html"], "<p>body</p>")
182+
self.assertEqual(entry["style"], "info")
183+
self.assertEqual(entry["url"], "")
184+
self.assertEqual(entry["link_text"], "")
185+
186+
def test_additional_banners_absent_when_no_banner(self):
187+
with patch.object(context_processors, "get_os_banner", return_value=None):
188+
result = context_processors.globalize_vars(self.request)
189+
self.assertNotIn("additional_banners", result)
190+
191+
def test_template_renders_bleached_message(self):
192+
banner = {"message": "<strong>Hi</strong>", "expanded_html": None}
193+
with patch.object(context_processors, "get_os_banner", return_value=banner):
194+
ctx = context_processors.globalize_vars(self.request)
195+
rendered = Template(
196+
"{% for b in additional_banners %}{{ b.message|safe }}{% endfor %}",
197+
).render(Context(ctx))
198+
self.assertIn("<strong>Hi</strong>", rendered)

0 commit comments

Comments
 (0)