# Listing metadata: 518 lines, 16 KiB, Python, executable file
# Django imports
|
||
from django.contrib.auth.decorators import user_passes_test
|
||
from django.contrib.auth import get_user_model
|
||
from django.contrib.sessions.models import Session
|
||
from django.core.cache import cache
|
||
from django.core.paginator import Paginator, EmptyPage
|
||
from django.shortcuts import render
|
||
from django.http import HttpResponseRedirect, JsonResponse
|
||
from django.urls import reverse
|
||
from django.views.decorators.http import require_POST
|
||
from django.views.decorators.csrf import csrf_exempt
|
||
from django.db.models import F
|
||
from django.utils import timezone
|
||
from django.db.models import Count
|
||
|
||
# Standard library imports
|
||
import logging
|
||
import ipaddress
|
||
import uuid
|
||
import json
|
||
import re
|
||
from datetime import datetime, timedelta
|
||
from collections import Counter, defaultdict
|
||
|
||
# Third-party imports
|
||
import requests
|
||
|
||
# Local app imports
|
||
from at_django_boilerplate.user_activity.models import (
|
||
PageVisit,
|
||
SessionConsentModel,DailyUserStats,ClickEvent
|
||
)
|
||
from at_django_boilerplate.utils.geolocation import get_city_and_country_from_ip
|
||
|
||
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
||
# User model class as returned by Django's get_user_model().
User = get_user_model()
|
||
# Look-back window, in seconds, that get_live_visitors() uses to decide
# which sessions count as currently "live".
LIVE_WINDOW_SECONDS = 40
|
||
|
||
|
||
|
||
|
||
@csrf_exempt
@require_POST
def user_consent_update(request):
    """Record the visitor's cookie-consent choice for the current session.

    Expects a JSON object body of the form ``{"consent_id": <int>}`` where
    ``1`` means "accept all" and ``2`` means "essential only".

    Returns:
        JsonResponse: ``{"success": True, "consent_received": ...}`` on
        success. ``{"error": ...}`` with status 400 when the body is not a
        JSON object, ``consent_id`` is missing or non-numeric, or its value
        is neither 1 nor 2.

    NOTE(review): this state-changing endpoint is CSRF-exempt — confirm that
    is intentional.
    """
    try:
        payload = json.loads(request.body)
        consent_id = int(payload.get("consent_id"))
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed JSON (JSONDecodeError) or a non-numeric id.
        # TypeError: "consent_id" missing/null, so int(None) fails.
        # AttributeError: the JSON document is not an object (e.g. a list).
        return JsonResponse(
            {"error": "Invalid JSON payload"},
            status=400
        )

    # Reject unknown choices before touching the session or the database so
    # that bad requests do not leave empty consent rows behind.
    if consent_id not in (1, 2):
        return JsonResponse(
            {"error": "Invalid consent_id"},
            status=400
        )

    session_id = request.session.session_key
    if not session_id:
        # No session yet: create one, then read the generated key back.
        request.session.create()
        session_id = request.session.session_key

    consent, _ = SessionConsentModel.objects.get_or_create(
        session_id=session_id
    )

    # The two flags are mutually exclusive.
    accept_all = consent_id == 1
    consent.accept_all_consent_received = accept_all
    consent.essential_only_consent_received = not accept_all
    consent.save()

    return JsonResponse({
        "success": True,
        "consent_received": consent.consent_received
    })
|
||
|
||
|
||
# IP & Geolocation Utilities (unchanged)
|
||
def is_private_ip(ip_address):
    """Tell whether *ip_address* parses as an address in a private range.

    Values that ``ipaddress.ip_address`` rejects are reported as not private.
    """
    try:
        parsed = ipaddress.ip_address(ip_address)
    except ValueError:
        return False
    return parsed.is_private
