Python源码示例:sqlparse.sql.Token()
示例1
def _process_case(self, tlist):
    """Align the branches of the CASE expression held in ``tlist``.

    A token built by ``self.nl`` is inserted in front of every branch
    except the first, and whitespace is appended after each condition so
    that all conditions occupy the width of the longest one. The END
    keyword is handled like a branch without a condition.
    """
    base_offset = len('case ') + len('when ')
    branches = tlist.get_cases(skip_ws=True)
    # Append END as a condition-less branch so it is aligned as well
    end_kw = tlist.token_next_by(m=(T.Keyword, 'END'))[1]
    branches.append((None, [end_kw]))

    widths = []
    for cond, _ in branches:
        if cond:
            widths.append(len(' '.join(map(text_type, cond))))
        else:
            widths.append(0)
    widest = max(widths)

    for pos, (cond, value) in enumerate(branches):
        # Without a condition (ELSE / END) the branch starts at its value
        lead = cond[0] if cond else value[0]
        if pos > 0:
            tlist.insert_before(
                lead, self.nl(base_offset - len(text_type(lead))))
        if cond:
            padding = sql.Token(
                T.Whitespace, self.char * (widest - widths[pos]))
            tlist.insert_after(cond[-1], padding)
示例2
def _process(self, group, stream):
    """Yield the tokens of ``stream``, adding line breaks where needed.

    ``self.line`` holds the text of the line currently being built. When
    appending a token would make it longer than ``self.width``, a
    whitespace token made of a newline plus the line's leading spaces is
    yielded first. Groups whose type is not in ``self.keep_together`` get
    their ``tokens`` replaced by a recursive call.
    """
    for tok in stream:
        if tok.is_whitespace and '\n' in tok.value:
            # Restart line tracking with whatever follows the last newline
            ends_with_nl = tok.value.endswith('\n')
            self.line = '' if ends_with_nl else tok.value.splitlines()[-1]
        elif tok.is_group and type(tok) not in self.keep_together:
            tok.tokens = self._process(tok, tok.tokens)
        else:
            text = text_type(tok)
            if len(self.line) + len(text) > self.width:
                leading = re.search(r'^ +', self.line)
                indent = '' if leading is None else leading.group()
                yield sql.Token(T.Whitespace, '\n{0}'.format(indent))
                self.line = indent
            self.line += text
        yield tok
示例3
def _process(tlist):
    """Remove every comment found directly in ``tlist``.

    A comment standing between two existing, non-whitespace neighbours is
    swapped for a single space, unless the previous token is ``(`` or the
    next one is ``)``. In every other situation the comment is removed.
    """
    def find_comment():
        # TODO(andi) Comment types should be unified, see related issue38
        return tlist.token_next_by(i=sql.Comment, t=T.Comment)

    idx, comment = find_comment()
    while comment:
        _, before = tlist.token_prev(idx, skip_ws=False)
        _, after = tlist.token_next(idx, skip_ws=False)
        # The comment may only turn into a space when it separates two
        # real tokens and is not adjacent to a parenthesis.
        keep_gap = (
            before is not None and after is not None
            and not before.is_whitespace
            and not before.match(T.Punctuation, '(')
            and not after.is_whitespace
            and not after.match(T.Punctuation, ')'))
        if keep_gap:
            tlist.tokens[idx] = sql.Token(T.Whitespace, ' ')
        else:
            tlist.tokens.remove(comment)

        idx, comment = find_comment()
示例4
def _process(self, tlist):
    """Remove every comment that ``self._get_next_comment`` finds in ``tlist``.

    A comment is replaced by one space when both neighbours exist, neither
    is whitespace, the previous one is not ``(`` and the next one is not
    ``)``. Otherwise the comment is dropped from the token list.
    """
    comment = self._get_next_comment(tlist)
    while comment:
        idx = tlist.token_index(comment)
        before = tlist.token_prev(idx, False)
        after = tlist.token_next(idx, False)
        # Only a comment separating two real tokens, away from
        # parentheses, needs a space left in its place.
        needs_gap = (
            before is not None and after is not None
            and not before.is_whitespace() and not after.is_whitespace()
            and not before.match(T.Punctuation, '(')
            and not after.match(T.Punctuation, ')'))
        if needs_gap:
            tlist.tokens[idx] = sql.Token(T.Whitespace, ' ')
        else:
            tlist.tokens.pop(idx)
        comment = self._get_next_comment(tlist)
