work from weekend

This commit is contained in:
Gabor Körber 2021-05-25 18:07:16 +02:00
parent c1fde84ef7
commit 006ab18a19
28 changed files with 1096 additions and 0 deletions

0
blackmesa/__init__.py Normal file
View File

View File

3
blackmesa/core/admin.py Normal file
View File

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

5
blackmesa/core/apps.py Normal file
View File

@ -0,0 +1,5 @@
from django.apps import AppConfig
class CoreConfig(AppConfig):
name = 'core'

View File

@ -0,0 +1,9 @@
from dataclasses import dataclass
@dataclass
class Planet:
name: str
weight: float
size: float
moons: int

View File

@ -0,0 +1,33 @@
# Generated by Django 2.2.17 on 2021-05-22 10:56
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='Celestial',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(max_length=100)),
('size', models.FloatField()),
('weight', models.FloatField()),
],
),
migrations.CreateModel(
name='Orbit',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('distance', models.FloatField()),
('celestial', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='orbits', to='core.Celestial')),
('parent', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='children', to='core.Celestial')),
],
),
]

View File

19
blackmesa/core/models.py Normal file
View File

@ -0,0 +1,19 @@
from django.db import models
from .workbench import PureManager
# Create your models here.
class Celestial(models.Model):
name = models.CharField(max_length=100)
size = models.FloatField()
weight = models.FloatField()
objects = PureManager()
class Orbit(models.Model):
parent = models.ForeignKey(Celestial, related_name='children', on_delete=models.CASCADE)
celestial = models.ForeignKey(Celestial, related_name='orbits', on_delete=models.CASCADE)
distance = models.FloatField()
objects = PureManager()

View File

View File

@ -0,0 +1,19 @@
from .. import models
import factory
import factory.fuzzy
class CelestialFactory(factory.django.DjangoModelFactory):
name = factory.Faker('city')
weight = factory.fuzzy.FuzzyFloat(100.0, 100000.0)
size = factory.fuzzy.FuzzyFloat(1.0, 8.0)
class Meta:
model = models.Celestial
class OrbitFactory(factory.DjangoModelFactory):
parent = factory.SubFactory(CelestialFactory)
celestial = factory.SubFactory(CelestialFactory)
distance = factory.fuzzy.FuzzyFloat(2.0, 140.0)
class Meta:
model = models.Orbit

View File

@ -0,0 +1,49 @@
from dataclasses import dataclass
from django.test import TestCase
from ..models import Celestial, Orbit
from ..workbench import Lambda, PureDict
from .factories import CelestialFactory, OrbitFactory
from django.db.models.expressions import Expression, Combinable, Value
from django.db.models import F
from django.utils import functional
# Create your tests here.
class TestBase(TestCase):
def setUp(self):
TestCase.setUp(self)
self.celestials = CelestialFactory.create_batch(10)
self.orbits = []
for celestial in self.celestials[1:]:
self.orbits.append(OrbitFactory(parent=self.celestials[0], celestial=celestial))
def test_data(self):
self.assertEqual(Celestial.objects.all().count(), len(self.celestials))
self.assertEqual(Orbit.objects.all().count(), len(self.orbits))
def test_building_blocks(self):
klass = dict
queryset = Celestial.objects.purify(PureDict(), hi=Lambda(lambda x:'hi'))
data = queryset[0]
self.assertIsInstance(data, klass)
self.assertEqual(data['hi'], 'hi')
def test_dataclass(self):
@dataclass
class MyDataclass:
id: int
name: str
hi: str
size: float
weight: float
klass = MyDataclass
queryset = Celestial.objects.purify(klass, hi=Lambda(lambda x:'hi'))
data = queryset[3]
self.assertIsInstance(data, klass)
self.assertEqual(data.hi, 'hi')
first = Celestial.objects.purify(klass, hi=F('name')).first()
self.assertIsInstance(first, klass)
self.assertEqual(first.hi, first.name)

3
blackmesa/core/views.py Normal file
View File

@ -0,0 +1,3 @@
from django.shortcuts import render
# Create your views here.

