"""Custom Django model fields.

* ``EncryptedSearchableTextField`` -- Fernet-encrypted text plus a companion
  hash column that can be used for exact-match lookups.
* ``EncryptedTextField`` -- Fernet-encrypted text.
* ``FlexibleField`` -- a text column holding a JSON envelope tagged with the
  value's type (text, bool, number, file).
"""

import json

from cryptography.fernet import InvalidToken
from django.core.exceptions import ValidationError
from django.core.validators import MaxValueValidator, MinValueValidator
from django.db import models
from django.db.models.signals import class_prepared

from at_django_boilerplate.utils.encryption_utils import get_fernet
from at_django_boilerplate.utils.hash_utils import hexdigest


class EncryptedSearchableTextField(models.TextField):
    """Text field stored encrypted, with a companion hash column.

    Fernet tokens differ on every encryption, so the encrypted column cannot
    be matched directly.  On every ``save()`` the lower-cased value is passed
    through ``hexdigest`` and written to a sibling ``CharField`` (named by
    ``hash_field``, default ``"<attname>_hash"``).

    Args:
        hash_field: Optional name of the companion hash column.  If the model
            does not declare a local field with that name, a nullable,
            non-editable ``CharField(max_length=64)`` is added automatically.
    """

    def __init__(self, *args, hash_field=None, **kwargs):
        self._provided_hash_field = hash_field
        super().__init__(*args, **kwargs)

    def deconstruct(self):
        """Include ``hash_field`` so migrations and ``clone()`` keep it."""
        name, path, args, kwargs = super().deconstruct()
        if self._provided_hash_field is not None:
            kwargs["hash_field"] = self._provided_hash_field
        return name, path, args, kwargs

    def contribute_to_class(self, cls, name, private_only=False):
        """Attach the field and defer hash-column setup to ``class_prepared``."""
        super().contribute_to_class(cls, name, private_only)
        self.hash_field = self._provided_hash_field or f"{self.attname}_hash"

        # weak=False: the bound method must stay connected for the lifetime
        # of the process, otherwise the receiver could be garbage-collected.
        class_prepared.connect(self._finalize_model, sender=cls, weak=False)

    def _finalize_model(self, sender, **kwargs):
        """Add the hash column and patch ``save()`` once the model is built.

        Only ``_meta.local_fields`` is inspected; ``get_fields()`` is not
        safe to call before the app registry is fully loaded.
        """
        local_names = {f.name for f in sender._meta.local_fields}
        if self.hash_field not in local_names:
            sender.add_to_class(
                self.hash_field,
                models.CharField(
                    max_length=64, null=True, blank=True, editable=False
                ),
            )

        # save() is wrapped once per model (the flag is inherited by
        # subclasses).  The wrapper discovers the searchable fields from the
        # instance at call time, so it serves every such field rather than
        # only the one that happened to install it.
        if getattr(sender, "_encrypted_searchable_patched", False):
            return

        original_save = sender.save

        def new_save(instance, *args, **kwargs):
            searchable = [
                field
                for field in instance._meta.concrete_fields
                if isinstance(field, EncryptedSearchableTextField)
            ]
            for field in searchable:
                field._refresh_hash(instance)

            # A partial save that writes the encrypted column must also write
            # its hash, or the stored hash goes stale.
            update_fields = kwargs.get("update_fields")
            if update_fields is not None:
                requested = list(update_fields)
                for field in searchable:
                    touched = field.name in requested or field.attname in requested
                    if touched and field.hash_field not in requested:
                        requested.append(field.hash_field)
                kwargs["update_fields"] = requested

            return original_save(instance, *args, **kwargs)

        sender.save = new_save
        sender._encrypted_searchable_patched = True

    def _refresh_hash(self, instance):
        """Recompute this field's hash attribute on ``instance``.

        A ``None`` value clears the hash so that a previous value's digest
        cannot keep matching lookups.
        """
        raw = getattr(instance, self.attname)
        if raw is None:
            digest = None
        else:
            normalized = raw.lower() if isinstance(raw, str) else str(raw)
            digest = hexdigest(normalized)
        setattr(instance, self.hash_field, digest)

    # -------------------------------------------
    # Encryption
    # -------------------------------------------
    def get_prep_value(self, value):
        """Encrypt ``value`` for storage; ``None`` is passed through."""
        if value is None:
            return value
        # str() mirrors the coercion used when hashing, so non-string values
        # are encrypted instead of failing on ``.encode``.
        return get_fernet().encrypt(str(value).encode()).decode()

    def from_db_value(self, value, expression, connection):
        """Decrypt a stored value; return it unchanged if it is not a token."""
        if value is None:
            return value
        # Resolved outside the try block so key/configuration problems surface
        # instead of silently handing ciphertext back to the caller.
        fernet = get_fernet()
        try:
            return fernet.decrypt(value.encode()).decode()
        except (InvalidToken, AttributeError):
            # Not encrypted (e.g. legacy plaintext) or corrupted.
            return value