|
||
|
||
def get_client_ip(request):
    """Resolve the caller's IP, substituting the server's public IP for local ones.

    The first entry of ``X-Forwarded-For`` wins when that header is present;
    otherwise ``REMOTE_ADDR`` is used. A missing or private address is
    replaced by the result of ``get_public_ip()``.

    NOTE(review): a second ``get_client_ip`` is defined further down in this
    module; being defined later, that one is what the name resolves to at
    call time, which leaves this version unused.
    """
    meta = request.META
    forwarded = meta.get('HTTP_X_FORWARDED_FOR')
    if forwarded:
        ip = forwarded.split(',')[0].strip()
    else:
        ip = meta.get('REMOTE_ADDR')

    if ip and not is_private_ip(ip):
        return ip
    return get_public_ip()  # fallback for localhost/dev
|
||
|
||
def get_public_ip():
    """Return this server's public IP address as reported by api.ipify.org.

    A successful answer is stored in the cache under ``'public_ip'`` for one
    hour and served from there on later calls.

    Returns:
        str: the IP address, or ``'Unknown'`` when the service cannot be
        reached, answers with an error status, or sends a body that does not
        contain a usable ``ip`` value.
    """
    cached_ip = cache.get('public_ip')
    if cached_ip:
        return cached_ip

    try:
        response = requests.get('https://api.ipify.org?format=json', timeout=5)
        response.raise_for_status()
        ip = response.json().get('ip')
    except (requests.RequestException, ValueError, AttributeError) as e:
        # ValueError: the body was not JSON (not every requests version wraps
        # this in a RequestException). AttributeError: JSON was not an object.
        logger.error(f"Failed to fetch public IP: {e}")
        return 'Unknown'

    if not ip:
        return 'Unknown'

    cache.set('public_ip', ip, timeout=3600)
    return ip
|
||
|
||
def _format_location(data):
|
||
city = data.get('city', '')
|
||
region = data.get('regionName', '')
|
||
country = data.get('country', '')
|
||
parts = [part for part in [city, region, country] if part]
|
||
return ', '.join(parts) if parts else 'Unknown'
|
||
|
||
# Time-to-live in seconds; by its name, presumably meant for cached
# geolocation results — confirm against the code that reads it.
GEO_TTL = 60 * 60 * 24 # 24 hours
|
||
|
||
def get_location_from_ip(ip_address):
    """Return a human-readable location for *ip_address* using ip-api.com.

    Every outcome (including successful lookups) is cached under
    ``geo_<ip>`` for ``GEO_TTL`` seconds, so a given address triggers at most
    one remote request per cache lifetime.

    Returns:
        str: ``'Local Network'`` for empty or private addresses, a
        "city, region, country" label on success, or ``'Unknown'`` when the
        request fails or the service reports a non-success status.
    """
    cache_key = f'geo_{ip_address}'
    cached = cache.get(cache_key)
    if cached:
        return cached

    if not ip_address or is_private_ip(ip_address):
        location = 'Local Network'
    else:
        location = 'Unknown'
        try:
            # NOTE(review): ip-api.com's free tier is, to my knowledge,
            # HTTP-only (HTTPS needs a paid key) — confirm this URL actually
            # answers for the plan in use.
            response = requests.get(
                f'https://ip-api.com/json/{ip_address}',
                params={'fields': 'status,message,country,regionName,city'},
                timeout=2
            )
            response.raise_for_status()
            data = response.json()
            if data.get('status') == 'success':
                location = _format_location(data)
        except (requests.RequestException, ValueError, AttributeError) as e:
            # ValueError/AttributeError cover a body that is not a JSON object.
            logger.error(f"Geo lookup failed for {ip_address}: {e}")

    cache.set(cache_key, location, GEO_TTL)
    return location
|
||
|
||
|
||
# Utility Functions (updated for session data changes)
|
||
def is_admin(user):
    """Gate for admin-only views: authenticated, staff and superuser at once.

    Evaluates the three flags in that order and stops at the first falsy one,
    returning that flag's value; otherwise returns ``user.is_superuser``.
    """
    authenticated = user.is_authenticated
    if not authenticated:
        return authenticated

    staff = user.is_staff
    if not staff:
        return staff

    return user.is_superuser
