Files
B42/at_django_boilerplate/accounts/utils.py
2026-01-07 12:09:20 +05:30

190 lines
6.3 KiB
Python
Executable File

from django.utils.encoding import force_bytes
from django.utils.http import urlsafe_base64_encode
from django.template.loader import render_to_string
from .tokens import account_activation_token
from django.core.mail import EmailMessage
from django.shortcuts import redirect
from django.core.mail import EmailMultiAlternatives
from django.conf import settings
from django.contrib.auth import login
from .models import CustomUser
from at_django_boilerplate.utils.hash_utils import hexdigest
from at_django_boilerplate.utils.encryption_utils import EncryptionUtils
def send_activation_email(request, user, to_email):
    """Send the account-verification email to ``to_email``.

    Renders ``registration/verify_email_message.html`` with the request,
    the user, the request host, the base64-encoded user primary key and an
    activation token, and sends the result as the HTML alternative of a
    multipart message whose plain-text part is empty.

    Args:
        request: Current HTTP request; ``request.get_host()`` supplies the
            ``domain`` value handed to the template.
        user: User being verified; ``user.pk`` and the activation token are
            derived from it.
        to_email: Recipient address.

    Returns:
        bool: True when the message was rendered and sent without an
        exception, False otherwise. Exceptions are logged, not propagated.
    """
    # Imported locally so the function does not rely on a module-level
    # logger being set up elsewhere in the file.
    import logging

    try:
        html_content = render_to_string(
            'registration/verify_email_message.html',
            {
                'request': request,
                'user': user,
                'domain': request.get_host(),
                'uid': urlsafe_base64_encode(force_bytes(user.pk)),
                'token': account_activation_token.make_token(user),
            },
        )
        email = EmailMultiAlternatives(
            'Welcome to we-kwick. Verify your Email',
            '',  # no plain-text body; the content travels in the HTML part
            settings.EMAIL_HOST_USER,
            [to_email],
        )
        email.attach_alternative(html_content, "text/html")
        email.send()
        print(f'Sent Joining Email to {to_email} ')
        return True
    except Exception:
        # Deliberately best-effort: callers only get a boolean. Record the
        # full traceback so a failed delivery can actually be diagnosed.
        logging.getLogger(__name__).exception('Failed to Send Email')
        return False
def redirect_to_next_or_home(request):
    """Redirect to the POSTed ``next`` target, or to ``home``.

    The ``next`` value comes from the client, so it is only followed when it
    is safe for the current host (a relative path, or an absolute URL on the
    request's own host). Anything else, including a missing, empty or ``/``
    value, redirects to the ``home`` URL name.

    Args:
        request: Current HTTP request; ``request.POST['next']`` is read.

    Returns:
        The response produced by ``redirect``.
    """
    # Local import: validates user-controlled redirect targets so that a
    # crafted ``next`` cannot send the user to another site.
    from django.utils.http import url_has_allowed_host_and_scheme

    try:
        target = request.POST.get('next')
        if not target or target == '/':
            return redirect('home')

        target_is_safe = url_has_allowed_host_and_scheme(
            url=target,
            allowed_hosts={request.get_host()},
            require_https=request.is_secure(),
        )
        if not target_is_safe:
            return redirect('home')
        return redirect(target)
    except Exception as e:
        # E.g. ``next`` does not resolve to a URL; fall back to home.
        print('Exception Post:', e)
        return redirect('home')
def save_user(request, user_form):
    """Create a user from a validated registration form.

    Rejects the submission when another ``CustomUser`` already has the same
    email hash or (if a contact number was given) the same contact-number
    hash. Otherwise saves the user with a hashed password, logs them in,
    marks the account inactive and sends the activation email.

    Args:
        request: Current HTTP request, passed to ``login`` and to
            ``send_activation_email``.
        user_form: Bound form whose ``cleaned_data`` contains ``email`` and
            ``contact_number``.

    Returns:
        tuple: ``(True, user)`` on success, ``(False, user_form)`` when a
        duplicate was found (an error is added to the form) or when saving
        raised an exception (the exception is logged).
    """
    # Imported locally so the function does not rely on a module-level
    # logger being set up elsewhere in the file.
    import logging

    email = user_form.cleaned_data['email'].lower()
    contact_number = user_form.cleaned_data['contact_number']

    # Lookups go through hashes of the values, not the raw values.
    email_taken = CustomUser.objects.filter(email_hash=hexdigest(email)).exists()

    number_taken = False
    if contact_number is not None and contact_number != '':
        number_taken = CustomUser.objects.filter(
            contact_number_hash=hexdigest(contact_number)
        ).exists()

    if email_taken:
        user_form.add_error('email', 'Email is already in use.')
        return False, user_form
    if number_taken:
        user_form.add_error('contact_number', 'Contact number is already in use.')
        return False, user_form

    try:
        user = user_form.save(commit=False)
        # The form leaves the raw password on the instance; hash it.
        user.set_password(user.password)
        # ``custom_save`` is a project-specific flag of the user model's
        # save(); its effect is not visible here -- TODO confirm.
        user.save(custom_save=True)
        # NOTE(review): the user is logged in and then immediately marked
        # inactive; confirm this ordering is intended.
        login(request, user)
        user.is_active = False
        user.save()
        send_activation_email(
            request=request,
            user=user,
            to_email=user.get_decrypted_email(),
        )
        return True, user
    except Exception:
        # Previously swallowed silently; keep the same return contract but
        # leave a traceback so registration failures can be investigated.
        logging.getLogger(__name__).exception('Failed to save new user')
        return False, user_form
