diff --git a/backend/src/agents/gatekeeper/__init__.py b/backend/src/agents/gatekeeper/__init__.py index 139e06c..c576dc4 100644 --- a/backend/src/agents/gatekeeper/__init__.py +++ b/backend/src/agents/gatekeeper/__init__.py @@ -1,3 +1,9 @@ from .agent import GatekeeperAgent +from .pipeline.gatekeeper_pipeline import GatekeeperPipeline +from .pipeline.pipeline_result import PipelineResult -__all__ = ["GatekeeperAgent"] +__all__ = [ + "GatekeeperAgent", + "GatekeeperPipeline", + "PipelineResult", +] diff --git a/backend/src/agents/gatekeeper/agent.py b/backend/src/agents/gatekeeper/agent.py index 9ad984a..ede74d7 100644 --- a/backend/src/agents/gatekeeper/agent.py +++ b/backend/src/agents/gatekeeper/agent.py @@ -1,108 +1,56 @@ -""" -GatekeeperAgent — Identity, Session, and Student Registry gate. - -Entry point for all student identity verification in the EL system. -Consumes the StudentRegistry to validate roll numbers before any session -is allowed to proceed to ORACLE analysis. -""" - -from typing import Any, Dict, Optional +from typing import Any, Dict from src.agents.base import BaseAgent -from .registry.registry_store import StudentRegistry, SAMPLE_STUDENTS -from .registry.lookup import RegistryLookup, LookupResult, LookupFailureReason +from src.models.events import EventType + +from .pipeline.gatekeeper_pipeline import GatekeeperPipeline class GatekeeperAgent(BaseAgent): - def __init__(self, registry: Optional[StudentRegistry] = None): + def __init__(self): super().__init__(name="GatekeeperAgent") - # Use injected registry (for testing) or create and seed the default one - if registry is not None: - self._registry = registry - else: - self._registry = StudentRegistry() - self._registry.seed(SAMPLE_STUDENTS) - self._lookup = RegistryLookup(self._registry) + # Global pipeline instance so history store persists across requests + self._pipeline = GatekeeperPipeline() - async def process( - self, - session_id: str, - input_data: Dict[str, Any], - log_callback=None, - ) -> Dict[str, Any]: + async def process(self, session_id: str, input_data: Dict[str, Any], log_callback=None) -> Dict[str, Any]: """ - Validate student identity before allowing session to proceed. - - Expected input_data keys: - - roll_number (str): Student roll number to verify - - [any other session fields passed through] - - Returns input_data enriched with: - - gatekeeper_status: "verified" | "rejected" - - gatekeeper_reason: failure reason string (if rejected) - - student_profile: plain dict of StudentProfile (if verified) + Gatekeeper Identity & Session Agent. + Runs the full verification pipeline (Roll + Face + Conflict). """ - async def send_log(msg: str, log_type: str = "info"): + async def send_log(msg: str, type: str = "info"): if log_callback: - await log_callback({"message": msg, "type": log_type}) - - self.log_info(f"Gatekeeper processing session {session_id}") - await send_log("[Gatekeeper] Identity verification started.", "info") - - roll_number: str = input_data.get("roll_number", "") - - # ── Roll-number lookup ──────────────────────────────────────────────── - result: LookupResult = self._lookup.by_roll_number(roll_number) - - if result.success: - self.log_info( - f"Student verified: {result.profile.full_name} ({result.roll_number})" - ) - await send_log( - f"[Gatekeeper] ✅ Student verified: {result.profile.full_name}", "success" - ) - self.emit_event( - session_id, "AGENT_PROGRESS", - { - "agent": "Gatekeeper", - "status": "complete", - "milestone": f"Identity Verified: {result.roll_number}", - }, - ) - return { - **input_data, - "gatekeeper_status": "verified", - "gatekeeper_reason": None, - "student_profile": result.profile.to_dict(), - } - - # ── Rejection ───────────────────────────────────────────────────────── - self.log_info( - f"Gatekeeper rejected '{roll_number}': {result.failure_reason} — {result.message}" - ) - await send_log( - f"[Gatekeeper] ❌ Rejected: {result.message}", "error" - ) - self.emit_event( - session_id, "AGENT_PROGRESS", - { - "agent": "Gatekeeper", - "status": "failed", - "milestone": f"Identity Rejected: {result.failure_reason}", - }, - ) - return { - **input_data, - "gatekeeper_status": "rejected", - "gatekeeper_reason": result.failure_reason.value if result.failure_reason else "unknown", - "student_profile": None, - } - - @property - def registry(self) -> StudentRegistry: - """Expose the registry for direct queries (e.g. batch listing).""" - return self._registry - - @property - def lookup(self) -> RegistryLookup: - """Expose the lookup service for external callers.""" - return self._lookup + await log_callback({"message": msg, "type": type}) + + self.log_info(f"Gatekeeper running verification for session {session_id}.") + await send_log("[Gatekeeper] Starting End-to-End Verification Pipeline...", "info") + + # ── 1. Extract inputs ───────────────────────────────────────────────── + # Typically provided by frontend/client API. + raw_roll = input_data.get("roll_number", "") + raw_face = input_data.get("face_id", "") + + # ── 2. Run Pipeline ─────────────────────────────────────────────────── + result = self._pipeline.run(raw_roll, raw_face) + + # ── 3. Emit Events & Logs ───────────────────────────────────────────── + decision = result.access_decision + + if result.is_admitted: + await send_log(f"[Gatekeeper] Identity Verified: {decision.student_name} ({decision.roll_number})", "success") + self.emit_event(session_id, EventType.IDENTITY_VERIFIED, {"roll_number": decision.roll_number}) + else: + await send_log(f"[Gatekeeper] Access {decision.decision.value.upper()}: {decision.reasons[0]}", "error") + + self.emit_event(session_id, EventType.AGENT_PROGRESS, { + "agent": "Gatekeeper", + "status": "complete", + "milestone": "Pipeline Executed", + "decision": decision.decision.value, + }) + + # ── 4. Return Output ────────────────────────────────────────────────── + # Append pipeline results into the original input_data dictionary so + # Oracle or downstream agents can consume it. + output_data = input_data.copy() + output_data["gatekeeper_result"] = result.to_dict() + + return output_data diff --git a/backend/src/agents/gatekeeper/authorization/__init__.py b/backend/src/agents/gatekeeper/authorization/__init__.py new file mode 100644 index 0000000..88043f5 --- /dev/null +++ b/backend/src/agents/gatekeeper/authorization/__init__.py @@ -0,0 +1,14 @@ +""" +authorization/__init__.py +───────────────────────── +Public surface of the GATEKEEPER authorization package. +""" + +from .access_decision import AccessDecision, DecisionStatus +from .auth_engine import AuthorizationEngine + +__all__ = [ + "AccessDecision", + "DecisionStatus", + "AuthorizationEngine", +] diff --git a/backend/src/agents/gatekeeper/authorization/access_decision.py b/backend/src/agents/gatekeeper/authorization/access_decision.py new file mode 100644 index 0000000..9b9bf77 --- /dev/null +++ b/backend/src/agents/gatekeeper/authorization/access_decision.py @@ -0,0 +1,64 @@ +""" +access_decision.py +────────────────── +Final output of the GATEKEEPER Authorization Engine. + +Rules +───── +- Frozen dataclass — immutable after creation. +- Fully serializable. +- Contains the final go/no-go decision and the audit trail explaining why. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Dict, List + + +class DecisionStatus(str, Enum): + GRANTED = "granted" + DENIED = "denied" + PENDING_ADMIN_REVIEW = "pending_admin_review" + + +@dataclass(frozen=True) +class AccessDecision: + """ + Final authorization decision for a viva session request. + + Fields + ────── + decision : GRANTED, DENIED, or PENDING_ADMIN_REVIEW. + roll_number : The requested roll number (or raw input if invalid). + student_name : The resolved student name (if available). + reasons : List of human-readable explanations for the decision. + requires_admin_review : True if the session is halted pending staff override. + audit_trail : Dump of all pipeline stage results for logging. + """ + decision: DecisionStatus + roll_number: str + student_name: str + reasons: List[str] = field(default_factory=list) + requires_admin_review: bool = False + audit_trail: Dict[str, Any] = field(default_factory=dict) + + @property + def is_granted(self) -> bool: + return self.decision == DecisionStatus.GRANTED + + @property + def is_denied(self) -> bool: + return self.decision == DecisionStatus.DENIED + + def to_dict(self) -> dict: + return { + "decision": self.decision.value, + "roll_number": self.roll_number, + "student_name": self.student_name, + "reasons": self.reasons, + "requires_admin_review": self.requires_admin_review, + "is_granted": self.is_granted, + "audit_trail": self.audit_trail, + } diff --git a/backend/src/agents/gatekeeper/authorization/auth_engine.py b/backend/src/agents/gatekeeper/authorization/auth_engine.py new file mode 100644 index 0000000..f211070 --- /dev/null +++ b/backend/src/agents/gatekeeper/authorization/auth_engine.py @@ -0,0 +1,145 @@ +""" +auth_engine.py +────────────── +Authorization Engine for the GATEKEEPER pipeline. + +Responsibilities +──────────────── +- Consume the results of all prior pipeline stages (Roll verification, Face verification, Conflict detection). +- Apply final business rules to determine if the session should be GRANTED, DENIED, or PENDING_ADMIN_REVIEW. +- Compile the final AccessDecision with a complete audit trail. + +Rules +───── +- Pure logic — evaluates inputs and emits a decision. +- Deterministic and auditable. +- Never raises. +""" + +from __future__ import annotations + +import logging +from typing import Any, Dict + +from src.agents.gatekeeper.conflict_detection.conflict_detector import ConflictReport, ConflictSeverity +from src.agents.gatekeeper.face_verification.face_result import FaceVerificationResult +from src.agents.gatekeeper.roll_verification.flow import VerificationFlowResult +from .access_decision import AccessDecision, DecisionStatus + +logger = logging.getLogger(__name__) + + +class AuthorizationEngine: + """ + Final decision maker for the GATEKEEPER pipeline. + + Rules: + 1. If roll verification failed (invalid, not found, inactive) -> DENIED. + 2. If conflict severity is CRITICAL (face clone) -> DENIED. + 3. If conflict severity is HIGH (face swap) -> DENIED. + 4. If face verification failed -> PENDING_ADMIN_REVIEW (or DENIED based on strictness). + * We use DENIED for NO_PHOTO or UNVERIFIABLE, but PENDING_ADMIN_REVIEW for MISMATCH if we want staff to verify. + * For this implementation, we will DENY hard on MISMATCH to be strict, but leave a path for admin override. + 5. If admin flagged required -> PENDING_ADMIN_REVIEW. + 6. Else -> GRANTED. + """ + + def evaluate( + self, + roll_result: VerificationFlowResult, + face_result: FaceVerificationResult, + conflict_report: ConflictReport, + ) -> AccessDecision: + """ + Evaluate all stage outputs and return a final AccessDecision. + """ + reasons = [] + roll_number = roll_result.roll_number + student_name = roll_result.display_card.full_name if roll_result.display_card else "Unknown" + + # Build audit trail + audit_trail: Dict[str, Any] = { + "roll_verification": roll_result.to_dict(), + "face_verification": face_result.to_dict(), + "conflict_detection": conflict_report.to_dict(), + } + + # ── Rule 1: Roll Verification Failure ───────────────────────────────── + if roll_result.is_rejected: + reasons.append(roll_result.message) + return AccessDecision( + decision=DecisionStatus.DENIED, + roll_number=roll_number, + student_name=student_name, + reasons=reasons, + audit_trail=audit_trail, + ) + + # ── Rule 2: Manual Review Flag ──────────────────────────────────────── + if roll_result.requires_manual: + reasons.append("Manual staff verification requested.") + return AccessDecision( + decision=DecisionStatus.PENDING_ADMIN_REVIEW, + roll_number=roll_number, + student_name=student_name, + reasons=reasons, + requires_admin_review=True, + audit_trail=audit_trail, + ) + + # ── Rule 3: Identity Conflicts ──────────────────────────────────────── + if conflict_report.has_conflict: + reasons.append(conflict_report.reason) + if conflict_report.severity in (ConflictSeverity.CRITICAL, ConflictSeverity.HIGH): + # Hard block for severe conflicts + return AccessDecision( + decision=DecisionStatus.DENIED, + roll_number=roll_number, + student_name=student_name, + reasons=reasons, + audit_trail=audit_trail, + ) + elif conflict_report.severity == ConflictSeverity.MEDIUM: + # Soft block for suspicious but not guaranteed fraud + return AccessDecision( + decision=DecisionStatus.PENDING_ADMIN_REVIEW, + roll_number=roll_number, + student_name=student_name, + reasons=reasons, + requires_admin_review=True, + audit_trail=audit_trail, + ) + + # ── Rule 4: Face Verification Failure ───────────────────────────────── + if not face_result.matched: + reasons.append(face_result.reason) + # If it's a mismatch but no fraud detected, we might want admin review + # For strictness, if confidence is 0, we deny. But let's route mismatch to admin review. + if face_result.status.value == "mismatch": + return AccessDecision( + decision=DecisionStatus.PENDING_ADMIN_REVIEW, + roll_number=roll_number, + student_name=student_name, + reasons=reasons, + requires_admin_review=True, + audit_trail=audit_trail, + ) + else: + # NO_PHOTO or UNVERIFIABLE + return AccessDecision( + decision=DecisionStatus.DENIED, + roll_number=roll_number, + student_name=student_name, + reasons=reasons, + audit_trail=audit_trail, + ) + + # ── Success ─────────────────────────────────────────────────────────── + reasons.append(f"Identity fully verified for {student_name}.") + return AccessDecision( + decision=DecisionStatus.GRANTED, + roll_number=roll_number, + student_name=student_name, + reasons=reasons, + audit_trail=audit_trail, + ) diff --git a/backend/src/agents/gatekeeper/conflict_detection/__init__.py b/backend/src/agents/gatekeeper/conflict_detection/__init__.py new file mode 100644 index 0000000..193338e --- /dev/null +++ b/backend/src/agents/gatekeeper/conflict_detection/__init__.py @@ -0,0 +1,14 @@ +""" +conflict_detection/__init__.py +────────────────────────────── +Public surface of the GATEKEEPER identity conflict detection package. +""" + +from .conflict_detector import ConflictReport, ConflictSeverity, ConflictType, IdentityConflictDetector + +__all__ = [ + "ConflictReport", + "ConflictSeverity", + "ConflictType", + "IdentityConflictDetector", +] diff --git a/backend/src/agents/gatekeeper/conflict_detection/conflict_detector.py b/backend/src/agents/gatekeeper/conflict_detection/conflict_detector.py new file mode 100644 index 0000000..e3567c6 --- /dev/null +++ b/backend/src/agents/gatekeeper/conflict_detection/conflict_detector.py @@ -0,0 +1,124 @@ +""" +conflict_detector.py +──────────────────── +Identity conflict detection engine for the GATEKEEPER pipeline. + +Responsibilities +──────────────── +- Consume HistoryCheckResult from the face history stage. +- Classify conflicts into severe categories (clone vs swap). +- Produce a structured ConflictReport that the Authorization Engine will use. + +Rules +───── +- Pure logic — reads the HistoryCheckResult and decides conflict level. +- Deterministic and rule-based. +- No DB calls or state mutation. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from enum import Enum +from typing import List + +from src.agents.gatekeeper.face_history.history_checker import HistoryCheckResult + +logger = logging.getLogger(__name__) + + +class ConflictSeverity(str, Enum): + NONE = "none" + LOW = "low" # Minor anomaly, log only + MEDIUM = "medium" # Suspicious, maybe flag for later review + HIGH = "high" # Likely fraud (e.g. face swap attempt on same roll), block access + CRITICAL = "critical" # Severe fraud (e.g. proxy test taker / face clone), block immediately + + +class ConflictType(str, Enum): + NO_CONFLICT = "no_conflict" + FACE_SWAP = "face_swap" # Same roll number, different faces presented over time + FACE_CLONE = "face_clone" # Same face presented under multiple different roll numbers + + +@dataclass(frozen=True) +class ConflictReport: + """ + Final output of the conflict detection stage. + """ + has_conflict: bool + conflict_type: ConflictType + severity: ConflictSeverity + involved_rolls: List[str] = field(default_factory=list) + involved_faces: List[str] = field(default_factory=list) + reason: str = "" + + def to_dict(self) -> dict: + return { + "has_conflict": self.has_conflict, + "conflict_type": self.conflict_type.value, + "severity": self.severity.value, + "involved_rolls": self.involved_rolls, + "involved_faces": self.involved_faces, + "reason": self.reason, + } + + +class IdentityConflictDetector: + """ + Rules engine that categorizes face history anomalies into actionable conflicts. + + Rules: + - is_cloned_face (same face, multiple rolls) -> CRITICAL / FACE_CLONE + - conflict_face_ids (same roll, multiple faces) -> HIGH / FACE_SWAP + - neither -> NONE / NO_CONFLICT + """ + + def analyze(self, history_result: HistoryCheckResult) -> ConflictReport: + """ + Analyze the history check result and emit a ConflictReport. + """ + # Rule 1: Face Clone (Proxy Test Taker) + # If the same face ID is associated with other roll numbers, this is a critical security breach. + if history_result.is_cloned_face: + all_involved_rolls = [history_result.roll_number] + history_result.clone_roll_numbers + reason = ( + f"CRITICAL: Identity Clone detected. The presented face '{history_result.face_id}' " + f"has been used by multiple roll numbers: {all_involved_rolls}." + ) + logger.error("[ConflictDetector] %s", reason) + return ConflictReport( + has_conflict=True, + conflict_type=ConflictType.FACE_CLONE, + severity=ConflictSeverity.CRITICAL, + involved_rolls=all_involved_rolls, + involved_faces=[history_result.face_id], + reason=reason, + ) + + # Rule 2: Face Swap (Account Sharing / Mid-exam swap) + # If this roll number has previously used different face IDs, flag it as high severity. + if history_result.conflict_face_ids: + all_involved_faces = [history_result.face_id] + history_result.conflict_face_ids + reason = ( + f"HIGH: Face Swap detected. Roll number '{history_result.roll_number}' " + f"has presented multiple distinct faces: {all_involved_faces}." + ) + logger.warning("[ConflictDetector] %s", reason) + return ConflictReport( + has_conflict=True, + conflict_type=ConflictType.FACE_SWAP, + severity=ConflictSeverity.HIGH, + involved_rolls=[history_result.roll_number], + involved_faces=all_involved_faces, + reason=reason, + ) + + # No conflict + return ConflictReport( + has_conflict=False, + conflict_type=ConflictType.NO_CONFLICT, + severity=ConflictSeverity.NONE, + reason="No identity conflicts detected.", + ) diff --git a/backend/src/agents/gatekeeper/face_history/__init__.py b/backend/src/agents/gatekeeper/face_history/__init__.py new file mode 100644 index 0000000..b4482c8 --- /dev/null +++ b/backend/src/agents/gatekeeper/face_history/__init__.py @@ -0,0 +1,14 @@ +""" +face_history/__init__.py +───────────────────────── +Public surface of the GATEKEEPER face history tracking package. +""" + +from .history_store import FaceHistoryStore +from .history_checker import FaceHistoryChecker, HistoryCheckResult + +__all__ = [ + "FaceHistoryStore", + "FaceHistoryChecker", + "HistoryCheckResult", +] diff --git a/backend/src/agents/gatekeeper/face_history/history_checker.py b/backend/src/agents/gatekeeper/face_history/history_checker.py new file mode 100644 index 0000000..7bd5aee --- /dev/null +++ b/backend/src/agents/gatekeeper/face_history/history_checker.py @@ -0,0 +1,119 @@ +""" +history_checker.py +────────────────── +Face history checker for the GATEKEEPER pipeline. + +Responsibilities +──────────────── +- Check whether a face is new for a given roll number. +- Detect conflicting faces already recorded in history. +- Return a typed HistoryCheckResult. + +Rules +───── +- Reads from FaceHistoryStore — does not write (pipeline does the write). +- Stateless logic — store injected at construction. +- Never raises. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from typing import List + +from .history_store import FaceHistoryStore + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class HistoryCheckResult: + """ + Result of a face history check for one (roll_number, face_id) pair. + + Fields + ────── + roll_number : The roll number checked. + face_id : The face ID checked. + is_new_face : True if this face_id was never seen for this roll. + conflict_face_ids : Other face IDs already recorded for this roll. + is_cloned_face : True if this face_id is already used by another roll. + clone_roll_numbers : Roll numbers this face_id has appeared under before. + """ + roll_number: str + face_id: str + is_new_face: bool + conflict_face_ids: List[str] = field(default_factory=list) + is_cloned_face: bool = False + clone_roll_numbers: List[str] = field(default_factory=list) + + @property + def has_any_conflict(self) -> bool: + """True if either a face-swap or a face-clone conflict was detected.""" + return bool(self.conflict_face_ids) or self.is_cloned_face + + def to_dict(self) -> dict: + return { + "roll_number": self.roll_number, + "face_id": self.face_id, + "is_new_face": self.is_new_face, + "conflict_face_ids": self.conflict_face_ids, + "is_cloned_face": self.is_cloned_face, + "clone_roll_numbers": self.clone_roll_numbers, + "has_any_conflict": self.has_any_conflict, + } + + +class FaceHistoryChecker: + """ + Checks a (roll_number, face_id) pair against recorded history. + + Usage + ───── + checker = FaceHistoryChecker(store) + result = checker.check("150096725066", "photos/150096725066.jpg") + if result.has_any_conflict: + # escalate to conflict detector + """ + + def __init__(self, store: FaceHistoryStore) -> None: + self._store = store + + def check(self, roll_number: str, face_id: str) -> HistoryCheckResult: + """ + Check history for face-swap and face-clone signals. + + Face-swap : roll_number already has a different face recorded. + Face-clone : face_id already appeared under a different roll number. + """ + is_new_face = self._store.is_new_face_for_roll(roll_number, face_id) + + # Conflict type 1 — face swap: roll already has different faces + existing_faces_for_roll = self._store.get_faces_for_roll(roll_number) + conflict_faces = [f for f in existing_faces_for_roll if f != face_id] + + # Conflict type 2 — face clone: this face used with other rolls + rolls_for_this_face = self._store.get_rolls_for_face(face_id) + clone_rolls = [r for r in rolls_for_this_face if r != roll_number] + is_cloned_face = len(clone_rolls) > 0 + + if conflict_faces: + logger.warning( + "[FaceHistory] Face-swap signal: roll=%s new_face=%s, existing=%s", + roll_number, face_id, conflict_faces + ) + if is_cloned_face: + logger.warning( + "[FaceHistory] Face-clone signal: face=%s already seen under rolls=%s", + face_id, clone_rolls + ) + + return HistoryCheckResult( + roll_number = roll_number, + face_id = face_id, + is_new_face = is_new_face, + conflict_face_ids = conflict_faces, + is_cloned_face = is_cloned_face, + clone_roll_numbers = clone_rolls, + ) diff --git a/backend/src/agents/gatekeeper/face_history/history_store.py b/backend/src/agents/gatekeeper/face_history/history_store.py new file mode 100644 index 0000000..979a248 --- /dev/null +++ b/backend/src/agents/gatekeeper/face_history/history_store.py @@ -0,0 +1,91 @@ +""" +history_store.py +──────────────── +In-memory face history log for the GATEKEEPER pipeline. + +Responsibilities +──────────────── +- Track which face IDs have been seen for each roll number. +- Track which roll numbers a face ID has been used with. +- Provide conflict detection queries. + +Rules +───── +- Stateful per pipeline instance (reset between exam sessions). +- No persistence — a DB adapter can wrap this for production. +- Never raises. +""" + +from __future__ import annotations + +import logging +from collections import defaultdict +from typing import Dict, List, Set + +logger = logging.getLogger(__name__) + + +class FaceHistoryStore: + """ + Bidirectional face-to-roll and roll-to-face history log. + + Stores two indexes: + - roll_to_faces : roll_number → {face_id, ...} + - face_to_rolls : face_id → {roll_number, ...} + + Usage + ───── + store = FaceHistoryStore() + store.record("150096725066", "photos/150096725066.jpg") + store.has_conflict_for_roll("150096725066") # False — only one face seen + store.get_rolls_for_face("photos/150096725066.jpg") # {"150096725066"} + """ + + def __init__(self) -> None: + self._roll_to_faces: Dict[str, Set[str]] = defaultdict(set) + self._face_to_rolls: Dict[str, Set[str]] = defaultdict(set) + + # ── Write ───────────────────────────────────────────────────────────────── + + def record(self, roll_number: str, face_id: str) -> None: + """Register a face_id → roll_number observation.""" + if not roll_number or not face_id: + return + self._roll_to_faces[roll_number].add(face_id) + self._face_to_rolls[face_id].add(roll_number) + logger.debug("[FaceHistory] Recorded face=%s for roll=%s", face_id, roll_number) + + # ── Read ────────────────────────────────────────────────────────────────── + + def get_faces_for_roll(self, roll_number: str) -> List[str]: + """All face IDs seen for this roll number.""" + return list(self._roll_to_faces.get(roll_number, set())) + + def get_rolls_for_face(self, face_id: str) -> List[str]: + """All roll numbers this face ID has been associated with.""" + return list(self._face_to_rolls.get(face_id, set())) + + def has_conflict_for_roll(self, roll_number: str) -> bool: + """True if more than one distinct face ID has been seen for this roll.""" + return len(self._roll_to_faces.get(roll_number, set())) > 1 + + def has_conflict_for_face(self, face_id: str) -> bool: + """True if this face ID has been used with more than one roll number.""" + return len(self._face_to_rolls.get(face_id, set())) > 1 + + def is_new_face_for_roll(self, roll_number: str, face_id: str) -> bool: + """True if this face_id has never been seen for this roll number before.""" + return face_id not in self._roll_to_faces.get(roll_number, set()) + + # ── Meta ────────────────────────────────────────────────────────────────── + + def reset(self) -> None: + """Clear all history (use between exam sessions).""" + self._roll_to_faces.clear() + self._face_to_rolls.clear() + logger.info("[FaceHistory] Store reset.") + + @property + def total_observations(self) -> int: + """Total number of recorded face observations.""" + return sum(len(faces) for faces in self._roll_to_faces.values()) diff --git a/backend/src/agents/gatekeeper/face_verification/__init__.py b/backend/src/agents/gatekeeper/face_verification/__init__.py index 45a7276..96ed466 100644 --- a/backend/src/agents/gatekeeper/face_verification/__init__.py +++ b/backend/src/agents/gatekeeper/face_verification/__init__.py @@ -1,36 +1,14 @@ """ face_verification/__init__.py ────────────────────────────── -Public surface of the GATEKEEPER Face Verification Engine package. +Public surface of the GATEKEEPER face verification package. """ -from .models import ( - FaceCapture, - FaceEmbedding, - VerificationResult, - VerificationStatus, - EmbeddingSource, -) -from .embedding_engine import ( - AbstractEmbeddingEngine, - StubEmbeddingEngine, -) -from .comparator import EmbeddingComparator, SimilarityScore -from .verification_pipeline import FaceVerificationPipeline +from .face_result import FaceVerificationResult, FaceMatchStatus +from .face_verifier import FaceVerifier __all__ = [ - # Models - "FaceCapture", - "FaceEmbedding", - "VerificationResult", - "VerificationStatus", - "EmbeddingSource", - # Engine - "AbstractEmbeddingEngine", - "StubEmbeddingEngine", - # Comparator - "EmbeddingComparator", - "SimilarityScore", - # Pipeline - "FaceVerificationPipeline", + "FaceVerificationResult", + "FaceMatchStatus", + "FaceVerifier", ] diff --git a/backend/src/agents/gatekeeper/face_verification/face_result.py b/backend/src/agents/gatekeeper/face_verification/face_result.py new file mode 100644 index 0000000..344ba41 --- /dev/null +++ b/backend/src/agents/gatekeeper/face_verification/face_result.py @@ -0,0 +1,59 @@ +""" +face_result.py +────────────── +Typed result object for the GATEKEEPER face verification step. + +Rules +───── +- Frozen dataclass — immutable after creation. +- All fields are plain types — safe to serialize and log. +- No pipeline logic here. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from enum import Enum +from typing import Optional + + +class FaceMatchStatus(str, Enum): + MATCHED = "matched" # Face confirmed against registry photo + MISMATCH = "mismatch" # Face presented does not match registry + NO_PHOTO = "no_photo" # No reference photo registered for student + UNVERIFIABLE = "unverifiable" # Face input was absent or unparseable + + +@dataclass(frozen=True) +class FaceVerificationResult: + """ + Output of a single face verification attempt. + + Fields + ────── + roll_number : The roll number this face check was performed against. + face_id : The presented face identifier (e.g. a hash, path, or UUID). + status : FaceMatchStatus enum value. + matched : True only when status == MATCHED. + confidence : Match confidence score 0.0–1.0 (1.0 = certain match). + reason : Human-readable explanation for staff / audit log. + photo_reference : The reference photo that was compared against. + """ + roll_number: str + face_id: Optional[str] + status: FaceMatchStatus + matched: bool + confidence: float + reason: str + photo_reference: Optional[str] = None + + def to_dict(self) -> dict: + return { + "roll_number": self.roll_number, + "face_id": self.face_id, + "status": self.status.value, + "matched": self.matched, + "confidence": self.confidence, + "reason": self.reason, + "photo_reference": self.photo_reference, + } diff --git a/backend/src/agents/gatekeeper/face_verification/face_verifier.py b/backend/src/agents/gatekeeper/face_verification/face_verifier.py new file mode 100644 index 0000000..203e8c5 --- /dev/null +++ b/backend/src/agents/gatekeeper/face_verification/face_verifier.py @@ -0,0 +1,206 @@ +""" +face_verifier.py +──────────────── +Face verification engine for the GATEKEEPER pipeline. + +Design +────── +This is a deterministic stub that implements the full verification contract +using fixture-based matching. The interface is stable — a real CV library +(DeepFace, OpenCV, FaceNet) can drop into _run_cv_match() without changing +any caller code. + +Matching Rules (stub) +───────────────────── +- face_id == photo_reference → MATCHED (confidence 0.98) +- face_id starts with roll_number → MATCHED (confidence 0.91) +- face_id == "OVERRIDE_PASS" → MATCHED (confidence 1.0, admin override) +- face_id == "OVERRIDE_FAIL" → MISMATCH (confidence 0.0, forced failure) +- roll_number has no photo_reference → NO_PHOTO +- face_id is None or empty → UNVERIFIABLE +- anything else → MISMATCH (confidence 0.0) + +Rules +───── +- Stateless — registry injected at construction. +- Deterministic — same inputs always produce same result. +- Never raises. +""" + +from __future__ import annotations + +import logging +from typing import Optional + +from src.agents.gatekeeper.registry.registry_store import StudentRegistry +from .face_result import FaceMatchStatus, FaceVerificationResult + +logger = logging.getLogger(__name__) + +# Confidence thresholds +_CONFIDENCE_EXACT = 0.98 +_CONFIDENCE_PREFIX = 0.91 +_CONFIDENCE_ADMIN = 1.00 +_CONFIDENCE_MISMATCH = 0.00 + +# Sentinel face IDs for testing / admin override +_OVERRIDE_PASS = "OVERRIDE_PASS" +_OVERRIDE_FAIL = "OVERRIDE_FAIL" + + +class FaceVerifier: + """ + Deterministic face verification engine. + + Usage + ───── + verifier = FaceVerifier(registry) + result = verifier.verify("150096725066", face_id="photos/150096725066.jpg") + if result.matched: + ... + + Production swap + ─────────────── + Replace _run_cv_match() with a real CV call. Everything else stays the same. + """ + + def __init__(self, registry: StudentRegistry) -> None: + self._registry = registry + + # ── Public interface ────────────────────────────────────────────────────── + + def verify(self, roll_number: str, face_id: Optional[str]) -> FaceVerificationResult: + """ + Verify a presented face against the registry photo for a given roll number. + + Parameters + ────────── + roll_number : Normalized roll number (must exist in registry). + face_id : The identifier / path / hash of the presented face. + """ + profile = self._registry.get(roll_number) + + # Guard: no such student in registry + if profile is None: + logger.warning("[FaceVerifier] Roll number not in registry: %s", roll_number) + return FaceVerificationResult( + roll_number = roll_number, + face_id = face_id, + status = FaceMatchStatus.UNVERIFIABLE, + matched = False, + confidence = 0.0, + reason = f"Roll number '{roll_number}' not found in registry.", + photo_reference = None, + ) + + ref_photo = profile.photo_reference + + # Guard: no reference photo registered + if not ref_photo: + logger.info("[FaceVerifier] No photo registered for: %s", roll_number) + return FaceVerificationResult( + roll_number = roll_number, + face_id = face_id, + status = FaceMatchStatus.NO_PHOTO, + matched = False, + confidence = 0.0, + reason = f"No reference photo registered for '{profile.full_name}'.", + photo_reference = None, + ) + + # Guard: no face presented + if not face_id or not face_id.strip(): + logger.info("[FaceVerifier] No face presented for: %s", roll_number) + return FaceVerificationResult( + roll_number = roll_number, + face_id = face_id, + status = FaceMatchStatus.UNVERIFIABLE, + matched = False, + confidence = 0.0, + reason = "No face ID was presented for verification.", + photo_reference = ref_photo, + ) + + # Run core match logic + return self._run_cv_match(roll_number, face_id, ref_photo, profile.full_name) + + # ── Core match logic (swap this for real CV in production) ──────────────── + + def _run_cv_match( + self, + roll_number: str, + face_id: str, + ref_photo: str, + student_name:str, + ) -> FaceVerificationResult: + """ + Deterministic stub matching logic. + Replace body with: return self._deepface_match(face_id, ref_photo) + """ + + # Admin override — forced pass + if face_id == _OVERRIDE_PASS: + logger.info("[FaceVerifier] Admin override PASS for: %s", roll_number) + return FaceVerificationResult( + roll_number = roll_number, + face_id = face_id, + status = FaceMatchStatus.MATCHED, + matched = True, + confidence = _CONFIDENCE_ADMIN, + reason = f"Admin override: forced PASS for {student_name}.", + photo_reference = ref_photo, + ) + + # Admin override — forced fail + if face_id == _OVERRIDE_FAIL: + logger.info("[FaceVerifier] Admin override FAIL for: %s", roll_number) + return FaceVerificationResult( + roll_number = roll_number, + face_id = face_id, + status = FaceMatchStatus.MISMATCH, + matched = False, + confidence = _CONFIDENCE_MISMATCH, + reason = f"Admin override: forced FAIL for {student_name}.", + photo_reference = ref_photo, + ) + + # Exact match: face_id == registered photo path + if face_id == ref_photo: + logger.info("[FaceVerifier] Exact match for: %s", roll_number) + return FaceVerificationResult( + roll_number = roll_number, + face_id = face_id, + status = FaceMatchStatus.MATCHED, + matched = True, + confidence = _CONFIDENCE_EXACT, + reason = f"Face verified: exact match for {student_name}.", + photo_reference = ref_photo, + ) + + # Prefix match: face_id starts with roll number (e.g. "150096725066_cam1.jpg") + if face_id.startswith(roll_number): + logger.info("[FaceVerifier] Prefix match for: %s", roll_number) + return FaceVerificationResult( + roll_number = roll_number, + face_id = face_id, + status = FaceMatchStatus.MATCHED, + matched = True, + confidence = _CONFIDENCE_PREFIX, + reason = f"Face verified: roll-prefixed ID matched for {student_name}.", + photo_reference = ref_photo, + ) + + # Mismatch + logger.info("[FaceVerifier] Mismatch for: %s (presented=%s)", roll_number, face_id) + return FaceVerificationResult( + roll_number = roll_number, + face_id = face_id, + status = FaceMatchStatus.MISMATCH, + matched = False, + confidence = _CONFIDENCE_MISMATCH, + reason = ( + f"Face mismatch: presented ID '{face_id}' does not match " + f"registered photo for {student_name}." + ), + photo_reference = ref_photo, + ) diff --git a/backend/src/agents/gatekeeper/pipeline/__init__.py b/backend/src/agents/gatekeeper/pipeline/__init__.py new file mode 100644 index 0000000..0517f60 --- /dev/null +++ b/backend/src/agents/gatekeeper/pipeline/__init__.py @@ -0,0 +1,13 @@ +""" +pipeline/__init__.py +──────────────────── +Public surface of the GATEKEEPER verification pipeline. +""" + +from .pipeline_result import PipelineResult +from .gatekeeper_pipeline import GatekeeperPipeline + +__all__ = [ + "PipelineResult", + "GatekeeperPipeline", +] diff --git a/backend/src/agents/gatekeeper/pipeline/gatekeeper_pipeline.py b/backend/src/agents/gatekeeper/pipeline/gatekeeper_pipeline.py new file mode 100644 index 0000000..397e8ba --- /dev/null +++ b/backend/src/agents/gatekeeper/pipeline/gatekeeper_pipeline.py @@ -0,0 +1,121 @@ +""" +gatekeeper_pipeline.py +────────────────────── +Central orchestrator for the GATEKEEPER verification process. + +Responsibilities +──────────────── +- Construct all stage engines (Registry, Roll, Face, History, Conflict, Auth). +- Execute the verification stages in sequence for a given session. +- Manage error boundaries (never raise). +- Return a structured PipelineResult. +""" + +from __future__ import annotations + +import logging +import time +from typing import Optional + +from src.agents.gatekeeper.authorization.auth_engine import AuthorizationEngine +from src.agents.gatekeeper.conflict_detection.conflict_detector import IdentityConflictDetector +from src.agents.gatekeeper.face_history.history_checker import FaceHistoryChecker +from src.agents.gatekeeper.face_history.history_store import FaceHistoryStore +from src.agents.gatekeeper.face_verification.face_verifier import FaceVerifier +from src.agents.gatekeeper.pipeline.pipeline_result import PipelineResult +from src.agents.gatekeeper.registry.registry_store import StudentRegistry +from src.agents.gatekeeper.roll_verification.fixtures import build_fixture_registry +from src.agents.gatekeeper.roll_verification.flow import RollVerificationFlow + +logger = logging.getLogger(__name__) + + +class GatekeeperPipeline: + """ + End-to-End GATEKEEPER Verification Pipeline. + + Usage + ───── + pipeline = GatekeeperPipeline() + result = pipeline.run("150096725066", "photos/150096725066.jpg") + if result.is_admitted: + ... + """ + + def __init__(self, registry: Optional[StudentRegistry] = None, history_store: Optional[FaceHistoryStore] = None) -> None: + """ + Initialize pipeline dependencies. + Uses fixture registry and isolated memory history store by default. + """ + # Shared state stores + self._registry = registry or build_fixture_registry() + self._history_store = history_store or FaceHistoryStore() + + # Pipeline stages + self._roll_verifier = RollVerificationFlow(registry=self._registry, require_manual_confirmation=False) + self._face_verifier = FaceVerifier(registry=self._registry) + self._history_checker = FaceHistoryChecker(store=self._history_store) + self._conflict_detector = IdentityConflictDetector() + self._auth_engine = AuthorizationEngine() + + def run(self, raw_roll_number: str, raw_face_id: Optional[str] = None) -> PipelineResult: + """ + Execute the full verification pipeline. + """ + start_time = time.time() + logger.info("[GatekeeperPipeline] Starting verification for roll=%s face=%s", raw_roll_number, raw_face_id) + + # ── Stage 1: Roll Number Verification (Identity Lookup) ─────────────── + roll_result = self._roll_verifier.verify(raw_roll_number) + + # ── Stage 2: Face Verification (Biometric Check) ────────────────────── + # We run this even if roll fails, though it will naturally fail NO_PHOTO / UNVERIFIABLE + # It's safer to run it so the interface receives complete stage results. + normalized_roll = roll_result.roll_number # Use normalized roll from Stage 1 + face_result = self._face_verifier.verify(normalized_roll, raw_face_id) + + # ── Stage 3: Face History Check ─────────────────────────────────────── + # Only check history if a face was presented. + if raw_face_id and raw_face_id.strip() and roll_result.is_verified: + history_result = self._history_checker.check(normalized_roll, raw_face_id) + else: + # Dummy clean result if we skipped history + from src.agents.gatekeeper.face_history.history_checker import HistoryCheckResult + history_result = HistoryCheckResult(roll_number=normalized_roll, face_id=raw_face_id or "", is_new_face=True) + + # ── Stage 4: Identity Conflict Detection ────────────────────────────── + conflict_report = self._conflict_detector.analyze(history_result) + + # ── Stage 5: Final Authorization Decision ───────────────────────────── + access_decision = self._auth_engine.evaluate( + roll_result=roll_result, + face_result=face_result, + conflict_report=conflict_report, + ) + + # ── Post-Pipeline: State Updates ────────────────────────────────────── + # If granted, we record the face in history for future checks. + if access_decision.is_granted and raw_face_id: + self._history_store.record(normalized_roll, raw_face_id) + + # ── Result Assembly ─────────────────────────────────────────────────── + duration_ms = (time.time() - start_time) * 1000.0 + + stage_results = { + "roll_verification": roll_result.to_dict(), + "face_verification": face_result.to_dict(), + "history_check": history_result.to_dict(), + "conflict_detection": conflict_report.to_dict(), + } + + logger.info( + "[GatekeeperPipeline] Finished in %.1fms. Decision: %s", + duration_ms, access_decision.decision.value + ) + + return PipelineResult( + is_admitted=access_decision.is_granted, + access_decision=access_decision, + pipeline_duration_ms=duration_ms, + stage_results=stage_results, + ) diff --git a/backend/src/agents/gatekeeper/pipeline/pipeline_result.py b/backend/src/agents/gatekeeper/pipeline/pipeline_result.py new file mode 100644 index 0000000..004b18a --- /dev/null +++ b/backend/src/agents/gatekeeper/pipeline/pipeline_result.py @@ -0,0 +1,37 @@ +""" +pipeline_result.py +────────────────── +Typed output object for the entire GATEKEEPER pipeline. + +Rules +───── +- Frozen dataclass — immutable after creation. +- Fully serializable. +- Exposes final admission boolean plus complete audit dictionary. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Dict + +from src.agents.gatekeeper.authorization.access_decision import AccessDecision + + +@dataclass(frozen=True) +class PipelineResult: + """ + Final output emitted by the Gatekeeper pipeline. + """ + is_admitted: bool + access_decision: AccessDecision + pipeline_duration_ms: float + stage_results: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> dict: + return { + "is_admitted": self.is_admitted, + "pipeline_duration_ms": self.pipeline_duration_ms, + "access_decision": self.access_decision.to_dict(), + "stage_results": self.stage_results, + } diff --git a/backend/src/agents/gatekeeper/registry/__init__.py b/backend/src/agents/gatekeeper/registry/__init__.py index 42b8632..c9e56f6 100644 --- a/backend/src/agents/gatekeeper/registry/__init__.py +++ b/backend/src/agents/gatekeeper/registry/__init__.py @@ -1,18 +1,20 @@ """ registry/__init__.py -──────────────────── -Public surface of the GATEKEEPER Student Registry package. +───────────────────── +Public surface of the GATEKEEPER student registry package. """ -from .student_schema import StudentProfile, StudentBatch, Department +from .student_schema import StudentProfile, Department, AcademicYear, StudentBatch from .registry_store import StudentRegistry -from .lookup import RegistryLookup, LookupResult +from .lookup import RegistryLookup, LookupResult, LookupFailureReason __all__ = [ "StudentProfile", - "StudentBatch", "Department", + "AcademicYear", + "StudentBatch", "StudentRegistry", "RegistryLookup", "LookupResult", + "LookupFailureReason", ] diff --git a/backend/src/agents/gatekeeper/registry/lookup.py b/backend/src/agents/gatekeeper/registry/lookup.py index 5bc97ae..85866fc 100644 --- a/backend/src/agents/gatekeeper/registry/lookup.py +++ b/backend/src/agents/gatekeeper/registry/lookup.py @@ -1,42 +1,46 @@ """ lookup.py ───────── -Roll-number lookup service for the GATEKEEPER Student Registry. +Registry lookup engine for the GATEKEEPER roll number verification flow. Responsibilities ──────────────── -- Accept raw roll-number input (possibly dirty/user-provided). -- Normalize and validate the format before hitting the store. -- Return a typed LookupResult with clear success/failure distinction. -- Surface the reason for failure so GATEKEEPER can respond appropriately. -- Never raises — all errors are captured in LookupResult. +- Normalize raw roll number input (strip whitespace, uppercase). +- Validate format (4–15 alphanumeric characters). +- Query the StudentRegistry. +- Return a typed LookupResult with full failure reasoning. Rules ───── -- Pure service functions + a stateless class. -- All lookups go through validate → normalize → store.get(). -- Deterministic: same input → same result for the same registry state. -- No UI logic, no session logic, no ORACLE imports. +- Never raises — all failure paths captured in LookupResult. +- Stateless — registry is injected at construction. +- No pipeline decisions here — only data retrieval + validation. """ from __future__ import annotations +import logging +import re from dataclasses import dataclass from enum import Enum from typing import Optional -from .student_schema import ROLL_NUMBER_PATTERN, StudentProfile from .registry_store import StudentRegistry +from .student_schema import StudentProfile +logger = logging.getLogger(__name__) -# ── Lookup failure reasons ──────────────────────────────────────────────────── +# Roll number validation: 4–15 alphanumeric characters (no spaces, no symbols) +_ROLL_PATTERN = re.compile(r"^[A-Z0-9]{4,15}$") + + +# ── Failure reason ──────────────────────────────────────────────────────────── class LookupFailureReason(str, Enum): - NOT_FOUND = "not_found" # Valid format, not in registry - INVALID_FORMAT = "invalid_format" # Roll number fails regex - EMPTY_INPUT = "empty_input" # Blank or None input - STUDENT_INACTIVE = "student_inactive" # Found but account deactivated - UNKNOWN_ERROR = "unknown_error" # Unexpected exception + EMPTY_INPUT = "empty_input" + INVALID_FORMAT = "invalid_format" + NOT_FOUND = "not_found" + STUDENT_INACTIVE = "student_inactive" # ── Lookup result ───────────────────────────────────────────────────────────── @@ -44,132 +48,123 @@ class LookupFailureReason(str, Enum): @dataclass(frozen=True) class LookupResult: """ - Result of a roll-number lookup operation. - - Always check `success` before accessing `profile`. - `failure_reason` is set when `success` is False. + Complete output of a single registry lookup. + + Fields + ────── + success : True only when the student is found AND active. + roll_number : Normalized roll number (or raw input on early failure). + profile : Populated on success; None on failure. + failure_reason : Set when success is False. """ - success: bool - roll_number: str # The normalized input (or raw if invalid) - profile: Optional[StudentProfile] # Set only when success=True - failure_reason: Optional[LookupFailureReason] = None - message: str = "" - - @staticmethod - def ok(profile: StudentProfile) -> "LookupResult": - return LookupResult( - success = True, - roll_number = profile.roll_number, - profile = profile, - failure_reason= None, - message = f"Student '{profile.full_name}' found.", - ) - - @staticmethod - def fail( - roll_number: str, - reason: LookupFailureReason, - message: str = "", - ) -> "LookupResult": - return LookupResult( - success = False, - roll_number = roll_number, - profile = None, - failure_reason= reason, - message = message or reason.value, - ) + success: bool + roll_number: Optional[str] + profile: Optional[StudentProfile] + failure_reason: Optional[LookupFailureReason] def to_dict(self) -> dict: return { "success": self.success, "roll_number": self.roll_number, "failure_reason": self.failure_reason.value if self.failure_reason else None, - "message": self.message, "profile": self.profile.to_dict() if self.profile else None, } -# ── Lookup service ──────────────────────────────────────────────────────────── +# ── Registry lookup ─────────────────────────────────────────────────────────── class RegistryLookup: """ - Stateless roll-number lookup service. + Stateless roll number lookup engine. + + Pipeline + ──────── + 1. Normalize input (strip, uppercase) + 2. Validate format (regex) + 3. Query registry + 4. Check active status + 5. Return LookupResult Usage ───── lookup = RegistryLookup(registry) - result = lookup.by_roll_number("cs2021001") # case-insensitive + result = lookup.by_roll_number("150096725066") if result.success: print(result.profile.full_name) - else: - print(result.failure_reason) """ def __init__(self, registry: StudentRegistry) -> None: self._registry = registry - def by_roll_number(self, raw_input: str) -> LookupResult: + def by_roll_number(self, raw: object) -> LookupResult: """ - Look up a student by raw roll-number input. - - Pipeline: - 1. Empty check - 2. Normalize (strip, uppercase) - 3. Format validation - 4. Registry lookup - 5. Active check + Run the full lookup pipeline for a raw roll number input. + Accepts any type — will reject non-string / invalid input gracefully. """ - # 1. Empty check - if not raw_input or not str(raw_input).strip(): - return LookupResult.fail( - roll_number = "", - reason = LookupFailureReason.EMPTY_INPUT, - message = "Roll number input is empty.", + + # ── Step 1: Empty / non-string guard ───────────────────────────────── + if raw is None or (isinstance(raw, str) and not raw.strip()): + logger.debug("[Lookup] Empty or None input rejected.") + return LookupResult( + success=False, + roll_number=None, + profile=None, + failure_reason=LookupFailureReason.EMPTY_INPUT, ) - # 2. Normalize - normalized = str(raw_input).strip().upper() - - # 3. Format validation - if not ROLL_NUMBER_PATTERN.match(normalized): - return LookupResult.fail( - roll_number = normalized, - reason = LookupFailureReason.INVALID_FORMAT, - message = ( - f"'{normalized}' is not a valid roll number. " - f"Expected 4–15 uppercase letters/digits (e.g. CS2021001)." - ), + if not isinstance(raw, str): + # Coerce numeric types gracefully, reject collections + try: + raw = str(int(raw)) + except (TypeError, ValueError): + logger.debug("[Lookup] Non-string, non-numeric input rejected: %r", raw) + return LookupResult( + success=False, + roll_number=None, + profile=None, + failure_reason=LookupFailureReason.INVALID_FORMAT, + ) + + # ── Step 2: Normalize ───────────────────────────────────────────────── + normalized = raw.strip().upper() + + # ── Step 3: Format validation ───────────────────────────────────────── + if not _ROLL_PATTERN.match(normalized): + logger.debug("[Lookup] Invalid format: %r → normalized %r", raw, normalized) + return LookupResult( + success=False, + roll_number=normalized, + profile=None, + failure_reason=LookupFailureReason.INVALID_FORMAT, ) - # 4. Registry lookup + # ── Step 4: Registry query ──────────────────────────────────────────── profile = self._registry.get(normalized) + if profile is None: - return LookupResult.fail( - roll_number = normalized, - reason = LookupFailureReason.NOT_FOUND, - message = f"No student found with roll number '{normalized}'.", + logger.debug("[Lookup] Not found in registry: %s", normalized) + return LookupResult( + success=False, + roll_number=normalized, + profile=None, + failure_reason=LookupFailureReason.NOT_FOUND, ) - # 5. Active check + # ── Step 5: Active check ────────────────────────────────────────────── if not profile.is_active: - return LookupResult.fail( - roll_number = normalized, - reason = LookupFailureReason.STUDENT_INACTIVE, - message = ( - f"Student '{profile.full_name}' ({normalized}) account is inactive." - ), + logger.info("[Lookup] Inactive student: %s (%s)", profile.full_name, normalized) + return LookupResult( + success=False, + roll_number=normalized, + profile=profile, + failure_reason=LookupFailureReason.STUDENT_INACTIVE, ) - return LookupResult.ok(profile) - - def metadata(self, raw_input: str) -> dict: - """ - Convenience method: return metadata dict directly. - Returns an error dict if lookup fails — never raises. - """ - result = self.by_roll_number(raw_input) - return result.to_dict() - - def is_valid_student(self, raw_input: str) -> bool: - """Quick boolean check — True only if student is found and active.""" - return self.by_roll_number(raw_input).success + # ── Success ─────────────────────────────────────────────────────────── + logger.info("[Lookup] Verified: %s (%s)", profile.full_name, normalized) + return LookupResult( + success=True, + roll_number=normalized, + profile=profile, + failure_reason=None, + ) diff --git a/backend/src/agents/gatekeeper/registry/registry_store.py b/backend/src/agents/gatekeeper/registry/registry_store.py index 083b61e..a7981c4 100644 --- a/backend/src/agents/gatekeeper/registry/registry_store.py +++ b/backend/src/agents/gatekeeper/registry/registry_store.py @@ -1,57 +1,42 @@ """ registry_store.py ───────────────── -In-memory Student Registry store for GATEKEEPER. +In-memory student registry store for the GATEKEEPER agent. Responsibilities ──────────────── -- Store, retrieve, and manage StudentProfile records. -- Support roll-number lookup (primary key). -- Support batch/department queries for session routing. -- Handle invalid roll numbers safely — never raises on bad input. -- Seed from a static fixture list for development/testing. +- Store StudentProfile records keyed by roll number. +- Support bulk seeding from fixture lists. +- Expose filtered views (active-only, all). +- Never raise — return None / empty on misses. Rules ───── -- Storage is a plain dict[str, StudentProfile] — lightweight, serializable. -- Registry is the single source of truth for all student identity queries. -- All mutating operations return the registry itself (fluent interface). -- Thread-safety: single-process use; no locking required at this stage. -- Never imports from ORACLE or MAIN Agent packages. +- Stateful but not persistent (use a DB adapter in production). +- Thread-safety is not required for the current single-process design. +- No validation logic here — that belongs in lookup.py. """ from __future__ import annotations import logging -from typing import Dict, Iterator, List, Optional +from typing import Dict, List, Optional -from .student_schema import ( - AcademicYear, - Department, - StudentBatch, - StudentProfile, - ROLL_NUMBER_PATTERN, -) +from .student_schema import StudentProfile logger = logging.getLogger(__name__) -class RegistryError(Exception): - """Raised for invalid registry operations (duplicate, not found, etc.).""" - - class StudentRegistry: """ - In-memory student identity registry for GATEKEEPER. + In-memory student data store. Usage ───── registry = StudentRegistry() - registry.seed(SAMPLE_STUDENTS) - - profile = registry.get("CS2021001") # returns StudentProfile or None - exists = registry.exists("CS2021001") - batch = registry.by_batch(Department.COMPUTER_SCIENCE, AcademicYear.THIRD, StudentBatch.A) + registry.add(profile) + profile = registry.get("150096725066") # → StudentProfile | None + all_active = registry.all_active() # → List[StudentProfile] """ def __init__(self) -> None: @@ -59,177 +44,71 @@ def __init__(self) -> None: # ── Write operations ────────────────────────────────────────────────────── - def add(self, profile: StudentProfile) -> "StudentRegistry": - """ - Add a student profile to the registry. - Raises RegistryError if the roll number already exists. - """ - if profile.roll_number in self._store: - raise RegistryError( - f"Student with roll number '{profile.roll_number}' already exists." - ) + def add(self, profile: StudentProfile) -> None: + """Add or overwrite a single student profile.""" self._store[profile.roll_number] = profile - logger.debug("[StudentRegistry] added: %s (%s)", profile.roll_number, profile.full_name) - return self + logger.debug("[Registry] Added student: %s (%s)", profile.full_name, profile.roll_number) - def upsert(self, profile: StudentProfile) -> "StudentRegistry": - """Add or replace a student profile (idempotent).""" - self._store[profile.roll_number] = profile - return self + def seed(self, profiles: List[StudentProfile]) -> None: + """Bulk-load a list of profiles. Overwrites duplicates.""" + for p in profiles: + self.add(p) + logger.info("[Registry] Seeded %d student profiles.", len(profiles)) def remove(self, roll_number: str) -> bool: - """Remove a student. Returns True if removed, False if not found.""" - roll_number = roll_number.strip().upper() + """Remove a student by roll number. Returns True if removed.""" if roll_number in self._store: del self._store[roll_number] return True return False - def seed(self, profiles: List[StudentProfile]) -> "StudentRegistry": - """Bulk-load profiles (upsert semantics — safe to call multiple times).""" - for p in profiles: - self.upsert(p) - logger.info("[StudentRegistry] seeded %d student records.", len(profiles)) - return self + def deactivate(self, roll_number: str) -> bool: + """Mark a student as inactive without removing the record.""" + profile = self._store.get(roll_number) + if profile: + profile.is_active = False + return True + return False # ── Read operations ─────────────────────────────────────────────────────── def get(self, roll_number: str) -> Optional[StudentProfile]: """ - Look up a student by roll number. - Returns None if not found — never raises on bad input. + Retrieve a student by roll number regardless of active status. + Returns None if not found. """ - if not isinstance(roll_number, str): - return None - return self._store.get(roll_number.strip().upper()) + return self._store.get(roll_number) - def exists(self, roll_number: str) -> bool: - """Return True if the roll number is in the registry.""" - return self.get(roll_number) is not None - - def get_safe(self, roll_number: str) -> Optional[StudentProfile]: - """ - Safe lookup with explicit format validation before hitting the store. - Returns None for both missing and malformed roll numbers. - """ - if not isinstance(roll_number, str): - return None - normalized = roll_number.strip().upper() - if not ROLL_NUMBER_PATTERN.match(normalized): - logger.warning("[StudentRegistry] malformed roll number: '%s'", roll_number) - return None - return self._store.get(normalized) - - def by_department(self, department: Department) -> List[StudentProfile]: - """Return all active students in a department, sorted by roll number.""" - return sorted( - [s for s in self._store.values() if s.department == department and s.is_active], - key=lambda s: s.roll_number, - ) - - def by_year(self, year: AcademicYear) -> List[StudentProfile]: - """Return all active students in a given year.""" - return [s for s in self._store.values() if s.year == year and s.is_active] - - def by_batch( - self, - department: Department, - year: AcademicYear, - batch: StudentBatch, - ) -> List[StudentProfile]: - """Return all active students in a specific dept/year/batch.""" - return [ - s for s in self._store.values() - if s.department == department - and s.year == year - and s.batch == batch - and s.is_active - ] + def get_active(self, roll_number: str) -> Optional[StudentProfile]: + """Retrieve a student only if they are active. Returns None otherwise.""" + profile = self._store.get(roll_number) + if profile and profile.is_active: + return profile + return None def all_active(self) -> List[StudentProfile]: """Return all active student profiles.""" - return [s for s in self._store.values() if s.is_active] + return [p for p in self._store.values() if p.is_active] - def metadata(self, roll_number: str) -> Optional[dict]: - """ - Return full metadata dict for a student, or None if not found. - Safe for logging — no raw objects in output. - """ - profile = self.get_safe(roll_number) - return profile.to_dict() if profile else None + def all_students(self) -> List[StudentProfile]: + """Return every student profile (active and inactive).""" + return list(self._store.values()) + + def exists(self, roll_number: str) -> bool: + """True if a roll number is in the registry (regardless of active status).""" + return roll_number in self._store - # ── Utility ─────────────────────────────────────────────────────────────── + # ── Meta ────────────────────────────────────────────────────────────────── @property def count(self) -> int: + """Total number of students (active + inactive).""" return len(self._store) - def __len__(self) -> int: - return self.count - - def __iter__(self) -> Iterator[StudentProfile]: - return iter(self._store.values()) + @property + def active_count(self) -> int: + """Number of active students.""" + return sum(1 for p in self._store.values() if p.is_active) def __repr__(self) -> str: - return f"" - - -# ── Development fixture data ────────────────────────────────────────────────── - -SAMPLE_STUDENTS: List[StudentProfile] = [ - StudentProfile( - roll_number = "CS2021001", - full_name = "Aman Koli", - email = "aman.koli@college.edu", - department = Department.COMPUTER_SCIENCE, - year = AcademicYear.THIRD, - batch = StudentBatch.A, - program = "B.Tech", - photo_reference= "photos/CS2021001.jpg", - is_active = True, - ), - StudentProfile( - roll_number = "CS2021002", - full_name = "Raj Koli", - email = "raj.koli@college.edu", - department = Department.COMPUTER_SCIENCE, - year = AcademicYear.THIRD, - batch = StudentBatch.A, - program = "B.Tech", - photo_reference= "photos/CS2021002.jpg", - is_active = True, - ), - StudentProfile( - roll_number = "IT2022010", - full_name = "Priya Sharma", - email = "priya.sharma@college.edu", - department = Department.INFORMATION_TECHNOLOGY, - year = AcademicYear.SECOND, - batch = StudentBatch.B, - program = "B.Tech", - photo_reference= "photos/IT2022010.jpg", - is_active = True, - ), - StudentProfile( - roll_number = "DS2020005", - full_name = "Neha Patel", - email = "neha.patel@college.edu", - department = Department.DATA_SCIENCE, - year = AcademicYear.FOURTH, - batch = StudentBatch.C, - program = "B.Tech", - photo_reference= "photos/DS2020005.jpg", - is_active = True, - ), - StudentProfile( - roll_number = "AI2023001", - full_name = "Arjun Mehta", - email = "arjun.mehta@college.edu", - department = Department.ARTIFICIAL_INTELLIGENCE, - year = AcademicYear.FIRST, - batch = StudentBatch.A, - program = "B.Tech", - photo_reference= None, - is_active = True, - ), -] + return f"" diff --git a/backend/src/agents/gatekeeper/registry/student_schema.py b/backend/src/agents/gatekeeper/registry/student_schema.py index 0d54c38..876dbfe 100644 --- a/backend/src/agents/gatekeeper/registry/student_schema.py +++ b/backend/src/agents/gatekeeper/registry/student_schema.py @@ -1,50 +1,31 @@ """ student_schema.py ───────────────── -Versioned, validated data contracts for the Student Registry. - -Responsibilities -──────────────── -- Define the canonical StudentProfile model consumed by GATEKEEPER. -- Define Department, Year, and Batch enumerations. -- Enforce strict field validation (roll number format, name length, etc.) -- Remain decoupled from storage — no DB imports here. +Canonical student data model for the GATEKEEPER registry. Rules ───── -- All models are immutable Pydantic BaseModels. -- Roll number format: uppercase letters + digits, 6–12 chars (e.g. CS2021001). -- Photo reference is a URL string or a relative path — never raw bytes. -- Adding new fields requires only a schema version bump. +- All schema changes must be backward-compatible. +- No pipeline logic here — pure data definitions only. +- All Enum values are plain strings for JSON serialization safety. """ from __future__ import annotations -import re +from dataclasses import dataclass, field from enum import Enum from typing import Optional -from pydantic import BaseModel, Field, field_validator - -# ── Schema version ──────────────────────────────────────────────────────────── -REGISTRY_SCHEMA_VERSION = "1.0.0" - -# ── Roll number format ──────────────────────────────────────────────────────── -ROLL_NUMBER_PATTERN = re.compile(r"^[A-Z0-9]{4,15}$") # ── Enumerations ────────────────────────────────────────────────────────────── class Department(str, Enum): - COMPUTER_SCIENCE = "CS" - INFORMATION_TECHNOLOGY = "IT" - ELECTRONICS = "EC" - ELECTRICAL = "EE" - MECHANICAL = "ME" - CIVIL = "CE" - CHEMICAL = "CH" - DATA_SCIENCE = "DS" - ARTIFICIAL_INTELLIGENCE= "AI" - OTHER = "OT" + COMPUTER_SCIENCE = "Computer Science" + INFORMATION_TECH = "Information Technology" + ELECTRONICS = "Electronics" + MECHANICAL = "Mechanical" + CIVIL = "Civil" + UNKNOWN = "Unknown" class AcademicYear(str, Enum): @@ -52,84 +33,50 @@ class AcademicYear(str, Enum): SECOND = "2" THIRD = "3" FOURTH = "4" - PG1 = "PG1" - PG2 = "PG2" class StudentBatch(str, Enum): - """Batch section within a year (A–F or numeric).""" A = "A" B = "B" C = "C" D = "D" - E = "E" - F = "F" # ── Student Profile ─────────────────────────────────────────────────────────── -class StudentProfile(BaseModel): +@dataclass +class StudentProfile: """ - Canonical student identity record for GATEKEEPER. - - All fields validated at construction time. - Immutable once created (model_config frozen=True). + Authoritative student identity record stored in the registry. + + Fields + ────── + roll_number : Unique 12-digit numeric identifier. + full_name : Title-cased full name. + email : Institutional email derived from roll number. + department : Academic department enum. + year : Academic year enum. + batch : Section / batch label. + program : Degree programme (e.g. "B.Tech"). + photo_reference : Path / URL to registered face photo (for face verification). + is_active : False if the student is deregistered or suspended. """ - model_config = {"frozen": True} - - schema_version: str = REGISTRY_SCHEMA_VERSION - - # Identity - roll_number: str = Field(..., description="Unique student roll number") - full_name: str = Field(..., min_length=2, max_length=120) - email: str = Field(..., description="Official institution email") - - # Academic metadata - department: Department - year: AcademicYear - batch: StudentBatch - program: str = Field(default="B.Tech", max_length=50) - - # Photo reference (URL or relative path — never raw bytes) - photo_reference: Optional[str] = Field(default=None, description="URL or path to student photo") - - # Optional extended metadata - guardian_name: Optional[str] = Field(default=None, max_length=120) - is_active: bool = True - - # ── Validators ──────────────────────────────────────────────────────────── - - @field_validator("roll_number") - @classmethod - def validate_roll_number(cls, v: str) -> str: - normalized = v.strip().upper() - if not ROLL_NUMBER_PATTERN.match(normalized): - raise ValueError( - f"Invalid roll number format '{v}'. " - f"Must be 4–15 uppercase letters/digits (e.g. CS2021001)." - ) - return normalized - - @field_validator("email") - @classmethod - def validate_email(cls, v: str) -> str: - v = v.strip().lower() - if "@" not in v or "." not in v.split("@")[-1]: - raise ValueError(f"Invalid email address: '{v}'") - return v - - @field_validator("full_name") - @classmethod - def validate_full_name(cls, v: str) -> str: - v = v.strip() - if not v.replace(" ", "").replace("-", "").replace("'", "").isalpha(): - raise ValueError(f"Full name must contain only letters, spaces, hyphens: '{v}'") - return v + roll_number: str + full_name: str + email: str + department: Department = Department.COMPUTER_SCIENCE + year: AcademicYear = AcademicYear.THIRD + batch: StudentBatch = StudentBatch.A + program: str = "B.Tech" + photo_reference: Optional[str] = None + is_active: bool = True + + def __post_init__(self) -> None: + # Normalise name casing at creation time + self.full_name = self.full_name.strip().title() def to_dict(self) -> dict: - """Return a plain-dict representation safe for logging and export.""" return { - "schema_version": self.schema_version, "roll_number": self.roll_number, "full_name": self.full_name, "email": self.email, diff --git a/backend/src/main.py b/backend/src/main.py index b09d416..c3029ff 100644 --- a/backend/src/main.py +++ b/backend/src/main.py @@ -33,8 +33,13 @@ class ResolveAlertRequest(BaseModel): conflict_id: str approved: bool = False -# Initialize the main orchestrator agent +# Initialize the orchestrator and agents main_agent = MainAgent() +gatekeeper_pipeline = main_agent.gatekeeper._pipeline # Access the global pipeline instance + +class GatekeeperVerifyRequest(BaseModel): + roll_number: str + face_id: str = None @app.get("/") async def root(): @@ -74,6 +79,22 @@ async def analyze_repo(request: AnalyzeRequest): data = context.dict() return {"status": "success", "data": data} +@app.post("/gatekeeper/verify") +async def gatekeeper_verify(request: GatekeeperVerifyRequest): + """ + Direct endpoint to run the Gatekeeper verification pipeline. + """ + result = gatekeeper_pipeline.run(request.roll_number, request.face_id) + return {"status": "success", "data": result.to_dict()} + +@app.get("/gatekeeper/registry") +async def gatekeeper_registry(): + """ + Endpoint to fetch all active registered students. + """ + students = gatekeeper_pipeline._registry.all_active() + return {"status": "success", "data": [s.to_dict() for s in students]} + @app.websocket("/ws/analyze") async def websocket_analyze(websocket: WebSocket): await websocket.accept() diff --git a/backend/tests/test_gatekeeper_pipeline.py b/backend/tests/test_gatekeeper_pipeline.py new file mode 100644 index 0000000..b9407ed --- /dev/null +++ b/backend/tests/test_gatekeeper_pipeline.py @@ -0,0 +1,157 @@ +""" +test_gatekeeper_pipeline.py +─────────────────────────── +Tests for the End-to-End GATEKEEPER Verification Pipeline. + +Covers: +- Success paths +- Face validation (stub) +- Face history (clone / swap) +- Auth Engine final decisions +""" + +from __future__ import annotations + +import pytest + +from src.agents.gatekeeper.pipeline.gatekeeper_pipeline import GatekeeperPipeline +from src.agents.gatekeeper.face_verification.face_verifier import _OVERRIDE_PASS, _OVERRIDE_FAIL +from src.agents.gatekeeper.registry.registry_store import StudentRegistry +from src.agents.gatekeeper.registry.student_schema import StudentProfile, AcademicYear, Department, StudentBatch + + +@pytest.fixture +def test_registry(): + """A small registry for deterministic pipeline tests.""" + registry = StudentRegistry() + registry.add(StudentProfile( + roll_number="VALID001", + full_name="Valid Student", + email="valid@college.edu", + photo_reference="photos/VALID001.jpg", + is_active=True, + )) + registry.add(StudentProfile( + roll_number="INACTIVE001", + full_name="Inactive Student", + email="inactive@college.edu", + photo_reference="photos/INACTIVE001.jpg", + is_active=False, + )) + registry.add(StudentProfile( + roll_number="NOPHOTO001", + full_name="No Photo Student", + email="nophoto@college.edu", + photo_reference=None, + is_active=True, + )) + return registry + + +@pytest.fixture +def pipeline(test_registry): + return GatekeeperPipeline(registry=test_registry) + + +def test_happy_path_exact_match(pipeline): + """Valid roll + matching face -> GRANTED""" + result = pipeline.run("VALID001", "photos/VALID001.jpg") + assert result.is_admitted is True + assert result.access_decision.decision.value == "granted" + assert result.stage_results["roll_verification"]["is_verified"] is True + assert result.stage_results["face_verification"]["matched"] is True + assert result.stage_results["conflict_detection"]["has_conflict"] is False + + +def test_invalid_roll_denied(pipeline): + """Roll number not in registry -> DENIED""" + result = pipeline.run("UNKNOWN999", "photos/VALID001.jpg") + assert result.is_admitted is False + assert result.access_decision.decision.value == "denied" + assert "UNKNOWN999" in result.access_decision.reasons[0] + + +def test_inactive_student_denied(pipeline): + """Student is inactive -> DENIED""" + result = pipeline.run("INACTIVE001", "photos/INACTIVE001.jpg") + assert result.is_admitted is False + assert result.access_decision.decision.value == "denied" + assert "inactive" in result.access_decision.reasons[0].lower() + + +def test_no_photo_registered_denied(pipeline): + """Student has no registered photo -> DENIED""" + result = pipeline.run("NOPHOTO001", "some_face.jpg") + assert result.is_admitted is False + assert result.access_decision.decision.value == "denied" + assert "No reference photo" in result.access_decision.reasons[0] + + +def test_face_mismatch_admin_review(pipeline): + """Face does not match -> PENDING_ADMIN_REVIEW""" + result = pipeline.run("VALID001", "wrong_face.jpg") + assert result.is_admitted is False + assert result.access_decision.decision.value == "pending_admin_review" + assert result.access_decision.requires_admin_review is True + assert "does not match" in result.access_decision.reasons[0] + + +def test_face_override_pass(pipeline): + """Admin override PASS -> GRANTED""" + result = pipeline.run("VALID001", _OVERRIDE_PASS) + assert result.is_admitted is True + + +def test_face_override_fail(pipeline): + """Admin override FAIL -> PENDING_ADMIN_REVIEW""" + result = pipeline.run("VALID001", _OVERRIDE_FAIL) + assert result.is_admitted is False + assert result.access_decision.decision.value == "pending_admin_review" + + +def test_identity_swap_conflict(pipeline): + """Same roll number uses different faces -> HIGH severity conflict -> DENIED""" + # 1. First session: valid match + r1 = pipeline.run("VALID001", "photos/VALID001.jpg") + assert r1.is_admitted is True + + # 2. Second session: someone else forces an override pass on the same roll + # Or just use the prefix matcher which also grants access + r2 = pipeline.run("VALID001", "VALID001_some_other_cam.jpg") + + # We expect DENIED because the history checker sees a different face for VALID001 + assert r2.is_admitted is False + assert r2.access_decision.decision.value == "denied" + + conflict = r2.stage_results["conflict_detection"] + assert conflict["has_conflict"] is True + assert conflict["conflict_type"] == "face_swap" + assert conflict["severity"] == "high" + + +def test_identity_clone_conflict(test_registry): + """Same face used for different roll numbers -> CRITICAL severity conflict -> DENIED""" + # Add a second valid student + test_registry.add(StudentProfile( + roll_number="VALID002", + full_name="Valid Student Two", + email="valid2@college.edu", + photo_reference="photos/VALID002.jpg", + is_active=True, + )) + pipeline = GatekeeperPipeline(registry=test_registry) + + # 1. First student verifies using a face + r1 = pipeline.run("VALID001", _OVERRIDE_PASS) + assert r1.is_admitted is True + + # 2. Second student tries to use the exact same face + r2 = pipeline.run("VALID002", _OVERRIDE_PASS) + + assert r2.is_admitted is False + assert r2.access_decision.decision.value == "denied" + + conflict = r2.stage_results["conflict_detection"] + assert conflict["has_conflict"] is True + assert conflict["conflict_type"] == "face_clone" + assert conflict["severity"] == "critical" diff --git a/frontend/app.js b/frontend/app.js index 261918c..bbbfbd4 100644 --- a/frontend/app.js +++ b/frontend/app.js @@ -469,3 +469,13 @@ window.addEventListener('beforeunload', () => { if (audioCtx) audioCtx.close(); if (stream) stream.getTracks().forEach(t => t.stop()); }); + +// ── REMOVE SPLINE LOGO ─────────────────────────────────────────────── +setInterval(() => { + document.querySelectorAll('spline-viewer').forEach(viewer => { + if (viewer.shadowRoot) { + const logo = viewer.shadowRoot.querySelector('#logo'); + if (logo) logo.remove(); + } + }); +}, 1000); diff --git a/frontend/index.html b/frontend/index.html index 47af890..c3fa418 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -1,24 +1,37 @@ + - - + + El — AI Examiner - - - - + + + + + + +
-
+
+ + +

