Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 86 additions & 42 deletions App/routes/explain.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import os
import io
from pydantic import Field
from fastapi import APIRouter, status, Depends
from fastapi.exceptions import HTTPException
from fastapi.responses import JSONResponse
Expand Down Expand Up @@ -94,47 +92,93 @@ async def get_explain_image_result(
"cam_loc": result["cam_loc"],
}

# @router.post("/video/{video_id}/frame/{frame_index}", status_code=status.HTTP_202_ACCEPTED,
# response_class=JSONResponse, summary="딥페이크 비디오 프레임 위조 흔적 시각화 비동기 접수")
# async def explain_frame(
# video_id: int,
# frame_index: int,
# explain_req: ExplainFrameRequest,
# conn: Connection = Depends(context_get_conn),
# session_user = Depends(session_svc.get_session_user_prt), # 로그인 필수
# ):
# # 딥페이크 비디오 추론 결과 가져오기
# result = await video_svc.get_video_result(conn, video_id)

# if result.status != "SUCCESS":
# raise HTTPException(
# status_code = status.HTTP_400_BAD_REQUEST,
# detail = "비디오 프레임 위조 흔적 분석은 추론이 성공한 비디오에서만 가능합니다"
# )
@router.post("/video/{video_id}/frame/{frame_index}", status_code=status.HTTP_202_ACCEPTED,
response_class=JSONResponse, summary="딥페이크 비디오 프레임 위조 흔적 시각화 비동기 접수")
async def explain_frame(
video_id: int,
frame_index: int,
explain_req: ExplainFrameRequest,
conn: Connection = Depends(context_get_conn),
session_user = Depends(session_svc.get_session_user_prt), # 로그인 필수
):
# 딥페이크 비디오 추론 결과 가져오기
result = await video_svc.get_video_result(conn, video_id)

# 딥페이크 비디오 추론 성공 여부 확인하기
if result.status != "SUCCESS":
raise HTTPException(
status_code = status.HTTP_400_BAD_REQUEST,
detail = "비디오 프레임 위조 흔적 분석은 추론이 성공한 비디오에서만 가능합니다"
)

# video_path = "." + result.video_loc
# 비디오 파일 저장 경로 가져오기
video_path = "." + result.video_loc
if not os.path.exists(video_path):
raise HTTPException(
status_code = status.HTTP_404_NOT_FOUND,
detail = f"요청하신 비디오 파일을 찾을 수 없습니다. 삭제하였는지 다시 확인해주세요."
)

# if not os.path.exists(video_path):
# raise HTTPException(
# status_code = status.HTTP_404_NOT_FOUND,
# detail = f"요청하신 비디오 파일을 찾을 수 없습니다. 삭제하였는지 다시 확인해주세요."
# )

# if result.model_type == "pro" and explain_req.aug_smooth:
# raise HTTPException(
# status_code=status.HTTP_400_BAD_REQUEST,
# detail="Pro 모델은 aug_smooth 기능을 지원하지 않습니다",
# )
# 딥페이크 비디오 프레임 위조 흔적 분석 (pro model는 aug_smooth 사용 불가, 연산이 너무 많아짐)
if result.model_type == "pro" and explain_req.aug_smooth:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Pro 모델은 aug_smooth 기능을 지원하지 않습니다",
)

# 비디오 내 해당 frame이 몇초에 위치한 frame인지 확인
frame_time = video_svc.get_video_frame_by_index(conn, video_id, frame_index)

# # Celery Task 호출(Redis Broker 활용)
# task = explain_svc.process_explain_frame_task.delay(
# user_email = session_user["email"],
# result_dict = result.model_dump(mode='json'),
# explain_req_dict = explain_req.model_dump())
# return {
# "message": "딥페이크 비디오 프레임 위조 흔적 시각화 접수 완료. 시각화 분석 시작 ...",
# "task_id": task.id,
# }
# Celery Task 호출(Redis Broker 활용)
task = explain_svc.process_explain_frame_task.delay(
user_email = session_user["email"],
version_type = result.version_type,
domain_type = result.domain_type,
video_loc = result.video_loc,
video_id = video_id,
category = 1 if result.label == "FAKE" else 0,
frame_time = frame_time,
explain_req_dict = explain_req.model_dump())
return {
"message": "딥페이크 비디오 프레임 위조 흔적 시각화 접수 완료. 시각화 분석 시작 ...",
"task_id": task.id,
}

# async def get_explain_frame_result():
# return None
@router.get("/frame/result/{task_id}", status_code=status.HTTP_200_OK,
response_class=JSONResponse, summary="딥페이크 비디오 프레임 위조 흔적 시각화 결과 가져오기")
async def get_explain_frame_result(
task_id: str,
session_user = Depends(session_svc.get_session_user_prt), # 로그인 필수
):