示例5
def process(self, stack, stmt):
    """Run ``self._process`` on ``stmt`` and separate it from the last one.

    When ``stmt`` is a ``sql.Statement`` and an earlier statement was
    recorded, a whitespace token is put at the front of ``stmt``: one
    newline if the earlier statement's text already ends with a newline,
    two otherwise. ``stmt`` is then recorded as the last statement.
    """
    is_statement = isinstance(stmt, sql.Statement)
    if is_statement:
        self._curr_stmt = stmt
    self._process(stmt)
    if not is_statement:
        return

    if self._last_stmt is not None:
        ends_with_nl = unicode(self._last_stmt).endswith('\n')
        separator = '\n' if ends_with_nl else '\n\n'
        stmt.tokens.insert(0, sql.Token(T.Whitespace, separator))
    if self._last_stmt != stmt:
        self._last_stmt = stmt
# FIXME: Doesn't work ;)
示例6
def _process(self, stack, group, stream):
    """Yield the tokens of ``stream``, breaking lines that grow too long.

    ``self.line`` mirrors the text of the current line. Before a token
    that would push it past ``self.width``, a whitespace token with a
    newline and the line's leading spaces is yielded. Groups whose class
    is not in ``self.keep_together`` are processed recursively.
    """
    for tok in stream:
        if tok.is_whitespace() and '\n' in tok.value:
            # A newline resets the tracked line to the text after it
            if tok.value.endswith('\n'):
                self.line = ''
            else:
                self.line = tok.value.splitlines()[-1]
        elif tok.is_group() and tok.__class__ not in self.keep_together:
            tok.tokens = self._process(stack, tok, tok.tokens)
        else:
            text = unicode(tok)
            if len(self.line) + len(text) > self.width:
                leading = re.search('^ +', self.line)
                indent = leading.group() if leading is not None else ''
                yield sql.Token(T.Whitespace, '\n%s' % indent)
                self.line = indent
            self.line += text
        yield tok
示例7
def pop(self):
    """Return the value given by ``self.peek()`` and advance ``self.index``.

    When the value is an ``Identifier`` it is reduced to a table reference.
    Three shapes have to be handled:

    1. <table_name> ('business')
    2. <database_name>.<table_name> ('yelp.business')
    3. <database_name>.<table_name> <extended_query>
       ('yelp.business change col_one col_two')

    In all of them the result holds only the table name, preceded by the
    database name when one is present. Case 3 occurs because SQLParse
    incorrectly parses certain queries.

    Returns:
        A ``Token.Name`` token built from the first three sub-tokens for
        the dotted form, the identifier's first token otherwise, or the
        peeked value unchanged when it is not an ``Identifier``.
    """
    next_val = self.peek()
    self.index += 1
    if not isinstance(next_val, Identifier):
        return next_val

    tokens = next_val.tokens
    # The dotted form reads tokens[2], so at least three tokens are
    # required; a shorter identifier falls through to token_first().
    if len(tokens) > 2 and tokens[1].value == '.':
        str_token = "{db_name}{punctuation}{table_name}".format(
            db_name=tokens[0].value,
            punctuation=tokens[1].value,
            table_name=tokens[2].value
        )
        return TK(Token.Name, str_token)
    return next_val.token_first()
示例8
def _process(self, tlist):
    """Strip the comments that ``self._get_next_comment`` returns for ``tlist``.

    Each comment either becomes a single space (both neighbours exist,
    neither is whitespace, and no ``(`` precedes nor ``)`` follows it) or
    is popped from ``tlist.tokens``.
    """
    current = self._get_next_comment(tlist)
    while current:
        position = tlist.token_index(current)
        left = tlist.token_prev(position, False)
        right = tlist.token_next(position, False)
        # Leave a space only where the comment was the sole separator
        # between two tokens and no parenthesis is adjacent.
        replace_with_space = (
            left is not None and right is not None
            and not left.is_whitespace() and not right.is_whitespace()
            and not left.match(T.Punctuation, '(')
            and not right.match(T.Punctuation, ')'))
        if replace_with_space:
            tlist.tokens[position] = sql.Token(T.Whitespace, ' ')
        else:
            tlist.tokens.pop(position)
        current = self._get_next_comment(tlist)
