# Django imports
from django.contrib.auth.decorators import user_passes_test
from django.contrib.auth import get_user_model
from django.contrib.sessions.models import Session
from django.core.cache import cache
from django.core.paginator import Paginator, EmptyPage
from django.shortcuts import render
from django.http import HttpResponseRedirect, JsonResponse
from django.urls import reverse
from django.views.decorators.http import require_POST
from django.views.decorators.csrf import csrf_exempt
from django.db.models import F
from django.utils import timezone
from django.db.models import Count

# Standard library imports
import logging
import ipaddress
import uuid
import json
import re
from datetime import datetime, timedelta
from collections import Counter, defaultdict

# Third-party imports
import requests

# Local app imports
from at_django_boilerplate.user_activity.models import (
    PageVisit,
    SessionConsentModel,
    DailyUserStats,
    ClickEvent,
)
from at_django_boilerplate.utils.geolocation import get_city_and_country_from_ip

logger = logging.getLogger(__name__)
User = get_user_model()

# Look-back window, in seconds, used when counting "live" visitors.
LIVE_WINDOW_SECONDS = 40


@csrf_exempt
@require_POST
def user_consent_update(request):
    """Record the cookie-consent choice for the current session.

    Expects a JSON body of the form ``{"consent_id": 1}`` where ``1`` means
    "accept all" and ``2`` means "essential only".

    Returns:
        JsonResponse: ``{"success": True, "consent_received": ...}`` on
        success (the second value is read from the consent model), or a
        400 response with an ``error`` key when the body is not valid JSON,
        ``consent_id`` is missing/non-numeric, or it is not 1 or 2.
    """
    try:
        payload = json.loads(request.body)
        consent_id = int(payload.get("consent_id"))
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed JSON (JSONDecodeError) or non-numeric id.
        # TypeError: consent_id missing/null, so int(None) fails.
        # AttributeError: the JSON body is not an object (e.g. a list).
        return JsonResponse(
            {"error": "Invalid JSON payload"},
            status=400
        )

    # Validate before touching the session or the database so that a bad
    # request does not leave an empty consent row behind.
    if consent_id not in (1, 2):
        return JsonResponse(
            {"error": "Invalid consent_id"},
            status=400
        )

    session_id = request.session.session_key
    if not session_id:
        # A brand-new visitor has no session key until the session is created.
        request.session.create()
        session_id = request.session.session_key

    consent, _ = SessionConsentModel.objects.get_or_create(
        session_id=session_id
    )

    # The two flags are mutually exclusive: exactly one of them is set.
    accept_all = consent_id == 1
    consent.accept_all_consent_received = accept_all
    consent.essential_only_consent_received = not accept_all
    consent.save()

    return JsonResponse({
        "success": True,
        "consent_received": consent.consent_received
    })


# IP &
# Geolocation Utilities
def is_private_ip(ip_address):
    """Return True when *ip_address* parses as an IP in a private range.

    Anything that cannot be parsed as an IP address yields False.
    """
    try:
        return ipaddress.ip_address(ip_address).is_private
    except ValueError:
        return False


def get_client_ip(request, *, fallback_to_public=False):
    """Extract the client IP address from the request.

    The first entry of ``X-Forwarded-For`` wins; otherwise ``REMOTE_ADDR``
    is used.

    This module previously defined ``get_client_ip`` twice, and the later,
    simpler definition silently replaced the earlier one. The two are merged
    here: the default keeps the behaviour that was actually in effect (no
    network access), while ``fallback_to_public=True`` enables the earlier
    definition's localhost/dev fallback to ``get_public_ip()`` when the
    address is missing or private.

    Args:
        request: The incoming request (only ``request.META`` is read).
        fallback_to_public: When True, substitute ``get_public_ip()`` for a
            missing or private address.

    Returns:
        The IP address string, or None when the request carries none and the
        fallback is disabled.
    """
    forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
    if forwarded_for:
        # Entries are comma separated; strip the padding around the first one.
        ip = forwarded_for.split(',')[0].strip()
    else:
        ip = request.META.get('REMOTE_ADDR')

    if fallback_to_public and (not ip or is_private_ip(ip)):
        return get_public_ip()  # fallback for localhost/dev
    return ip