240
blackmesa/core/workbench.py Normal file
View File

@ -0,0 +1,240 @@
from django.db.models import QuerySet
from django.db.models.expressions import BaseExpression, Combinable
from django.db.models.query import ValuesIterable
from django.db.models.manager import Manager
"""
Base Idea:
- to have a queryset function that easily allows you to build some sort of "pure class" in a queryset.
- make it able to seamlessly use annotation functions
- allow callback modication of data, which is needed if your dataclass is frozen.
By default the PureManager and PureQuerySet will use PureDataclass as handler, expecting your pureclass to be a dataclasses.dataclass type.
An Example:
@dataclass
class MyClass
id: int
some_relation: str
next_id: int
SomeModel.objects.filter(...).exclude(...).purify(MyClass, some_relation=F('some__model__relation'), next_id=Lambda(lambda x: x.get('id')+1))
This allows you to move an iterator into another layer, where it either can be consumed, or used as an input for another queryset call,
but still guaranteeing, that regular usage of the iterator will not yield any smart object.
It is good if you want to build a best-of-both-worlds approach for subquery-capable repository pattern, which is one of the biggest issues if you want
to keep your business logic out of the repo layer, but still want to utilize djangos queryset mechanics properly.
"""
## Useful for queryset function purify()
class BaseLeap:
skip = False # if skip is true, this leap will not be actually processed.
resolves_field = True # if resolves_field is true, this leap will be called for a single field with resolve()
post_processing = False # if post_processing is true, this leap will in the end be called with dbdata, and be able to manipulate the whole dictionary.
def resolve(self, model, dbdata):
raise NotImplementedError
def post_process(self, model, dbdata):
raise NotImplementedError
class Leap(BaseLeap):
""" value function that leaps SQL handling. """
def __init__(self, value=None):
self.value = value
def resolve(self, model, dbdata):
return self.value
class Lambda(Leap):
""" leap value that calls a lambda expression. """
def __init__(self, callback):
self.callback = callback if callable(callback) else None
def resolve(self, model, dbdata):
# at this point i could check if callback needs 0-2 arguments and decide the call.
if self.callback:
return self.callback(dbdata)
class Skip(BaseLeap):
""" Skips this key from being retrieved from the database or used in the dataclass instantiation """
skip = True
resolves_field = False
class Callback(BaseLeap):
resolves_field = False
post_processing = True
def __init__(self, callback):
self.callback = callback
def post_process(self, model, dbdata):
if self.callback:
return self.callback(dbdata)
## Wrapper to handle some sort of pure baseclass
class PureHandler:
""" handler for a pure baseclass
defines how a pureclass can be created, and how to retrieve all field names, and the required ones.
"""
@classmethod
def wrap(cls, klass):
return cls(klass)
def __init__(self, klass):
self.klass = klass
def create(self, **kwargs):
return self.klass(**kwargs)
def get_field_names(self):
return self.klass.__dict__.keys()
@property
def pureclass(self):
return self.klass
@property
def required_keys(self):
return self.get_field_names()
class PureDict(PureHandler):
""" PureHandler that outputs a dictionary """
def __init__(self, klass=None):
# it is not required to define dict, but you could do OrderedDict e.g.
self.klass = klass or dict
def get_field_names(self):
# dictionary has no required fields.
return []
class PureDataclass(PureHandler):
""" handles dataclasses.dataclass derivatives """
def create(self, **kwargs):
# clean field names to be only valid if they are on the dataclass.
pure_fields = self.get_field_names()
kwargs = {k: v for k, v in kwargs.items() if k in pure_fields}
return self.klass(**kwargs)
def get_field_names(self):
return list(self.klass.__dataclass_fields__.keys())
# @TODO: PurePydantic
###### QuerySet Plugin.
class PureIterable(ValuesIterable):
"""
Iterable returned by purify() that yields a pure class for each row.
Replaces the standard iterable of the queryset.
"""
def __iter__(self):
queryset = self.queryset
model = self.queryset.model
query = queryset.query
compiler = query.get_compiler(queryset.db)
pure_data = getattr(queryset, '_pureclass_extra', {})
pure_handler = queryset._pureclass
# extra(select=...) cols are always at the start of the row.
names = [
*query.extra_select,
*query.values_select,
*query.annotation_select,
]
indexes = range(len(names))
for row in compiler.results_iter(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size):
dbdata = {names[i]: row[i] for i in indexes}
# post-processors will be able to rewrite the whole dictionary.
post_processors = []
# we overwrite db data bluntly for now. actually we would provide callbacks the current dict.
for k, v in pure_data.items():
if v.resolves_field:
dbdata[k] = v.resolve(model, dbdata)
if v.post_processing:
post_processors.append(v)
if post_processors:
for processor in post_processors:
processed = processor.post_process(model, dbdata)
if processed is not None:
dbdata = processed
yield pure_handler.create(**dbdata)
class PureQuerySetMixin:
_pureclass_handler = PureDataclass
def purify(self, *args, **kwargs):
"""
generates pure objects
Acts like values(), however:
- first argument is a pureclass or purehandler, if not a string.
- if pureclass is not defined in purify, you have to define it on the queryset, or the model, with _pureclass,
otherwise it will raise a RuntimeError.
- keyword arguments of type "Leap" are used as deferred values, and resolved independently.
- values() is called with every required_key on the dataclass not handled by a Leap
"""
if len(args) and not isinstance(args[0], str):
# we assume this is our dataclass
# @TODO better checks.
handler = args[0]
args = args[1:]
else:
# determine dataclass.
handler = getattr(self, '_pureclass', getattr(self.model, '_pureclass', None))
if not handler:
raise RuntimeError("Trying to purify a class without destination class.")
if not isinstance(handler, PureHandler):
handler = self._pureclass_handler.wrap(handler)
all_keys = [*args, *kwargs.keys()]
unhandled_keys = list(set(handler.required_keys) - set(all_keys))
args = [*args, *unhandled_keys]
new_kw = {}
extra = {}
for k, v in kwargs.items():
if isinstance(v, Leap):
if not v.skip:
extra[k] = v
elif isinstance(v, BaseExpression) or isinstance(v, Combinable):
new_kw[k] = v
else:
new_kw[k] = v
# copy ourself with values() and save the results on the cloned queryset.
values = self.values(*args, **new_kw)
values._iterable_class = PureIterable
values._pureclass_extra = extra
values._pureclass = handler
return values
class PureQuerySet(PureQuerySetMixin, QuerySet):
# overwrite cloning.
def _clone(self):
c = super()._clone()
for key in ['_pureclass', '_pureclass_extra', '_pureclass_handler']:
if hasattr(self, key):
setattr(c, key, getattr(self, key))
return c
# i use a mixin instead for better clarity. purify is completely safe, as it does not call _chain.
# however you can also simply do:
#class PureManager(BaseManager.from_queryset(PureQuerySet)):
# pass
class PureManager(PureQuerySetMixin, Manager):
def get_queryset(self):
return PureQuerySet(self.model, using=self._db)

