Python源码示例:sqlparse.sql.Parenthesis()
示例1
def get_cte_from_token(tok, pos0):
cte_name = tok.get_real_name()
if not cte_name:
return None
# Find the start position of the opening parens enclosing the cte body
idx, parens = tok.token_next_by(Parenthesis)
if not parens:
return None
start_pos = pos0 + token_start_pos(tok.tokens, idx)
cte_len = len(str(parens)) # includes parens
stop_pos = start_pos + cte_len
column_names = extract_column_names(parens)
return TableExpression(cte_name, column_names, start_pos, stop_pos)
示例2
def build_comparison(token):
assert type(token) is S.Comparison
m = M.Comparison()
for tok in remove_whitespace(token.tokens):
LOG.debug(" %s %s", tok, type(tok))
if type(tok) is S.Parenthesis:
subtokens = remove_whitespace(tok.tokens)
subquery = tokens_to_sqla(subtokens[1:-1])
if not m.left:
m.left = subquery
else:
m.right = subquery
else:
m = sql_literal_to_model(tok, m)
if not m:
raise Exception("[BUG] Failed to convert %s to model" % tok)
return m
示例3
def _get_primary_key(self, def_tokens):
EXPECT_PRIMARY = 0
EXPECT_KEY = 1
EXPECT_COLUMN = 2
state = EXPECT_PRIMARY
for token in def_tokens:
if state == EXPECT_PRIMARY and token.match(T.Keyword, 'PRIMARY'):
state = EXPECT_KEY
elif state == EXPECT_KEY and token.value.upper() == 'KEY':
state = EXPECT_COLUMN
elif state == EXPECT_COLUMN and isinstance(token, sql.Parenthesis):
return [
self._clean_identifier_quotes(t.value)
for t in token.tokens[1:-1]
if t.ttype in (T.Name, T.Literal.String.Symbol)
]
return []
示例4
def group_comparison(tlist):
sqlcls = (sql.Parenthesis, sql.Function, sql.Identifier,
sql.Operation)
ttypes = T_NUMERICAL + T_STRING + T_NAME
def match(token):
return token.ttype == T.Operator.Comparison
def valid(token):
if imt(token, t=ttypes, i=sqlcls):
return True
elif token and token.is_keyword and token.normalized == 'NULL':
return True
else:
return False
def post(tlist, pidx, tidx, nidx):
return pidx, nidx
valid_prev = valid_next = valid
_group(tlist, sql.Comparison, match,
valid_prev, valid_next, post, extend=False)
示例5
def group_operator(tlist):
ttypes = T_NUMERICAL + T_STRING + T_NAME
sqlcls = (sql.SquareBrackets, sql.Parenthesis, sql.Function,
sql.Identifier, sql.Operation)
def match(token):
return imt(token, t=(T.Operator, T.Wildcard))
def valid(token):
return imt(token, i=sqlcls, t=ttypes)
def post(tlist, pidx, tidx, nidx):
tlist[tidx].ttype = T.Operator
return pidx, nidx
valid_prev = valid_next = valid
_group(tlist, sql.Operation, match,
valid_prev, valid_next, post, extend=False)
示例6
def group_functions(tlist):
has_create = False
has_table = False
for tmp_token in tlist.tokens:
if tmp_token.value == 'CREATE':
has_create = True
if tmp_token.value == 'TABLE':
has_table = True
if has_create and has_table:
return
tidx, token = tlist.token_next_by(t=T.Name)
while token:
nidx, next_ = tlist.token_next(tidx)
if isinstance(next_, sql.Parenthesis):
tlist.group_tokens(sql.Function, tidx, nidx)
tidx, token = tlist.token_next_by(t=T.Name, idx=tidx)
示例7
def get_cte_from_token(tok, pos0):
cte_name = tok.get_real_name()
if not cte_name:
return None
# Find the start position of the opening parens enclosing the cte body
idx, parens = tok.token_next_by(Parenthesis)
if not parens:
return None
start_pos = pos0 + token_start_pos(tok.tokens, idx)
cte_len = len(str(parens)) # includes parens
stop_pos = start_pos + cte_len
column_names = extract_column_names(parens)
return TableExpression(cte_name, column_names, start_pos, stop_pos)
示例8
def parse(self):
tok = self.statement.next()
sql = SQLToken.token2sql(tok, self.query)
right_table = self.right_table = sql.table
tok = self.statement.next()
if not tok.match(tokens.Keyword, 'ON'):
raise SQLDecodeError
tok = self.statement.next()
if isinstance(tok, Parenthesis):
tok = tok[1]
sql = SQLToken.token2sql(tok, self.query)
if right_table == sql.right_table:
self.left_table = sql.left_table
self.left_column = sql.left_column
self.right_column = sql.right_column
else:
self.left_table = sql.right_table
self.left_column = sql.right_column
self.right_column = sql.left_column
示例9
def group_parenthesis(tlist):
_group_matching(tlist, sql.Parenthesis)
示例10
def group_aliased(tlist):
I_ALIAS = (sql.Parenthesis, sql.Function, sql.Case, sql.Identifier,
sql.Operation)
tidx, token = tlist.token_next_by(i=I_ALIAS, t=T.Number)
while token:
nidx, next_ = tlist.token_next(tidx)
if isinstance(next_, sql.Identifier):
tlist.group_tokens(sql.Identifier, tidx, nidx, extend=True)
tidx, token = tlist.token_next_by(i=I_ALIAS, t=T.Number, idx=tidx)
示例11
def group_comparison(tlist):
def _parts_valid(token):
return (token.ttype in (T.String.Symbol, T.String.Single,
T.Name, T.Number, T.Number.Float,
T.Number.Integer, T.Literal,
T.Literal.Number.Integer, T.Name.Placeholder)
or isinstance(token, (sql.Identifier, sql.Parenthesis))
or (token.ttype is T.Keyword
and token.value.upper() in ['NULL', ]))
_group_left_right(tlist, T.Operator.Comparison, None, sql.Comparison,
check_left=_parts_valid, check_right=_parts_valid)
示例12
def group_functions(tlist):
[group_functions(sgroup) for sgroup in tlist.get_sublists()
if not isinstance(sgroup, sql.Function)]
idx = 0
token = tlist.token_next_by_type(idx, T.Name)
while token:
next_ = tlist.token_next(token)
if not isinstance(next_, sql.Parenthesis):
idx = tlist.token_index(token) + 1
else:
func = tlist.group_tokens(sql.Function,
tlist.tokens_between(token, next_))
idx = tlist.token_index(func) + 1
token = tlist.token_next_by_type(idx, T.Name)
示例13
def test_issue9(self):
# make sure where doesn't consume parenthesis
p = sqlparse.parse('(where 1)')[0]
self.assert_(isinstance(p, sql.Statement))
self.assertEqual(len(p.tokens), 1)
self.assert_(isinstance(p.tokens[0], sql.Parenthesis))
prt = p.tokens[0]
self.assertEqual(len(prt.tokens), 3)
self.assertEqual(prt.tokens[0].ttype, T.Punctuation)
self.assertEqual(prt.tokens[-1].ttype, T.Punctuation)
示例14
def test_parenthesis(self):
s = 'select (select (x3) x2) and (y2) bar'
parsed = sqlparse.parse(s)[0]
self.ndiffAssertEqual(s, str(parsed))
self.assertEqual(len(parsed.tokens), 7)
self.assert_(isinstance(parsed.tokens[2], sql.Parenthesis))
self.assert_(isinstance(parsed.tokens[-1], sql.Identifier))
self.assertEqual(len(parsed.tokens[2].tokens), 5)
self.assert_(isinstance(parsed.tokens[2].tokens[3], sql.Identifier))
self.assert_(isinstance(parsed.tokens[2].tokens[3].tokens[0], sql.Parenthesis))
self.assertEqual(len(parsed.tokens[2].tokens[3].tokens), 3)
示例15
def test_function_not_in(self): # issue183
p = sqlparse.parse('in(1, 2)')[0]
self.assertEqual(len(p.tokens), 2)
self.assertEqual(p.tokens[0].ttype, T.Keyword)
self.assert_(isinstance(p.tokens[1], sql.Parenthesis))
示例16
def test_comparison_with_parenthesis(): # issue23
p = sqlparse.parse('(3 + 4) = 7')[0]
assert len(p.tokens) == 1
assert isinstance(p.tokens[0], sql.Comparison)
comp = p.tokens[0]
assert isinstance(comp.left, sql.Parenthesis)
assert comp.right.ttype is T.Number.Integer
示例17
def group_comparison(tlist):
def _parts_valid(token):
return (token.ttype in (T.String.Symbol, T.String.Single,
T.Name, T.Number, T.Number.Float,
T.Number.Integer, T.Literal,
T.Literal.Number.Integer, T.Name.Placeholder)
or isinstance(token, (sql.Identifier, sql.Parenthesis))
or (token.ttype is T.Keyword
and token.value.upper() in ['NULL', ]))
_group_left_right(tlist, T.Operator.Comparison, None, sql.Comparison,
check_left=_parts_valid, check_right=_parts_valid)
示例18
def group_functions(tlist):
[group_functions(sgroup) for sgroup in tlist.get_sublists()
if not isinstance(sgroup, sql.Function)]
idx = 0
token = tlist.token_next_by_type(idx, T.Name)
while token:
next_ = tlist.token_next(token)
if not isinstance(next_, sql.Parenthesis):
idx = tlist.token_index(token) + 1
else:
func = tlist.group_tokens(sql.Function,
tlist.tokens_between(token, next_))
idx = tlist.token_index(func) + 1
token = tlist.token_next_by_type(idx, T.Name)
示例19
def is_parenthesis(token):
"""
括弧判定
"""
return isinstance(token, sql.Parenthesis)
示例20
def sql_recursively_strip(node):
for sub_node in node.get_sublists():
sql_recursively_strip(sub_node)
if isinstance(node, Comment):
return node
sql_strip(node)
# strip duplicate whitespaces between parenthesis
if isinstance(node, Parenthesis):
sql_trim(node, 1)
sql_trim(node, -2)
return node
示例21
def tokens2sql(token: Token,
query: 'query_module.BaseQuery'
) -> Iterator[all_token_types]:
from .functions import SQLFunc
if isinstance(token, Identifier):
# Bug fix for sql parse
if isinstance(token[0], Parenthesis):
try:
int(token[0][1].value)
except ValueError:
raise
else:
yield SQLConstIdentifier(token, query)
elif isinstance(token[0], Function):
yield SQLFunc.token2sql(token, query)
else:
yield SQLIdentifier(token, query)
elif isinstance(token, Function):
yield SQLFunc.token2sql(token, query)
elif isinstance(token, Comparison):
yield SQLComparison(token, query)
elif isinstance(token, IdentifierList):
for tok in token.get_identifiers():
yield from SQLToken.tokens2sql(tok, query)
elif isinstance(token, Parenthesis):
yield SQLPlaceholder(token, query)
else:
raise SQLDecodeError(f'Unsupported: {token.value}')
示例22
def group_brackets(tlist):
"""Group parentheses () or square brackets []
This is just like _group_matching, but complicated by the fact that
round brackets can contain square bracket groups and vice versa
"""
if isinstance(tlist, (sql.Parenthesis, sql.SquareBrackets)):
idx = 1
else:
idx = 0
# Find the first opening bracket
token = tlist.token_next_match(idx, T.Punctuation, ['(', '['])
while token:
start_val = token.value # either '(' or '['
if start_val == '(':
end_val = ')'
group_class = sql.Parenthesis
else:
end_val = ']'
group_class = sql.SquareBrackets
tidx = tlist.token_index(token)
# Find the corresponding closing bracket
end = _find_matching(tidx, tlist, T.Punctuation, start_val,
T.Punctuation, end_val)
if end is None:
idx = tidx + 1
else:
group = tlist.group_tokens(group_class,
tlist.tokens_between(token, end))
# Check for nested bracket groups within this group
group_brackets(group)
idx = tlist.token_index(group) + 1
# Find the next opening bracket
token = tlist.token_next_match(idx, T.Punctuation, ['(', '['])
示例23
def group_brackets(tlist):
"""Group parentheses () or square brackets []
This is just like _group_matching, but complicated by the fact that
round brackets can contain square bracket groups and vice versa
"""
if isinstance(tlist, (sql.Parenthesis, sql.SquareBrackets)):
idx = 1
else:
idx = 0
# Find the first opening bracket
token = tlist.token_next_match(idx, T.Punctuation, ['(', '['])
while token:
start_val = token.value # either '(' or '['
if start_val == '(':
end_val = ')'
group_class = sql.Parenthesis
else:
end_val = ']'
group_class = sql.SquareBrackets
tidx = tlist.token_index(token)
# Find the corresponding closing bracket
end = _find_matching(tidx, tlist, T.Punctuation, start_val,
T.Punctuation, end_val)
if end is None:
idx = tidx + 1
else:
group = tlist.group_tokens(group_class,
tlist.tokens_between(token, end))
# Check for nested bracket groups within this group
group_brackets(group)
idx = tlist.token_index(group) + 1
# Find the next opening bracket
token = tlist.token_next_match(idx, T.Punctuation, ['(', '['])
示例24
def _token2op(self,
tok: Token,
statement: SQLStatement) -> '_Op':
op = None
kw = {'statement': statement, 'query': self.query}
if tok.match(tokens.Keyword, 'AND'):
op = AndOp(**kw)
elif tok.match(tokens.Keyword, 'OR'):
op = OrOp(**kw)
elif tok.match(tokens.Keyword, 'IN'):
op = InOp(**kw)
elif tok.match(tokens.Keyword, 'NOT'):
if statement.next_token.match(tokens.Keyword, 'IN'):
op = NotInOp(**kw)
statement.skip(1)
else:
op = NotOp(**kw)
elif tok.match(tokens.Keyword, 'LIKE'):
op = LikeOp(**kw)
elif tok.match(tokens.Keyword, 'iLIKE'):
op = iLikeOp(**kw)
elif tok.match(tokens.Keyword, 'BETWEEN'):
op = BetweenOp(**kw)
statement.skip(3)
elif tok.match(tokens.Keyword, 'IS'):
op = IsOp(**kw)
elif isinstance(tok, Comparison):
op = CmpOp(tok, self.query)
elif isinstance(tok, Parenthesis):
if (tok[1].match(tokens.Name.Placeholder, '.*', regex=True)
or tok[1].match(tokens.Keyword, 'Null')
or isinstance(tok[1], IdentifierList)
or tok[1].ttype == tokens.DML
):
pass
else:
op = ParenthesisOp(SQLStatement(tok), self.query)
elif tok.match(tokens.Punctuation, (')', '(')):
pass
elif isinstance(tok, Identifier):
pass
else:
raise SQLDecodeError
return op