def get_public_ip():
    """Return this host's public IP as reported by api.ipify.org.

    A successful answer is cached for one hour under ``public_ip``.
    Returns ``'Unknown'`` when the lookup fails or yields no address.
    """
    cached_ip = cache.get('public_ip')
    if cached_ip:
        return cached_ip

    try:
        response = requests.get('https://api.ipify.org?format=json', timeout=5)
        response.raise_for_status()
        ip = response.json().get('ip')
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions where the
        # JSON decode error is not a RequestException.
        logger.error("Failed to fetch public IP: %s", e)
        return 'Unknown'

    if ip:
        cache.set('public_ip', ip, timeout=3600)
        return ip
    return 'Unknown'


def _format_location(data):
    """Join the non-empty city / regionName / country values with ', '.

    Returns ``'Unknown'`` when all three are missing or empty.
    """
    candidates = (
        data.get('city', ''),
        data.get('regionName', ''),
        data.get('country', ''),
    )
    parts = [part for part in candidates if part]
    return ', '.join(parts) if parts else 'Unknown'


GEO_TTL = 60 * 60 * 24  # 24 hours


def get_location_from_ip(ip_address):
    """Resolve *ip_address* to a "city, region, country" string.

    Results are cached for ``GEO_TTL`` seconds under ``geo_<ip>``. This
    includes successful lookups, which were previously returned without being
    cached and so hit the remote API on every call.

    Returns:
        ``'Local Network'`` for empty or private addresses, the formatted
        location on success, and ``'Unknown'`` when the lookup fails or the
        API does not report success.
    """
    cache_key = f'geo_{ip_address}'
    cached = cache.get(cache_key)
    if cached:
        return cached

    if not ip_address or is_private_ip(ip_address):
        cache.set(cache_key, 'Local Network', GEO_TTL)
        return 'Local Network'

    location = 'Unknown'
    try:
        response = requests.get(
            f'https://ip-api.com/json/{ip_address}',
            params={'fields': 'status,message,country,regionName,city'},
            timeout=2
        )
        response.raise_for_status()
        data = response.json()
        if data.get('status') == 'success':
            location = _format_location(data)
    except (requests.RequestException, ValueError) as e:
        logger.error("Geo lookup failed for %s: %s", ip_address, e)

    # Failures are cached too, so an unreachable API is not hammered.
    cache.set(cache_key, location, GEO_TTL)
    return location


# Utility Functions (updated for session data changes)
def is_admin(user):
    """Return True for an authenticated user who is both staff and superuser."""
    return user.is_authenticated and user.is_staff and user.is_superuser


def _is_valid_session_key(session_key):
    """Return True when *session_key* parses as a UUID, False otherwise.

    ``None`` and non-string values are treated as invalid instead of raising.
    """
    try:
        uuid.UUID(session_key)
    except (ValueError, TypeError, AttributeError):
        # TypeError: None was passed; AttributeError: a non-string was passed.
        return False
    return True


def _parse_login_time(timestamp):
    """Parse an ISO-8601 *timestamp* string.

    Returns:
        A ``datetime``, or None when *timestamp* is empty or cannot be parsed
        (a warning is logged in the latter case).
    """
    if not timestamp:
        return None
    try:
        # Use the datetime class directly rather than the copy that happens
        # to be importable from django.utils.timezone.
        return datetime.fromisoformat(timestamp)
    except (ValueError, TypeError):
        logger.warning("Invalid timestamp: %s", timestamp)
        return None


def _local_today():
    """Return today's date, converted to the active time zone when aware."""
    now = timezone.now()
    if timezone.is_aware(now):
        now = timezone.localtime(now)
    return now.date()