# OTP (one-time password) generation, delivery and verification helpers
import random
import re
from django.core.mail import send_mail
from django.core.cache import cache
from django.conf import settings
# Matches a phone-number identifier: an optional leading "+" followed by
# 10 to 15 digits. send_otp() treats a matching identifier as an SMS target
# and everything else as an email address.
PHONE_REGEX = re.compile(r'^\+?\d{10,15}$')
def generate_otp(length=6):
    """Return a random numeric one-time password.

    Args:
        length: Number of digits to generate (default 6). ``0`` yields an
            empty string.

    Returns:
        str: ``length`` decimal digits; leading zeros are possible.
    """
    # An OTP is an authentication secret, so the digits come from the
    # operating system's CSPRNG via ``secrets`` instead of the predictable
    # generator behind ``random``. Imported locally to keep the block
    # self-contained.
    import secrets

    return ''.join(str(secrets.randbelow(10)) for _ in range(length))
# def send_otp(identifier, purpose='login'):
# """
# Sends an OTP via email or SMS based on the identifier.
# Returns a tuple: (success: bool, method: 'email'|'sms')
# """
# otp = generate_otp()
# cache_key = f'otp_{purpose}_{identifier}'
# if PHONE_REGEX.match(identifier):
# method = 'sms'
# cache.set(cache_key, otp, timeout=300)
# # Replace with your SMS API logic (e.g., Twilio)
# print(f"📲 SMS OTP sent to {identifier}: {otp}")
# else:
# method = 'email'
# cache.set(cache_key, otp, timeout=300)
# try:
# send_mail(
# subject=f"Your {purpose.capitalize()} OTP",
# message=f"Your {purpose} OTP is: {otp}\n\nThis OTP is valid for 5 minutes.",
# from_email=settings.EMAIL_HOST_USER,
# recipient_list=[identifier],
# fail_silently=False,
# )
# except Exception as e:
# print(f"❌ Failed to send OTP email to {identifier}: {e}")
# return False, 'email'
# return True, method
from django.template.loader import render_to_string
from django.core.mail import EmailMessage
def send_otp(identifier, purpose='login'):
    """
    Generate an OTP, cache it for 300 seconds and deliver it.

    An identifier matching PHONE_REGEX takes the SMS path (currently only a
    console print); any other identifier is treated as an email address and
    receives the rendered 'otp/otp_email.html' template as an HTML email.

    Returns a tuple: (success: bool, method: 'email'|'sms')
    """
    otp = generate_otp()
    cache_key = f'otp_{purpose}_{identifier}'
    is_phone = PHONE_REGEX.match(identifier)

    # Both delivery paths store the OTP under the same key before sending.
    cache.set(cache_key, otp, timeout=300)

    if is_phone:
        # Replace with your SMS API logic
        print(f"📲 SMS OTP sent to {identifier}: {otp}")
        return True, 'sms'

    try:
        subject_line = f"Your {purpose.capitalize()} OTP"
        template_context = {
            'otp': otp,
            'purpose': purpose,
            'identifier': identifier,
            'validity_minutes': 5,
        }
        html_body = render_to_string('otp/otp_email.html', template_context)
        message = EmailMessage(
            subject=subject_line,
            body=html_body,
            from_email=settings.EMAIL_HOST_USER,
            to=[identifier],
        )
        # The body is HTML, so the message must be sent as text/html.
        message.content_subtype = 'html'
        message.send(fail_silently=False)
    except Exception as e:
        print(f"❌ Failed to send OTP email to {identifier}: {e}")
        return False, 'email'

    return True, 'email'
def verify_otp(identifier, user_input_otp, purpose='login'):
    """
    Verifies the OTP entered by the user against the cache.

    Looks up the value cached under ``otp_{purpose}_{identifier}`` and
    compares it with ``user_input_otp``. On a match the cached OTP is deleted
    so it cannot be used again.

    Args:
        identifier: The identifier the OTP was issued for.
        user_input_otp: The value supplied by the user; ``None`` never
            matches, other values are compared by their string form.
        purpose: The purpose the OTP was issued for (default 'login').

    Returns:
        bool: True if an OTP is cached for this key and matches the input,
        False otherwise.
    """
    # Local import: constant-time comparison for secrets.
    import hmac

    cache_key = f'otp_{purpose}_{identifier}'
    cached_otp = cache.get(cache_key)
    if not cached_otp or user_input_otp is None:
        return False

    # Compare as bytes: compare_digest rejects non-ASCII str arguments, and
    # user input is not guaranteed to be ASCII. Constant-time comparison
    # avoids leaking how much of the guess was correct.
    expected = str(cached_otp).encode('utf-8')
    supplied = str(user_input_otp).encode('utf-8')
    if not hmac.compare_digest(expected, supplied):
        return False

    cache.delete(cache_key)
    return True