示例9
def process(self, stack, stmt):
    """Run ``self._process`` on ``stmt`` and separate it from the last one.

    For a ``sql.Statement`` that follows a recorded statement, a whitespace
    token is inserted at index 0 of ``stmt.tokens``: a single newline when
    the previous statement's text ends with one, a double newline
    otherwise. Afterwards ``stmt`` becomes the recorded last statement.
    """
    is_statement = isinstance(stmt, sql.Statement)
    if is_statement:
        self._curr_stmt = stmt
    self._process(stmt)
    if not is_statement:
        return

    if self._last_stmt is not None:
        ends_with_nl = str(self._last_stmt).endswith('\n')
        separator = '\n' if ends_with_nl else '\n\n'
        stmt.tokens.insert(0, sql.Token(T.Whitespace, separator))
    if self._last_stmt != stmt:
        self._last_stmt = stmt
# FIXME: Doesn't work ;)
示例10
def _process(self, stack, group, stream):
    """Yield the tokens of ``stream``, breaking lines that grow too long.

    ``self.line`` mirrors the text of the current line. Before a token
    that would push it past ``self.width``, a whitespace token with a
    newline and the line's leading spaces is yielded. Groups whose class
    is not in ``self.keep_together`` are processed recursively.
    """
    for tok in stream:
        if tok.is_whitespace() and '\n' in tok.value:
            # A newline resets the tracked line to the text after it
            if tok.value.endswith('\n'):
                self.line = ''
            else:
                self.line = tok.value.splitlines()[-1]
        elif tok.is_group() and tok.__class__ not in self.keep_together:
            tok.tokens = self._process(stack, tok, tok.tokens)
        else:
            text = str(tok)
            if len(self.line) + len(text) > self.width:
                leading = re.search('^ +', self.line)
                indent = leading.group() if leading is not None else ''
                yield sql.Token(T.Whitespace, '\n%s' % indent)
                self.line = indent
            self.line += text
        yield tok
示例11
def _get_alias(self, token):
    """Look up the alias part of ``token``; return ``None`` if none is found.

    Lookup order: the token after an ``AS`` keyword; else the token after
    the first non-empty whitespace found from the first enabled token on;
    else, when that first token is a parenthesis directly followed by an
    identifier or name, that following token.
    """
    as_keyword = token.token_next_match(0, T.Keyword, 'AS')
    if as_keyword is not None:
        return tu.token_next_enable(token, as_keyword)

    left = tu.token_next_enable(token)
    if not left:
        return None

    def non_empty_whitespace(tkn):
        return tkn.is_whitespace() and tkn.value

    separator = token.token_matching(
        token.token_index(left), [non_empty_whitespace])
    if separator:
        return tu.token_next_enable(token, separator)

    if not tu.is_parenthesis(left):
        return None

    candidate = tu.token_next_enable(token, left)
    if candidate and (tu.is_identifier(candidate)
                      or (candidate.ttype in T.Name)):
        # "(...)ALIAS" case: insert a whitespace token (a tab) between
        # the parenthesis and the alias
        token.insert_after(left, sql.Token(T.Whitespace, "\t"))
        return candidate

    return None
示例12
def __custom_process_insert_values_lr(self, tlist):
    """Put exactly one space on each side of the VALUES keyword in ``tlist``.

    On each side, the whitespace token next to VALUES gets the value
    ``" "`` and any further consecutive whitespace tokens are emptied. If
    the neighbour is not whitespace, a new single-space token is inserted.
    Nothing happens when no VALUES keyword is found.
    """
    values_token = tlist.token_next_match(0, T.Keyword, "VALUES")
    if not values_token:
        return

    # Left-hand side of VALUES
    before = tlist.token_prev(values_token, skip_ws=False)
    if before and before.is_whitespace():
        before.value = " "
        before = tlist.token_prev(before, skip_ws=False)
        while before and before.is_whitespace():
            before.value = ""
            before = tlist.token_prev(before, skip_ws=False)
    else:
        tlist.insert_before(values_token, sql.Token(T.Whitespace, " "))

    # Right-hand side of VALUES
    after = tlist.token_next(values_token, skip_ws=False)
    if after and after.is_whitespace():
        after.value = " "
        after = tlist.token_next(after, skip_ws=False)
        while after and after.is_whitespace():
            after.value = ""
            after = tlist.token_next(after, skip_ws=False)
    else:
        tlist.insert_after(values_token, sql.Token(T.Whitespace, " "))