View File

View File

@ -0,0 +1,249 @@
from django.db.models import QuerySet
from django.db.models.expressions import BaseExpression, Combinable
from django.db.models.query import ValuesIterable
from django.db.models.manager import Manager
"""
Base Idea:
- to have a queryset function that easily allows you to build some sort of "record type class" in a queryset.
- make it able to seamlessly use annotation functions
- allow modification of initial values with callbacks, which is needed if your dataclass is frozen.
By default the RecordManager and RecordQuerySet will use RecordDataclass as handler, expecting your record to be a dataclasses.dataclass type.
An Example:
@dataclass
class MyDataClass
id: int
some_relation: str
next_id: int
SomeModel.objects.filter(...).records(MyDataClass, some_relation=F('model__relation'), next_id=Lambda(lambda x: x.get('id')+1))
This allows you to move an iterator into another layer, where it either can be consumed, or used as an input for another queryset call,
but still guaranteeing, that regular usage of the iterator will not yield any smart object.
It is good if you want to build a best-of-both-worlds approach for subquery-capable repository pattern, which is one of the biggest issues if you want
to keep your business logic out of the repo layer, but still want to utilize djangos queryset mechanics properly.
records() will take anything values() would take, but additionally it allows:
- to pass the record type as first argument
- to pass Adjunct classes as keyword argument value
"""
## Useful for queryset function records()
class BaseAdjunct:
"""
Any Adjunct data which does not translate into SQL, but rather adds data programmatically.
"""
skip = False # if skip is true, this adjunct will not be actually processed.
resolves_field = True # if resolves_field is true, this adjunct will be called for a single field with resolve()
post_processing = False # if post_processing is true, this adjunct will in the end be called with dbdata, and be able to manipulate the whole dictionary.
def resolve(self, model, dbdata):
raise NotImplementedError
def post_process(self, model, dbdata):
raise NotImplementedError
class Adjunct(BaseAdjunct):
""" value function that adds data, without SQL handling. """
def __init__(self, value=None):
self.value = value
def resolve(self, model, dbdata):
return self.value
class Lambda(Adjunct):
""" adjunct value that returns a field value with a callback. """
def __init__(self, callback):
self.callback = callback if callable(callback) else None
def resolve(self, model, dbdata):
# at this point i could check if callback needs 0-2 arguments and decide the call.
if self.callback:
return self.callback(dbdata)
class Skip(BaseAdjunct):
""" Skips this key from being retrieved from the database or used in the dataclass instantiation """
skip = True
resolves_field = False
class Callback(BaseAdjunct):
""" calls a callback which can modify the whole initialization dictionary. """
resolves_field = False
post_processing = True
def __init__(self, callback):
self.callback = callback
def post_process(self, model, dbdata):
if self.callback:
return self.callback(dbdata)
## Wrapper to handle some sort of record baseclass
class RecordHandler:
""" handler for a record type
defines how a record can be created, and how to retrieve all field names, and the required ones.
"""
@classmethod
def wrap(cls, klass):
return cls(klass)
def __init__(self, klass):
self.klass = klass
def create(self, **kwargs):
return self.klass(**kwargs)
def get_field_names(self):
return self.klass.__dict__.keys()
@property
def record(self):
return self.klass
@property
def required_keys(self):
return self.get_field_names()
class RecordDict(RecordHandler):
""" RecordHandler that outputs a dictionary """
def __init__(self, klass=None):
# it is not required to define dict, but you could do OrderedDict e.g.
self.klass = klass or dict
def get_field_names(self):
# dictionary has no required fields.
return []
class RecordDataclass(RecordHandler):
""" handles dataclasses.dataclass derivatives """
def create(self, **kwargs):
# clean field names to be only valid if they are on the dataclass.
record_fields = self.get_field_names()
kwargs = {k: v for k, v in kwargs.items() if k in record_fields}
return self.klass(**kwargs)
def get_field_names(self):
return list(self.klass.__dataclass_fields__.keys())
# @TODO: RecordPydantic
# @TODO: RecordAttrs
###### QuerySet Plugin.
class RecordIterable(ValuesIterable):
"""
Iterable returned by records() that yields a record class for each row.
Replaces the standard iterable of the queryset.
"""
def __iter__(self):
queryset = self.queryset
model = self.queryset.model
query = queryset.query
compiler = query.get_compiler(queryset.db)
record_data = getattr(queryset, '_record_extra', {})
record_handler = queryset._record
# extra(select=...) cols are always at the start of the row.
names = [
*query.extra_select,
*query.values_select,
*query.annotation_select,
]
indexes = range(len(names))
for row in compiler.results_iter(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size):
dbdata = {names[i]: row[i] for i in indexes}
# post-processors will be able to rewrite the whole dictionary.
post_processors = []
# we overwrite db data bluntly for now. actually we would provide callbacks the current dict.
for k, v in record_data.items():
if v.resolves_field:
dbdata[k] = v.resolve(model, dbdata)
if v.post_processing:
post_processors.append(v)
if post_processors:
for processor in post_processors:
processed = processor.post_process(model, dbdata)
if processed is not None:
dbdata = processed
yield record_handler.create(**dbdata)
class RecordQuerySetMixin:
_record_handler = RecordDataclass
def records(self, *args, **kwargs):
"""
generates record objects
Acts like values(), however:
- you can pass a record type or RecordHandler as first argument.
- if record type is not defined in records(), you have to define it on the queryset, or the model, with _record,
otherwise it will raise a RuntimeError.
- keyword arguments of type "Adjunct" are used as deferred values, and resolved independently.
- values() is called with every required_key on the dataclass not handled by an Adjunct
"""
if len(args) and not isinstance(args[0], str):
# we assume this is our dataclass
# @TODO better checks.
handler = args[0]
args = args[1:]
else:
# determine dataclass.
handler = getattr(self, '_record', getattr(self.model, '_record', None))
if not handler:
raise RuntimeError("Trying to records a class without destination class.")
if not isinstance(handler, RecordHandler):
handler = self._record_handler.wrap(handler)
all_keys = [*args, *kwargs.keys()]
unhandled_keys = list(set(handler.required_keys) - set(all_keys))
args = [*args, *unhandled_keys]
new_kw = {}
extra = {}
for k, v in kwargs.items():
if isinstance(v, BaseAdjunct):
if not v.skip:
extra[k] = v
elif isinstance(v, BaseExpression) or isinstance(v, Combinable):
new_kw[k] = v
else:
new_kw[k] = v
# copy ourself with values() and save the results on the cloned queryset.
values = self.values(*args, **new_kw)
values._iterable_class = RecordIterable
values._record_extra = extra
values._record = handler
return values
class RecordQuerySet(RecordQuerySetMixin, QuerySet):
# overwrite cloning.
def _clone(self):
c = super()._clone()
for key in ['_record', '_record_extra', '_record_handler']:
if hasattr(self, key):
setattr(c, key, getattr(self, key))
return c
# i use a mixin instead for better clarity. records is completely safe, as it does not call _chain.
# however you can also simply do:
#class RecordManager(BaseManager.from_queryset(RecordQuerySet)):
# pass
class RecordManager(RecordQuerySetMixin, Manager):
def get_queryset(self):
return RecordQuerySet(self.model, using=self._db)

