base setup
This commit is contained in:
163
at_django_boilerplate/utils/custom_fields.py
Executable file
163
at_django_boilerplate/utils/custom_fields.py
Executable file
@@ -0,0 +1,163 @@
|
||||
from django.db import models
|
||||
from cryptography.fernet import InvalidToken
|
||||
from at_django_boilerplate.utils.hash_utils import hexdigest
|
||||
from at_django_boilerplate.utils.encryption_utils import get_fernet
|
||||
from django.db.models.signals import class_prepared
|
||||
from django.db import models
|
||||
from django.db import models
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.core.validators import MinValueValidator, MaxValueValidator
|
||||
import json
|
||||
|
||||
class EncryptedSearchableTextField(models.TextField):
|
||||
|
||||
def __init__(self, *args, hash_field=None, **kwargs):
|
||||
self._provided_hash_field = hash_field
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def contribute_to_class(self, cls, name, private_only=False):
|
||||
super().contribute_to_class(cls, name, private_only)
|
||||
|
||||
self.hash_field = self._provided_hash_field or f"{self.attname}_hash"
|
||||
|
||||
# Register with class_prepared (SAFE)
|
||||
class_prepared.connect(self._finalize_model, sender=cls, weak=False)
|
||||
|
||||
def _finalize_model(self, sender, **kwargs):
|
||||
"""
|
||||
Runs AFTER Django has built the class, but BEFORE migrations finalize.
|
||||
SAFE because we avoid touching registry or relational fields.
|
||||
"""
|
||||
|
||||
# Use ONLY local_fields — NEVER get_fields() (unsafe before apps are loaded)
|
||||
existing = {f.name for f in sender._meta.local_fields}
|
||||
|
||||
# Add hash field dynamically
|
||||
if self.hash_field not in existing:
|
||||
sender.add_to_class(
|
||||
self.hash_field,
|
||||
models.CharField(
|
||||
max_length=64,
|
||||
null=True,
|
||||
blank=True,
|
||||
editable=False
|
||||
),
|
||||
)
|
||||
|
||||
# Patch save() only once
|
||||
if not getattr(sender, "_encrypted_searchable_patched", False):
|
||||
|
||||
original_save = sender.save
|
||||
|
||||
def new_save(instance, *args, **kwargs):
|
||||
raw = getattr(instance, self.attname)
|
||||
if raw is not None:
|
||||
norm = raw.lower() if isinstance(raw, str) else str(raw)
|
||||
setattr(instance, self.hash_field, hexdigest(norm))
|
||||
return original_save(instance, *args, **kwargs)
|
||||
|
||||
sender.save = new_save
|
||||
sender._encrypted_searchable_patched = True
|
||||
|
||||
# -------------------------------------------
|
||||
# Encryption
|
||||
# -------------------------------------------
|
||||
def get_prep_value(self, value):
|
||||
if value is None:
|
||||
return value
|
||||
f = get_fernet()
|
||||
return f.encrypt(value.encode()).decode()
|
||||
|
||||
def from_db_value(self, value, expression, connection):
|
||||
if value is None:
|
||||
return value
|
||||
try:
|
||||
f = get_fernet()
|
||||
return f.decrypt(value.encode()).decode()
|
||||
except Exception:
|
||||
return value
|
||||
|
||||
|
||||
|
||||
|
||||
class EncryptedTextField(models.TextField):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
# Use a key derived from SECRET_KEY, safe for Fernet (32 url-safe base64-encoded bytes)
|
||||
self.fernet = get_fernet()
|
||||
|
||||
def get_prep_value(self, value):
|
||||
if value is None:
|
||||
return value
|
||||
return self.fernet.encrypt(value.encode()).decode()
|
||||
|
||||
def from_db_value(self, value, expression, connection):
|
||||
if value is None:
|
||||
return value
|
||||
try:
|
||||
return self.fernet.decrypt(value.encode()).decode()
|
||||
except (InvalidToken, AttributeError):
|
||||
return value # In case it's not encrypted or corrupted
|
||||
|
||||
def to_python(self, value):
|
||||
if isinstance(value, str):
|
||||
try:
|
||||
return self.fernet.decrypt(value.encode()).decode()
|
||||
except (InvalidToken, AttributeError):
|
||||
return value
|
||||
return value
|
||||
|
||||
|
||||
|
||||
|
||||
class FlexibleField(models.Field):
|
||||
description = "Flexible typed field (text, bool, number, file)"
|
||||
|
||||
def __init__(self, *args, data_type='text', **kwargs):
|
||||
self.data_type = data_type
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def db_type(self, connection):
|
||||
return 'text'
|
||||
|
||||
def get_prep_value(self, value):
|
||||
if value is None:
|
||||
return None
|
||||
|
||||
if self.data_type == 'file' and isinstance(value, str):
|
||||
return json.dumps({'type': 'file', 'value': value})
|
||||
|
||||
if self.data_type == 'bool':
|
||||
return json.dumps({'type': 'bool', 'value': bool(value)})
|
||||
|
||||
if self.data_type == 'number':
|
||||
try:
|
||||
return json.dumps({'type': 'number', 'value': float(value)})
|
||||
except ValueError:
|
||||
raise ValidationError(f"'{value}' is not a valid number")
|
||||
|
||||
return json.dumps({'type': 'text', 'value': str(value)})
|
||||
|
||||
def from_db_value(self, value, expression, connection):
|
||||
if value is None:
|
||||
return None
|
||||
try:
|
||||
data = json.loads(value)
|
||||
except Exception:
|
||||
return value
|
||||
val_type = data.get('type')
|
||||
val = data.get('value')
|
||||
|
||||
if val_type == 'bool':
|
||||
return bool(val)
|
||||
if val_type == 'number':
|
||||
return float(val)
|
||||
return val
|
||||
|
||||
def validate(self, value, model_instance):
|
||||
if self.data_type == 'number' and not isinstance(value, (int, float)):
|
||||
raise ValidationError("This field only accepts numbers.")
|
||||
if self.data_type == 'bool' and not isinstance(value, bool):
|
||||
raise ValidationError("This field only accepts True/False.")
|
||||
super().validate(value, model_instance)
|
||||
Reference in New Issue
Block a user