Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion backend/src/agents/gatekeeper/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
from .agent import GatekeeperAgent
from .pipeline.gatekeeper_pipeline import GatekeeperPipeline
from .pipeline.pipeline_result import PipelineResult

__all__ = ["GatekeeperAgent"]
__all__ = [
"GatekeeperAgent",
"GatekeeperPipeline",
"PipelineResult",
]
146 changes: 47 additions & 99 deletions backend/src/agents/gatekeeper/agent.py
Original file line number Diff line number Diff line change
@@ -1,108 +1,56 @@
"""
GatekeeperAgent — Identity, Session, and Student Registry gate.

Entry point for all student identity verification in the EL system.
Consumes the StudentRegistry to validate roll numbers before any session
is allowed to proceed to ORACLE analysis.
"""

from typing import Any, Dict, Optional
from typing import Any, Dict
from src.agents.base import BaseAgent
from .registry.registry_store import StudentRegistry, SAMPLE_STUDENTS
from .registry.lookup import RegistryLookup, LookupResult, LookupFailureReason
from src.models.events import EventType

from .pipeline.gatekeeper_pipeline import GatekeeperPipeline


class GatekeeperAgent(BaseAgent):
def __init__(self, registry: Optional[StudentRegistry] = None):
def __init__(self):
super().__init__(name="GatekeeperAgent")
# Use injected registry (for testing) or create and seed the default one
if registry is not None:
self._registry = registry
else:
self._registry = StudentRegistry()
self._registry.seed(SAMPLE_STUDENTS)
self._lookup = RegistryLookup(self._registry)
# Global pipeline instance so history store persists across requests
self._pipeline = GatekeeperPipeline()

async def process(
self,
session_id: str,
input_data: Dict[str, Any],
log_callback=None,
) -> Dict[str, Any]:
async def process(self, session_id: str, input_data: Dict[str, Any], log_callback=None) -> Dict[str, Any]:
"""
Validate student identity before allowing session to proceed.

Expected input_data keys:
- roll_number (str): Student roll number to verify
- [any other session fields passed through]

Returns input_data enriched with:
- gatekeeper_status: "verified" | "rejected"
- gatekeeper_reason: failure reason string (if rejected)
- student_profile: plain dict of StudentProfile (if verified)
Gatekeeper Identity & Session Agent.
Runs the full verification pipeline (Roll + Face + Conflict).
"""
async def send_log(msg: str, log_type: str = "info"):
async def send_log(msg: str, type: str = "info"):
if log_callback:
await log_callback({"message": msg, "type": log_type})

self.log_info(f"Gatekeeper processing session {session_id}")
await send_log("[Gatekeeper] Identity verification started.", "info")

roll_number: str = input_data.get("roll_number", "")

# ── Roll-number lookup ────────────────────────────────────────────────
result: LookupResult = self._lookup.by_roll_number(roll_number)

if result.success:
self.log_info(
f"Student verified: {result.profile.full_name} ({result.roll_number})"
)
await send_log(
f"[Gatekeeper] ✅ Student verified: {result.profile.full_name}", "success"
)
self.emit_event(
session_id, "AGENT_PROGRESS",
{
"agent": "Gatekeeper",
"status": "complete",
"milestone": f"Identity Verified: {result.roll_number}",
},
)
return {
**input_data,
"gatekeeper_status": "verified",
"gatekeeper_reason": None,
"student_profile": result.profile.to_dict(),
}

# ── Rejection ─────────────────────────────────────────────────────────
self.log_info(
f"Gatekeeper rejected '{roll_number}': {result.failure_reason} — {result.message}"
)
await send_log(
f"[Gatekeeper] ❌ Rejected: {result.message}", "error"
)
self.emit_event(
session_id, "AGENT_PROGRESS",
{
"agent": "Gatekeeper",
"status": "failed",
"milestone": f"Identity Rejected: {result.failure_reason}",
},
)
return {
**input_data,
"gatekeeper_status": "rejected",
"gatekeeper_reason": result.failure_reason.value if result.failure_reason else "unknown",
"student_profile": None,
}

@property
def registry(self) -> StudentRegistry:
"""Expose the registry for direct queries (e.g. batch listing)."""
return self._registry

@property
def lookup(self) -> RegistryLookup:
"""Expose the lookup service for external callers."""
return self._lookup
await log_callback({"message": msg, "type": type})