View File

View File

@ -0,0 +1,7 @@
from django.apps import AppConfig
# Only include tests in your INSTALLED_APPS if you want to test against django models.
class CommonTestsConfig(AppConfig):
name = 'tests.celestials'
label = 'celestials_tests'

View File

@ -0,0 +1,47 @@
from . import models
import factory
import factory.fuzzy
class CelestialFactory(factory.django.DjangoModelFactory):
orbits = factory.LazyAttribute(lambda c: CelestialFactory(celestial_type=c.celestial_type - 1) if c.celestial_type and c.celestial_type > 1 else None)
name = factory.Faker('city')
# 1 sun, 7 planets, 3 moons, 4 asteroids, 5 stations
celestial_type = factory.Iterator([1, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5])
weight = factory.fuzzy.FuzzyFloat(100.0, 100000.0)
size = factory.fuzzy.FuzzyFloat(1.0, 8.0)
class Meta:
model = models.Celestial
class PersonFactory(factory.DjangoModelFactory):
origin = factory.SubFactory(CelestialFactory)
first_name = factory.Faker('first_name')
last_name = factory.Faker('last_name')
age = factory.fuzzy.FuzzyInteger(9, 79)
class Meta:
model = models.Person
class SpaceportFactory(factory.DjangoModelFactory):
name = factory.LazyAttribute(lambda sp: f'Port {sp.celestial.name}')
celestial = factory.SubFactory(CelestialFactory, celestial_type=factory.Iterator([2,2,3,4,5]))
class Meta:
model = models.Spaceport
class VisitorFactory(factory.DjangoModelFactory):
person = factory.SubFactory(PersonFactory)
spaceport = factory.SubFactory(SpaceportFactory)
luggage_weight = factory.fuzzy.FuzzyFloat(1.0, 100.0)
class Meta:
model = models.Visitor
class CitizenFactory(factory.DjangoModelFactory):
planet = factory.SubFactory(CelestialFactory, celestial_type=2)
person = factory.SubFactory(PersonFactory, origin=factory.SelfAttribute('planet'))
clearance_level = factory.fuzzy.FuzzyInteger(0, 4)
class Meta:
model = models.Citizen

