274 lines
9.9 KiB
Python
Executable File
274 lines
9.9 KiB
Python
Executable File
|
|
from rest_framework.views import APIView
|
|
from rest_framework.response import Response
|
|
from rest_framework import status
|
|
from rest_framework.authtoken.models import Token
|
|
from django.contrib.auth import authenticate
|
|
from rest_framework.permissions import AllowAny, IsAuthenticated
|
|
from rest_framework.authentication import TokenAuthentication
|
|
|
|
from django.utils.http import urlsafe_base64_encode, urlsafe_base64_decode
|
|
|
|
from django.contrib.auth.tokens import default_token_generator
|
|
|
|
from rest_framework.views import APIView
|
|
from rest_framework.response import Response
|
|
from rest_framework import status
|
|
from rest_framework.authtoken.models import Token
|
|
from django.contrib.auth import authenticate
|
|
from rest_framework.permissions import AllowAny, IsAuthenticated
|
|
from rest_framework.authentication import TokenAuthentication
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
|
|
from django.core.mail import send_mail
|
|
from django.template.loader import render_to_string
|
|
from django.utils.html import strip_tags
|
|
from django.conf import settings
|
|
from django.contrib.auth import password_validation
|
|
from django.core.exceptions import ValidationError
|
|
|
|
from at_django_boilerplate.accounts.models import CustomUser
|
|
|
|
|
|
import logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class LoginApiView(APIView):
|
|
permission_classes = [AllowAny] # This makes the endpoint open to all users
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
# Get the username and password from the request data
|
|
username = request.data.get('username')
|
|
password = request.data.get('password')
|
|
|
|
if not username or not password:
|
|
return Response({"message": "Both username and password are required."}, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
# Authenticate the user
|
|
user = authenticate(username=username, password=password)
|
|
if user is not None:
|
|
# User is authenticated, generate token
|
|
token, created = Token.objects.get_or_create(user=user)
|
|
# You can retrieve the user's role or permission-based data here
|
|
if user.is_superuser:
|
|
role = "admin"
|
|
elif user.get_manager():
|
|
role = "manager"
|
|
elif user.get_technician():
|
|
role = "technician"
|
|
else:
|
|
role = "customer"
|
|
|
|
return Response({
|
|
"token": token.key,
|
|
"role": role,
|
|
"userId": user.id,
|
|
"username": f"{user.first_name} {user.last_name}",
|
|
"profile_photo": user.profile_photo.url if user.profile_photo.url else None,
|
|
"message": "Login successful"
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
# Invalid credentials
|
|
return Response({"message": "Invalid username or password."}, status=status.HTTP_401_UNAUTHORIZED)
|
|
|
|
|
|
class ValidatePasswordAPIView(APIView):
|
|
permission_classes = [IsAuthenticated]
|
|
authentication_classes = [TokenAuthentication]
|
|
def post(self, request):
|
|
password = request.data.get('password')
|
|
if not password:
|
|
return Response({"success": False, "message": "Password is required."}, status=400)
|
|
user = request.user
|
|
if user.check_password(password):
|
|
return Response({"success": True, "message": "Password is valid."}, status=200)
|
|
return Response({"success": False, "message": "Password is incorrect."}, status=400)
|
|
|
|
|
|
class ResetPasswordAPIView(APIView):
|
|
def post(self, request):
|
|
# Get the email from the request data
|
|
email = request.data.get('email')
|
|
|
|
# Ensure the email is provided
|
|
if not email:
|
|
return Response({"success": False, "message": "Email address is required."}, status=400)
|
|
|
|
try:
|
|
# Check if the user exists in the database
|
|
user = CustomUser.objects.get(email=email)
|
|
except CustomUser.DoesNotExist:
|
|
return Response({"success": False, "message": "User with this email does not exist."}, status=404)
|
|
|
|
# Send the password reset email
|
|
if self.send_reset_email(request, user, email):
|
|
return Response({"success": True, "message": "Please check your email to reset your password."}, status=200)
|
|
else:
|
|
return Response({"success": False, "message": "Failed to send email. Please try again later."}, status=500)
|
|
|
|
|
|
def send_reset_email(self, request, user, email):
|
|
# Generate a password reset token
|
|
token = default_token_generator.make_token(user)
|
|
|
|
# Convert user.pk (integer) to string and then encode it to base64
|
|
uid = urlsafe_base64_encode(str(user.pk).encode('utf-8')) # Encode user ID to base64 string
|
|
|
|
# Prepare the context for the email template
|
|
context = {
|
|
'user': user,
|
|
'reset_link': f"http://localhost:3000/reset-password/{uid}/{token}/" # Update the URL if needed (to your front-end)
|
|
}
|
|
|
|
# Render the HTML email template with the context
|
|
html_message = render_to_string('email_template.html', context)
|
|
|
|
# Subject of the email
|
|
subject = "Password Reset Request"
|
|
|
|
# Strip HTML tags for the plain-text version of the email
|
|
message = strip_tags(html_message)
|
|
|
|
# From email and recipient list
|
|
from_email = settings.DEFAULT_FROM_EMAIL
|
|
recipient_list = [email]
|
|
|
|
try:
|
|
# Send the email
|
|
send_mail(subject, message, from_email, recipient_list, html_message=html_message)
|
|
return True
|
|
except Exception as e:
|
|
print(f"Error sending email: {str(e)}")
|
|
return False
|
|
|
|
class ResetPasswordConfirmAPIView(APIView):
|
|
def get(self, request, uidb64, token):
|
|
try:
|
|
# Decode the user ID from base64
|
|
uid = urlsafe_base64_decode(uidb64).decode('utf-8')
|
|
user = CustomUser.objects.get(pk=uid)
|
|
|
|
# Check if the token is valid
|
|
if default_token_generator.check_token(user, token):
|
|
return Response(
|
|
{"success": True, "message": "Token is valid."},
|
|
status=status.HTTP_200_OK
|
|
)
|
|
else:
|
|
return Response(
|
|
{"success": False, "message": "Invalid or expired token."},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
except (TypeError, ValueError, OverflowError, CustomUser.DoesNotExist):
|
|
return Response(
|
|
{"success": False, "message": "Invalid or expired token."},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
def post(self, request, uidb64, token):
|
|
password = request.data.get('password')
|
|
|
|
if not password:
|
|
return Response(
|
|
{"success": False, "message": "Password is required."},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
try:
|
|
# Decode the user ID from base64
|
|
uid = urlsafe_base64_decode(uidb64).decode('utf-8')
|
|
user = CustomUser.objects.get(pk=uid)
|
|
except (TypeError, ValueError, OverflowError, CustomUser.DoesNotExist):
|
|
return Response(
|
|
{"success": False, "message": "Invalid user ID."},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
try:
|
|
password_validation.validate_password(password=password)
|
|
except ValidationError as e:
|
|
return Response(
|
|
{"success": False, "message": str(e)},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
# Validate the token
|
|
if default_token_generator.check_token(user, token):
|
|
# Set and save the new password
|
|
user.set_password(password)
|
|
user.save()
|
|
return Response(
|
|
{"success": True, "message": "Password has been reset successfully."},
|
|
status=status.HTTP_200_OK
|
|
)
|
|
else:
|
|
return Response(
|
|
{"success": False, "message": "Invalid or expired token."},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
|
|
class UserProfileView(APIView):
|
|
permission_classes = [IsAuthenticated]
|
|
authentication_classes = [TokenAuthentication]
|
|
|
|
def get(self, request):
|
|
user = request.user
|
|
|
|
if user.is_superuser:
|
|
print('Is Superuser')
|
|
users = CustomUser.objects.all()
|
|
data = [
|
|
{
|
|
'id': u.id,
|
|
'email': u.email,
|
|
'first_name': u.first_name,
|
|
'last_name': u.last_name,
|
|
}
|
|
for u in users
|
|
]
|
|
|
|
elif user.is_manager: # Manager user only sees their own utility's data
|
|
utility = user.get_utility() # Get Utility
|
|
users = utility.accounts_in_utility.all()
|
|
|
|
data = [{
|
|
'id': user.id,
|
|
'email': user.email,
|
|
'first_name': user.first_name,
|
|
'last_name': user.last_name,
|
|
'utility': str(utility)
|
|
}
|
|
for u in users
|
|
]
|
|
|
|
elif user.is_technician:
|
|
utility = user.get_utility() # Get Utility
|
|
users = utility.accounts_in_utility.all()
|
|
|
|
data = [{
|
|
'id': u.id,
|
|
'email': u.email,
|
|
'first_name': u.first_name,
|
|
'last_name': u.last_name,
|
|
'utility': str(utility)
|
|
}
|
|
for u in users if not u.is_manager and not u.is_superuser
|
|
]
|
|
|
|
elif user.is_customer:
|
|
utility = user.get_utility() # Get Utility
|
|
data = [{
|
|
'id': user.id,
|
|
'email': user.email,
|
|
'first_name': user.first_name,
|
|
'last_name': user.last_name,
|
|
'utility': str(utility)
|
|
}]
|
|
|
|
else:
|
|
data = None
|
|
|
|
return Response(data, status=status.HTTP_200_OK)
|