self.log_info(f"Gatekeeper running verification for session {session_id}.")
await send_log("[Gatekeeper] Starting End-to-End Verification Pipeline...", "info")

# ── 1. Extract inputs ─────────────────────────────────────────────────
# Typically provided by frontend/client API.
raw_roll = input_data.get("roll_number", "")
raw_face = input_data.get("face_id", "")

# ── 2. Run Pipeline ───────────────────────────────────────────────────
result = self._pipeline.run(raw_roll, raw_face)

# ── 3. Emit Events & Logs ─────────────────────────────────────────────
decision = result.access_decision

if result.is_admitted:
await send_log(f"[Gatekeeper] Identity Verified: {decision.student_name} ({decision.roll_number})", "success")
self.emit_event(session_id, EventType.IDENTITY_VERIFIED, {"roll_number": decision.roll_number})
else:
await send_log(f"[Gatekeeper] Access {decision.decision.value.upper()}: {decision.reasons[0]}", "error")

self.emit_event(session_id, EventType.AGENT_PROGRESS, {
"agent": "Gatekeeper",
"status": "complete",
"milestone": "Pipeline Executed",
"decision": decision.decision.value,
})

# ── 4. Return Output ──────────────────────────────────────────────────
# Append pipeline results into the original input_data dictionary so
# Oracle or downstream agents can consume it.
output_data = input_data.copy()
output_data["gatekeeper_result"] = result.to_dict()

return output_data
14 changes: 14 additions & 0 deletions backend/src/agents/gatekeeper/authorization/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""
authorization/__init__.py
─────────────────────────
Public surface of the GATEKEEPER authorization package.
"""

from .access_decision import AccessDecision, DecisionStatus
from .auth_engine import AuthorizationEngine

__all__ = [
"AccessDecision",
"DecisionStatus",
"AuthorizationEngine",
]
64 changes: 64 additions & 0 deletions backend/src/agents/gatekeeper/authorization/access_decision.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""
access_decision.py
──────────────────
Final output of the GATEKEEPER Authorization Engine.

Rules
─────
- Frozen dataclass — immutable after creation.
- Fully serializable.
- Contains the final go/no-go decision and the audit trail explaining why.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List


class DecisionStatus(str, Enum):
GRANTED = "granted"
DENIED = "denied"
PENDING_ADMIN_REVIEW = "pending_admin_review"


@dataclass(frozen=True)
class AccessDecision:
"""
Final authorization decision for a viva session request.

Fields
──────
decision : GRANTED, DENIED, or PENDING_ADMIN_REVIEW.
roll_number : The requested roll number (or raw input if invalid).
student_name : The resolved student name (if available).
reasons : List of human-readable explanations for the decision.
requires_admin_review : True if the session is halted pending staff override.
audit_trail : Dump of all pipeline stage results for logging.
"""
decision: DecisionStatus
roll_number: str
student_name: str
reasons: List[str] = field(default_factory=list)
requires_admin_review: bool = False
audit_trail: Dict[str, Any] = field(default_factory=dict)

@property
def is_granted(self) -> bool:
return self.decision == DecisionStatus.GRANTED

@property
def is_denied(self) -> bool:
return self.decision == DecisionStatus.DENIED

def to_dict(self) -> dict:
return {
"decision": self.decision.value,
"roll_number": self.roll_number,
"student_name": self.student_name,
"reasons": self.reasons,
"requires_admin_review": self.requires_admin_review,
"is_granted": self.is_granted,
"audit_trail": self.audit_trail,
}
145 changes: 145 additions & 0 deletions backend/src/agents/gatekeeper/authorization/auth_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
"""
auth_engine.py
──────────────
Authorization Engine for the GATEKEEPER pipeline.

Responsibilities
────────────────
- Consume the results of all prior pipeline stages (Roll verification, Face verification, Conflict detection).
- Apply final business rules to determine if the session should be GRANTED, DENIED, or PENDING_ADMIN_REVIEW.
- Compile the final AccessDecision with a complete audit trail.