View File

@ -0,0 +1,47 @@
from .factories import CelestialFactory, SpaceportFactory
class Stars:
@classmethod
def create_sol(cls, context=None):
if context is None:
context = object()
celestials = [CelestialFactory(name="Sol", celestial_type=1, size=100)]
context.sun = sun = celestials[0]
context.planets = planets = [
CelestialFactory(name='Mercur', celestial_type=2, orbits=sun, size=2.4), #0
CelestialFactory(name='Venus', celestial_type=2, orbits=sun, size=6),
CelestialFactory(name='Terra', celestial_type=2, orbits=sun, size=6.4), #2
CelestialFactory(name='Mars', celestial_type=2, orbits=sun, size=3.4),
CelestialFactory(name='Jupiter',celestial_type=2, orbits=sun, size=69.9), #4
CelestialFactory(name='Saturn', celestial_type=2, orbits=sun, size=58.2),
CelestialFactory(name='Uranus', celestial_type=2, orbits=sun, size=25.4), #6
CelestialFactory(name='Neptune',celestial_type=2, orbits=sun, size=24.6),
CelestialFactory(name='Pluto',celestial_type=3, orbits=sun, size=1.1), #8
]
celestials.extend(planets)
context.moons = moons = [
CelestialFactory(name='Luna', celestial_type=3, orbits=planets[2], size=1.7), #0
CelestialFactory(name='Phobos', celestial_type=4, orbits=planets[3], size=0.006),
CelestialFactory(name='Deimos', celestial_type=4, orbits=planets[3], size=0.011), #2
CelestialFactory(name='Io', celestial_type=3, orbits=planets[4], size=1.8),
CelestialFactory(name='Europa', celestial_type=3, orbits=planets[4], size=1.5), #4
CelestialFactory(name='Ganymede', celestial_type=3, orbits=planets[4], size=2.6),
CelestialFactory(name='Callisto', celestial_type=3, orbits=planets[4], size=2.4), #6
#...
CelestialFactory(name='Charon', celestial_type=4, orbits=planets[8], size=0.6)
]
celestials.extend(moons)
context.celestials = celestials
# create space ports
context.spaceports = [
SpaceportFactory(celestial=planets[2], name="Houston IPS", ),
SpaceportFactory(celestial=moons[0], name='Copernicus'),
SpaceportFactory(celestial=planets[3], name='Utopia Planitia'),
SpaceportFactory(celestial=moons[2], name='Ares Station'),
]