|
||
|
||
def _is_valid_session_key(session_key):
|
||
try:
|
||
uuid.UUID(session_key)
|
||
return True
|
||
except ValueError:
|
||
return False
|
||
|
||
def _parse_login_time(timestamp):
|
||
if not timestamp:
|
||
return None
|
||
try:
|
||
return timezone.datetime.fromisoformat(timestamp)
|
||
except (ValueError, TypeError):
|
||
logger.warning(f"Invalid timestamp: {timestamp}")
|
||
return None
|
||
|
||
# Financial Year Utilities (unchanged)
|
||
def _fy_bounds(start_year):
    """Return aware datetimes for 1 Apr *start_year* 00:00:00 and 31 Mar of the next year 23:59:59."""
    start_date = timezone.make_aware(datetime(start_year, 4, 1))
    end_date = timezone.make_aware(datetime(start_year + 1, 3, 31, 23, 59, 59))
    return start_date, end_date


def get_fy_dates(fy):
    """Return the aware ``(start, end)`` datetimes of a financial year.

    Args:
        fy: Label such as ``"2024-25"``: the starting year, a hyphen, then
            the ending year written with two or four digits.

    Returns:
        tuple: ``(start, end)`` spanning 1 April of the starting year through
        31 March 23:59:59 of the following year. When *fy* cannot be parsed,
        or its ending year is not the year after the starting year, the
        problem is logged and the bounds of the current financial year are
        returned instead.
    """
    try:
        start_text, end_text = fy.split('-')[:2]
        start_year = int(start_text)
        end_year = start_year + 1
        # The end year is derived from the start year rather than from
        # "suffix + 2000", so labels stay correct across a century boundary
        # and multi-year spans such as "2024-99" are rejected.
        if int(end_text) not in (end_year, end_year % 100):
            raise ValueError("end year does not follow start year")
        return _fy_bounds(start_year)
    except (AttributeError, IndexError, OverflowError, ValueError):
        # AttributeError: fy is not a string. OverflowError: absurdly large
        # year. ValueError: non-numeric parts, missing hyphen, mismatched end
        # year or a year outside what datetime supports.
        logger.error(f"Invalid FY format: {fy}")

    today = timezone.now()
    current_start_year = today.year if today.month >= 4 else today.year - 1
    return _fy_bounds(current_start_year)
|
||
|
||
def get_fy_choices():
    """List financial-year labels from "2020-21" up to the current financial year.

    The financial year is taken to start in April: from January to March the
    latest label is based on the previous calendar year.
    """
    now = timezone.now()
    latest_start = now.year - 1 if now.month < 4 else now.year

    labels = []
    for start in range(2020, latest_start + 1):
        end_suffix = str(start + 1)[2:]
        labels.append(f"{start}-{end_suffix}")
    return labels
|
||
|
||
# Device Detection (unchanged)
|
||
def detect_device(user_agent):
    """Classify a User-Agent string by device family.

    Returns:
        str: ``'Unknown'`` for an empty/missing user agent, otherwise one of
        ``'Tablet'``, ``'Mobile'``, ``'Desktop'`` or ``'Other'``.
    """
    if not user_agent:
        return 'Unknown'

    ua = user_agent.lower()
    has_mobile_token = 'mobile' in ua

    # Tablets are checked first: iPad user agents also carry a
    # "Mobile/<build>" token, and Android tablets conventionally omit
    # "Mobile" while still containing "Linux", so checking phones or
    # desktops first would misclassify both.
    if 'ipad' in ua or 'tablet' in ua or ('android' in ua and not has_mobile_token):
        return 'Tablet'
    if has_mobile_token:
        return 'Mobile'
    if any(token in ua for token in ('windows', 'macintosh', 'linux')):
        return 'Desktop'
    return 'Other'
