Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions backend/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
from .core.config import settings
from .agents.main_agent.agent import MainAgent
from .services.face_detection import FaceDetectionService
from .services.audit_log import AuditLogService

app = FastAPI(title=settings.PROJECT_NAME)
face_service = FaceDetectionService()
audit_log = AuditLogService()

app.add_middleware(
CORSMiddleware,
Expand Down Expand Up @@ -99,6 +101,25 @@ async def admin_review_conflict(request: AdminReviewRequest):
async def get_override_log():
return face_service.get_override_log()

@app.get("/audit/timeline")
async def get_audit_timeline(roll_number: Optional[str] = None, session_id: Optional[str] = None):
result = []
if session_id:
result = audit_log.get_session_timeline(session_id)
elif roll_number:
result = audit_log.get_timeline(roll_number)
else:
result = audit_log.get_timeline()
return result

@app.get("/audit/search")
async def search_audit_log(query: str):
return audit_log.search(query)

@app.get("/audit/events/{event_type}")
async def get_audit_events_by_type(event_type: str):
return audit_log.get_events_by_type(event_type)

@app.post("/analyze")
async def analyze_repo(request: AnalyzeRequest):
# Legacy REST endpoint for backward compatibility
Expand Down
8 changes: 8 additions & 0 deletions backend/src/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ class EventType(str, Enum):
ADMIN_REVIEW_REJECTED = "admin_review.rejected"
ADMIN_OVERRIDE_ACTION = "admin_override.action"

# Audit Log Events
VERIFICATION_ATTEMPT = "verification.attempt"
FACE_MATCH = "face.match"
FACE_MISMATCH = "face.mismatch"
SUSPICIOUS_IDENTITY = "suspicious.identity"
ADMIN_OVERRIDE = "admin.override"
EXAMINATION_ACCESS = "examination.access"
Comment on lines +31 to +36

# ORACLE Events
FILE_RECEIVED = "file_received"
PDF_PARSED = "pdf_parsed"
Expand Down
159 changes: 159 additions & 0 deletions backend/src/services/access_decision.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
from enum import Enum
from datetime import datetime, timezone
import hashlib


class AccessDecision(str, Enum):
ACCESS_GRANTED = "ACCESS_GRANTED"
ACCESS_DENIED = "ACCESS_DENIED"
MANUAL_REVIEW_REQUIRED = "MANUAL_REVIEW_REQUIRED"
TEMPORARY_BLOCK = "TEMPORARY_BLOCK"


@dataclass
class VerificationOutput:
verified: bool
roll_number: str
confidence: float
timestamp: datetime


@dataclass
class IdentityConflictResult:
has_conflict: bool
conflict_id: Optional[str]
matched_rolls: List[str]
status: str


@dataclass
class SafetyCheck:
name: str
passed: bool
message: str


@dataclass
class AccessDecisionResult:
decision: AccessDecision
reason: str
confidence: float
evidence: Dict[str, Any]
decision_id: str
timestamp: datetime

def to_dict(self) -> Dict[str, Any]:
return {
"decision": self.decision.value,
"reason": self.reason,
"confidence": round(self.confidence, 4),
"evidence": self.evidence,
"decision_id": self.decision_id,
"timestamp": self.timestamp.isoformat(),
}


class ExaminationAccessDecision:
def __init__(self):
self._decisions: Dict[str, AccessDecisionResult] = {}

def _generate_decision_id(
self, roll_number: str, decision: AccessDecision, timestamp: datetime
) -> str:
raw = f"{timestamp.isoformat()}:{roll_number}:{decision.value}"
return hashlib.sha256(raw.encode()).hexdigest()[:16]

def decide(
self,
roll_number: str,
verification: Optional[VerificationOutput] = None,
conflict: Optional[IdentityConflictResult] = None,
safety_checks: Optional[List[SafetyCheck]] = None,
session_id: Optional[str] = None,
) -> AccessDecisionResult:
timestamp = datetime.now(timezone.utc)
evidence: Dict[str, Any] = {
"session_id": session_id,
"verification": None,
"conflict": None,
"safety_checks": [],
}

if verification:
evidence["verification"] = {
"verified": verification.verified,
"confidence": verification.confidence,
"timestamp": verification.timestamp.isoformat(),
}

if conflict:
evidence["conflict"] = {
"has_conflict": conflict.has_conflict,
"conflict_id": conflict.conflict_id,
"matched_rolls": conflict.matched_rolls,
"status": conflict.status,
}

if safety_checks:
evidence["safety_checks"] = [
{"name": sc.name, "passed": sc.passed, "message": sc.message}
for sc in safety_checks
]

if not verification or not verification.verified:
decision = AccessDecision.ACCESS_DENIED
reason = "Identity verification failed or missing"
confidence = 0.0
elif conflict and conflict.has_conflict:
if conflict.status == "pending_review":
decision = AccessDecision.MANUAL_REVIEW_REQUIRED
reason = f"Identity conflict pending review: {conflict.conflict_id}"
else:
decision = AccessDecision.ACCESS_DENIED
reason = f"Identity conflict detected with: {', '.join(conflict.matched_rolls)}"
confidence = 0.5
elif safety_checks:
failed_checks = [sc for sc in safety_checks if not sc.passed]
if failed_checks:
if any("block" in sc.message.lower() for sc in failed_checks):
decision = AccessDecision.TEMPORARY_BLOCK
reason = f"Safety violation: {failed_checks[0].message}"
else:
decision = AccessDecision.ACCESS_DENIED
reason = f"Safety check failed: {failed_checks[0].message}"
confidence = 0.3
else:
decision = AccessDecision.ACCESS_GRANTED
reason = "All checks passed"
confidence = 0.95
else:
decision = AccessDecision.ACCESS_GRANTED
reason = "All verification checks passed"
confidence = 0.95
Comment on lines +130 to +134

decision_id = self._generate_decision_id(roll_number, decision, timestamp)
result = AccessDecisionResult(
decision=decision,
reason=reason,
confidence=confidence,
evidence=evidence,
decision_id=decision_id,
timestamp=timestamp,
)

self._decisions[decision_id] = result
return result

def get_decision(self, decision_id: str) -> Optional[AccessDecisionResult]:
return self._decisions.get(decision_id)

def get_decisions_for_roll(self, roll_number: str) -> List[AccessDecisionResult]:
return [
d for d in self._decisions.values()
if roll_number in str(d.evidence.get("verification", {}))
]
Comment on lines +152 to +156

def clear(self):
self._decisions.clear()
Loading
Loading