View File

@ -0,0 +1,50 @@
from django.db import models
from ...records import RecordManager
class Celestial(models.Model):
CELESTIAL_TYPES = ((0, 'Unknown'),
(1, 'Star'),
(2, 'Planet'),
(3, 'Planetoid'),
(4, 'Asteroid'),
(5, 'Station'))
orbits = models.ForeignKey('self', blank=True, null=True, related_name='orbitals', on_delete=models.CASCADE)
name = models.CharField(max_length=100)
celestial_type = models.IntegerField(choices=CELESTIAL_TYPES, default=int)
weight = models.FloatField(default=float)
size = models.FloatField(default=float)
objects = RecordManager()
@property
def is_moon(self):
return 5 > self.celestial_type > 1 and self.orbits and 5 > self.orbits.celestial_type > 1
class Spaceport(models.Model):
name = models.CharField(max_length=100)
celestial = models.ForeignKey(Celestial, related_name='spaceports', on_delete=models.CASCADE)
objects = RecordManager()
class Person(models.Model):
origin = models.ForeignKey(Celestial, related_name='children', blank=True, null=True, on_delete=models.CASCADE)
first_name = models.CharField(max_length=100)
last_name = models.CharField(max_length=100)
age = models.IntegerField(blank=True, null=True)
objects = RecordManager()
class Visitor(models.Model):
person = models.ForeignKey(Person, related_name='as_visitor', on_delete=models.CASCADE)
spaceport = models.ForeignKey(Spaceport, related_name='visitors', on_delete=models.CASCADE)
luggage_weight = models.FloatField(blank=True, null=True, default=float)
objects = RecordManager()
class Citizen(models.Model):
planet = models.ForeignKey(Celestial, related_name='citizens', on_delete=models.CASCADE)
person = models.ForeignKey(Person, related_name='citizenships', on_delete=models.CASCADE)
clearance_level = models.IntegerField(blank=True, null=True)
objects = RecordManager()

