Python源码示例:django.apps.apps.get_app_config()
示例1
def make_state_items(self):
    """Build the dashboard state entries for dashboard-enabled models.

    Walks the models registered by the ``idcops`` app and keeps those whose
    ``_meta.dashboard`` option is truthy. For each of them the display
    metadata is collected together with the number of rows that belong to
    the requesting user's ``onidc`` and match ``_meta.default_filters``.

    Returns:
        list[dict]: one dict per model with the keys ``model_name``,
        ``verbose_name``, ``icon``, ``icon_color``, ``level``, ``metric``
        and ``count``.
    """
    state_items = []
    for model in apps.get_app_config('idcops').get_models():
        opts = model._meta
        # A model that never declares ``dashboard`` is skipped instead of
        # aborting the whole dashboard with an AttributeError.
        if not getattr(opts, 'dashboard', False):
            continue
        state_items.append({
            'model_name': opts.model_name,
            'verbose_name': opts.verbose_name,
            'icon': opts.icon,
            'icon_color': 'bg-' + opts.icon_color,
            'level': opts.level,
            'metric': opts.metric,
            'count': model.objects.filter(
                onidc=self.request.user.onidc,
            ).filter(
                **opts.default_filters
            ).count(),
        })
    return state_items
示例2
def construct_menus():
    """Group the non-hidden ``idcops`` models into menu sections by level.

    Returns:
        list[list[dict]]: one inner list per distinct ``level`` value; each
        dict carries ``model_name``, ``verbose_name``, ``icon``,
        ``icon_color`` and ``level``.
    """
    entries = [
        {
            'model_name': model._meta.model_name,
            'verbose_name': model._meta.verbose_name,
            'icon': model._meta.icon,
            'icon_color': 'text-' + model._meta.icon_color,
            'level': model._meta.level,
        }
        for model in apps.get_app_config('idcops').get_models()
        if not getattr(model._meta, 'hidden', False)
    ]
    # The section order is whatever order the set of levels iterates in.
    levels = list(set([entry.get('level') for entry in entries]))
    return [
        [entry for entry in entries if entry.get('level') == level]
        for level in levels
    ]
示例3
def initial_user_config(instance, created, **kwargs):
    """Create the default list ``Configure`` rows for a newly created instance.

    Does nothing unless ``created`` is truthy. Otherwise one ``Configure``
    object is prepared per ``idcops`` model and all of them are inserted
    with a single ``bulk_create`` call.
    """
    if not created:
        return

    excluded = ['onidc', 'deleted', 'mark']
    pending = []
    for model in apps.get_app_config('idcops').get_models():
        fallback = [
            name for name in fields_for_model(model) if name not in excluded
        ]
        declared = getattr(model._meta, 'list_display', fallback)
        # Only a real list is accepted as ``list_display``; anything else
        # falls back to the model's own field names.
        columns = declared if isinstance(declared, list) else fallback
        payload = {'list_only_date': 1, 'list_display': columns}
        pending.append(Configure(
            onidc=instance.onidc,
            creator=instance,
            mark='list',
            content_type=get_content_type_for_model(model),
            content=json.dumps(payload),
        ))
    Configure.objects.bulk_create(pending)
示例4
def handle(self, *args, **options):
    """Announce the service start on stdout and, if configured, on Slack.

    The startup message names the ``relay`` app, ``__version__`` and the
    capitalized network name reported by the Ethereum client. When
    ``settings.SLACK_API_WEBHOOK`` is set the same message is posted to it;
    a failed or non-OK request is reported on stdout and never raised.
    """
    ethereum_client = EthereumClientProvider()
    app_name = apps.get_app_config('relay').verbose_name
    network_name = ethereum_client.get_network().name.capitalize()
    startup_message = f'Starting {app_name} version {__version__} on {network_name}'
    self.stdout.write(self.style.SUCCESS(startup_message))

    if not settings.SLACK_API_WEBHOOK:
        self.stdout.write(self.style.SUCCESS('Slack not configured, ignoring'))
        return

    try:
        # Without a timeout an unresponsive webhook would block startup
        # forever; a timeout surfaces as a RequestException subclass.
        response = requests.post(
            settings.SLACK_API_WEBHOOK,
            json={'text': startup_message},
            timeout=10,
        )
        if not response.ok:
            raise RequestException()
    except RequestException:
        self.stdout.write(self.style.ERROR('Cannot send slack notification'))
    else:
        self.stdout.write(self.style.SUCCESS(f'Slack configured, "{startup_message}" sent'))