|
||
|
||
# Path Filtering (unchanged)
|
||
def is_excluded_path(path):
    """Decide whether a request path should be left out of analytics.

    Empty paths, admin URLs, media/static URLs and paths ending in a known
    asset extension are excluded. ``/admin_session/user-activity/`` is the
    one ``/admin_session/`` subtree that is kept.
    """
    if not path:
        return True

    # Anchored at the start of the path and matched case-sensitively.
    admin_rules = (
        r'^/admin/',
        r'^/admin_session/(?!user-activity/).*$',  # Exclude /admin_session/ paths except /admin_session/user-activity/
    )
    if any(re.match(rule, path) for rule in admin_rules):
        return True

    # Searched anywhere in the path, ignoring case.
    asset_rules = (
        r'^/media/',
        r'^/static/',
        r'\.(jpg|jpeg|png|gif|svg|css|js|woff|woff2|ttf|eot|ico|mp4|webm|ogg|mp3|wav)$',
    )
    return any(re.search(rule, path, re.IGNORECASE) for rule in asset_rules)
|
||
|
||
# Analytics Functions (unchanged)
|
||
def get_location_analytics(visits):
    """Summarise the ten most frequent visit locations.

    Args:
        visits: Sequence of dicts, each read through its ``'location'`` key.

    Returns:
        dict: parallel lists ``labels``, ``counts`` and ``percentages``
        (two-decimal strings, relative to all visits that have a non-empty
        location) plus ``data``, the three lists zipped into tuples. All four
        lists are empty when there is nothing to count.
    """
    if not visits:
        logger.debug("No visits for location analytics")
        return {'labels': [], 'counts': [], 'percentages': [], 'data': []}

    tally = Counter()
    for entry in visits:
        place = entry['location']
        if place:
            tally[place] += 1

    grand_total = sum(tally.values())
    if not grand_total:
        logger.debug("No valid locations found")
        return {'labels': [], 'counts': [], 'percentages': [], 'data': []}

    labels, counts, percentages = [], [], []
    for place, hits in tally.most_common(10):
        labels.append(place)
        counts.append(hits)
        percentages.append(f"{(hits / grand_total * 100):.2f}")
    data = list(zip(labels, counts, percentages))

    logger.debug(f"Location Analytics: labels={labels}, counts={counts}, percentages={percentages}")
    return {
        'labels': labels,
        'counts': counts,
        'percentages': percentages,
        'data': data
    }
|
||
|
||
def get_paths_by_location(visits):
    """Report the five most visited paths for each location.

    Args:
        visits: Sequence of dicts read through their ``'location'`` and
            ``'path'`` keys. Entries with an empty location or path, or a
            path rejected by ``is_excluded_path``, are ignored.

    Returns:
        dict: ``{'locations': [...]}`` with at most ten entries, ordered by
        the summed counts of their listed paths (highest first). Each entry
        carries ``location``, ``labels``, ``counts``, ``percentages``
        (two-decimal strings relative to all counted visits of that
        location) and ``data`` (the three lists zipped).
    """
    if not visits:
        logger.debug("No visits for path analytics")
        return {'locations': []}

    per_location = defaultdict(Counter)
    for entry in visits:
        place, page = entry['location'], entry['path']
        if not (place and page):
            continue
        if is_excluded_path(page):
            continue
        per_location[place][page] += 1

    summaries = []
    for place, page_tally in per_location.items():
        place_total = sum(page_tally.values())
        if place_total == 0:
            continue

        labels, counts = [], []
        for page, hits in page_tally.most_common(5):
            labels.append(page)
            counts.append(hits)
        percentages = [f"{(hits / place_total * 100):.2f}" for hits in counts]

        summaries.append({
            'location': place,
            'labels': labels,
            'counts': counts,
            'percentages': percentages,
            'data': list(zip(labels, counts, percentages))
        })

    locations_data = sorted(
        summaries,
        key=lambda item: sum(item['counts']),
        reverse=True,
    )[:10]

    logger.debug(f"Paths by Location: {locations_data}")
    return {'locations': locations_data}