View File

@ -0,0 +1,7 @@
# Celestials Test Database
this is a royalty free no guarantees given test case
The galaxy.Stars helper class can create a sun system as test data.
Part of StarGenerator

View File

@ -0,0 +1,2 @@
Django < 3
factory

View File

@ -0,0 +1,126 @@
from dataclasses import dataclass
from unittest.case import skipIf
from django.db.models import F
from django.test.testcases import TestCase
import mock
from ..records import Lambda, Adjunct, Callback
from django.test.utils import tag
from click.types import INT
try:
from .celestials.models import Celestial
from .celestials.galaxy import Stars
celestials_installed = True
except RuntimeError:
celestials_installed = False
@dataclass
class Entity:
id: int
@dataclass
class SpaceRock:
id: int
name: str
orbits_name: str
is_moon: bool
@tag('library')
@skipIf(not celestials_installed, "Celestials Testpackage not installed into INSTALLED_APPS.")
class TestQueryBuilder(TestCase):
def setUp(self):
super().setUp()
Stars.create_sol(context=self)
def test_records(self):
entities = Celestial.objects.filter(orbits__name='Sol', celestial_type__lte=4).records(Entity)
self.assertEqual(len(entities), len(self.planets))
# test whether we really return dataclass as result, even with first.
self.assertIsInstance(entities.first(), Entity)
# find moons. test whether i can use entities to do an SQL query. works because i have only one key.
self.assertEqual(len(self.moons), Celestial.objects.filter(orbits__in=entities).count())
# this is pretty much the same as
self.assertEqual(len(self.moons), len(Celestial.objects.filter(
orbits__in=Celestial.objects.filter(orbits__name='Sol', celestial_type__lte=4)).values_list('id', flat=True)))
def test_lambda(self):
# this tests whether our own celestial type or the celestial type of what we orbit is correct for being a moon. parameter is a dictionary.
is_moon = lambda entry: True if 5 > (entry.get('celestial_type') or 0) > 1 and 5 > (entry.get('orbits_type') or 0) > 1 else False
entities = Celestial.objects.records(SpaceRock, # we want our output to be a SpaceRock dataclass.
'celestial_type', # we include the key celestial_type into our query.
id=Adjunct(None), # we blank out id to test Adjunct working.
orbits_name=F('orbits__name'), # we set our custom orbits_name to a related field value
orbits_type=F('orbits__celestial_type'), # our lambda needs this data.
is_moon=Lambda(is_moon)) # lambda over result
self.assertEqual(len(entities), len(self.celestials))
for idx, entity in enumerate(entities):
dbdata = self.celestials[idx]
model = Celestial.objects.filter(id=dbdata.id).first()
self.assertEqual(entity.name, dbdata.name)
self.assertIsNone(entity.id)
self.assertEqual(entity.is_moon, model.is_moon)
def test_callbacks(self):
side_effect = lambda x:x
callback_one = mock.Mock(side_effect=side_effect)
callback_two = mock.Mock(side_effect=side_effect)
entities = Celestial.objects.all().records(Entity, callback_one=Callback(callback_one), callback_two=Callback(callback_two))
self.assertEqual(len(entities), len(self.celestials))
self.assertEqual(callback_one.call_count, len(self.celestials))
self.assertEqual(callback_two.call_count, len(self.celestials))
def test_double_value_technique(self):
"""
Records open a new sort of technique for late calling details from dataclasses.
imagine you have a dataclass called EntityIndex, which has only one field from the database: id.
however it has custom fields, where you store a lambda expression.
you could e.g. use it in a subquery, while still access the data.
however this will hit the database twice if you evaluate the iterator yourself, as the lambda is not lazy.
"""
planet_queryset = Celestial.objects.filter(orbits__name='Sol', celestial_type__lte=4)
@dataclass
class DetailedEntity:
id: int
name: str
@dataclass
class IndexEntity:
id: int
detail: DetailedEntity
def get_details_exec(data):
return Celestial.objects.filter(pk=data.get('id')).records(DetailedEntity).first()
get_details = mock.Mock(side_effect=get_details_exec)
# retrieves data per key only.
my_planets = planet_queryset.records(IndexEntity, detail=Lambda(get_details))
my_moons = Celestial.objects.filter(orbits__in=my_planets).records(IndexEntity, detail=Lambda(get_details)) # legal
# django does not consume the iterator internally for subqueries:
self.assertEqual(get_details.call_count, 0)
# consume it ourselves...
for planet in my_planets:
self.assertIsNotNone(planet.detail.name)
self.assertEqual(get_details.call_count, len(self.planets))
# but...
self.assertEqual(len(my_moons), len(self.moons))
self.assertEqual(get_details.call_count, len(self.planets) + len(self.moons))