示例5
def setUp(self):
    """Prepare the ``test_app`` config and three dynamically built models.

    ``AppTestModel1`` and ``AppTestModel2`` only set ``__module__``;
    ``AppTestModel3`` additionally gets a ``Meta`` with a custom permission.
    """
    class Meta:
        permissions = (
            ("copy_apptestmodel3", "Can copy apptestmodel3"),
        )

    plain_attrs = {'__module__': 'tests.test_app.models'}
    attrs_with_meta = {'__module__': 'tests.test_app.models', 'Meta': Meta}
    bases = (models.Model, )

    self.appconfig = apps.get_app_config('test_app')
    # Each model receives its own attribute dict.
    self.model1 = type(str('AppTestModel1'), bases, dict(plain_attrs))
    self.model2 = type(str('AppTestModel2'), bases, dict(plain_attrs))
    self.model3 = type(str('AppTestModel3'), bases, dict(attrs_with_meta))
示例6
def test_signal_receiver_registered_in_test(mocker, signal):
    """Ensure migration signal receivers registered in tests are called."""
    receiver = mocker.MagicMock()
    main_app_config = apps.get_app_config('main_app')
    signal.connect(receiver, sender=main_app_config, dispatch_uid=DISPATCH_UID)

    command_options = {'verbosity': 0, 'interactive': False}
    # call `migrate` management command to trigger ``pre_migrate`` and
    # ``post_migrate`` signals
    call_command('migrate', **command_options)

    expected_kwargs = dict(
        sender=main_app_config,
        app_config=main_app_config,
        apps=mocker.ANY,  # we don't have any reference to this object
        using=DEFAULT_DB_ALIAS,
        plan=mocker.ANY,  # not important for this test
        signal=signal,
    )
    expected_kwargs.update(command_options)
    receiver.assert_called_once_with(**expected_kwargs)
示例7
def prepare(self):
    """Connect testing ``pre_migrate`` and ``post_migrate`` receivers."""
    self.pre_migrate_receiver_mock = mock.MagicMock()
    self.post_migrate_receiver_mock = mock.MagicMock()
    # ``old_apps`` is not a real ``ProjectState`` instance, so it cannot be
    # used to obtain the "original" main_app ``AppConfig`` needed as the
    # signal sender; the ``apps`` registry imported from ``django.apps`` is
    # used instead.
    self.main_app_config = apps.get_app_config('main_app')

    hookups = (
        (pre_migrate, self.pre_migrate_receiver_mock),
        (post_migrate, self.post_migrate_receiver_mock),
    )
    for migrate_signal, receiver in hookups:
        migrate_signal.connect(receiver, sender=self.main_app_config)
示例8
def app_index(self, request, app_label, extra_context=None):
    """Render the admin index page for a single application.

    Raises ``Http404`` when ``_build_app_dict`` returns nothing for
    ``app_label``. ``extra_context`` entries override the default context.
    """
    app_dict = self._build_app_dict(request, app_label)
    if not app_dict:
        raise Http404('The requested admin page does not exist.')

    # Sort the models alphabetically within each app.
    app_dict['models'].sort(key=lambda entry: entry['name'])
    verbose_app_name = apps.get_app_config(app_label).verbose_name

    context = dict(self.each_context(request))
    context.update(
        title=_('%(app)s administration') % {'app': verbose_app_name},
        app_list=[app_dict],
        app_label=app_label,
    )
    context.update(extra_context or {})

    request.current_app = self.name

    templates = self.app_index_template or [
        'admin/%s/app_index.html' % app_label,
        'admin/app_index.html'
    ]
    return TemplateResponse(request, templates, context)
