Files
2026-01-07 12:09:20 +05:30

518 lines
16 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Django imports
from django.contrib.auth.decorators import user_passes_test
from django.contrib.auth import get_user_model
from django.contrib.sessions.models import Session
from django.core.cache import cache
from django.core.paginator import Paginator, EmptyPage
from django.shortcuts import render
from django.http import HttpResponseRedirect, JsonResponse
from django.urls import reverse
from django.views.decorators.http import require_POST
from django.views.decorators.csrf import csrf_exempt
from django.db.models import F
from django.utils import timezone
from django.db.models import Count
# Standard library imports
import logging
import ipaddress
import uuid
import json
import re
from datetime import datetime, timedelta
from collections import Counter, defaultdict
# Third-party imports
import requests
# Local app imports
from at_django_boilerplate.user_activity.models import (
PageVisit,
SessionConsentModel,DailyUserStats,ClickEvent
)
from at_django_boilerplate.utils.geolocation import get_city_and_country_from_ip
logger = logging.getLogger(__name__)
User = get_user_model()
LIVE_WINDOW_SECONDS = 40
@csrf_exempt
@require_POST
def user_consent_update(request):
try:
payload = json.loads(request.body)
consent_id = int(payload.get("consent_id"))
except (ValueError, json.JSONDecodeError):
return JsonResponse(
{"error": "Invalid JSON payload"},
status=400
)
session_id = request.session.session_key
if not session_id:
request.session.create()
session_id = request.session.session_key
consent, _ = SessionConsentModel.objects.get_or_create(
session_id=session_id
)
if consent_id == 1:
consent.accept_all_consent_received = True
consent.essential_only_consent_received = False
elif consent_id == 2:
consent.essential_only_consent_received = True
consent.accept_all_consent_received = False
else:
return JsonResponse(
{"error": "Invalid consent_id"},
status=400
)
consent.save()
return JsonResponse({
"success": True,
"consent_received": consent.consent_received
})
# IP & Geolocation Utilities (unchanged)
def is_private_ip(ip_address):
try:
ip = ipaddress.ip_address(ip_address)
return ip.is_private
except ValueError:
return False
def get_client_ip(request):
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0].strip()
else:
ip = request.META.get('REMOTE_ADDR')
if not ip or is_private_ip(ip):
return get_public_ip() # fallback for localhost/dev
return ip
def get_public_ip():
cached_ip = cache.get('public_ip')
if cached_ip:
return cached_ip
try:
response = requests.get('https://api.ipify.org?format=json', timeout=5)
response.raise_for_status()
ip = response.json().get('ip')
if ip:
cache.set('public_ip', ip, timeout=3600)
return ip
except requests.RequestException as e:
logger.error(f"Failed to fetch public IP: {e}")
return 'Unknown'
def _format_location(data):
city = data.get('city', '')
region = data.get('regionName', '')
country = data.get('country', '')
parts = [part for part in [city, region, country] if part]
return ', '.join(parts) if parts else 'Unknown'
GEO_TTL = 60 * 60 * 24 # 24 hours
def get_location_from_ip(ip_address):
cache_key = f'geo_{ip_address}'
cached = cache.get(cache_key)
if cached:
return cached
if not ip_address or is_private_ip(ip_address):
cache.set(cache_key, 'Local Network', 86400)
return 'Local Network'
try:
response = requests.get(
f'https://ip-api.com/json/{ip_address}',
params={'fields': 'status,message,country,regionName,city'},
timeout=2
)
response.raise_for_status()
data = response.json()
if data.get('status') == 'success':
location = _format_location(data)
return location
except requests.RequestException as e:
logger.error(f"Geo lookup failed for {ip_address}: {e}")
cache.set(cache_key, 'Unknown', 86400)
return 'Unknown'
# Utility Functions (updated for session data changes)
def is_admin(user):
return user.is_authenticated and user.is_staff and user.is_superuser
def _is_valid_session_key(session_key):
try:
uuid.UUID(session_key)
return True
except ValueError:
return False
def _parse_login_time(timestamp):
if not timestamp:
return None
try:
return timezone.datetime.fromisoformat(timestamp)
except (ValueError, TypeError):
logger.warning(f"Invalid timestamp: {timestamp}")
return None
# Financial Year Utilities (unchanged)
def get_fy_dates(fy):
try:
start_year = int(fy.split('-')[0])
end_year = int(fy.split('-')[1]) + 2000
start_date = timezone.make_aware(datetime(start_year, 4, 1))
end_date = timezone.make_aware(datetime(end_year, 3, 31, 23, 59, 59))
return start_date, end_date
except (ValueError, IndexError):
logger.error(f"Invalid FY format: {fy}")
today = timezone.now()
start_year = today.year if today.month >= 4 else today.year - 1
start_date = timezone.make_aware(datetime(start_year, 4, 1))
end_date = timezone.make_aware(datetime(start_year + 1, 3, 31, 23, 59, 59))
return start_date, end_date
def get_fy_choices():
today = timezone.now()
current_year = today.year if today.month >= 4 else today.year - 1
fy_choices = [f"{year}-{str(year + 1)[2:]}" for year in range(2020, current_year + 1)]
return fy_choices
# Device Detection (unchanged)
def detect_device(user_agent):
if not user_agent:
return 'Unknown'
user_agent = user_agent.lower()
if 'mobile' in user_agent and 'tablet' not in user_agent:
return 'Mobile'
elif 'tablet' in user_agent or 'ipad' in user_agent:
return 'Tablet'
elif 'windows' in user_agent or 'macintosh' in user_agent or 'linux' in user_agent:
return 'Desktop'
else:
return 'Other'
# Path Filtering (unchanged)
def is_excluded_path(path):
if not path:
return True
admin_patterns = [
r'^/admin/',
r'^/admin_session/(?!user-activity/).*$', # Exclude /admin_session/ paths except /admin_session/user-activity/
]
for pattern in admin_patterns:
if re.match(pattern, path):
return True
media_patterns = [
r'^/media/',
r'^/static/',
r'\.(jpg|jpeg|png|gif|svg|css|js|woff|woff2|ttf|eot|ico|mp4|webm|ogg|mp3|wav)$',
]
for pattern in media_patterns:
if re.search(pattern, path, re.IGNORECASE):
return True
return False
# Analytics Functions (unchanged)
def get_location_analytics(visits):
if not visits:
logger.debug("No visits for location analytics")
return {'labels': [], 'counts': [], 'percentages': [], 'data': []}
location_counts = Counter(visit['location'] for visit in visits if visit['location'])
total_visits = sum(location_counts.values())
if total_visits == 0:
logger.debug("No valid locations found")
return {'labels': [], 'counts': [], 'percentages': [], 'data': []}
top_locations = location_counts.most_common(10)
labels = [location for location, _ in top_locations]
counts = [count for _, count in top_locations]
percentages = [f"{(count / total_visits * 100):.2f}" for count in counts]
data = list(zip(labels, counts, percentages))
logger.debug(f"Location Analytics: labels={labels}, counts={counts}, percentages={percentages}")
return {
'labels': labels,
'counts': counts,
'percentages': percentages,
'data': data
}
def get_paths_by_location(visits):
if not visits:
logger.debug("No visits for path analytics")
return {'locations': []}
location_paths = defaultdict(Counter)
for visit in visits:
location = visit['location']
path = visit['path']
if location and path and not is_excluded_path(path):
location_paths[location][path] += 1
locations_data = []
for location, path_counts in location_paths.items():
total_visits = sum(path_counts.values())
if total_visits == 0:
continue
top_paths = path_counts.most_common(5)
labels = [path for path, _ in top_paths]
counts = [count for _, count in top_paths]
percentages = [f"{(count / total_visits * 100):.2f}" for count in counts]
data = list(zip(labels, counts, percentages))
locations_data.append({
'location': location,
'labels': labels,
'counts': counts,
'percentages': percentages,
'data': data
})
locations_data.sort(key=lambda x: sum(x['counts']), reverse=True)
locations_data = locations_data[:10]
logger.debug(f"Paths by Location: {locations_data}")
return {'locations': locations_data}
def get_devices_by_location(visits):
if not visits:
logger.debug("No visits for device analytics")
return {'locations': []}
location_devices = defaultdict(Counter)
for visit in visits:
location = visit['location']
device = detect_device(visit['user_agent'])
if location:
location_devices[location][device] += 1
locations_data = []
for location, device_counts in location_devices.items():
total_visits = sum(device_counts.values())
labels = list(device_counts.keys())
counts = list(device_counts.values())
percentages = [f"{(count / total_visits * 100):.2f}" for count in counts]
data = list(zip(labels, counts, percentages))
locations_data.append({
'location': location,
'labels': labels,
'counts': counts,
'percentages': percentages,
'data': data
})
locations_data.sort(key=lambda x: sum(x['counts']), reverse=True)
locations_data = locations_data[:10]
logger.debug(f"Devices by Location: {locations_data}")
return {'locations': locations_data}
@user_passes_test(is_admin)
def user_activity(request):
"""
Public Website Analytics Dashboard
- Shows only anonymous (guest) user visits to public pages
- Supports custom date range (?from=YYYY-MM-DD&to=YYYY-MM-DD)
- Falls back to Financial Year filter
"""
ip_address = get_client_ip(request)
tmp = get_city_and_country_from_ip(ip=ip_address)
current_location = tmp.get('city', 'Unknown')
# === Custom Date Range Filter ===
date_from_str = request.GET.get('from')
date_to_str = request.GET.get('to')
custom_date_range = False
start_date = end_date = None
if date_from_str and date_to_str:
try:
start_date = timezone.make_aware(datetime.strptime(date_from_str, '%Y-%m-%d'))
end_date = timezone.make_aware(
datetime.strptime(date_to_str, '%Y-%m-%d') + timedelta(days=1) - timedelta(seconds=1)
)
if start_date <= end_date:
custom_date_range = True
else:
# Swap if user entered backwards
start_date, end_date = end_date, start_date
custom_date_range = True
except ValueError:
pass # Invalid dates → fall back to FY
# === Financial Year Fallback ===
fy_choices = get_fy_choices()
selected_fy = request.GET.get('fy', fy_choices[-1] if fy_choices else '2024-25')
if not custom_date_range:
start_date, end_date = get_fy_dates(selected_fy)
display_period = (
f"{start_date.strftime('%b %d, %Y')} {end_date.strftime('%b %d, %Y')}"
if custom_date_range else
f"FY {selected_fy}"
)
# === Fetch Public Visits (anonymous only) ===
public_visits_qs = PageVisit.objects.filter(
timestamp__range=(start_date, end_date),
user__isnull=True,
).order_by('-timestamp')
# === New vs Returning Users ===
new_users_count = PageVisit.objects.filter(
timestamp__range=(start_date, end_date),
is_new_user=True
).values('session_id').distinct().count()
returning_users_count = PageVisit.objects.filter(
timestamp__range=(start_date, end_date),
is_new_user=False
).values('session_id').distinct().count()
# Build visit data (no path exclusion needed — handled in middleware)
visit_data = []
for visit in public_visits_qs:
# Fix legacy missing locations
if not visit.location or visit.location in ['', 'Unknown']:
geo = get_city_and_country_from_ip(ip=visit.ip_address)
visit.location = geo.get('city', 'Unknown')
visit.save(update_fields=['location'])
visit_data.append({
'path': visit.path,
'location': visit.location or 'Unknown',
'user_agent': visit.user_agent or 'Unknown',
})
# Analytics
location_analytics = get_location_analytics(visit_data)
paths_by_location = get_paths_by_location(visit_data)
devices_by_location = get_devices_by_location(visit_data)
# Top 10 most visited pages
path_counter = Counter(v['path'] for v in visit_data)
top_paths = path_counter.most_common(10)
top_parts = [{'name': path, 'visits': count} for path, count in top_paths]
live_visitors_count = get_live_visitors()
context = {
'current_ip': ip_address,
'current_location': current_location,
'total_visits': len(visit_data),
'location_analytics': location_analytics,
'paths_by_location': paths_by_location,
'devices_by_location': devices_by_location,
'top_parts': top_parts,
'fy_choices': fy_choices,
'selected_fy': selected_fy,
'display_period': display_period,
'date_from': date_from_str or '',
'date_to': date_to_str or '',
'is_custom_range': custom_date_range,
'new_users_count': new_users_count,
'returning_users_count': returning_users_count,
'live_visitors_count': live_visitors_count,
}
return render(request, 'active_sessions.html', context)
def track_page_visit(request):
user = request.user if request.user.is_authenticated else None
session_id = request.session.session_key or request.session.create()
referrer = request.META.get('HTTP_REFERER', '')
# Check if user already visited in this session
last_visit = PageVisit.objects.filter(session_id=session_id).order_by('-timestamp').first()
visit_count = 1
if last_visit:
visit_count = last_visit.visit_count + 1
PageVisit.objects.create(
user=user,
session_id=session_id,
url=request.build_absolute_uri(),
path=request.path,
ip_address=get_client_ip(request),
user_agent=request.META.get('HTTP_USER_AGENT', ''),
session_data=dict(request.session),
referrer=referrer,
visit_count=visit_count
)
def get_client_ip(request):
"""Extracts IP address safely"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
def daily_user_report():
today = timezone.now().date()
start = timezone.make_aware(datetime.combine(today, datetime.min.time()))
end = timezone.make_aware(datetime.combine(today, datetime.max.time()))
new_users = PageVisit.objects.filter(
is_new_user=True,
timestamp__range=(start, end)
).count()
returning_users = PageVisit.objects.filter(
is_new_user=False,
timestamp__range=(start, end)
).count()
DailyUserStats.objects.update_or_create(
date=today,
defaults={
'new_users': new_users,
'returning_users': returning_users
}
)
def get_live_visitors():
window_start = timezone.now() - timedelta(seconds=LIVE_WINDOW_SECONDS)
return (
PageVisit.objects
.filter(timestamp__gte=window_start)
.values('session_id')
.distinct()
.count()
)
@csrf_exempt
@require_POST
def track_click(request):
data = json.loads(request.body)
ClickEvent.objects.create(
session_id=data["session_id"],
page_path=data["path"],
element_text=data["text"],
element_type=data["type"]
)
return ''