Python源码示例:django.apps.apps.get_model()
示例1
def actions_have_consistent_hashes(app_configs, **kwargs):
errors = []
try:
Action = apps.get_model("recipes", "Action")
actions = list(Action.objects.filter(implementation__isnull=False))
except (ProgrammingError, OperationalError, ImproperlyConfigured) as e:
errors.append(Info(f"Could not retrieve actions: {e}", id=INFO_COULD_NOT_RETRIEVE_ACTIONS))
else:
for action in actions:
if action.compute_implementation_hash() != action.implementation_hash:
msg = "Action '{action}' (id={action.id}) has a mismatched hash".format(
action=action
)
errors.append(Error(msg, id=ERROR_MISMATCHED_ACTION_HASH))
return errors
示例2
def test_phone_registration_sends_message(client, mocker, backend):
url = reverse("phone-register")
phone_number = PHONE_NUMBER
data = {"phone_number": phone_number}
twilio_api = mocker.patch(
"phone_verify.services.PhoneVerificationService.send_verification"
)
response = client.post(url, data)
assert response.status_code == 200
assert twilio_api.called
assert "session_token" in response.data
SMSVerification = apps.get_model("phone_verify", "SMSVerification")
assert SMSVerification.objects.get(
session_token=response.data["session_token"], phone_number=phone_number
)
示例3
def get_usersettings_model():
"""
Returns the ``UserSettings`` model that is active in this project.
"""
try:
from django.apps import apps
get_model = apps.get_model
except ImportError:
from django.db.models.loading import get_model
try:
app_label, model_name = settings.USERSETTINGS_MODEL.split('.')
except ValueError:
raise ImproperlyConfigured('USERSETTINGS_MODEL must be of the '
'form "app_label.model_name"')
usersettings_model = get_model(app_label, model_name)
if usersettings_model is None:
raise ImproperlyConfigured('USERSETTINGS_MODEL refers to model "%s" that has '
'not been installed' % settings.USERSETTINGS_MODEL)
return usersettings_model
示例4
def handle(self, *args, **options):
app_name = options['app']
model_name = options['model']
field_name = options['field']
model = apps.get_model(app_name, model_name)
fields = model._meta.get_fields()
matching_fields = [f for f in fields if isinstance(f, models.FileField) and f.name == field_name]
field = matching_fields[0]
storage = field.storage
for o in model.objects.all():
# threads aren't usually interrupted: https://stackoverflow.com/a/842567/6871666
t = Thread(target=move_file, args=(model, storage, field_name, field, o))
t.start()
t.join()
示例5
def context(self, context):
btns = []
for b in self.q_btns:
btn = {}
if 'model' in b:
model = self.get_model(b['model'])
if not self.user.has_perm("%s.view_%s" % (model._meta.app_label, model._meta.model_name)):
continue
btn['url'] = reverse("%s:%s_%s_%s" % (self.admin_site.app_name, model._meta.app_label,
model._meta.model_name, b.get('view', 'changelist')))
btn['title'] = model._meta.verbose_name
btn['icon'] = self.dashboard.get_model_icon(model)
else:
try:
btn['url'] = reverse(b['url'])
except NoReverseMatch:
btn['url'] = b['url']
if 'title' in b:
btn['title'] = b['title']
if 'icon' in b:
btn['icon'] = b['icon']
btns.append(btn)
context.update({'btns': btns})
示例6
def export(dirname):
"""
Take all the annotated images, copy them to the specified directory,
and write out annotated.json with the annotation for each image.
"""
Image = apps.get_model('train', 'Image')
annotated = Image.objects.filter(annotation__isnull=False)
data = []
for i in annotated:
base = os.path.basename(i.path)
# copy image to directory
shutil.copy(i.path, os.path.join(dirname, base))
# add bounding boxes to JSON
data.append({
'image_name': base,
'image_annotation': i.annotation
})
with open(os.path.join(dirname, 'annotated.json'), 'w') as f:
json.dump(data, f)
return annotated.count()
示例7
def _load_field(app_label, model_name, field_name):
return apps.get_model(app_label, model_name)._meta.get_field(field_name)
# A guide to Field parameters:
#
# * name: The name of the field specified in the model.
# * attname: The attribute to use on the model object. This is the same as
# "name", except in the case of ForeignKeys, where "_id" is
# appended.
# * db_column: The db_column specified in the model (or None).
# * column: The database column for this field. This is the same as
# "attname", except if db_column is specified.
#
# Code that introspects values, or does other dynamic things, should use
# attname. For example, this gets the primary key value of object "obj":
#
# getattr(obj, opts.pk.attname)
示例8
def handle(self, **options):
warnings.warn("The syncdb command will be removed in Django 1.9", RemovedInDjango19Warning)
call_command("migrate", **options)
try:
apps.get_model('auth', 'Permission')
except LookupError:
return
UserModel = get_user_model()
if not UserModel._default_manager.exists() and options.get('interactive'):
msg = ("\nYou have installed Django's auth system, and "
"don't have any superusers defined.\nWould you like to create one "
"now? (yes/no): ")
confirm = input(msg)
while 1:
if confirm not in ('yes', 'no'):
confirm = input('Please enter either "yes" or "no": ')
continue
if confirm == 'yes':
call_command("createsuperuser", interactive=True, database=options['database'])
break
示例9
def _get_model_from_node(self, node, attr):
"""
Helper to look up a model from a <object model=...> or a <field
rel=... to=...> node.
"""
model_identifier = node.getAttribute(attr)
if not model_identifier:
raise base.DeserializationError(
"<%s> node is missing the required '%s' attribute"
% (node.nodeName, attr))
try:
return apps.get_model(model_identifier)
except (LookupError, TypeError):
raise base.DeserializationError(
"<%s> node has invalid model identifier: '%s'"
% (node.nodeName, model_identifier))
示例10
def __init__(self, **kwargs):
self.hashid_salt = kwargs.pop('salt', settings.HASHID_FIELD_SALT)
self.hashid_min_length = kwargs.pop('min_length', 7)
self.hashid_alphabet = kwargs.pop('alphabet', Hashids.ALPHABET)
source_field = kwargs.pop('source_field', None)
if source_field:
from hashid_field import HashidField, HashidAutoField
if isinstance(source_field, str):
try:
app_label, model_name, field_name = source_field.split(".")
except ValueError:
raise ValueError(self.usage_text)
model = apps.get_model(app_label, model_name)
source_field = model._meta.get_field(field_name)
elif not isinstance(source_field, (HashidField, HashidAutoField)):
raise TypeError(self.usage_text)
self.hashid_salt, self.hashid_min_length, self.hashid_alphabet = \
source_field.salt, source_field.min_length, source_field.alphabet
self._hashids = Hashids(salt=self.hashid_salt, min_length=self.hashid_min_length, alphabet=self.hashid_alphabet)
super().__init__(**kwargs)
示例11
def __init__(self, model_class, allow_unsaved=False, *args, **kwargs):
super(ModelField, self).__init__(*args, **kwargs)
try:
model_class._meta.model_name
except AttributeError:
assert isinstance(model_class, str), self.error_model_class % {
'cls_name': self.__class__.__name__,
'model_class': model_class
}
self.model_class = model_class
if isinstance(model_class, str):
label = model_class.split('.')
app_label = ".".join(label[:-1])
model_name = label[-1]
self.model_class = apps.get_model(app_label, model_name)
self.allow_unsaved = allow_unsaved
示例12
def convert_all_videos(app_label, model_name, object_pk):
"""
Automatically converts all videos of a given instance.
"""
# get instance
Model = apps.get_model(app_label=app_label, model_name=model_name)
instance = Model.objects.get(pk=object_pk)
# search for `VideoFields`
fields = instance._meta.fields
for field in fields:
if isinstance(field, VideoField):
if not getattr(instance, field.name):
# ignore empty fields
continue
# trigger conversion
fieldfile = getattr(instance, field.name)
convert_video(fieldfile)
示例13
def populate_labelling_msg_fields(apps, schema_editor):
Labelling = apps.get_model("msgs", "Labelling")
Message = apps.get_model("msgs", "Message")
max_id = 0
num_updated = 0
while True:
id_batch = list(
Labelling.objects.filter(id__gt=max_id, message_created_on=None)
.values_list("id", flat=True)
.order_by("id")[:BATCH_SIZE]
)
if not id_batch:
break
Labelling.objects.filter(id__in=id_batch).update(
message_is_flagged=Subquery(Message.objects.filter(id=OuterRef("message_id")).values("is_flagged")[:1]),
message_is_archived=Subquery(Message.objects.filter(id=OuterRef("message_id")).values("is_archived")[:1]),
message_created_on=Subquery(Message.objects.filter(id=OuterRef("message_id")).values("created_on")[:1]),
)
max_id = id_batch[-1]
num_updated += len(id_batch)
print(f" > Updated {num_updated} instances of labelling")
示例14
def _check_swappable(cls):
"""Check if the swapped model exists."""
errors = []
if cls._meta.swapped:
try:
apps.get_model(cls._meta.swapped)
except ValueError:
errors.append(
checks.Error(
"'%s' is not of the form 'app_label.app_name'." % cls._meta.swappable,
id='models.E001',
)
)
except LookupError:
app_label, model_name = cls._meta.swapped.split('.')
errors.append(
checks.Error(
"'%s' references '%s.%s', which has not been "
"installed, or is abstract." % (
cls._meta.swappable, app_label, model_name
),
id='models.E002',
)
)
return errors
示例15
def _load_field(app_label, model_name, field_name):
return apps.get_model(app_label, model_name)._meta.get_field(field_name)
# A guide to Field parameters:
#
# * name: The name of the field specified in the model.
# * attname: The attribute to use on the model object. This is the same as
# "name", except in the case of ForeignKeys, where "_id" is
# appended.
# * db_column: The db_column specified in the model (or None).
# * column: The database column for this field. This is the same as
# "attname", except if db_column is specified.
#
# Code that introspects values, or does other dynamic things, should use
# attname. For example, this gets the primary key value of object "obj":
#
# getattr(obj, opts.pk.attname)
示例16
def _get_sitemap_full_url(sitemap_url):
if not django_apps.is_installed('django.contrib.sites'):
raise ImproperlyConfigured("ping_google requires django.contrib.sites, which isn't installed.")
if sitemap_url is None:
try:
# First, try to get the "index" sitemap URL.
sitemap_url = reverse('django.contrib.sitemaps.views.index')
except NoReverseMatch:
try:
# Next, try for the "global" sitemap URL.
sitemap_url = reverse('django.contrib.sitemaps.views.sitemap')
except NoReverseMatch:
pass
if sitemap_url is None:
raise SitemapNotFound("You didn't provide a sitemap_url, and the sitemap URL couldn't be auto-detected.")
Site = django_apps.get_model('sites.Site')
current_site = Site.objects.get_current()
return 'http://%s%s' % (current_site.domain, sitemap_url)
示例17
def get_model(model):
"""
Given a model name as ``app_label.ModelName``, returns the Django model.
"""
try:
if isinstance(model, str):
app_label, model_name = model.split('.', 1)
m = loading.get_model(app_label, model_name)
if not m: # pragma: no cover
raise LookupError() # Django < 1.7 just returns None
return m
elif issubclass(model, models.Model):
return model
except (LookupError, ValueError):
pass
raise ValueError(model)
示例18
def define_relationship(name, from_model, to_model):
if name in _relationship_registry:
raise KeyError(name)
_from_ctype = from_model
_to_ctype = to_model
if isinstance(_from_ctype, str):
_from_ctype = get_model(_from_ctype)
if isinstance(_to_ctype, str):
_to_ctype = get_model(_to_ctype)
if not isinstance(_from_ctype, ContentType):
_from_ctype = ContentType.objects.get_for_model(_from_ctype)
if not isinstance(_to_ctype, ContentType):
_to_ctype = ContentType.objects.get_for_model(_to_ctype)
relationship = Relationship(name=name,
from_content_type=_from_ctype,
to_content_type=_to_ctype)
_relationship_registry[name] = relationship
return relationship
示例19
def _add_backend(self, backend):
""" Builds a subclass model for the given backend """
md_type = backend.verbose_name
base = backend().get_model(self)
# TODO: Rename this field
new_md_attrs = {'_metadata': self.metadata, '__module__': __name__}
new_md_meta = {}
new_md_meta['verbose_name'] = '%s (%s)' % (self.verbose_name, md_type)
new_md_meta['verbose_name_plural'] = '%s (%s)' % (self.verbose_name_plural, md_type)
new_md_meta['unique_together'] = base._meta.unique_together
new_md_attrs['Meta'] = type("Meta", (), new_md_meta)
new_md_attrs['_metadata_type'] = backend.name
if six.PY2:
md_type = str(md_type)
model = type("%s%s" % (self.name, "".join(md_type.split())), (base, self.MetadataBaseModel), new_md_attrs.copy())
self.models[backend.name] = model
# This is a little dangerous, but because we set __module__ to __name__, the model needs tobe accessible here
globals()[model.__name__] = model
示例20
def _set_seo_models(self, value):
""" Gets the actual models to be used. """
seo_models = []
for model_name in value:
if "." in model_name:
app_label, model_name = model_name.split(".", 1)
model = apps.get_model(app_label, model_name)
if model:
seo_models.append(model)
else:
app = apps.get_app_config(model_name)
if app:
seo_models.extend(app.get_models())
# This fix the trouble on Django 1.9 when django-seo conflicts with session model
seo_models = [
model for model in seo_models if model._meta.model_name != 'session' and model._meta.app_label != 'sessions'
]
self.seo_models = seo_models
示例21
def handle_orm_call(self, request):
method_name = request.msg.data['method'].split('__')[1]
app_label, _ = method_name.split('.')
action, model_name = _.split('_')
model = apps.get_model('{}.{}'.format(app_label, model_name))
if action == 'view':
return await self._model_view(request, model)
elif action == 'add':
return await self._model_add(request, model)
elif action == 'change':
return await self._model_change(request, model)
elif action == 'delete':
return await self._model_delete(request, model)
# login / logout
示例22
def change_publisher(self, new_publisher):
"""
Changing the publisher of a Journal is a heavy task:
we need to update all the OaiRecords associated with
this Journal to map to the new publisher
"""
oa_status_changed = self.publisher.oa_status != new_publisher.oa_status
self.publisher = new_publisher
self.save()
self.oairecord_set.all().update(publisher = new_publisher)
if oa_status_changed:
papers = get_model('papers', 'Paper').objects.filter(
oairecord__journal=self.pk)
for p in papers:
p.update_availability()
p.invalidate_cache()
示例23
def prep(cls, parent=None):
"""
Instantiates new specific visibility settings according to the defaults
specified in the proxy model. Immediately updates the database.
The 'class-only method' decorator protects this from being hazardously
called in templates, with unpleasant consequences.
"""
try:
container = apps.get_model(cls._CONTAINER_MODEL)
except AttributeError:
raise TypeError("Only specific visibility settings may be created") from None
if parent:
assert hasattr(parent, '_state'), (
"{!r} is not a Model instance!".format(parent))
assert isinstance(parent, container), (
"{!r} is not a {}.{}!".format(parent, container.__module__, container.__name__))
initial = {'{}{}'.format(cls._PREFIX, field): value for field, value in cls.defaults.items()}
return cls.objects.create(
model_id=getattr(parent, 'pk', None),
model_type=cls.type(),
content_type=ContentType.objects.get_for_model(container),
**initial
)
示例24
def get_global_model(model_path):
try:
app_label, model_name = model_path.rsplit('.', 1)
except ValueError:
raise template.TemplateSyntaxError(
"Templatetag template_tag_name requires the following format: 'app_label.ModelName'. "
"Received '%s'." % model_path
)
model_class = apps.get_model(app_label, model_name)
if not model_class:
raise template.TemplateSyntaxError(
"Could not get the model name '%(model)s' from the application "
"named '%(app)s'" % {
'model': model_name,
'app': app_label,
}
)
return model_class.get_global()
示例25
def render(self, name, value, attrs=None, renderer=None, choices=()):
if value is None:
value = []
if not isinstance(value, (list, tuple)):
# This is a ForeignKey field. We must allow only one item.
value = [value]
values = get_model(self.model).objects.filter(pk__in=value)
try:
final_attrs = self.build_attrs(attrs, name=name)
except TypeError as e:
# Fallback for django 1.10+
final_attrs = self.build_attrs(attrs, extra_attrs={'name': name})
return render_to_string('searchableselect/select.html', dict(
field_id=final_attrs['id'],
field_name=final_attrs['name'],
values=values,
model=self.model,
search_field=self.search_field,
limit=self.limit,
many=self.many
))
示例26
def members(self, folder, **kwargs):
direct = kwargs.get("direct", True)
user = kwargs.get("user")
Document = apps.get_model("documents", "Document")
folders = self.filter(parent=folder)
documents = Document.objects.filter(folder=folder)
if user:
folders = folders.for_user(user)
documents = documents.for_user(user)
M = sorted(itertools.chain(folders, documents), key=operator.attrgetter("name"))
if direct:
return M
for child in folders:
M.extend(self.members(child, **kwargs))
return M
示例27
def recipe_signatures_are_correct(app_configs, **kwargs):
errors = []
try:
Recipe = apps.get_model("recipes", "Recipe")
# pre-fetch signatures, to avoid race condition with deleted signatures
signed_recipes = list(Recipe.objects.exclude(signature=None).select_related("signature"))
except (ProgrammingError, OperationalError, ImproperlyConfigured) as e:
errors.append(Info(f"Could not retrieve recipes: {e}", id=INFO_COULD_NOT_RETRIEVE_RECIPES))
return errors
try:
for recipe in signed_recipes:
data = recipe.canonical_json()
signature = recipe.signature.signature
pubkey = recipe.signature.public_key
x5u = recipe.signature.x5u
try:
if x5u:
signing.verify_signature_x5u(data, signature, x5u)
else:
signing.verify_signature_pubkey(data, signature, pubkey)
except signing.BadSignature as e:
msg = "Recipe '{recipe}' (id={recipe.id}) has a bad signature: {detail}".format(
recipe=recipe, detail=e.detail
)
errors.append(Error(msg, id=ERROR_INVALID_RECIPE_SIGNATURE))
except requests.RequestException as exc:
msg = (
f"The signature for recipe with ID {recipe.id} could not be be verified due to "
f"network error when requesting the url {x5u!r}. {exc}"
)
errors.append(Error(msg, id=ERROR_COULD_NOT_VERIFY_CERTIFICATE))
except (ProgrammingError, OperationalError, ImproperlyConfigured) as e:
errors.append(
Warning(f"Could not check signatures: {e}", id=WARNING_COULD_NOT_CHECK_SIGNATURES)
)
return errors
示例28
def action_signatures_are_correct(app_configs, **kwargs):
errors = []
try:
Action = apps.get_model("recipes", "Action")
# pre-fetch signatures, to avoid race condition with deleted signatures
signed_actions = list(Action.objects.exclude(signature=None).select_related("signature"))
except (ProgrammingError, OperationalError, ImproperlyConfigured) as e:
msg = f"Could not retrieve actions: f{e}"
errors.append(Info(msg, id=INFO_COULD_NOT_RETRIEVE_ACTIONS))
return errors
try:
for action in signed_actions:
data = action.canonical_json()
signature = action.signature.signature
pubkey = action.signature.public_key
x5u = action.signature.x5u
try:
if x5u:
signing.verify_signature_x5u(data, signature, x5u)
else:
signing.verify_signature_pubkey(data, signature, pubkey)
except signing.BadSignature as e:
msg = f"Action '{action}' (id={action.id}) has a bad signature: {e.detail}"
errors.append(Error(msg, id=ERROR_INVALID_ACTION_SIGNATURE))
except requests.RequestException as exc:
msg = (
f"The signature for action with ID {action.id} could not be be verified due to "
f"network error when requesting the url {x5u!r}. {exc}"
)
errors.append(Error(msg, id=ERROR_COULD_NOT_VERIFY_CERTIFICATE))
except (ProgrammingError, OperationalError, ImproperlyConfigured) as e:
errors.append(
Warning(f"Could not check signatures: {e}", id=WARNING_COULD_NOT_CHECK_SIGNATURES)
)
return errors
示例29
def signatures_use_good_certificates(app_configs, **kwargs):
errors = []
expire_early = None
if settings.CERTIFICATES_EXPIRE_EARLY_DAYS:
expire_early = timedelta(days=settings.CERTIFICATES_EXPIRE_EARLY_DAYS)
try:
Recipe = apps.get_model("recipes", "Recipe")
Action = apps.get_model("recipes", "Action")
x5u_urls = defaultdict(list)
for recipe in Recipe.objects.exclude(signature__x5u=None).select_related("signature"):
x5u_urls[recipe.signature.x5u].append(str(recipe))
for action in Action.objects.exclude(signature__x5u=None).select_related("signature"):
x5u_urls[action.signature.x5u].append(str(action))
except (ProgrammingError, OperationalError, ImproperlyConfigured) as e:
msg = f"Could not retrieve signatures: {e}"
errors.append(Info(msg, id=INFO_COULD_NOT_RETRIEVE_SIGNATURES))
else:
for url in x5u_urls:
try:
signing.verify_x5u(url, expire_early)
except signing.BadCertificate as exc:
bad_object_names = x5u_urls[url]
msg = (
f"{len(bad_object_names)} objects are signed with a bad cert: "
f"{bad_object_names}. {exc.detail}. Certificate url is {url}. "
)
errors.append(Error(msg, id=ERROR_BAD_SIGNING_CERTIFICATE))
except requests.RequestException as exc:
bad_object_names = x5u_urls[url]
msg = (
f"The certificate at {url} could not be fetched due to a network error to "
f"verify. {len(bad_object_names)} objects are signed with this certificate: "
f"{bad_object_names}. {exc}"
)
errors.append(Error(msg, id=ERROR_COULD_NOT_VERIFY_CERTIFICATE))
return errors
示例30
def create_verification(**kwargs):
SMSVerification = apps.get_model("phone_verify", "SMSVerification")
verification = G(SMSVerification, **kwargs)
return verification