# This global object represents the default admin site, for the common case.
# You can instantiate AdminSite in your own code to create a custom admin site.
示例9
def get_view_names(seo_views):
    """Expand SEO view identifiers into a flat list of view names.

    Each entry is first looked up as an installed app label. If the app
    exists and its ``urls`` module can be imported, the names of all named
    URL patterns of that module are added. In every other case the entry
    itself is kept as a view name.

    Args:
        seo_views: iterable of app labels and/or view names.

    Returns:
        list: view names in the order they were discovered.
    """
    output = []
    for name in seo_views:
        try:
            app_config = apps.get_app_config(name)
        except LookupError:
            # Not an installed app label: treat the entry as a view name.
            output.append(name)
            continue

        models_module = app_config.models_module
        if models_module is None:
            # The app has no (or not yet imported) models module, so derive
            # the package from the app config instead of failing on
            # ``None.__name__``.
            package_parts = app_config.name.split(".")
        else:
            package_parts = models_module.__name__.split(".")[:-1]
        urls_module_name = ".".join(package_parts + ["urls"])

        try:
            urlpatterns = importlib.import_module(urls_module_name).urlpatterns
        except (ImportError, AttributeError):
            output.append(name)
            continue

        output.extend(
            url.name for url in urlpatterns if getattr(url, 'name', None)
        )
    return output
示例10
def _set_seo_models(self, value):
    """Resolve the configured names to model classes and store them.

    An entry containing a dot is read as ``app_label.model_name``; any
    other entry is read as an app label whose models are all added. The
    result is assigned to ``self.seo_models``.
    """
    collected = []
    for entry in value:
        if "." not in entry:
            app_config = apps.get_app_config(entry)
            if app_config:
                collected.extend(app_config.get_models())
            continue
        app_label, model_name = entry.split(".", 1)
        model = apps.get_model(app_label, model_name)
        if model:
            collected.append(model)

    # Works around a Django 1.9 conflict between django-seo and the
    # session model: such models are dropped from the result.
    self.seo_models = [
        model for model in collected
        if not (
            model._meta.model_name == 'session'
            or model._meta.app_label == 'sessions'
        )
    ]
示例11
def test_do_drop_all_analytics_tables(self) -> None:
    """Every analytics table has rows before the drop and none afterwards."""
    user = self.create_user()
    stream = self.create_stream_with_recipient()[0]
    count_args = {'property': 'test', 'end_time': self.TIME_ZERO, 'value': 10}

    UserCount.objects.create(user=user, realm=user.realm, **count_args)
    StreamCount.objects.create(stream=stream, realm=stream.realm, **count_args)
    RealmCount.objects.create(realm=user.realm, **count_args)
    InstallationCount.objects.create(**count_args)
    FillState.objects.create(property='test', end_time=self.TIME_ZERO, state=FillState.DONE)

    analytics = apps.get_app_config('analytics')

    def analytics_tables():
        # Snapshot of the model classes registered for the analytics app.
        return list(analytics.models.values())

    for table in analytics_tables():
        self.assertTrue(table.objects.exists())

    do_drop_all_analytics_tables()

    for table in analytics_tables():
        self.assertFalse(table.objects.exists())
示例12
def test_do_drop_single_stat(self) -> None:
    """Dropping one stat removes only the rows of that property."""
    user = self.create_user()
    stream = self.create_stream_with_recipient()[0]

    properties = ('to_delete', 'to_save')
    count_args_by_property = [
        {'property': prop, 'end_time': self.TIME_ZERO, 'value': 10}
        for prop in properties
    ]
    for count_args in count_args_by_property:
        UserCount.objects.create(user=user, realm=user.realm, **count_args)
        StreamCount.objects.create(stream=stream, realm=stream.realm, **count_args)
        RealmCount.objects.create(realm=user.realm, **count_args)
        InstallationCount.objects.create(**count_args)
    FillState.objects.create(property='to_delete', end_time=self.TIME_ZERO, state=FillState.DONE)
    FillState.objects.create(property='to_save', end_time=self.TIME_ZERO, state=FillState.DONE)

    analytics = apps.get_app_config('analytics')

    def analytics_tables():
        # Snapshot of the model classes registered for the analytics app.
        return list(analytics.models.values())

    for table in analytics_tables():
        self.assertTrue(table.objects.exists())

    do_drop_single_stat('to_delete')

    for table in analytics_tables():
        rows = table.objects
        self.assertFalse(rows.filter(property='to_delete').exists())
        self.assertTrue(rows.filter(property='to_save').exists())
