__all__ = [ 'get_cache' , 'cache' , 'DEFAULT_CACHE_ALIAS' , 'InvalidCacheBackendError' , 'CacheKeyWarning' , 'BaseCache' , ]
DEFAULT_CACHE_ALIAS = 'default'
warnings . warn ( "'get_cache' is deprecated in favor of 'caches'." , RemovedInDjango19Warning , stacklevel = 2 )
cache = _create_cache ( backend , ** kwargs )
signals . request_finished . connect ( cache . close )
except KeyError :
import_string ( backend )
location = kwargs . pop ( 'LOCATION' , '' )
else :
params = conf . copy ( )
params . update ( kwargs )
backend = params . pop ( 'BACKEND' )
location = params . pop ( 'LOCATION' , '' )
backend_cls = import_string ( backend )
return backend_cls ( location , params )
self . _caches = local ( )
return self . _caches . caches [ alias ]
except AttributeError :
self . _caches . caches = { }
except KeyError :
cache = _create_cache ( alias )
self . _caches . caches [ alias ] = cache
return getattr ( self . _caches , 'caches' , { } ) . values ( )
caches = CacheHandler ( )
return getattr ( caches [ DEFAULT_CACHE_ALIAS ] , name )
return delattr ( caches [ DEFAULT_CACHE_ALIAS ] , name )
cache = DefaultCacheProxy ( )
for cache in caches . all ( ) :
cache . close ( )
signals . request_finished . connect ( close_caches )
DEFAULT_TIMEOUT = object ( )
MEMCACHE_MAX_KEY_LENGTH = 250
if key_func is not None :
if callable ( key_func ) :
else :
return import_string ( key_func )
timeout = params . get ( 'timeout' , params . get ( 'TIMEOUT' , 300 ) )
if timeout is not None :
timeout = int ( timeout )
timeout = 300
options = params . get ( 'OPTIONS' , { } )
max_entries = params . get ( 'max_entries' , options . get ( 'MAX_ENTRIES' , 300 ) )
self . _max_entries = int ( max_entries )
self . _max_entries = 300
cull_frequency = params . get ( 'cull_frequency' , options . get ( 'CULL_FREQUENCY' , 3 ) )
self . _cull_frequency = int ( cull_frequency )
self . key_prefix = params . get ( 'KEY_PREFIX' , '' )
self . key_func = get_key_func ( params . get ( 'KEY_FUNCTION' , None ) )
if timeout == DEFAULT_TIMEOUT :
timeout = - 1
return None if timeout is None else time . time ( ) + timeout
d = { }
for k in keys :
if val is not None :
d [ k ] = val
if ord ( char ) < 33 or ord ( char ) == 127 :
self . app_label = 'django_cache'
self . model_name = 'cacheentry'
self . verbose_name = 'cache entry'
self . verbose_name_plural = 'cache entries'
self . object_name = 'CacheEntry'
_meta = Options ( table )
self . cache_model_class = CacheEntry
db = router . db_for_read ( self . cache_model_class )
table = connections [ db ] . ops . quote_name ( self . _table )
with connections [ db ] . cursor ( ) as cursor :
cursor . execute ( "SELECT cache_key, value, expires FROM %s " "WHERE cache_key = %%s" % table , [ key ] )
row = cursor . fetchone ( )
if row is None :
now = timezone . now ( )
expires = row [ 2 ]
expires = typecast_timestamp ( str ( expires ) )
if expires < now :
db = router . db_for_write ( self . cache_model_class )
with connections [ db ] . cursor ( ) as cursor :
value = connections [ db ] . ops . process_clob ( row [ 1 ] )
return pickle . loads ( base64 . b64decode ( force_bytes ( value ) ) )
timeout = self . get_backend_timeout ( timeout )
db = router . db_for_write ( self . cache_model_class )
table = connections [ db ] . ops . quote_name ( self . _table )
with connections [ db ] . cursor ( ) as cursor :
cursor . execute ( "SELECT COUNT(*) FROM %s" % table )
num = cursor . fetchone ( ) [ 0 ]
now = timezone . now ( )
now = now . replace ( microsecond = 0 )
if timeout is None :
exp = datetime . max
exp = datetime . utcfromtimestamp ( timeout )
else :
exp = datetime . fromtimestamp ( timeout )
exp = exp . replace ( microsecond = 0 )
if num > self . _max_entries :
self . _cull ( db , cursor , now )
pickled = pickle . dumps ( value , pickle . HIGHEST_PROTOCOL )
b64encoded = base64 . b64encode ( pickled )
b64encoded = b64encoded . decode ( 'latin1' )
with transaction . atomic ( using = db ) :
cursor . execute ( "SELECT cache_key, expires FROM %s " "WHERE cache_key = %%s" % table , [ key ] )
result = cursor . fetchone ( )
if result :
current_expires = result [ 1 ]
current_expires = typecast_timestamp ( str ( current_expires ) )
exp = connections [ db ] . ops . value_to_db_datetime ( exp )
if result and ( mode == 'set' or ( mode == 'add' and current_expires < now ) ) :
cursor . execute ( "UPDATE %s SET value = %%s, expires = %%s " "WHERE cache_key = %%s" % table , [ b64encoded , exp , key ] )
else :
cursor . execute ( "INSERT INTO %s (cache_key, value, expires) " "VALUES (%%s, %%s, %%s)" % table , [ key , b64encoded , exp ] )
except DatabaseError :
else :
db = router . db_for_write ( self . cache_model_class )
table = connections [ db ] . ops . quote_name ( self . _table )
with connections [ db ] . cursor ( ) as cursor :
db = router . db_for_read ( self . cache_model_class )
table = connections [ db ] . ops . quote_name ( self . _table )
now = datetime . utcnow ( )
else :
now = datetime . now ( )
now = now . replace ( microsecond = 0 )
with connections [ db ] . cursor ( ) as cursor :
cursor . execute ( "SELECT cache_key FROM %s " "WHERE cache_key = %%s and expires > %%s" % table , [ key , connections [ db ] . ops . value_to_db_datetime ( now ) ] )
else :
now = now . replace ( tzinfo = None )
table = connections [ db ] . ops . quote_name ( self . _table )
cursor . execute ( "DELETE FROM %s WHERE expires < %%s" % table , [ connections [ db ] . ops . value_to_db_datetime ( now ) ] )
cursor . execute ( "SELECT COUNT(*) FROM %s" % table )
num = cursor . fetchone ( ) [ 0 ]
if num > self . _max_entries :
cull_num = num // self . _cull_frequency
cursor . execute ( connections [ db ] . ops . cache_key_culling_sql ( ) % table , [ cull_num ] )
cursor . execute ( "DELETE FROM %s " "WHERE cache_key < %%s" % table , [ cursor . fetchone ( ) [ 0 ] ] )
db = router . db_for_write ( self . cache_model_class )
table = connections [ db ] . ops . quote_name ( self . _table )
with connections [ db ] . cursor ( ) as cursor :
cursor . execute ( 'DELETE FROM %s' % table )
cache_suffix = '.djcache'
with io . open ( fname , 'rb' ) as f :
return pickle . loads ( zlib . decompress ( f . read ( ) ) )
except IOError as e :
if e . errno == errno . ENOENT :
fd , tmp_path = tempfile . mkstemp ( dir = self . _dir )
with io . open ( fd , 'wb' ) as f :
expiry = self . get_backend_timeout ( timeout )
f . write ( pickle . dumps ( expiry , - 1 ) )
f . write ( zlib . compress ( pickle . dumps ( value ) , - 1 ) )
file_move_safe ( tmp_path , fname , allow_overwrite = True )
finally :
if not renamed :
except OSError as e :
if e . errno != errno . ENOENT :
with io . open ( fname , 'rb' ) as f :
filelist = self . _list_cache_files ( )
if num_entries < self . _max_entries :
filelist = random . sample ( filelist , int ( num_entries / self . _cull_frequency ) )
for fname in filelist :
except OSError as e :
if e . errno != errno . EEXIST :
for fname in self . _list_cache_files ( ) :
exp = pickle . load ( f )
if exp is not None and exp < time . time ( ) :
f . close ( )
_caches = { }
_expire_info = { }
_locks = { }
self . _cache = _caches . setdefault ( name , { } )
self . _expire_info = _expire_info . setdefault ( name , { } )
self . _lock = _locks . setdefault ( name , RWLock ( ) )
pickled = pickle . dumps ( value , pickle . HIGHEST_PROTOCOL )
pickled = None
with self . _lock . reader ( ) :
if pickled is not None :
return pickle . loads ( pickled )
except pickle . PickleError :
except KeyError :
pickled = pickle . dumps ( value , pickle . HIGHEST_PROTOCOL )
pickled = pickle . dumps ( new_value , pickle . HIGHEST_PROTOCOL )
with self . _lock . reader ( ) :
except KeyError :
if exp is None or exp > time . time ( ) :
else :
for k in doomed :
except KeyError :
except KeyError :
renamed_methods = ( ( '_get_memcache_timeout' , 'get_backend_timeout' , RemovedInDjango19Warning ) , )
self . _servers = server . split ( ';' )
else :
self . _servers = server
self . LibraryValueNotFoundException = value_not_found_exception
self . _lib = library
self . _options = params . get ( 'OPTIONS' , None )
@ property
if timeout == DEFAULT_TIMEOUT :
if timeout is None :
timeout = - 1
if timeout > 2592000 :
timeout += int ( time . time ( ) )
if val is None :
new_keys = [ self . make_key ( x , version = version ) for x in keys ]
ret = self . _cache . get_multi ( new_keys )
if ret :
_ = { }
m = dict ( zip ( new_keys , keys ) )
for k , v in ret . items ( ) :
_ [ m [ k ] ] = v
ret = _
self . _cache . disconnect_all ( )
if delta < 0 :
val = None
if val is None :
if delta < 0 :
val = None
if val is None :
safe_data = { }
self . _cache . delete_multi ( map ( l , keys ) )
self . _cache . flush_all ( )
super ( MemcachedCache , self ) . __init__ ( server , params , library = memcache , value_not_found_exception = ValueError )
@ property
super ( PyLibMCCache , self ) . __init__ ( server , params , library = pylibmc , value_not_found_exception = pylibmc . NotFound )
@ cached_property
client . behaviors = self . _options
TEMPLATE_FRAGMENT_KEY_TEMPLATE = 'template.cache.%s.%s'
if vary_on is None :
vary_on = ( )
key = ':' . join ( urlquote ( var ) for var in vary_on )
return TEMPLATE_FRAGMENT_KEY_TEMPLATE % ( fragment_name , args . hexdigest ( ) )
from . messages import ( CheckMessage , Debug , Info , Warning , Error , Critical , DEBUG , INFO , WARNING , ERROR , CRITICAL )
from . registry import register , run_checks , tag_exists , Tags
__all__ = [ 'CheckMessage' , 'Debug' , 'Info' , 'Warning' , 'Error' , 'Critical' , 'DEBUG' , 'INFO' , 'WARNING' , 'ERROR' , 'CRITICAL' , 'register' , 'run_checks' , 'tag_exists' , 'Tags' , ]
@ register ( Tags . compatibility )
errors = [ ]
errors . extend ( _check_test_runner ( ** kwargs ) )
errors . extend ( _check_boolean_field_default_value ( ** kwargs ) )
weight = 0
weight += 2
except AttributeError :
except AttributeError :
weight += 2
weight += 2
weight += 2
weight += 1
if 'django.middleware.clickjacking.XFrameOptionsMiddleware' not in set ( settings . MIDDLEWARE_CLASSES ) :
weight += 1
if weight >= 6 :
return [ Warning ( "Some project unittests may not execute as expected." , hint = ( "Django 1.6 introduced a new default test runner. It looks like " "this project was generated using Django 1.5 or earlier. You should " "ensure your tests are all running & behaving as expected. See " "https://docs.djangoproject.com/en/dev/releases/1.6/#new-test-runner " "for more information." ) , obj = None , id = '1_6.W001' , ) ]
else :
@ register ( Tags . compatibility )
errors = [ ]
errors . extend ( _check_middleware_classes ( ** kwargs ) )
return [ Warning ( "MIDDLEWARE_CLASSES is not set." , hint = ( "Django 1.7 changed the global defaults for the MIDDLEWARE_CLASSES. " "django.contrib.sessions.middleware.SessionMiddleware, " "django.contrib.auth.middleware.AuthenticationMiddleware, and " "django.contrib.messages.middleware.MessageMiddleware were removed from the defaults. " "If your project needs these middleware then you should configure this setting." ) , obj = None , id = '1_7.W001' , ) ]
else :
DEBUG = 10
INFO = 20
WARNING = 30
ERROR = 40
CRITICAL = 50
@ python_2_unicode_compatible
self . level = level
self . msg = msg
self . hint = hint
self . id = id
return all ( getattr ( self , attr ) == getattr ( other , attr ) for attr in [ 'level' , 'msg' , 'hint' , 'obj' , 'id' ] )
obj = "?"
app = model . _meta . app_label
obj = '%s.%s' % ( app , model . _meta . object_name )
else :
obj = force_str ( self . obj )
hint = "\n\tHINT: %s" % self . hint if self . hint else ''
return "%s: %s%s%s" % ( obj , id , self . msg , hint )
@ register ( Tags . models )
return list ( chain ( * errors ) )
@ register ( Tags . models , Tags . signals )
errors = [ ]
for name in dir ( models . signals ) :
obj = getattr ( models . signals , name )
for reference , receivers in obj . unresolved_references . items ( ) :
for receiver , _ , _ in receivers :
description = "The '%s' function" % receiver . __name__
else :
errors . append ( Error ( "%s was connected to the '%s' signal " "with a lazy reference to the '%s' sender, " "which has not been installed." % ( description , name , '.' . join ( reference ) ) , obj = receiver . __module__ , hint = None , id = 'signals.E001' ) )
admin = 'admin'
compatibility = 'compatibility'
models = 'models'
signals = 'signals'
check . tags = tags
if check not in self . registered_checks :
errors = [ ]
if tags is not None :
checks = [ check for check in self . registered_checks if hasattr ( check , 'tags' ) and set ( check . tags ) & set ( tags ) ]
else :
checks = self . registered_checks
for check in checks :
new_errors = check ( app_configs = app_configs )
assert is_iterable ( new_errors ) , ( "The function %r did not return a list. All functions registered " "with the checks registry must return a list." % check )
errors . extend ( new_errors )
return set ( chain ( * [ check . tags for check in self . registered_checks if hasattr ( check , 'tags' ) ] ) )
registry = CheckRegistry ( )
run_checks = registry . run_checks
tag_exists = registry . tag_exists
if token is None :
else :
_get_val = lazy ( _get_val , six . text_type )
return { 'csrf_token' : _get_val ( ) }
context_extras = { }
context_extras = { }
context_extras [ 'LANGUAGE_CODE' ] = translation . get_language ( )
context_extras [ 'LANGUAGE_BIDI' ] = translation . get_language_bidi ( )
return { 'TIME_ZONE' : timezone . get_current_timezone_name ( ) }
NON_FIELD_ERRORS = '__all__'
else :
messages = ValidationError ( messages )
self . error_list . extend ( message . error_list )
else :
self . code = code
self . params = params
@ property
@ property
return reduce ( operator . add , dict ( self ) . values ( ) )
else :
error_dict . setdefault ( NON_FIELD_ERRORS , [ ] ) . extend ( self . error_list )
else :
if error . params :
__all__ = [ 'File' ]
from io import BytesIO , StringIO , UnsupportedOperation
@ python_2_unicode_compatible
DEFAULT_CHUNK_SIZE = 64 * 2 ** 10
if name is None :
name = getattr ( file , 'name' , None )
if hasattr ( file , 'mode' ) :
self . mode = file . mode
except ( OSError , TypeError ) :
pos = self . file . tell ( )
size = property ( _get_size , _set_size )
closed = property ( _get_closed )
if not chunk_size :
chunk_size = self . DEFAULT_CHUNK_SIZE
except ( AttributeError , UnsupportedOperation ) :
if not chunk_size :
chunk_size = self . DEFAULT_CHUNK_SIZE
buffer_ = None
chunk_buffer = BytesIO ( chunk )
for line in chunk_buffer :
if buffer_ :
line = buffer_ + line
buffer_ = None
if line [ - 1 : ] in ( b'\n' , b'\r' ) :
else :
buffer_ = line
if buffer_ is not None :
else :
@ python_2_unicode_compatible
else :
stream_class = BytesIO
content = force_bytes ( content )
super ( ContentFile , self ) . __init__ ( stream_class ( content ) , name = name )
width = property ( _get_width )
height = property ( _get_height )
from PIL import ImageFile as PillowImageFile
if hasattr ( file_or_path , 'read' ) :
file = file_or_path
file_pos = file . tell ( )
file . seek ( 0 )
else :
file = open ( file_or_path , 'rb' )
chunk_size = 1024
while 1 :
except zlib . error as e :
else :
if p . image :
chunk_size *= 2
finally :
if close :
file . close ( )
else :
file . seek ( file_pos )
__all__ = ( 'LOCK_EX' , 'LOCK_SH' , 'LOCK_NB' , 'lock' , 'unlock' )
return f . fileno ( ) if hasattr ( f , 'fileno' ) else f
from ctypes import ( sizeof , c_ulong , c_void_p , c_int64 , Structure , Union , POINTER , windll , byref )
from ctypes . wintypes import BOOL , DWORD , HANDLE
LOCK_SH = 0
LOCK_NB = 0x1
LOCK_EX = 0x2
if sizeof ( c_ulong ) != sizeof ( c_void_p ) :
ULONG_PTR = c_int64
else :
ULONG_PTR = c_ulong
PVOID = c_void_p
_fields_ = [ ( 'Offset' , DWORD ) , ( 'OffsetHigh' , DWORD ) ]
_anonymous_ = [ '_offset' ]
_fields_ = [ ( '_offset' , _OFFSET ) , ( 'Pointer' , PVOID ) ]
_anonymous_ = [ '_offset_union' ]
_fields_ = [ ( 'Internal' , ULONG_PTR ) , ( 'InternalHigh' , ULONG_PTR ) , ( '_offset_union' , _OFFSET_UNION ) , ( 'hEvent' , HANDLE ) ]
LPOVERLAPPED = POINTER ( OVERLAPPED )
LockFileEx = windll . kernel32 . LockFileEx
LockFileEx . restype = BOOL
LockFileEx . argtypes = [ HANDLE , DWORD , DWORD , DWORD , DWORD , LPOVERLAPPED ]
UnlockFileEx = windll . kernel32 . UnlockFileEx
UnlockFileEx . restype = BOOL
UnlockFileEx . argtypes = [ HANDLE , DWORD , DWORD , DWORD , LPOVERLAPPED ]
hfile = msvcrt . get_osfhandle ( _fd ( f ) )
overlapped = OVERLAPPED ( )
ret = LockFileEx ( hfile , flags , 0 , 0 , 0xFFFF0000 , byref ( overlapped ) )
hfile = msvcrt . get_osfhandle ( _fd ( f ) )
overlapped = OVERLAPPED ( )
ret = UnlockFileEx ( hfile , 0 , 0 , 0xFFFF0000 , byref ( overlapped ) )
else :
LOCK_SH = fcntl . LOCK_SH
LOCK_NB = fcntl . LOCK_NB
LOCK_EX = fcntl . LOCK_EX
LOCK_EX = LOCK_SH = LOCK_NB = 0
else :
ret = fcntl . flock ( _fd ( f ) , flags )
ret = fcntl . flock ( _fd ( f ) , fcntl . LOCK_UN )
from shutil import copystat
mode = stat . S_IMODE ( st . st_mode )
os . utime ( dst , ( st . st_atime , st . st_mtime ) )
__all__ = [ 'file_move_safe' ]
except OSError :
if _samefile ( old_file_name , new_file_name ) :
os . rename ( old_file_name , new_file_name )
except OSError :
with open ( old_file_name , 'rb' ) as old_file :
locks . lock ( fd , locks . LOCK_EX )
current_chunk = None
while current_chunk != b'' :
current_chunk = old_file . read ( chunk_size )
finally :
locks . unlock ( fd )
copystat ( old_file_name , new_file_name )
except OSError as e :
if getattr ( e , 'winerror' , 0 ) != 32 and getattr ( e , 'errno' , 0 ) != 13 :
__all__ = ( 'Storage' , 'FileSystemStorage' , 'DefaultStorage' , 'default_storage' )
return self . _open ( name , mode )
if name is None :
name = content . name
if not hasattr ( content , 'chunks' ) :
content = File ( content )
count = itertools . count ( 1 )
@ deconstructible
if location is None :
self . base_location = location
if base_url is None :
base_url += '/'
self . base_url = base_url
self . file_permissions_mode = ( file_permissions_mode if file_permissions_mode is not None else settings . FILE_UPLOAD_PERMISSIONS )
self . directory_permissions_mode = ( directory_permissions_mode if directory_permissions_mode is not None else settings . FILE_UPLOAD_DIRECTORY_PERMISSIONS )
finally :
else :
except OSError as e :
if e . errno != errno . EEXIST :
if hasattr ( content , 'temporary_file_path' ) :
file_move_safe ( content . temporary_file_path ( ) , full_path )
else :
fd = os . open ( full_path , flags , 0o666 )
_file = None
locks . lock ( fd , locks . LOCK_EX )
for chunk in content . chunks ( ) :
if _file is None :
_file = os . fdopen ( fd , mode )
_file . write ( chunk )
finally :
locks . unlock ( fd )
if _file is not None :
_file . close ( )
else :
except OSError as e :
if e . errno == errno . EEXIST :
else :
else :
assert name , "The name argument is not allowed to be empty."
except OSError as e :
if e . errno != errno . ENOENT :
directories , files = [ ] , [ ]
else :
return urljoin ( self . base_url , filepath_to_uri ( name ) )
self . _wrapped = get_storage_class ( ) ( )
default_storage = DefaultStorage ( )
__all__ = ( 'NamedTemporaryFile' , 'gettempdir' , )
fd , name = tempfile . mkstemp ( suffix = suffix , prefix = prefix , dir = dir )
self . file = os . fdopen ( fd , mode , bufsize )
unlink = os . unlink
except ( OSError , IOError ) :
except ( OSError ) :
@ property
self . file . __exit__ ( exc , value , tb )
NamedTemporaryFile = TemporaryFile
else :
NamedTemporaryFile = tempfile . NamedTemporaryFile
gettempdir = tempfile . gettempdir
__all__ = ( 'UploadedFile' , 'TemporaryUploadedFile' , 'InMemoryUploadedFile' , 'SimpleUploadedFile' )
DEFAULT_CHUNK_SIZE = 64 * 2 ** 10
self . size = size
self . content_type = content_type
self . charset = charset
self . content_type_extra = content_type_extra
if name is not None :
ext = ext [ : 255 ]
name = property ( _get_name , _set_name )
file = tempfile . NamedTemporaryFile ( suffix = '.upload' , dir = settings . FILE_UPLOAD_TEMP_DIR )
else :
file = tempfile . NamedTemporaryFile ( suffix = '.upload' )
super ( TemporaryUploadedFile , self ) . __init__ ( file , name , content_type , size , charset , content_type_extra )
except OSError as e :
if e . errno != errno . ENOENT :
super ( InMemoryUploadedFile , self ) . __init__ ( file , name , content_type , size , charset , content_type_extra )
self . field_name = field_name
content = content or b''
@ classmethod
return cls ( file_dict [ 'filename' ] , file_dict [ 'content' ] , file_dict . get ( 'content-type' , 'text/plain' ) )
__all__ = [ 'UploadFileException' , 'StopUpload' , 'SkipFile' , 'FileUploadHandler' , 'TemporaryFileUploadHandler' , 'MemoryFileUploadHandler' , 'load_handler' , 'StopFutureHandlers' ]
@ python_2_unicode_compatible
self . connection_reset = connection_reset
return 'StopUpload: Halt current upload.'
else :
chunk_size = 64 * 2 ** 10
self . field_name = field_name
self . file_name = file_name
self . content_type = content_type
self . content_length = content_length
self . charset = charset
self . content_type_extra = content_type_extra
else :
else :
response_fixes = [ http . fix_location_header , http . conditional_content_removal , ]
request_middleware = [ ]
mw_class = import_string ( middleware_path )
mw_instance = mw_class ( )
except MiddlewareNotUsed :
if hasattr ( mw_instance , 'process_request' ) :
if hasattr ( mw_instance , 'process_view' ) :
if hasattr ( mw_instance , 'process_template_response' ) :
self . _template_response_middleware . insert ( 0 , mw_instance . process_template_response )
if hasattr ( mw_instance , 'process_response' ) :
self . _response_middleware . insert ( 0 , mw_instance . process_response )
if hasattr ( mw_instance , 'process_exception' ) :
self . _exception_middleware . insert ( 0 , mw_instance . process_exception )
self . _request_middleware = request_middleware
non_atomic_requests = getattr ( view , '_non_atomic_requests' , set ( ) )
for db in connections . all ( ) :
if ( db . settings_dict [ 'ATOMIC_REQUESTS' ] and db . alias not in non_atomic_requests ) :
view = transaction . atomic ( using = db . alias ) ( view )
callback , param_dict = resolver . resolve_error_handler ( status_code )
except :
urlresolvers . set_urlconf ( urlconf )
resolver = urlresolvers . RegexURLResolver ( r'^/' , urlconf )
for middleware_method in self . _request_middleware :
urlresolvers . set_urlconf ( urlconf )
resolver = urlresolvers . RegexURLResolver ( r'^/' , urlconf )
callback , callback_args , callback_kwargs = resolver_match
request . resolver_match = resolver_match
for middleware_method in self . _view_middleware :
wrapped_callback = self . make_view_atomic ( callback )
except Exception as e :
for middleware_method in self . _exception_middleware :
view_name = callback . __name__
else :
view_name = callback . __class__ . __name__ + '.__call__'
for middleware_method in self . _template_response_middleware :
except http . Http404 as e :
else :
except PermissionDenied :
except SuspiciousOperation as e :
security_logger = logging . getLogger ( 'django.security.%s' % e . __class__ . __name__ )
except SystemExit :
except :
for middleware_method in self . _response_middleware :
except :
if resolver . urlconf_module is None :
six . reraise ( * exc_info )
callback , param_dict = resolver . resolve_error_handler ( 500 )
ISO_8859_1 , UTF_8 = str ( 'iso-8859-1' ) , str ( 'utf-8' )
self . remaining = limit
self . buf_size = buf_size
if size is None or size > self . remaining :
if size == 0 :
result = self . stream . read ( size )
if size is None :
else :
if size :
else :
if not chunk :
sio = BytesIO ( self . buffer )
if size :
line = sio . readline ( size )
else :
line = sio . readline ( )
self . buffer = sio . read ( )
script_name = get_script_name ( environ )
path_info = get_path_info ( environ )
if not path_info :
path_info = '/'
self . environ = environ
self . path_info = path_info
self . path = '%s/%s' % ( script_name . rstrip ( '/' ) , path_info . replace ( '/' , '' , 1 ) )
self . META [ 'PATH_INFO' ] = path_info
self . META [ 'SCRIPT_NAME' ] = script_name
self . method = environ [ 'REQUEST_METHOD' ] . upper ( )
_ , content_params = cgi . parse_header ( environ . get ( 'CONTENT_TYPE' , '' ) )
if 'charset' in content_params :
codecs . lookup ( content_params [ 'charset' ] )
except LookupError :
else :
self . encoding = content_params [ 'charset' ]
content_length = int ( environ . get ( 'CONTENT_LENGTH' ) )
content_length = 0
return self . environ . get ( 'wsgi.url_scheme' )
@ cached_property
raw_query_string = get_bytes_from_wsgi ( self . environ , 'QUERY_STRING' , '' )
return http . QueryDict ( raw_query_string , encoding = self . _encoding )
@ cached_property
raw_cookie = get_str_from_wsgi ( self . environ , 'HTTP_COOKIE' , '' )
return http . parse_cookie ( raw_cookie )
POST = property ( _get_post , _set_post )
FILES = property ( _get_files )
initLock = Lock ( )
request_class = WSGIRequest
except :
set_script_prefix ( get_script_name ( environ ) )
signals . request_started . send ( sender = self . __class__ )
except UnicodeDecodeError :
logger . warning ( 'Bad Request (UnicodeDecodeError)' , exc_info = sys . exc_info ( ) , extra = { 'status_code' : 400 , } )
else :
start_response ( force_str ( status ) , response_headers )
path_info = get_bytes_from_wsgi ( environ , 'PATH_INFO' , '/' )
return path_info . decode ( UTF_8 )
script_url = get_bytes_from_wsgi ( environ , 'SCRIPT_URL' , '' )
if not script_url :
script_url = get_bytes_from_wsgi ( environ , 'REDIRECT_URL' , '' )
if script_url :
path_info = get_bytes_from_wsgi ( environ , 'PATH_INFO' , '' )
else :
script_name = get_bytes_from_wsgi ( environ , 'SCRIPT_NAME' , '' )
return script_name . decode ( UTF_8 )
from django . core . mail . message import ( EmailMessage , EmailMultiAlternatives , SafeMIMEText , SafeMIMEMultipart , DEFAULT_ATTACHMENT_MIME_TYPE , make_msgid , BadHeaderError , forbid_multi_line_headers )
__all__ = [ 'CachedDnsName' , 'DNS_NAME' , 'EmailMessage' , 'EmailMultiAlternatives' , 'SafeMIMEText' , 'SafeMIMEMultipart' , 'DEFAULT_ATTACHMENT_MIME_TYPE' , 'make_msgid' , 'BadHeaderError' , 'forbid_multi_line_headers' , 'get_connection' , 'send_mail' , 'send_mass_mail' , 'mail_admins' , 'mail_managers' , ]
return klass ( fail_silently = fail_silently , ** kwds )
if html_message :
mail . attach_alternative ( html_message , 'text/html' )
if html_message :
mail . attach_alternative ( html_message , 'text/html' )
mail . send ( fail_silently = fail_silently )
if html_message :
mail . attach_alternative ( html_message , 'text/html' )
mail . send ( fail_silently = fail_silently )
self . fail_silently = fail_silently
self . stream = kwargs . pop ( 'stream' , sys . stdout )
self . _lock = threading . RLock ( )
msg_data = msg . as_bytes ( )
charset = msg . get_charset ( ) . get_output_charset ( ) if msg . get_charset ( ) else 'utf-8'
msg_data = msg_data . decode ( charset )
if not email_messages :
msg_count = 0
stream_created = self . open ( )
msg_count += 1
if stream_created :
except Exception :
self . file_path = kwargs . pop ( 'file_path' )
else :
except OSError as err :
timestamp = datetime . datetime . now ( ) . strftime ( "%Y%m%d-%H%M%S" )
fname = "%s-%s.log" % ( timestamp , abs ( id ( self ) ) )
finally :
if not hasattr ( mail , 'outbox' ) :
mail . outbox = [ ]
msg_count = 0
msg_count += 1
mail . outbox . extend ( messages )
super ( EmailBackend , self ) . __init__ ( fail_silently = fail_silently )
self . username = settings . EMAIL_HOST_USER if username is None else username
self . password = settings . EMAIL_HOST_PASSWORD if password is None else password
self . use_tls = settings . EMAIL_USE_TLS if use_tls is None else use_tls
self . use_ssl = settings . EMAIL_USE_SSL if use_ssl is None else use_ssl
self . timeout = timeout
self . _lock = threading . RLock ( )
connection_class = smtplib . SMTP_SSL if self . use_ssl else smtplib . SMTP
connection_params = { 'local_hostname' : DNS_NAME . get_fqdn ( ) }
connection_params [ 'timeout' ] = self . timeout
except smtplib . SMTPException :
except ( ssl . SSLError , smtplib . SMTPServerDisconnected ) :
except smtplib . SMTPException :
finally :
if not email_messages :
new_conn_created = self . open ( )
num_sent = 0
if sent :
num_sent += 1
if new_conn_created :
if not email_message . recipients ( ) :
from_email = sanitize_address ( email_message . from_email , email_message . encoding )
recipients = [ sanitize_address ( addr , email_message . encoding ) for addr in email_message . recipients ( ) ]
self . connection . sendmail ( from_email , recipients , message . as_bytes ( ) )
except smtplib . SMTPException :
from email import ( charset as Charset , encoders as Encoders , message_from_string , generator )
from email . mime . text import MIMEText
from email . mime . multipart import MIMEMultipart
from email . mime . base import MIMEBase
from email . mime . message import MIMEMessage
from email . header import Header
from email . utils import formatdate , getaddresses , formataddr , parseaddr
utf8_charset = Charset . Charset ( 'utf-8' )
utf8_charset . body_encoding = None
DEFAULT_ATTACHMENT_MIME_TYPE = 'application/octet-stream'
timeval = time . time ( )
utcdate = time . strftime ( '%Y%m%d%H%M%S' , time . gmtime ( timeval ) )
except AttributeError :
pid = 1
randint = random . randrange ( 100000 )
if idstring is None :
idstring = ''
else :
idstring = '.' + idstring
idhost = DNS_NAME
msgid = '<%s.%s.%s%s@%s>' % ( utcdate , pid , randint , idstring , idhost )
ADDRESS_HEADERS = set ( [ 'from' , 'sender' , 'reply-to' , 'to' , 'cc' , 'bcc' , 'resent-from' , 'resent-sender' , 'resent-to' , 'resent-cc' , 'resent-bcc' , ] )
val = force_text ( val )
if '\n' in val or '\r' in val :
raise BadHeaderError ( "Header values can't contain newlines (got %r for header %r)" % ( val , name ) )
val . encode ( 'ascii' )
except UnicodeEncodeError :
if name . lower ( ) in ADDRESS_HEADERS :
val = ', ' . join ( sanitize_address ( addr , encoding ) for addr in getaddresses ( ( val , ) ) )
else :
val = Header ( val , encoding ) . encode ( )
else :
if name . lower ( ) == 'subject' :
val = Header ( val ) . encode ( )
addr = parseaddr ( force_text ( addr ) )
nm , addr = addr
nm = Header ( nm , encoding ) . encode ( )
except UnicodeEncodeError :
nm = Header ( nm , 'utf-8' ) . encode ( )
addr . encode ( 'ascii' )
except UnicodeEncodeError :
if '@' in addr :
localpart , domain = addr . split ( '@' , 1 )
localpart = str ( Header ( localpart , encoding ) )
domain = domain . encode ( 'idna' ) . decode ( 'ascii' )
addr = '@' . join ( [ localpart , domain ] )
else :
addr = Header ( addr , encoding ) . encode ( )
return formataddr ( ( nm , addr ) )
fp = six . StringIO ( )
g = generator . Generator ( fp , mangle_from_ = False )
g . flatten ( self , unixfrom = unixfrom )
if six . PY2 :
as_bytes = as_string
else :
fp = six . BytesIO ( )
g = generator . BytesGenerator ( fp , mangle_from_ = False )
g . flatten ( self , unixfrom = unixfrom )
name , val = forbid_multi_line_headers ( name , val , 'ascii' )
MIMEMessage . __setitem__ ( self , name , val )
if charset == 'utf-8' :
del self [ 'Content-Transfer-Encoding' ]
if ( 3 , 2 ) < sys . version_info < ( 3 , 3 , 4 ) :
payload = text . encode ( utf8_charset . output_charset )
self . _payload = payload . decode ( 'ascii' , 'surrogateescape' )
self . set_charset ( utf8_charset )
else :
self . set_payload ( text , utf8_charset )
self . replace_header ( 'Content-Type' , 'text/%s; charset="%s"' % ( subtype , charset ) )
else :
MIMEText . __init__ ( self , text , subtype , charset )
name , val = forbid_multi_line_headers ( name , val , self . encoding )
MIMEText . __setitem__ ( self , name , val )
MIMEMultipart . __init__ ( self , _subtype , boundary , _subparts , ** _params )
name , val = forbid_multi_line_headers ( name , val , self . encoding )
MIMEMultipart . __setitem__ ( self , name , val )
content_subtype = 'plain'
mixed_subtype = 'mixed'
encoding = None
if to :
else :
if cc :
self . cc = list ( cc )
else :
if bcc :
self . bcc = list ( bcc )
else :
self . from_email = from_email or settings . DEFAULT_FROM_EMAIL
self . subject = subject
self . body = body
self . attachments = attachments or [ ]
self . extra_headers = headers or { }
msg = self . _create_message ( msg )
msg [ 'Subject' ] = self . subject
msg [ 'Cc' ] = ', ' . join ( self . cc )
if 'date' not in header_names :
msg [ 'Date' ] = formatdate ( )
if name . lower ( ) in ( 'from' , 'to' ) :
assert content is None
assert mimetype is None
else :
assert content is not None
content = f . read ( )
self . attach ( filename , content , mimetype )
body_msg = msg
msg = SafeMIMEMultipart ( _subtype = self . mixed_subtype , encoding = encoding )
msg . attach ( body_msg )
for attachment in self . attachments :
msg . attach ( attachment )
else :
msg . attach ( self . _create_attachment ( * attachment ) )
basetype , subtype = mimetype . split ( '/' , 1 )
if basetype == 'text' :
attachment = SafeMIMEText ( content , subtype , encoding )
content = message_from_string ( content )
attachment = SafeMIMEMessage ( content , subtype )
else :
attachment = MIMEBase ( basetype , subtype )
attachment . set_payload ( content )
Encoders . encode_base64 ( attachment )
if mimetype is None :
mimetype , _ = mimetypes . guess_type ( filename )
if mimetype is None :
mimetype = DEFAULT_ATTACHMENT_MIME_TYPE
attachment = self . _create_mime_attachment ( content , mimetype )
if filename :
filename . encode ( 'ascii' )
except UnicodeEncodeError :
if six . PY2 :
filename = filename . encode ( 'utf-8' )
filename = ( 'utf-8' , '' , filename )
attachment . add_header ( 'Content-Disposition' , 'attachment' , filename = filename )
alternative_subtype = 'alternative'
super ( EmailMultiAlternatives , self ) . __init__ ( subject , body , from_email , to , bcc , connection , attachments , headers , cc )
self . alternatives = alternatives or [ ]
assert content is not None
assert mimetype is not None
body_msg = msg
msg = SafeMIMEMultipart ( _subtype = self . alternative_subtype , encoding = encoding )
msg . attach ( body_msg )
for alternative in self . alternatives :
msg . attach ( self . _create_mime_attachment ( * alternative ) )
self . _fqdn = socket . getfqdn ( )
DNS_NAME = CachedDnsName ( )
return [ f [ : - 3 ] for f in os . listdir ( command_dir ) if not f . startswith ( '_' ) and f . endswith ( '.py' ) ]
except OSError :
module = import_module ( '%s.management.commands.%s' % ( app_name , name ) )
@ lru_cache . lru_cache ( maxsize = None )
commands = { name : 'django.core' for name in find_commands ( __path__ [ 0 ] ) }
app_name = get_commands ( ) [ name ]
except KeyError :
command = app_name
else :
command = load_command_class ( app_name , name )
if command . use_argparse :
defaults = dict ( defaults . _get_kwargs ( ) , ** options )
else :
defaults = dict ( defaults . __dict__ , ** options )
self . argv = argv or sys . argv [ : ]
if commands_only :
usage = sorted ( get_commands ( ) . keys ( ) )
else :
usage = [ "" , "Type '%s help ' for help on a specific subcommand." % self . prog_name , "" , "Available subcommands:" , ]
commands_dict = collections . defaultdict ( lambda : [ ] )
for name , app in six . iteritems ( get_commands ( ) ) :
else :
app = app . rpartition ( '.' ) [ - 1 ]
style = color_style ( )
for app in sorted ( commands_dict . keys ( ) ) :
for name in sorted ( commands_dict [ app ] ) :
commands = get_commands ( )
app_name = commands [ subcommand ]
except KeyError :
sys . stderr . write ( "Unknown command: %r\nType '%s help' for usage.\n" % ( subcommand , self . prog_name ) )
sys . exit ( 1 )
klass = app_name
else :
klass = load_command_class ( app_name , subcommand )
cwords = os . environ [ 'COMP_WORDS' ] . split ( ) [ 1 : ]
cword = int ( os . environ [ 'COMP_CWORD' ] )
curr = cwords [ cword - 1 ]
except IndexError :
curr = ''
subcommands = list ( get_commands ( ) ) + [ 'help' ]
options = [ ( '--help' , None ) ]
if cword == 1 :
print ( ' ' . join ( sorted ( filter ( lambda x : x . startswith ( curr ) , subcommands ) ) ) )
subcommand_cls = self . fetch_command ( cwords [ 0 ] )
if cwords [ 0 ] == 'runfcgi' :
options += [ ( k , 1 ) for k in FASTCGI_OPTIONS ]
elif cwords [ 0 ] in ( 'dumpdata' , 'sql' , 'sqlall' , 'sqlclear' , 'sqlcustom' , 'sqlindexes' , 'sqlsequencereset' , 'test' ) :
app_configs = apps . get_app_configs ( )
if subcommand_cls . use_argparse :
options += [ ( sorted ( s_opt . option_strings ) [ 0 ] , s_opt . nargs != 0 ) for s_opt in parser . _actions if s_opt . option_strings ]
else :
options += [ ( s_opt . get_opt_string ( ) , s_opt . nargs ) for s_opt in parser . option_list ]
prev_opts = [ x . split ( '=' ) [ 0 ] for x in cwords [ 1 : cword - 1 ] ]
options = [ opt for opt in options if opt [ 0 ] not in prev_opts ]
options = sorted ( ( k , v ) for k , v in options if k . startswith ( curr ) )
for option in options :
opt_label = option [ 0 ]
if option [ 1 ] :
opt_label += '='
print ( opt_label )
sys . exit ( 1 )
except IndexError :
subcommand = 'help'
handle_default_options ( options )
except CommandError :
no_settings_commands = [ 'help' , 'version' , '--help' , '--version' , '-h' , 'compilemessages' , 'makemessages' , 'startapp' , 'startproject' , ]
except ImproperlyConfigured as exc :
self . settings_exception = exc
if subcommand in no_settings_commands :
django . setup ( )
if subcommand == 'help' :
sys . stdout . write ( self . main_help_text ( commands_only = True ) + '\n' )
sys . stdout . write ( self . main_help_text ( ) + '\n' )
else :
sys . stdout . write ( django . get_version ( ) + '\n' )
sys . stdout . write ( self . main_help_text ( ) + '\n' )
else :
utility = ManagementUtility ( argv )
utility . execute ( )
from argparse import ArgumentParser
from optparse import OptionParser
self . cmd = cmd
if self . cmd . _called_from_command_line :
else :
if options . pythonpath :
if hasattr ( out , 'isatty' ) and out . isatty ( ) :
self . style_func = style_func
self . ending = ending
ending = self . ending if ending is None else ending
if ending and not msg . endswith ( ending ) :
msg += ending
style_func = [ f for f in ( style_func , self . style_func , lambda x : x ) if f is not None ] [ 0 ]
self . _out . write ( force_str ( style_func ( msg ) ) )
option_list = ( )
help = ''
has_old_option = hasattr ( self , 'requires_model_validation' )
has_new_option = hasattr ( self , 'requires_system_checks' )
if has_old_option :
warnings . warn ( '"requires_model_validation" is deprecated ' 'in favor of "requires_system_checks".' , RemovedInDjango19Warning )
if has_old_option and has_new_option :
raise ImproperlyConfigured ( 'Command %s defines both "requires_model_validation" ' 'and "requires_system_checks", which is illegal. Use only ' '"requires_system_checks".' % self . __class__ . __name__ )
@ property
usage = '%%prog %s [options] %s' % ( subcommand , self . args )
else :
warnings . warn ( "OptionParser usage for Django management commands " "is deprecated, use ArgumentParser instead" , RemovedInDjango20Warning )
parser = OptionParser ( prog = prog_name , usage = self . usage ( subcommand ) , version = self . get_version ( ) )
parser . add_option ( '-v' , '--verbosity' , action = 'store' , dest = 'verbosity' , default = '1' , type = 'choice' , choices = [ '0' , '1' , '2' , '3' ] , help = 'Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output' )
for opt in self . option_list :
else :
cmd_options = vars ( options )
else :
else :
cmd_options = vars ( options )
handle_default_options ( options )
except Exception as e :
stderr = getattr ( self , 'stderr' , OutputWrapper ( sys . stderr , self . style . ERROR ) )
stderr . write ( '%s: %s' % ( e . __class__ . __name__ , e ) )
sys . exit ( 1 )
self . stdout = OutputWrapper ( options . get ( 'stdout' , sys . stdout ) )
if options . get ( 'no_color' ) :
self . stderr = OutputWrapper ( options . get ( 'stderr' , sys . stderr ) )
else :
self . stderr = OutputWrapper ( options . get ( 'stderr' , sys . stderr ) , self . style . ERROR )
saved_locale = None
raise CommandError ( "Incompatible values of 'leave_locale_alone' " "(%s) and 'can_import_settings' (%s) command " "options." % ( self . leave_locale_alone , self . can_import_settings ) )
saved_locale = translation . get_language ( )
translation . activate ( 'en-us' )
if ( self . requires_system_checks and not options . get ( 'skip_validation' ) and not options . get ( 'skip_checks' ) ) :
if output :
finally :
if saved_locale is not None :
translation . activate ( saved_locale )
app_configs = None
else :
return self . check ( app_configs = app_configs , display_num_errors = display_num_errors )
all_issues = checks . run_checks ( app_configs = app_configs , tags = tags )
msg = ""
visible_issue_count = 0
if all_issues :
debugs = [ e for e in all_issues if e . level < checks . INFO and not e . is_silenced ( ) ]
infos = [ e for e in all_issues if checks . INFO <= e . level < checks . WARNING and not e . is_silenced ( ) ]
warnings = [ e for e in all_issues if checks . WARNING <= e . level < checks . ERROR and not e . is_silenced ( ) ]
errors = [ e for e in all_issues if checks . ERROR <= e . level < checks . CRITICAL ]
criticals = [ e for e in all_issues if checks . CRITICAL <= e . level ]
sorted_issues = [ ( criticals , 'CRITICALS' ) , ( errors , 'ERRORS' ) , ( warnings , 'WARNINGS' ) , ( infos , 'INFOS' ) , ( debugs , 'DEBUGS' ) , ]
for issues , group_name in sorted_issues :
if issues :
formatted = ( color_style ( ) . ERROR ( force_str ( e ) ) if e . is_serious ( ) else color_style ( ) . WARNING ( force_str ( e ) ) for e in issues )
formatted = "\n" . join ( sorted ( formatted ) )
msg += '\n%s:\n%s\n' % ( group_name , formatted )
if msg :
msg = "System check identified some issues:\n%s" % msg
if display_num_errors :
if msg :
msg += '\n'
msg += "System check identified %s (%s silenced)." % ( "no issues" if visible_issue_count == 0 else "1 issue" if visible_issue_count == 1 else "%s issues" % visible_issue_count , len ( all_issues ) - visible_issue_count , )
if any ( e . is_serious ( ) and not e . is_silenced ( ) for e in all_issues ) :
missing_args_message = "Enter at least one application label."
app_configs = [ apps . get_app_config ( app_label ) for app_label in app_labels ]
output = [ ]
if app_output :
handle_app = self . handle_app
except AttributeError :
else :
warnings . warn ( "AppCommand.handle_app() is superseded by " "AppCommand.handle_app_config()." , RemovedInDjango19Warning , stacklevel = 2 )
label = 'label'
missing_args_message = "Enter at least one %s." % label
output = [ ]
for label in labels :
label_output = self . handle_label ( label , ** options )
if label_output :
warnings . warn ( "NoArgsCommand class is deprecated and will be removed in Django 2.0. " "Use BaseCommand instead, which takes no arguments by default." , RemovedInDjango20Warning )
plat = sys . platform
supported_platform = plat != 'Pocket PC' and ( plat != 'win32' or 'ANSICON' in os . environ )
is_a_tty = hasattr ( sys . stdout , 'isatty' ) and sys . stdout . isatty ( )
if not supported_platform or not is_a_tty :
if not supports_color ( ) :
style = no_style ( )
else :
DJANGO_COLORS = os . environ . get ( 'DJANGO_COLORS' , '' )
color_settings = termcolors . parse_color_setting ( DJANGO_COLORS )
if color_settings :
style = dummy ( )
for role in termcolors . PALETTES [ termcolors . NOCOLOR_PALETTE ] :
format = color_settings . get ( role , { } )
setattr ( style , role , termcolors . make_style ( ** format ) )
style . ERROR_OUTPUT = style . ERROR
else :
style = no_style ( )
help = "Checks the entire Django project for potential problems."
if options . get ( 'list_tags' ) :
self . stdout . write ( '\n' . join ( sorted ( registry . tags_available ( ) ) ) )
if app_labels :
app_configs = [ apps . get_app_config ( app_label ) for app_label in app_labels ]
else :
app_configs = None
tags = options . get ( 'tags' , None )
self . check ( app_configs = app_configs , tags = tags , display_num_errors = True )
with open ( fn , 'rb' ) as f :
sample = f . read ( 4 )
return sample [ : 3 ] == b'\xef\xbb\xbf' or sample . startswith ( codecs . BOM_UTF16_LE ) or sample . startswith ( codecs . BOM_UTF16_BE )
except ( IOError , OSError ) :
help = 'Compiles .po files to .mo files for use with builtin gettext support.'
program = 'msgfmt'
program_options = [ '--check-format' ]
locale = options . get ( 'locale' )
exclude = options . get ( 'exclude' )
if find_command ( self . program ) is None :
raise CommandError ( "Can't find %s. Make sure you have GNU gettext " "tools 0.15 or newer installed." % self . program )
if not basedirs :
raise CommandError ( "This script should be run from the Django Git " "checkout or your project or app tree, or with " "the settings module specified." )
all_locales = [ ]
for basedir in basedirs :
locales = locale or all_locales
locales = set ( locales ) - set ( exclude )
for basedir in basedirs :
if locales :
else :
dirs = [ basedir ]
locations = [ ]
for ldir in dirs :
for dirpath , dirnames , filenames in os . walk ( ldir ) :
locations . extend ( ( dirpath , f ) for f in filenames if f . endswith ( '.po' ) )
if locations :
self . compile_messages ( locations )
for i , ( dirpath , f ) in enumerate ( locations ) :
self . stdout . write ( 'processing file %s in %s\n' % ( f , dirpath ) )
if has_bom ( po_path ) :
raise CommandError ( "The %s file has a BOM (Byte Order Mark). " "Django only supports .po files encoded in " "UTF-8 and without any BOM." % po_path )
if i == 0 and not is_writable ( npath ( base_path + '.mo' ) ) :
self . stderr . write ( "The po files under %s are in a seemingly not writable location. " "mo files will not be updated/created." % dirpath )
args = [ self . program ] + self . program_options + [ '-o' , npath ( base_path + '.mo' ) , npath ( base_path + '.po' ) ]
output , errors , status = popen_wrapper ( args )
if status :
if errors :
msg = "Execution of %s failed: %s" % ( self . program , errors )
else :
msg = "Execution of %s failed" % self . program
help = "Creates the tables needed to use the SQL cache backend."
db = options . get ( 'database' )
for tablename in tablenames :
self . create_table ( db , tablename )
else :
cache = caches [ cache_alias ]
self . create_table ( db , cache . _table )
cache = BaseDatabaseCache ( tablename , { } )
if not router . allow_migrate ( database , cache . cache_model_class ) :
self . stdout . write ( "Cache table '%s' already exists." % tablename )
table_output = [ ]
index_output = [ ]
for f in fields :
if f . primary_key :
if f . db_index :
unique = "UNIQUE " if f . unique else ""
index_output . append ( "CREATE %sINDEX %s ON %s (%s);" % ( unique , qn ( '%s_%s' % ( tablename , f . name ) ) , qn ( tablename ) , qn ( f . name ) ) )
full_statement = [ "CREATE TABLE %s (" % qn ( tablename ) ]
for i , line in enumerate ( table_output ) :
curs . execute ( "\n" . join ( full_statement ) )
except DatabaseError as e :
for statement in index_output :
curs . execute ( statement )
self . stdout . write ( "Cache table '%s' created." % tablename )
help = ( "Runs the command-line client for specified database, or the " "default database if none is provided." )
except OSError :
return dict ( ( k , repr ( v ) ) for k , v in module . __dict__ . items ( ) if not omittable ( k ) )
default_settings = module_to_dict ( global_settings )
output = [ ]
help = ( "Output the contents of the database as a fixture of the given " "format (using each model's default manager unless --all is " "specified)." )
parser . add_argument ( '--pks' , dest = 'primary_keys' , help = "Only dump objects with given primary keys. " "Accepts a comma separated list of keys. " "This option will only work when you specify one model." )
format = options . get ( 'format' )
indent = options . get ( 'indent' )
using = options . get ( 'database' )
excludes = options . get ( 'exclude' )
output = options . get ( 'output' )
show_traceback = options . get ( 'traceback' )
use_natural_keys = options . get ( 'use_natural_keys' )
if use_natural_keys :
warnings . warn ( "``--natural`` is deprecated; use ``--natural-foreign`` instead." , RemovedInDjango19Warning )
use_natural_foreign_keys = options . get ( 'use_natural_foreign_keys' ) or use_natural_keys
use_natural_primary_keys = options . get ( 'use_natural_primary_keys' )
use_base_manager = options . get ( 'use_base_manager' )
pks = options . get ( 'primary_keys' )
if pks :
primary_keys = pks . split ( ',' )
else :
primary_keys = [ ]
excluded_apps = set ( )
excluded_models = set ( )
for exclude in excludes :
if '.' in exclude :
model = apps . get_model ( exclude )
except LookupError :
excluded_models . add ( model )
else :
except LookupError :
if primary_keys :
else :
app_list = OrderedDict ( )
for label in app_labels :
app_label , model_label = label . split ( '.' )
except LookupError :
except LookupError :
if app_list_value is not None :
if model not in app_list_value :
if primary_keys :
app_label = label
except LookupError :
if format not in serializers . get_public_serializer_formats ( ) :
serializers . get_serializer ( format )
except serializers . SerializerDoesNotExist :
for model in sort_dependencies ( app_list . items ( ) ) :
if model in excluded_models :
if not model . _meta . proxy and router . allow_migrate ( using , model ) :
if use_base_manager :
objects = model . _base_manager
else :
objects = model . _default_manager
queryset = objects . using ( using ) . order_by ( model . _meta . pk . name )
if primary_keys :
queryset = queryset . filter ( pk__in = primary_keys )
for obj in queryset . iterator ( ) :
stream = open ( output , 'w' ) if output else None
serializers . serialize ( format , get_objects ( ) , indent = indent , use_natural_foreign_keys = use_natural_foreign_keys , use_natural_primary_keys = use_natural_primary_keys , stream = stream or self . stdout )
finally :
if stream :
stream . close ( )
except Exception as e :
if show_traceback :
model_dependencies = [ ]
models = set ( )
if model_list is None :
for model in model_list :
models . add ( model )
if hasattr ( model , 'natural_key' ) :
deps = getattr ( model . natural_key , 'dependencies' , [ ] )
if deps :
deps = [ apps . get_model ( dep ) for dep in deps ]
else :
deps = [ ]
if hasattr ( rel_model , 'natural_key' ) and rel_model != model :
if hasattr ( rel_model , 'natural_key' ) and rel_model != model :
model_dependencies . reverse ( )
model_list = [ ]
while model_dependencies :
skipped = [ ]
while model_dependencies :
model , deps = model_dependencies . pop ( )
for candidate in ( ( d not in models or d in model_list ) for d in deps ) :
if not candidate :
if found :
else :
if not changed :
raise CommandError ( "Can't resolve dependencies for %s in serialized app list." % ', ' . join ( '%s.%s' % ( model . _meta . app_label , model . _meta . object_name ) for model , deps in sorted ( skipped , key = lambda obj : obj [ 0 ] . __name__ ) ) )
model_dependencies = skipped
help = ( 'Removes ALL DATA from the database, including data added during ' 'migrations. Unmigrated apps will also have their initial_data ' 'fixture reloaded. Does not achieve a "fresh install" state.' )
database = options . get ( 'database' )
verbosity = options . get ( 'verbosity' )
interactive = options . get ( 'interactive' )
reset_sequences = options . get ( 'reset_sequences' , True )
allow_cascade = options . get ( 'allow_cascade' , False )
inhibit_post_migrate = options . get ( 'inhibit_post_migrate' , False )
sql_list = sql_flush ( self . style , connection , only_django = True , reset_sequences = reset_sequences , allow_cascade = allow_cascade )
if interactive :
confirm = input ( """You have requested a flush of the database. This will IRREVERSIBLY DESTROY all data currently in the %r database, and return each table to an empty state. Are you sure you want to do this? Type 'yes' to continue, or 'no' to cancel: """ % connection . settings_dict [ 'NAME' ] )
else :
confirm = 'yes'
if confirm == 'yes' :
for sql in sql_list :
cursor . execute ( sql )
except Exception as e :
new_msg = ( "Database %s couldn't be flushed. Possible reasons:\n" " * The database isn't running or isn't configured correctly.\n" " * At least one of the expected database tables doesn't exist.\n" " * The SQL was invalid.\n" "Hint: Look at the output of 'django-admin.py sqlflush'. That's the SQL this command wasn't able to run.\n" "The full error: %s" ) % ( connection . settings_dict [ 'NAME' ] , e )
six . reraise ( CommandError , CommandError ( new_msg ) , sys . exc_info ( ) [ 2 ] )
if not inhibit_post_migrate :
self . emit_post_migrate ( verbosity , interactive , database )
if options . get ( 'load_initial_data' ) :
call_command ( 'loaddata' , 'initial_data' , ** options )
else :
self . stdout . write ( "Flush cancelled.\n" )
@ staticmethod
all_models = [ ]
emit_post_migrate_signal ( set ( all_models ) , verbosity , interactive , database )
help = "Introspects the database tables in the given database and outputs a Django model module."
db_module = 'django.db'
for line in self . handle_inspection ( options ) :
except NotImplementedError :
raise CommandError ( "Database inspection isn't supported for the currently selected database backend." )
table_name_filter = options . get ( 'table_name_filter' )
table2model = lambda table_name : re . sub ( r'[^a-zA-Z0-9]' , '' , table_name . title ( ) )
strip_prefix = lambda s : s [ 1 : ] if s . startswith ( "u'" ) else s
yield "# * Remove `managed = False` lines if you wish to allow Django to create, modify, and delete the table"
yield "# Feel free to rename the models, but don't rename db_table values or field names."
yield "# Also note: You'll have to insert the output of 'django-admin.py sqlcustom [app_label]'"
known_models = [ ]
if table_name_filter is not None and callable ( table_name_filter ) :
if not table_name_filter ( table_name ) :
except NotImplementedError :
relations = { }
except NotImplementedError :
indexes = { }
used_column_names = [ ]
comment_notes = [ ]
extra_params = OrderedDict ( )
column_name = row [ 0 ]
is_relation = i in relations
att_name , params , notes = self . normalize_col_name ( column_name , used_column_names , is_relation )
extra_params . update ( params )
comment_notes . extend ( notes )
if column_name in indexes :
if indexes [ column_name ] [ 'primary_key' ] :
if is_relation :
rel_to = "self" if relations [ i ] [ 1 ] == table_name else table2model ( relations [ i ] [ 1 ] )
if rel_to in known_models :
field_type = 'ForeignKey(%s' % rel_to
else :
field_type = "ForeignKey('%s'" % rel_to
else :
field_type , field_params , field_notes = self . get_field_type ( connection , table_name , row )
extra_params . update ( field_params )
comment_notes . extend ( field_notes )
field_type += '('
if field_type == 'AutoField(' :
if row [ 6 ] :
if field_type == 'BooleanField(' :
field_type = 'NullBooleanField('
else :
if field_type not in ( 'TextField(' , 'CharField(' ) :
field_desc = '%s = %s%s' % ( att_name , '' if '.' in field_type else 'models.' , field_type , )
if extra_params :
if not field_desc . endswith ( '(' ) :
field_desc += ', '
field_desc += ', ' . join ( [ '%s=%s' % ( k , strip_prefix ( repr ( v ) ) ) for k , v in extra_params . items ( ) ] )
field_desc += ')'
if comment_notes :
field_desc += ' # ' + ' ' . join ( comment_notes )
for meta_line in self . get_meta ( table_name ) :
field_params = { }
field_notes = [ ]
new_name = col_name . lower ( )
if new_name != col_name :
if is_relation :
if new_name . endswith ( '_id' ) :
new_name = new_name [ : - 3 ]
else :
field_params [ 'db_column' ] = col_name
new_name , num_repl = re . subn ( r'\W' , '_' , new_name )
if num_repl > 0 :
if new_name . find ( '__' ) >= 0 :
while new_name . find ( '__' ) >= 0 :
new_name = new_name . replace ( '__' , '_' )
if col_name . lower ( ) . find ( '__' ) >= 0 :
if new_name . startswith ( '_' ) :
if new_name . endswith ( '_' ) :
new_name = '%sfield' % new_name
if keyword . iskeyword ( new_name ) :
new_name += '_field'
if new_name [ 0 ] . isdigit ( ) :
new_name = 'number_%s' % new_name
if new_name in used_column_names :
num = 0
while '%s_%d' % ( new_name , num ) in used_column_names :
num += 1
new_name = '%s_%d' % ( new_name , num )
if col_name != new_name and field_notes :
field_params [ 'db_column' ] = col_name
return new_name , field_params , field_notes
field_params = OrderedDict ( )
field_notes = [ ]
field_type = connection . introspection . get_field_type ( row [ 1 ] , row )
except KeyError :
field_type = 'TextField'
if type ( field_type ) is tuple :
field_type , new_params = field_type
field_params . update ( new_params )
if field_type == 'CharField' and row [ 3 ] :
field_params [ 'max_length' ] = int ( row [ 3 ] )
if field_type == 'DecimalField' :
if row [ 4 ] is None or row [ 5 ] is None :
field_notes . append ( 'max_digits and decimal_places have been guessed, as this ' 'database handles decimal fields as float' )
field_params [ 'max_digits' ] = row [ 4 ] if row [ 4 ] is not None else 10
field_params [ 'decimal_places' ] = row [ 5 ] if row [ 5 ] is not None else 5
else :
field_params [ 'max_digits' ] = row [ 4 ]
field_params [ 'decimal_places' ] = row [ 5 ]
return field_type , field_params , field_notes
help = 'Installs the named fixture(s) in the database.'
missing_args_message = ( "No database fixture specified. Please provide the " "path of at least one fixture in the command line." )
self . ignore = options . get ( 'ignore' )
self . app_label = options . get ( 'app_label' )
self . hide_empty = options . get ( 'hide_empty' , False )
with transaction . atomic ( using = self . using ) :
self . loaddata ( fixture_labels )
if transaction . get_autocommit ( self . using ) :
self . serialization_formats = serializers . get_public_serializer_formats ( )
self . compression_formats = { None : ( open , 'rb' ) , 'gz' : ( gzip . GzipFile , 'rb' ) , 'zip' : ( SingleZipReader , 'r' ) , }
if has_bz2 :
self . compression_formats [ 'bz2' ] = ( bz2 . BZ2File , 'r' )
for fixture_label in fixture_labels :
self . load_label ( fixture_label )
table_names = [ model . _meta . db_table for model in self . models ]
except Exception as e :
e . args = ( "Problem installing fixtures: %s" % e , )
sequence_sql = connection . ops . sequence_reset_sql ( no_style ( ) , self . models )
if sequence_sql :
self . stdout . write ( "Resetting sequences\n" )
for line in sequence_sql :
cursor . execute ( line )
else :
for fixture_file , fixture_dir , fixture_name in self . find_fixtures ( fixture_label ) :
open_method , mode = self . compression_formats [ cmp_fmt ]
fixture = open_method ( fixture_file , mode )
objects_in_fixture = 0
loaded_objects_in_fixture = 0
self . stdout . write ( "Installing %s fixture '%s' from %s." % ( ser_fmt , fixture_name , humanize ( fixture_dir ) ) )
objects = serializers . deserialize ( ser_fmt , fixture , using = self . using , ignorenonexistent = self . ignore )
for obj in objects :
objects_in_fixture += 1
if router . allow_migrate ( self . using , obj . object . __class__ ) :
loaded_objects_in_fixture += 1
self . models . add ( obj . object . __class__ )
obj . save ( using = self . using )
except ( DatabaseError , IntegrityError ) as e :
e . args = ( "Could not load %(app_label)s.%(object_name)s(pk=%(pk)s): %(error_msg)s" % { 'app_label' : obj . object . _meta . app_label , 'object_name' : obj . object . _meta . object_name , 'pk' : obj . object . pk , 'error_msg' : force_text ( e ) } , )
self . loaded_object_count += loaded_objects_in_fixture
self . fixture_object_count += objects_in_fixture
except Exception as e :
e . args = ( "Problem installing fixture '%s': %s" % ( fixture_file , e ) , )
finally :
fixture . close ( )
if objects_in_fixture == 0 :
warnings . warn ( "No fixture data found for '%s'. (File format may be " "invalid.)" % fixture_name , RuntimeWarning )
@ lru_cache . lru_cache ( maxsize = None )
fixture_name , ser_fmt , cmp_fmt = self . parse_name ( fixture_label )
databases = [ self . using , None ]
cmp_fmts = list ( self . compression_formats . keys ( ) ) if cmp_fmt is None else [ cmp_fmt ]
ser_fmts = serializers . get_public_serializer_formats ( ) if ser_fmt is None else [ ser_fmt ]
self . stdout . write ( "Loading '%s' fixtures..." % fixture_name )
else :
fixture_dirs = self . fixture_dirs
suffixes = ( '.' . join ( ext for ext in combo if ext ) for combo in product ( databases , ser_fmts , cmp_fmts ) )
targets = set ( '.' . join ( ( fixture_name , suffix ) ) for suffix in suffixes )
fixture_files = [ ]
for fixture_dir in fixture_dirs :
self . stdout . write ( "Checking %s for fixtures..." % humanize ( fixture_dir ) )
fixture_files_in_dir = [ ]
self . stdout . write ( "No fixture '%s' in %s." % ( fixture_name , humanize ( fixture_dir ) ) )
fixture_files . extend ( fixture_files_in_dir )
if fixture_name != 'initial_data' and not fixture_files :
warnings . warn ( "No fixture named '%s' found." % fixture_name )
@ cached_property
dirs = [ ]
parts = fixture_name . rsplit ( '.' , 2 )
cmp_fmt = parts [ - 1 ]
parts = parts [ : - 1 ]
else :
cmp_fmt = None
if parts [ - 1 ] in self . serialization_formats :
ser_fmt = parts [ - 1 ]
parts = parts [ : - 1 ]
else :
raise CommandError ( "Problem installing fixture '%s': %s is not a known " "serialization format." % ( '' . join ( parts [ : - 1 ] ) , parts [ - 1 ] ) )
else :
ser_fmt = None
name = '.' . join ( parts )
return name , ser_fmt , cmp_fmt
plural_forms_re = re . compile ( r'^(?P"Plural-Forms.+?\\n")\s*$' , re . MULTILINE | re . DOTALL )
STATUS_OK = 0
for program in programs :
if find_command ( program ) is None :
raise CommandError ( "Can't find %s. Make sure you have GNU " "gettext tools 0.15 or newer installed." % program )
@ total_ordering
self . dirpath = dirpath
self . locale_dir = locale_dir
@ property
if command . verbosity > 1 :
if domain == 'djangojs' and file_ext in command . extensions :
src_data = fp . read ( )
src_data = prepare_js_for_gettext ( src_data )
with io . open ( work_file , "w" , encoding = 'utf-8' ) as fp :
fp . write ( src_data )
args = [ 'xgettext' , '-d' , domain , '--language=C' , '--keyword=gettext_noop' , '--keyword=gettext_lazy' , '--keyword=ngettext_lazy:1,2' , '--keyword=pgettext:1c,2' , '--keyword=npgettext:1c,2,3' , '--output=-' ] + command . xgettext_options
elif domain == 'django' and ( file_ext == '.py' or file_ext in command . extensions ) :
is_templatized = file_ext in command . extensions
if is_templatized :
src_data = fp . read ( )
thefile = '%s.py' % self . file
content = templatize ( src_data , orig_file [ 2 : ] )
fp . write ( content )
args = [ 'xgettext' , '-d' , domain , '--language=Python' , '--keyword=gettext_noop' , '--keyword=gettext_lazy' , '--keyword=ngettext_lazy:1,2' , '--keyword=ugettext_noop' , '--keyword=ugettext_lazy' , '--keyword=ungettext_lazy:1,2' , '--keyword=pgettext:1c,2' , '--keyword=npgettext:1c,2,3' , '--keyword=pgettext_lazy:1c,2' , '--keyword=npgettext_lazy:1c,2,3' , '--output=-' ] + command . xgettext_options
else :
msgs , errors , status = popen_wrapper ( args )
if errors :
if status != STATUS_OK :
if is_templatized :
command . stdout . write ( errors )
if msgs :
if six . PY2 :
msgs = msgs . decode ( 'utf-8' )
if is_templatized :
old = '#: ' + work_file
new = '#: ' + orig_file
else :
old = '#: ' + work_file [ 2 : ]
new = '#: ' + orig_file [ 2 : ]
msgs = msgs . replace ( old , new )
write_pot_file ( potfile , msgs )
if is_templatized :
else :
msgs = msgs . replace ( 'charset=CHARSET' , 'charset=UTF-8' )
with io . open ( potfile , 'a' , encoding = 'utf-8' ) as fp :
fp . write ( msgs )
help = ( "Runs over the entire source tree of the current directory and " "pulls out all strings marked for translation. It creates (or updates) a message " "file in the conf/locale (in the django tree) or locale (for projects and " "applications) directory.\n\nYou must run this command with one of either the " "--locale, --exclude or --all options." )
msgmerge_options = [ '-q' , '--previous' ]
msguniq_options = [ '--to-code=utf-8' ]
msgattrib_options = [ '--no-obsolete' ]
xgettext_options = [ '--from-code=UTF-8' , '--add-comments=Translators' ]
locale = options . get ( 'locale' )
exclude = options . get ( 'exclude' )
self . domain = options . get ( 'domain' )
process_all = options . get ( 'all' )
extensions = options . get ( 'extensions' )
self . symlinks = options . get ( 'symlinks' )
ignore_patterns = options . get ( 'ignore_patterns' )
if options . get ( 'use_default_ignore_patterns' ) :
ignore_patterns += [ 'CVS' , '.*' , '*~' , '*.pyc' ]
self . ignore_patterns = list ( set ( ignore_patterns ) )
if options . get ( 'no_wrap' ) :
if options . get ( 'no_location' ) :
self . no_obsolete = options . get ( 'no_obsolete' )
self . keep_pot = options . get ( 'keep_pot' )
if self . domain not in ( 'django' , 'djangojs' ) :
exts = extensions if extensions else [ 'js' ]
else :
exts = extensions if extensions else [ 'html' , 'txt' ]
self . extensions = handle_extensions ( exts )
if ( locale is None and not exclude and not process_all ) or self . domain is None :
else :
else :
if process_all :
locales = all_locales
else :
locales = locale or all_locales
locales = set ( locales ) - set ( exclude )
if locales :
check_programs ( 'msguniq' , 'msgmerge' , 'msgattrib' )
check_programs ( 'xgettext' )
potfiles = self . build_potfiles ( )
for locale in locales :
self . stdout . write ( "processing locale %s\n" % locale )
for potfile in potfiles :
self . write_po_file ( potfile , locale )
finally :
file_list = self . find_files ( "." )
for f in file_list :
except UnicodeDecodeError :
self . stdout . write ( "UnicodeDecodeError: skipped file %s in %s" % ( f . file , f . dirpath ) )
potfiles = [ ]
args = [ 'msguniq' ] + self . msguniq_options + [ potfile ]
msgs , errors , status = popen_wrapper ( args )
if six . PY2 :
msgs = msgs . decode ( 'utf-8' )
if errors :
if status != STATUS_OK :
with io . open ( potfile , 'w' , encoding = 'utf-8' ) as fp :
fp . write ( msgs )
ignore = lambda pattern : fnmatch . fnmatchcase ( filename , pattern )
return any ( ignore ( pattern ) for pattern in ignore_patterns )
norm_patterns = [ p [ : - len ( dir_suffix ) ] if p . endswith ( dir_suffix ) else p for p in self . ignore_patterns ]
all_files = [ ]
for dirpath , dirnames , filenames in os . walk ( root , topdown = True , followlinks = self . symlinks ) :
for dirname in dirnames [ : ] :
dirnames . remove ( dirname )
self . stdout . write ( 'ignoring directory %s\n' % dirname )
dirnames . remove ( dirname )
for filename in filenames :
if is_ignored ( file_path , self . ignore_patterns ) :
self . stdout . write ( 'ignoring file %s in %s\n' % ( filename , dirpath ) )
else :
locale_dir = None
if not locale_dir :
locale_dir = self . default_locale_path
if not locale_dir :
return sorted ( all_files )
args = [ 'msgmerge' ] + self . msgmerge_options + [ pofile , potfile ]
msgs , errors , status = popen_wrapper ( args )
if six . PY2 :
msgs = msgs . decode ( 'utf-8' )
if errors :
if status != STATUS_OK :
else :
with io . open ( potfile , 'r' , encoding = 'utf-8' ) as fp :
msgs = fp . read ( )
msgs = self . copy_plural_forms ( msgs , locale )
msgs = msgs . replace ( "#. #-#-#-#-# %s.pot (PACKAGE VERSION) #-#-#-#-#\n" % self . domain , "" )
with io . open ( pofile , 'w' , encoding = 'utf-8' ) as fp :
fp . write ( msgs )
args = [ 'msgattrib' ] + self . msgattrib_options + [ '-o' , pofile , pofile ]
msgs , errors , status = popen_wrapper ( args )
if errors :
if status != STATUS_OK :
domains = ( 'djangojs' , 'django' )
else :
domains = ( 'django' , )
for domain in domains :
with io . open ( django_po , 'r' , encoding = 'utf-8' ) as fp :
m = plural_forms_re . search ( fp . read ( ) )
if m :
self . stdout . write ( "copying plural forms: %s\n" % plural_form_line )
lines = [ ]
for line in msgs . split ( '\n' ) :
if not found and ( not line or plural_forms_re . search ( line ) ) :
line = '%s\n' % plural_form_line
msgs = '\n' . join ( lines )
help = "Creates new migration(s) for apps."
self . interactive = options . get ( 'interactive' )
self . dry_run = options . get ( 'dry_run' , False )
self . merge = options . get ( 'merge' , False )
app_labels = set ( app_labels )
bad_app_labels = set ( )
for app_label in app_labels :
apps . get_app_config ( app_label )
except LookupError :
bad_app_labels . add ( app_label )
if bad_app_labels :
for app_label in bad_app_labels :
self . stderr . write ( "App '%s' could not be found. Is it in INSTALLED_APPS?" % app_label )
sys . exit ( 2 )
conflicts = loader . detect_conflicts ( )
name_str = "; " . join ( "%s in %s" % ( ", " . join ( names ) , app ) for app , names in conflicts . items ( ) )
raise CommandError ( "Conflicting migrations detected (%s).\nTo fix them run 'python manage.py makemigrations --merge'" % name_str )
self . stdout . write ( "No conflicts detected to merge." )
return self . handle_merge ( loader , conflicts )
autodetector = MigrationAutodetector ( loader . project_state ( ) , ProjectState . from_apps ( apps ) , InteractiveMigrationQuestioner ( specified_apps = app_labels , dry_run = self . dry_run ) , )
if not app_labels :
changes = dict ( ( app , [ Migration ( "custom" , app ) ] ) for app in app_labels )
changes = autodetector . arrange_for_graph ( changes , loader . graph )
self . write_migration_files ( changes )
changes = autodetector . changes ( graph = loader . graph , trim_to_apps = app_labels or None , convert_apps = app_labels or None , )
self . stdout . write ( "No changes detected in app '%s'" % app_labels . pop ( ) )
self . stdout . write ( "No changes detected in apps '%s'" % ( "', '" . join ( app_labels ) ) )
else :
self . stdout . write ( "No changes detected" )
self . write_migration_files ( changes )
directory_created = { }
for app_label , app_migrations in changes . items ( ) :
for migration in app_migrations :
writer = MigrationWriter ( migration )
for operation in migration . operations :
open ( init_path , "w" ) . close ( )
migration_string = writer . as_string ( )
fh . write ( migration_string )
self . stdout . write ( "%s\n" % writer . as_string ( ) )
questioner = InteractiveMigrationQuestioner ( )
else :
questioner = MigrationQuestioner ( defaults = { 'ask_merge' : True } )
for app_label , migration_names in conflicts . items ( ) :
merge_migrations = [ ]
for migration_name in migration_names :
migration = loader . get_migration ( app_label , migration_name )
migration . ancestry = loader . graph . forwards_plan ( ( app_label , migration_name ) )
common_ancestor = None
for level in zip ( * [ m . ancestry for m in merge_migrations ] ) :
if reduce ( operator . eq , level ) :
common_ancestor = level [ 0 ]
else :
if common_ancestor is None :
for migration in merge_migrations :
migration . branch = migration . ancestry [ ( migration . ancestry . index ( common_ancestor ) + 1 ) : ]
migration . merged_operations = [ ]
for node_app , node_name in migration . branch :
migration . merged_operations . extend ( loader . get_migration ( node_app , node_name ) . operations )
for migration in merge_migrations :
for operation in migration . merged_operations :
if questioner . ask_merge ( app_label ) :
numbers = [ MigrationAutodetector . parse_number ( migration . name ) for migration in merge_migrations ]
biggest_number = max ( [ x for x in numbers if x is not None ] )
biggest_number = 1
subclass = type ( "Migration" , ( Migration , ) , { "dependencies" : [ ( app_label , migration . name ) for migration in merge_migrations ] , } )
new_migration = subclass ( "%04i_merge" % ( biggest_number + 1 ) , app_label )
writer = MigrationWriter ( new_migration )
fh . write ( writer . as_string ( ) )
self . stdout . write ( "\nCreated new merge migration %s" % writer . path )
help = "Updates database schema. Manages both apps with migrations and those without."
self . interactive = options . get ( 'interactive' )
self . show_traceback = options . get ( 'traceback' )
self . load_initial_data = options . get ( 'load_initial_data' )
self . test_database = options . get ( 'test_database' , False )
db = options . get ( 'database' )
return self . show_migration_list ( connection , [ options [ 'app_label' ] ] if options [ 'app_label' ] else None )
executor = MigrationExecutor ( connection , self . migration_progress_callback )
conflicts = executor . loader . detect_conflicts ( )
if conflicts :
name_str = "; " . join ( "%s in %s" % ( ", " . join ( names ) , app ) for app , names in conflicts . items ( ) )
raise CommandError ( "Conflicting migrations detected (%s).\nTo fix them run 'python manage.py makemigrations --merge'" % name_str )
if options [ 'app_label' ] and options [ 'migration_name' ] :
app_label , migration_name = options [ 'app_label' ] , options [ 'migration_name' ]
if app_label not in executor . loader . migrated_apps :
if migration_name == "zero" :
targets = [ ( app_label , None ) ]
else :
migration = executor . loader . get_migration_by_prefix ( app_label , migration_name )
except AmbiguityError :
except KeyError :
targets = [ ( app_label , migration . name ) ]
app_label = options [ 'app_label' ]
if app_label not in executor . loader . migrated_apps :
else :
targets = executor . loader . graph . leaf_nodes ( )
plan = executor . migration_plan ( targets )
if run_syncdb and executor . loader . unmigrated_apps :
self . stdout . write ( self . style . MIGRATE_LABEL ( " Synchronize unmigrated apps: " ) + ( ", " . join ( executor . loader . unmigrated_apps ) ) )
if target_app_labels_only :
else :
if targets [ 0 ] [ 1 ] is None :
else :
self . stdout . write ( self . style . MIGRATE_LABEL ( " Target specific migration: " ) + "%s, from %s" % ( targets [ 0 ] [ 1 ] , targets [ 0 ] [ 0 ] ) )
if run_syncdb and executor . loader . unmigrated_apps :
created_models = self . sync_apps ( connection , executor . loader . unmigrated_apps )
else :
created_models = [ ]
if not plan :
self . stdout . write ( " No migrations to apply." )
autodetector = MigrationAutodetector ( executor . loader . project_state ( ) , ProjectState . from_apps ( apps ) , )
changes = autodetector . changes ( graph = executor . loader . graph )
if changes :
self . stdout . write ( self . style . NOTICE ( " Your models have changes that are not yet reflected in a migration, and so won't be applied." ) )
self . stdout . write ( self . style . NOTICE ( " Run 'manage.py makemigrations' to make new migrations, and then re-run 'manage.py migrate' to apply them." ) )
else :
executor . migrate ( targets , plan , fake = options . get ( "fake" , False ) )
if action == "apply_start" :
self . stdout . write ( " Applying %s..." % migration , ending = "" )
if fake :
else :
self . stdout . write ( " Unapplying %s..." % migration , ending = "" )
if fake :
else :
created_models = set ( )
pending_references = { }
opts = model . _meta
return not ( ( converter ( opts . db_table ) in tables ) or ( opts . auto_created and converter ( opts . auto_created . _meta . db_table ) in tables ) )
manifest = OrderedDict ( ( app_name , list ( filter ( model_installed , model_list ) ) ) for app_name , model_list in all_models )
create_models = set ( itertools . chain ( * manifest . values ( ) ) )
self . stdout . write ( " Creating tables...\n" )
for app_name , model_list in manifest . items ( ) :
for model in model_list :
self . stdout . write ( " Processing %s.%s model\n" % ( app_name , model . _meta . object_name ) )
seen_models . add ( model )
created_models . add ( model )
for refto , refs in references . items ( ) :
pending_references . setdefault ( refto , [ ] ) . extend ( refs )
if refto in seen_models :
self . stdout . write ( " Creating table %s\n" % model . _meta . db_table )
for statement in sql :
cursor . execute ( statement )
finally :
cursor . close ( )
self . stdout . write ( " Installing custom SQL...\n" )
for app_name , model_list in manifest . items ( ) :
for model in model_list :
if model in created_models :
if custom_sql :
self . stdout . write ( " Installing custom SQL for %s.%s model\n" % ( app_name , model . _meta . object_name ) )
for sql in custom_sql :
cursor . execute ( sql )
except Exception as e :
self . stderr . write ( " Failed to install custom SQL for %s.%s model: %s\n" % ( app_name , model . _meta . object_name , e ) )
traceback . print_exc ( )
else :
self . stdout . write ( " No custom SQL for %s.%s model\n" % ( app_name , model . _meta . object_name ) )
self . stdout . write ( " Installing indexes...\n" )
for app_name , model_list in manifest . items ( ) :
for model in model_list :
if model in created_models :
if index_sql :
self . stdout . write ( " Installing index for %s.%s model\n" % ( app_name , model . _meta . object_name ) )
for sql in index_sql :
cursor . execute ( sql )
except Exception as e :
self . stderr . write ( " Failed to install index for %s.%s model: %s\n" % ( app_name , model . _meta . object_name , e ) )
finally :
cursor . close ( )
for app_label in app_labels :
graph = loader . graph
if app_names :
invalid_apps = [ ]
for app_name in app_names :
if app_name not in loader . migrated_apps :
if invalid_apps :
else :
app_names = sorted ( loader . migrated_apps )
for app_name in app_names :
shown = set ( )
for node in graph . leaf_nodes ( app_name ) :
for plan_node in graph . forwards_plan ( node ) :
if plan_node not in shown and plan_node [ 0 ] == app_name :
title = plan_node [ 1 ]
if graph . nodes [ plan_node ] . replaces :
if plan_node in loader . applied_migrations :
else :
shown . add ( plan_node )
if not shown :
help = "Runs this project as a FastCGI application. Requires flup."
warnings . warn ( "FastCGI support has been deprecated and will be removed in Django 1.9." , RemovedInDjango19Warning )
except AttributeError :
naiveip_re = re . compile ( r"""^(?: (?P (?P\d{1,3}(?:\.\d{1,3}){3}) | # IPv4 address (?P\[[a-fA-F0-9:]+\]) | # IPv6 address (?P[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)*) # FQDN ):)?(?P\d+)$""" , re . X )
DEFAULT_PORT = "8000"
help = "Starts a lightweight Web server for development."
self . use_ipv6 = options . get ( 'use_ipv6' )
if self . use_ipv6 and not socket . has_ipv6 :
if not options . get ( 'addrport' ) :
self . port = DEFAULT_PORT
else :
m = re . match ( naiveip_re , options [ 'addrport' ] )
if m is None :
if not self . port . isdigit ( ) :
if _ipv6 :
use_reloader = options . get ( 'use_reloader' )
if use_reloader :
autoreload . main ( self . inner_run , None , options )
else :
threading = options . get ( 'use_threading' )
shutdown_message = options . get ( 'shutdown_message' , '' )
quit_command = 'CTRL-BREAK' if sys . platform == 'win32' else 'CONTROL-C'
self . stdout . write ( "Performing system checks...\n\n" )
except ImproperlyConfigured :
now = datetime . now ( ) . strftime ( '%B %d, %Y - %X' )
if six . PY2 :
now = now . decode ( 'utf-8' )
except socket . error as e :
ERRORS = { errno . EACCES : "You don't have permission to access that port." , errno . EADDRINUSE : "That port is already in use." , errno . EADDRNOTAVAIL : "That IP address can't be assigned-to." , }
error_text = ERRORS [ e . errno ]
except KeyError :
error_text = str ( e )
self . stderr . write ( "Error: %s" % error_text )
except KeyboardInterrupt :
if shutdown_message :
sys . exit ( 0 )
executor = MigrationExecutor ( connections [ DEFAULT_DB_ALIAS ] )
plan = executor . migration_plan ( executor . loader . graph . leaf_nodes ( ) )
if plan :
self . stdout . write ( self . style . NOTICE ( "\nYou have unapplied migrations; your app may not work properly until they are applied." ) )
self . stdout . write ( self . style . NOTICE ( "Run 'python manage.py migrate' to apply them.\n" ) )
BaseRunserverCommand = Command
help = "Runs a Python interactive interpreter. Tries to use IPython or bpython, if one of them is available."
shells = [ 'ipython' , 'bpython' ]
parser . add_argument ( '-i' , '--interface' , choices = self . shells , dest = 'interface' , help = 'Specify an interactive interpreter interface. Available options: "ipython" and "bpython"' )
from IPython . Shell import IPShell
shell = IPShell ( argv = [ ] )
shell . mainloop ( )
from IPython . frontend . terminal . ipapp import TerminalIPythonApp
app = TerminalIPythonApp . instance ( )
app . initialize ( argv = [ ] )
app . start ( )
from IPython import start_ipython
start_ipython ( argv = [ ] )
ip ( )
else :
bpython . embed ( )
available_shells = [ shell ] if shell else self . shells
for shell in available_shells :
if options [ 'plain' ] :
self . run_shell ( shell = options [ 'interface' ] )
imported_objects = { }
else :
readline . set_completer ( rlcompleter . Completer ( imported_objects ) . complete )
readline . parse_and_bind ( "tab:complete" )
if not options [ 'no_startup' ] :
for pythonrc in ( os . environ . get ( "PYTHONSTARTUP" ) , '~/.pythonrc.py' ) :
if not pythonrc :
with open ( pythonrc ) as handle :
exec ( compile ( handle . read ( ) , pythonrc , 'exec' ) , imported_objects )
except NameError :
code . interact ( local = imported_objects )
help = "Prints the CREATE TABLE SQL statements for the given app name(s)."
help = "Prints the CREATE TABLE, custom SQL and CREATE INDEX SQL statements for the given model module name(s)."
help = "Prints the DROP TABLE SQL statements for the given app name(s)."
help = "Prints the custom table modifying SQL statements for the given app name(s)."
help = "Prints the DROP INDEX SQL statements for the given model module name(s)."
help = "Returns a list of the SQL statements required to return all tables in the database to the state they were in just after they were installed."
return '\n' . join ( sql_flush ( self . style , connections [ options [ 'database' ] ] , only_django = True ) )
help = "Prints the CREATE INDEX SQL statements for the given model module name(s)."
help = "Prints the SQL statements for the named migration."
app_label , migration_name = options [ 'app_label' ] , options [ 'migration_name' ]
if app_label not in executor . loader . migrated_apps :
migration = executor . loader . get_migration_by_prefix ( app_label , migration_name )
except AmbiguityError :
except KeyError :
targets = [ ( app_label , migration . name ) ]
plan = [ ( executor . loader . graph . nodes [ targets [ 0 ] ] , options [ 'backwards' ] ) ]
sql_statements = executor . collect_sql ( plan )
for statement in sql_statements :
help = 'Prints the SQL statements for resetting sequences for the given app name(s).'
help = "Squashes an existing set of migrations (from first until specified) into a single new one."
self . interactive = options . get ( 'interactive' )
app_label , migration_name = options [ 'app_label' ] , options [ 'migration_name' ]
executor = MigrationExecutor ( connections [ DEFAULT_DB_ALIAS ] )
if app_label not in executor . loader . migrated_apps :
migration = executor . loader . get_migration_by_prefix ( app_label , migration_name )
except AmbiguityError :
except KeyError :
migrations_to_squash = [ executor . loader . get_migration ( al , mn ) for al , mn in executor . loader . graph . forwards_plan ( ( migration . app_label , migration . name ) ) if al == migration . app_label ]
for migration in migrations_to_squash :
answer = None
while not answer or answer not in "yn" :
answer = six . moves . input ( "Do you wish to proceed? [yN] " )
if not answer :
answer = "n"
else :
answer = answer [ 0 ] . lower ( )
if answer != "y" :
operations = [ ]
for smigration in migrations_to_squash :
operations . extend ( smigration . operations )
optimizer = MigrationOptimizer ( )
new_operations = optimizer . optimize ( operations , migration . app_label )
self . stdout . write ( " No optimizations possible." )
else :
replaces = [ ]
for migration in migrations_to_squash :
if migration . replaces :
replaces . extend ( migration . replaces )
else :
subclass = type ( "Migration" , ( migrations . Migration , ) , { "dependencies" : [ ] , "operations" : new_operations , "replaces" : replaces , } )
new_migration = subclass ( "0001_squashed_%s" % migration . name , app_label )
writer = MigrationWriter ( new_migration )
fh . write ( writer . as_string ( ) )
self . stdout . write ( " You should commit this migration but leave the old ones in place;" )
self . stdout . write ( " the new migration will be used for new installs. Once you are sure" )
self . stdout . write ( " all instances of the codebase have applied the migrations you squashed," )
help = ( "Creates a Django app directory structure for the given app " "name in the current directory or optionally in the given " "directory." )
missing_args_message = "You must provide an application name."
app_name , target = options . pop ( 'name' ) , options . pop ( 'directory' )
self . validate_name ( app_name , "app" )
import_module ( app_name )
else :
super ( Command , self ) . handle ( 'app' , app_name , target , ** options )
help = ( "Creates a Django project directory structure for the given " "project name in the current directory or optionally in the " "given directory." )
missing_args_message = "You must provide a project name."
project_name , target = options . pop ( 'name' ) , options . pop ( 'directory' )
self . validate_name ( project_name , "project" )
import_module ( project_name )
else :
chars = 'abcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*(-_=+)'
options [ 'secret_key' ] = get_random_string ( 50 , chars )
super ( Command , self ) . handle ( 'project' , project_name , target , ** options )
help = "Deprecated - use 'migrate' instead."
warnings . warn ( "The syncdb command will be removed in Django 1.9" , RemovedInDjango19Warning )
call_command ( "migrate" , ** options )
apps . get_model ( 'auth' , 'Permission' )
except LookupError :
UserModel = get_user_model ( )
if not UserModel . _default_manager . exists ( ) and options . get ( 'interactive' ) :
msg = ( "\nYou have installed Django's auth system, and " "don't have any superusers defined.\nWould you like to create one " "now? (yes/no): " )
confirm = input ( msg )
while 1 :
if confirm not in ( 'yes' , 'no' ) :
confirm = input ( 'Please enter either "yes" or "no": ' )
if confirm == 'yes' :
call_command ( "createsuperuser" , interactive = True , database = options [ 'database' ] )
help = 'Discover and run tests in the specified modules or the current directory.'
option = '--testrunner='
for arg in argv [ 2 : ] :
if arg . startswith ( option ) :
super ( Command , self ) . run_from_argv ( argv )
parser . add_argument ( 'args' , metavar = 'test_label' , nargs = '*' , help = 'Module paths to test; can be modulename, modulename.TestCase or modulename.TestCase.test_method' )
if hasattr ( test_runner_class , 'option_list' ) :
raise RuntimeError ( "The method to extend accepted command-line arguments by the " "test management command has changed in Django 1.8. Please " "create an add_arguments class method to achieve this." )
if hasattr ( test_runner_class , 'add_arguments' ) :
if options [ 'verbosity' ] > 0 :
logger = logging . getLogger ( 'py.warnings' )
handler = logging . StreamHandler ( )
logger . addHandler ( handler )
if options [ 'verbosity' ] > 0 :
logger . removeHandler ( handler )
TestRunner = get_runner ( settings , options . get ( 'testrunner' ) )
if options . get ( 'liveserver' ) is not None :
os . environ [ 'DJANGO_LIVE_TEST_SERVER_ADDRESS' ] = options [ 'liveserver' ]
del options [ 'liveserver' ]
test_runner = TestRunner ( ** options )
failures = test_runner . run_tests ( test_labels )
if failures :
sys . exit ( bool ( failures ) )
help = 'Runs a development server with data from the given fixture(s).'
verbosity = options . get ( 'verbosity' )
interactive = options . get ( 'interactive' )
call_command ( 'loaddata' , * fixture_labels , ** { 'verbosity' : verbosity } )
shutdown_message = '\nServer stopped.\nNote that the test database, %r, has not been deleted. You can explore it on your own.' % db_name
call_command ( 'runserver' , addrport = options [ 'addrport' ] , shutdown_message = shutdown_message , use_reloader = False , use_ipv6 = options [ 'use_ipv6' ] , use_threading = use_threading )
help = 'Deprecated. Use "check" command instead. ' + CheckCommand . help
warnings . warn ( '"validate" has been deprecated in favor of "check".' , RemovedInDjango19Warning )
raise CommandError ( "App '%s' has migrations. Only the sqlmigrate and sqlflush commands can be used when an app has migrations." % app_config . label )
if connection . settings_dict [ 'ENGINE' ] == 'django.db.backends.dummy' :
raise CommandError ( "Django doesn't know which syntax to use for your SQL statements,\n" + "because you haven't properly specified the ENGINE setting for the database.\n" + "see: https://docs.djangoproject.com/en/dev/ref/settings/#databases" )
final_output = [ ]
pending_references = { }
final_output . extend ( output )
for refto , refs in references . items ( ) :
pending_references . setdefault ( refto , [ ] ) . extend ( refs )
if refto in known_models :
known_models . add ( model )
not_installed_models = set ( pending_references . keys ( ) )
if not_installed_models :
alter_sql = [ ]
for model in not_installed_models :
if alter_sql :
final_output . append ( '-- The following references should be added but depend on non-existent tables:' )
final_output . extend ( alter_sql )
except Exception :
cursor = None
if cursor :
else :
table_names = [ ]
output = [ ]
to_delete = set ( )
references_to_delete = { }
for model in app_models :
opts = model . _meta
for f in opts . local_fields :
if f . rel and f . rel . to not in to_delete :
to_delete . add ( model )
for model in app_models :
finally :
if cursor and close_connection :
cursor . close ( )
if only_django :
else :
output = [ ]
for model in app_models :
output = [ ]
output = [ ]
comment_re = re . compile ( r"^((?:'[^']*'|[^'])*?)--.*$" )
statements = [ ]
statement = [ ]
for line in content . split ( "\n" ) :
cleaned_line = comment_re . sub ( r"\1" , line ) . strip ( )
if not cleaned_line :
if cleaned_line . endswith ( ";" ) :
statement = [ ]
opts = model . _meta
app_dirs = [ ]
app_dir = apps . get_app_config ( model . _meta . app_label ) . path
warnings . warn ( "Custom SQL location '/models/sql' is " "deprecated, use '/sql' instead." , RemovedInDjango19Warning )
output = [ ]
if opts . managed :
post_sql_fields = [ f for f in opts . local_fields if hasattr ( f , 'post_create_sql' ) ]
for f in post_sql_fields :
output . extend ( f . post_create_sql ( style , model . _meta . db_table ) )
backend_name = connection . settings_dict [ 'ENGINE' ] . split ( '.' ) [ - 1 ]
sql_files = [ ]
for app_dir in app_dirs :
for sql_file in sql_files :
output . extend ( connection . ops . prepare_sql_script ( fp . read ( ) , _allow_fallback = True ) )
if verbosity >= 2 :
print ( "Running pre-migrate handlers for application %s" % app_config . label )
if verbosity >= 2 :
print ( "Running post-migrate handlers for application %s" % app_config . label )
_drive_re = re . compile ( '^([a-z]):' , re . I )
_url_drive_re = re . compile ( '^([a-z])[:|]' , re . I )
url_schemes = [ 'http' , 'https' , 'ftp' ]
self . app_or_project = app_or_project
self . validate_name ( name , app_or_project )
if target is None :
except OSError as e :
if e . errno == errno . EEXIST :
else :
else :
extensions = tuple ( handle_extensions ( options [ 'extensions' ] , ignored = ( ) ) )
extra_files = [ ]
for file in options [ 'files' ] :
extra_files . extend ( map ( lambda x : x . strip ( ) , file . split ( ',' ) ) )
self . stdout . write ( "Rendering %s template files with " "extensions: %s\n" % ( app_or_project , ', ' . join ( extensions ) ) )
self . stdout . write ( "Rendering %s template files with " "filenames: %s\n" % ( app_or_project , ', ' . join ( extra_files ) ) )
base_name = '%s_name' % app_or_project
base_subdir = '%s_template' % app_or_project
base_directory = '%s_directory' % app_or_project
if django . VERSION [ - 2 ] != 'final' :
docs_version = 'dev'
else :
docs_version = '%d.%d' % django . VERSION [ : 2 ]
context = Context ( dict ( options , ** { base_name : name , base_directory : top_dir , 'docs_version' : docs_version , } ) , autoescape = False )
template_dir = self . handle_template ( options [ 'template' ] , base_subdir )
for root , dirs , files in os . walk ( template_dir ) :
path_rest = root [ prefix_length : ]
relative_dir = path_rest . replace ( base_name , name )
if relative_dir :
for dirname in dirs [ : ] :
if dirname . startswith ( '.' ) or dirname == '__pycache__' :
dirs . remove ( dirname )
for filename in files :
new_path = path . join ( top_dir , relative_dir , filename . replace ( base_name , name ) )
raise CommandError ( "%s already exists, overlaying a " "project or app into an existing " "directory won't replace conflicting " "files" % new_path )
with open ( old_path , 'rb' ) as template_file :
content = template_file . read ( )
if filename . endswith ( extensions ) or filename in extra_files :
content = content . decode ( 'utf-8' )
template = Template ( content )
content = template . render ( context )
content = content . encode ( 'utf-8' )
with open ( new_path , 'wb' ) as new_file :
new_file . write ( content )
self . stdout . write ( "Creating %s\n" % new_path )
shutil . copymode ( old_path , new_path )
self . make_writeable ( new_path )
except OSError :
self . stderr . write ( "Notice: Couldn't set permission bits on %s. You're " "probably using an uncommon filesystem setup. No " "problem." % new_path , self . style . NOTICE )
self . stdout . write ( "Cleaning up temporary files.\n" )
for path_to_remove in self . paths_to_remove :
else :
shutil . rmtree ( path_to_remove , onerror = rmtree_errorhandler )
if template is None :
else :
if template . startswith ( 'file://' ) :
template = template [ 7 : ]
expanded_template = path . normpath ( expanded_template )
if self . is_url ( template ) :
absolute_path = self . download ( template )
else :
return self . extract ( absolute_path )
if name is None :
if not re . search ( r'^[_a-zA-Z]\w*$' , name ) :
if not re . search ( r'^[_a-zA-Z]' , name ) :
message = 'make sure the name begins with a letter or underscore'
else :
message = 'use only numbers, letters and underscores'
tmp = url . rstrip ( '/' )
filename = tmp . split ( '/' ) [ - 1 ]
if url . endswith ( '/' ) :
display_url = tmp + '/'
else :
display_url = url
prefix = 'django_%s_template_' % self . app_or_project
tempdir = tempfile . mkdtemp ( prefix = prefix , suffix = '_download' )
filename , display_url = cleanup_url ( url )
self . stdout . write ( "Downloading %s\n" % display_url )
the_path , info = urlretrieve ( url , path . join ( tempdir , filename ) )
except IOError as e :
used_name = the_path . split ( '/' ) [ - 1 ]
content_disposition = info . get ( 'content-disposition' )
if content_disposition :
_ , params = cgi . parse_header ( content_disposition )
guessed_filename = params . get ( 'filename' ) or used_name
else :
guessed_filename = used_name
ext = self . splitext ( guessed_filename ) [ 1 ]
content_type = info . get ( 'content-type' )
if not ext and content_type :
ext = mimetypes . guess_extension ( content_type )
if ext :
guessed_filename += ext
if used_name != guessed_filename :
shutil . move ( the_path , guessed_path )
base , ext = posixpath . splitext ( the_path )
if base . lower ( ) . endswith ( '.tar' ) :
ext = base [ - 4 : ] + ext
base = base [ : - 4 ]
prefix = 'django_%s_template_' % self . app_or_project
tempdir = tempfile . mkdtemp ( prefix = prefix , suffix = '_extract' )
self . stdout . write ( "Extracting %s\n" % filename )
archive . extract ( filename , tempdir )
except ( archive . ArchiveException , IOError ) as e :
if ':' not in template :
scheme = template . split ( ':' , 1 ) [ 0 ] . lower ( )
return scheme in self . url_schemes
if sys . platform . startswith ( 'java' ) :
new_permissions = stat . S_IMODE ( st . st_mode ) | stat . S_IWUSR
from subprocess import PIPE , Popen
p = Popen ( args , shell = False , stdout = PIPE , stderr = PIPE , close_fds = os . name != 'nt' , universal_newlines = True )
except OSError as e :
strerror = force_text ( e . strerror , DEFAULT_LOCALE_ENCODING , strings_only = True )
six . reraise ( os_err_exc_type , os_err_exc_type ( 'Error executing %s: %s' % ( args [ 0 ] , strerror ) ) , sys . exc_info ( ) [ 2 ] )
output , errors = p . communicate ( )
return ( output , force_text ( errors , DEFAULT_LOCALE_ENCODING , strings_only = True ) , p . returncode )
ext_list = [ ]
for ext in extensions :
ext_list . extend ( ext . replace ( ' ' , '' ) . split ( ',' ) )
for i , ext in enumerate ( ext_list ) :
if not ext . startswith ( '.' ) :
ext_list [ i ] = '.%s' % ext_list [ i ]
return set ( x for x in ext_list if x . strip ( '.' ) not in ignored )
if pathext is None :
for ext in pathext :
if cmd . endswith ( ext ) :
pathext = [ '' ]
for ext in pathext :
fext = f + ext
from math import ceil
self . object_list = object_list
self . per_page = int ( per_page )
self . orphans = int ( orphans )
self . allow_empty_first_page = allow_empty_first_page
number = int ( number )
if number < 1 :
else :
number = self . validate_number ( number )
except ( AttributeError , TypeError ) :
count = property ( _get_count )
else :
num_pages = property ( _get_num_pages )
page_range = property ( _get_page_range )
QuerySetPaginator = Paginator
self . object_list = object_list
self . number = number
self . paginator = paginator
BUILTIN_SERIALIZERS = { "xml" : "django.core.serializers.xml_serializer" , "python" : "django.core.serializers.python" , "json" : "django.core.serializers.json" , "yaml" : "django.core.serializers.pyyaml" , }
_serializers = { }
self . exception = exception
if serializers is None and not _serializers :
_load_serializers ( )
module = importlib . import_module ( serializer_module )
bad_serializer = BadSerializer ( exc )
module = type ( 'BadSerializerModule' , ( object , ) , { 'Deserializer' : bad_serializer , 'Serializer' : bad_serializer , } )
if serializers is None :
_serializers [ format ] = module
else :
serializers [ format ] = module
if not _serializers :
_load_serializers ( )
if format not in _serializers :
del _serializers [ format ]
if not _serializers :
_load_serializers ( )
if format not in _serializers :
return _serializers [ format ] . Serializer
if not _serializers :
_load_serializers ( )
if not _serializers :
_load_serializers ( )
return [ k for k , v in six . iteritems ( _serializers ) if not v . Serializer . internal_use_only ]
if not _serializers :
_load_serializers ( )
if format not in _serializers :
return _serializers [ format ] . Deserializer
s = get_serializer ( format ) ( )
s . serialize ( queryset , ** options )
d = get_deserializer ( format )
global _serializers
serializers = { }
for format in BUILTIN_SERIALIZERS :
register_serializer ( format , BUILTIN_SERIALIZERS [ format ] , serializers )
register_serializer ( format , settings . SERIALIZATION_MODULES [ format ] , serializers )
_serializers = serializers
self . stream = options . pop ( "stream" , six . StringIO ( ) )
self . selected_fields = options . pop ( "fields" , None )
self . use_natural_keys = options . pop ( "use_natural_keys" , False )
warnings . warn ( "``use_natural_keys`` is deprecated; use ``use_natural_foreign_keys`` instead." , RemovedInDjango19Warning )
self . use_natural_primary_keys = options . pop ( 'use_natural_primary_keys' , False )
for obj in queryset :
concrete_model = obj . _meta . concrete_model
else :
if callable ( getattr ( self . stream , 'getvalue' , None ) ) :
self . stream = six . StringIO ( stream_or_string )
else :
self . m2m_data = m2m_data
models . Model . save_base ( self . object , using = using , raw = True )
if self . m2m_data and save_m2m :
for accessor_name , object_list in self . m2m_data . items ( ) :
setattr ( self . object , accessor_name , object_list )
if ( obj . pk is None and hasattr ( Model , 'natural_key' ) and hasattr ( Model . _default_manager , 'get_by_natural_key' ) ) :
natural_key = obj . natural_key ( )
obj . pk = Model . _default_manager . db_manager ( db ) . get_by_natural_key ( * natural_key ) . pk
except Model . DoesNotExist :
if json . __version__ . split ( '.' ) >= [ '2' , '1' , '3' ] :
self . json_kwargs . pop ( 'stream' , None )
self . json_kwargs . pop ( 'fields' , None )
self . json_kwargs [ 'separators' ] = ( ',' , ': ' )
indent = self . options . get ( "indent" )
if not indent :
if indent :
return super ( PythonSerializer , self ) . getvalue ( )
stream_or_string = stream_or_string . read ( )
stream_or_string = stream_or_string . decode ( 'utf-8' )
objects = json . loads ( stream_or_string )
for obj in PythonDeserializer ( objects , ** options ) :
except GeneratorExit :
except Exception as e :
six . reraise ( DeserializationError , DeserializationError ( e ) , sys . exc_info ( ) [ 2 ] )
r = o . isoformat ( )
if o . microsecond :
r = r [ : 23 ] + r [ 26 : ]
if r . endswith ( '+00:00' ) :
r = r [ : - 6 ] + 'Z'
if is_aware ( o ) :
r = o . isoformat ( )
if o . microsecond :
r = r [ : 12 ]
else :
DateTimeAwareJSONEncoder = DjangoJSONEncoder
if not self . use_natural_primary_keys or not hasattr ( obj , 'natural_key' ) :
else :
if related :
else :
else :
else :
db = options . pop ( 'using' , DEFAULT_DB_ALIAS )
ignore = options . pop ( 'ignorenonexistent' , False )
for d in object_list :
Model = _get_model ( d [ "model" ] )
except base . DeserializationError :
if ignore :
else :
if 'pk' in d :
data [ Model . _meta . pk . attname ] = Model . _meta . pk . to_python ( d . get ( "pk" , None ) )
m2m_data = { }
model_fields = Model . _meta . get_all_field_names ( )
for ( field_name , field_value ) in six . iteritems ( d [ "fields" ] ) :
if ignore and field_name not in model_fields :
else :
else :
if field_value is not None :
else :
else :
else :
else :
return apps . get_model ( model_identifier )
except ( LookupError , TypeError ) :
from yaml import CSafeLoader as SafeLoader
from yaml import CSafeDumper as SafeDumper
from yaml import SafeLoader , SafeDumper
return self . represent_scalar ( 'tag:yaml.org,2002:str' , str ( data ) )
DjangoSafeDumper . add_representer ( decimal . Decimal , DjangoSafeDumper . represent_decimal )
else :
return super ( PythonSerializer , self ) . getvalue ( )
stream_or_string = stream_or_string . decode ( 'utf-8' )
stream = StringIO ( stream_or_string )
else :
stream = stream_or_string
for obj in PythonDeserializer ( yaml . load ( stream , Loader = SafeLoader ) , ** options ) :
except GeneratorExit :
except Exception as e :
six . reraise ( DeserializationError , DeserializationError ( e ) , sys . exc_info ( ) [ 2 ] )
from xml . dom import pulldom
from xml . sax import handler
from xml . sax . expatreader import ExpatParser as _ExpatParser
self . xml . startDocument ( )
self . xml . startElement ( "django-objects" , { "version" : "1.0" } )
self . xml . endElement ( "django-objects" )
self . xml . endDocument ( )
if not hasattr ( obj , "_meta" ) :
raise base . SerializationError ( "Non-model object (%s) encountered during serialization" % type ( obj ) )
attrs = { "model" : smart_text ( obj . _meta ) }
if not self . use_natural_primary_keys or not hasattr ( obj , 'natural_key' ) :
obj_pk = obj . _get_pk_val ( )
if obj_pk is not None :
attrs [ 'pk' ] = smart_text ( obj_pk )
self . xml . startElement ( "object" , attrs )
self . xml . endElement ( "object" )
else :
if related_att is not None :
related = related . natural_key ( )
for key_value in related :
self . xml . startElement ( "natural" , { } )
self . xml . characters ( smart_text ( key_value ) )
self . xml . endElement ( "natural" )
else :
self . xml . characters ( smart_text ( related_att ) )
else :
self . xml . startElement ( "object" , { } )
for key_value in natural :
self . xml . startElement ( "natural" , { } )
self . xml . characters ( smart_text ( key_value ) )
self . xml . endElement ( "natural" )
self . xml . endElement ( "object" )
else :
self . xml . addQuickElement ( "object" , attrs = { 'pk' : smart_text ( value . _get_pk_val ( ) ) } )
handle_m2m ( relobj )
self . db = options . pop ( 'using' , DEFAULT_DB_ALIAS )
self . ignore = options . pop ( 'ignorenonexistent' , False )
for event , node in self . event_stream :
if event == "START_ELEMENT" and node . nodeName == "object" :
self . event_stream . expandNode ( node )
Model = self . _get_model_from_node ( node , "model" )
if node . hasAttribute ( 'pk' ) :
data [ Model . _meta . pk . attname ] = Model . _meta . pk . to_python ( node . getAttribute ( 'pk' ) )
m2m_data = { }
model_fields = Model . _meta . get_all_field_names ( )
field_name = field_node . getAttribute ( "name" )
if not field_name :
if self . ignore and field_name not in model_fields :
else :
if field_node . getElementsByTagName ( 'None' ) :
else :
return base . DeserializedObject ( obj , m2m_data )
if node . getElementsByTagName ( 'None' ) :
else :
keys = node . getElementsByTagName ( 'natural' )
if keys :
field_value = [ getInnerText ( k ) . strip ( ) for k in keys ]
obj_pk = obj_pk . pk
else :
field_value = getInnerText ( node ) . strip ( )
else :
field_value = getInnerText ( node ) . strip ( )
keys = n . getElementsByTagName ( 'natural' )
if keys :
field_value = [ getInnerText ( k ) . strip ( ) for k in keys ]
else :
else :
return [ m2m_convert ( c ) for c in node . getElementsByTagName ( "object" ) ]
model_identifier = node . getAttribute ( attr )
if not model_identifier :
raise base . DeserializationError ( "<%s> node is missing the required '%s' attribute" % ( node . nodeName , attr ) )
return apps . get_model ( model_identifier )
except ( LookupError , TypeError ) :
inner_text = [ ]
for child in node . childNodes :
if child . nodeType == child . TEXT_NODE or child . nodeType == child . CDATA_SECTION_NODE :
elif child . nodeType == child . ELEMENT_NODE :
inner_text . extend ( getInnerText ( child ) )
else :
self . setFeature ( handler . feature_external_ges , False )
self . setFeature ( handler . feature_external_pes , False )
_ExpatParser . reset ( self )
self . sysid = sysid
self . pubid = pubid
tpl = "DTDForbidden(name='{}', system_id={!r}, public_id={!r})"
self . base = base
self . sysid = sysid
self . pubid = pubid
self . notation_name = notation_name
tpl = "EntitiesForbidden(name='{}', system_id={!r}, public_id={!r})"
self . base = base
self . sysid = sysid
self . pubid = pubid
tpl = "ExternalReferenceForbidden(system_id='{}', public_id={})"
from wsgiref import simple_server
from wsgiref . util import FileWrapper
__all__ = ( 'WSGIServer' , 'WSGIRequestHandler' )
if app_path is None :
return import_string ( app_path )
msg = ( "WSGI application '%(app_path)s' could not be loaded; " "Error importing module: '%(exception)s'" % ( { 'app_path' : app_path , 'exception' : e , } ) )
six . reraise ( ImproperlyConfigured , ImproperlyConfigured ( msg ) , sys . exc_info ( ) [ 2 ] )
request_queue_size = 10
self . address_family = socket . AF_INET6
super ( WSGIServer , self ) . server_bind ( )
msg = "[%s] %s\n" % ( self . log_date_time_string ( ) , format % args )
msg = self . style . HTTP_SUCCESS ( msg )
msg = self . style . HTTP_INFO ( msg )
msg = self . style . HTTP_NOT_MODIFIED ( msg )
msg = self . style . HTTP_REDIRECT ( msg )
msg = self . style . HTTP_NOT_FOUND ( msg )
msg = self . style . HTTP_BAD_REQUEST ( msg )
else :
msg = self . style . HTTP_SERVER_ERROR ( msg )
sys . stderr . write ( msg )
server_address = ( addr , port )
if threading :
httpd_cls = type ( str ( 'WSGIServer' ) , ( socketserver . ThreadingMixIn , WSGIServer ) , { } )
else :
httpd_cls = WSGIServer
httpd = httpd_cls ( server_address , WSGIRequestHandler , ipv6 = ipv6 )
if threading :
httpd . set_app ( wsgi_handler )
httpd . serve_forever ( )
__version__ = "0.1"
__all__ = [ "runfastcgi" ]
FASTCGI_OPTIONS = { 'protocol' : 'fcgi' , 'host' : None , 'port' : None , 'socket' : None , 'method' : 'fork' , 'daemonize' : None , 'workdir' : '/' , 'pidfile' : None , 'maxspare' : 5 , 'minspare' : 2 , 'maxchildren' : 50 , 'maxrequests' : 0 , 'debug' : None , 'outlog' : None , 'errlog' : None , 'umask' : None , }
print ( FASTCGI_HELP )
options = FASTCGI_OPTIONS . copy ( )
for x in argset :
if "=" in x :
k , v = x . split ( '=' , 1 )
else :
options [ k . lower ( ) ] = v
if "help" in options :
sys . stderr . write ( "ERROR: %s\n" % e )
sys . stderr . write ( " Unable to load the flup package. In order to run django\n" )
sys . stderr . write ( " as a FastCGI application, you will need to get flup from\n" )
sys . stderr . write ( " http://www.saddi.com/software/flup/ If you've already\n" )
sys . stderr . write ( " installed flup, then make sure you have it in your PYTHONPATH.\n" )
flup_module = 'server.' + options [ 'protocol' ]
if options [ 'method' ] in ( 'prefork' , 'fork' ) :
wsgi_opts = { 'maxSpare' : int ( options [ "maxspare" ] ) , 'minSpare' : int ( options [ "minspare" ] ) , 'maxChildren' : int ( options [ "maxchildren" ] ) , 'maxRequests' : int ( options [ "maxrequests" ] ) , }
flup_module += '_fork'
wsgi_opts = { 'maxSpare' : int ( options [ "maxspare" ] ) , 'minSpare' : int ( options [ "minspare" ] ) , 'maxThreads' : int ( options [ "maxchildren" ] ) , }
else :
return fastcgi_help ( "ERROR: Implementation must be one of prefork or " "thread." )
wsgi_opts [ 'debug' ] = options [ 'debug' ] is not None
module = importlib . import_module ( '.%s' % flup_module , 'flup' )
WSGIServer = module . WSGIServer
except Exception :
print ( "Can't import flup." + flup_module )
if options [ "host" ] and options [ "port" ] and not options [ "socket" ] :
wsgi_opts [ 'bindAddress' ] = ( options [ "host" ] , int ( options [ "port" ] ) )
wsgi_opts [ 'bindAddress' ] = options [ "socket" ]
wsgi_opts [ 'bindAddress' ] = None
else :
return fastcgi_help ( "Invalid combination of host, port, socket." )
if options [ "daemonize" ] is None :
daemonize = ( wsgi_opts [ 'bindAddress' ] is not None )
else :
else :
return fastcgi_help ( "ERROR: Invalid option for daemonize " "parameter." )
daemon_kwargs = { }
if options [ 'outlog' ] :
daemon_kwargs [ 'out_log' ] = options [ 'outlog' ]
if options [ 'errlog' ] :
daemon_kwargs [ 'err_log' ] = options [ 'errlog' ]
if options [ 'umask' ] :
daemon_kwargs [ 'umask' ] = int ( options [ 'umask' ] , 8 )
if daemonize :
become_daemon ( our_home_dir = options [ "workdir" ] , ** daemon_kwargs )
if options [ "pidfile" ] :
with open ( options [ "pidfile" ] , "w" ) as fp :
WSGIServer ( get_internal_wsgi_application ( ) , ** wsgi_opts ) . run ( )
if __name__ == '__main__' :
runfastcgi ( sys . argv [ 1 : ] )
request_started = Signal ( )
request_finished = Signal ( )
return base64 . urlsafe_b64encode ( s ) . strip ( b'=' )
return base64 . urlsafe_b64decode ( s + pad )
return Signer ( b'django.http.cookies' + key , salt = salt )
return json . dumps ( obj , separators = ( ',' , ':' ) ) . encode ( 'latin-1' )
if compress :
if is_compressed :
base64d = b'.' + base64d
return TimestampSigner ( key , salt = salt ) . sign ( base64d )
base64d = force_bytes ( TimestampSigner ( key , salt = salt ) . unsign ( s , max_age = max_age ) )
if base64d [ : 1 ] == b'.' :
base64d = base64d [ 1 : ]
if decompress :
self . sep = force_str ( sep )
signed_value = force_str ( signed_value )
if self . sep not in signed_value :
value , sig = signed_value . rsplit ( self . sep , 1 )
if constant_time_compare ( sig , self . signature ( value ) ) :
return baseconv . base62 . encode ( int ( time . time ( ) ) )
timestamp = baseconv . base62 . decode ( timestamp )
if max_age is not None :
age = time . time ( ) - timestamp
if age > max_age :
raise SignatureExpired ( 'Signature age %s > %s seconds' % ( age , max_age ) )
_prefixes = local ( )
_urlconfs = local ( )
self . url_name = url_name
self . app_name = app_name
if namespaces :
self . namespaces = [ x for x in namespaces if x ]
else :
else :
view_path = url_name or self . _func_path
@ lru_cache . lru_cache ( maxsize = None )
if callable ( lookup_view ) :
mod_name , func_name = get_mod_func ( lookup_view )
if not func_name :
if can_fail :
else :
mod = import_module ( mod_name )
if can_fail :
else :
parentmod , submod = get_mod_func ( mod_name )
if submod and not module_has_submodule ( import_module ( parentmod ) , submod ) :
else :
else :
view_func = getattr ( mod , func_name )
except AttributeError :
if can_fail :
else :
else :
if not callable ( view_func ) :
@ lru_cache . lru_cache ( maxsize = None )
if urlconf is None :
return RegexURLResolver ( r'^/' , urlconf )
@ lru_cache . lru_cache ( maxsize = None )
ns_resolver = RegexURLResolver ( ns_pattern , resolver . url_patterns )
return RegexURLResolver ( r'^/' , [ ns_resolver ] )
dot = callback . rindex ( '.' )
return callback [ : dot ] , callback [ dot + 1 : ]
@ property
language_code = get_language ( )
else :
regex = force_text ( self . _regex )
compiled_regex = re . compile ( regex , re . UNICODE )
except re . error as e :
self . _regex_dict [ language_code ] = compiled_regex
if callable ( callback ) :
else :
self . default_args = default_args or { }
if not prefix or not hasattr ( self , '_callback_str' ) :
if match :
kwargs = match . groupdict ( )
else :
kwargs . update ( self . default_args )
@ property
self . urlconf_name = urlconf_name
self . default_kwargs = default_kwargs or { }
self . namespace = namespace
self . app_name = app_name
urlconf_repr = '<%s list>' % self . urlconf_name [ 0 ] . __class__ . __name__
else :
urlconf_repr = repr ( self . urlconf_name )
lookups = MultiValueDict ( )
namespaces = { }
apps = { }
language_code = get_language ( )
for pattern in reversed ( self . url_patterns ) :
if hasattr ( pattern , '_callback_str' ) :
self . _callback_strs . add ( pattern . _callback_str )
callback = pattern . _callback
if not hasattr ( callback , '__name__' ) :
lookup_str = callback . __module__ + "." + callback . __class__ . __name__
else :
lookup_str = callback . __module__ + "." + callback . __name__
self . _callback_strs . add ( lookup_str )
p_pattern = pattern . regex . pattern
if p_pattern . startswith ( '^' ) :
p_pattern = p_pattern [ 1 : ]
if pattern . namespace :
namespaces [ pattern . namespace ] = ( p_pattern , pattern )
if pattern . app_name :
else :
parent_pat = pattern . regex . pattern
for name in pattern . reverse_dict :
for matches , pat , defaults in pattern . reverse_dict . getlist ( name ) :
new_matches = normalize ( parent_pat + pat )
lookups . appendlist ( name , ( new_matches , p_pattern + pat , dict ( defaults , ** pattern . default_kwargs ) ) )
for namespace , ( prefix , sub_pattern ) in pattern . namespace_dict . items ( ) :
namespaces [ namespace ] = ( p_pattern + prefix , sub_pattern )
for app_name , namespace_list in pattern . app_dict . items ( ) :
apps . setdefault ( app_name , [ ] ) . extend ( namespace_list )
self . _callback_strs . update ( pattern . _callback_strs )
else :
if pattern . name is not None :
self . _reverse_dict [ language_code ] = lookups
self . _namespace_dict [ language_code ] = namespaces
self . _app_dict [ language_code ] = apps
@ property
language_code = get_language ( )
if language_code not in self . _reverse_dict :
@ property
language_code = get_language ( )
if language_code not in self . _namespace_dict :
@ property
language_code = get_language ( )
if language_code not in self . _app_dict :
tried = [ ]
if match :
for pattern in self . url_patterns :
sub_match = pattern . resolve ( new_path )
except Resolver404 as e :
sub_tried = e . args [ 0 ] . get ( 'tried' )
if sub_tried is not None :
tried . extend ( [ pattern ] + t for t in sub_tried )
else :
else :
if sub_match :
sub_match_dict = dict ( match . groupdict ( ) , ** self . default_kwargs )
sub_match_dict . update ( sub_match . kwargs )
return ResolverMatch ( sub_match . func , sub_match . args , sub_match_dict , sub_match . url_name , self . app_name or sub_match . app_name , [ self . namespace ] + sub_match . namespaces )
@ property
except AttributeError :
@ property
patterns = getattr ( self . urlconf_module , "urlpatterns" , self . urlconf_module )
iter ( patterns )
msg = ( "The included urlconf '{name}' does not appear to have any " "patterns in it. If you see valid patterns in the file then " "the issue is probably caused by a circular import." )
callback = getattr ( self . urlconf_module , 'handler%s' % view_type , None )
if not callback :
callback = getattr ( urls , 'handler%s' % view_type )
return get_callable ( callback ) , { }
text_kwargs = dict ( ( k , force_text ( v ) ) for ( k , v ) in kwargs . items ( ) )
original_lookup = lookup_view
if lookup_view in self . _callback_strs :
lookup_view = get_callable ( lookup_view , True )
else :
if not callable ( original_lookup ) and callable ( lookup_view ) :
warnings . warn ( 'Reversing by dotted path is deprecated (%s).' % original_lookup , RemovedInDjango20Warning , stacklevel = 3 )
possibilities = self . reverse_dict . getlist ( lookup_view )
prefix_norm , prefix_args = normalize ( urlquote ( _prefix ) ) [ 0 ]
for possibility , pattern , defaults in possibilities :
for result , params in possibility :
candidate_subs = dict ( zip ( prefix_args + params , text_args ) )
else :
if set ( kwargs . keys ( ) ) | set ( defaults . keys ( ) ) != set ( params ) | set ( defaults . keys ( ) ) | set ( prefix_args ) :
for k , v in defaults . items ( ) :
if not matches :
candidate_subs = text_kwargs
candidate_pat = prefix_norm . replace ( '%' , '%%' ) + result
if re . search ( '^%s%s' % ( prefix_norm , pattern ) , candidate_pat % candidate_subs , re . UNICODE ) :
candidate_subs = dict ( ( k , urlquote ( v ) ) for ( k , v ) in candidate_subs . items ( ) )
return candidate_pat % candidate_subs
m = getattr ( lookup_view , '__module__' , None )
n = getattr ( lookup_view , '__name__' , None )
if m is not None and n is not None :
lookup_view_s = "%s.%s" % ( m , n )
else :
lookup_view_s = lookup_view
patterns = [ pattern for ( possibility , pattern , defaults ) in possibilities ]
raise NoReverseMatch ( "Reverse for '%s' with arguments '%s' and keyword " "arguments '%s' not found. %d pattern(s) tried: %s" % ( lookup_view_s , args , kwargs , len ( patterns ) , patterns ) )
super ( LocaleRegexURLResolver , self ) . __init__ ( None , urlconf_name , default_kwargs , app_name , namespace )
@ property
language_code = get_language ( )
regex_compiled = re . compile ( '^%s/' % language_code , re . UNICODE )
self . _regex_dict [ language_code ] = regex_compiled
if urlconf is None :
urlconf = get_urlconf ( )
if urlconf is None :
urlconf = get_urlconf ( )
resolver = get_resolver ( urlconf )
if prefix is None :
prefix = get_script_prefix ( )
view = viewname
else :
parts = viewname . split ( ':' )
parts . reverse ( )
view = parts [ 0 ]
resolved_path = [ ]
ns_pattern = ''
app_list = resolver . app_dict [ ns ]
if current_app and current_app in app_list :
ns = current_app
ns = app_list [ 0 ]
except KeyError :
extra , resolver = resolver . namespace_dict [ ns ]
ns_pattern = ns_pattern + extra
if resolved_path :
else :
if ns_pattern :
resolver = get_ns_resolver ( ns_pattern , resolver )
reverse_lazy = lazy ( reverse , str )
get_callable . cache_clear ( )
get_resolver . cache_clear ( )
get_ns_resolver . cache_clear ( )
if not prefix . endswith ( '/' ) :
prefix += '/'
except AttributeError :
if urlconf_name :
else :
except Resolver404 :
EMPTY_VALUES = ( None , '' , [ ] , ( ) , { } )
@ deconstructible
regex = ''
code = 'invalid'
flags = 0
if regex is not None :
self . regex = regex
if code is not None :
self . code = code
if inverse_match is not None :
self . inverse_match = inverse_match
if flags is not None :
self . flags = flags
@ deconstructible
regex = re . compile ( r'^(?:[a-z0-9\.\-]*)://' r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}(? digits :
digits = decimals
whole_digits = digits - decimals
attrs = super ( DecimalField , self ) . widget_attrs ( widget )
step = str ( Decimal ( '1' ) / 10 ** self . decimal_places ) . lower ( )
else :
step = 'any'
attrs . setdefault ( 'step' , step )
if input_formats is not None :
self . input_formats = input_formats
widget = DateInput
input_formats = formats . get_format_lazy ( 'DATE_INPUT_FORMATS' )
default_error_messages = { 'invalid' : _ ( 'Enter a valid date.' ) , }
return datetime . datetime . strptime ( force_str ( value ) , format ) . date ( )
widget = TimeInput
input_formats = formats . get_format_lazy ( 'TIME_INPUT_FORMATS' )
default_error_messages = { 'invalid' : _ ( 'Enter a valid time.' ) }
return datetime . datetime . strptime ( force_str ( value ) , format ) . time ( )
widget = DateTimeInput
input_formats = formats . get_format_lazy ( 'DATETIME_INPUT_FORMATS' )
default_error_messages = { 'invalid' : _ ( 'Enter a valid date/time.' ) , }
warnings . warn ( 'Using SplitDateTimeWidget with DateTimeField is deprecated. ' 'Use SplitDateTimeField instead.' , RemovedInDjango19Warning , stacklevel = 2 )
return datetime . datetime . strptime ( force_str ( value ) , format )
if error_message is not None :
error_messages = kwargs . get ( 'error_messages' ) or { }
error_messages [ 'invalid' ] = error_message
kwargs [ 'error_messages' ] = error_messages
regex = re . compile ( regex , re . UNICODE )
self . _regex_validator = validators . RegexValidator ( regex = regex )
regex = property ( _get_regex , _set_regex )
widget = EmailInput
default_validators = [ validators . validate_email ]
widget = ClearableFileInput
default_error_messages = { 'invalid' : _ ( "No file was submitted. Check the encoding type on the form." ) , 'missing' : _ ( "No file was submitted." ) , 'empty' : _ ( "The submitted file is empty." ) , 'max_length' : ungettext_lazy ( 'Ensure this filename has at most %(max)d character (it has %(length)d).' , 'Ensure this filename has at most %(max)d characters (it has %(length)d).' , 'max' ) , 'contradiction' : _ ( 'Please either submit a file or check the clear checkbox, not both.' ) }
self . max_length = kwargs . pop ( 'max_length' , None )
self . allow_empty_file = kwargs . pop ( 'allow_empty_file' , False )
except AttributeError :
if not file_name :
if not self . allow_empty_file and not file_size :
default_error_messages = { 'invalid_image' : _ ( "Upload a valid image. The file you uploaded was either not an image or a corrupted image." ) , }
if f is None :
from PIL import Image
else :
else :
Image . open ( file ) . verify ( )
except Exception :
six . reraise ( ValidationError , ValidationError ( self . error_messages [ 'invalid_image' ] , code = 'invalid_image' , ) , sys . exc_info ( ) [ 2 ] )
if hasattr ( f , 'seek' ) and callable ( f . seek ) :
f . seek ( 0 )
widget = URLInput
default_error_messages = { 'invalid' : _ ( 'Enter a valid URL.' ) , }
default_validators = [ validators . URLValidator ( ) ]
return list ( urlsplit ( url ) )
if not url_fields [ 0 ] :
url_fields [ 0 ] = 'http'
if not url_fields [ 1 ] :
url_fields [ 1 ] = url_fields [ 2 ]
url_fields [ 2 ] = ''
url_fields = split_url ( urlunsplit ( url_fields ) )
widget = CheckboxInput
else :
widget = NullBooleanSelect
else :
if initial is not None :
initial = bool ( initial )
widget = Select
default_error_messages = { 'invalid_choice' : _ ( 'Select a valid choice. %(value)s is not one of the available choices.' ) , }
super ( ChoiceField , self ) . __init__ ( required = required , widget = widget , label = label , initial = initial , help_text = help_text , * args , ** kwargs )
self . choices = choices
result = super ( ChoiceField , self ) . __deepcopy__ ( memo )
result . _choices = copy . deepcopy ( self . _choices , memo )
choices = property ( _get_choices , _set_choices )
for k2 , v2 in v :
if value == k2 or text_value == force_text ( k2 ) :
else :
self . coerce = kwargs . pop ( 'coerce' , lambda val : val )
self . empty_value = kwargs . pop ( 'empty_value' , '' )
hidden_widget = MultipleHiddenInput
widget = SelectMultiple
default_error_messages = { 'invalid_choice' : _ ( 'Select a valid choice. %(value)s is not one of the available choices.' ) , 'invalid_list' : _ ( 'Enter a list of values.' ) , }
if not self . valid_value ( val ) :
if initial is None :
initial = [ ]
return data_set != initial_set
self . coerce = kwargs . pop ( 'coerce' , lambda val : val )
self . empty_value = kwargs . pop ( 'empty_value' , [ ] )
new_value = [ ]
for f in fields :
self . fields = fields
default_error_messages = { 'invalid' : _ ( 'Enter a list of values.' ) , 'incomplete' : _ ( 'Enter a complete value.' ) , }
self . require_all_fields = kwargs . pop ( 'require_all_fields' , True )
for f in fields :
f . error_messages . setdefault ( 'incomplete' , self . error_messages [ 'incomplete' ] )
self . fields = fields
result = super ( MultiValueField , self ) . __deepcopy__ ( memo )
result . fields = tuple ( [ x . __deepcopy__ ( memo ) for x in self . fields ] )
clean_data = [ ]
errors = [ ]
else :
else :
except IndexError :
field_value = None
except ValidationError as e :
errors . extend ( m for m in e . error_list if m not in errors )
if errors :
out = self . compress ( clean_data )
if initial is None :
else :
initial = self . widget . decompress ( initial )
self . allow_files , self . allow_folders = allow_files , allow_folders
super ( FilePathField , self ) . __init__ ( choices = ( ) , required = required , widget = widget , label = label , initial = initial , help_text = help_text , * args , ** kwargs )
else :
if recursive :
for f in files :
for f in dirs :
if f == '__pycache__' :
else :
if f == '__pycache__' :
except OSError :
widget = SplitDateTimeWidget
hidden_widget = SplitHiddenDateTimeWidget
default_error_messages = { 'invalid_date' : _ ( 'Enter a valid date.' ) , 'invalid_time' : _ ( 'Enter a valid time.' ) , }
errors = self . default_error_messages . copy ( )
errors . update ( kwargs [ 'error_messages' ] )
localize = kwargs . get ( 'localize' , False )
fields = ( DateField ( input_formats = input_date_formats , error_messages = { 'invalid' : errors [ 'invalid_date' ] } , localize = localize ) , TimeField ( input_formats = input_time_formats , error_messages = { 'invalid' : errors [ 'invalid_time' ] } , localize = localize ) , )
if data_list :
result = datetime . datetime . combine ( * data_list )
default_validators = [ validators . validate_ipv4_address ]
warnings . warn ( "IPAddressField has been deprecated. Use GenericIPAddressField instead." , RemovedInDjango19Warning )
self . unpack_ipv4 = unpack_ipv4
self . default_validators = validators . ip_address_validators ( protocol , unpack_ipv4 ) [ 0 ]
default_validators = [ validators . validate_slug ]
__all__ = ( 'BaseForm' , 'Form' )
if not name :
return name . replace ( '_' , ' ' ) . capitalize ( )
warnings . warn ( "get_declared_fields is deprecated and will be removed in Django 1.9." , RemovedInDjango19Warning , stacklevel = 2 , )
if with_base_fields :
for base in bases [ : : - 1 ] :
if hasattr ( base , 'base_fields' ) :
fields = list ( six . iteritems ( base . base_fields ) ) + fields
else :
for base in bases [ : : - 1 ] :
if hasattr ( base , 'declared_fields' ) :
fields = list ( six . iteritems ( base . declared_fields ) ) + fields
current_fields = [ ]
current_fields . sort ( key = lambda x : x [ 1 ] . creation_counter )
attrs [ 'declared_fields' ] = OrderedDict ( current_fields )
new_class = ( super ( DeclarativeFieldsMetaclass , mcs ) . __new__ ( mcs , name , bases , attrs ) )
declared_fields = OrderedDict ( )
for base in reversed ( new_class . __mro__ ) :
if hasattr ( base , 'declared_fields' ) :
declared_fields . update ( base . declared_fields )
declared_fields . pop ( attr )
new_class . base_fields = declared_fields
new_class . declared_fields = declared_fields
@ python_2_unicode_compatible
self . files = files or { }
self . auto_id = auto_id
self . prefix = prefix
self . initial = initial or { }
self . error_class = error_class
self . label_suffix = label_suffix if label_suffix is not None else _ ( ':' )
self . empty_permitted = empty_permitted
except KeyError :
@ property
return 'initial-%s' % self . add_prefix ( field_name )
top_errors = self . non_field_errors ( )
output , hidden_fields = [ ] , [ ]
html_class_attr = ''
bf_errors = self . error_class ( [ conditional_escape ( error ) for error in bf . errors ] )
if bf . is_hidden :
if bf_errors :
top_errors . extend ( [ _ ( '(Hidden field %(name)s) %(error)s' ) % { 'name' : name , 'error' : force_text ( e ) } for e in bf_errors ] )
else :
css_classes = bf . css_classes ( )
if css_classes :
if errors_on_separate_row and bf_errors :
if bf . label :
label = conditional_escape ( force_text ( bf . label ) )
label = bf . label_tag ( label ) or ''
else :
label = ''
else :
help_text = ''
output . append ( normal_row % { 'errors' : force_text ( bf_errors ) , 'label' : force_text ( label ) , 'field' : six . text_type ( bf ) , 'help_text' : help_text , 'html_class_attr' : html_class_attr , 'field_name' : bf . html_name , } )
if top_errors :
output . insert ( 0 , error_row % force_text ( top_errors ) )
if hidden_fields :
str_hidden = '' . join ( hidden_fields )
if output :
last_row = output [ - 1 ]
if not last_row . endswith ( row_ender ) :
last_row = ( normal_row % { 'errors' : '' , 'label' : '' , 'field' : '' , 'help_text' : '' , 'html_class_attr' : html_class_attr } )
else :
return self . _html_output ( normal_row = '%(label)s | %(errors)s%(field)s%(help_text)s |
' , error_row = '%s |
' , row_ender = '' , help_text_html = '
%s' , errors_on_separate_row = False )
return self . _html_output ( normal_row = '%(errors)s%(label)s %(field)s%(help_text)s' , error_row = '%s' , row_ender = '' , help_text_html = ' %s' , errors_on_separate_row = False )
return self . _html_output ( normal_row = '%(label)s %(field)s%(help_text)s
' , error_row = '%s' , row_ender = '
' , help_text_html = ' %s' , errors_on_separate_row = True )
prefix = self . add_prefix ( fieldname )
error = ValidationError ( error )
if hasattr ( error , 'error_dict' ) :
else :
error = error . error_dict
else :
else :
if code is None :
if error . code == code :
self . _errors = ErrorDict ( )
else :
except ValidationError as e :
except ValidationError as e :
else :
if cleaned_data is not None :
self . cleaned_data = cleaned_data
@ property
prefixed_name = self . add_prefix ( name )
if callable ( initial_value ) :
initial_value = initial_value ( )
else :
initial_prefixed_name = self . add_initial_prefix ( name )
except ValidationError :
@ property
media = Media ( )
media = media + field . widget . media
@ python_2_unicode_compatible
self . html_name = form . add_prefix ( name )
self . html_initial_name = form . add_initial_prefix ( name )
self . label = pretty_name ( name )
else :
attrs = { 'id' : id_ } if id_ else { }
return list ( self . __iter__ ( ) ) [ idx ]
@ property
if not widget :
attrs = attrs or { }
auto_id = self . auto_id
if auto_id and 'id' not in attrs and 'id' not in widget . attrs :
if not only_initial :
attrs [ 'id' ] = auto_id
else :
attrs [ 'id' ] = self . html_initial_id
if not only_initial :
else :
return self . as_widget ( TextInput ( ) , attrs , ** kwargs )
return self . as_widget ( Textarea ( ) , attrs , ** kwargs )
@ property
else :
contents = contents or self . label
if label_suffix is None :
if label_suffix and contents and contents [ - 1 ] not in _ ( ':?.!' ) :
contents = format_html ( '{0}{1}' , contents , label_suffix )
id_ = widget . attrs . get ( 'id' ) or self . auto_id
if id_ :
id_for_label = widget . id_for_label ( id_ )
if id_for_label :
attrs = dict ( attrs or { } , ** { 'for' : id_for_label } )
attrs = attrs or { }
else :
attrs = flatatt ( attrs ) if attrs else ''
contents = format_html ( '' , attrs , contents )
else :
contents = conditional_escape ( contents )
if hasattr ( extra_classes , 'split' ) :
extra_classes = extra_classes . split ( )
extra_classes = set ( extra_classes or [ ] )
extra_classes . add ( self . form . error_css_class )
extra_classes . add ( self . form . required_css_class )
@ property
@ property
auto_id = self . form . auto_id
if auto_id and '%s' in smart_text ( auto_id ) :
return smart_text ( auto_id ) % self . html_name
@ property
id_ = widget . attrs . get ( 'id' ) or self . auto_id
return widget . id_for_label ( id_ )
# Public API of the formset module.
__all__ = ( 'BaseFormSet' , 'formset_factory' , 'all_valid' )
# Names of the hidden bookkeeping fields carried by a formset's
# management form (registered as IntegerFields on the management
# form's base_fields).
TOTAL_FORM_COUNT = 'TOTAL_FORMS'
INITIAL_FORM_COUNT = 'INITIAL_FORMS'
MIN_NUM_FORM_COUNT = 'MIN_NUM_FORMS'
MAX_NUM_FORM_COUNT = 'MAX_NUM_FORMS'
# Names of the optional per-form ordering / deletion fields added by
# add_fields() when can_order / can_delete are enabled.
ORDERING_FIELD_NAME = 'ORDER'
DELETION_FIELD_NAME = 'DELETE'
# Defaults used by formset_factory when min_num / max_num are not
# supplied by the caller; DEFAULT_MAX_NUM also pads absolute_max.
DEFAULT_MIN_NUM = 0
DEFAULT_MAX_NUM = 1000
self . base_fields [ TOTAL_FORM_COUNT ] = IntegerField ( widget = HiddenInput )
self . base_fields [ INITIAL_FORM_COUNT ] = IntegerField ( widget = HiddenInput )
self . base_fields [ MIN_NUM_FORM_COUNT ] = IntegerField ( required = False , widget = HiddenInput )
self . base_fields [ MAX_NUM_FORM_COUNT ] = IntegerField ( required = False , widget = HiddenInput )
@ python_2_unicode_compatible
self . auto_id = auto_id
self . files = files or { }
self . initial = initial
self . error_class = error_class
@ property
if not form . is_valid ( ) :
else :
else :
total_forms = initial_forms
total_forms = self . max_num
return self . management_form . cleaned_data [ INITIAL_FORM_COUNT ]
else :
@ cached_property
defaults [ 'files' ] = self . files
if self . initial and 'initial' not in kwargs :
defaults [ 'initial' ] = self . initial [ i ]
except IndexError :
defaults . update ( kwargs )
form = self . form ( ** defaults )
@ property
@ property
@ property
@ property
return [ form . cleaned_data for form in self . forms ]
@ property
if i >= self . initial_form_count ( ) and not form . has_changed ( ) :
@ property
if i >= self . initial_form_count ( ) and not form . has_changed ( ) :
if k [ 1 ] is None :
@ classmethod
@ property
forms_valid &= form . is_valid ( )
return forms_valid and not self . non_form_errors ( )
raise ValidationError ( ungettext ( "Please submit %d or fewer forms." , "Please submit %d or fewer forms." , self . max_num ) % self . max_num , code = 'too_many_forms' , )
raise ValidationError ( ungettext ( "Please submit %d or more forms." , "Please submit %d or more forms." , self . min_num ) % self . min_num , code = 'too_few_forms' )
except ValidationError as e :
return any ( form . has_changed ( ) for form in self )
if index is not None and index < self . initial_form_count ( ) :
form . fields [ ORDERING_FIELD_NAME ] = IntegerField ( label = _ ( 'Order' ) , initial = index + 1 , required = False )
else :
form . fields [ ORDERING_FIELD_NAME ] = IntegerField ( label = _ ( 'Order' ) , required = False )
form . fields [ DELETION_FIELD_NAME ] = BooleanField ( label = _ ( 'Delete' ) , required = False )
else :
return self . empty_form . is_multipart ( )
@ property
else :
forms = ' ' . join ( form . as_table ( ) for form in self )
return mark_safe ( '\n' . join ( [ six . text_type ( self . management_form ) , forms ] ) )
forms = ' ' . join ( form . as_p ( ) for form in self )
return mark_safe ( '\n' . join ( [ six . text_type ( self . management_form ) , forms ] ) )
forms = ' ' . join ( form . as_ul ( ) for form in self )
return mark_safe ( '\n' . join ( [ six . text_type ( self . management_form ) , forms ] ) )
if min_num is None :
min_num = DEFAULT_MIN_NUM
if max_num is None :
max_num = DEFAULT_MAX_NUM
absolute_max = max_num + DEFAULT_MAX_NUM
attrs = { 'form' : form , 'extra' : extra , 'can_order' : can_order , 'can_delete' : can_delete , 'min_num' : min_num , 'max_num' : max_num , 'absolute_max' : absolute_max , 'validate_min' : validate_min , 'validate_max' : validate_max }
return type ( form . __name__ + str ( 'FormSet' ) , ( formset , ) , attrs )
for formset in formsets :
if not formset . is_valid ( ) :
# Public API of the model-forms module.
__all__ = ( 'ModelForm' , 'BaseModelForm' , 'model_to_dict' , 'fields_for_model' , 'save_instance' , 'ModelChoiceField' , 'ModelMultipleChoiceField' , 'ALL_FIELDS' , 'BaseModelFormSet' , 'modelformset_factory' , 'BaseInlineFormSet' , 'inlineformset_factory' , )
# Sentinel for Meta.fields / the localized_fields option meaning
# "use every field on the model".
ALL_FIELDS = '__all__'
opts = instance . _meta
cleaned_data = form . cleaned_data
file_field_list = [ ]
for f in opts . fields :
if fields is not None and f . name not in fields :
if exclude and f . name in exclude :
else :
f . save_form_data ( instance , cleaned_data [ f . name ] )
for f in file_field_list :
f . save_form_data ( instance , cleaned_data [ f . name ] )
if construct :
instance = construct_instance ( form , instance , fields , exclude )
opts = instance . _meta
if form . errors :
cleaned_data = form . cleaned_data
for f in opts . many_to_many + opts . virtual_fields :
if not hasattr ( f , 'save_form_data' ) :
if fields and f . name not in fields :
if exclude and f . name in exclude :
if f . name in cleaned_data :
f . save_form_data ( instance , cleaned_data [ f . name ] )
if commit :
instance . save ( )
save_m2m ( )
else :
form . save_m2m = save_m2m
opts = instance . _meta
for f in opts . concrete_fields + opts . virtual_fields + opts . many_to_many :
if fields and f . name not in fields :
if exclude and f . name in exclude :
if instance . pk is None :
else :
qs = f . value_from_object ( instance )
if qs . _result_cache is not None :
else :
data [ f . name ] = list ( qs . values_list ( 'pk' , flat = True ) )
else :
field_list = [ ]
ignored = [ ]
opts = model . _meta
for f in sorted ( opts . concrete_fields + sortable_virtual_fields + opts . many_to_many ) :
if fields is not None and f . name not in fields :
if exclude and f . name in exclude :
if widgets and f . name in widgets :
kwargs [ 'widget' ] = widgets [ f . name ]
if localized_fields == ALL_FIELDS or ( localized_fields and f . name in localized_fields ) :
if labels and f . name in labels :
kwargs [ 'label' ] = labels [ f . name ]
if help_texts and f . name in help_texts :
kwargs [ 'help_text' ] = help_texts [ f . name ]
if error_messages and f . name in error_messages :
kwargs [ 'error_messages' ] = error_messages [ f . name ]
if formfield_callback is None :
formfield = f . formfield ( ** kwargs )
else :
formfield = formfield_callback ( f , ** kwargs )
if formfield :
else :
field_dict = OrderedDict ( field_list )
if fields :
field_dict = OrderedDict ( [ ( f , field_dict . get ( f ) ) for f in fields if ( ( not exclude ) or ( exclude and f not in exclude ) ) and ( f not in ignored ) ] )
self . model = getattr ( options , 'model' , None )
self . fields = getattr ( options , 'fields' , None )
self . exclude = getattr ( options , 'exclude' , None )
self . widgets = getattr ( options , 'widgets' , None )
self . localized_fields = getattr ( options , 'localized_fields' , None )
self . labels = getattr ( options , 'labels' , None )
self . help_texts = getattr ( options , 'help_texts' , None )
self . error_messages = getattr ( options , 'error_messages' , None )
formfield_callback = attrs . pop ( 'formfield_callback' , None )
new_class = super ( ModelFormMetaclass , mcs ) . __new__ ( mcs , name , bases , attrs )
if bases == ( BaseModelForm , ) :
opts = new_class . _meta = ModelFormOptions ( getattr ( new_class , 'Meta' , None ) )
for opt in [ 'fields' , 'exclude' , 'localized_fields' ] :
if opts . model :
if opts . fields is None and opts . exclude is None :
raise ImproperlyConfigured ( "Creating a ModelForm without either the 'fields' attribute " "or the 'exclude' attribute is prohibited; form %s " "needs updating." % name )
if opts . fields == ALL_FIELDS :
opts . fields = None
fields = fields_for_model ( opts . model , opts . fields , opts . exclude , opts . widgets , formfield_callback , opts . localized_fields , opts . labels , opts . help_texts , opts . error_messages )
none_model_fields = [ k for k , v in six . iteritems ( fields ) if not v ]
missing_fields = ( set ( none_model_fields ) - set ( new_class . declared_fields . keys ( ) ) )
if missing_fields :
fields . update ( new_class . declared_fields )
else :
fields = new_class . declared_fields
new_class . base_fields = fields
if opts . model is None :
if instance is None :
self . instance = opts . model ( )
object_data = { }
else :
self . instance = instance
object_data = model_to_dict ( instance , opts . fields , opts . exclude )
if initial is not None :
object_data . update ( initial )
super ( BaseModelForm , self ) . __init__ ( data , files , auto_id , prefix , object_data , error_class , label_suffix , empty_permitted )
formfield = self . fields [ field_name ]
if hasattr ( formfield , 'queryset' ) :
exclude = [ ]
else :
if not f . blank and not form_field . required and field_value in form_field . empty_values :
if ( field == NON_FIELD_ERRORS and opts . error_messages and NON_FIELD_ERRORS in opts . error_messages ) :
error_messages = opts . error_messages [ NON_FIELD_ERRORS ]
else :
exclude = self . _get_validation_exclusions ( )
construct_instance_exclude = list ( exclude )
self . instance . full_clean ( exclude = exclude , validate_unique = False )
except ValidationError as e :
exclude = self . _get_validation_exclusions ( )
self . instance . validate_unique ( exclude = exclude )
except ValidationError as e :
fail_message = 'created'
else :
fail_message = 'changed'
attrs = { 'model' : model }
if fields is not None :
attrs [ 'fields' ] = fields
if exclude is not None :
attrs [ 'exclude' ] = exclude
if widgets is not None :
attrs [ 'widgets' ] = widgets
if localized_fields is not None :
attrs [ 'localized_fields' ] = localized_fields
if labels is not None :
attrs [ 'labels' ] = labels
if help_texts is not None :
attrs [ 'help_texts' ] = help_texts
if error_messages is not None :
attrs [ 'error_messages' ] = error_messages
parent = ( object , )
if hasattr ( form , 'Meta' ) :
parent = ( form . Meta , object )
Meta = type ( str ( 'Meta' ) , parent , attrs )
class_name = model . __name__ + str ( 'Form' )
form_class_attrs = { 'Meta' : Meta , 'formfield_callback' : formfield_callback }
if ( getattr ( Meta , 'fields' , None ) is None and getattr ( Meta , 'exclude' , None ) is None ) :
raise ImproperlyConfigured ( "Calling modelform_factory without defining 'fields' or " "'exclude' explicitly is prohibited." )
return type ( form ) ( class_name , ( form , ) , form_class_attrs )
model = None
self . queryset = queryset
self . initial_extra = kwargs . pop ( 'initial' , None )
defaults . update ( kwargs )
return super ( BaseModelFormSet , self ) . initial_form_count ( )
pk_field = self . model . _meta . pk
to_python = self . _get_to_python ( pk_field )
pk = to_python ( pk )
kwargs [ 'instance' ] = self . _existing_object ( pk )
kwargs [ 'instance' ] = self . get_queryset ( ) [ i ]
except IndexError :
return super ( BaseModelFormSet , self ) . _construct_form ( i , ** kwargs )
else :
qs = self . model . _default_manager . get_queryset ( )
if not qs . ordered :
qs = qs . order_by ( self . model . _meta . pk . name )
self . _queryset = qs
return form . save ( commit = commit )
return form . save ( commit = commit )
if not commit :
form . save_m2m ( )
self . save_m2m = save_m2m
all_unique_checks = set ( )
all_date_checks = set ( )
forms_to_delete = self . deleted_forms
valid_forms = [ form for form in self . forms if form . is_valid ( ) and form not in forms_to_delete ]
for form in valid_forms :
exclude = form . _get_validation_exclusions ( )
unique_checks , date_checks = form . instance . _get_unique_checks ( exclude = exclude )
all_unique_checks = all_unique_checks . union ( set ( unique_checks ) )
all_date_checks = all_date_checks . union ( set ( date_checks ) )
errors = [ ]
for uclass , unique_check in all_unique_checks :
seen_data = set ( )
for form in valid_forms :
row_data = tuple ( d . _get_pk_val ( ) if hasattr ( d , '_get_pk_val' ) else d for d in row_data )
if row_data and None not in row_data :
if row_data in seen_data :
seen_data . add ( row_data )
for date_check in all_date_checks :
seen_data = set ( )
for form in valid_forms :
if ( form . cleaned_data and form . cleaned_data [ field ] is not None and form . cleaned_data [ unique_for ] is not None ) :
if lookup == 'date' :
date = form . cleaned_data [ unique_for ]
date_data = ( date . year , date . month , date . day )
else :
date_data = ( getattr ( form . cleaned_data [ unique_for ] , lookup ) , )
if errors :
else :
return ugettext ( "Please correct the duplicate data for %(field_name)s " "which must be unique for the %(lookup)s in %(date_field)s." ) % { 'field_name' : date_check [ 2 ] , 'date_field' : date_check [ 3 ] , 'lookup' : six . text_type ( date_check [ 1 ] ) , }
return ugettext ( "Please correct the duplicate values below." )
saved_instances = [ ]
forms_to_delete = self . deleted_forms
obj = form . instance
if form in forms_to_delete :
if obj . pk is None :
if commit :
obj . delete ( )
if not commit :
if not form . has_changed ( ) :
if not commit :
return ( ( not pk . editable ) or ( pk . auto_created or isinstance ( pk , AutoField ) ) or ( pk . rel and pk . rel . parent_link and pk_is_not_editable ( pk . rel . to . _meta . pk ) ) )
if pk_is_not_editable ( pk ) or pk . name not in form . fields :
if form . is_bound :
pk_value = form . instance . pk
else :
if index is not None :
pk_value = self . get_queryset ( ) [ index ] . pk
else :
pk_value = None
except IndexError :
pk_value = None
qs = pk . rel . to . _default_manager . get_queryset ( )
else :
qs = self . model . _default_manager . get_queryset ( )
qs = qs . using ( form . instance . _state . db )
if form . _meta . widgets :
widget = form . _meta . widgets . get ( self . _pk_field . name , HiddenInput )
else :
widget = HiddenInput
form . fields [ self . _pk_field . name ] = ModelChoiceField ( qs , initial = pk_value , required = False , widget = widget )
super ( BaseModelFormSet , self ) . add_fields ( form , index )
meta = getattr ( form , 'Meta' , None )
if meta is None :
meta = type ( str ( 'Meta' ) , ( object , ) , { } )
if ( getattr ( meta , 'fields' , fields ) is None and getattr ( meta , 'exclude' , exclude ) is None ) :
raise ImproperlyConfigured ( "Calling modelformset_factory without defining 'fields' or " "'exclude' explicitly is prohibited." )
form = modelform_factory ( model , form = form , fields = fields , exclude = exclude , formfield_callback = formfield_callback , widgets = widgets , localized_fields = localized_fields , labels = labels , help_texts = help_texts , error_messages = error_messages )
FormSet = formset_factory ( form , formset , extra = extra , min_num = min_num , max_num = max_num , can_order = can_order , can_delete = can_delete , validate_min = validate_min , validate_max = validate_max )
FormSet . model = model
if instance is None :
else :
self . instance = instance
self . save_as_new = save_as_new
if queryset is None :
queryset = self . model . _default_manager
else :
qs = queryset . none ( )
super ( BaseInlineFormSet , self ) . __init__ ( data , files , prefix = prefix , queryset = qs , ** kwargs )
return super ( BaseInlineFormSet , self ) . initial_form_count ( )
form = super ( BaseInlineFormSet , self ) . _construct_form ( i , ** kwargs )
@ classmethod
return RelatedObject ( cls . fk . rel . to , cls . model , cls . fk ) . get_accessor_name ( ) . replace ( '+' , '' )
setattr ( obj , self . fk . get_attname ( ) , getattr ( pk_value , 'pk' , pk_value ) )
if commit :
obj . save ( )
if commit and hasattr ( form , 'save_m2m' ) :
form . save_m2m ( )
super ( BaseInlineFormSet , self ) . add_fields ( form , index )
else :
kwargs = { 'label' : getattr ( form . fields . get ( name ) , 'label' , capfirst ( self . fk . verbose_name ) ) }
if self . fk . rel . field_name != self . fk . rel . to . _meta . pk . name :
kwargs [ 'to_field' ] = self . fk . rel . field_name
form . fields [ name ] = InlineForeignKeyField ( self . instance , ** kwargs )
if form . _meta . fields :
form . _meta . fields = list ( form . _meta . fields )
return super ( BaseInlineFormSet , self ) . get_unique_error_message ( unique_check )
opts = model . _meta
if fk_name :
fks_to_parent = [ f for f in opts . fields if f . name == fk_name ]
fk = fks_to_parent [ 0 ]
if not isinstance ( fk , ForeignKey ) or ( fk . rel . to != parent_model and fk . rel . to not in parent_model . _meta . get_parent_list ( ) ) :
# Report a misconfigured fk_name.  Three values are interpolated
# (the fk_name plus the parent model's app_label and object_name),
# so the format string needs '%s.%s' — the previous "'%s.%'" was an
# incomplete format spec and made the % operator itself raise
# ValueError instead of producing this message.
raise ValueError(
    "fk_name '%s' is not a ForeignKey to '%s.%s'." % (
        fk_name, parent_model._meta.app_label,
        parent_model._meta.object_name))
else :
fks_to_parent = [ f for f in opts . fields if isinstance ( f , ForeignKey ) and ( f . rel . to == parent_model or f . rel . to in parent_model . _meta . get_parent_list ( ) ) ]
fk = fks_to_parent [ 0 ]
if can_fail :
raise ValueError ( "'%s.%s' has no ForeignKey to '%s.%s'." % ( model . _meta . app_label , model . _meta . object_name , parent_model . _meta . app_label , parent_model . _meta . object_name ) )
else :
raise ValueError ( "'%s.%s' has more than one ForeignKey to '%s.%s'." % ( model . _meta . app_label , model . _meta . object_name , parent_model . _meta . app_label , parent_model . _meta . object_name ) )
fk = _get_foreign_key ( parent_model , model , fk_name = fk_name )
if fk . unique :
max_num = 1
kwargs = { 'form' : form , 'formfield_callback' : formfield_callback , 'formset' : formset , 'extra' : extra , 'can_delete' : can_delete , 'can_order' : can_order , 'fields' : fields , 'exclude' : exclude , 'min_num' : min_num , 'max_num' : max_num , 'widgets' : widgets , 'validate_min' : validate_min , 'validate_max' : validate_max , 'localized_fields' : localized_fields , 'labels' : labels , 'help_texts' : help_texts , 'error_messages' : error_messages , }
FormSet = modelformset_factory ( model , ** kwargs )
FormSet . fk = fk
widget = HiddenInput
self . parent_instance = parent_instance
self . pk_field = kwargs . pop ( "pk_field" , False )
self . to_field = kwargs . pop ( "to_field" , None )
else :
kwargs [ "initial" ] = self . parent_instance . pk
else :
orig = self . parent_instance . pk
else :
default_error_messages = { 'invalid_choice' : _ ( 'Select a valid choice. That choice is not one of' ' the available choices.' ) , }
if required and ( initial is not None ) :
else :
self . empty_label = empty_label
if cache_choices is not None :
warnings . warn ( "cache_choices has been deprecated and will be " "removed in Django 1.9." , RemovedInDjango19Warning , stacklevel = 2 )
else :
self . cache_choices = cache_choices
self . queryset = queryset
self . to_field_name = to_field_name
result = super ( ChoiceField , self ) . __deepcopy__ ( memo )
result . queryset = result . queryset
queryset = property ( _get_queryset , _set_queryset )
choices = property ( _get_choices , ChoiceField . _set_choices )
else :
except ( ValueError , self . queryset . model . DoesNotExist ) :
initial_value = initial if initial is not None else ''
return force_text ( self . prepare_value ( initial_value ) ) != force_text ( data_value )
widget = SelectMultiple
hidden_widget = MultipleHiddenInput
default_error_messages = { 'list' : _ ( 'Enter a list of values.' ) , 'invalid_choice' : _ ( 'Select a valid choice. %(value)s is not one of the' ' available choices.' ) , 'invalid_pk_value' : _ ( '"%(pk)s" is not a valid value for a primary key.' ) }
to_py = super ( ModelMultipleChoiceField , self ) . to_python
return [ to_py ( val ) for val in value ]
if force_text ( val ) not in pks :
if initial is None :
initial = [ ]
return data_set != initial_set
return ( form_class is not None and ( hasattr ( form_class , '_meta' ) and ( form_class . _meta . fields is not None or form_class . _meta . exclude is not None ) ) )
warnings . warn ( "The django.forms.util module has been renamed. " "Use django.forms.utils instead." , RemovedInDjango19Warning , stacklevel = 2 )
from UserList import UserList
boolean_attrs = [ ]
del attrs [ attr ]
del attrs [ attr ]
return ( format_html_join ( '' , ' {0}="{1}"' , sorted ( attrs . items ( ) ) ) + format_html_join ( '' , ' {0}' , sorted ( boolean_attrs ) ) )
@ python_2_unicode_compatible
return json . dumps ( { f : e . get_json_data ( escape_html ) for f , e in self . items ( ) } )
return format_html ( '' , format_html_join ( '' , '{0}{1}' , ( ( k , force_text ( v ) ) for k , v in self . items ( ) ) ) )
output = [ ]
@ python_2_unicode_compatible
if error_class is None :
else :
self . error_class = 'errorlist {}' . format ( error_class )
errors = [ ]
return json . dumps ( self . get_json_data ( escape_html ) )
return format_html ( '' , self . error_class , format_html_join ( '' , '{0}' , ( ( force_text ( e ) , ) for e in self ) ) )
current_timezone = timezone . get_current_timezone ( )
except Exception :
message = _ ( '%(datetime)s couldn\'t be interpreted ' 'in time zone %(current_timezone)s; it ' 'may be ambiguous or it may not exist.' )
params = { 'datetime' : value , 'current_timezone' : current_timezone }
six . reraise ( ValidationError , ValidationError ( message , code = 'ambiguous_timezone' , params = params , ) , sys . exc_info ( ) [ 2 ] )
current_timezone = timezone . get_current_timezone ( )
# Public widget API exported by this module.
__all__ = ( 'Media' , 'MediaDefiningClass' , 'Widget' , 'TextInput' , 'EmailInput' , 'URLInput' , 'NumberInput' , 'PasswordInput' , 'HiddenInput' , 'MultipleHiddenInput' , 'ClearableFileInput' , 'FileInput' , 'DateInput' , 'DateTimeInput' , 'TimeInput' , 'Textarea' , 'CheckboxInput' , 'Select' , 'NullBooleanSelect' , 'SelectMultiple' , 'RadioSelect' , 'CheckboxSelectMultiple' , 'MultiWidget' , 'SplitDateTimeWidget' , 'SplitHiddenDateTimeWidget' , )
# Asset categories a widget's Media definition may declare; Media
# dispatches to add_<name>() / render_<name>() for each entry.
MEDIA_TYPES = ( 'css' , 'js' )
@ python_2_unicode_compatible
if media :
media_attrs = media . __dict__
else :
media_attrs = kwargs
for name in MEDIA_TYPES :
getattr ( self , 'add_' + name ) ( media_attrs . get ( name , None ) )
return mark_safe ( '\n' . join ( chain ( * [ getattr ( self , 'render_' + name ) ( ) for name in MEDIA_TYPES ] ) ) )
media = sorted ( self . _css . keys ( ) )
return chain ( * [ [ format_html ( '' , self . absolute_path ( path ) , medium ) for path in self . _css [ medium ] ] for medium in media ] )
if prefix is None :
else :
if name in MEDIA_TYPES :
if not self . _css . get ( medium ) or path not in self . _css [ medium ] :
combined = Media ( )
for name in MEDIA_TYPES :
getattr ( combined , 'add_' + name ) ( getattr ( self , '_' + name , None ) )
getattr ( combined , 'add_' + name ) ( getattr ( other , '_' + name , None ) )
sup_cls = super ( cls , self )
base = sup_cls . media
except AttributeError :
base = Media ( )
definition = getattr ( cls , 'Media' , None )
if definition :
extend = getattr ( definition , 'extend' , True )
if extend :
m = base
else :
m = Media ( )
for medium in extend :
m = m + base [ medium ]
else :
else :
return property ( _media )
new_class = ( super ( MediaDefiningClass , mcs ) . __new__ ( mcs , name , bases , attrs ) )
if 'media' not in attrs :
new_class . media = media_property ( new_class )
@ python_2_unicode_compatible
self . parent_widget = parent_widget
if attrs is not None :
else :
obj = copy . copy ( self )
obj . attrs = self . attrs . copy ( )
@ property
attrs = dict ( self . attrs , ** kwargs )
if extra_attrs :
attrs . update ( extra_attrs )
input_type = None
return format_html ( '' , flatatt ( final_attrs ) )
input_type = 'text'
if attrs is not None :
input_type = 'number'
input_type = 'email'
input_type = 'url'
input_type = 'password'
self . render_value = render_value
input_type = 'hidden'
self . choices = choices
id_ = final_attrs . get ( 'id' , None )
inputs = [ ]
if id_ :
input_attrs [ 'id' ] = '%s_%s' % ( id_ , i )
return mark_safe ( '\n' . join ( inputs ) )
input_type = 'file'
return super ( FileInput , self ) . render ( name , None , attrs = attrs )
# Unique sentinel object; by its name it presumably marks contradictory
# file-input data (e.g. a new upload submitted together with the
# "clear" checkbox) — its consumer is not visible here, so confirm
# against ClearableFileInput.value_from_datadict before relying on it.
FILE_INPUT_CONTRADICTION = object ( )
initial_text = ugettext_lazy ( 'Currently' )
input_text = ugettext_lazy ( 'Change' )
clear_checkbox_label = ugettext_lazy ( 'Clear' )
template_with_initial = '%(initial_text)s: %(initial)s %(clear_template)s
%(input_text)s: %(input)s'
template_with_clear = '%(clear)s '
url_markup_template = '{1}'
substitutions = { 'initial_text' : self . initial_text , 'input_text' : self . input_text , 'clear_template' : '' , 'clear_checkbox_label' : self . clear_checkbox_label , }
template = '%(input)s'
substitutions [ 'input' ] = super ( ClearableFileInput , self ) . render ( name , value , attrs )
template = self . template_with_initial
checkbox_name = self . clear_checkbox_name ( name )
checkbox_id = self . clear_checkbox_id ( checkbox_name )
substitutions [ 'clear_checkbox_name' ] = conditional_escape ( checkbox_name )
substitutions [ 'clear_checkbox_id' ] = conditional_escape ( checkbox_id )
substitutions [ 'clear' ] = CheckboxInput ( ) . render ( checkbox_name , False , attrs = { 'id' : checkbox_id } )
substitutions [ 'clear_template' ] = self . template_with_clear % substitutions
return mark_safe ( template % substitutions )
if upload :
default_attrs = { 'cols' : '40' , 'rows' : '10' }
if attrs :
default_attrs . update ( attrs )
return format_html ( '' , flatatt ( final_attrs ) , force_text ( value ) )
format_key = ''
self . format = format if format else None
return formats . localize_input ( value , self . format or formats . get_format ( self . format_key ) [ 0 ] )
format_key = 'DATE_INPUT_FORMATS'
format_key = 'DATETIME_INPUT_FORMATS'
format_key = 'TIME_INPUT_FORMATS'
self . check_test = boolean_check if check_test is None else check_test
final_attrs = self . build_attrs ( attrs , type = 'checkbox' , name = name )
final_attrs [ 'checked' ] = 'checked'
return format_html ( '' , flatatt ( final_attrs ) )
self . choices = list ( choices )
output = [ format_html ( '