Rules
─────
- Pure logic — evaluates inputs and emits a decision.
- Deterministic and auditable.
- Never raises.
"""

from __future__ import annotations

import logging
from typing import Any, Dict

from src.agents.gatekeeper.conflict_detection.conflict_detector import ConflictReport, ConflictSeverity
from src.agents.gatekeeper.face_verification.face_result import FaceVerificationResult
from src.agents.gatekeeper.roll_verification.flow import VerificationFlowResult
from .access_decision import AccessDecision, DecisionStatus

logger = logging.getLogger(__name__)


class AuthorizationEngine:
"""
Final decision maker for the GATEKEEPER pipeline.

Rules:
1. If roll verification failed (invalid, not found, inactive) -> DENIED.
2. If conflict severity is CRITICAL (face clone) -> DENIED.
3. If conflict severity is HIGH (face swap) -> DENIED.
4. If face verification failed -> PENDING_ADMIN_REVIEW (or DENIED based on strictness).
* We use DENIED for NO_PHOTO or UNVERIFIABLE, but PENDING_ADMIN_REVIEW for MISMATCH if we want staff to verify.
* For this implementation, we will DENY hard on MISMATCH to be strict, but leave a path for admin override.
5. If admin flagged required -> PENDING_ADMIN_REVIEW.
6. Else -> GRANTED.
"""

def evaluate(
self,
roll_result: VerificationFlowResult,
face_result: FaceVerificationResult,
conflict_report: ConflictReport,
) -> AccessDecision:
"""
Evaluate all stage outputs and return a final AccessDecision.
"""
reasons = []
roll_number = roll_result.roll_number
student_name = roll_result.display_card.full_name if roll_result.display_card else "Unknown"

# Build audit trail
audit_trail: Dict[str, Any] = {
"roll_verification": roll_result.to_dict(),
"face_verification": face_result.to_dict(),
"conflict_detection": conflict_report.to_dict(),
}

# ── Rule 1: Roll Verification Failure ─────────────────────────────────
if roll_result.is_rejected:
reasons.append(roll_result.message)
return AccessDecision(
decision=DecisionStatus.DENIED,
roll_number=roll_number,
student_name=student_name,
reasons=reasons,
audit_trail=audit_trail,
)

# ── Rule 2: Manual Review Flag ────────────────────────────────────────
if roll_result.requires_manual:
reasons.append("Manual staff verification requested.")
return AccessDecision(
decision=DecisionStatus.PENDING_ADMIN_REVIEW,
roll_number=roll_number,
student_name=student_name,
reasons=reasons,
requires_admin_review=True,
audit_trail=audit_trail,
)

# ── Rule 3: Identity Conflicts ────────────────────────────────────────
if conflict_report.has_conflict:
reasons.append(conflict_report.reason)
if conflict_report.severity in (ConflictSeverity.CRITICAL, ConflictSeverity.HIGH):
# Hard block for severe conflicts
return AccessDecision(
decision=DecisionStatus.DENIED,
roll_number=roll_number,
student_name=student_name,
reasons=reasons,
audit_trail=audit_trail,
)
elif conflict_report.severity == ConflictSeverity.MEDIUM:
# Soft block for suspicious but not guaranteed fraud
return AccessDecision(
decision=DecisionStatus.PENDING_ADMIN_REVIEW,
roll_number=roll_number,
student_name=student_name,
reasons=reasons,
requires_admin_review=True,
audit_trail=audit_trail,
)

# ── Rule 4: Face Verification Failure ─────────────────────────────────
if not face_result.matched:
reasons.append(face_result.reason)
# If it's a mismatch but no fraud detected, we might want admin review
# For strictness, if confidence is 0, we deny. But let's route mismatch to admin review.
if face_result.status.value == "mismatch":
return AccessDecision(
decision=DecisionStatus.PENDING_ADMIN_REVIEW,
roll_number=roll_number,
student_name=student_name,
reasons=reasons,
requires_admin_review=True,
audit_trail=audit_trail,
)
else:
# NO_PHOTO or UNVERIFIABLE
return AccessDecision(
decision=DecisionStatus.DENIED,
roll_number=roll_number,
student_name=student_name,
reasons=reasons,
audit_trail=audit_trail,
)

# ── Success ───────────────────────────────────────────────────────────
reasons.append(f"Identity fully verified for {student_name}.")
return AccessDecision(
decision=DecisionStatus.GRANTED,
roll_number=roll_number,
student_name=student_name,
reasons=reasons,
audit_trail=audit_trail,
)
Loading
Loading