示例13
def app_index(self, request, app_label, extra_context=None):
    """Render the index page of one app from the shared ``app_list``.

    The entry for ``app_label`` is picked out of ``context['app_list']``
    via ``get_from_list`` and exposed as ``current_app_list``.
    ``extra_context`` entries override the defaults.
    """
    app_name = apps.get_app_config(app_label).verbose_name
    context = self.each_context(request)
    matching_app = get_from_list(
        False, context['app_list'], 'app_label', app_label
    )

    context.update({
        'title': _('%(app)s administration') % {'app': app_name},
        'current_app_list': [matching_app],
        'app_label': app_label,
        'app_name': app_name,
    })
    context.update(extra_context or {})

    request.current_app = self.name

    templates = self.app_index_template or [
        'admin/%s/app_index.html' % app_label,
        'admin/app_index.html'
    ]
    return TemplateResponse(request, templates, context)
示例14
def app_index(self, request, app_label, extra_context=None):
    """Render the admin index page for a single application.

    Raises ``Http404`` when ``_build_app_dict`` returns nothing for
    ``app_label``. ``extra_context`` entries override the default context.
    """
    app_dict = self._build_app_dict(request, app_label)
    if not app_dict:
        raise Http404('The requested admin page does not exist.')

    # Sort the models alphabetically within each app.
    app_dict['models'].sort(key=lambda entry: entry['name'])
    verbose_app_name = apps.get_app_config(app_label).verbose_name

    context = dict(
        self.each_context(request),
        title=_('%(app)s administration') % {'app': verbose_app_name},
        app_list=[app_dict],
        app_label=app_label,
    )
    # Caller-supplied values take precedence over the defaults above.
    context.update(extra_context or {})

    request.current_app = self.name

    templates = self.app_index_template or [
        'admin/%s/app_index.html' % app_label,
        'admin/app_index.html'
    ]
    return TemplateResponse(request, templates, context)
示例15
def app_index(self, request, app_label, extra_context=None):
    """Render the admin index page for a single application.

    Raises ``Http404`` when ``_build_app_dict`` returns nothing for
    ``app_label``. ``extra_context`` entries override the default context.
    """
    app_dict = self._build_app_dict(request, app_label)
    if not app_dict:
        raise Http404('The requested admin page does not exist.')

    # Sort the models alphabetically within each app.
    app_dict['models'].sort(key=lambda entry: entry['name'])
    verbose_app_name = apps.get_app_config(app_label).verbose_name

    site_context = self.each_context(request)
    page_title = _('%(app)s administration') % {'app': verbose_app_name}
    context = dict(
        site_context,
        title=page_title,
        app_list=[app_dict],
        app_label=app_label,
    )
    if extra_context:
        context.update(extra_context)

    request.current_app = self.name

    templates = self.app_index_template or [
        'admin/%s/app_index.html' % app_label,
        'admin/app_index.html'
    ]
    return TemplateResponse(request, templates, context)