class EncryptedTextField(models.TextField):
    """Text field whose value is Fernet-encrypted in the database."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # get_fernet() supplies the cipher; per the original author's note the
        # key is derived from SECRET_KEY (defined outside this module).
        self.fernet = get_fernet()

    def _decrypt_or_passthrough(self, value):
        """Return the decrypted text, or ``value`` itself if decryption fails."""
        try:
            return self.fernet.decrypt(value.encode()).decode()
        except (InvalidToken, AttributeError):
            # In case it's not encrypted or corrupted.
            return value

    def get_prep_value(self, value):
        """Encrypt ``value`` for storage; ``None`` is passed through."""
        if value is None:
            return value
        # str() lets non-string values be stored instead of failing on
        # ``.encode``.
        return self.fernet.encrypt(str(value).encode()).decode()

    def from_db_value(self, value, expression, connection):
        """Decrypt a value loaded from the database."""
        if value is None:
            return value
        return self._decrypt_or_passthrough(value)

    def to_python(self, value):
        """Decrypt string input when it is a valid token; otherwise return it as is."""
        if isinstance(value, str):
            return self._decrypt_or_passthrough(value)
        return value


class FlexibleField(models.Field):
    """Field stored as a JSON envelope ``{"type": ..., "value": ...}`` in a text column.

    Args:
        data_type: One of ``'text'`` (default), ``'bool'``, ``'number'`` or
            ``'file'``; selects how values are serialised and validated.
    """

    description = "Flexible typed field (text, bool, number, file)"

    def __init__(self, *args, data_type='text', **kwargs):
        self.data_type = data_type
        super().__init__(*args, **kwargs)

    def deconstruct(self):
        """Include ``data_type`` so migrations and ``clone()`` keep it."""
        name, path, args, kwargs = super().deconstruct()
        if self.data_type != 'text':
            kwargs['data_type'] = self.data_type
        return name, path, args, kwargs

    def db_type(self, connection):
        """Store the envelope in a plain ``text`` column."""
        return 'text'

    def get_prep_value(self, value):
        """Serialise ``value`` into its JSON envelope.

        Raises:
            ValidationError: ``data_type`` is ``'number'`` and ``value``
                cannot be converted to ``float``.
        """
        if value is None:
            return None

        if self.data_type == 'file' and isinstance(value, str):
            return json.dumps({'type': 'file', 'value': value})

        if self.data_type == 'bool':
            return json.dumps({'type': 'bool', 'value': bool(value)})

        if self.data_type == 'number':
            try:
                number = float(value)
            except (TypeError, ValueError) as exc:
                # float() raises TypeError for e.g. lists or dicts; report
                # both failure modes as a validation problem.
                raise ValidationError(
                    f"'{value}' is not a valid number"
                ) from exc
            return json.dumps({'type': 'number', 'value': number})

        # Everything else, including non-string 'file' values, is stored as text.
        return json.dumps({'type': 'text', 'value': str(value)})

    def from_db_value(self, value, expression, connection):
        """Unwrap a stored envelope; return the raw value if it is not one."""
        if value is None:
            return None

        try:
            data = json.loads(value)
        except (TypeError, ValueError):
            return value

        # Valid JSON that is not an object (e.g. a bare number or string
        # written before the envelope format) has no 'type'/'value' keys.
        if not isinstance(data, dict):
            return value

        val_type = data.get('type')
        val = data.get('value')

        if val_type == 'bool':
            return bool(val)
        if val_type == 'number':
            try:
                return float(val)
            except (TypeError, ValueError):
                # Malformed payload: hand back what was stored.
                return val
        return val

    def validate(self, value, model_instance):
        """Type-check ``value`` against ``data_type``, then run base validation.

        ``None`` skips the type check so the base class can apply the field's
        ``null``/``blank`` rules.
        """
        if value is not None:
            if self.data_type == 'number' and not isinstance(value, (int, float)):
                raise ValidationError("This field only accepts numbers.")
            if self.data_type == 'bool' and not isinstance(value, bool):
                raise ValidationError("This field only accepts True/False.")

        super().validate(value, model_instance)