示例13
def statement2col_defs(cls, token: Token):
    """Yield one definition object per comma-separated entry of ``token.value``.

    Surrounding parentheses are stripped before splitting. An entry whose
    first word is ``CONSTRAINT`` yields ``SQLColumnConstraint()``; every
    other entry must start with a double-quoted column name followed by a
    data type and constraint text, and yields an ``SQLColumnDef``.

    Raises:
        SQLDecodeError: a column entry does not start with ``"``.
        NotSupportedError: the data type is not among the values of
            ``DatabaseWrapper.data_types``.
    """
    from djongo.base import DatabaseWrapper

    supported_data_types = set(DatabaseWrapper.data_types.values())

    for raw_entry in token.value.strip('()').split(','):
        col = raw_entry.strip()
        first_word, _ = col.split(' ', 1)
        if first_word == 'CONSTRAINT':
            yield SQLColumnConstraint()
            continue

        if col[0] != '"':
            raise SQLDecodeError('Column identifier not quoted')
        # Skip the opening quote; the closing quote ends the name
        name, remainder = col[1:].split('"', 1)
        data_type, constraint_sql = remainder.strip().split(' ', 1)
        if data_type not in supported_data_types:
            raise NotSupportedError(f'Data of type: {data_type}')

        yield SQLColumnDef(
            name=name,
            data_type=data_type,
            col_constraints=set(
                SQLColumnDef._get_constraints(constraint_sql)),
        )
示例14
def is_tokens(x):
    """Return True when ``x`` is a non-empty list starting with an ``S.Token``.

    Only the first element is inspected.
    """
    if not isinstance(x, list) or not x:
        return False
    return isinstance(x[0], S.Token)
示例15
def process(self, stream):
    """Split the ``(ttype, value)`` pairs of ``stream`` into statements.

    Tokens are collected in ``self.tokens``. After a ``;`` seen while
    ``self.level`` is not positive, whitespace and single-line comment
    tokens are still added to the same statement; the first token of any
    other type causes the collected statement to be yielded. Tokens still
    pending at the end of the stream are yielded as a final statement.
    """
    trailing_ttypes = (T.Whitespace, T.Comment.Single)

    for ttype, value in stream:
        # A finished statement is emitted once something other than
        # trailing whitespace or a single-line comment shows up.
        if self.consume_ws and ttype not in trailing_ttypes:
            yield sql.Statement(self.tokens)
            self._reset()

        # Track nesting, then store the token in the current statement
        self.level += self._change_splitlevel(ttype, value)
        self.tokens.append(sql.Token(ttype, value))

        # A semicolon outside any nested level ends the statement
        if self.level <= 0 and ttype is T.Punctuation and value == ';':
            self.consume_ws = True

    if self.tokens:
        yield sql.Statement(self.tokens)
示例16
def nl(self, offset=1):
    """Return a whitespace token: ``self.n`` followed by repeated ``self.char``.

    ``offset`` may be an int or any sized object; for the latter the
    negated length is used. The default of 1 represents the single space
    after SELECT.
    """
    if not isinstance(offset, int):
        offset = -len(offset)
    # The extra two account for the space and the parenthesis
    indent = self.indent * (2 + self._max_kwd_len)
    width = self._max_kwd_len + offset + indent + self.offset
    return sql.Token(T.Whitespace, self.n + self.char * width)
