diff --git a/dojo/api_v2/serializers.py b/dojo/api_v2/serializers.py index 1eeb021d165..61114cf7c32 100644 --- a/dojo/api_v2/serializers.py +++ b/dojo/api_v2/serializers.py @@ -1760,9 +1760,7 @@ class FindingSerializer(serializers.ModelSerializer): mitigated_by = serializers.PrimaryKeyRelatedField(required=False, allow_null=True, queryset=User.objects.all()) tags = TagListSerializerField(required=False) request_response = serializers.SerializerMethodField() - accepted_risks = RiskAcceptanceSerializer( - many=True, read_only=True, source="risk_acceptance_set", - ) + accepted_risks = serializers.SerializerMethodField() push_to_jira = serializers.BooleanField(default=False) found_by = serializers.PrimaryKeyRelatedField( queryset=Test_Type.objects.all(), many=True, @@ -1806,6 +1804,17 @@ def __init__(self, *args, **kwargs): many=True, required=False, queryset=Endpoint.objects.all(), ) + @extend_schema_field(RiskAcceptanceSerializer(many=True)) + def get_accepted_risks(self, obj): + request = self.context.get("request") + if request is None: + return [] + if not user_has_permission(request.user, obj, Permissions.Risk_Acceptance): + return [] + return RiskAcceptanceSerializer( + obj.risk_acceptance_set.all(), many=True, + ).data + @extend_schema_field(serializers.DateTimeField()) def get_jira_creation(self, obj): return jira_helper.get_jira_creation(obj) diff --git a/dojo/api_v2/views.py b/dojo/api_v2/views.py index 3461e54b25a..c106d667e77 100644 --- a/dojo/api_v2/views.py +++ b/dojo/api_v2/views.py @@ -45,6 +45,7 @@ serializers, ) from dojo.api_v2.prefetch.prefetcher import _Prefetcher +from dojo.authorization.authorization import user_has_permission_or_403 from dojo.authorization.roles_permissions import Permissions from dojo.celery_dispatch import dojo_dispatch_task from dojo.cred.queries import get_authorized_cred_mappings @@ -351,7 +352,11 @@ def get_queryset(self): responses={status.HTTP_200_OK: serializers.ReportGenerateSerializer}, ) @action( - detail=True, methods=["post"], permission_classes=[IsAuthenticated], + detail=True, methods=["post"], + # IsAuthenticated only: report generation requires View permission, + # enforced by the permission-filtered get_queryset(). The viewset's + # permission_classes would check Edit (POST), which is too restrictive. + permission_classes=[IsAuthenticated], ) def generate_report(self, request, pk=None): endpoint = self.get_object() @@ -475,7 +480,11 @@ def reopen(self, request, pk=None): responses={status.HTTP_200_OK: serializers.ReportGenerateSerializer}, ) @action( - detail=True, methods=["post"], permission_classes=[IsAuthenticated], + detail=True, methods=["post"], + # IsAuthenticated only: report generation requires View permission, + # enforced by the permission-filtered get_queryset(). The viewset's + # permission_classes would check Edit (POST), which is too restrictive. + permission_classes=[IsAuthenticated], ) def generate_report(self, request, pk=None): engagement = self.get_object() @@ -691,7 +700,8 @@ def download_file(self, request, file_id, pk=None): responses={status.HTTP_200_OK: serializers.EngagementUpdateJiraEpicSerializer}, ) @action( - detail=True, methods=["post"], permission_classes=[IsAuthenticated], + detail=True, methods=["post"], + permission_classes=(IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission), ) def update_jira_epic(self, request, pk=None): engagement = self.get_object() @@ -1383,7 +1393,11 @@ def set_finding_as_original(self, request, pk, new_fid): responses={status.HTTP_200_OK: serializers.ReportGenerateSerializer}, ) @action( - detail=False, methods=["post"], permission_classes=[IsAuthenticated], + detail=False, methods=["post"], + # IsAuthenticated only: report generation requires View permission, + # enforced by the permission-filtered get_queryset(). The viewset's + # permission_classes would check Edit (POST), which is too restrictive. + permission_classes=[IsAuthenticated], ) def generate_report(self, request): findings = self.get_queryset() @@ -1728,38 +1742,55 @@ def batch(self, request, pk=None): serialized_data = serializers.MetaMainSerializer(data=request.data) if serialized_data.is_valid(raise_exception=True): if request.method == "POST": - self.process_post(request.data) + self.process_post(request) status_code = status.HTTP_201_CREATED if request.method == "PATCH": - self.process_patch(request.data) + self.process_patch(request) status_code = status.HTTP_200_OK return Response(status=status_code, data=serialized_data.data) - def process_post(self: object, data: dict): - product = Product.objects.filter(id=data.get("product")).first() - finding = Finding.objects.filter(id=data.get("finding")).first() - endpoint = Endpoint.objects.filter(id=data.get("endpoint")).first() + def _fetch_and_authorize_parents(self, request, permission_map): + """Fetch parent objects and verify the user has the required permissions.""" + data = request.data + parents = {} + for field, (model, permission) in permission_map.items(): + obj = model.objects.filter(id=data.get(field)).first() + if obj: + user_has_permission_or_403(request.user, obj, permission) + parents[field] = obj + return parents + + def process_post(self, request): + data = request.data + parents = self._fetch_and_authorize_parents(request, { + "product": (Product, Permissions.Product_Edit), + "finding": (Finding, Permissions.Finding_Edit), + "endpoint": (Endpoint, Permissions.Location_Edit), + }) metalist = data.get("metadata") for metadata in metalist: try: DojoMeta.objects.create( - product=product, - finding=finding, - endpoint=endpoint, + product=parents["product"], + finding=parents["finding"], + endpoint=parents["endpoint"], name=metadata.get("name"), value=metadata.get("value"), ) except (IntegrityError) as ex: # this should not happen as the data was validated in the batch call raise ValidationError(str(ex)) - def process_patch(self: object, data: dict): - product = Product.objects.filter(id=data.get("product")).first() - finding = Finding.objects.filter(id=data.get("finding")).first() - endpoint = Endpoint.objects.filter(id=data.get("endpoint")).first() + def process_patch(self, request): + data = request.data + parents = self._fetch_and_authorize_parents(request, { + "product": (Product, Permissions.Product_Edit), + "finding": (Finding, Permissions.Finding_Edit), + "endpoint": (Endpoint, Permissions.Location_Edit), + }) metalist = data.get("metadata") for metadata in metalist: - dojometa = DojoMeta.objects.filter(product=product, finding=finding, endpoint=endpoint, name=metadata.get("name")) + dojometa = DojoMeta.objects.filter(product=parents["product"], finding=parents["finding"], endpoint=parents["endpoint"], name=metadata.get("name")) if dojometa: try: dojometa.update( @@ -1815,7 +1846,11 @@ def destroy(self, request, *args, **kwargs): responses={status.HTTP_200_OK: serializers.ReportGenerateSerializer}, ) @action( - detail=True, methods=["post"], permission_classes=[IsAuthenticated], + detail=True, methods=["post"], + # IsAuthenticated only: report generation requires View permission, + # enforced by the permission-filtered get_queryset(). The viewset's + # permission_classes would check Edit (POST), which is too restrictive. + permission_classes=[IsAuthenticated], ) def generate_report(self, request, pk=None): product = self.get_object() @@ -1956,7 +1991,11 @@ def destroy(self, request, *args, **kwargs): responses={status.HTTP_200_OK: serializers.ReportGenerateSerializer}, ) @action( - detail=True, methods=["post"], permission_classes=[IsAuthenticated], + detail=True, methods=["post"], + # IsAuthenticated only: report generation requires View permission, + # enforced by the permission-filtered get_queryset(). The viewset's + # permission_classes would check Edit (POST), which is too restrictive. + permission_classes=[IsAuthenticated], ) def generate_report(self, request, pk=None): product_type = self.get_object() @@ -2143,7 +2182,11 @@ def get_serializer_class(self): responses={status.HTTP_200_OK: serializers.ReportGenerateSerializer}, ) @action( - detail=True, methods=["post"], permission_classes=[IsAuthenticated], + detail=True, methods=["post"], + # IsAuthenticated only: report generation requires View permission, + # enforced by the permission-filtered get_queryset(). The viewset's + # permission_classes would check Edit (POST), which is too restrictive. + permission_classes=[IsAuthenticated], ) def generate_report(self, request, pk=None): test = self.get_object() diff --git a/dojo/benchmark/views.py b/dojo/benchmark/views.py index 00b331b8421..4577854ac5e 100644 --- a/dojo/benchmark/views.py +++ b/dojo/benchmark/views.py @@ -46,6 +46,8 @@ def update_benchmark(request, pid, _type): field = request.POST.get("field") value = request.POST.get("value") value = {"true": True, "false": False}.get(value, value) + product = get_object_or_404(Product, id=pid) + bench = get_object_or_404(Benchmark_Product.objects.filter(product=product), id=bench_id) if field in { "enabled", @@ -54,7 +56,6 @@ def update_benchmark(request, pid, _type): "get_notes", "delete_notes", }: - bench = Benchmark_Product.objects.get(id=bench_id) if field == "enabled": bench.enabled = value elif field == "pass_fail": @@ -90,21 +91,22 @@ def update_benchmark(request, pid, _type): @user_is_authorized(Product, Permissions.Benchmark_Edit, "pid") def update_benchmark_summary(request, pid, _type, summary): if request.method == "POST": + product = get_object_or_404(Product, id=pid) + benchmark_summary = get_object_or_404(Benchmark_Product_Summary.objects.filter(product=product), id=summary) field = request.POST.get("field") value = request.POST.get("value") value = {"true": True, "false": False}.get(value, value) if field in {"publish", "desired_level"}: - summary = Benchmark_Product_Summary.objects.get(id=summary) data = {} if field == "publish": - summary.publish = value + benchmark_summary.publish = value data = {"publish": value} elif field == "desired_level": - summary.desired_level = value - data = {"desired_level": value, "text": asvs_level(summary)} + benchmark_summary.desired_level = value + data = {"desired_level": value, "text": asvs_level(benchmark_summary)} - summary.save() + benchmark_summary.save() return JsonResponse(data) return redirect_to_return_url_or_else( diff --git a/dojo/cred/views.py b/dojo/cred/views.py index 4feaddb73b6..ac6a47f2ae3 100644 --- a/dojo/cred/views.py +++ b/dojo/cred/views.py @@ -47,7 +47,7 @@ def all_cred_product(request, pid): return render(request, "dojo/view_cred_prod.html", {"product_tab": product_tab, "creds": creds, "prod": prod}) -@user_is_authorized(Cred_User, Permissions.Credential_Edit, "ttid") +@user_is_configuration_authorized(Permissions.Credential_Edit) def edit_cred(request, ttid): tool_config = Cred_User.objects.get(pk=ttid) if request.method == "POST": @@ -79,7 +79,7 @@ def edit_cred(request, ttid): }) -@user_is_authorized(Cred_User, Permissions.Credential_View, "ttid") +@user_is_configuration_authorized(Permissions.Credential_View) def view_cred_details(request, ttid): cred = Cred_User.objects.get(pk=ttid) notes = cred.notes.all() @@ -127,7 +127,7 @@ def cred(request): @user_is_authorized(Product, Permissions.Product_View, "pid") -@user_is_authorized(Cred_User, Permissions.Credential_View, "ttid") +@user_is_authorized(Cred_Mapping, Permissions.Credential_View, "ttid") def view_cred_product(request, pid, ttid): cred = get_object_or_404( Cred_Mapping.objects.select_related("cred_id"), id=ttid) @@ -182,8 +182,8 @@ def view_cred_product(request, pid, ttid): }) -@user_is_authorized(Product, Permissions.Engagement_View, "eid") -@user_is_authorized(Cred_User, Permissions.Credential_View, "ttid") +@user_is_authorized(Engagement, Permissions.Engagement_View, "eid") +@user_is_authorized(Cred_Mapping, Permissions.Credential_View, "ttid") def view_cred_product_engagement(request, eid, ttid): cred = get_object_or_404( Cred_Mapping.objects.select_related("cred_id"), id=ttid) @@ -231,8 +231,8 @@ def view_cred_product_engagement(request, eid, ttid): }) -@user_is_authorized(Product, Permissions.Test_View, "tid") -@user_is_authorized(Cred_User, Permissions.Credential_View, "ttid") +@user_is_authorized(Test, Permissions.Test_View, "tid") +@user_is_authorized(Cred_Mapping, Permissions.Credential_View, "ttid") def view_cred_engagement_test(request, tid, ttid): cred = get_object_or_404( Cred_Mapping.objects.select_related("cred_id"), id=ttid) @@ -282,8 +282,8 @@ def view_cred_engagement_test(request, tid, ttid): }) -@user_is_authorized(Product, Permissions.Finding_View, "fid") -@user_is_authorized(Cred_User, Permissions.Credential_View, "ttid") +@user_is_authorized(Finding, Permissions.Finding_View, "fid") +@user_is_authorized(Cred_Mapping, Permissions.Credential_View, "ttid") def view_cred_finding(request, fid, ttid): cred = get_object_or_404( Cred_Mapping.objects.select_related("cred_id"), id=ttid) @@ -334,7 +334,7 @@ def view_cred_finding(request, fid, ttid): @user_is_authorized(Product, Permissions.Product_Edit, "pid") -@user_is_authorized(Cred_User, Permissions.Credential_Edit, "ttid") +@user_is_authorized(Cred_Mapping, Permissions.Credential_Edit, "ttid") def edit_cred_product(request, pid, ttid): cred = get_object_or_404( Cred_Mapping.objects.select_related("cred_id"), id=ttid) @@ -362,7 +362,7 @@ def edit_cred_product(request, pid, ttid): @user_is_authorized(Engagement, Permissions.Engagement_Edit, "eid") -@user_is_authorized(Cred_User, Permissions.Credential_Edit, "ttid") +@user_is_authorized(Cred_Mapping, Permissions.Credential_Edit, "ttid") def edit_cred_product_engagement(request, eid, ttid): cred = get_object_or_404( Cred_Mapping.objects.select_related("cred_id"), id=ttid) @@ -582,7 +582,6 @@ def new_cred_finding(request, fid): }) -@user_is_authorized(Cred_User, Permissions.Credential_Delete, "ttid") def delete_cred_controller(request, destination_url, elem_id, ttid): cred = Cred_Mapping.objects.filter(pk=ttid).first() if request.method == "POST": @@ -662,30 +661,30 @@ def delete_cred_controller(request, destination_url, elem_id, ttid): }) -@user_is_authorized(Cred_User, Permissions.Credential_Delete, "ttid") +@user_is_configuration_authorized(Permissions.Credential_Delete) def delete_cred(request, ttid): return delete_cred_controller(request, "cred", 0, ttid=ttid) @user_is_authorized(Product, Permissions.Product_Edit, "pid") -@user_is_authorized(Cred_User, Permissions.Credential_Delete, "ttid") +@user_is_authorized(Cred_Mapping, Permissions.Credential_Delete, "ttid") def delete_cred_product(request, pid, ttid): return delete_cred_controller(request, "all_cred_product", pid, ttid) @user_is_authorized(Engagement, Permissions.Engagement_Edit, "eid") -@user_is_authorized(Cred_User, Permissions.Credential_Delete, "ttid") +@user_is_authorized(Cred_Mapping, Permissions.Credential_Delete, "ttid") def delete_cred_engagement(request, eid, ttid): return delete_cred_controller(request, "view_engagement", eid, ttid) @user_is_authorized(Test, Permissions.Test_Edit, "tid") -@user_is_authorized(Cred_User, Permissions.Credential_Delete, "ttid") +@user_is_authorized(Cred_Mapping, Permissions.Credential_Delete, "ttid") def delete_cred_test(request, tid, ttid): return delete_cred_controller(request, "view_test", tid, ttid) @user_is_authorized(Finding, Permissions.Finding_Edit, "fid") -@user_is_authorized(Cred_User, Permissions.Credential_Delete, "ttid") +@user_is_authorized(Cred_Mapping, Permissions.Credential_Delete, "ttid") def delete_cred_finding(request, fid, ttid): return delete_cred_controller(request, "view_finding", fid, ttid) diff --git a/dojo/engagement/views.py b/dojo/engagement/views.py index fbe0ca6b496..3d03319a4cd 100644 --- a/dojo/engagement/views.py +++ b/dojo/engagement/views.py @@ -1377,7 +1377,7 @@ def edit_risk_acceptance(request, eid, raid): # will only be called by view_risk_acceptance and edit_risk_acceptance def view_edit_risk_acceptance(request, eid, raid, *, edit_mode=False): risk_acceptance = get_object_or_404(Risk_Acceptance, pk=raid) - eng = get_object_or_404(Engagement, pk=eid) + eng = get_object_or_404(Engagement.objects.filter(risk_acceptance=risk_acceptance), pk=eid) if edit_mode and not eng.product.enable_full_risk_acceptance: raise PermissionDenied @@ -1537,8 +1537,7 @@ def view_edit_risk_acceptance(request, eid, raid, *, edit_mode=False): @user_is_authorized(Engagement, Permissions.Risk_Acceptance, "eid") def expire_risk_acceptance(request, eid, raid): risk_acceptance = get_object_or_404(prefetch_for_expiration(Risk_Acceptance.objects.all()), pk=raid) - # Validate the engagement ID exists before moving forward - get_object_or_404(Engagement, pk=eid) + get_object_or_404(Engagement.objects.filter(risk_acceptance=risk_acceptance), pk=eid) ra_helper.expire_now(risk_acceptance) @@ -1548,7 +1547,7 @@ def expire_risk_acceptance(request, eid, raid): @user_is_authorized(Engagement, Permissions.Risk_Acceptance, "eid") def reinstate_risk_acceptance(request, eid, raid): risk_acceptance = get_object_or_404(prefetch_for_expiration(Risk_Acceptance.objects.all()), pk=raid) - eng = get_object_or_404(Engagement, pk=eid) + eng = get_object_or_404(Engagement.objects.filter(risk_acceptance=risk_acceptance), pk=eid) if not eng.product.enable_full_risk_acceptance: raise PermissionDenied @@ -1561,7 +1560,7 @@ def reinstate_risk_acceptance(request, eid, raid): @user_is_authorized(Engagement, Permissions.Risk_Acceptance, "eid") def delete_risk_acceptance(request, eid, raid): risk_acceptance = get_object_or_404(Risk_Acceptance, pk=raid) - eng = get_object_or_404(Engagement, pk=eid) + eng = get_object_or_404(Engagement.objects.filter(risk_acceptance=risk_acceptance), pk=eid) ra_helper.delete(eng, risk_acceptance) diff --git a/dojo/finding/views.py b/dojo/finding/views.py index 932db5d1b2d..8b1fe39cd77 100644 --- a/dojo/finding/views.py +++ b/dojo/finding/views.py @@ -32,7 +32,7 @@ import dojo.finding.helper as finding_helper import dojo.jira_link.helper as jira_helper import dojo.risk_acceptance.helper as ra_helper -from dojo.authorization.authorization import user_has_permission_or_403 +from dojo.authorization.authorization import user_has_global_permission_or_403, user_has_permission_or_403 from dojo.authorization.authorization_decorators import ( user_has_global_permission, user_is_authorized, @@ -1732,6 +1732,9 @@ def mktemplate(request, fid): @user_is_authorized(Finding, Permissions.Finding_Edit, "fid") def find_template_to_apply(request, fid): + # Templates may contain sensitive data from any product; require global permission + # to match the authorization level of the /template list view + user_has_global_permission_or_403(request.user, Permissions.Finding_Edit) finding = get_object_or_404(Finding, id=fid) test = get_object_or_404(Test, id=finding.test.id) templates_by_cve = ( diff --git a/dojo/object/views.py b/dojo/object/views.py index 96616c556a8..9bd443d27dd 100644 --- a/dojo/object/views.py +++ b/dojo/object/views.py @@ -1,7 +1,7 @@ import logging from django.contrib import messages -from django.core.exceptions import BadRequest +from django.core.exceptions import PermissionDenied from django.http import HttpResponseRedirect from django.shortcuts import get_object_or_404, render from django.urls import reverse @@ -63,12 +63,10 @@ def view_objects(request, pid): @user_is_authorized(Product, Permissions.Product_Tracking_Files_Edit, "pid") def edit_object(request, pid, ttid): - object_prod = Objects_Product.objects.get(pk=ttid) + object_prod = get_object_or_404(Objects_Product, pk=ttid) product = get_object_or_404(Product, id=pid) if object_prod.product != product: - msg = labels.ASSET_TRACKED_FILES_ID_MISMATCH_ERROR_MESSAGE % {"asset_id": pid, - "object_asset_id": object_prod.product.id} - raise BadRequest(msg) + raise PermissionDenied if request.method == "POST": tform = ObjectSettingsForm(request.POST, instance=object_prod) @@ -94,12 +92,10 @@ def edit_object(request, pid, ttid): @user_is_authorized(Product, Permissions.Product_Tracking_Files_Delete, "pid") def delete_object(request, pid, ttid): - object_prod = Objects_Product.objects.get(pk=ttid) + object_prod = get_object_or_404(Objects_Product, pk=ttid) product = get_object_or_404(Product, id=pid) if object_prod.product != product: - msg = labels.ASSET_TRACKED_FILES_ID_MISMATCH_ERROR_MESSAGE % {"asset_id": pid, - "object_asset_id": object_prod.product.id} - raise BadRequest(msg) + raise PermissionDenied if request.method == "POST": tform = ObjectSettingsForm(request.POST, instance=object_prod) diff --git a/dojo/product/views.py b/dojo/product/views.py index 18d29d91616..07c4a50b66e 100644 --- a/dojo/product/views.py +++ b/dojo/product/views.py @@ -1597,7 +1597,7 @@ def engagement_presets(request, pid): @user_is_authorized(Product, Permissions.Product_Edit, "pid") def edit_engagement_presets(request, pid, eid): prod = get_object_or_404(Product, id=pid) - preset = get_object_or_404(Engagement_Presets, id=eid) + preset = get_object_or_404(Engagement_Presets.objects.filter(product=prod), id=eid) product_tab = Product_Tab(prod, title=_("Edit Engagement Preset"), tab="settings") @@ -1646,7 +1646,7 @@ def add_engagement_presets(request, pid): @user_is_authorized(Product, Permissions.Product_Edit, "pid") def delete_engagement_presets(request, pid, eid): prod = get_object_or_404(Product, id=pid) - preset = get_object_or_404(Engagement_Presets, id=eid) + preset = get_object_or_404(Engagement_Presets.objects.filter(product=prod), id=eid) form = DeleteEngagementPresetsForm(instance=preset) if request.method == "POST": diff --git a/dojo/survey/views.py b/dojo/survey/views.py index 3829e82d211..60c8cd58a15 100644 --- a/dojo/survey/views.py +++ b/dojo/survey/views.py @@ -57,7 +57,7 @@ @user_is_authorized(Engagement, Permissions.Engagement_Edit, "eid") def delete_engagement_survey(request, eid, sid): engagement = get_object_or_404(Engagement, id=eid) - survey = get_object_or_404(Answered_Survey, id=sid) + survey = get_object_or_404(Answered_Survey.objects.filter(engagement=engagement), id=sid) questions = get_answered_questions(survey=survey, read_only=True) form = Delete_Questionnaire_Form(instance=survey) @@ -96,8 +96,8 @@ def delete_engagement_survey(request, eid, sid): def answer_questionnaire(request, eid, sid): - survey = get_object_or_404(Answered_Survey, id=sid) engagement = get_object_or_404(Engagement, id=eid) + survey = get_object_or_404(Answered_Survey.objects.filter(engagement=engagement), id=sid) system_settings = System_Settings.objects.all()[0] if not system_settings.allow_anonymous_survey_repsonse: @@ -162,8 +162,8 @@ def answer_questionnaire(request, eid, sid): @user_is_authorized(Engagement, Permissions.Engagement_Edit, "eid") def assign_questionnaire(request, eid, sid): - survey = get_object_or_404(Answered_Survey, id=sid) engagement = get_object_or_404(Engagement, id=eid) + survey = get_object_or_404(Answered_Survey.objects.filter(engagement=engagement), id=sid) form = AssignUserForm(instance=survey) if request.method == "POST": @@ -183,8 +183,8 @@ def assign_questionnaire(request, eid, sid): @user_is_authorized(Engagement, Permissions.Engagement_View, "eid") def view_questionnaire(request, eid, sid): - survey = get_object_or_404(Answered_Survey, id=sid) engagement = get_object_or_404(Engagement, id=eid) + survey = get_object_or_404(Answered_Survey.objects.filter(engagement=engagement), id=sid) questions = get_answered_questions(survey=survey, read_only=True) add_breadcrumb( diff --git a/dojo/tool_product/views.py b/dojo/tool_product/views.py index def26f088d2..98f56d2f5ce 100644 --- a/dojo/tool_product/views.py +++ b/dojo/tool_product/views.py @@ -2,7 +2,7 @@ import logging from django.contrib import messages -from django.core.exceptions import BadRequest +from django.core.exceptions import PermissionDenied from django.http import HttpResponseRedirect from django.shortcuts import get_object_or_404, render from django.urls import reverse @@ -60,10 +60,9 @@ def all_tool_product(request, pid): @user_is_authorized(Product, Permissions.Product_Edit, "pid") def edit_tool_product(request, pid, ttid): product = get_object_or_404(Product, id=pid) - tool_product = Tool_Product_Settings.objects.get(pk=ttid) + tool_product = get_object_or_404(Tool_Product_Settings, pk=ttid) if tool_product.product != product: - msg = f"Product {pid} does not fit to product of Tool_Product {tool_product.product.id}" - raise BadRequest(msg) + raise PermissionDenied if request.method == "POST": tform = ToolProductSettingsForm(request.POST, instance=tool_product) @@ -87,11 +86,10 @@ def edit_tool_product(request, pid, ttid): @user_is_authorized(Product, Permissions.Product_Edit, "pid") def delete_tool_product(request, pid, ttid): - tool_product = Tool_Product_Settings.objects.get(pk=ttid) + tool_product = get_object_or_404(Tool_Product_Settings, pk=ttid) product = get_object_or_404(Product, id=pid) if tool_product.product != product: - msg = f"Product {pid} does not fit to product of Tool_Product {tool_product.product.id}" - raise BadRequest(msg) + raise PermissionDenied if request.method == "POST": DeleteToolProductSettingsForm(request.POST, instance=tool_product) diff --git a/unittests/test_permissions_audit.py b/unittests/test_permissions_audit.py new file mode 100644 index 00000000000..7ef32860be1 --- /dev/null +++ b/unittests/test_permissions_audit.py @@ -0,0 +1,1019 @@ +""" +Security-focused permission tests for the permissions audit. + +Tests verify: +1. Risk Acceptance data is not exposed to users without Risk_Acceptance permission +2. Metadata batch operations enforce permissions on parent objects +3. Note removal verifies note-finding relationship (regression) +4. Benchmark IDOR: update_benchmark rejects bench_id from different product +5. Object/tool_product parent mismatch returns 403 +6. Risk Acceptance cross-engagement IDOR (H1 #3577434 / #3569882) +7. Engagement Presets cross-product IDOR (H1 #3577398 / #3570349) +8. Questionnaire cross-engagement IDOR (H1 #3571957) +9. Finding Templates exposure via find_template_to_apply (H1 #3577363) +10. Jira Epic BFLA - Reader cannot trigger update_jira_epic (H1 #3577193) +""" +import datetime + +from django.test import Client +from django.urls import reverse +from django.utils import timezone +from rest_framework.authtoken.models import Token +from rest_framework.test import APIClient + +from dojo.models import ( + Answered_Survey, + Benchmark_Category, + Benchmark_Product, + Benchmark_Product_Summary, + Benchmark_Requirement, + Benchmark_Type, + Dojo_User, + DojoMeta, + Engagement, + Engagement_Presets, + Engagement_Survey, + Finding, + Finding_Template, + Notes, + Objects_Product, + Objects_Review, + Product, + Product_Member, + Product_Type, + Risk_Acceptance, + Role, + Test, + Test_Type, + Tool_Configuration, + Tool_Product_Settings, + Tool_Type, +) + +from .dojo_test_case import DojoTestCase + + +class TestRiskAcceptanceExposure(DojoTestCase): + + """FindingSerializer must not expose accepted_risks to users without Risk_Acceptance permission.""" + + @classmethod + def setUpTestData(cls): + cls.reader_role = Role.objects.get(name="Reader") + cls.writer_role = Role.objects.get(name="Writer") + + # Create product type and product + cls.product_type = Product_Type.objects.create(name="RA Exposure Test PT") + cls.product = Product.objects.create( + name="RA Exposure Test Product", + description="Test", + prod_type=cls.product_type, + ) + + # Create users + cls.reader_user = Dojo_User.objects.create_user( + username="ra_test_reader", + password="testTEST1234!@#$", # noqa: S106 + is_active=True, + ) + cls.writer_user = Dojo_User.objects.create_user( + username="ra_test_writer", + password="testTEST1234!@#$", # noqa: S106 + is_active=True, + ) + + # Assign roles + Product_Member.objects.create( + product=cls.product, + user=cls.reader_user, + role=cls.reader_role, + ) + Product_Member.objects.create( + product=cls.product, + user=cls.writer_user, + role=cls.writer_role, + ) + + # Create engagement, test, finding + cls.engagement = Engagement.objects.create( + name="RA Test Engagement", + product=cls.product, + target_start=datetime.date(2024, 1, 1), + target_end=datetime.date(2024, 12, 31), + ) + test_type, _ = Test_Type.objects.get_or_create(name="Manual Code Review") + cls.test = Test.objects.create( + engagement=cls.engagement, + test_type=test_type, + target_start=timezone.now(), + target_end=timezone.now(), + ) + cls.finding = Finding.objects.create( + title="RA Test Finding", + test=cls.test, + severity="High", + numerical_severity="S1", + reporter=cls.writer_user, + ) + + # Create risk acceptance linked to the finding + cls.risk_acceptance = Risk_Acceptance.objects.create( + name="Test RA", + owner=cls.writer_user, + ) + cls.risk_acceptance.accepted_findings.add(cls.finding) + cls.engagement.risk_acceptance.add(cls.risk_acceptance) + + def _get_finding_as_user(self, user): + token, _ = Token.objects.get_or_create(user=user) + client = APIClient() + client.credentials(HTTP_AUTHORIZATION="Token " + token.key) + return client.get(reverse("finding-detail", args=(self.finding.id,))) + + def test_reader_cannot_see_accepted_risks(self): + """Reader role lacks Risk_Acceptance permission — accepted_risks must be empty.""" + response = self._get_finding_as_user(self.reader_user) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()["accepted_risks"], []) + + def test_writer_can_see_accepted_risks(self): + """Writer role has Risk_Acceptance permission — accepted_risks must contain data.""" + response = self._get_finding_as_user(self.writer_user) + self.assertEqual(response.status_code, 200) + accepted = response.json()["accepted_risks"] + self.assertGreater(len(accepted), 0) + self.assertEqual(accepted[0]["name"], "Test RA") + + +class TestMetadataBatchPermissions(DojoTestCase): + + """Metadata batch endpoint must enforce permissions on parent objects.""" + + @classmethod + def setUpTestData(cls): + cls.reader_role = Role.objects.get(name="Reader") + cls.writer_role = Role.objects.get(name="Writer") + + # Product the user CAN access + cls.product_type = Product_Type.objects.create(name="Meta Batch Test PT") + cls.accessible_product = Product.objects.create( + name="Meta Batch Accessible Product", + description="Test", + prod_type=cls.product_type, + ) + + # Product the user CANNOT access + cls.inaccessible_product = Product.objects.create( + name="Meta Batch Inaccessible Product", + description="Test", + prod_type=cls.product_type, + ) + + # User with Writer on accessible product, no role on inaccessible product + cls.writer_user = Dojo_User.objects.create_user( + username="meta_batch_writer", + password="testTEST1234!@#$", # noqa: S106 + is_active=True, + ) + Product_Member.objects.create( + product=cls.accessible_product, + user=cls.writer_user, + role=cls.writer_role, + ) + + # User with Reader on accessible product (Reader lacks Product_Edit) + cls.reader_user = Dojo_User.objects.create_user( + username="meta_batch_reader", + password="testTEST1234!@#$", # noqa: S106 + is_active=True, + ) + Product_Member.objects.create( + product=cls.accessible_product, + user=cls.reader_user, + role=cls.reader_role, + ) + + def _client_for_user(self, user): + token, _ = Token.objects.get_or_create(user=user) + client = APIClient() + client.credentials(HTTP_AUTHORIZATION="Token " + token.key) + return client + + def test_batch_post_unauthorized_product(self): + """Writer should be denied when targeting a product they have no access to.""" + client = self._client_for_user(self.writer_user) + response = client.post( + reverse("metadata-batch"), + data={ + "product": self.inaccessible_product.id, + "metadata": [{"name": "hack_key", "value": "hack_val"}], + }, + format="json", + ) + self.assertIn(response.status_code, [403, 404]) + self.assertFalse( + DojoMeta.objects.filter( + product=self.inaccessible_product, name="hack_key", + ).exists(), + ) + + def test_batch_post_reader_cannot_edit(self): + """Reader lacks Product_Edit — batch POST should be denied.""" + client = self._client_for_user(self.reader_user) + response = client.post( + reverse("metadata-batch"), + data={ + "product": self.accessible_product.id, + "metadata": [{"name": "reader_key", "value": "reader_val"}], + }, + format="json", + ) + self.assertIn(response.status_code, [403, 404]) + self.assertFalse( + DojoMeta.objects.filter( + product=self.accessible_product, name="reader_key", + ).exists(), + ) + + +class TestNoteRelationshipVerification(DojoTestCase): + + """Regression: remove_note must verify the note belongs to the finding.""" + + @classmethod + def setUpTestData(cls): + cls.owner_role = Role.objects.get(name="Owner") + + cls.product_type = Product_Type.objects.create(name="Note Test PT") + cls.product = Product.objects.create( + name="Note Test Product", + description="Test", + prod_type=cls.product_type, + ) + + cls.user = Dojo_User.objects.create_user( + username="note_test_owner", + password="testTEST1234!@#$", # noqa: S106 + is_active=True, + ) + Product_Member.objects.create( + product=cls.product, + user=cls.user, + role=cls.owner_role, + ) + + cls.engagement = Engagement.objects.create( + name="Note Test Engagement", + product=cls.product, + target_start=datetime.date(2024, 1, 1), + target_end=datetime.date(2024, 12, 31), + ) + test_type, _ = Test_Type.objects.get_or_create(name="Manual Code Review") + cls.test = Test.objects.create( + engagement=cls.engagement, + test_type=test_type, + target_start=timezone.now(), + target_end=timezone.now(), + ) + + # Create two findings + cls.finding_a = Finding.objects.create( + title="Note Test Finding A", + test=cls.test, + severity="High", + numerical_severity="S1", + reporter=cls.user, + ) + cls.finding_b = Finding.objects.create( + title="Note Test Finding B", + test=cls.test, + severity="Medium", + numerical_severity="S2", + reporter=cls.user, + ) + + # Create a note on finding A + cls.note = Notes.objects.create( + entry="Test note on finding A", + author=cls.user, + ) + cls.finding_a.notes.add(cls.note) + + def test_remove_note_from_wrong_finding(self): + """Removing a note via a different finding's endpoint must fail.""" + token, _ = Token.objects.get_or_create(user=self.user) + client = APIClient() + client.credentials(HTTP_AUTHORIZATION="Token " + token.key) + + response = client.patch( + reverse("finding-remove-note", args=(self.finding_b.id,)), + data={"note_id": self.note.id}, + format="json", + ) + self.assertEqual(response.status_code, 400) + # Note should still exist + self.assertTrue(Notes.objects.filter(id=self.note.id).exists()) + + def test_remove_note_from_correct_finding(self): + """Removing a note from the correct finding must succeed for the author.""" + # Create a fresh note so we don't affect other tests + note = Notes.objects.create( + entry="Disposable test note", + author=self.user, + ) + self.finding_a.notes.add(note) + + token, _ = Token.objects.get_or_create(user=self.user) + client = APIClient() + client.credentials(HTTP_AUTHORIZATION="Token " + token.key) + + response = client.patch( + reverse("finding-remove-note", args=(self.finding_a.id,)), + data={"note_id": note.id}, + format="json", + ) + self.assertEqual(response.status_code, 204) + self.assertFalse(Notes.objects.filter(id=note.id).exists()) + + +class TestBenchmarkIDOR(DojoTestCase): + + """update_benchmark must reject bench_id belonging to a different product.""" + + @classmethod + def setUpTestData(cls): + cls.owner_role = Role.objects.get(name="Owner") + cls.product_type = Product_Type.objects.create(name="Bench IDOR Test PT") + + # Two separate products + cls.product_a = Product.objects.create( + name="Bench IDOR Product A", + description="Test", + prod_type=cls.product_type, + ) + cls.product_b = Product.objects.create( + name="Bench IDOR Product B", + description="Test", + prod_type=cls.product_type, + ) + + # User with Owner on both products + cls.user = Dojo_User.objects.create_user( + username="bench_idor_owner", + password="testTEST1234!@#$", # noqa: S106 + is_active=True, + ) + Product_Member.objects.create( + product=cls.product_a, user=cls.user, role=cls.owner_role, + ) + Product_Member.objects.create( + product=cls.product_b, user=cls.user, role=cls.owner_role, + ) + + # Create benchmark type, category, requirement + cls.bench_type = Benchmark_Type.objects.create( + name="IDOR Test Type", enabled=True, + ) + cls.bench_category = Benchmark_Category.objects.create( + type=cls.bench_type, name="V1: Test Category", enabled=True, + ) + cls.bench_requirement = Benchmark_Requirement.objects.create( + category=cls.bench_category, + objective_number="1.1", + objective="Test objective", + enabled=True, + ) + + # Create a benchmark entry for product A + cls.bench_product_a = Benchmark_Product.objects.create( + product=cls.product_a, + control=cls.bench_requirement, + ) + + # Create benchmark summary for product B (needed for URL) + cls.bench_summary_a = Benchmark_Product_Summary.objects.create( + product=cls.product_a, benchmark_type=cls.bench_type, + ) + cls.bench_summary_b = Benchmark_Product_Summary.objects.create( + product=cls.product_b, benchmark_type=cls.bench_type, + ) + + def test_update_benchmark_cross_product_rejected(self): + """POSTing a bench_id from product A via product B's URL must be denied.""" + client = Client() + client.login(username="bench_idor_owner", password="testTEST1234!@#$") # noqa: S106 + + # Try to update product A's benchmark through product B's endpoint + url = reverse( + "update_product_benchmark", + args=(self.product_b.id, self.bench_type.id), + ) + response = client.post(url, { + "bench_id": self.bench_product_a.id, + "field": "pass_fail", + "value": "true", + }) + # Scoped get_object_or_404 returns 404 for cross-product access; + # PermissionDenied would give 400/403 via custom handler403 (DD bug) + self.assertIn(response.status_code, [400, 403, 404]) + + def test_update_benchmark_summary_cross_product_rejected(self): + """POSTing a summary from product A via product B's URL must be denied.""" + client = Client() + client.login(username="bench_idor_owner", password="testTEST1234!@#$") # noqa: S106 + + url = reverse( + "update_product_benchmark_summary", + args=(self.product_b.id, self.bench_type.id, self.bench_summary_a.id), + ) + response = client.post(url, { + "field": "publish", + "value": "true", + }) + # Scoped get_object_or_404 returns 404 for cross-product access; + # PermissionDenied would give 400/403 via custom handler403 (DD bug) + self.assertIn(response.status_code, [400, 403, 404]) + + def test_update_benchmark_same_product_allowed(self): + """POSTing a bench_id for the correct product should succeed.""" + client = Client() + client.login(username="bench_idor_owner", password="testTEST1234!@#$") # noqa: S106 + + url = reverse( + "update_product_benchmark", + args=(self.product_a.id, self.bench_type.id), + ) + response = client.post(url, { + "bench_id": self.bench_product_a.id, + "field": "enabled", + "value": "true", + }) + self.assertEqual(response.status_code, 200) + + +class TestObjectProductParentCheck(DojoTestCase): + + """edit_object and delete_object must reject objects from different products.""" + + @classmethod + def setUpTestData(cls): + cls.owner_role = Role.objects.get(name="Owner") + cls.product_type = Product_Type.objects.create(name="Object Parent Test PT") + + cls.product_a = Product.objects.create( + name="Object Parent Product A", + description="Test", + prod_type=cls.product_type, + ) + cls.product_b = Product.objects.create( + name="Object Parent Product B", + description="Test", + prod_type=cls.product_type, + ) + + cls.user = Dojo_User.objects.create_user( + username="object_parent_owner", + password="testTEST1234!@#$", # noqa: S106 + is_active=True, + ) + Product_Member.objects.create( + product=cls.product_a, user=cls.user, role=cls.owner_role, + ) + Product_Member.objects.create( + product=cls.product_b, user=cls.user, role=cls.owner_role, + ) + + # Object belonging to product A + cls.review_status = Objects_Review.objects.create(name="In Review") + cls.tracked_file = Objects_Product.objects.create( + product=cls.product_a, + path="/test/path", + folder="test_folder", + artifact="test.py", + review_status=cls.review_status, + ) + + def test_edit_object_cross_product_rejected(self): + """Editing an object from product A via product B's URL must be denied.""" + client = Client() + client.login(username="object_parent_owner", password="testTEST1234!@#$") # noqa: S106 + + url = reverse("edit_object", args=(self.product_b.id, self.tracked_file.id)) + response = client.get(url) + # PermissionDenied raised; custom handler403 returns 400 (DD bug) + self.assertIn(response.status_code, [400, 403]) + + def test_delete_object_cross_product_rejected(self): + """Deleting an object from product A via product B's URL must be denied.""" + client = Client() + client.login(username="object_parent_owner", password="testTEST1234!@#$") # noqa: S106 + + url = reverse("delete_object", args=(self.product_b.id, self.tracked_file.id)) + response = client.get(url) + # PermissionDenied raised; custom handler403 returns 400 (DD bug) + self.assertIn(response.status_code, [400, 403]) + + +class TestToolProductParentCheck(DojoTestCase): + + """edit_tool_product and delete_tool_product must reject tools from different products.""" + + @classmethod + def setUpTestData(cls): + cls.owner_role = Role.objects.get(name="Owner") + cls.product_type = Product_Type.objects.create(name="Tool Parent Test PT") + + cls.product_a = Product.objects.create( + name="Tool Parent Product A", + description="Test", + prod_type=cls.product_type, + ) + cls.product_b = Product.objects.create( + name="Tool Parent Product B", + description="Test", + prod_type=cls.product_type, + ) + + cls.user = Dojo_User.objects.create_user( + username="tool_parent_owner", + password="testTEST1234!@#$", # noqa: S106 + is_active=True, + ) + Product_Member.objects.create( + product=cls.product_a, user=cls.user, role=cls.owner_role, + ) + Product_Member.objects.create( + product=cls.product_b, user=cls.user, role=cls.owner_role, + ) + + # Tool type, configuration, and tool setting belonging to product A + cls.tool_type = Tool_Type.objects.create(name="Test Tool Type Parent Check") + cls.tool_config = Tool_Configuration.objects.create( + name="Test Tool Config", + tool_type=cls.tool_type, + ) + cls.tool_setting = Tool_Product_Settings.objects.create( + name="Test Tool Setting", + product=cls.product_a, + tool_configuration=cls.tool_config, + ) + + def test_edit_tool_product_cross_product_rejected(self): + """Editing a tool setting from product A via product B's URL must be denied.""" + client = Client() + client.login(username="tool_parent_owner", password="testTEST1234!@#$") # noqa: S106 + + url = reverse("edit_tool_product", args=(self.product_b.id, self.tool_setting.id)) + response = client.get(url) + # PermissionDenied raised; custom handler403 returns 400 (DD bug) + self.assertIn(response.status_code, [400, 403]) + + def test_delete_tool_product_cross_product_rejected(self): + """Deleting a tool setting from product A via product B's URL must be denied.""" + client = Client() + client.login(username="tool_parent_owner", password="testTEST1234!@#$") # noqa: S106 + + url = reverse("delete_tool_product", args=(self.product_b.id, self.tool_setting.id)) + response = client.get(url) + # PermissionDenied raised; custom handler403 returns 400 (DD bug) + self.assertIn(response.status_code, [400, 403]) + + +class TestRiskAcceptanceCrossEngagementIDOR(DojoTestCase): + + """ + H1 #3577434 / #3569882: Risk acceptance endpoints must reject + a raid belonging to a different engagement than the eid in the URL. + """ + + @classmethod + def setUpTestData(cls): + cls.owner_role = Role.objects.get(name="Owner") + cls.product_type = Product_Type.objects.create(name="RA IDOR Test PT") + cls.product = Product.objects.create( + name="RA IDOR Test Product", + description="Test", + prod_type=cls.product_type, + ) + cls.user = Dojo_User.objects.create_user( + username="ra_idor_owner", + password="testTEST1234!@#$", # noqa: S106 + is_active=True, + ) + Product_Member.objects.create( + product=cls.product, user=cls.user, role=cls.owner_role, + ) + + # Two engagements under the same product + cls.engagement_a = Engagement.objects.create( + name="RA IDOR Engagement A", + product=cls.product, + target_start=datetime.date(2024, 1, 1), + target_end=datetime.date(2024, 12, 31), + ) + cls.engagement_b = Engagement.objects.create( + name="RA IDOR Engagement B", + product=cls.product, + target_start=datetime.date(2024, 1, 1), + target_end=datetime.date(2024, 12, 31), + ) + + # Create a risk acceptance on engagement A + test_type, _ = Test_Type.objects.get_or_create(name="Manual Code Review") + cls.test_a = Test.objects.create( + engagement=cls.engagement_a, + test_type=test_type, + target_start=timezone.now(), + target_end=timezone.now(), + ) + cls.finding_a = Finding.objects.create( + title="RA IDOR Finding", + test=cls.test_a, + severity="High", + numerical_severity="S1", + reporter=cls.user, + ) + cls.risk_acceptance = Risk_Acceptance.objects.create( + name="RA IDOR Test RA", + owner=cls.user, + ) + cls.risk_acceptance.accepted_findings.add(cls.finding_a) + cls.engagement_a.risk_acceptance.add(cls.risk_acceptance) + + def _login(self): + client = Client() + client.login(username="ra_idor_owner", password="testTEST1234!@#$") # noqa: S106 + return client + + def test_view_risk_acceptance_cross_engagement(self): + """Viewing a risk acceptance via a different engagement's URL must be denied.""" + client = self._login() + url = reverse("view_risk_acceptance", args=( + self.engagement_b.id, self.risk_acceptance.id, + )) + response = client.get(url) + self.assertEqual(response.status_code, 404) + + def test_edit_risk_acceptance_cross_engagement(self): + """Editing a risk acceptance via a different engagement's URL must be denied.""" + client = self._login() + url = reverse("edit_risk_acceptance", args=( + self.engagement_b.id, self.risk_acceptance.id, + )) + response = client.get(url) + self.assertEqual(response.status_code, 404) + + def test_expire_risk_acceptance_cross_engagement(self): + """Expiring a risk acceptance via a different engagement's URL must be denied.""" + client = self._login() + url = reverse("expire_risk_acceptance", args=( + self.engagement_b.id, self.risk_acceptance.id, + )) + response = client.get(url) + self.assertEqual(response.status_code, 404) + + def test_reinstate_risk_acceptance_cross_engagement(self): + """Reinstating a risk acceptance via a different engagement's URL must be denied.""" + client = self._login() + url = reverse("reinstate_risk_acceptance", args=( + self.engagement_b.id, self.risk_acceptance.id, + )) + response = client.get(url) + self.assertEqual(response.status_code, 404) + + def test_delete_risk_acceptance_cross_engagement(self): + """Deleting a risk acceptance via a different engagement's URL must be denied.""" + client = self._login() + url = reverse("delete_risk_acceptance", args=( + self.engagement_b.id, self.risk_acceptance.id, + )) + response = client.get(url) + self.assertEqual(response.status_code, 404) + + def test_view_risk_acceptance_same_engagement(self): + """Viewing a risk acceptance via the correct engagement's URL should work.""" + client = self._login() + url = reverse("view_risk_acceptance", args=( + self.engagement_a.id, self.risk_acceptance.id, + )) + response = client.get(url) + self.assertEqual(response.status_code, 200) + + +class TestEngagementPresetsCrossProductIDOR(DojoTestCase): + + """ + H1 #3577398 / #3570349: Engagement preset endpoints must reject + a preset belonging to a different product than the pid in the URL. + """ + + @classmethod + def setUpTestData(cls): + cls.owner_role = Role.objects.get(name="Owner") + cls.product_type = Product_Type.objects.create(name="Preset IDOR Test PT") + + cls.product_a = Product.objects.create( + name="Preset IDOR Product A", + description="Test", + prod_type=cls.product_type, + ) + cls.product_b = Product.objects.create( + name="Preset IDOR Product B", + description="Test", + prod_type=cls.product_type, + ) + + cls.user = Dojo_User.objects.create_user( + username="preset_idor_owner", + password="testTEST1234!@#$", # noqa: S106 + is_active=True, + ) + Product_Member.objects.create( + product=cls.product_a, user=cls.user, role=cls.owner_role, + ) + Product_Member.objects.create( + product=cls.product_b, user=cls.user, role=cls.owner_role, + ) + + # Preset belonging to product A + cls.preset = Engagement_Presets.objects.create( + title="IDOR Test Preset", + product=cls.product_a, + scope="Test scope", + ) + + def _login(self): + client = Client() + client.login(username="preset_idor_owner", password="testTEST1234!@#$") # noqa: S106 + return client + + def test_edit_preset_cross_product(self): + """Editing a preset from product A via product B's URL must return 404.""" + client = self._login() + url = reverse("edit_engagement_presets", args=( + self.product_b.id, self.preset.id, + )) + response = client.get(url) + # Scoped get_object_or_404 returns 404 for cross-product access + self.assertEqual(response.status_code, 404) + + def test_delete_preset_cross_product(self): + """Deleting a preset from product A via product B's URL must return 404.""" + client = self._login() + url = reverse("delete_engagement_presets", args=( + self.product_b.id, self.preset.id, + )) + response = client.get(url) + self.assertEqual(response.status_code, 404) + + def test_edit_preset_same_product(self): + """Editing a preset via the correct product's URL should work.""" + client = self._login() + url = reverse("edit_engagement_presets", args=( + self.product_a.id, self.preset.id, + )) + response = client.get(url) + self.assertEqual(response.status_code, 200) + + +class TestQuestionnaireCrossEngagementIDOR(DojoTestCase): + + """ + H1 #3571957: Survey/questionnaire endpoints must reject + a survey belonging to a different engagement than the eid in the URL. + """ + + @classmethod + def setUpTestData(cls): + cls.owner_role = Role.objects.get(name="Owner") + cls.product_type = Product_Type.objects.create(name="Survey IDOR Test PT") + cls.product = Product.objects.create( + name="Survey IDOR Test Product", + description="Test", + prod_type=cls.product_type, + ) + cls.user = Dojo_User.objects.create_user( + username="survey_idor_owner", + password="testTEST1234!@#$", # noqa: S106 + is_active=True, + ) + Product_Member.objects.create( + product=cls.product, user=cls.user, role=cls.owner_role, + ) + + cls.engagement_a = Engagement.objects.create( + name="Survey IDOR Engagement A", + product=cls.product, + target_start=datetime.date(2024, 1, 1), + target_end=datetime.date(2024, 12, 31), + ) + cls.engagement_b = Engagement.objects.create( + name="Survey IDOR Engagement B", + product=cls.product, + target_start=datetime.date(2024, 1, 1), + target_end=datetime.date(2024, 12, 31), + ) + + # Create a questionnaire (Engagement_Survey) and an Answered_Survey on engagement A + cls.survey_template = Engagement_Survey.objects.create( + name="Test Questionnaire", + description="Test description", + active=True, + ) + cls.answered_survey = Answered_Survey.objects.create( + engagement=cls.engagement_a, + survey=cls.survey_template, + responder=cls.user, + completed=False, + ) + + def _login(self): + client = Client() + client.login(username="survey_idor_owner", password="testTEST1234!@#$") # noqa: S106 + return client + + def test_view_questionnaire_cross_engagement(self): + """Viewing a survey from engagement A via engagement B's URL must return 404.""" + client = self._login() + url = reverse("view_questionnaire", args=( + self.engagement_b.id, self.answered_survey.id, + )) + response = client.get(url) + self.assertEqual(response.status_code, 404) + + def test_delete_survey_cross_engagement(self): + """Deleting a survey from engagement A via engagement B's URL must return 404.""" + client = self._login() + url = reverse("delete_engagement_survey", args=( + self.engagement_b.id, self.answered_survey.id, + )) + response = client.get(url) + self.assertEqual(response.status_code, 404) + + def test_answer_questionnaire_cross_engagement(self): + """Answering a survey from engagement A via engagement B's URL must return 404.""" + client = self._login() + url = reverse("answer_questionnaire", args=( + self.engagement_b.id, self.answered_survey.id, + )) + response = client.get(url) + self.assertEqual(response.status_code, 404) + + def test_view_questionnaire_same_engagement(self): + """Viewing a survey via the correct engagement's URL should work.""" + client = self._login() + url = reverse("view_questionnaire", args=( + self.engagement_a.id, self.answered_survey.id, + )) + response = client.get(url) + self.assertEqual(response.status_code, 200) + + +class TestFindingTemplatesGlobalPermission(DojoTestCase): + + """ + H1 #3577363: find_template_to_apply must require global Finding_Edit + permission, not just product-level Finding_Edit. + """ + + @classmethod + def setUpTestData(cls): + cls.writer_role = Role.objects.get(name="Writer") + cls.product_type = Product_Type.objects.create(name="Template Test PT") + cls.product = Product.objects.create( + name="Template Test Product", + description="Test", + prod_type=cls.product_type, + ) + + # Product-level writer (no global permission) + cls.product_writer = Dojo_User.objects.create_user( + username="template_test_writer", + password="testTEST1234!@#$", # noqa: S106 + is_active=True, + ) + Product_Member.objects.create( + product=cls.product, user=cls.product_writer, role=cls.writer_role, + ) + + # Superuser (has global permissions) + cls.superuser = Dojo_User.objects.create_user( + username="template_test_super", + password="testTEST1234!@#$", # noqa: S106 + is_active=True, + is_superuser=True, + ) + + # Create engagement, test, finding + cls.engagement = Engagement.objects.create( + name="Template Test Engagement", + product=cls.product, + target_start=datetime.date(2024, 1, 1), + target_end=datetime.date(2024, 12, 31), + ) + test_type, _ = Test_Type.objects.get_or_create(name="Manual Code Review") + cls.test = Test.objects.create( + engagement=cls.engagement, + test_type=test_type, + target_start=timezone.now(), + target_end=timezone.now(), + ) + cls.finding = Finding.objects.create( + title="Template Test Finding", + test=cls.test, + severity="High", + numerical_severity="S1", + reporter=cls.product_writer, + ) + + # Create a template (should only be visible to global permission holders) + Finding_Template.objects.create( + title="Secret Template", + severity="Critical", + ) + + def test_product_writer_cannot_access_find_template(self): + """Product-level Writer without global permission should be denied.""" + client = Client() + client.login(username="template_test_writer", password="testTEST1234!@#$") # noqa: S106 + url = reverse("find_template_to_apply", args=(self.finding.id,)) + response = client.get(url) + # PermissionDenied raised; custom handler403 returns 400 (DD bug) + self.assertIn(response.status_code, [400, 403]) + + def test_superuser_can_access_find_template(self): + """Superuser (implicit global permission) should be able to access.""" + client = Client() + client.login(username="template_test_super", password="testTEST1234!@#$") # noqa: S106 + url = reverse("find_template_to_apply", args=(self.finding.id,)) + response = client.get(url) + self.assertEqual(response.status_code, 200) + + +class TestJiraEpicBFLA(DojoTestCase): + + """ + H1 #3577193: update_jira_epic must enforce Engagement_Edit permission, + not just IsAuthenticated. Reader role should be denied. + """ + + @classmethod + def setUpTestData(cls): + cls.reader_role = Role.objects.get(name="Reader") + cls.writer_role = Role.objects.get(name="Writer") + cls.product_type = Product_Type.objects.create(name="Jira Epic BFLA Test PT") + cls.product = Product.objects.create( + name="Jira Epic BFLA Test Product", + description="Test", + prod_type=cls.product_type, + ) + + cls.reader_user = Dojo_User.objects.create_user( + username="jira_epic_reader", + password="testTEST1234!@#$", # noqa: S106 + is_active=True, + ) + cls.writer_user = Dojo_User.objects.create_user( + username="jira_epic_writer", + password="testTEST1234!@#$", # noqa: S106 + is_active=True, + ) + + Product_Member.objects.create( + product=cls.product, user=cls.reader_user, role=cls.reader_role, + ) + Product_Member.objects.create( + product=cls.product, user=cls.writer_user, role=cls.writer_role, + ) + + cls.engagement = Engagement.objects.create( + name="Jira Epic BFLA Engagement", + product=cls.product, + target_start=datetime.date(2024, 1, 1), + target_end=datetime.date(2024, 12, 31), + ) + + def _client_for_user(self, user): + token, _ = Token.objects.get_or_create(user=user) + client = APIClient() + client.credentials(HTTP_AUTHORIZATION="Token " + token.key) + return client + + def test_reader_cannot_update_jira_epic(self): + """Reader role should be denied POST to update_jira_epic.""" + client = self._client_for_user(self.reader_user) + url = reverse("engagement-update-jira-epic", args=(self.engagement.id,)) + response = client.post(url, data={}, format="json") + self.assertIn(response.status_code, [403, 404]) + + def test_writer_allowed_update_jira_epic(self): + """ + Writer role should be allowed to POST to update_jira_epic + (may fail at Jira level, but not at permission level). + """ + client = self._client_for_user(self.writer_user) + url = reverse("engagement-update-jira-epic", args=(self.engagement.id,)) + response = client.post(url, data={}, format="json") + # Writer has Engagement_Edit, so should pass permission check. + # May get 400/500 from Jira integration, but NOT 403. + self.assertNotEqual(response.status_code, 403)