# Redis Broker에서 Task ID에 해당하는 비동기 작업 상태 가져오기
task = AsyncResult(task_id, app=celery_app)

# 비동기 작업 진행 상태 Check
if task.state in ("PENDING", "STARTED", "RETRY"):
return JSONResponse(
status_code = status.HTTP_202_ACCEPTED,
content = {"message": "딥페이크 비디오 프레임 위조 흔적 시각화 분석 중 ..."}
)

# 비동기 작업 실패
if task.state == "FAILURE":
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="딥페이크 비디오 프레임 위조 흔적 시각화 중 알 수 없는 오류가 발생하였습니다")

# Celery Task 결과 가져오기
result = task.result

# 딥페이크 비디오 프레임 위조 흔적 시각화 생성 또는 파일 저장 도중 오류 발생
if result["status"] == "FAILED":
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=result["message"])

return {
"status": result["status"],
"message": result["message"],
"cam_loc": result["cam_loc"],
}

1 change: 1 addition & 0 deletions App/schemas/image_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class InferenceResult(BaseModel):
result_msg : str
class UserHistory(BaseMetadata):
user_id: int
status: str

class UserHistory_indi(UserHistory, InferenceResult):
pass
Expand Down
89 changes: 89 additions & 0 deletions App/services/explain_svc.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@

_explainer_cache: dict = {}

# 비디오 내 특정 Frame 추출 + Face Cropping
def _extract_face_from_frame(video_path: str, frame_time: float, explainer: CAMExplainer):
cap = cv2.VideoCapture(video_path)
cap.set(cv2.CAP_PROP_POS_MSEC, frame_time * 1000)
ret, frame = cap.read()
cap.release()

frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
bbox = explainer._get_face_bbox(frame_rgb)

return explainer._crop_face(frame_rgb, bbox[:4])

# 캐시된 CAMExplainer 객체 반환하거나 새로 생성
def _get_or_create_explainer(model_name: str, dataset: str, explain_req_dict: dict):
cache_key = (model_name, dataset, explain_req_dict["explainer_type"], explain_req_dict["branch_level"])
Expand All @@ -50,6 +62,19 @@ def _run_visualization(explainer: CAMExplainer, image_path: str, category: int,
else: # display_type == "heatmap_bbox"
return explainer.display_heatmap_bbox_on_image(image_path, image_weight=explain_req_dict["overlay_ratio"], threshold=explain_req_dict["threshold"],
category=category, aug_smooth=explain_req_dict["aug_smooth"], eigen_smooth=explain_req_dict["eigen_smooth"])

# 비디오 프레임 시각화 생성 (heatmap, contour, bbox 선택)
def _run_visualization_from_array(explainer: CAMExplainer, face: str, category: int, explain_req_dict: dict) -> np.ndarray:
if explain_req_dict["display_type"] == "heatmap":
return explainer.display_heatmap_from_array(face, image_weight=explain_req_dict["overlay_ratio"], threshold=explain_req_dict["threshold"],
category=category, aug_smooth=explain_req_dict["aug_smooth"], eigen_smooth=explain_req_dict["eigen_smooth"])
elif explain_req_dict["display_type"] == "bbox":
return explainer.display_bbox_from_array(face, threshold=explain_req_dict["threshold"],
category=category, aug_smooth=explain_req_dict["aug_smooth"], eigen_smooth=explain_req_dict["eigen_smooth"])
else: # display_type == "heatmap_bbox"
return explainer.display_heatmap_bbox_from_array(face, image_weight=explain_req_dict["overlay_ratio"], threshold=explain_req_dict["threshold"],
category=category, aug_smooth=explain_req_dict["aug_smooth"], eigen_smooth=explain_req_dict["eigen_smooth"])

# 딥페이크 이미지 위조 흔적 시각화 처리
@celery_app.task(name="process_explain_image_task")
def process_explain_image_task(user_email: str,
Expand Down Expand Up @@ -109,3 +134,67 @@ async def run_explain():
return loop.run_until_complete(run_explain())
finally:
loop.close()

# 딥페이크 비디오 프레임 위조 흔적 시각화 처리
@celery_app.task(name="process_explain_frame_task")
def process_explain_frame_task(user_email: str,
version_type: str,
domain_type: str,
video_loc: str,
video_id: int,
category: int,
frame_time: float,
explain_req_dict: dict):
async def run_explain():
cam_loc = None
try:
model_name, dataset = inference_svc.MODEL_CONFIG[version_type][explain_req_dict["model_type"]][domain_type]
video_path = "." + video_loc

explainer = _get_or_create_explainer(model_name, dataset, explain_req_dict)

face = _extract_face_from_frame(video_path, frame_time, explainer)

# 비디오 프레임 시각화 생성 시작
try:
image = _run_visualization_from_array(explainer, face, category, explain_req_dict)
except Exception:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="딥페이크 비디오 프레임 위조 흔적을 생성하는 중 오류가 발생하였습니다"
)

# 비디오 프레임 시각화 파일 저장
try:
cam_loc = await image_svc.upload_frame_cam(user_email, video_id, frame_time, image)
except Exception:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="딥페이크 비디오 프레임 위조 흔적 파일을 저장하는 중 오류가 발생했습니다."
)