|
||
|
||
def get_devices_by_location(visits):
    """Break down device families per location.

    Args:
        visits: Sequence of dicts read through their ``'location'`` and
            ``'user_agent'`` keys; the user agent is classified with
            ``detect_device``. Entries with an empty location are ignored.

    Returns:
        dict: ``{'locations': [...]}`` with at most ten entries, ordered by
        total visits (highest first). Each entry carries ``location``,
        ``labels`` (device families), ``counts``, ``percentages``
        (two-decimal strings) and ``data`` (the three lists zipped).
    """
    if not visits:
        logger.debug("No visits for device analytics")
        return {'locations': []}

    per_location = defaultdict(Counter)
    for entry in visits:
        place = entry['location']
        family = detect_device(entry['user_agent'])
        if not place:
            continue
        per_location[place][family] += 1

    summaries = []
    for place, family_tally in per_location.items():
        place_total = sum(family_tally.values())
        labels = list(family_tally)
        counts = [family_tally[name] for name in labels]
        percentages = [f"{(hits / place_total * 100):.2f}" for hits in counts]

        summaries.append({
            'location': place,
            'labels': labels,
            'counts': counts,
            'percentages': percentages,
            'data': list(zip(labels, counts, percentages))
        })

    locations_data = sorted(
        summaries,
        key=lambda item: sum(item['counts']),
        reverse=True,
    )[:10]

    logger.debug(f"Devices by Location: {locations_data}")
    return {'locations': locations_data}
|
||
def _parse_custom_date_range(date_from_str, date_to_str):
    """Turn two ``YYYY-MM-DD`` strings into aware ``(start, end)`` datetimes.

    ``start`` is 00:00:00 of the earlier day and ``end`` is 23:59:59 of the
    later day, whichever order the two were supplied in. Returns ``None``
    when either string is missing or cannot be used as a date.
    """
    if not (date_from_str and date_to_str):
        return None
    try:
        first_day = datetime.strptime(date_from_str, '%Y-%m-%d')
        last_day = datetime.strptime(date_to_str, '%Y-%m-%d')
        if first_day > last_day:
            # Swap the days *before* widening the end to 23:59:59; swapping
            # afterwards would drop the whole first day and nearly all of
            # the last one.
            first_day, last_day = last_day, first_day
        start = timezone.make_aware(first_day)
        end = timezone.make_aware(last_day + timedelta(days=1) - timedelta(seconds=1))
    except (ValueError, OverflowError):
        # OverflowError: adding a day to 9999-12-31.
        return None
    return start, end