# This global object represents the default admin site, for the common case.
# You can instantiate AdminSite in your own code to create a custom admin site.
示例16
def get_index_mapping(index: str) -> dict:
    """
    Return the JSON mapping file for an index.

    Mappings are stored as JSON files in the directory named by the
    "mappings_dir" setting. They must be saved as {{index}}.json.

    Args:
        index: string, the name of the index to look for.

    Returns:
        The parsed content of the mapping file.

    Raises:
        OSError: if the mapping file cannot be opened.
        ValueError: if the file does not contain valid JSON.
    """
    mappings_dir = get_setting("mappings_dir")
    path = os.path.join(mappings_dir, "%s.json" % index)
    # Decode explicitly as UTF-8 so parsing does not depend on the locale's
    # default encoding.
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
示例17
def django_elasticapm_client(request):
    """Yield a ``TempStoreClient`` installed as the Django app's client.

    Generator fixture (the registering decorator is not visible in this
    excerpt). Settings come from ``request.param`` when present, with
    defaults filled in. After the ``yield`` the temporary client is closed
    and the previous one is restored and re-registered if it was set.
    """
    client_config = getattr(request, "param", {})
    defaults = (
        ("service_name", "app"),
        ("secret_token", "secret"),
        ("span_frames_min_duration", -1),
    )
    for key, default in defaults:
        client_config.setdefault(key, default)

    django_app = apps.get_app_config("elasticapm.contrib.django")
    previous_client = django_app.client

    client = TempStoreClient(**client_config)
    register_handlers(client)
    instrument(client)
    django_app.client = client

    yield client

    client.close()
    django_app.client = previous_client
    if previous_client:
        register_handlers(previous_client)
        instrument(previous_client)
示例18
def django_sending_elasticapm_client(request, validating_httpserver):
    """Yield a ``DjangoClient`` that sends to ``validating_httpserver``.

    Generator fixture (the registering decorator is not visible in this
    excerpt). The server is primed to answer with status 202. Settings come
    from ``request.param`` when present, with defaults filled in. After the
    ``yield`` the client is closed and the previous one is restored.
    """
    validating_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"})

    client_config = getattr(request, "param", {})
    defaults = (
        ("server_url", validating_httpserver.url),
        ("service_name", "app"),
        ("secret_token", "secret"),
        ("transport_class", "elasticapm.transport.http.Transport"),
        ("span_frames_min_duration", -1),
    )
    for key, default in defaults:
        client_config.setdefault(key, default)

    django_app = apps.get_app_config("elasticapm.contrib.django")
    previous_client = django_app.client

    client = DjangoClient(**client_config)
    register_handlers(client)
    instrument(client)
    django_app.client = client
    # Give tests access to the server through the client.
    client.httpserver = validating_httpserver

    yield client

    client.close()
    django_app.client = previous_client
    if previous_client:
        register_handlers(previous_client)
        instrument(previous_client)
示例19
def app_index(self, request, app_label, extra_context=None):
    """Render the admin index page for a single application.

    Raises ``Http404`` when ``_build_app_dict`` returns nothing for
    ``app_label``. ``extra_context`` entries override the default context.
    """
    app_dict = self._build_app_dict(request, app_label)
    if not app_dict:
        raise Http404('The requested admin page does not exist.')

    # Sort the models alphabetically within each app.
    app_dict['models'].sort(key=lambda entry: entry['name'])
    verbose_app_name = apps.get_app_config(app_label).verbose_name

    # Build the context step by step; later assignments win over earlier ones.
    context = {}
    context.update(self.each_context(request))
    context['title'] = _('%(app)s administration') % {'app': verbose_app_name}
    context['app_list'] = [app_dict]
    context['app_label'] = app_label
    context.update(extra_context or {})

    request.current_app = self.name

    templates = self.app_index_template or [
        'admin/%s/app_index.html' % app_label,
        'admin/app_index.html'
    ]
    return TemplateResponse(request, templates, context)