# Financial Year Utilities
def get_fy_dates(fy):
    """Return ``(start, end)`` aware datetimes for a financial year label.

    A financial year runs from 1 April to 31 March. *fy* is expected to look
    like ``"2024-25"``; a four-digit end year (``"2024-2025"``) is accepted as
    well. Any label that cannot be parsed falls back to the financial year
    containing today, and an error is logged.
    """
    try:
        start_text, end_text = fy.split('-')[:2]
        start_year = int(start_text)
        end_year = int(end_text)
        if end_year < 100:
            # Two-digit suffix such as "25" means 2025.
            end_year += 2000
        start_date = timezone.make_aware(datetime(start_year, 4, 1))
        end_date = timezone.make_aware(datetime(end_year, 3, 31, 23, 59, 59))
        return start_date, end_date
    except (ValueError, IndexError, AttributeError, TypeError, OverflowError):
        # AttributeError/TypeError: fy is None or not a string.
        # OverflowError: an absurdly large year taken from the query string.
        logger.error("Invalid FY format: %s", fy)

    today = _local_today()
    start_year = today.year if today.month >= 4 else today.year - 1
    start_date = timezone.make_aware(datetime(start_year, 4, 1))
    end_date = timezone.make_aware(datetime(start_year + 1, 3, 31, 23, 59, 59))
    return start_date, end_date


def get_fy_choices():
    """Return financial year labels from ``"2020-21"`` up to the current one."""
    today = _local_today()
    current_year = today.year if today.month >= 4 else today.year - 1
    return [
        f"{year}-{str(year + 1)[2:]}"
        for year in range(2020, current_year + 1)
    ]


# Device Detection
def detect_device(user_agent):
    """Classify a User-Agent string by simple substring checks.

    Returns one of ``'Unknown'`` (empty input), ``'Tablet'``, ``'Mobile'``,
    ``'Desktop'`` or ``'Other'``.
    """
    if not user_agent:
        return 'Unknown'

    ua = user_agent.lower()

    # Tablets are tested first: iPad user agents also contain "Mobile", so
    # checking "mobile" first mislabelled them as phones.
    if 'tablet' in ua or 'ipad' in ua:
        return 'Tablet'
    if 'mobile' in ua:
        return 'Mobile'
    if 'android' in ua:
        # Android without the "Mobile" token is the convention for tablets;
        # it would otherwise match "linux" below and be reported as Desktop.
        return 'Tablet'
    if 'windows' in ua or 'macintosh' in ua or 'linux' in ua:
        return 'Desktop'
    return 'Other'


# Path Filtering
# Compiled once at import time instead of on every call.
_ADMIN_PATH_PATTERNS = tuple(re.compile(pattern) for pattern in (
    r'^/admin/',
    # Exclude /admin_session/ paths except /admin_session/user-activity/
    r'^/admin_session/(?!user-activity/).*$',
))
_MEDIA_PATH_PATTERNS = tuple(re.compile(pattern, re.IGNORECASE) for pattern in (
    r'^/media/',
    r'^/static/',
    r'\.(jpg|jpeg|png|gif|svg|css|js|woff|woff2|ttf|eot|ico|mp4|webm|ogg|mp3|wav)$',
))


def is_excluded_path(path):
    """Return True when *path* should be left out of the analytics.

    Excluded: empty paths, admin paths, media/static paths and anything that
    ends in a known asset extension.
    """
    if not path:
        return True
    if any(pattern.match(path) for pattern in _ADMIN_PATH_PATTERNS):
        return True
    return any(pattern.search(path) for pattern in _MEDIA_PATH_PATTERNS)


# Analytics Functions
def _breakdown(items, total):
    """Build the labels/counts/percentages/data dict for (label, count) pairs.

    Percentages are relative to *total* and formatted with two decimals.
    """
    labels = [label for label, _ in items]
    counts = [count for _, count in items]
    percentages = [f"{(count / total * 100):.2f}" for count in counts]
    return {
        'labels': labels,
        'counts': counts,
        'percentages': percentages,
        'data': list(zip(labels, counts, percentages)),
    }


