Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion backend/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
from .core.config import settings
from .agents.main_agent.agent import MainAgent
from .services.face_detection import FaceDetectionService
from .services.audit_log import AuditLogService

app = FastAPI(title=settings.PROJECT_NAME)
face_service = FaceDetectionService()
audit_log = AuditLogService()

app.add_middleware(
CORSMiddleware,
Expand All @@ -32,6 +34,14 @@ class FaceVerifyRequest(BaseModel):
class ResolveAlertRequest(BaseModel):
conflict_id: str
approved: bool = False
reviewer_id: Optional[str] = None
reason: Optional[str] = None

class AdminReviewRequest(BaseModel):
conflict_id: str
approved: bool
reviewer_id: str
reason: str

# Initialize the main orchestrator agent
main_agent = MainAgent()
Expand Down Expand Up @@ -60,9 +70,56 @@ async def get_pending_alerts():

@app.post("/face/resolve-alert")
async def resolve_alert(request: ResolveAlertRequest):
success = face_service.resolve_alert(request.conflict_id, request.approved)
success = face_service.resolve_alert(
request.conflict_id,
request.approved,
request.reviewer_id,
request.reason
)
return {"success": success}

@app.get("/face/conflict/{conflict_id}")
async def get_conflict_details(conflict_id: str):
details = face_service.get_conflict_details(conflict_id)
if details is None:
return {"error": "Conflict not found"}, 404
return details
Comment on lines +82 to +86

@app.post("/admin/review-conflict")
async def admin_review_conflict(request: AdminReviewRequest):
success = face_service.admin_review_conflict(
request.conflict_id,
request.approved,
request.reviewer_id,
request.reason
)
if not success:
return {"error": "Conflict not found"}, 404
return {"success": True, "message": "Review decision recorded"}
Comment on lines +90 to +98

@app.get("/admin/override-log")
async def get_override_log():
return face_service.get_override_log()

@app.get("/audit/timeline")
async def get_audit_timeline(roll_number: Optional[str] = None, session_id: Optional[str] = None):
result = []
if session_id:
result = audit_log.get_session_timeline(session_id)
elif roll_number:
result = audit_log.get_timeline(roll_number)
else:
result = audit_log.get_timeline()
return result
Comment on lines +104 to +113

@app.get("/audit/search")
async def search_audit_log(query: str):
return audit_log.search(query)

@app.get("/audit/events/{event_type}")
async def get_audit_events_by_type(event_type: str):
return audit_log.get_events_by_type(event_type)

@app.post("/analyze")
async def analyze_repo(request: AnalyzeRequest):
# Legacy REST endpoint for backward compatibility
Expand Down
14 changes: 14 additions & 0 deletions backend/src/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,20 @@ class EventType(str, Enum):
MANUAL_REVIEW_REQUIRED = "manual_review.required"
ACCESS_DENIED_CONFLICT = "access.denied_conflict"

# Admin Review Events
ADMIN_REVIEW_STARTED = "admin_review.started"
ADMIN_REVIEW_APPROVED = "admin_review.approved"
ADMIN_REVIEW_REJECTED = "admin_review.rejected"
ADMIN_OVERRIDE_ACTION = "admin_override.action"

# Audit Log Events
VERIFICATION_ATTEMPT = "verification.attempt"
FACE_MATCH = "face.match"
FACE_MISMATCH = "face.mismatch"
SUSPICIOUS_IDENTITY = "suspicious.identity"
ADMIN_OVERRIDE = "admin.override"
EXAMINATION_ACCESS = "examination.access"

# ORACLE Events
FILE_RECEIVED = "file_received"
PDF_PARSED = "pdf_parsed"
Expand Down
159 changes: 159 additions & 0 deletions backend/src/services/access_decision.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
from enum import Enum
from datetime import datetime, timezone
import hashlib


class AccessDecision(str, Enum):
ACCESS_GRANTED = "ACCESS_GRANTED"
ACCESS_DENIED = "ACCESS_DENIED"
MANUAL_REVIEW_REQUIRED = "MANUAL_REVIEW_REQUIRED"
TEMPORARY_BLOCK = "TEMPORARY_BLOCK"


@dataclass
class VerificationOutput:
verified: bool
roll_number: str
confidence: float
timestamp: datetime


@dataclass
class IdentityConflictResult:
has_conflict: bool
conflict_id: Optional[str]
matched_rolls: List[str]
status: str


@dataclass
class SafetyCheck:
name: str
passed: bool
message: str


@dataclass
class AccessDecisionResult:
decision: AccessDecision
reason: str
confidence: float
evidence: Dict[str, Any]
decision_id: str
timestamp: datetime

def to_dict(self) -> Dict[str, Any]:
return {
"decision": self.decision.value,
"reason": self.reason,
"confidence": round(self.confidence, 4),
"evidence": self.evidence,
"decision_id": self.decision_id,
"timestamp": self.timestamp.isoformat(),
}


class ExaminationAccessDecision:
def __init__(self):
self._decisions: Dict[str, AccessDecisionResult] = {}

def _generate_decision_id(
self, roll_number: str, decision: AccessDecision, timestamp: datetime
) -> str:
raw = f"{timestamp.isoformat()}:{roll_number}:{decision.value}"
return hashlib.sha256(raw.encode()).hexdigest()[:16]

def decide(
self,
roll_number: str,
verification: Optional[VerificationOutput] = None,
conflict: Optional[IdentityConflictResult] = None,
safety_checks: Optional[List[SafetyCheck]] = None,
session_id: Optional[str] = None,
) -> AccessDecisionResult:
timestamp = datetime.now(timezone.utc)
evidence: Dict[str, Any] = {
"session_id": session_id,
"verification": None,
"conflict": None,
"safety_checks": [],
}

if verification:
evidence["verification"] = {
"verified": verification.verified,
"confidence": verification.confidence,
"timestamp": verification.timestamp.isoformat(),
}

if conflict:
evidence["conflict"] = {
"has_conflict": conflict.has_conflict,
"conflict_id": conflict.conflict_id,
"matched_rolls": conflict.matched_rolls,
"status": conflict.status,
}

if safety_checks:
evidence["safety_checks"] = [
{"name": sc.name, "passed": sc.passed, "message": sc.message}
for sc in safety_checks
]

if not verification or not verification.verified:
decision = AccessDecision.ACCESS_DENIED
reason = "Identity verification failed or missing"
confidence = 0.0
elif conflict and conflict.has_conflict:
if conflict.status == "pending_review":
decision = AccessDecision.MANUAL_REVIEW_REQUIRED
reason = f"Identity conflict pending review: {conflict.conflict_id}"
else:
decision = AccessDecision.ACCESS_DENIED
reason = f"Identity conflict detected with: {', '.join(conflict.matched_rolls)}"
confidence = 0.5
elif safety_checks:
failed_checks = [sc for sc in safety_checks if not sc.passed]
if failed_checks:
if any("block" in sc.message.lower() for sc in failed_checks):
decision = AccessDecision.TEMPORARY_BLOCK
reason = f"Safety violation: {failed_checks[0].message}"
else:
decision = AccessDecision.ACCESS_DENIED
reason = f"Safety check failed: {failed_checks[0].message}"
confidence = 0.3
else:
decision = AccessDecision.ACCESS_GRANTED
reason = "All checks passed"
confidence = 0.95
else:
decision = AccessDecision.ACCESS_GRANTED
reason = "All verification checks passed"
confidence = 0.95

decision_id = self._generate_decision_id(roll_number, decision, timestamp)
result = AccessDecisionResult(
decision=decision,
reason=reason,
confidence=confidence,
evidence=evidence,
decision_id=decision_id,
timestamp=timestamp,
)

self._decisions[decision_id] = result
return result

def get_decision(self, decision_id: str) -> Optional[AccessDecisionResult]:
return self._decisions.get(decision_id)

def get_decisions_for_roll(self, roll_number: str) -> List[AccessDecisionResult]:
return [
d for d in self._decisions.values()
if roll_number in str(d.evidence.get("verification", {}))
]
Comment on lines +152 to +156

def clear(self):
self._decisions.clear()
Loading
Loading