# This global object represents the default admin site, for the common case.
# You can instantiate AdminSite in your own code to create a custom admin site.
示例20
def temporary_migration_module(self, app_label="migrations", module=None):
    """
    Yield the path of a throw-away migrations package for ``app_label``.

    Adapted from Django's
    django.tests.migrations.test_base.MigrationTestBase.temporary_migration_module.

    Existing migrations of ``module`` (default: the app's own
    ``migrations`` package) are copied into a temporary directory, and
    ``MIGRATION_MODULES`` is overridden to point at that copy while the
    caller's block runs. Wrap all invocations of makemigrations and
    squashmigrations with this context manager so no migration files are
    written into the source tree.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        package_dir = tempfile.mkdtemp(dir=temp_dir)
        # An empty __init__.py makes the scratch directory importable.
        with open(os.path.join(package_dir, "__init__.py"), "w"):
            pass
        target_migrations_dir = os.path.join(package_dir, "migrations")

        if module is None:
            module = apps.get_app_config(app_label).name + ".migrations"

        try:
            source_migrations_dir = module_dir(import_module(module))
        except (ImportError, ValueError):
            # No existing migrations to seed the temporary package with.
            pass
        else:
            shutil.copytree(source_migrations_dir, target_migrations_dir)

        temporary_module = os.path.basename(package_dir) + ".migrations"
        with extend_sys_path(temp_dir):
            with self.settings(MIGRATION_MODULES={app_label: temporary_module}):
                yield target_migrations_dir
示例21
def reset_db(self, cursor):
    """Flush the database, fake-unapply the Arches migrations and drop the
    tables of every model of the app labeled "models" via ``cursor``.
    """
    # flush is needed here to remove the admin user from the auth tables
    management.call_command("flush", "--noinput")

    # unapply all of the Arches migrations (the Arches "app" is labeled "models")
    management.call_command("migrate", fake=True, app_label="models", migration_name="zero")

    # collect the table names of all Arches models, then drop those tables
    drop_statements = [
        "DROP TABLE IF EXISTS {} CASCADE".format(model._meta.db_table)
        for model in apps.get_app_config("models").get_models()
    ]
    for statement in drop_statements:
        cursor.execute(statement)
示例22
def process_request(self, request):
    """Attach an MDK session to ``request`` and start its interaction.

    The session is joined from the value of the ``HTTP_X_MDK_CONTEXT``
    request header (``None`` when the header is absent).
    """
    mdk = apps.get_app_config("datawire_mdk").mdk
    incoming_context = request.META.get("HTTP_X_MDK_CONTEXT")
    request.mdk_session = mdk.join(incoming_context)
    request.mdk_session.start_interaction()
示例23
def ask_initial(self, app_label):
    """Should we create an initial migration for the app?

    Returns True when the app was named on the command line. Otherwise the
    app's migrations module is inspected: the answer is True only when it
    contains no Python files apart from ``__init__.py``. When the app or
    its migrations module cannot be found or inspected, the configured
    ``ask_initial`` default (False if unset) is returned.
    """
    # If it was specified on the command line, definitely true
    if app_label in self.specified_apps:
        return True
    # Otherwise, we look to see if it has a migrations module
    # without any Python files in it, apart from __init__.py.
    # Apps from the new app template will have these; the python
    # file check will ensure we skip South ones.
    try:
        app_config = apps.get_app_config(app_label)
    except LookupError:  # It's a fake app.
        return self.defaults.get("ask_initial", False)
    migrations_import_path = "%s.%s" % (app_config.name, MIGRATIONS_MODULE_NAME)
    try:
        migrations_module = importlib.import_module(migrations_import_path)
    except ImportError:
        return self.defaults.get("ask_initial", False)

    # Namespace packages may expose ``__file__`` as None, so test the value
    # rather than the mere presence of the attribute.
    if getattr(migrations_module, "__file__", None):
        filenames = os.listdir(os.path.dirname(migrations_module.__file__))
    elif hasattr(migrations_module, "__path__"):
        if len(migrations_module.__path__) > 1:
            return False
        filenames = os.listdir(list(migrations_module.__path__)[0])
    else:
        # Nothing on disk to inspect; fall back to the configured default
        # instead of failing on an unbound ``filenames``.
        return self.defaults.get("ask_initial", False)
    return not any(x.endswith(".py") for x in filenames if x != "__init__.py")
示例24
def migrations_module(cls, app_label):
    """Return the dotted path of the migrations package for ``app_label``.

    An entry in ``settings.MIGRATION_MODULES`` wins; otherwise the path is
    the app's package name followed by ``MIGRATIONS_MODULE_NAME``.
    """
    if app_label not in settings.MIGRATION_MODULES:
        package_name = apps.get_app_config(app_label).name
        return '%s.%s' % (package_name, MIGRATIONS_MODULE_NAME)
    return settings.MIGRATION_MODULES[app_label]
示例25
def path(self):
    """Return the filesystem path the migration file should be written to.

    The directory is that of the importable migrations package. When the
    package cannot be imported, or has no file location, the directory is
    derived from the app's own path if the package is a direct submodule
    of the app. Otherwise the package directories and their
    ``__init__.py`` files are created under ``sys.path[0]``.
    """
    migrations_package_name = MigrationLoader.migrations_module(self.migration.app_label)

    # See if we can import the migrations module directly
    try:
        migrations_module = import_module(migrations_package_name)

        # A migrations directory without __init__.py imports as a
        # namespace package, whose ``__file__`` is missing or None
        # depending on the Python version. Treat both the same way.
        module_file = getattr(migrations_module, '__file__', None)
        if module_file is None:
            raise ImportError

        basedir = os.path.dirname(upath(module_file))
    except ImportError:
        app_config = apps.get_app_config(self.migration.app_label)
        migrations_package_basename = migrations_package_name.split(".")[-1]

        # Alright, see if it's a direct submodule of the app
        if '%s.%s' % (app_config.name, migrations_package_basename) == migrations_package_name:
            basedir = os.path.join(app_config.path, migrations_package_basename)
        else:
            # In case of using MIGRATION_MODULES setting and the custom
            # package doesn't exist, create one.
            package_dirs = migrations_package_name.split(".")
            create_path = os.path.join(upath(sys.path[0]), *package_dirs)
            if not os.path.isdir(create_path):
                os.makedirs(create_path)
            # Every level of the package needs an __init__.py to be importable.
            for i in range(1, len(package_dirs) + 1):
                init_dir = os.path.join(upath(sys.path[0]), *package_dirs[:i])
                init_path = os.path.join(init_dir, "__init__.py")
                if not os.path.isfile(init_path):
                    open(init_path, "w").close()
            return os.path.join(create_path, self.filename)
    return os.path.join(basedir, self.filename)
示例26
def handle(self, *app_labels, **options):
    """Run the checks, optionally limited to given apps and tags.

    With the ``list_tags`` option the available tags are printed, sorted
    one per line, and nothing else happens. An unknown tag in the ``tags``
    option raises ``CommandError``.
    """
    include_deployment_checks = options['deploy']

    if options.get('list_tags'):
        available_tags = sorted(registry.tags_available(include_deployment_checks))
        self.stdout.write('\n'.join(available_tags))
        return

    app_configs = None
    if app_labels:
        app_configs = [apps.get_app_config(label) for label in app_labels]

    tags = options.get('tags', None)
    # Fail on the first tag that does not exist.
    for tag in tags or ():
        if not checks.tag_exists(tag, include_deployment_checks):
            raise CommandError('There is no system check with the "%s" tag.' % tag)

    self.check(
        app_configs=app_configs,
        tags=tags,
        display_num_errors=True,
        include_deployment_checks=include_deployment_checks,
    )
示例27
def custom_sql_for_model(model, style, connection):
    """Collect the custom SQL statements that apply to ``model``.

    The result contains the ``post_create_sql`` output of the model's local
    fields (managed models only), followed by the prepared contents of any
    ``<model_name>.<backend>.sql`` and ``<model_name>.sql`` files found in
    the app's ``sql`` directory and the deprecated ``models/sql`` directory.

    Returns:
        list: SQL statements in the order described above.
    """
    opts = model._meta
    app_path = apps.get_app_config(model._meta.app_label).path
    search_dirs = [os.path.normpath(os.path.join(app_path, 'sql'))]

    # Deprecated location -- remove in Django 1.9
    legacy_dir = os.path.normpath(os.path.join(app_path, 'models/sql'))
    if os.path.exists(legacy_dir):
        warnings.warn("Custom SQL location '<app_label>/models/sql' is "
                      "deprecated, use '<app_label>/sql' instead.",
                      RemovedInDjango19Warning)
        search_dirs.append(legacy_dir)

    output = []
    # Post-creation SQL should come before any initial SQL data is loaded.
    # However, this should not be done for models that are unmanaged or
    # for fields that are part of a parent model (via model inheritance).
    if opts.managed:
        for field in opts.local_fields:
            if hasattr(field, 'post_create_sql'):
                output.extend(field.post_create_sql(style, model._meta.db_table))

    # Find custom SQL, if it's available.
    backend_name = connection.settings_dict['ENGINE'].split('.')[-1]
    candidate_files = []
    for directory in search_dirs:
        # The backend-specific file is listed before the generic one.
        candidate_files.extend([
            os.path.join(directory, "%s.%s.sql" % (opts.model_name, backend_name)),
            os.path.join(directory, "%s.sql" % opts.model_name),
        ])

    for sql_file in candidate_files:
        if not os.path.exists(sql_file):
            continue
        with io.open(sql_file, encoding=settings.FILE_CHARSET) as fp:
            output.extend(connection.ops.prepare_sql_script(fp.read(), _allow_fallback=True))
    return output
示例28
def __init__(self, name, model, verbose_name=None, section=None):
    """Store the name, section and verbose name.

    ``section`` defaults to the verbose name of the app that ``model``
    belongs to; ``verbose_name`` defaults to ``name``.
    """
    self.name = name
    self.section = (
        apps.get_app_config(model._meta.app_label).verbose_name
        if section is None
        else section
    )
    self.verbose_name = name if verbose_name is None else verbose_name
示例29
def handle(self, *args, **options):
    """Ensure every model has its content type and full permission set.

    Each permission created here is passed to ``delete_parent_perms`` and
    reported on stdout.
    """
    # The post-migration callback must be run manually so the view
    # permission is appended on the proxy model. The loop below then
    # creates the appropriate content type and moves the permissions
    # under it. Without the callback only the basic permissions
    # (add, change, delete) would be created.
    view_permission_config = apps.get_app_config('admin_view_permission')
    update_permissions(
        view_permission_config,
        view_permission_config,
        verbosity=1,
        interactive=True,
        using='default',
    )

    for model in apps.get_models():
        opts = model._meta
        ctype, _ctype_created = ContentType.objects.get_or_create(
            app_label=opts.app_label,
            model=opts.object_name.lower(),
        )

        for codename, name in get_all_permissions(opts, ctype):
            perm, perm_created = Permission.objects.get_or_create(
                codename=codename,
                content_type=ctype,
                defaults={'name': name},
            )
            if not perm_created:
                continue
            self.delete_parent_perms(perm)
            self.stdout.write('Adding permission {}\n'.format(perm))
示例30
def handle(self, *args, **options):
    """Invalidate cache keys for the requested apps/models, or all keys.

    Each label is first tried as an app label (all models of the app);
    if that lookup fails it is read as ``app_label.model_name``.
    """
    cache_alias = options['cache_alias']
    db_alias = options['db_alias']
    verbosity = int(options['verbosity'])
    labels = options['app_label[.model_name]']

    models = []
    for label in labels:
        try:
            models.extend(apps.get_app_config(label).get_models())
        except LookupError:
            # Everything before the last dot is the app label.
            app_label, _sep, model_name = label.rpartition('.')
            models.append(apps.get_model(app_label, model_name))

    cache_str = "on cache '%s'" % cache_alias if cache_alias is not None else ''
    db_str = "for database '%s'" % db_alias if db_alias is not None else ''
    keys_str = 'keys for %s models' % len(models) if labels else 'all keys'

    if verbosity > 0:
        # Empty fragments are left out of the progress message.
        fragments = [
            part
            for part in ('Invalidating', keys_str, cache_str, db_str)
            if part
        ]
        self.stdout.write(' '.join(fragments) + '...')

    invalidate(*models, cache_alias=cache_alias, db_alias=db_alias)

    if verbosity > 0:
        self.stdout.write('Cache keys successfully invalidated.')