return {"status": "SUCCESS",
"message": "딥페이크 비디오 프레임 위조 흔적 시각화가 성공적으로 이루어졌습니다",
"cam_loc": cam_loc}

except HTTPException as e:
print(e.detail)
return {"status": "FAILED", "message": str(e.detail)}

except Exception as e:
print(str(e))
return {"status": "FAILED", "message": str(e)}

finally:
# 임시 저장된 비디오 프레임 시각화 파일 삭제
if cam_loc:
image_svc.cleanup_image_cam.apply_async(args=[cam_loc], countdown=60)

# 동기식 Celery 워커 내 비동기 이벤트 루프 구동
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(run_explain())
finally:
loop.close()

22 changes: 21 additions & 1 deletion App/services/image_svc.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,25 @@ async def upload_image_cam(user_email: str, image_id: int, image: np.ndarray) ->

return cam_loc[1:].replace("\\", "/")

# 딥페이크 비디오 프레임 위조 흔적 시각화 파일 서버 내 저장 (회원 전용)
async def upload_frame_cam(user_email: str, video_id: int, frame_time: float, image: np.ndarray) -> str:
user_dir = os.path.join(EXPLAIN_UPLOAD_DIR, user_email)
os.makedirs(user_dir, exist_ok=True)

cam_filename = f"v{video_id}_t{frame_time}_{int(time.time())}.png"
cam_loc = os.path.join(user_dir, cam_filename)

image_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
_, buf = cv2.imencode(".png", image_bgr)

try:
async with aio.open(cam_loc, "wb") as outfile:
await outfile.write(buf.tobytes())
except Exception as e:
raise e

return cam_loc[1:].replace("\\", "/")

# 사용자 업로드 이미지 서버 내 삭제
# 호출 : image.py : history 삭제 할 때 db 와 실제 파일 삭제
# 호출 : inference.py : 추론 FAIL일 때 delete_video and delete_video_db 실행
Expand Down Expand Up @@ -147,7 +166,7 @@ async def get_user_histories(conn: Connection, user_id: int):
async def get_user_history(conn: Connection, image_id: int):
try:
query = """
SELECT id, user_id, image_loc, label, score, face_conf, face_ratio, face_brightness, version_type, model_type, domain_type, result_msg, created_at
SELECT id, user_id, image_loc, status, label, score, face_conf, face_ratio, face_brightness, version_type, model_type, domain_type, result_msg, created_at
FROM image_result
WHERE id = :image_id;
"""
Expand All @@ -162,6 +181,7 @@ async def get_user_history(conn: Connection, image_id: int):
image_id = row.id,
user_id = row.user_id,
image_loc = row.image_loc,
status = row.status,
label = row.label,
score = row.score,
face_conf = row.face_conf,
Expand Down
27 changes: 27 additions & 0 deletions App/services/video_svc.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,33 @@ async def get_video_frame_result(conn: Connection, video_id: int):
except Exception as e:
print(e)
raise HTTPException(status_code=500, detail="알수없는 이유로 문제가 발생하였습니다.")

# 비디오 특정 프레임의 Frame Time 조회
# 호출 위치: routers/explain.py - explain_frame()
# 비디오 내 frame_index에 해당하는 특정 frame의 frame_time만 추출한다
async def get_video_frame_by_index(conn: Connection, video_id: int, frame_index: int):
try:
query = text("""
SELECT frame_time
FROM video_frame_result
WHERE video_id = :video_id AND frame_index = :frame_index
""")

result = await conn.execute(query, {"video_id": video_id, "frame_index": frame_index})
if result.rowcount == 0:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail=f"해당 ID: {video_id} Video 내 Frame Index {frame_index}에 해당하는 프레임을 찾을 수 없습니다")

row = result.fetchone()

return row.frame_time

except SQLAlchemyError as e:
print(f"[Frame Query Error] {e}")
raise HTTPException(status_code=503, detail="데이터베이스 조회 중 문제가 발생했습니다.")
except Exception as e:
print(e)
raise HTTPException(status_code=500, detail="알수없는 이유로 문제가 발생하였습니다.")



Loading