def get_location_analytics(visits):
    """Summarise visits by location, keeping the 10 most frequent.

    Args:
        visits: Iterable of dicts with a ``'location'`` key.

    Returns:
        Dict with ``labels``, ``counts``, ``percentages`` (two-decimal strings
        relative to all located visits) and ``data`` (the three zipped
        together). All lists are empty when there is nothing to report.
    """
    empty = {'labels': [], 'counts': [], 'percentages': [], 'data': []}
    if not visits:
        logger.debug("No visits for location analytics")
        return empty

    location_counts = Counter(
        visit['location'] for visit in visits if visit['location']
    )
    total_visits = sum(location_counts.values())
    if total_visits == 0:
        logger.debug("No valid locations found")
        return empty

    result = _breakdown(location_counts.most_common(10), total_visits)
    logger.debug(
        "Location Analytics: labels=%s, counts=%s, percentages=%s",
        result['labels'], result['counts'], result['percentages'],
    )
    return result


def get_paths_by_location(visits):
    """Return the top 5 non-excluded paths for each of the top 10 locations.

    Args:
        visits: Iterable of dicts with ``'location'`` and ``'path'`` keys.

    Returns:
        ``{'locations': [...]}`` where each entry carries ``location``,
        ``labels``, ``counts``, ``percentages`` and ``data``.
    """
    if not visits:
        logger.debug("No visits for path analytics")
        return {'locations': []}

    location_paths = defaultdict(Counter)
    for visit in visits:
        location = visit['location']
        path = visit['path']
        if location and path and not is_excluded_path(path):
            location_paths[location][path] += 1

    locations_data = []
    for location, path_counts in location_paths.items():
        total_visits = sum(path_counts.values())
        if total_visits == 0:
            continue
        entry = {'location': location}
        entry.update(_breakdown(path_counts.most_common(5), total_visits))
        locations_data.append(entry)

    # Ranked by the summed counts of the listed (top 5) paths.
    locations_data.sort(key=lambda x: sum(x['counts']), reverse=True)
    locations_data = locations_data[:10]

    logger.debug("Paths by Location: %s", locations_data)
    return {'locations': locations_data}


def get_devices_by_location(visits):
    """Return the device-type split for each of the top 10 locations.

    Args:
        visits: Iterable of dicts with ``'location'`` and ``'user_agent'``
            keys.

    Returns:
        ``{'locations': [...]}`` where each entry carries ``location``,
        ``labels``, ``counts``, ``percentages`` and ``data``.
    """
    if not visits:
        logger.debug("No visits for device analytics")
        return {'locations': []}

    location_devices = defaultdict(Counter)
    for visit in visits:
        location = visit['location']
        device = detect_device(visit['user_agent'])
        if location:
            location_devices[location][device] += 1

    locations_data = []
    for location, device_counts in location_devices.items():
        total_visits = sum(device_counts.values())
        entry = {'location': location}
        # Devices are listed in first-seen order, not by frequency.
        entry.update(_breakdown(list(device_counts.items()), total_visits))
        locations_data.append(entry)

    locations_data.sort(key=lambda x: sum(x['counts']), reverse=True)
    locations_data = locations_data[:10]

    logger.debug("Devices by Location: %s", locations_data)
    return {'locations': locations_data}


def _parse_custom_range(date_from_str, date_to_str):
    """Turn two YYYY-MM-DD strings into an inclusive (start, end) pair.

    The dates are put in chronological order first and only then expanded to
    start-of-day / end-of-day, so a reversed range covers the same days as
    the correctly ordered one. Returns None when either value is missing or
    invalid.
    """
    if not (date_from_str and date_to_str):
        return None
    try:
        first_day = datetime.strptime(date_from_str, '%Y-%m-%d')
        last_day = datetime.strptime(date_to_str, '%Y-%m-%d')
        if first_day > last_day:
            # Swap if user entered backwards
            first_day, last_day = last_day, first_day
        start = timezone.make_aware(first_day)
        end = timezone.make_aware(
            last_day + timedelta(days=1) - timedelta(seconds=1)
        )
    except (ValueError, OverflowError):
        # OverflowError: the end-of-day arithmetic ran past the maximum date.
        return None
    return start, end