示例17
def _process(self, stream, varname, has_nl):
    """Yield tokens spelling the assignment of the quoted SQL to ``varname``.

    The output starts with ``varname = `` (preceded by a newline when
    ``self.count`` is above 1), wraps the statement in single quotes and,
    if ``has_nl`` is true, in parentheses. A whitespace token containing
    a newline closes the current quote and opens a new one on the next
    line. Single quotes in token values are backslash-escaped; the escaped
    text is written back to the token.
    """
    if self.count > 1:
        yield sql.Token(T.Whitespace, '\n')

    # "<varname> = " header
    header = (
        (T.Name, varname),
        (T.Whitespace, ' '),
        (T.Operator, '='),
        (T.Whitespace, ' '),
    )
    for ttype, text in header:
        yield sql.Token(ttype, text)
    if has_nl:
        yield sql.Token(T.Operator, '(')
    yield sql.Token(T.Text, "'")

    for token in stream:
        if token.is_whitespace and '\n' in token.value:
            # End the quoted chunk, break the line, and reopen the quote
            # indented past the header
            yield sql.Token(T.Text, " '")
            yield sql.Token(T.Whitespace, '\n')
            yield sql.Token(T.Whitespace, ' ' * (len(varname) + 4))
            yield sql.Token(T.Text, "'")

            # Keep whatever indentation followed the line break
            after_lb = token.value.partition('\n')[2]
            if after_lb:
                yield sql.Token(T.Whitespace, after_lb)
            continue

        if "'" in token.value:
            token.value = token.value.replace("'", "\\'")
        yield sql.Token(T.Text, token.value)

    yield sql.Token(T.Text, "'")
    if has_nl:
        yield sql.Token(T.Operator, ')')
示例18
def _process_identifierlist(self, tlist):
    """Wrap the identifiers of ``tlist`` and then apply default processing.

    Unless ``tlist`` lies within a ``sql.Function``, the identifiers after
    the first are walked while summing their lengths (plus one for the
    comma). When the sum exceeds ``self.wrap_after - self.offset`` a token
    from ``self.nl`` is inserted and the sum restarts. With
    ``self.comma_first`` the break goes before the preceding token instead
    and a space is added after that token if no whitespace follows it.
    """
    identifiers = list(tlist.get_identifiers())
    first = next(identifiers.pop(0).flatten())
    if self.char == '\t':
        num_offset = 1
    else:
        num_offset = self._get_offset(first)

    if not tlist.within(sql.Function):
        with offset(self, num_offset):
            line_len = 0
            for ident in identifiers:
                # Add 1 for the "," separator
                line_len += len(ident.value) + 1
                if line_len <= (self.wrap_after - self.offset):
                    continue

                anchor = ident
                adjust = 0
                if self.comma_first:
                    adjust = -2
                    _, comma = tlist.token_prev(tlist.token_index(ident))
                    if comma is None:
                        continue
                    anchor = comma
                tlist.insert_before(anchor, self.nl(offset=adjust))
                if self.comma_first:
                    _, ws = tlist.token_next(
                        tlist.token_index(anchor), skip_ws=False)
                    if ws is not None and ws.ttype is not T.Text.Whitespace:
                        tlist.insert_after(
                            anchor, sql.Token(T.Whitespace, ' '))
                line_len = 0
    self._process_default(tlist)
示例19
def process(self, stmt):
    """Process ``stmt``, separate it from the previous statement, return it.

    If a previous statement was recorded, a whitespace token is inserted
    at the front of ``stmt.tokens``: one newline when the previous
    statement's text ends with a newline, two otherwise.
    """
    self._curr_stmt = stmt
    self._process(stmt)

    if self._last_stmt is not None:
        if text_type(self._last_stmt).endswith('\n'):
            separator = '\n'
        else:
            separator = '\n\n'
        stmt.tokens.insert(0, sql.Token(T.Whitespace, separator))

    self._last_stmt = stmt
    return stmt
示例20
def _stripws_identifierlist(self, tlist):
    """Drop whitespace tokens that directly precede a comma (see issue140).

    Returns whatever ``self._stripws_default(tlist)`` returns.
    """
    pending_ws = None
    # Iterate over a copy because tokens are removed from the original
    for tok in list(tlist.tokens):
        is_comma = tok.ttype is T.Punctuation and tok.value == ','
        if pending_ws and is_comma:
            tlist.tokens.remove(pending_ws)
        pending_ws = tok if tok.is_whitespace else None
    return self._stripws_default(tlist)
