163 lines
5.4 KiB
Python
Executable File
163 lines
5.4 KiB
Python
Executable File
from django.db import models
|
|
from cryptography.fernet import InvalidToken
|
|
from at_django_boilerplate.utils.hash_utils import hexdigest
|
|
from at_django_boilerplate.utils.encryption_utils import get_fernet
|
|
from django.db.models.signals import class_prepared
|
|
from django.db import models
|
|
from django.db import models
|
|
from django.core.exceptions import ValidationError
|
|
from django.core.validators import MinValueValidator, MaxValueValidator
|
|
import json
|
|
|
|
class EncryptedSearchableTextField(models.TextField):
|
|
|
|
def __init__(self, *args, hash_field=None, **kwargs):
|
|
self._provided_hash_field = hash_field
|
|
super().__init__(*args, **kwargs)
|
|
|
|
def contribute_to_class(self, cls, name, private_only=False):
|
|
super().contribute_to_class(cls, name, private_only)
|
|
|
|
self.hash_field = self._provided_hash_field or f"{self.attname}_hash"
|
|
|
|
# Register with class_prepared (SAFE)
|
|
class_prepared.connect(self._finalize_model, sender=cls, weak=False)
|
|
|
|
def _finalize_model(self, sender, **kwargs):
|
|
"""
|
|
Runs AFTER Django has built the class, but BEFORE migrations finalize.
|
|
SAFE because we avoid touching registry or relational fields.
|
|
"""
|
|
|
|
# Use ONLY local_fields — NEVER get_fields() (unsafe before apps are loaded)
|
|
existing = {f.name for f in sender._meta.local_fields}
|
|
|
|
# Add hash field dynamically
|
|
if self.hash_field not in existing:
|
|
sender.add_to_class(
|
|
self.hash_field,
|
|
models.CharField(
|
|
max_length=64,
|
|
null=True,
|
|
blank=True,
|
|
editable=False
|
|
),
|
|
)
|
|
|
|
# Patch save() only once
|
|
if not getattr(sender, "_encrypted_searchable_patched", False):
|
|
|
|
original_save = sender.save
|
|
|
|
def new_save(instance, *args, **kwargs):
|
|
raw = getattr(instance, self.attname)
|
|
if raw is not None:
|
|
norm = raw.lower() if isinstance(raw, str) else str(raw)
|
|
setattr(instance, self.hash_field, hexdigest(norm))
|
|
return original_save(instance, *args, **kwargs)
|
|
|
|
sender.save = new_save
|
|
sender._encrypted_searchable_patched = True
|
|
|
|
# -------------------------------------------
|
|
# Encryption
|
|
# -------------------------------------------
|
|
def get_prep_value(self, value):
|
|
if value is None:
|
|
return value
|
|
f = get_fernet()
|
|
return f.encrypt(value.encode()).decode()
|
|
|
|
def from_db_value(self, value, expression, connection):
|
|
if value is None:
|
|
return value
|
|
try:
|
|
f = get_fernet()
|
|
return f.decrypt(value.encode()).decode()
|
|
except Exception:
|
|
return value
|
|
|
|
|
|
|
|
|
|
class EncryptedTextField(models.TextField):
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
|
|
# Use a key derived from SECRET_KEY, safe for Fernet (32 url-safe base64-encoded bytes)
|
|
self.fernet = get_fernet()
|
|
|
|
def get_prep_value(self, value):
|
|
if value is None:
|
|
return value
|
|
return self.fernet.encrypt(value.encode()).decode()
|
|
|
|
def from_db_value(self, value, expression, connection):
|
|
if value is None:
|
|
return value
|
|
try:
|
|
return self.fernet.decrypt(value.encode()).decode()
|
|
except (InvalidToken, AttributeError):
|
|
return value # In case it's not encrypted or corrupted
|
|
|
|
def to_python(self, value):
|
|
if isinstance(value, str):
|
|
try:
|
|
return self.fernet.decrypt(value.encode()).decode()
|
|
except (InvalidToken, AttributeError):
|
|
return value
|
|
return value
|
|
|
|
|
|
|
|
|
|
class FlexibleField(models.Field):
|
|
description = "Flexible typed field (text, bool, number, file)"
|
|
|
|
def __init__(self, *args, data_type='text', **kwargs):
|
|
self.data_type = data_type
|
|
super().__init__(*args, **kwargs)
|
|
|
|
def db_type(self, connection):
|
|
return 'text'
|
|
|
|
def get_prep_value(self, value):
|
|
if value is None:
|
|
return None
|
|
|
|
if self.data_type == 'file' and isinstance(value, str):
|
|
return json.dumps({'type': 'file', 'value': value})
|
|
|
|
if self.data_type == 'bool':
|
|
return json.dumps({'type': 'bool', 'value': bool(value)})
|
|
|
|
if self.data_type == 'number':
|
|
try:
|
|
return json.dumps({'type': 'number', 'value': float(value)})
|
|
except ValueError:
|
|
raise ValidationError(f"'{value}' is not a valid number")
|
|
|
|
return json.dumps({'type': 'text', 'value': str(value)})
|
|
|
|
def from_db_value(self, value, expression, connection):
|
|
if value is None:
|
|
return None
|
|
try:
|
|
data = json.loads(value)
|
|
except Exception:
|
|
return value
|
|
val_type = data.get('type')
|
|
val = data.get('value')
|
|
|
|
if val_type == 'bool':
|
|
return bool(val)
|
|
if val_type == 'number':
|
|
return float(val)
|
|
return val
|
|
|
|
def validate(self, value, model_instance):
|
|
if self.data_type == 'number' and not isinstance(value, (int, float)):
|
|
raise ValidationError("This field only accepts numbers.")
|
|
if self.data_type == 'bool' and not isinstance(value, bool):
|
|
raise ValidationError("This field only accepts True/False.")
|
|
super().validate(value, model_instance) |