@user_passes_test(is_admin)
def user_activity(request):
    """
    Public Website Analytics Dashboard
    - Shows only anonymous (guest) user visits to public pages
    - Supports custom date range (?from=YYYY-MM-DD&to=YYYY-MM-DD)
    - Falls back to Financial Year filter (?fy=2024-25)

    Renders ``active_sessions.html``.
    """
    ip_address = get_client_ip(request)
    # NOTE(review): assumes the helper returns a dict; None is tolerated here.
    geo_info = get_city_and_country_from_ip(ip=ip_address)
    current_location = (geo_info or {}).get('city', 'Unknown')

    # === Custom Date Range Filter ===
    date_from_str = request.GET.get('from')
    date_to_str = request.GET.get('to')
    custom_range = _parse_custom_range(date_from_str, date_to_str)
    custom_date_range = custom_range is not None

    # === Financial Year Fallback ===
    fy_choices = get_fy_choices()
    selected_fy = request.GET.get('fy', fy_choices[-1] if fy_choices else '2024-25')

    if custom_date_range:
        start_date, end_date = custom_range
        display_period = (
            f"{start_date.strftime('%b %d, %Y')} – {end_date.strftime('%b %d, %Y')}"
        )
    else:
        # Missing or invalid dates fall back to the financial year.
        start_date, end_date = get_fy_dates(selected_fy)
        display_period = f"FY {selected_fy}"

    # === Fetch Public Visits (anonymous only) ===
    public_visits_qs = PageVisit.objects.filter(
        timestamp__range=(start_date, end_date),
        user__isnull=True,
    ).order_by('-timestamp')

    # === New vs Returning Users ===
    # NOTE(review): unlike the visit list, these counts are not restricted to
    # anonymous visits — confirm whether that is intended.
    new_users_count = PageVisit.objects.filter(
        timestamp__range=(start_date, end_date),
        is_new_user=True
    ).values('session_id').distinct().count()

    returning_users_count = PageVisit.objects.filter(
        timestamp__range=(start_date, end_date),
        is_new_user=False
    ).values('session_id').distinct().count()

    # Build visit data (no path exclusion needed — handled in middleware)
    visit_data = []
    city_by_ip = {}  # one geo lookup per distinct IP within this request
    for visit in public_visits_qs:
        # Fix legacy missing locations
        if not visit.location or visit.location == 'Unknown':
            ip = visit.ip_address
            if ip not in city_by_ip:
                geo = get_city_and_country_from_ip(ip=ip)
                city_by_ip[ip] = (geo or {}).get('city', 'Unknown')
            resolved = city_by_ip[ip]
            # Only write when the lookup produced something different, so
            # unresolved rows are not re-saved on every page load.
            if resolved != visit.location:
                visit.location = resolved
                visit.save(update_fields=['location'])

        visit_data.append({
            'path': visit.path,
            'location': visit.location or 'Unknown',
            'user_agent': visit.user_agent or 'Unknown',
        })

    # Analytics
    location_analytics = get_location_analytics(visit_data)
    paths_by_location = get_paths_by_location(visit_data)
    devices_by_location = get_devices_by_location(visit_data)

    # Top 10 most visited pages
    path_counter = Counter(v['path'] for v in visit_data)
    top_parts = [
        {'name': path, 'visits': count}
        for path, count in path_counter.most_common(10)
    ]

    live_visitors_count = get_live_visitors()

    context = {
        'current_ip': ip_address,
        'current_location': current_location,
        'total_visits': len(visit_data),
        'location_analytics': location_analytics,
        'paths_by_location': paths_by_location,
        'devices_by_location': devices_by_location,
        'top_parts': top_parts,
        'fy_choices': fy_choices,
        'selected_fy': selected_fy,
        'display_period': display_period,
        'date_from': date_from_str or '',
        'date_to': date_to_str or '',
        'is_custom_range': custom_date_range,
        'new_users_count': new_users_count,
        'returning_users_count': returning_users_count,
        'live_visitors_count': live_visitors_count,
    }

    return render(request, 'active_sessions.html', context)