@user_passes_test(is_admin)
def user_activity(request):
    """
    Public Website Analytics Dashboard

    - Shows only anonymous (guest) user visits to public pages
    - Supports custom date range (?from=YYYY-MM-DD&to=YYYY-MM-DD)
    - Falls back to Financial Year filter (?fy=YYYY-YY) when no valid custom
      range is supplied

    Renders ``active_sessions.html`` with location, path and device
    breakdowns, the ten most visited paths, new/returning session counts and
    the current live-visitor count.
    """
    ip_address = get_client_ip(request)
    # NOTE(review): assumes get_city_and_country_from_ip always returns a
    # dict — confirm it cannot return None.
    current_location = get_city_and_country_from_ip(ip=ip_address).get('city', 'Unknown')

    # === Custom Date Range Filter ===
    date_from_str = request.GET.get('from')
    date_to_str = request.GET.get('to')
    custom_range = _parse_custom_date_range(date_from_str, date_to_str)
    custom_date_range = custom_range is not None

    # === Financial Year Fallback ===
    fy_choices = get_fy_choices()
    selected_fy = request.GET.get('fy', fy_choices[-1] if fy_choices else '2024-25')

    if custom_date_range:
        start_date, end_date = custom_range
        display_period = f"{start_date.strftime('%b %d, %Y')} – {end_date.strftime('%b %d, %Y')}"
    else:
        start_date, end_date = get_fy_dates(selected_fy)
        display_period = f"FY {selected_fy}"

    # === Fetch Public Visits (anonymous only) ===
    public_visits_qs = PageVisit.objects.filter(
        timestamp__range=(start_date, end_date),
        user__isnull=True,
    ).order_by('-timestamp')

    # === New vs Returning Users ===
    # NOTE(review): unlike public_visits_qs these two counts are not limited
    # to anonymous visits — confirm that is intended.
    new_users_count = PageVisit.objects.filter(
        timestamp__range=(start_date, end_date),
        is_new_user=True
    ).values('session_id').distinct().count()

    returning_users_count = PageVisit.objects.filter(
        timestamp__range=(start_date, end_date),
        is_new_user=False
    ).values('session_id').distinct().count()

    # Build visit data (no path exclusion needed — handled in middleware)
    geo_by_ip = {}  # per-request memo: one lookup per distinct IP
    visit_data = []
    for visit in public_visits_qs:
        # Fix legacy missing locations
        if not visit.location or visit.location == 'Unknown':
            if visit.ip_address not in geo_by_ip:
                geo = get_city_and_country_from_ip(ip=visit.ip_address)
                geo_by_ip[visit.ip_address] = geo.get('city', 'Unknown')
            resolved = geo_by_ip[visit.ip_address]
            # Only write when the lookup actually changed something, so rows
            # that stay 'Unknown' are not re-saved on every page load.
            if resolved != visit.location:
                visit.location = resolved
                visit.save(update_fields=['location'])

        visit_data.append({
            'path': visit.path,
            'location': visit.location or 'Unknown',
            'user_agent': visit.user_agent or 'Unknown',
        })

    # Analytics
    location_analytics = get_location_analytics(visit_data)
    paths_by_location = get_paths_by_location(visit_data)
    devices_by_location = get_devices_by_location(visit_data)

    # Top 10 most visited pages
    path_counter = Counter(v['path'] for v in visit_data)
    top_parts = [
        {'name': path, 'visits': count}
        for path, count in path_counter.most_common(10)
    ]
    live_visitors_count = get_live_visitors()

    context = {
        'current_ip': ip_address,
        'current_location': current_location,
        'total_visits': len(visit_data),
        'location_analytics': location_analytics,
        'paths_by_location': paths_by_location,
        'devices_by_location': devices_by_location,
        'top_parts': top_parts,
        'fy_choices': fy_choices,
        'selected_fy': selected_fy,
        'display_period': display_period,
        'date_from': date_from_str or '',
        'date_to': date_to_str or '',
        'is_custom_range': custom_date_range,
        'new_users_count': new_users_count,
        'returning_users_count': returning_users_count,
        'live_visitors_count': live_visitors_count,
    }

    return render(request, 'active_sessions.html', context)
|
||
|
||
def track_page_visit(request):
    """Persist a ``PageVisit`` row describing the current request.

    Stores the authenticated user (or ``None``), the session key, URL, path,
    client IP, user agent, referrer, a copy of the session contents and a
    per-session running visit counter.

    NOTE(review): ``session_data`` receives the whole session dict — confirm
    its contents are safe to store and serialisable by the model field.
    """
    user = request.user if request.user.is_authenticated else None

    # session.create() returns None, so the key has to be read back after
    # creating the session rather than taken from the call's result.
    session_id = request.session.session_key
    if not session_id:
        request.session.create()
        session_id = request.session.session_key

    referrer = request.META.get('HTTP_REFERER', '')

    # Continue the counter from this session's most recent visit, if any.
    last_visit = PageVisit.objects.filter(session_id=session_id).order_by('-timestamp').first()
    visit_count = last_visit.visit_count + 1 if last_visit else 1

    PageVisit.objects.create(
        user=user,
        session_id=session_id,
        url=request.build_absolute_uri(),
        path=request.path,
        ip_address=get_client_ip(request),
        user_agent=request.META.get('HTTP_USER_AGENT', ''),
        session_data=dict(request.session),
        referrer=referrer,
        visit_count=visit_count
    )
