diff --git a/dojo/api_v2/permissions.py b/dojo/api_v2/permissions.py index 807bbcf7b2a..acfe57cb016 100644 --- a/dojo/api_v2/permissions.py +++ b/dojo/api_v2/permissions.py @@ -473,7 +473,13 @@ def has_permission(self, request, view): # Raise an explicit drf exception here raise ValidationError(e) if engagement := converted_dict.get("engagement"): - # existing engagement, nothing special to check + # Validate the resolved engagement's parent chain matches any provided identifiers + if (product := converted_dict.get("product")) and engagement.product_id != product.id: + msg = f'The resolved engagement is associated with product "{engagement.product.name}", not with product "{converted_dict.get("product_name")}"' + raise ValidationError(msg) + if (engagement_name := converted_dict.get("engagement_name")) and engagement.name != engagement_name: + msg = f'The resolved engagement is named "{engagement.name}", not "{engagement_name}"' + raise ValidationError(msg) return user_has_permission( request.user, engagement, Permissions.Import_Scan_Result, ) @@ -764,6 +770,11 @@ def has_permission(self, request, view): try: converted_dict = auto_create.convert_querydict_to_dict(request.data) auto_create.process_import_meta_data_from_dict(converted_dict) + # engagement is not a declared field on ReImportScanSerializer and will be + # stripped during validation — don't use it in the permission check either, + # so the permission check resolves targets the same way execution does + converted_dict.pop("engagement", None) + converted_dict.pop("engagement_id", None) # Get an existing product converted_dict["product_type"] = auto_create.get_target_product_type_if_exists(**converted_dict) converted_dict["product"] = auto_create.get_target_product_if_exists(**converted_dict) @@ -774,7 +785,20 @@ def has_permission(self, request, view): raise ValidationError(e) if test := converted_dict.get("test"): - # existing test, nothing special to check + # Validate the resolved test's parent chain matches any provided identifiers + if (product := converted_dict.get("product")) and test.engagement.product_id != product.id: + msg = f'The resolved test is associated with product "{test.engagement.product.name}", not with product "{converted_dict.get("product_name")}"' + raise ValidationError(msg) + if (engagement := converted_dict.get("engagement")) and test.engagement_id != engagement.id: + msg = f'The resolved test is associated with engagement "{test.engagement.name}", not with engagement "{converted_dict.get("engagement_name")}"' + raise ValidationError(msg) + # Also validate by name when the objects were not resolved (e.g. names that match no existing record) + if not converted_dict.get("product") and (product_name := converted_dict.get("product_name")) and test.engagement.product.name != product_name: + msg = f'The resolved test is associated with product "{test.engagement.product.name}", not with product "{product_name}"' + raise ValidationError(msg) + if not converted_dict.get("engagement") and (engagement_name := converted_dict.get("engagement_name")) and test.engagement.name != engagement_name: + msg = f'The resolved test is associated with engagement "{test.engagement.name}", not with engagement "{engagement_name}"' + raise ValidationError(msg) return user_has_permission( request.user, test, Permissions.Import_Scan_Result, ) @@ -1181,7 +1205,10 @@ def check_auto_create_permission( raise ValidationError(msg) if engagement: - # existing engagement, nothing special to check + # Validate the resolved engagement's parent chain matches any provided names + if product is not None and engagement.product_id != product.id: + msg = f'The resolved engagement is associated with product "{engagement.product.name}", not with product "{product_name}"' + raise ValidationError(msg) return user_has_permission( user, engagement, Permissions.Import_Scan_Result, ) diff --git a/dojo/importers/auto_create_context.py b/dojo/importers/auto_create_context.py index 26d37ae65b0..9e9103edd41 100644 --- a/dojo/importers/auto_create_context.py +++ b/dojo/importers/auto_create_context.py @@ -181,6 +181,9 @@ def get_target_engagement_if_exists( """ if engagement := get_object_or_none(Engagement, pk=engagement_id): logger.debug("Using existing engagement by id: %s", engagement_id) + if product is not None and engagement.product_id != product.id: + msg = f'Engagement "{engagement_id}" does not belong to product "{product}"' + raise ValueError(msg) return engagement # if there's no product, then for sure there's no engagement either if product is None: @@ -203,6 +206,9 @@ def get_target_test_if_exists( """ if test := get_object_or_none(Test, pk=test_id): logger.debug("Using existing Test by id: %s", test_id) + if engagement is not None and test.engagement_id != engagement.id: + msg = f'Test "{test_id}" does not belong to engagement "{engagement}"' + raise ValueError(msg) return test # If the engagement is not supplied, we cannot do anything if not engagement: diff --git a/unittests/test_rest_framework.py b/unittests/test_rest_framework.py index 95807d4c536..7199e08c126 100644 --- a/unittests/test_rest_framework.py +++ b/unittests/test_rest_framework.py @@ -3429,6 +3429,95 @@ def test_create_not_authorized_product_name_engagement_name_scan_type_title(self importer_mock.assert_not_called() reimporter_mock.assert_not_called() + # Security tests: verify that conflicting ID-based and name-based identifiers are rejected + + @patch("dojo.importers.default_reimporter.DefaultReImporter.process_scan") + @patch("dojo.importers.default_importer.DefaultImporter.process_scan") + @patch("dojo.api_v2.permissions.user_has_permission") + def test_reimport_engagement_param_ignored_permission_checked_on_name_resolved_target(self, mock, importer_mock, reimporter_mock): + """ + Engagement is not a declared field on ReImportScanSerializer — verify + the permission check uses the name-resolved target, not the engagement param. + """ + mock.return_value = False + importer_mock.return_value = IMPORTER_MOCK_RETURN_VALUE + reimporter_mock.return_value = REIMPORTER_MOCK_RETURN_VALUE + + with Path("tests/zap_sample.xml").open(encoding="utf-8") as testfile: + payload = { + "minimum_severity": "Low", + "active": True, + "verified": True, + "scan_type": "ZAP Scan", + "file": testfile, + # engagement=1 belongs to Product 2 Engagement 1, but it should be ignored + "engagement": 1, + # These names resolve to Product 2's Engagement 4 -> Test 4 + "product_name": "Security How-to", + "engagement_name": "April monthly engagement", + "version": "1.0.0", + } + response = self.client.post(self.url, payload) + self.assertEqual(403, response.status_code, response.content[:1000]) + # Permission must be checked on name-resolved Test 4 (in Engagement 4), + # NOT on Test 3 (which belongs to the engagement=1 param) + mock.assert_called_with(User.objects.get(username="admin"), + Test.objects.get(id=4), + Permissions.Import_Scan_Result) + importer_mock.assert_not_called() + reimporter_mock.assert_not_called() + + @patch("dojo.importers.default_reimporter.DefaultReImporter.process_scan") + @patch("dojo.importers.default_importer.DefaultImporter.process_scan") + def test_reimport_with_test_id_mismatched_product_name_is_rejected(self, importer_mock, reimporter_mock): + """Sending test ID from one product with product_name from another must be rejected.""" + importer_mock.return_value = IMPORTER_MOCK_RETURN_VALUE + reimporter_mock.return_value = REIMPORTER_MOCK_RETURN_VALUE + + with Path("tests/zap_sample.xml").open(encoding="utf-8") as testfile: + payload = { + "minimum_severity": "Low", + "active": True, + "verified": True, + "scan_type": "ZAP Scan", + "file": testfile, + # Test 3 belongs to Engagement 1 -> Product 2 ("Security How-to") + "test": 3, + # But product_name points to Product 1 ("Python How-to") + "product_name": "Python How-to", + "version": "1.0.0", + } + response = self.client.post(self.url, payload) + self.assertEqual(400, response.status_code, response.content[:1000]) + importer_mock.assert_not_called() + reimporter_mock.assert_not_called() + + @patch("dojo.importers.default_reimporter.DefaultReImporter.process_scan") + @patch("dojo.importers.default_importer.DefaultImporter.process_scan") + def test_reimport_with_test_id_mismatched_engagement_name_is_rejected(self, importer_mock, reimporter_mock): + """Sending test ID from one engagement with engagement_name from another must be rejected.""" + importer_mock.return_value = IMPORTER_MOCK_RETURN_VALUE + reimporter_mock.return_value = REIMPORTER_MOCK_RETURN_VALUE + + with Path("tests/zap_sample.xml").open(encoding="utf-8") as testfile: + payload = { + "minimum_severity": "Low", + "active": True, + "verified": True, + "scan_type": "ZAP Scan", + "file": testfile, + # Test 3 belongs to Engagement 1 ("1st Quarter Engagement") + "test": 3, + # But engagement_name points to a different engagement + "product_name": "Security How-to", + "engagement_name": "April monthly engagement", + "version": "1.0.0", + } + response = self.client.post(self.url, payload) + self.assertEqual(400, response.status_code, response.content[:1000]) + importer_mock.assert_not_called() + reimporter_mock.assert_not_called() + @versioned_fixtures class ProductTypeTest(BaseClass.BaseClassTest):