def track_page_visit(request):
    """Store a ``PageVisit`` row describing the current request.

    ``visit_count`` is one more than the count on the session's most recent
    visit, or 1 when the session has no earlier visit.
    """
    user = request.user if request.user.is_authenticated else None

    session_id = request.session.session_key
    if not session_id:
        # create() returns None, so the key has to be read back afterwards;
        # "session_key or create()" stored None for every new session.
        request.session.create()
        session_id = request.session.session_key

    referrer = request.META.get('HTTP_REFERER', '')

    # Check if user already visited in this session
    last_visit = (
        PageVisit.objects
        .filter(session_id=session_id)
        .order_by('-timestamp')
        .first()
    )
    visit_count = last_visit.visit_count + 1 if last_visit else 1

    PageVisit.objects.create(
        user=user,
        session_id=session_id,
        url=request.build_absolute_uri(),
        path=request.path,
        ip_address=get_client_ip(request),
        user_agent=request.META.get('HTTP_USER_AGENT', ''),
        session_data=dict(request.session),
        referrer=referrer,
        visit_count=visit_count
    )


def daily_user_report():
    """Create or update today's ``DailyUserStats`` row.

    Counts today's ``PageVisit`` rows split by ``is_new_user``.
    NOTE(review): these are visit counts, not distinct sessions — the
    dashboard counts distinct ``session_id`` values instead; confirm which
    is intended.
    """
    # Use the local date so it matches the local-time bounds built below;
    # the UTC date can point at the wrong day near midnight.
    today = _local_today()
    start = timezone.make_aware(datetime.combine(today, datetime.min.time()))
    end = timezone.make_aware(datetime.combine(today, datetime.max.time()))

    new_users = PageVisit.objects.filter(
        is_new_user=True,
        timestamp__range=(start, end)
    ).count()

    returning_users = PageVisit.objects.filter(
        is_new_user=False,
        timestamp__range=(start, end)
    ).count()

    DailyUserStats.objects.update_or_create(
        date=today,
        defaults={
            'new_users': new_users,
            'returning_users': returning_users
        }
    )


def get_live_visitors():
    """Count distinct sessions with a visit in the last LIVE_WINDOW_SECONDS."""
    window_start = timezone.now() - timedelta(seconds=LIVE_WINDOW_SECONDS)

    return (
        PageVisit.objects
        .filter(timestamp__gte=window_start)
        .values('session_id')
        .distinct()
        .count()
    )


@csrf_exempt
@require_POST
def track_click(request):
    """Record a click event posted as JSON.

    Expects a JSON object with ``session_id``, ``path``, ``text`` and
    ``type`` keys.

    Returns:
        JsonResponse: ``{"success": True}`` once the event is stored, or a
        400 response when the body is not valid JSON or a key is missing.
    """
    try:
        data = json.loads(request.body)
        event_fields = {
            'session_id': data["session_id"],
            'page_path': data["path"],
            'element_text': data["text"],
            'element_type': data["type"],
        }
    except (ValueError, KeyError, TypeError):
        # ValueError: malformed JSON; KeyError: a required key is missing;
        # TypeError: the JSON body is not an object.
        return JsonResponse(
            {"error": "Invalid JSON payload"},
            status=400
        )

    ClickEvent.objects.create(**event_fields)

    # A view must return an HttpResponse; the previous bare '' made Django
    # fail after the event had already been stored.
    return JsonResponse({"success": True})