|
||
|
||
def get_client_ip(request):
    """Extract the client IP address from the request metadata.

    Uses the first entry of ``X-Forwarded-For`` (whitespace stripped) when
    that header yields a non-empty value, otherwise ``REMOTE_ADDR``. Returns
    ``None`` when neither is available.

    NOTE(review): this rebinds the ``get_client_ip`` name defined earlier in
    the module (the variant that falls back to the server's public IP for
    private addresses), so this is the version every caller actually gets —
    confirm which of the two is intended and remove the other.
    """
    meta = request.META
    forwarded = meta.get('HTTP_X_FORWARDED_FOR')
    if forwarded:
        first_hop = forwarded.split(',')[0].strip()
        if first_hop:
            return first_hop
    return meta.get('REMOTE_ADDR')
|
||
|
||
def daily_user_report():
    """Compute today's new/returning visitor counts and upsert ``DailyUserStats``.

    "Today" is the current date in the active time zone, and the day bounds
    are built in that same zone. Visitors are counted as distinct
    ``session_id`` values, matching how the ``user_activity`` dashboard
    counts new and returning users.
    """
    # localdate() keeps the date in the same zone that make_aware() uses for
    # the bounds below; now().date() would yield the UTC date instead and
    # select the wrong day around midnight.
    today = timezone.localdate()
    day_start = timezone.make_aware(datetime.combine(today, datetime.min.time()))
    day_end = timezone.make_aware(datetime.combine(today, datetime.max.time()))

    todays_visits = PageVisit.objects.filter(timestamp__range=(day_start, day_end))

    new_users = (
        todays_visits.filter(is_new_user=True)
        .values('session_id')
        .distinct()
        .count()
    )
    returning_users = (
        todays_visits.filter(is_new_user=False)
        .values('session_id')
        .distinct()
        .count()
    )

    DailyUserStats.objects.update_or_create(
        date=today,
        defaults={
            'new_users': new_users,
            'returning_users': returning_users
        }
    )
|
||
def get_live_visitors():
    """Count distinct sessions with a page visit in the last ``LIVE_WINDOW_SECONDS`` seconds."""
    cutoff = timezone.now() - timedelta(seconds=LIVE_WINDOW_SECONDS)
    recent_visits = PageVisit.objects.filter(timestamp__gte=cutoff)
    live_sessions = recent_visits.values('session_id').distinct()
    return live_sessions.count()
|
||
|
||
@csrf_exempt
@require_POST
def track_click(request):
    """Store a ``ClickEvent`` described by a JSON POST body.

    The body must be a JSON object with the keys ``session_id``, ``path``,
    ``text`` and ``type``.

    Returns:
        JsonResponse: ``{"success": True}`` once the event is saved, or
        ``{"error": ...}`` with status 400 when the body is not a JSON object
        or one of the keys is missing.

    NOTE(review): ``session_id`` is taken from the client payload rather than
    from the server-side session — confirm that is intended.
    """
    try:
        data = json.loads(request.body)
        event_fields = {
            "session_id": data["session_id"],
            "page_path": data["path"],
            "element_text": data["text"],
            "element_type": data["type"],
        }
    except (ValueError, KeyError, TypeError):
        # ValueError: malformed JSON. KeyError: a required key is missing.
        # TypeError: the JSON document is not an object.
        return JsonResponse({"error": "Invalid JSON payload"}, status=400)

    ClickEvent.objects.create(**event_fields)

    # A Django view must return an HttpResponse, not a bare string.
    return JsonResponse({"success": True})
|