122
blackmesa/settings.py Normal file
View File

@ -0,0 +1,122 @@
"""
Django settings for blackmesa project.
Generated by 'django-admin startproject' using Django 2.2.17.
For more information on this file, see
https://docs.djangoproject.com/en/2.2/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/2.2/ref/settings/
"""
import os
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/2.2/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = 'jwxo^d-t$id!khv&lobo7m_q+vzsgrm4+1y1%2s*54b-k(zl4='
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
ALLOWED_HOSTS = []
# Application definition
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'blackmesa.core',
'blackmesa.records.tests.celestials',
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
ROOT_URLCONF = 'blackmesa.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
WSGI_APPLICATION = 'blackmesa.wsgi.application'
# Database
# https://docs.djangoproject.com/en/2.2/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
}
}
# Password validation
# https://docs.djangoproject.com/en/2.2/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# Internationalization
# https://docs.djangoproject.com/en/2.2/topics/i18n/
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'UTC'
USE_I18N = True
USE_L10N = True
USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/2.2/howto/static-files/
STATIC_URL = '/static/'

21
blackmesa/urls.py Normal file
View File

@ -0,0 +1,21 @@
"""blackmesa URL Configuration
The `urlpatterns` list routes URLs to views. For more information please see:
https://docs.djangoproject.com/en/2.2/topics/http/urls/
Examples:
Function views
1. Add an import: from my_app import views
2. Add a URL to urlpatterns: path('', views.home, name='home')
Class-based views
1. Add an import: from other_app.views import Home
2. Add a URL to urlpatterns: path('', Home.as_view(), name='home')
Including another URLconf
1. Import the include() function: from django.urls import include, path
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
"""
from django.contrib import admin
from django.urls import path
urlpatterns = [
path('admin/', admin.site.urls),
]

16
blackmesa/wsgi.py Normal file
View File

@ -0,0 +1,16 @@
"""
WSGI config for blackmesa project.
It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/2.2/howto/deployment/wsgi/
"""
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'blackmesa.settings')
application = get_wsgi_application()

22
manage.py Executable file
View File

@ -0,0 +1,22 @@
#!/usr/bin/env python
"""Django's command-line utility for administrative tasks."""
import os
import sys
def main():
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'blackmesa.settings')
os.environ['DJANGO_SETTINGS_MODULE'] = 'blackmesa.settings'
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
) from exc
execute_from_command_line(sys.argv)
if __name__ == '__main__':
main()