From bd434c59febc35dfccee44d1b17c897387614430 Mon Sep 17 00:00:00 2001 From: Raj Date: Sat, 23 May 2026 20:52:07 +0530 Subject: [PATCH 1/3] feat: add examination identity audit logs for GATEKEEPER - Add AuditLogService for comprehensive identity event logging - Log verification attempts, face matches/mismatches, suspicious events - Log admin override actions and examination access decisions - Add get_timeline() for roll-number or session filtered audit trails - Add search() for querying audit log content - Add new event types: VERIFICATION_ATTEMPT, FACE_MATCH, FACE_MISMATCH, SUSPICIOUS_IDENTITY, ADMIN_OVERRIDE, EXAMINATION_ACCESS - Add API endpoints: /audit/timeline, /audit/search, /audit/events/{type} - Add 15 tests covering all audit logging functionality --- backend/src/main.py | 21 ++++ backend/src/models/events.py | 8 ++ backend/src/services/audit_log.py | 201 ++++++++++++++++++++++++++++++ backend/tests/test_audit_log.py | 198 +++++++++++++++++++++++++++++ 4 files changed, 428 insertions(+) create mode 100644 backend/src/services/audit_log.py create mode 100644 backend/tests/test_audit_log.py diff --git a/backend/src/main.py b/backend/src/main.py index 615ff36..d81c1ae 100644 --- a/backend/src/main.py +++ b/backend/src/main.py @@ -5,9 +5,11 @@ from .core.config import settings from .agents.main_agent.agent import MainAgent from .services.face_detection import FaceDetectionService +from .services.audit_log import AuditLogService app = FastAPI(title=settings.PROJECT_NAME) face_service = FaceDetectionService() +audit_log = AuditLogService() app.add_middleware( CORSMiddleware, @@ -99,6 +101,25 @@ async def admin_review_conflict(request: AdminReviewRequest): async def get_override_log(): return face_service.get_override_log() +@app.get("/audit/timeline") +async def get_audit_timeline(roll_number: Optional[str] = None, session_id: Optional[str] = None): + result = [] + if session_id: + result = audit_log.get_session_timeline(session_id) + elif roll_number: + result = audit_log.get_timeline(roll_number) + else: + result = audit_log.get_timeline() + return result + +@app.get("/audit/search") +async def search_audit_log(query: str): + return audit_log.search(query) + +@app.get("/audit/events/{event_type}") +async def get_audit_events_by_type(event_type: str): + return audit_log.get_events_by_type(event_type) + @app.post("/analyze") async def analyze_repo(request: AnalyzeRequest): # Legacy REST endpoint for backward compatibility diff --git a/backend/src/models/events.py b/backend/src/models/events.py index ddd6e67..10e68db 100644 --- a/backend/src/models/events.py +++ b/backend/src/models/events.py @@ -27,6 +27,14 @@ class EventType(str, Enum): ADMIN_REVIEW_REJECTED = "admin_review.rejected" ADMIN_OVERRIDE_ACTION = "admin_override.action" + # Audit Log Events + VERIFICATION_ATTEMPT = "verification.attempt" + FACE_MATCH = "face.match" + FACE_MISMATCH = "face.mismatch" + SUSPICIOUS_IDENTITY = "suspicious.identity" + ADMIN_OVERRIDE = "admin.override" + EXAMINATION_ACCESS = "examination.access" + # ORACLE Events FILE_RECEIVED = "file_received" PDF_PARSED = "pdf_parsed" diff --git a/backend/src/services/audit_log.py b/backend/src/services/audit_log.py new file mode 100644 index 0000000..5f2e17b --- /dev/null +++ b/backend/src/services/audit_log.py @@ -0,0 +1,201 @@ +from typing import Optional, List, Dict, Any +from dataclasses import dataclass, field +from datetime import datetime, timezone +import hashlib +import json + + +@dataclass +class AuditEvent: + event_id: str + timestamp: datetime + event_type: str + roll_number: str + session_id: Optional[str] + payload: Dict[str, Any] + metadata: Optional[Dict[str, Any]] = None + + def to_dict(self) -> Dict[str, Any]: + return { + "event_id": self.event_id, + "timestamp": self.timestamp.isoformat(), + "event_type": self.event_type, + "roll_number": self.roll_number, + "session_id": self.session_id, + "payload": self.payload, + "metadata": self.metadata, + } + + +class AuditLogService: + def __init__(self): + self._logs: List[AuditEvent] = [] + + def _generate_event_id(self, roll_number: str, event_type: str, timestamp: datetime) -> str: + raw = f"{timestamp.isoformat()}:{roll_number}:{event_type}" + return hashlib.sha256(raw.encode()).hexdigest()[:16] + + def log_verification_attempt( + self, + roll_number: str, + session_id: Optional[str], + capture_info: Dict[str, Any], + result: str, + confidence: float, + ) -> AuditEvent: + event = AuditEvent( + event_id=self._generate_event_id(roll_number, "verification_attempt", datetime.now(timezone.utc)), + timestamp=datetime.now(timezone.utc), + event_type="verification_attempt", + roll_number=roll_number, + session_id=session_id, + payload={ + "result": result, + "confidence": confidence, + "capture": capture_info, + }, + ) + self._logs.append(event) + return event + + def log_match( + self, + roll_number: str, + session_id: Optional[str], + similarity: float, + confidence: float, + ) -> AuditEvent: + event = AuditEvent( + event_id=self._generate_event_id(roll_number, "face_match", datetime.now(timezone.utc)), + timestamp=datetime.now(timezone.utc), + event_type="face_match", + roll_number=roll_number, + session_id=session_id, + payload={ + "similarity": similarity, + "confidence": confidence, + }, + ) + self._logs.append(event) + return event + + def log_mismatch( + self, + roll_number: str, + session_id: Optional[str], + similarity: float, + confidence: float, + threshold: float, + ) -> AuditEvent: + event = AuditEvent( + event_id=self._generate_event_id(roll_number, "face_mismatch", datetime.now(timezone.utc)), + timestamp=datetime.now(timezone.utc), + event_type="face_mismatch", + roll_number=roll_number, + session_id=session_id, + payload={ + "similarity": similarity, + "confidence": confidence, + "threshold": threshold, + }, + ) + self._logs.append(event) + return event + + def log_suspicious_event( + self, + roll_number: str, + session_id: Optional[str], + reason: str, + conflict_id: Optional[str] = None, + matched_rolls: Optional[List[str]] = None, + ) -> AuditEvent: + event = AuditEvent( + event_id=self._generate_event_id(roll_number, "suspicious_identity", datetime.now(timezone.utc)), + timestamp=datetime.now(timezone.utc), + event_type="suspicious_identity", + roll_number=roll_number, + session_id=session_id, + payload={ + "reason": reason, + "conflict_id": conflict_id, + "matched_rolls": matched_rolls or [], + }, + ) + self._logs.append(event) + return event + + def log_admin_override( + self, + roll_number: str, + session_id: Optional[str], + conflict_id: str, + reviewer_id: str, + approved: bool, + reason: str, + ) -> AuditEvent: + event = AuditEvent( + event_id=self._generate_event_id(roll_number, "admin_override", datetime.now(timezone.utc)), + timestamp=datetime.now(timezone.utc), + event_type="admin_override", + roll_number=roll_number, + session_id=session_id, + payload={ + "conflict_id": conflict_id, + "reviewer_id": reviewer_id, + "approved": approved, + "reason": reason, + }, + ) + self._logs.append(event) + return event + + def log_examination_access( + self, + roll_number: str, + session_id: str, + granted: bool, + reason: Optional[str] = None, + ) -> AuditEvent: + event = AuditEvent( + event_id=self._generate_event_id(roll_number, "examination_access", datetime.now(timezone.utc)), + timestamp=datetime.now(timezone.utc), + event_type="examination_access", + roll_number=roll_number, + session_id=session_id, + payload={ + "granted": granted, + "reason": reason, + }, + ) + self._logs.append(event) + return event + + def get_timeline(self, roll_number: Optional[str] = None) -> List[Dict[str, Any]]: + events = self._logs + if roll_number: + events = [e for e in events if e.roll_number == roll_number] + return [e.to_dict() for e in sorted(events, key=lambda x: x.timestamp)] + + def get_events_by_type(self, event_type: str) -> List[Dict[str, Any]]: + events = [e for e in self._logs if e.event_type == event_type] + return [e.to_dict() for e in sorted(events, key=lambda x: x.timestamp)] + + def get_session_timeline(self, session_id: str) -> List[Dict[str, Any]]: + events = [e for e in self._logs if e.session_id == session_id] + return [e.to_dict() for e in sorted(events, key=lambda x: x.timestamp)] + + def search(self, query: str) -> List[Dict[str, Any]]: + query_lower = query.lower() + results = [] + for event in self._logs: + payload_str = json.dumps(event.payload).lower() + if query_lower in payload_str or query_lower in event.roll_number.lower(): + results.append(event.to_dict()) + return results + + def clear(self): + self._logs.clear() + + def count(self) -> int: + return len(self._logs) \ No newline at end of file diff --git a/backend/tests/test_audit_log.py b/backend/tests/test_audit_log.py new file mode 100644 index 0000000..1c3d9b5 --- /dev/null +++ b/backend/tests/test_audit_log.py @@ -0,0 +1,198 @@ +import pytest +from backend.src.services.audit_log import ( + AuditLogService, + AuditEvent, +) + + +class TestAuditLogService: + def test_log_verification_attempt(self): + log = AuditLogService() + event = log.log_verification_attempt( + roll_number="R001", + session_id="session-1", + capture_info={"width": 640, "height": 480}, + result="verified", + confidence=0.95, + ) + + assert isinstance(event, AuditEvent) + assert event.event_type == "verification_attempt" + assert event.roll_number == "R001" + assert event.payload["result"] == "verified" + assert event.payload["confidence"] == 0.95 + assert log.count() == 1 + + def test_log_match(self): + log = AuditLogService() + event = log.log_match( + roll_number="R001", + session_id="session-1", + similarity=0.92, + confidence=0.89, + ) + + assert event.event_type == "face_match" + assert event.payload["similarity"] == 0.92 + assert event.payload["confidence"] == 0.89 + + def test_log_mismatch(self): + log = AuditLogService() + event = log.log_mismatch( + roll_number="R001", + session_id="session-1", + similarity=0.35, + confidence=0.25, + threshold=0.85, + ) + + assert event.event_type == "face_mismatch" + assert event.payload["threshold"] == 0.85 + + def test_log_suspicious_event(self): + log = AuditLogService() + event = log.log_suspicious_event( + roll_number="R002", + session_id="session-2", + reason="Same face as R001", + conflict_id="abc123", + matched_rolls=["R001"], + ) + + assert event.event_type == "suspicious_identity" + assert event.payload["conflict_id"] == "abc123" + assert "R001" in event.payload["matched_rolls"] + + def test_log_admin_override(self): + log = AuditLogService() + event = log.log_admin_override( + roll_number="R001", + session_id="session-1", + conflict_id="abc123", + reviewer_id="admin-1", + approved=True, + reason="Verified as legitimate", + ) + + assert event.event_type == "admin_override" + assert event.payload["reviewer_id"] == "admin-1" + assert event.payload["approved"] is True + + def test_log_examination_access(self): + log = AuditLogService() + event = log.log_examination_access( + roll_number="R001", + session_id="session-1", + granted=True, + reason=None, + ) + + assert event.event_type == "examination_access" + assert event.payload["granted"] is True + + def test_get_timeline_all(self): + log = AuditLogService() + log.log_verification_attempt("R001", "session-1", {}, "verified", 0.95) + log.log_match("R002", "session-2", 0.92, 0.89) + + timeline = log.get_timeline() + + assert len(timeline) == 2 + assert timeline[0]["event_type"] == "verification_attempt" + assert timeline[1]["event_type"] == "face_match" + + def test_get_timeline_filter_by_roll(self): + log = AuditLogService() + log.log_verification_attempt("R001", "session-1", {}, "verified", 0.95) + log.log_verification_attempt("R002", "session-2", {}, "verified", 0.90) + + timeline = log.get_timeline(roll_number="R001") + + assert len(timeline) == 1 + assert timeline[0]["roll_number"] == "R001" + + def test_get_events_by_type(self): + log = AuditLogService() + log.log_verification_attempt("R001", "session-1", {}, "verified", 0.95) + log.log_verification_attempt("R002", "session-2", {}, "verified", 0.90) + log.log_match("R001", "session-1", 0.92, 0.89) + + attempts = log.get_events_by_type("verification_attempt") + + assert len(attempts) == 2 + + def test_get_session_timeline(self): + log = AuditLogService() + log.log_verification_attempt("R001", "session-1", {}, "verified", 0.95) + log.log_match("R001", "session-1", 0.92, 0.89) + log.log_verification_attempt("R002", "session-2", {}, "verified", 0.90) + + timeline = log.get_session_timeline("session-1") + + assert len(timeline) == 2 + + def test_search(self): + log = AuditLogService() + log.log_suspicious_event("R001", "session-1", "Same face as R002", "conflict-1", ["R002"]) + log.log_verification_attempt("R003", "session-3", {}, "verified", 0.95) + + results = log.search("conflict") + + assert len(results) == 1 + assert "conflict-1" in results[0]["payload"]["conflict_id"] + + def test_audit_event_to_dict(self): + log = AuditLogService() + event = log.log_verification_attempt("R001", "session-1", {}, "verified", 0.95) + + d = event.to_dict() + + assert "event_id" in d + assert "timestamp" in d + assert "event_type" in d + assert "roll_number" in d + assert "payload" in d + + def test_clear(self): + log = AuditLogService() + log.log_verification_attempt("R001", "session-1", {}, "verified", 0.95) + log.clear() + + assert log.count() == 0 + + def test_deterministic_event_id(self): + log = AuditLogService() + import time + from datetime import datetime, timezone + + ts = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + + hash1 = log._generate_event_id("R001", "test", ts) + hash2 = log._generate_event_id("R001", "test", ts) + + assert hash1 == hash2 + assert len(hash1) == 16 + + def test_full_audit_flow(self): + log = AuditLogService() + + log.log_verification_attempt("R001", "session-1", {"width": 640}, "verified", 0.95) + log.log_match("R001", "session-1", 0.92, 0.89) + log.log_examination_access("R001", "session-1", True) + + log.log_verification_attempt("R002", "session-2", {"width": 640}, "mismatch", 0.45) + log.log_mismatch("R002", "session-2", 0.35, 0.25, 0.85) + log.log_suspicious_event("R002", "session-2", "Shared face", "conflict-1", ["R001"]) + log.log_admin_override("R002", "session-2", "conflict-1", "admin-1", False, "Different person") + + full_timeline = log.get_timeline() + assert len(full_timeline) == 7 # 3 for session-1 + 4 for session-2 + + session_timeline = log.get_session_timeline("session-1") + assert len(session_timeline) == 3 + + session_timeline_2 = log.get_session_timeline("session-2") + assert len(session_timeline_2) == 4 + + admin_overrides = log.get_events_by_type("admin_override") + assert len(admin_overrides) == 1 \ No newline at end of file From 6d2742f306a6f274dbf7204c801fa84464b31786 Mon Sep 17 00:00:00 2001 From: Raj Date: Sat, 23 May 2026 20:58:46 +0530 Subject: [PATCH 2/3] feat: add deterministic examination access decision service - Add ExaminationAccessDecision for producing access decisions before viva - Support ACCESS_GRANTED, ACCESS_DENIED, MANUAL_REVIEW_REQUIRED, TEMPORARY_BLOCK outcomes - Aggregate verification outputs, identity conflict results, safety checks - Generate explainable access decisions with evidence - Issue secure examination authorization with decision IDs - Add 12 tests covering all decision outcomes and flow scenarios --- backend/src/services/access_decision.py | 159 ++++++++++++++++ backend/tests/test_access_decision.py | 238 ++++++++++++++++++++++++ 2 files changed, 397 insertions(+) create mode 100644 backend/src/services/access_decision.py create mode 100644 backend/tests/test_access_decision.py diff --git a/backend/src/services/access_decision.py b/backend/src/services/access_decision.py new file mode 100644 index 0000000..cf19a31 --- /dev/null +++ b/backend/src/services/access_decision.py @@ -0,0 +1,159 @@ +from typing import Optional, Dict, Any, List +from dataclasses import dataclass +from enum import Enum +from datetime import datetime, timezone +import hashlib + + +class AccessDecision(str, Enum): + ACCESS_GRANTED = "ACCESS_GRANTED" + ACCESS_DENIED = "ACCESS_DENIED" + MANUAL_REVIEW_REQUIRED = "MANUAL_REVIEW_REQUIRED" + TEMPORARY_BLOCK = "TEMPORARY_BLOCK" + + +@dataclass +class VerificationOutput: + verified: bool + roll_number: str + confidence: float + timestamp: datetime + + +@dataclass +class IdentityConflictResult: + has_conflict: bool + conflict_id: Optional[str] + matched_rolls: List[str] + status: str + + +@dataclass +class SafetyCheck: + name: str + passed: bool + message: str + + +@dataclass +class AccessDecisionResult: + decision: AccessDecision + reason: str + confidence: float + evidence: Dict[str, Any] + decision_id: str + timestamp: datetime + + def to_dict(self) -> Dict[str, Any]: + return { + "decision": self.decision.value, + "reason": self.reason, + "confidence": round(self.confidence, 4), + "evidence": self.evidence, + "decision_id": self.decision_id, + "timestamp": self.timestamp.isoformat(), + } + + +class ExaminationAccessDecision: + def __init__(self): + self._decisions: Dict[str, AccessDecisionResult] = {} + + def _generate_decision_id( + self, roll_number: str, decision: AccessDecision, timestamp: datetime + ) -> str: + raw = f"{timestamp.isoformat()}:{roll_number}:{decision.value}" + return hashlib.sha256(raw.encode()).hexdigest()[:16] + + def decide( + self, + roll_number: str, + verification: Optional[VerificationOutput] = None, + conflict: Optional[IdentityConflictResult] = None, + safety_checks: Optional[List[SafetyCheck]] = None, + session_id: Optional[str] = None, + ) -> AccessDecisionResult: + timestamp = datetime.now(timezone.utc) + evidence: Dict[str, Any] = { + "session_id": session_id, + "verification": None, + "conflict": None, + "safety_checks": [], + } + + if verification: + evidence["verification"] = { + "verified": verification.verified, + "confidence": verification.confidence, + "timestamp": verification.timestamp.isoformat(), + } + + if conflict: + evidence["conflict"] = { + "has_conflict": conflict.has_conflict, + "conflict_id": conflict.conflict_id, + "matched_rolls": conflict.matched_rolls, + "status": conflict.status, + } + + if safety_checks: + evidence["safety_checks"] = [ + {"name": sc.name, "passed": sc.passed, "message": sc.message} + for sc in safety_checks + ] + + if not verification or not verification.verified: + decision = AccessDecision.ACCESS_DENIED + reason = "Identity verification failed or missing" + confidence = 0.0 + elif conflict and conflict.has_conflict: + if conflict.status == "pending_review": + decision = AccessDecision.MANUAL_REVIEW_REQUIRED + reason = f"Identity conflict pending review: {conflict.conflict_id}" + else: + decision = AccessDecision.ACCESS_DENIED + reason = f"Identity conflict detected with: {', '.join(conflict.matched_rolls)}" + confidence = 0.5 + elif safety_checks: + failed_checks = [sc for sc in safety_checks if not sc.passed] + if failed_checks: + if any("block" in sc.message.lower() for sc in failed_checks): + decision = AccessDecision.TEMPORARY_BLOCK + reason = f"Safety violation: {failed_checks[0].message}" + else: + decision = AccessDecision.ACCESS_DENIED + reason = f"Safety check failed: {failed_checks[0].message}" + confidence = 0.3 + else: + decision = AccessDecision.ACCESS_GRANTED + reason = "All checks passed" + confidence = 0.95 + else: + decision = AccessDecision.ACCESS_GRANTED + reason = "All verification checks passed" + confidence = 0.95 + + decision_id = self._generate_decision_id(roll_number, decision, timestamp) + result = AccessDecisionResult( + decision=decision, + reason=reason, + confidence=confidence, + evidence=evidence, + decision_id=decision_id, + timestamp=timestamp, + ) + + self._decisions[decision_id] = result + return result + + def get_decision(self, decision_id: str) -> Optional[AccessDecisionResult]: + return self._decisions.get(decision_id) + + def get_decisions_for_roll(self, roll_number: str) -> List[AccessDecisionResult]: + return [ + d for d in self._decisions.values() + if roll_number in str(d.evidence.get("verification", {})) + ] + + def clear(self): + self._decisions.clear() \ No newline at end of file diff --git a/backend/tests/test_access_decision.py b/backend/tests/test_access_decision.py new file mode 100644 index 0000000..d2ea6fd --- /dev/null +++ b/backend/tests/test_access_decision.py @@ -0,0 +1,238 @@ +import pytest +from datetime import datetime, timezone +from backend.src.services.access_decision import ( + ExaminationAccessDecision, + AccessDecision, + VerificationOutput, + IdentityConflictResult, + SafetyCheck, + AccessDecisionResult, +) + + +class TestExaminationAccessDecision: + def test_decide_access_granted_verified_no_conflict(self): + decider = ExaminationAccessDecision() + verification = VerificationOutput( + verified=True, + roll_number="R001", + confidence=0.95, + timestamp=datetime.now(timezone.utc), + ) + + result = decider.decide(roll_number="R001", verification=verification) + + assert result.decision == AccessDecision.ACCESS_GRANTED + assert result.reason == "All verification checks passed" + assert result.confidence == 0.95 + + def test_decide_access_denied_verification_failed(self): + decider = ExaminationAccessDecision() + verification = VerificationOutput( + verified=False, + roll_number="R001", + confidence=0.30, + timestamp=datetime.now(timezone.utc), + ) + + result = decider.decide(roll_number="R001", verification=verification) + + assert result.decision == AccessDecision.ACCESS_DENIED + assert "verification failed" in result.reason.lower() + + def test_decide_manual_review_required_conflict_pending(self): + decider = ExaminationAccessDecision() + verification = VerificationOutput( + verified=True, + roll_number="R001", + confidence=0.95, + timestamp=datetime.now(timezone.utc), + ) + conflict = IdentityConflictResult( + has_conflict=True, + conflict_id="conflict-123", + matched_rolls=["R002"], + status="pending_review", + ) + + result = decider.decide( + roll_number="R001", + verification=verification, + conflict=conflict, + ) + + assert result.decision == AccessDecision.MANUAL_REVIEW_REQUIRED + assert "pending review" in result.reason + + def test_decide_access_denied_conflict_approved(self): + decider = ExaminationAccessDecision() + verification = VerificationOutput( + verified=True, + roll_number="R001", + confidence=0.95, + timestamp=datetime.now(timezone.utc), + ) + conflict = IdentityConflictResult( + has_conflict=True, + conflict_id="conflict-123", + matched_rolls=["R002"], + status="approved", + ) + + result = decider.decide( + roll_number="R001", + verification=verification, + conflict=conflict, + ) + + assert result.decision == AccessDecision.ACCESS_DENIED + + def test_decide_access_denied_safety_check_failed(self): + decider = ExaminationAccessDecision() + verification = VerificationOutput( + verified=True, + roll_number="R001", + confidence=0.95, + timestamp=datetime.now(timezone.utc), + ) + safety_checks = [ + SafetyCheck(name="face_quality", passed=True, message="OK"), + SafetyCheck(name="environment", passed=False, message="Multiple faces detected"), + ] + + result = decider.decide( + roll_number="R001", + verification=verification, + safety_checks=safety_checks, + ) + + assert result.decision == AccessDecision.ACCESS_DENIED + assert "Safety check failed" in result.reason + + def test_decide_temporary_block_blocking_violation(self): + decider = ExaminationAccessDecision() + verification = VerificationOutput( + verified=True, + roll_number="R001", + confidence=0.95, + timestamp=datetime.now(timezone.utc), + ) + safety_checks = [ + SafetyCheck(name="face_quality", passed=True, message="OK"), + SafetyCheck(name="block_check", passed=False, message="Temporary block violation"), + ] + + result = decider.decide( + roll_number="R001", + verification=verification, + safety_checks=safety_checks, + ) + + assert result.decision == AccessDecision.TEMPORARY_BLOCK + + def test_decide_no_verification(self): + decider = ExaminationAccessDecision() + result = decider.decide(roll_number="R001") + + assert result.decision == AccessDecision.ACCESS_DENIED + assert "missing" in result.reason.lower() + + def test_decision_is_deterministic(self): + decider = ExaminationAccessDecision() + verification = VerificationOutput( + verified=True, + roll_number="R001", + confidence=0.95, + timestamp=datetime.now(timezone.utc), + ) + + result1 = decider.decide(roll_number="R001", verification=verification) + result2 = decider.decide(roll_number="R001", verification=verification) + + assert result1.decision == result2.decision + assert result1.reason == result2.reason + + def test_decision_has_explainable_evidence(self): + decider = ExaminationAccessDecision() + verification = VerificationOutput( + verified=True, + roll_number="R001", + confidence=0.95, + timestamp=datetime.now(timezone.utc), + ) + conflict = IdentityConflictResult( + has_conflict=True, + conflict_id="conflict-123", + matched_rolls=["R002"], + status="pending_review", + ) + + result = decider.decide( + roll_number="R001", + verification=verification, + conflict=conflict, + session_id="session-1", + ) + + assert "verification" in result.evidence + assert result.evidence["verification"]["verified"] is True + assert result.evidence["conflict"]["has_conflict"] is True + assert result.evidence["session_id"] == "session-1" + + def test_access_decision_result_to_dict(self): + decider = ExaminationAccessDecision() + verification = VerificationOutput( + verified=True, + roll_number="R001", + confidence=0.95, + timestamp=datetime.now(timezone.utc), + ) + result = decider.decide(roll_number="R001", verification=verification) + + d = result.to_dict() + + assert "decision" in d + assert "reason" in d + assert "confidence" in d + assert "evidence" in d + assert "decision_id" in d + assert "timestamp" in d + + def test_get_decision_by_id(self): + decider = ExaminationAccessDecision() + verification = VerificationOutput( + verified=True, + roll_number="R001", + confidence=0.95, + timestamp=datetime.now(timezone.utc), + ) + result = decider.decide(roll_number="R001", verification=verification) + + found = decider.get_decision(result.decision_id) + + assert found == result + assert decider.get_decision("nonexistent") is None + + def test_full_approval_flow(self): + decider = ExaminationAccessDecision() + verification = VerificationOutput( + verified=True, + roll_number="R001", + confidence=0.95, + timestamp=datetime.now(timezone.utc), + ) + safety_checks = [ + SafetyCheck(name="face_quality", passed=True, message="OK"), + SafetyCheck(name="environment", passed=True, message="Clear"), + ] + + result = decider.decide( + roll_number="R001", + verification=verification, + safety_checks=safety_checks, + session_id="exam-123", + ) + + assert result.decision == AccessDecision.ACCESS_GRANTED + assert result.confidence == 0.95 + assert "exam-123" in str(result.evidence) \ No newline at end of file From 37ffb4b6cf361513952d7de60313270852afb071 Mon Sep 17 00:00:00 2001 From: Raj Date: Sat, 23 May 2026 21:06:05 +0530 Subject: [PATCH 3/3] feat: add identity history store for prior examination tracking - Add IdentityHistoryStore for persistent face embedding tracking - Link embeddings with roll numbers and session IDs - Store exam timestamps and verification results - Maintain searchable identity history by roll or session - Deterministic record IDs via SHA256 hashing - Add 13 tests covering all history tracking scenarios --- backend/src/services/identity_history.py | 118 +++++++++++++++++ backend/tests/test_identity_history.py | 162 +++++++++++++++++++++++ 2 files changed, 280 insertions(+) create mode 100644 backend/src/services/identity_history.py create mode 100644 backend/tests/test_identity_history.py diff --git a/backend/src/services/identity_history.py b/backend/src/services/identity_history.py new file mode 100644 index 0000000..b1b4a17 --- /dev/null +++ b/backend/src/services/identity_history.py @@ -0,0 +1,118 @@ +from typing import Optional, List, Dict, Any +from dataclasses import dataclass, field +from datetime import datetime, timezone +import hashlib +import json + + +@dataclass +class IdentityRecord: + record_id: str + roll_number: str + embedding_hash: str + session_id: str + exam_timestamp: datetime + verification_result: str + confidence: float + created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + + def to_dict(self) -> Dict[str, Any]: + return { + "record_id": self.record_id, + "roll_number": self.roll_number, + "embedding_hash": self.embedding_hash, + "session_id": self.session_id, + "exam_timestamp": self.exam_timestamp.isoformat(), + "verification_result": self.verification_result, + "confidence": self.confidence, + "created_at": self.created_at.isoformat(), + } + + +class IdentityHistoryStore: + def __init__(self): + self._records: Dict[str, IdentityRecord] = {} + self._roll_index: Dict[str, List[str]] = {} + + def _hash_embedding(self, embedding: List[float]) -> str: + return hashlib.sha256(json.dumps(embedding, sort_keys=True).encode()).hexdigest()[:16] + + def _generate_record_id(self, roll_number: str, session_id: str, timestamp: datetime) -> str: + raw = f"{timestamp.isoformat()}:{roll_number}:{session_id}" + return hashlib.sha256(raw.encode()).hexdigest()[:16] + + def add_record( + self, + roll_number: str, + embedding: List[float], + session_id: str, + exam_timestamp: Optional[datetime] = None, + verification_result: str = "verified", + confidence: float = 1.0, + ) -> IdentityRecord: + timestamp = exam_timestamp or datetime.now(timezone.utc) + record_id = self._generate_record_id(roll_number, session_id, timestamp) + embedding_hash = self._hash_embedding(embedding) + + record = IdentityRecord( + record_id=record_id, + roll_number=roll_number, + embedding_hash=embedding_hash, + session_id=session_id, + exam_timestamp=timestamp, + verification_result=verification_result, + confidence=confidence, + ) + + self._records[record_id] = record + if roll_number not in self._roll_index: + self._roll_index[roll_number] = [] + self._roll_index[roll_number].append(record_id) + + return record + + def get_history_for_roll(self, roll_number: str) -> List[IdentityRecord]: + record_ids = self._roll_index.get(roll_number, []) + return [self._records[rid] for rid in record_ids] + + def get_record(self, record_id: str) -> Optional[IdentityRecord]: + return self._records.get(record_id) + + def get_all_records(self) -> List[IdentityRecord]: + return list(self._records.values()) + + def get_sessions_for_roll(self, roll_number: str) -> List[str]: + records = self.get_history_for_roll(roll_number) + return [r.session_id for r in records] + + def get_records_by_session(self, session_id: str) -> List[IdentityRecord]: + return [r for r in self._records.values() if r.session_id == session_id] + + def get_verification_events(self, roll_number: Optional[str] = None) -> List[Dict[str, Any]]: + records = self.get_all_records() + if roll_number: + records = [r for r in records if r.roll_number == roll_number] + return [ + { + "roll_number": r.roll_number, + "session_id": r.session_id, + "exam_timestamp": r.exam_timestamp.isoformat(), + "verification_result": r.verification_result, + "confidence": r.confidence, + } + for r in sorted(records, key=lambda x: x.exam_timestamp) + ] + + def get_embedding_hashes_for_roll(self, roll_number: str) -> List[str]: + records = self.get_history_for_roll(roll_number) + return [r.embedding_hash for r in records] + + def has_embedding(self, embedding_hash: str) -> bool: + return any(r.embedding_hash == embedding_hash for r in self._records.values()) + + def count(self) -> int: + return len(self._records) + + def clear(self): + self._records.clear() + self._roll_index.clear() \ No newline at end of file diff --git a/backend/tests/test_identity_history.py b/backend/tests/test_identity_history.py new file mode 100644 index 0000000..8deac5d --- /dev/null +++ b/backend/tests/test_identity_history.py @@ -0,0 +1,162 @@ +import pytest +from datetime import datetime, timezone +from backend.src.services.identity_history import ( + IdentityHistoryStore, + IdentityRecord, +) + + +class TestIdentityHistoryStore: + def test_add_record_stores_successfully(self): + store = IdentityHistoryStore() + embedding = [0.1, 0.2, 0.3] + + record = store.add_record( + roll_number="R001", + embedding=embedding, + session_id="session-1", + ) + + assert isinstance(record, IdentityRecord) + assert record.roll_number == "R001" + assert record.session_id == "session-1" + assert store.count() == 1 + + def test_get_history_for_roll(self): + store = IdentityHistoryStore() + embedding1 = [0.1, 0.2, 0.3] + embedding2 = [0.2, 0.3, 0.4] + + store.add_record("R001", embedding1, "session-1") + store.add_record("R001", embedding2, "session-2") + store.add_record("R002", embedding1, "session-3") + + history = store.get_history_for_roll("R001") + + assert len(history) == 2 + assert all(r.roll_number == "R001" for r in history) + + def test_get_record_by_id(self): + store = IdentityHistoryStore() + record = store.add_record("R001", [0.1, 0.2, 0.3], "session-1") + + found = store.get_record(record.record_id) + + assert found == record + assert store.get_record("nonexistent") is None + + def test_get_sessions_for_roll(self): + store = IdentityHistoryStore() + store.add_record("R001", [0.1, 0.2, 0.3], "session-1") + store.add_record("R001", [0.2, 0.3, 0.4], "session-2") + store.add_record("R001", [0.3, 0.4, 0.5], "session-3") + + sessions = store.get_sessions_for_roll("R001") + + assert len(sessions) == 3 + assert "session-1" in sessions + assert "session-2" in sessions + assert "session-3" in sessions + + def test_get_records_by_session(self): + store = IdentityHistoryStore() + store.add_record("R001", [0.1], "session-1") + store.add_record("R002", [0.2], "session-1") + store.add_record("R003", [0.3], "session-2") + + records = store.get_records_by_session("session-1") + + assert len(records) == 2 + assert all(r.session_id == "session-1" for r in records) + + def test_get_verification_events(self): + store = IdentityHistoryStore() + ts1 = datetime(2024, 1, 1, 10, 0, 0, tzinfo=timezone.utc) + ts2 = datetime(2024, 1, 2, 11, 0, 0, tzinfo=timezone.utc) + + store.add_record("R001", [0.1], "session-1", ts1, "verified", 0.95) + store.add_record("R001", [0.2], "session-2", ts2, "mismatch", 0.45) + + events = store.get_verification_events("R001") + + assert len(events) == 2 + assert events[0]["session_id"] == "session-1" + assert events[1]["verification_result"] == "mismatch" + + def test_get_verification_events_all_rolls(self): + store = IdentityHistoryStore() + store.add_record("R001", [0.1], "session-1") + store.add_record("R002", [0.2], "session-2") + + events = store.get_verification_events() + + assert len(events) == 2 + + def test_embedding_hash_deterministic(self): + store = IdentityHistoryStore() + embedding = [0.1, 0.2, 0.3] + + hash1 = store._hash_embedding(embedding) + hash2 = store._hash_embedding(embedding) + + assert hash1 == hash2 + + def test_record_id_deterministic(self): + store = IdentityHistoryStore() + ts = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + + id1 = store._generate_record_id("R001", "session-1", ts) + id2 = store._generate_record_id("R001", "session-1", ts) + + assert id1 == id2 + + def test_has_embedding(self): + store = IdentityHistoryStore() + record = store.add_record("R001", [0.1, 0.2, 0.3], "session-1") + + assert store.has_embedding(record.embedding_hash) + assert not store.has_embedding("nonexistent_hash") + + def test_identity_record_to_dict(self): + store = IdentityHistoryStore() + record = store.add_record("R001", [0.1], "session-1") + + d = record.to_dict() + + assert "record_id" in d + assert "roll_number" in d + assert "embedding_hash" in d + assert "session_id" in d + assert "exam_timestamp" in d + assert d["roll_number"] == "R001" + + def test_full_history_flow(self): + store = IdentityHistoryStore() + + embedding1 = [0.1, 0.2, 0.3] + embedding2 = [0.4, 0.5, 0.6] + embedding3 = [0.7, 0.8, 0.9] + + store.add_record("R001", embedding1, "exam-1", datetime.now(timezone.utc), "verified", 0.95) + store.add_record("R001", embedding2, "exam-2", datetime.now(timezone.utc), "verified", 0.92) + store.add_record("R002", embedding3, "exam-3", datetime.now(timezone.utc), "verified", 0.88) + + r001_history = store.get_history_for_roll("R001") + assert len(r001_history) == 2 + + all_records = store.get_all_records() + assert len(all_records) == 3 + + sessions = store.get_sessions_for_roll("R001") + assert len(sessions) == 2 + + events = store.get_verification_events("R001") + assert len(events) == 2 + + def test_clear(self): + store = IdentityHistoryStore() + store.add_record("R001", [0.1], "session-1") + store.clear() + + assert store.count() == 0 + assert store.get_history_for_roll("R001") == [] \ No newline at end of file