22
33from __future__ import annotations
44
5- import base64
65import secrets
76from typing import Literal
87from uuid import UUID
1110
1211from src .core .config .setting import get_settings
1312from src .core .email .service import EmailService
14- from src .modules .user .domain .entities .user import User , UserSecurity
13+ from src .modules .user .domain .entities .user import User
1514from src .modules .user .domain .repositories .user_repository import UserRepository
1615from src .shared .unit_of_work import UnitOfWork
1716
1817
1918class TwoFactorAuthService :
2019 """Service for managing two-factor authentication.
21-
20+
2221 Supports:
2322 - TOTP (Time-based One-Time Password) for authenticator apps like Google Authenticator, Authy, etc.
2423 - Email-based 2FA codes
@@ -37,7 +36,7 @@ def __init__(
3736
3837 async def setup_totp (self , user_id : UUID ) -> dict [str , str ]:
3938 """Set up TOTP for a user.
40-
39+
4140 Returns:
4241 dict with 'secret', 'uri', and 'qr_code_data' keys
4342 """
@@ -48,14 +47,13 @@ async def setup_totp(self, user_id: UUID) -> dict[str, str]:
4847
4948 # Generate a new secret
5049 secret = pyotp .random_base32 ()
51-
50+
5251 # Create TOTP URI for QR code generation
5352 issuer = self ._settings .JWT_ISSUER or "TodoApp"
5453 uri = pyotp .totp .TOTP (secret ).provisioning_uri (
55- name = user .email ,
56- issuer_name = issuer
54+ name = user .email , issuer_name = issuer
5755 )
58-
56+
5957 # Store the secret temporarily (not enabled yet)
6058 user .security .two_factor_secret = secret
6159 user .security .two_factor_enabled = False
@@ -65,16 +63,16 @@ async def setup_totp(self, user_id: UUID) -> dict[str, str]:
6563 return {
6664 "secret" : secret ,
6765 "uri" : uri ,
68- "qr_code_data" : f"otpauth://totp/{ issuer } :{ user .email } ?secret={ secret } &issuer={ issuer } "
66+ "qr_code_data" : f"otpauth://totp/{ issuer } :{ user .email } ?secret={ secret } &issuer={ issuer } " ,
6967 }
7068
7169 async def verify_totp_setup (self , user_id : UUID , code : str ) -> dict [str , list [str ]]:
7270 """Verify TOTP setup and enable 2FA.
73-
71+
7472 Args:
7573 user_id: The user's ID
7674 code: The TOTP code from the authenticator app
77-
75+
7876 Returns:
7977 dict with 'backup_codes' key containing recovery codes
8078 """
@@ -93,7 +91,7 @@ async def verify_totp_setup(self, user_id: UUID, code: str) -> dict[str, list[st
9391
9492 # Generate backup codes
9593 backup_codes = [secrets .token_hex (4 ) for _ in range (10 )]
96-
94+
9795 # Enable 2FA and store backup codes
9896 user .security .two_factor_enabled = True
9997 user .security .two_factor_backup_codes = "," .join (backup_codes )
@@ -104,11 +102,11 @@ async def verify_totp_setup(self, user_id: UUID, code: str) -> dict[str, list[st
104102
105103 async def disable_totp (self , user_id : UUID , code : str ) -> bool :
106104 """Disable TOTP 2FA for a user.
107-
105+
108106 Args:
109107 user_id: The user's ID
110108 code: Current TOTP code or backup code for verification
111-
109+
112110 Returns:
113111 True if successfully disabled
114112 """
@@ -122,7 +120,7 @@ async def disable_totp(self, user_id: UUID, code: str) -> bool:
122120
123121 # Verify code
124122 verified = False
125-
123+
126124 # Check if it's a backup code
127125 if user .security .two_factor_backup_codes :
128126 backup_codes = user .security .two_factor_backup_codes .split ("," )
@@ -151,10 +149,10 @@ async def disable_totp(self, user_id: UUID, code: str) -> bool:
151149
152150 async def send_email_2fa_code (self , user_id : UUID ) -> bool :
153151 """Send a 2FA code via email.
154-
152+
155153 Args:
156154 user_id: The user's ID
157-
155+
158156 Returns:
159157 True if email was sent successfully
160158 """
@@ -168,14 +166,15 @@ async def send_email_2fa_code(self, user_id: UUID) -> bool:
168166
169167 # Generate a 6-digit code
170168 code = secrets .token_hex (3 )[:6 ]
171-
169+
172170 # Store the code temporarily in security settings (with expiry info)
173171 # In production, you'd want to store this in Redis with TTL
174172 if not user .security :
175173 raise ValueError ("User security not found" )
176-
174+
177175 # Store code with timestamp (format: "code:timestamp")
178176 import time
177+
179178 user .security .two_factor_secret = f"email_code:{ code } :{ int (time .time ())} "
180179 await self ._user_repository .save_security (user .security )
181180 await self ._unit_of_work .commit ()
@@ -191,22 +190,22 @@ async def send_email_2fa_code(self, user_id: UUID) -> bool:
191190 </body>
192191 </html>
193192 """
194-
193+
195194 await self ._email_service .send_email (
196195 to = user .email ,
197196 subject = "Your Verification Code" ,
198197 html_body = html_body ,
199198 )
200-
199+
201200 return True
202201
203202 async def verify_email_2fa_code (self , user_id : UUID , code : str ) -> bool :
204203 """Verify an email-based 2FA code.
205-
204+
206205 Args:
207206 user_id: The user's ID
208207 code: The code received via email
209-
208+
210209 Returns:
211210 True if code is valid
212211 """
@@ -222,17 +221,18 @@ async def verify_email_2fa_code(self, user_id: UUID, code: str) -> bool:
222221 parts = stored .split (":" )
223222 if len (parts ) != 3 :
224223 raise ValueError ("Invalid code format" )
225-
224+
226225 stored_code = parts [1 ]
227226 timestamp = int (parts [2 ])
228-
227+
229228 import time
229+
230230 current_time = int (time .time ())
231-
231+
232232 # Code expires after 10 minutes
233233 if current_time - timestamp > 600 :
234234 raise ValueError ("Code has expired" )
235-
235+
236236 if stored_code != code :
237237 raise ValueError ("Invalid code" )
238238
@@ -244,35 +244,35 @@ async def verify_email_2fa_code(self, user_id: UUID, code: str) -> bool:
244244 return True
245245
246246 async def verify_2fa_code (
247- self ,
248- user : User ,
249- code : str ,
250- method : Literal ["totp" , "email" , "backup" ] = "totp"
247+ self , user : User , code : str , method : Literal ["totp" , "email" , "backup" ] = "totp"
251248 ) -> bool :
252249 """Verify a 2FA code using the specified method.
253-
250+
254251 Args:
255252 user: The user entity
256253 code: The verification code
257254 method: The verification method ('totp', 'email', or 'backup')
258-
255+
259256 Returns:
260257 True if verification successful
261258 """
262259 if not user .security :
263260 raise ValueError ("User security not found" )
264261
265262 if method == "totp" :
266- if not user .security .two_factor_secret or not user .security .two_factor_enabled :
263+ if (
264+ not user .security .two_factor_secret
265+ or not user .security .two_factor_enabled
266+ ):
267267 raise ValueError ("TOTP 2FA is not enabled" )
268-
268+
269269 totp = pyotp .TOTP (user .security .two_factor_secret )
270270 return bool (totp .verify (code , valid_window = 1 ))
271271
272272 elif method == "backup" :
273273 if not user .security .two_factor_backup_codes :
274274 raise ValueError ("No backup codes available" )
275-
275+
276276 backup_codes = user .security .two_factor_backup_codes .split ("," )
277277 if code in backup_codes :
278278 # Remove used backup code
@@ -289,13 +289,15 @@ async def verify_2fa_code(
289289
290290 return False
291291
292- async def regenerate_backup_codes (self , user_id : UUID , verify_code : str ) -> dict [str , list [str ]]:
292+ async def regenerate_backup_codes (
293+ self , user_id : UUID , verify_code : str
294+ ) -> dict [str , list [str ]]:
293295 """Regenerate backup codes for a user.
294-
296+
295297 Args:
296298 user_id: The user's ID
297299 verify_code: Current TOTP code for verification
298-
300+
299301 Returns:
300302 dict with 'backup_codes' key containing new recovery codes
301303 """
0 commit comments