diff --git a/backend/src/main.py b/backend/src/main.py index b09d416..d81c1ae 100644 --- a/backend/src/main.py +++ b/backend/src/main.py @@ -5,9 +5,11 @@ from .core.config import settings from .agents.main_agent.agent import MainAgent from .services.face_detection import FaceDetectionService +from .services.audit_log import AuditLogService app = FastAPI(title=settings.PROJECT_NAME) face_service = FaceDetectionService() +audit_log = AuditLogService() app.add_middleware( CORSMiddleware, @@ -32,6 +34,14 @@ class FaceVerifyRequest(BaseModel): class ResolveAlertRequest(BaseModel): conflict_id: str approved: bool = False + reviewer_id: Optional[str] = None + reason: Optional[str] = None + +class AdminReviewRequest(BaseModel): + conflict_id: str + approved: bool + reviewer_id: str + reason: str # Initialize the main orchestrator agent main_agent = MainAgent() @@ -60,9 +70,56 @@ async def get_pending_alerts(): @app.post("/face/resolve-alert") async def resolve_alert(request: ResolveAlertRequest): - success = face_service.resolve_alert(request.conflict_id, request.approved) + success = face_service.resolve_alert( + request.conflict_id, + request.approved, + request.reviewer_id, + request.reason + ) return {"success": success} +@app.get("/face/conflict/{conflict_id}") +async def get_conflict_details(conflict_id: str): + details = face_service.get_conflict_details(conflict_id) + if details is None: + return {"error": "Conflict not found"}, 404 + return details + +@app.post("/admin/review-conflict") +async def admin_review_conflict(request: AdminReviewRequest): + success = face_service.admin_review_conflict( + request.conflict_id, + request.approved, + request.reviewer_id, + request.reason + ) + if not success: + return {"error": "Conflict not found"}, 404 + return {"success": True, "message": "Review decision recorded"} + +@app.get("/admin/override-log") +async def get_override_log(): + return face_service.get_override_log() + +@app.get("/audit/timeline") +async def get_audit_timeline(roll_number: Optional[str] = None, session_id: Optional[str] = None): + result = [] + if session_id: + result = audit_log.get_session_timeline(session_id) + elif roll_number: + result = audit_log.get_timeline(roll_number) + else: + result = audit_log.get_timeline() + return result + +@app.get("/audit/search") +async def search_audit_log(query: str): + return audit_log.search(query) + +@app.get("/audit/events/{event_type}") +async def get_audit_events_by_type(event_type: str): + return audit_log.get_events_by_type(event_type) + @app.post("/analyze") async def analyze_repo(request: AnalyzeRequest): # Legacy REST endpoint for backward compatibility diff --git a/backend/src/models/events.py b/backend/src/models/events.py index 2d00ce1..10e68db 100644 --- a/backend/src/models/events.py +++ b/backend/src/models/events.py @@ -21,6 +21,20 @@ class EventType(str, Enum): MANUAL_REVIEW_REQUIRED = "manual_review.required" ACCESS_DENIED_CONFLICT = "access.denied_conflict" + # Admin Review Events + ADMIN_REVIEW_STARTED = "admin_review.started" + ADMIN_REVIEW_APPROVED = "admin_review.approved" + ADMIN_REVIEW_REJECTED = "admin_review.rejected" + ADMIN_OVERRIDE_ACTION = "admin_override.action" + + # Audit Log Events + VERIFICATION_ATTEMPT = "verification.attempt" + FACE_MATCH = "face.match" + FACE_MISMATCH = "face.mismatch" + SUSPICIOUS_IDENTITY = "suspicious.identity" + ADMIN_OVERRIDE = "admin.override" + EXAMINATION_ACCESS = "examination.access" + # ORACLE Events FILE_RECEIVED = "file_received" PDF_PARSED = "pdf_parsed" diff --git a/backend/src/services/audit_log.py b/backend/src/services/audit_log.py new file mode 100644 index 0000000..5f2e17b --- /dev/null +++ b/backend/src/services/audit_log.py @@ -0,0 +1,201 @@ +from typing import Optional, List, Dict, Any +from dataclasses import dataclass, field +from datetime import datetime, timezone +import hashlib +import json + + +@dataclass +class AuditEvent: + event_id: str + timestamp: datetime + event_type: str + roll_number: str + session_id: Optional[str] + payload: Dict[str, Any] + metadata: Optional[Dict[str, Any]] = None + + def to_dict(self) -> Dict[str, Any]: + return { + "event_id": self.event_id, + "timestamp": self.timestamp.isoformat(), + "event_type": self.event_type, + "roll_number": self.roll_number, + "session_id": self.session_id, + "payload": self.payload, + "metadata": self.metadata, + } + + +class AuditLogService: + def __init__(self): + self._logs: List[AuditEvent] = [] + + def _generate_event_id(self, roll_number: str, event_type: str, timestamp: datetime) -> str: + raw = f"{timestamp.isoformat()}:{roll_number}:{event_type}" + return hashlib.sha256(raw.encode()).hexdigest()[:16] + + def log_verification_attempt( + self, + roll_number: str, + session_id: Optional[str], + capture_info: Dict[str, Any], + result: str, + confidence: float, + ) -> AuditEvent: + event = AuditEvent( + event_id=self._generate_event_id(roll_number, "verification_attempt", datetime.now(timezone.utc)), + timestamp=datetime.now(timezone.utc), + event_type="verification_attempt", + roll_number=roll_number, + session_id=session_id, + payload={ + "result": result, + "confidence": confidence, + "capture": capture_info, + }, + ) + self._logs.append(event) + return event + + def log_match( + self, + roll_number: str, + session_id: Optional[str], + similarity: float, + confidence: float, + ) -> AuditEvent: + event = AuditEvent( + event_id=self._generate_event_id(roll_number, "face_match", datetime.now(timezone.utc)), + timestamp=datetime.now(timezone.utc), + event_type="face_match", + roll_number=roll_number, + session_id=session_id, + payload={ + "similarity": similarity, + "confidence": confidence, + }, + ) + self._logs.append(event) + return event + + def log_mismatch( + self, + roll_number: str, + session_id: Optional[str], + similarity: float, + confidence: float, + threshold: float, + ) -> AuditEvent: + event = AuditEvent( + event_id=self._generate_event_id(roll_number, "face_mismatch", datetime.now(timezone.utc)), + timestamp=datetime.now(timezone.utc), + event_type="face_mismatch", + roll_number=roll_number, + session_id=session_id, + payload={ + "similarity": similarity, + "confidence": confidence, + "threshold": threshold, + }, + ) + self._logs.append(event) + return event + + def log_suspicious_event( + self, + roll_number: str, + session_id: Optional[str], + reason: str, + conflict_id: Optional[str] = None, + matched_rolls: Optional[List[str]] = None, + ) -> AuditEvent: + event = AuditEvent( + event_id=self._generate_event_id(roll_number, "suspicious_identity", datetime.now(timezone.utc)), + timestamp=datetime.now(timezone.utc), + event_type="suspicious_identity", + roll_number=roll_number, + session_id=session_id, + payload={ + "reason": reason, + "conflict_id": conflict_id, + "matched_rolls": matched_rolls or [], + }, + ) + self._logs.append(event) + return event + + def log_admin_override( + self, + roll_number: str, + session_id: Optional[str], + conflict_id: str, + reviewer_id: str, + approved: bool, + reason: str, + ) -> AuditEvent: + event = AuditEvent( + event_id=self._generate_event_id(roll_number, "admin_override", datetime.now(timezone.utc)), + timestamp=datetime.now(timezone.utc), + event_type="admin_override", + roll_number=roll_number, + session_id=session_id, + payload={ + "conflict_id": conflict_id, + "reviewer_id": reviewer_id, + "approved": approved, + "reason": reason, + }, + ) + self._logs.append(event) + return event + + def log_examination_access( + self, + roll_number: str, + session_id: str, + granted: bool, + reason: Optional[str] = None, + ) -> AuditEvent: + event = AuditEvent( + event_id=self._generate_event_id(roll_number, "examination_access", datetime.now(timezone.utc)), + timestamp=datetime.now(timezone.utc), + event_type="examination_access", + roll_number=roll_number, + session_id=session_id, + payload={ + "granted": granted, + "reason": reason, + }, + ) + self._logs.append(event) + return event + + def get_timeline(self, roll_number: Optional[str] = None) -> List[Dict[str, Any]]: + events = self._logs + if roll_number: + events = [e for e in events if e.roll_number == roll_number] + return [e.to_dict() for e in sorted(events, key=lambda x: x.timestamp)] + + def get_events_by_type(self, event_type: str) -> List[Dict[str, Any]]: + events = [e for e in self._logs if e.event_type == event_type] + return [e.to_dict() for e in sorted(events, key=lambda x: x.timestamp)] + + def get_session_timeline(self, session_id: str) -> List[Dict[str, Any]]: + events = [e for e in self._logs if e.session_id == session_id] + return [e.to_dict() for e in sorted(events, key=lambda x: x.timestamp)] + + def search(self, query: str) -> List[Dict[str, Any]]: + query_lower = query.lower() + results = [] + for event in self._logs: + payload_str = json.dumps(event.payload).lower() + if query_lower in payload_str or query_lower in event.roll_number.lower(): + results.append(event.to_dict()) + return results + + def clear(self): + self._logs.clear() + + def count(self) -> int: + return len(self._logs) \ No newline at end of file diff --git a/backend/src/services/face_detection.py b/backend/src/services/face_detection.py index 8341c46..2829513 100644 --- a/backend/src/services/face_detection.py +++ b/backend/src/services/face_detection.py @@ -26,6 +26,9 @@ class ConflictAlert: timestamp: datetime status: str = "pending_review" session_id: Optional[str] = None + reviewer_id: Optional[str] = None + review_timestamp: Optional[datetime] = None + review_reason: Optional[str] = None def to_dict(self) -> Dict[str, Any]: return { @@ -36,6 +39,9 @@ def to_dict(self) -> Dict[str, Any]: "timestamp": self.timestamp.isoformat(), "status": self.status, "session_id": self.session_id, + "reviewer_id": self.reviewer_id, + "review_timestamp": self.review_timestamp.isoformat() if self.review_timestamp else None, + "review_reason": self.review_reason, } @@ -124,10 +130,13 @@ def get_conflicts_for_roll(self, roll_number: str) -> List[ConflictAlert]: def get_pending_alerts(self) -> List[ConflictAlert]: return [alert for alert in self.alerts if alert.status == "pending_review"] - def resolve_alert(self, conflict_id: str, approved: bool) -> bool: + def resolve_alert(self, conflict_id: str, approved: bool, reviewer_id: Optional[str] = None, reason: Optional[str] = None) -> bool: for alert in self.alerts: if alert.conflict_id == conflict_id: alert.status = "approved" if approved else "rejected" + alert.reviewer_id = reviewer_id + alert.review_timestamp = datetime.now(timezone.utc) + alert.review_reason = reason return True return False @@ -147,4 +156,42 @@ def can_grant_access(self, roll_number: str) -> Tuple[bool, Optional[str]]: for conflict in conflicts: if conflict.status == "pending_review": return False, f"Identity under review: conflict {conflict.conflict_id}" - return True, None \ No newline at end of file + return True, None + + def get_conflict_details(self, conflict_id: str) -> Optional[Dict[str, Any]]: + for alert in self.alerts: + if alert.conflict_id == conflict_id: + prior_embeddings = [] + for roll in alert.matched_roll_numbers: + if roll in self.embeddings: + emb = self.embeddings[roll] + prior_embeddings.append({ + "roll_number": emb.roll_number, + "student_name": emb.student_name, + "timestamp": emb.timestamp.isoformat(), + "session_id": emb.session_id, + }) + return { + "conflict": alert.to_dict(), + "prior_embeddings": prior_embeddings, + } + return None + + def admin_review_conflict(self, conflict_id: str, approved: bool, reviewer_id: str, reason: str) -> bool: + for alert in self.alerts: + if alert.conflict_id == conflict_id: + alert.status = "approved" if approved else "rejected" + alert.reviewer_id = reviewer_id + alert.review_timestamp = datetime.now(timezone.utc) + alert.review_reason = reason + return True + return False + + def get_override_log(self) -> List[Dict[str, Any]]: + return [alert.to_dict() for alert in self.alerts if alert.status in ("approved", "rejected")] + + def get_alert_by_id(self, conflict_id: str) -> Optional[ConflictAlert]: + for alert in self.alerts: + if alert.conflict_id == conflict_id: + return alert + return None \ No newline at end of file diff --git a/backend/tests/test_audit_log.py b/backend/tests/test_audit_log.py new file mode 100644 index 0000000..1c3d9b5 --- /dev/null +++ b/backend/tests/test_audit_log.py @@ -0,0 +1,198 @@ +import pytest +from backend.src.services.audit_log import ( + AuditLogService, + AuditEvent, +) + + +class TestAuditLogService: + def test_log_verification_attempt(self): + log = AuditLogService() + event = log.log_verification_attempt( + roll_number="R001", + session_id="session-1", + capture_info={"width": 640, "height": 480}, + result="verified", + confidence=0.95, + ) + + assert isinstance(event, AuditEvent) + assert event.event_type == "verification_attempt" + assert event.roll_number == "R001" + assert event.payload["result"] == "verified" + assert event.payload["confidence"] == 0.95 + assert log.count() == 1 + + def test_log_match(self): + log = AuditLogService() + event = log.log_match( + roll_number="R001", + session_id="session-1", + similarity=0.92, + confidence=0.89, + ) + + assert event.event_type == "face_match" + assert event.payload["similarity"] == 0.92 + assert event.payload["confidence"] == 0.89 + + def test_log_mismatch(self): + log = AuditLogService() + event = log.log_mismatch( + roll_number="R001", + session_id="session-1", + similarity=0.35, + confidence=0.25, + threshold=0.85, + ) + + assert event.event_type == "face_mismatch" + assert event.payload["threshold"] == 0.85 + + def test_log_suspicious_event(self): + log = AuditLogService() + event = log.log_suspicious_event( + roll_number="R002", + session_id="session-2", + reason="Same face as R001", + conflict_id="abc123", + matched_rolls=["R001"], + ) + + assert event.event_type == "suspicious_identity" + assert event.payload["conflict_id"] == "abc123" + assert "R001" in event.payload["matched_rolls"] + + def test_log_admin_override(self): + log = AuditLogService() + event = log.log_admin_override( + roll_number="R001", + session_id="session-1", + conflict_id="abc123", + reviewer_id="admin-1", + approved=True, + reason="Verified as legitimate", + ) + + assert event.event_type == "admin_override" + assert event.payload["reviewer_id"] == "admin-1" + assert event.payload["approved"] is True + + def test_log_examination_access(self): + log = AuditLogService() + event = log.log_examination_access( + roll_number="R001", + session_id="session-1", + granted=True, + reason=None, + ) + + assert event.event_type == "examination_access" + assert event.payload["granted"] is True + + def test_get_timeline_all(self): + log = AuditLogService() + log.log_verification_attempt("R001", "session-1", {}, "verified", 0.95) + log.log_match("R002", "session-2", 0.92, 0.89) + + timeline = log.get_timeline() + + assert len(timeline) == 2 + assert timeline[0]["event_type"] == "verification_attempt" + assert timeline[1]["event_type"] == "face_match" + + def test_get_timeline_filter_by_roll(self): + log = AuditLogService() + log.log_verification_attempt("R001", "session-1", {}, "verified", 0.95) + log.log_verification_attempt("R002", "session-2", {}, "verified", 0.90) + + timeline = log.get_timeline(roll_number="R001") + + assert len(timeline) == 1 + assert timeline[0]["roll_number"] == "R001" + + def test_get_events_by_type(self): + log = AuditLogService() + log.log_verification_attempt("R001", "session-1", {}, "verified", 0.95) + log.log_verification_attempt("R002", "session-2", {}, "verified", 0.90) + log.log_match("R001", "session-1", 0.92, 0.89) + + attempts = log.get_events_by_type("verification_attempt") + + assert len(attempts) == 2 + + def test_get_session_timeline(self): + log = AuditLogService() + log.log_verification_attempt("R001", "session-1", {}, "verified", 0.95) + log.log_match("R001", "session-1", 0.92, 0.89) + log.log_verification_attempt("R002", "session-2", {}, "verified", 0.90) + + timeline = log.get_session_timeline("session-1") + + assert len(timeline) == 2 + + def test_search(self): + log = AuditLogService() + log.log_suspicious_event("R001", "session-1", "Same face as R002", "conflict-1", ["R002"]) + log.log_verification_attempt("R003", "session-3", {}, "verified", 0.95) + + results = log.search("conflict") + + assert len(results) == 1 + assert "conflict-1" in results[0]["payload"]["conflict_id"] + + def test_audit_event_to_dict(self): + log = AuditLogService() + event = log.log_verification_attempt("R001", "session-1", {}, "verified", 0.95) + + d = event.to_dict() + + assert "event_id" in d + assert "timestamp" in d + assert "event_type" in d + assert "roll_number" in d + assert "payload" in d + + def test_clear(self): + log = AuditLogService() + log.log_verification_attempt("R001", "session-1", {}, "verified", 0.95) + log.clear() + + assert log.count() == 0 + + def test_deterministic_event_id(self): + log = AuditLogService() + import time + from datetime import datetime, timezone + + ts = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + + hash1 = log._generate_event_id("R001", "test", ts) + hash2 = log._generate_event_id("R001", "test", ts) + + assert hash1 == hash2 + assert len(hash1) == 16 + + def test_full_audit_flow(self): + log = AuditLogService() + + log.log_verification_attempt("R001", "session-1", {"width": 640}, "verified", 0.95) + log.log_match("R001", "session-1", 0.92, 0.89) + log.log_examination_access("R001", "session-1", True) + + log.log_verification_attempt("R002", "session-2", {"width": 640}, "mismatch", 0.45) + log.log_mismatch("R002", "session-2", 0.35, 0.25, 0.85) + log.log_suspicious_event("R002", "session-2", "Shared face", "conflict-1", ["R001"]) + log.log_admin_override("R002", "session-2", "conflict-1", "admin-1", False, "Different person") + + full_timeline = log.get_timeline() + assert len(full_timeline) == 7 # 3 for session-1 + 4 for session-2 + + session_timeline = log.get_session_timeline("session-1") + assert len(session_timeline) == 3 + + session_timeline_2 = log.get_session_timeline("session-2") + assert len(session_timeline_2) == 4 + + admin_overrides = log.get_events_by_type("admin_override") + assert len(admin_overrides) == 1 \ No newline at end of file diff --git a/backend/tests/test_face_detection.py b/backend/tests/test_face_detection.py index 7a098d4..d392815 100644 --- a/backend/tests/test_face_detection.py +++ b/backend/tests/test_face_detection.py @@ -107,10 +107,12 @@ def test_resolve_alert(self): service.verify_identity(emb1, "R001") _, alert, _ = service.verify_identity(emb2, "R002") - result = service.resolve_alert(alert.conflict_id, approved=True) + result = service.resolve_alert(alert.conflict_id, approved=True, reviewer_id="admin-1", reason="test resolution") assert result is True assert alert.status == "approved" + assert alert.reviewer_id == "admin-1" + assert alert.review_reason == "test resolution" def test_get_suspicious_identities(self): service = FaceDetectionService() @@ -168,4 +170,94 @@ def test_multi_roll_number_conflict(self): is_valid, alert, _ = service.verify_identity(emb3, "R003") assert is_valid is False - assert len(alert.matched_roll_numbers) >= 1 \ No newline at end of file + assert len(alert.matched_roll_numbers) >= 1 + + def test_get_conflict_details(self): + service = FaceDetectionService() + emb1 = generate_embedding(base=0.5) + emb2 = create_similar_embedding(emb1, 0.92) + + service.add_embedding(emb1, "R001", "Alice") + _, alert, _ = service.verify_identity(emb2, "R002", "session-2") + + details = service.get_conflict_details(alert.conflict_id) + + assert details is not None + assert "conflict" in details + assert "prior_embeddings" in details + assert len(details["prior_embeddings"]) == 1 + assert details["prior_embeddings"][0]["roll_number"] == "R001" + assert details["prior_embeddings"][0]["student_name"] == "Alice" + + def test_admin_review_conflict(self): + service = FaceDetectionService() + emb1 = generate_embedding(base=0.5) + emb2 = create_similar_embedding(emb1, 0.92) + + service.verify_identity(emb1, "R001") + _, alert, _ = service.verify_identity(emb2, "R002") + + result = service.admin_review_conflict( + alert.conflict_id, + approved=True, + reviewer_id="admin-1", + reason="Verified as same person" + ) + + assert result is True + assert alert.status == "approved" + assert alert.reviewer_id == "admin-1" + assert alert.review_reason == "Verified as same person" + assert alert.review_timestamp is not None + + def test_admin_review_rejection(self): + service = FaceDetectionService() + emb1 = generate_embedding(base=0.5) + emb2 = create_similar_embedding(emb1, 0.92) + + service.verify_identity(emb1, "R001") + _, alert, _ = service.verify_identity(emb2, "R002") + + result = service.admin_review_conflict( + alert.conflict_id, + approved=False, + reviewer_id="admin-2", + reason="Different people, access denied" + ) + + assert result is True + assert alert.status == "rejected" + assert alert.reviewer_id == "admin-2" + + def test_get_override_log(self): + service = FaceDetectionService() + emb1 = generate_embedding(base=0.5) + emb2 = create_similar_embedding(emb1, 0.92) + emb3 = create_similar_embedding(emb1, 0.90) + + service.verify_identity(emb1, "R001") + _, alert1, _ = service.verify_identity(emb2, "R002") + _, alert2, _ = service.verify_identity(emb3, "R003") + + service.admin_review_conflict(alert1.conflict_id, True, "admin-1", "approved") + service.admin_review_conflict(alert2.conflict_id, False, "admin-2", "rejected") + + override_log = service.get_override_log() + + assert len(override_log) == 2 + assert override_log[0]["status"] in ("approved", "rejected") + assert override_log[0]["reviewer_id"] == "admin-1" + + def test_get_alert_by_id(self): + service = FaceDetectionService() + emb1 = generate_embedding(base=0.5) + emb2 = create_similar_embedding(emb1, 0.92) + + service.verify_identity(emb1, "R001") + _, alert, _ = service.verify_identity(emb2, "R002") + + found = service.get_alert_by_id(alert.conflict_id) + not_found = service.get_alert_by_id("nonexistent") + + assert found is alert + assert not_found is None \ No newline at end of file