示例21
def nl(self):
    """Return a whitespace token holding a newline plus the current indent.

    The indent is ``self.char`` repeated
    ``self.indent * self.width + self.offset`` times. If it would exceed
    200 characters, ``self.indent`` and ``self.offset`` are reset to 0 and
    the indent is recomputed.
    """
    # TODO: newline character should be configurable
    def padding():
        return self.char * ((self.indent * self.width) + self.offset)

    space = padding()
    if len(space) > 200:
        # Runaway indenting, probably caused by a parsing error: start over
        self.indent = 0
        self.offset = 0
        space = padding()
    return sql.Token(T.Whitespace, '\n' + space)
示例22
def _process(self, stream, varname, has_nl):
    """Yield tokens spelling the assignment of the quoted SQL to ``varname``.

    The output starts with ``varname = `` (preceded by a newline when
    ``self.count`` is above 1), wraps the statement in single quotes and,
    if ``has_nl`` is true, in parentheses. A whitespace token containing
    a newline closes the current quote and opens a new one on the next
    line. Single quotes in token values are backslash-escaped; the escaped
    text is written back to the token.
    """
    if self.count > 1:
        yield sql.Token(T.Whitespace, '\n')

    # "<varname> = " header
    header = (
        (T.Name, varname),
        (T.Whitespace, ' '),
        (T.Operator, '='),
        (T.Whitespace, ' '),
    )
    for ttype, text in header:
        yield sql.Token(ttype, text)
    if has_nl:
        yield sql.Token(T.Operator, '(')
    yield sql.Token(T.Text, "'")

    for token in stream:
        if token.is_whitespace() and '\n' in token.value:
            # End the quoted chunk, break the line, and reopen the quote
            # indented past the header
            yield sql.Token(T.Text, " '")
            yield sql.Token(T.Whitespace, '\n')
            yield sql.Token(T.Whitespace, ' ' * (len(varname) + 4))
            yield sql.Token(T.Text, "'")

            # Keep whatever indentation followed the line break
            after_lb = token.value.partition('\n')[2]
            if after_lb:
                yield sql.Token(T.Whitespace, after_lb)
            continue

        if "'" in token.value:
            token.value = token.value.replace("'", "\\'")
        yield sql.Token(T.Text, token.value)

    yield sql.Token(T.Text, "'")
    if has_nl:
        yield sql.Token(T.Operator, ')')
示例23
def process(self, stack, stream):
    """Split the ``(ttype, value)`` pairs of ``stream`` into statements.

    Tokens are buffered for the current ``Statement``. After a ``;`` seen
    at a split level that is not positive, whitespace and single-line
    comment tokens still join the same statement; the first token of any
    other type causes the statement to be yielded and a new one to begin.
    A statement still open when the stream ends is yielded last.
    """
    awaiting_next = False
    depth = 0
    current = None
    buffered = []

    for ttype, value in stream:
        # Emit the finished statement once real content follows it
        if awaiting_next and ttype not in (T.Whitespace, T.Comment.Single):
            current.tokens = buffered
            yield current
            self._reset()
            awaiting_next = False
            depth = 0
            current = None

        # Open a fresh statement when none is in progress
        if current is None:
            current = Statement()
            buffered = []

        depth += self._change_splitlevel(ttype, value)
        buffered.append(Token(ttype, value))

        # A semicolon outside nested levels terminates the statement
        if depth <= 0 and ttype is T.Punctuation and value == ';':
            awaiting_next = True

    if current is not None:
        current.tokens = buffered
        yield current
示例24
def test_str(self):
    """str() of a token gives back its value with the case untouched."""
    value = 'FoO'
    self.assertEqual(str(sql.Token(None, value)), value)
示例25
def test_repr(self):
    """repr() starts with the type and quoted value; long values are cut."""
    expectations = (
        ('foo', "<Keyword 'foo' at 0x"),
        ('1234567890', "<Keyword '123456...' at 0x"),
    )
    for value, prefix in expectations:
        token = sql.Token(Keyword, value)
        # Compare the prefix only: the rest of the repr is an address
        self.assertEqual(repr(token)[:len(prefix)], prefix)
示例26
def test_flatten(self):
    """flatten() on a plain token is a generator yielding only the token."""
    token = sql.Token(Keyword, 'foo')
    flattened = token.flatten()
    self.assertEqual(type(flattened), types.GeneratorType)
    self.assertEqual(list(flattened), [token])
