Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion backend/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
from .core.config import settings
from .agents.main_agent.agent import MainAgent
from .services.face_detection import FaceDetectionService
from .services.audit_log import AuditLogService

app = FastAPI(title=settings.PROJECT_NAME)
face_service = FaceDetectionService()
audit_log = AuditLogService()

app.add_middleware(
CORSMiddleware,
Expand All @@ -32,6 +34,14 @@ class FaceVerifyRequest(BaseModel):
class ResolveAlertRequest(BaseModel):
conflict_id: str
approved: bool = False
reviewer_id: Optional[str] = None
reason: Optional[str] = None

class AdminReviewRequest(BaseModel):
conflict_id: str
approved: bool
reviewer_id: str
reason: str

# Initialize the main orchestrator agent
main_agent = MainAgent()
Expand Down Expand Up @@ -60,9 +70,56 @@ async def get_pending_alerts():

@app.post("/face/resolve-alert")
async def resolve_alert(request: ResolveAlertRequest):
success = face_service.resolve_alert(request.conflict_id, request.approved)
success = face_service.resolve_alert(
request.conflict_id,
request.approved,
request.reviewer_id,
request.reason
)
return {"success": success}

@app.get("/face/conflict/{conflict_id}")
async def get_conflict_details(conflict_id: str):
details = face_service.get_conflict_details(conflict_id)
if details is None:
return {"error": "Conflict not found"}, 404
return details

@app.post("/admin/review-conflict")
async def admin_review_conflict(request: AdminReviewRequest):
Comment on lines +88 to +89
success = face_service.admin_review_conflict(
request.conflict_id,
request.approved,
request.reviewer_id,
request.reason
)
if not success:
return {"error": "Conflict not found"}, 404
return {"success": True, "message": "Review decision recorded"}

@app.get("/admin/override-log")
async def get_override_log():
Comment on lines +100 to +101
return face_service.get_override_log()

@app.get("/audit/timeline")
async def get_audit_timeline(roll_number: Optional[str] = None, session_id: Optional[str] = None):
Comment on lines +104 to +105
result = []
if session_id:
result = audit_log.get_session_timeline(session_id)
elif roll_number:
result = audit_log.get_timeline(roll_number)
else:
result = audit_log.get_timeline()
return result

@app.get("/audit/search")
async def search_audit_log(query: str):
return audit_log.search(query)

@app.get("/audit/events/{event_type}")
async def get_audit_events_by_type(event_type: str):
return audit_log.get_events_by_type(event_type)

@app.post("/analyze")
async def analyze_repo(request: AnalyzeRequest):
# Legacy REST endpoint for backward compatibility
Expand Down
14 changes: 14 additions & 0 deletions backend/src/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,20 @@ class EventType(str, Enum):
MANUAL_REVIEW_REQUIRED = "manual_review.required"
ACCESS_DENIED_CONFLICT = "access.denied_conflict"

# Admin Review Events
ADMIN_REVIEW_STARTED = "admin_review.started"
ADMIN_REVIEW_APPROVED = "admin_review.approved"
ADMIN_REVIEW_REJECTED = "admin_review.rejected"
ADMIN_OVERRIDE_ACTION = "admin_override.action"

# Audit Log Events
VERIFICATION_ATTEMPT = "verification.attempt"
FACE_MATCH = "face.match"
FACE_MISMATCH = "face.mismatch"
SUSPICIOUS_IDENTITY = "suspicious.identity"
ADMIN_OVERRIDE = "admin.override"
EXAMINATION_ACCESS = "examination.access"
Comment on lines +31 to +36

# ORACLE Events
FILE_RECEIVED = "file_received"
PDF_PARSED = "pdf_parsed"
Expand Down
201 changes: 201 additions & 0 deletions backend/src/services/audit_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, field
from datetime import datetime, timezone
import hashlib
import json


@dataclass
class AuditEvent:
event_id: str
timestamp: datetime
event_type: str
roll_number: str
session_id: Optional[str]
payload: Dict[str, Any]
metadata: Optional[Dict[str, Any]] = None

def to_dict(self) -> Dict[str, Any]:
return {
"event_id": self.event_id,
"timestamp": self.timestamp.isoformat(),
"event_type": self.event_type,
"roll_number": self.roll_number,
"session_id": self.session_id,
"payload": self.payload,
"metadata": self.metadata,
}


class AuditLogService:
def __init__(self):
self._logs: List[AuditEvent] = []

def _generate_event_id(self, roll_number: str, event_type: str, timestamp: datetime) -> str:
raw = f"{timestamp.isoformat()}:{roll_number}:{event_type}"
return hashlib.sha256(raw.encode()).hexdigest()[:16]

def log_verification_attempt(
self,
roll_number: str,
session_id: Optional[str],
capture_info: Dict[str, Any],
result: str,
confidence: float,
) -> AuditEvent:
event = AuditEvent(
event_id=self._generate_event_id(roll_number, "verification_attempt", datetime.now(timezone.utc)),
timestamp=datetime.now(timezone.utc),
event_type="verification_attempt",
roll_number=roll_number,
session_id=session_id,
payload={
"result": result,
"confidence": confidence,
"capture": capture_info,
},
)
self._logs.append(event)
return event

def log_match(
self,
roll_number: str,
session_id: Optional[str],
similarity: float,
confidence: float,
) -> AuditEvent:
event = AuditEvent(
event_id=self._generate_event_id(roll_number, "face_match", datetime.now(timezone.utc)),
timestamp=datetime.now(timezone.utc),
event_type="face_match",
roll_number=roll_number,
session_id=session_id,
payload={
"similarity": similarity,
"confidence": confidence,
},
)
self._logs.append(event)
return event

def log_mismatch(
self,
roll_number: str,
session_id: Optional[str],
similarity: float,
confidence: float,
threshold: float,
) -> AuditEvent:
event = AuditEvent(
event_id=self._generate_event_id(roll_number, "face_mismatch", datetime.now(timezone.utc)),
timestamp=datetime.now(timezone.utc),
event_type="face_mismatch",
roll_number=roll_number,
session_id=session_id,
payload={
"similarity": similarity,
"confidence": confidence,
"threshold": threshold,
},
)
self._logs.append(event)
return event

def log_suspicious_event(
self,
roll_number: str,
session_id: Optional[str],
reason: str,
conflict_id: Optional[str] = None,
matched_rolls: Optional[List[str]] = None,
) -> AuditEvent:
event = AuditEvent(
event_id=self._generate_event_id(roll_number, "suspicious_identity", datetime.now(timezone.utc)),
timestamp=datetime.now(timezone.utc),
event_type="suspicious_identity",
roll_number=roll_number,
session_id=session_id,
payload={
"reason": reason,
"conflict_id": conflict_id,
"matched_rolls": matched_rolls or [],
},
)
self._logs.append(event)
return event

def log_admin_override(
self,
roll_number: str,
session_id: Optional[str],
conflict_id: str,
reviewer_id: str,
approved: bool,
reason: str,
) -> AuditEvent:
event = AuditEvent(
event_id=self._generate_event_id(roll_number, "admin_override", datetime.now(timezone.utc)),
timestamp=datetime.now(timezone.utc),
event_type="admin_override",
roll_number=roll_number,
session_id=session_id,
payload={
"conflict_id": conflict_id,
"reviewer_id": reviewer_id,
"approved": approved,
"reason": reason,
},
)
self._logs.append(event)
return event

def log_examination_access(
self,
roll_number: str,
session_id: str,
granted: bool,
reason: Optional[str] = None,
) -> AuditEvent:
event = AuditEvent(
event_id=self._generate_event_id(roll_number, "examination_access", datetime.now(timezone.utc)),
timestamp=datetime.now(timezone.utc),
event_type="examination_access",
roll_number=roll_number,
session_id=session_id,
payload={
"granted": granted,
"reason": reason,
},
)
self._logs.append(event)
return event

def get_timeline(self, roll_number: Optional[str] = None) -> List[Dict[str, Any]]:
events = self._logs
if roll_number:
events = [e for e in events if e.roll_number == roll_number]
return [e.to_dict() for e in sorted(events, key=lambda x: x.timestamp)]

def get_events_by_type(self, event_type: str) -> List[Dict[str, Any]]:
events = [e for e in self._logs if e.event_type == event_type]
return [e.to_dict() for e in sorted(events, key=lambda x: x.timestamp)]

def get_session_timeline(self, session_id: str) -> List[Dict[str, Any]]:
events = [e for e in self._logs if e.session_id == session_id]
return [e.to_dict() for e in sorted(events, key=lambda x: x.timestamp)]

def search(self, query: str) -> List[Dict[str, Any]]:
query_lower = query.lower()
results = []
for event in self._logs:
payload_str = json.dumps(event.payload).lower()
if query_lower in payload_str or query_lower in event.roll_number.lower():
results.append(event.to_dict())
return results

def clear(self):
self._logs.clear()

def count(self) -> int:
return len(self._logs)
51 changes: 49 additions & 2 deletions backend/src/services/face_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ class ConflictAlert:
timestamp: datetime
status: str = "pending_review"
session_id: Optional[str] = None
reviewer_id: Optional[str] = None
review_timestamp: Optional[datetime] = None
review_reason: Optional[str] = None

def to_dict(self) -> Dict[str, Any]:
return {
Expand All @@ -36,6 +39,9 @@ def to_dict(self) -> Dict[str, Any]:
"timestamp": self.timestamp.isoformat(),
"status": self.status,
"session_id": self.session_id,
"reviewer_id": self.reviewer_id,
"review_timestamp": self.review_timestamp.isoformat() if self.review_timestamp else None,
"review_reason": self.review_reason,
}


Expand Down Expand Up @@ -124,10 +130,13 @@ def get_conflicts_for_roll(self, roll_number: str) -> List[ConflictAlert]:
def get_pending_alerts(self) -> List[ConflictAlert]:
return [alert for alert in self.alerts if alert.status == "pending_review"]

def resolve_alert(self, conflict_id: str, approved: bool) -> bool:
def resolve_alert(self, conflict_id: str, approved: bool, reviewer_id: Optional[str] = None, reason: Optional[str] = None) -> bool:
for alert in self.alerts:
if alert.conflict_id == conflict_id:
alert.status = "approved" if approved else "rejected"
alert.reviewer_id = reviewer_id
alert.review_timestamp = datetime.now(timezone.utc)
alert.review_reason = reason
return True
return False

Expand All @@ -147,4 +156,42 @@ def can_grant_access(self, roll_number: str) -> Tuple[bool, Optional[str]]:
for conflict in conflicts:
if conflict.status == "pending_review":
return False, f"Identity under review: conflict {conflict.conflict_id}"
return True, None
return True, None

def get_conflict_details(self, conflict_id: str) -> Optional[Dict[str, Any]]:
for alert in self.alerts:
if alert.conflict_id == conflict_id:
prior_embeddings = []
for roll in alert.matched_roll_numbers:
if roll in self.embeddings:
emb = self.embeddings[roll]
prior_embeddings.append({
"roll_number": emb.roll_number,
"student_name": emb.student_name,
"timestamp": emb.timestamp.isoformat(),
"session_id": emb.session_id,
})
return {
"conflict": alert.to_dict(),
"prior_embeddings": prior_embeddings,
}
return None

def admin_review_conflict(self, conflict_id: str, approved: bool, reviewer_id: str, reason: str) -> bool:
for alert in self.alerts:
if alert.conflict_id == conflict_id:
alert.status = "approved" if approved else "rejected"
alert.reviewer_id = reviewer_id
alert.review_timestamp = datetime.now(timezone.utc)
alert.review_reason = reason
return True
return False

def get_override_log(self) -> List[Dict[str, Any]]:
return [alert.to_dict() for alert in self.alerts if alert.status in ("approved", "rejected")]

def get_alert_by_id(self, conflict_id: str) -> Optional[ConflictAlert]:
for alert in self.alerts:
if alert.conflict_id == conflict_id:
return alert
return None
Loading
Loading