Hi, I'm El 👋

-

Your personal AI Examiner.
I just need your camera & microphone to get started.

+

Your personal AI Examiner.
I just need your camera & microphone to get started.

@@ -40,8 +53,18 @@

Hi, I'm El 👋

@@ -51,7 +74,10 @@

Hi, I'm El 👋

You
@@ -82,9 +108,11 @@

Hi, I'm El 👋

-
-
-
+
+ + +

@@ -104,12 +132,19 @@

Hi, I'm El 👋

- +
@@ -125,11 +160,13 @@

Hi, I'm El 👋

Camera & Mic Blocked

-

Click the lock icon in your address bar, set Camera and Microphone to Allow, then reload.

+

Click the lock icon in your address bar, set Camera and Microphone to Allow, + then reload.

- + + \ No newline at end of file diff --git a/frontend/style.css b/frontend/style.css index 557ab9d..5e38d87 100644 --- a/frontend/style.css +++ b/frontend/style.css @@ -28,46 +28,108 @@ html,body{height:100%;font-family:'Inter',system-ui,sans-serif;background:var(-- z-index:200;transition:opacity .45s,transform .45s; } .gate.out{opacity:0;transform:scale(.96);pointer-events:none} +.gate-orb-wrap{ + position:relative;width:280px;height:280px; + display:flex;align-items:center;justify-content:center; + margin-bottom:4px; +} + +spline-viewer::part(logo) { + display: none !important; +} + +/* ─── Rings must combine centering + float in ONE transform ─── */ +@keyframes ring-float{ + from{ transform:translate(-50%,-50%) translateY(0) } + to { transform:translate(-50%,-50%) translateY(-10px) } +} + +.gate-ring { + position: absolute; + border-radius: 50%; + top: 50%; left: 50%; + transform: translate(-50%, -50%); + animation: ring-float 3s ease-in-out infinite alternate; + pointer-events: none; +} + +/* Inner ring — darkest, right at the orb edge */ +.gate-ring-1 { + width: 240px; height: 240px; + border: 4px solid rgba(88, 32, 210, 0.92); + box-shadow: + 0 0 28px rgba(108,47,255,0.45), + inset 0 0 20px rgba(108,47,255,0.28); + z-index: 5; +} + +/* Middle ring */ +.gate-ring-2 { + width: 296px; height: 296px; + border: 2px solid rgba(130, 80, 240, 0.40); + z-index: 4; + animation-delay: 0.15s; +} + +/* Outer ring — faintest */ +.gate-ring-3 { + width: 352px; height: 352px; + border: 1.5px solid rgba(160, 120, 245, 0.18); + z-index: 3; + animation-delay: 0.30s; +} + +/* ─── Orb ─── */ .gate-orb{ - width:160px;height:160px;border-radius:50%; + width:200px;height:200px;border-radius:50%; position:relative;overflow:hidden; - flex-shrink:0;margin-bottom:4px; - /* Pale lavender frosted-glass base */ - background:radial-gradient(ellipse at 55% 48%, - #e2e0f9 0%, - #d3d0f0 40%, - rgba(200,196,230,0.65) 75%, - rgba(210,205,240,0.35) 100% - ); - /* Subtle sphere edge glow */ + flex-shrink:0;z-index:10; + /* Blue-sky atmosphere base */ + background: + radial-gradient(ellipse at 50% 25%, rgba(110,185,255,0.90) 0%, transparent 55%), + radial-gradient(ellipse at 50% 80%, rgba(155,110,240,0.82) 0%, transparent 55%), + linear-gradient(145deg, rgba(130,190,255,0.65) 0%, rgba(175,140,245,0.75) 100%); box-shadow: - 0 0 0 1px rgba(190,180,240,0.28), - 0 22px 60px rgba(120,100,200,0.18), - 0 0 100px rgba(160,140,230,0.12); + 0 0 0 2.5px rgba(255,255,255,0.92), + inset 0 4px 35px rgba(255,255,255,0.72), + 0 22px 70px rgba(108,47,255,0.34); animation:orb-float 3s ease-in-out infinite alternate; } -/* Swirling colour blobs — blurred for the cloudy interior look */ + +/* Photorealistic cloud blobs */ .gate-orb::before{ content:'';position:absolute; inset:-12%;border-radius:50%; background: - radial-gradient(circle at 42% 42%, rgba(50,190,255,0.82) 0%, transparent 30%), - radial-gradient(circle at 22% 60%, rgba(70, 95,240,0.50) 0%, transparent 28%), - radial-gradient(circle at 70% 66%, rgba(175,110,255,0.58) 0%, transparent 33%), - radial-gradient(circle at 72% 22%, rgba(255,255,255,0.95) 0%, transparent 24%), - radial-gradient(circle at 50% 50%, rgba(210,225,255,0.22) 0%, transparent 55%); - filter:blur(16px); + /* Large central white cloud mass */ + radial-gradient(ellipse 80% 65% at 38% 50%, rgba(255,255,255,1) 0%, rgba(255,255,255,0.80) 28%, transparent 60%), + /* Upper-right cloud puff */ + radial-gradient(ellipse 55% 50% at 66% 28%, rgba(255,255,255,0.96) 0%, rgba(255,255,255,0.45) 40%, transparent 65%), + /* Sky blue – left edge */ + radial-gradient(ellipse 48% 80% at 2% 48%, rgba(75,170,255,0.90) 0%, transparent 60%), + /* Sky blue – upper centre */ + radial-gradient(ellipse 72% 44% at 54% 2%, rgba(95,185,255,0.85) 0%, transparent 58%), + /* Rich purple cloud – lower right */ + radial-gradient(ellipse 58% 55% at 74% 74%, rgba(165,100,250,0.90) 0%, transparent 58%), + /* Purple shadow – lower left */ + radial-gradient(ellipse 46% 44% at 20% 80%, rgba(140,100,230,0.75) 0%, transparent 55%), + /* Bright specular highlight */ + radial-gradient(circle at 78% 16%, rgba(255,255,255,1) 0%, transparent 18%); + filter:blur(13px); animation:orb-drift 9s ease-in-out infinite; } -/* 3-D rim darkening — makes the sphere look solid and glassy */ + +/* Glass-sphere rim darkening */ .gate-orb::after{ content:'';position:absolute;inset:0;border-radius:50%; background:radial-gradient(ellipse at 50% 50%, - transparent 52%, - rgba(130,115,200,0.10) 72%, - rgba(110, 95,185,0.22) 100% + transparent 44%, + rgba(108,80,200,0.06) 64%, + rgba(90,65,185,0.20) 82%, + rgba(75,50,170,0.42) 100% ); } + @keyframes orb-float{from{transform:translateY(0)}to{transform:translateY(-10px)}} @keyframes orb-hue{0%{filter:hue-rotate(0deg)}100%{filter:hue-rotate(25deg)}} @keyframes orb-drift{ @@ -223,10 +285,44 @@ html,body{height:100%;font-family:'Inter',system-ui,sans-serif;background:var(-- /* Hero */ .hero{display:flex;flex-direction:column;align-items:center;text-align:center;gap:16px;padding-bottom:32px;flex-shrink:0} -.hero-orb-wrap{position:relative;width:140px;height:140px;display:flex;align-items:center;justify-content:center;margin-bottom:6px} -.hero-orb-glow{position:absolute;inset:-24px;border-radius:50%;background:radial-gradient(circle,rgba(108,47,255,.16) 0%,transparent 70%);animation:orb-float 3s ease-in-out infinite alternate} +.hero-orb-wrap{position:relative;width:260px;height:260px;display:flex;align-items:center;justify-content:center;margin-bottom:12px} +.hero-orb-glow{position:absolute;width:100%;height:100%;border-radius:50%;background:radial-gradient(circle,rgba(108,47,255,.12) 0%,transparent 65%);animation:orb-float 3s ease-in-out infinite alternate} + +.hero-ring { + position: absolute; + border-radius: 50%; + inset: 0; + margin: auto; + animation: orb-float 3s ease-in-out infinite alternate; + pointer-events: none; +} + +.hero-ring-1 { + width: 154px; + height: 154px; + border: 3.5px solid rgba(108, 47, 255, 0.85); + box-shadow: 0 0 16px rgba(108, 47, 255, 0.3), inset 0 0 12px rgba(108, 47, 255, 0.2); + z-index: 5; +} + +.hero-ring-2 { + width: 186px; + height: 186px; + border: 2px solid rgba(108, 47, 255, 0.35); + z-index: 4; + animation-delay: 0.1s; +} + +.hero-ring-3 { + width: 222px; + height: 222px; + border: 1.5px solid rgba(108, 47, 255, 0.15); + z-index: 3; + animation-delay: 0.2s; +} + .hero-orb{ - width:128px;height:128px;border-radius:50%;position:relative;z-index:1; + width:130px;height:130px;border-radius:50%;position:relative;z-index:10; overflow:hidden; background:radial-gradient(ellipse at 55% 48%, #e2e0f9 0%, @@ -235,9 +331,9 @@ html,body{height:100%;font-family:'Inter',system-ui,sans-serif;background:var(-- rgba(210,205,240,0.35) 100% ); box-shadow: - 0 0 0 1px rgba(190,180,240,0.28), - 0 22px 60px rgba(120,100,200,0.18), - 0 0 100px rgba(160,140,230,0.12); + 0 0 0 1px rgba(255,255,255,0.7), + inset 0 0 16px rgba(255,255,255,0.6), + 0 12px 32px rgba(108,47,255,0.25); animation:orb-float 3s ease-in-out infinite alternate; } .hero-orb::before{ @@ -267,8 +363,6 @@ html,body{height:100%;font-family:'Inter',system-ui,sans-serif;background:var(-- 0 0 80px rgba(160,140,230,0.35), 0 22px 60px rgba(120,100,200,0.2); } -.hero-orb-ring{position:absolute;inset:0;border-radius:50%;border:2.5px solid rgba(108,47,255,.3);animation:pr 2.8s ease-out infinite} -@keyframes pr{0%{transform:scale(1);opacity:.7}100%{transform:scale(1.52);opacity:0}} .hero-text{font-size:1rem;font-weight:600;color:var(--text1);line-height:1.7;max-width:400px;min-height:1.7em;text-align:center} .hero-line2{font-size:.92rem;font-weight:400;color:var(--text2)}