示例27
def test_token_matching(self):
    """token_matching() returns the first match from the start index on."""
    keyword = sql.Token(Keyword, 'foo')
    comma = sql.Token(Punctuation, ',')
    tlist = sql.TokenList([keyword, comma])

    def is_keyword(t):
        return t.ttype is Keyword

    def is_punctuation(t):
        return t.ttype is Punctuation

    self.assertEqual(tlist.token_matching(0, [is_keyword]), keyword)
    self.assertEqual(tlist.token_matching(0, [is_punctuation]), comma)
    # Starting after the keyword, no keyword is left to find
    self.assertEqual(tlist.token_matching(1, [is_keyword]), None)
示例28
def nl(self):
    """Build the line-break token: a newline followed by the indentation.

    Indentation is ``self.char`` times
    ``self.indent * self.width + self.offset``. Should that be longer than
    200 characters, ``self.indent`` and ``self.offset`` are zeroed and the
    indentation is computed again.
    """
    # TODO: newline character should be configurable
    def indentation():
        return self.char * ((self.indent * self.width) + self.offset)

    space = indentation()
    if len(space) > 200:
        # Runaway indenting, probably caused by a parsing error: start over
        self.indent = 0
        self.offset = 0
        space = indentation()
    return sql.Token(T.Whitespace, '\n' + space)
示例29
def _process(self, stream, varname, has_nl):
    """Yield tokens spelling the assignment of the quoted SQL to ``varname``.

    The output starts with ``varname = `` (preceded by a newline when
    ``self.count`` is above 1), wraps the statement in single quotes and,
    if ``has_nl`` is true, in parentheses. A whitespace token containing
    a newline closes the current quote and opens a new one on the next
    line. Single quotes in token values are backslash-escaped; the escaped
    text is written back to the token.
    """
    if self.count > 1:
        yield sql.Token(T.Whitespace, '\n')

    # "<varname> = " header
    header = (
        (T.Name, varname),
        (T.Whitespace, ' '),
        (T.Operator, '='),
        (T.Whitespace, ' '),
    )
    for ttype, text in header:
        yield sql.Token(ttype, text)
    if has_nl:
        yield sql.Token(T.Operator, '(')
    yield sql.Token(T.Text, "'")

    for token in stream:
        if token.is_whitespace() and '\n' in token.value:
            # End the quoted chunk, break the line, and reopen the quote
            # indented past the header
            yield sql.Token(T.Text, " '")
            yield sql.Token(T.Whitespace, '\n')
            yield sql.Token(T.Whitespace, ' ' * (len(varname) + 4))
            yield sql.Token(T.Text, "'")

            # Keep whatever indentation followed the line break
            after_lb = token.value.partition('\n')[2]
            if after_lb:
                yield sql.Token(T.Whitespace, after_lb)
            continue

        if "'" in token.value:
            token.value = token.value.replace("'", "\\'")
        yield sql.Token(T.Text, token.value)

    yield sql.Token(T.Text, "'")
    if has_nl:
        yield sql.Token(T.Operator, ')')
示例30
def process(self, stack, stream):
    """Group the ``(ttype, value)`` pairs of ``stream`` into statements.

    Each pair becomes a ``Token`` appended to the statement in progress.
    Once a ``;`` is seen at a split level that is not positive, the
    statement is yielded as soon as a token arrives that is neither
    whitespace nor a single-line comment. A statement still open at the
    end of the stream is yielded too.
    """
    statement_done = False
    level = 0
    statement = None
    collected = []

    for ttype, value in stream:
        # Emit the finished statement once real content follows it
        if statement_done and ttype not in (T.Whitespace, T.Comment.Single):
            statement.tokens = collected
            yield statement
            self._reset()
            statement_done = False
            level = 0
            statement = None

        # Open a fresh statement when none is in progress
        if statement is None:
            statement = Statement()
            collected = []

        level += self._change_splitlevel(ttype, value)
        collected.append(Token(ttype, value))

        # A semicolon outside nested levels terminates the statement
        if level <= 0 and ttype is T.Punctuation and value == ';':
            statement_done = True

    if statement is not None:
        statement.tokens = collected
        yield statement