Python源码示例:httpx.AsyncClient()
示例1
async def get_challenges(
    client: httpx.AsyncClient, page_index: int = 0, page_size: int = 999
):
    """Get challenges, given the relevant parameters.

    Posts a query to ``constants.Challenges.URL`` for published ``mathsQuiz``
    entries, ordered by ``answerDate`` descending, and returns the ``"items"``
    value of the decoded JSON response.

    Args:
        client: The HTTP client used to send the request.
        page_index: Zero-based page of results to request.
        page_size: Number of entries per page.
    """
    query = {
        "pageIndex": page_index,
        "pageSize": page_size,
        "orderBy": [{"desc": "answerDate"}],
        "where": [
            {"field": "sys.versionStatus", "equalTo": "published"},
            {"field": "sys.contentTypeId", "in": ["mathsQuiz"]},
        ],
        "fields": ["entryTitle", "category", "sys", "description", "answer"],
    }
    # The function must be a coroutine: `await` is only legal in `async def`.
    response = await client.post(
        constants.Challenges.URL,
        headers={"accessToken": constants.Challenges.TOKEN},
        json=query,
    )
    return response.json()["items"]
示例2
async def update_challenge(self):
    """Check the Kings site for the latest challenges.

    Reads the number of the last announced challenge from the channel topic,
    fetches the challenge list, and for every challenge with a higher number
    calls ``self.challenge`` and updates the channel topic.
    """
    print("Updating maths challenges...")
    # The topic holds the last announced number right after a fixed prefix;
    # take the first space-delimited token and drop its final character.
    topic_tail = self.channel.topic.split("Nerds, the lot of you | Challenge ")[1]
    latest_challenge = int(topic_tail.split(" ")[0][:-1])

    async with httpx.AsyncClient() as client:
        challenges = await get_challenges(client)

    # Walk the list in reverse, numbering from 1, so `number` can be compared
    # with the number stored in the topic.
    for number, challenge in enumerate(challenges[::-1], 1):
        title = challenge["entryTitle"]
        if number > latest_challenge:
            # NOTE(review): the nesting of the topic edit under this `if` was
            # reconstructed from flattened source - confirm against upstream.
            await self.challenge(self.channel, len(challenges) - number + 1)
            await self.channel.edit(topic=constants.Challenges.TOPIC.format(title))
    print("Maths challenges successfully updated.")
示例3
def test_unix_connection():
    """Run a Sanic app on a Unix socket and request it through httpx."""
    app = Sanic(name=__name__)

    @app.get("/")
    def handler(request):
        # Reply with the server value recorded on the connection info.
        return text(f"{request.conn_info.server}")

    @app.listener("after_server_start")
    async def client(app, loop):
        try:
            async with httpx.AsyncClient(uds=SOCKPATH) as http:
                reply = await http.get("http://myhost.invalid/")
                assert reply.status_code == 200
                assert reply.text == os.path.abspath(SOCKPATH)
        finally:
            # Stop the server whether or not the assertions passed.
            app.stop()

    app.run(host="myhost.invalid", unix=SOCKPATH)
示例4
def choose_http_client(session):
    """Return the HTTP client wrapper class that matches ``session``.

    aiohttp is tried first, then httpx. A backend is chosen when its package
    imports successfully and ``session`` is either ``None`` or an instance of
    that backend's session class.

    Raises:
        ChClientError: If neither backend could be selected.
    """
    try:
        import aiohttp

        if session is None or isinstance(session, aiohttp.ClientSession):
            from aiochclient.http_clients.aiohttp import AiohttpHttpClient

            return AiohttpHttpClient
    except ImportError:
        # Deliberate fall-through: aiohttp is unavailable, try httpx next.
        pass

    try:
        import httpx

        if session is None or isinstance(session, httpx.AsyncClient):
            from aiochclient.http_clients.httpx import HttpxHttpClient

            return HttpxHttpClient
    except ImportError:
        # Deliberate fall-through to the error below.
        pass

    raise ChClientError('Async http client needed. Please install aiohttp or httpx')
示例5
async def test_user_that_does_not_follows_another_will_receive_profile_without_follow(
    app: FastAPI, authorized_client: AsyncClient, pool: Pool
) -> None:
    """A freshly created user's profile is returned with ``following`` falsy."""
    async with pool.acquire() as conn:
        users_repo = UsersRepository(conn)
        user = await users_repo.create_user(
            username="user_for_following",
            email="test-for-following@email.com",
            password="password",
        )

    response = await authorized_client.get(
        app.url_path_for("profiles:get-profile", username=user.username)
    )
    profile = ProfileInResponse(**response.json())
    assert profile.profile.username == user.username
    assert not profile.profile.following
示例6
async def test_user_that_follows_another_will_receive_profile_with_follow(
    app: FastAPI, authorized_client: AsyncClient, pool: Pool, test_user: UserInDB
) -> None:
    """After ``test_user`` is added as a follower, the profile reports ``following``."""
    async with pool.acquire() as conn:
        users_repo = UsersRepository(conn)
        user = await users_repo.create_user(
            username="user_for_following",
            email="test-for-following@email.com",
            password="password",
        )

        profiles_repo = ProfilesRepository(conn)
        await profiles_repo.add_user_into_followers(
            target_user=user, requested_user=test_user
        )

    response = await authorized_client.get(
        app.url_path_for("profiles:get-profile", username=user.username)
    )
    profile = ProfileInResponse(**response.json())
    assert profile.profile.username == user.username
    assert profile.profile.following
示例7
async def test_user_success_registration(
    app: FastAPI, client: AsyncClient, pool: Pool
) -> None:
    """Registering returns 201 and the stored user matches the submitted data."""
    email, username, password = "test@test.com", "username", "password"
    registration_json = {
        "user": {"email": email, "username": username, "password": password}
    }
    response = await client.post(
        app.url_path_for("auth:register"), json=registration_json
    )
    assert response.status_code == HTTP_201_CREATED

    # Look the user up through the repository to confirm it was stored.
    async with pool.acquire() as conn:
        repo = UsersRepository(conn)
        user = await repo.get_user_by_email(email=email)
        assert user.email == email
        assert user.username == username
        assert user.check_password(password)
示例8
async def test_failed_user_registration_when_some_credentials_are_taken(
    app: FastAPI,
    client: AsyncClient,
    test_user: UserInDB,
    credentials_part: str,
    credentials_value: str,
) -> None:
    """Registration is rejected with 400 when one credential is overridden.

    ``credentials_part`` names the field of the registration payload that is
    replaced by ``credentials_value`` before the request is sent.
    """
    registration_json = {
        "user": {
            "email": "test@test.com",
            "username": "username",
            "password": "password",
        }
    }
    registration_json["user"][credentials_part] = credentials_value

    response = await client.post(
        app.url_path_for("auth:register"), json=registration_json
    )
    assert response.status_code == HTTP_400_BAD_REQUEST
示例9
async def test_user_can_update_own_profile(
    app: FastAPI,
    authorized_client: AsyncClient,
    test_user: UserInDB,
    token: str,
    update_value: str,
    update_field: str,
) -> None:
    """Updating ``update_field`` returns 200 and echoes ``update_value`` back."""
    response = await authorized_client.put(
        app.url_path_for("users:update-current-user"),
        json={"user": {update_field: update_value}},
    )
    assert response.status_code == status.HTTP_200_OK

    user_profile = UserInResponse(**response.json()).dict()
    assert user_profile["user"][update_field] == update_value
示例10
async def test_user_can_change_password(
    app: FastAPI,
    authorized_client: AsyncClient,
    test_user: UserInDB,
    token: str,
    pool: Pool,
) -> None:
    """Changing the password returns 200 and the stored user accepts the new one."""
    response = await authorized_client.put(
        app.url_path_for("users:update-current-user"),
        json={"user": {"password": "new_password"}},
    )
    assert response.status_code == status.HTTP_200_OK
    user_profile = UserInResponse(**response.json())

    # Re-read the user through the repository to check the stored password.
    async with pool.acquire() as connection:
        users_repo = UsersRepository(connection)
        user = await users_repo.get_user_by_username(
            username=user_profile.user.username
        )

    assert user.check_password("new_password")
示例11
async def test_user_can_not_take_already_used_credentials(
    app: FastAPI,
    authorized_client: AsyncClient,
    pool: Pool,
    token: str,
    credentials_part: str,
    credentials_value: str,
) -> None:
    """Updating to a credential held by another user is rejected with 400."""
    user_dict = {
        "username": "not_taken_username",
        "password": "password",
        "email": "free_email@email.com",
    }
    user_dict.update({credentials_part: credentials_value})

    # Create a second user that already owns the credential under test.
    async with pool.acquire() as conn:
        users_repo = UsersRepository(conn)
        await users_repo.create_user(**user_dict)

    response = await authorized_client.put(
        app.url_path_for("users:update-current-user"),
        json={"user": {credentials_part: credentials_value}},
    )
    assert response.status_code == status.HTTP_400_BAD_REQUEST
示例12
async def test_user_can_add_comment_for_article(
    app: FastAPI, authorized_client: AsyncClient, test_article: Article
) -> None:
    """A created comment is the first entry in the article's comment list."""
    created_comment_response = await authorized_client.post(
        app.url_path_for("comments:create-comment-for-article", slug=test_article.slug),
        json={"comment": {"body": "comment"}},
    )
    created_comment = CommentInResponse(**created_comment_response.json())

    comments_for_article_response = await authorized_client.get(
        app.url_path_for("comments:get-comments-for-article", slug=test_article.slug)
    )
    comments = ListOfCommentsInResponse(**comments_for_article_response.json())

    assert created_comment.comment == comments.comments[0]
示例13
async def test_user_can_delete_own_comment(
    app: FastAPI, authorized_client: AsyncClient, test_article: Article
) -> None:
    """After creating and deleting a comment, the article has no comments."""
    created_comment_response = await authorized_client.post(
        app.url_path_for("comments:create-comment-for-article", slug=test_article.slug),
        json={"comment": {"body": "comment"}},
    )
    created_comment = CommentInResponse(**created_comment_response.json())

    await authorized_client.delete(
        app.url_path_for(
            "comments:delete-comment-from-article",
            slug=test_article.slug,
            comment_id=str(created_comment.comment.id_),
        )
    )

    comments_for_article_response = await authorized_client.get(
        app.url_path_for("comments:get-comments-for-article", slug=test_article.slug)
    )
    comments = ListOfCommentsInResponse(**comments_for_article_response.json())

    assert len(comments.comments) == 0
示例14
async def test_user_can_not_change_article_state_twice(
    app: FastAPI,
    authorized_client: AsyncClient,
    test_article: Article,
    test_user: UserInDB,
    pool: Pool,
    api_method: str,
    route_name: str,
    favorite_state: bool,
) -> None:
    """Requesting ``route_name`` for an article already in that state yields 400.

    When ``favorite_state`` is true the article is first added to the user's
    favorites through the repository; the request is then sent either way.
    """
    if favorite_state:
        async with pool.acquire() as connection:
            articles_repo = ArticlesRepository(connection)
            await articles_repo.add_article_into_favorites(
                article=test_article, user=test_user
            )

    response = await authorized_client.request(
        api_method, app.url_path_for(route_name, slug=test_article.slug)
    )
    assert response.status_code == status.HTTP_400_BAD_REQUEST
示例15
def monkeypatch():
    """Extend ``CassettePatcherBuilder.build`` with httpx ``send`` patchers.

    Replaces ``CassettePatcherBuilder.build`` with a version that chains the
    original patchers with ones for ``httpx.Client.send`` and
    ``httpx.AsyncClient.send``.
    """

    @CassettePatcherBuilder._build_patchers_from_mock_triples_decorator
    def _async_httpx(self):
        wrapped_send = async_vcr_send(self._cassette, httpx.AsyncClient.send)
        yield httpx.AsyncClient, "send", wrapped_send

    @CassettePatcherBuilder._build_patchers_from_mock_triples_decorator
    def _sync_httpx(self):
        wrapped_send = sync_vcr_send(self._cassette, httpx.Client.send)
        yield httpx.Client, "send", wrapped_send

    # Capture the current implementation before it is replaced below.
    real_build = CassettePatcherBuilder.build

    def patched_build(self):
        original_patchers = real_build(self)
        return itertools.chain(original_patchers, _sync_httpx(self), _async_httpx(self))

    CassettePatcherBuilder.build = patched_build
示例16
async def call_action(self, action: str, **params) -> Any:
    """POST ``params`` as JSON to ``self._api_root + action`` and return the result.

    A bearer ``Authorization`` header is added when ``self._access_token`` is
    set. On a 2xx status the response text is decoded as JSON and passed
    through ``_handle_api_result``.

    Raises:
        ApiNotAvailable: If ``self._api_root`` is falsy.
        HttpFailed: If the response status is outside 200-299.
        NetworkError: If httpx raises ``InvalidURL`` or ``HTTPError``.
    """
    if not self._api_root:
        raise ApiNotAvailable

    headers = {}
    if self._access_token:
        headers['Authorization'] = 'Bearer ' + self._access_token

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(self._api_root + action,
                                     json=params, headers=headers)
        if 200 <= resp.status_code < 300:
            return _handle_api_result(json.loads(resp.text))
        raise HttpFailed(resp.status_code)
    except httpx.InvalidURL as err:
        # Chain the cause so the underlying httpx error stays in the traceback.
        raise NetworkError('API root url invalid') from err
    except httpx.HTTPError as err:
        raise NetworkError('HTTP request failed') from err
示例17
def __init__(self, auth: Optional[Union[LoginAuthenticator, FileAuthenticator]] = None,
             session=None, is_async=False, **options) -> None:
    """Set up the HTTP session, JSON headers, API root URL and timeout.

    Args:
        auth: Authenticator; its ``locale`` is used when no ``locale`` option
            is given.
        session: Existing client to reuse. When falsy, a new
            ``httpx.AsyncClient`` (if ``is_async``) or ``httpx.Client`` is
            created.
        is_async: Selects which client type to create.
        **options: Recognized keys are ``locale``, ``domain``, ``url`` and
            ``timeout`` (default 10).
    """
    self.auth = auth
    self.is_async = is_async
    self.session = (session or (httpx.AsyncClient() if is_async
                                else httpx.Client()))

    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json"
    }
    self.session.headers.update(headers)

    # Read auth.locale only when no explicit locale was passed. The eager
    # `options.get("locale", auth.locale)` always dereferenced `auth` and so
    # raised AttributeError for the default auth=None.
    raw_locale = options["locale"] if "locale" in options else auth.locale
    locale = test_convert("locale", raw_locale)
    domain = options.get("domain", locale.domain)

    self.api_root_url = options.get("url", f"https://api.audible.{domain}")
    self.timeout = options.get('timeout', 10)
示例18
async def test_success(self, test_app_client: httpx.AsyncClient, oauth_client):
    """``/authorize`` returns 200 with an ``authorization_url`` in the body."""
    with asynctest.patch.object(oauth_client, "get_authorization_url") as mock:
        mock.return_value = "AUTHORIZATION_URL"
        response = await test_app_client.get(
            "/authorize",
            params={
                "authentication_backend": "mock",
                "scopes": ["scope1", "scope2"],
            },
        )

    assert response.status_code == status.HTTP_200_OK
    mock.assert_awaited_once()

    data = response.json()
    assert "authorization_url" in data
示例19
async def test_with_redirect_url(
    self, test_app_client_redirect_url: httpx.AsyncClient, oauth_client
):
    """``/authorize`` on the redirect-URL client returns 200 with an ``authorization_url``."""
    with asynctest.patch.object(oauth_client, "get_authorization_url") as mock:
        mock.return_value = "AUTHORIZATION_URL"
        response = await test_app_client_redirect_url.get(
            "/authorize",
            params={
                "authentication_backend": "mock",
                "scopes": ["scope1", "scope2"],
            },
        )

    assert response.status_code == status.HTTP_200_OK
    mock.assert_awaited_once()

    data = response.json()
    assert "authorization_url" in data
示例20
async def get_challenge(number: int) -> dict:
    """Fetch one challenge and return its display fields as a dict.

    The challenge summary is requested through ``get_challenges`` with
    ``page_index=number - 1`` and ``page_size=1``; its full entry is then
    looked up by slug to obtain the question content.

    Returns:
        A dict with the keys ``title``, ``published`` (formatted
        ``%d/%m/%Y``), ``category``, ``challenge``, ``image`` (empty string
        when the question has no second element), ``description`` and
        ``slug``.
    """
    async with httpx.AsyncClient() as client:
        challenge, *_ = await get_challenges(client, page_index=number - 1, page_size=1)
        response = await client.post(
            constants.Challenges.URL,
            headers={"accessToken": constants.Challenges.TOKEN},
            json={
                "pageIndex": 0,
                "pageSize": 1,
                "where": [
                    {"field": "sys.slug", "equalTo": challenge["sys"]["slug"]},
                    {"field": "sys.versionStatus", "equalTo": "published"},
                ],
            },
        )

    question = response.json()["items"][0]["question"]
    asset = question[1]["value"]["asset"]["sys"] if len(question) > 1 else None

    if asset:
        # Keep the directory part of the asset URI (including its trailing
        # slash) and append the filename from the asset properties.
        directory_parts = asset["uri"].rpartition("/")[:2]
        relative_path = "".join(directory_parts + (asset["properties"]["filename"],))
        # The host must be a prefix. The earlier `"https://..." "".join(...)`
        # concatenated the two literals and used the host as the separator.
        image = "https://www.kingsmathsschool.com" + relative_path
    else:
        image = ""

    return {
        "title": challenge["entryTitle"],
        "published": dateutil.parser.isoparse(
            challenge["sys"]["version"]["created"]
        ).strftime("%d/%m/%Y"),
        "category": challenge["category"][0]["entryTitle"],
        # NOTE(review): the replaced literal may originally have been a
        # non-breaking space or HTML entity rather than a plain space - confirm.
        "challenge": convert(question[0]["value"]).replace(" ", "")[:-1],
        "image": image,
        "description": challenge["description"],
        "slug": challenge["sys"]["slug"],
    }
示例21
async def client(app, loop):
    """Request the app over the Unix socket, check the reply, then stop the app."""
    try:
        async with httpx.AsyncClient(uds=SOCKPATH) as client:
            r = await client.get("http://myhost.invalid/")
            assert r.status_code == 200
            assert r.text == os.path.abspath(SOCKPATH)
    finally:
        # Stop the server whether or not the assertions passed.
        app.stop()
示例22
def get_new_session(self):
    """Return a new ``httpx.AsyncClient`` created with ``verify=False``."""
    # NOTE(review): verify=False turns off TLS certificate verification.
    session = httpx.AsyncClient(verify=False)
    return session
示例23
async def send(self, request: Request) -> Response:
    """Send request with new client.

    A fresh ``AsyncClient`` is opened for this call and closed when it
    returns. Empty ``request.data`` / ``request.params`` are passed as
    ``None``; ``self.httpx_kwargs`` is forwarded to ``client.request``.
    """
    async with AsyncClient() as client:
        return await client.request(
            request.method,
            request.url,
            data=request.data or None,
            params=request.params or None,
            headers=request.headers,
            **self.httpx_kwargs,
        )
示例24
def __init__(self, client: AsyncClient = None, **httpx_kwargs):
    """Store the client and request keyword arguments.

    Falls back to a new ``AsyncClient`` when ``client`` is falsy, and to
    ``tekore.default_httpx_kwargs`` when no keyword arguments are given.
    """
    from tekore import default_httpx_kwargs

    if httpx_kwargs:
        self.httpx_kwargs = httpx_kwargs
    else:
        self.httpx_kwargs = default_httpx_kwargs

    self.client = client if client else AsyncClient()
示例25
async def test_async_client_is_reused(self):
    """``AsyncPersistentSender`` keeps the same client object across sends."""
    mock = AsyncMock()
    with patch(module + '.AsyncClient.request', mock):
        s = AsyncPersistentSender()
        c1 = s.client
        await s.send(Request())
        await s.send(Request())
        c2 = s.client
        assert c1 is c2
示例26
async def test_async_client_is_not_reused(self):
    """``AsyncTransientSender`` constructs a client once per ``send`` call."""
    client = AsyncClient()
    client.request = AsyncMock()
    # Patch the constructor so each instantiation is counted.
    mock = MagicMock(return_value=client)
    with patch(module + '.AsyncClient', mock):
        s = AsyncTransientSender()
        await s.send(Request())
        await s.send(Request())
        assert mock.call_count == 2
示例27
def __init__(self, session: Optional[AsyncClient]):
    """Keep ``session`` if it is truthy; otherwise create a new ``AsyncClient``."""
    self._session = session or AsyncClient()
示例28
async def fetch_ngrok_tunnels_data() -> List[Dict[str, Any]]:
    """Return the ``"tunnels"`` list from ``NGROK_API_ROOT + "/tunnels"``.

    Returns an empty list when the decoded JSON has no ``"tunnels"`` key.
    ``response.raise_for_status()`` is called before decoding, so an error
    status raises instead of being returned.
    """
    async with httpx.AsyncClient() as http_client:
        url = NGROK_API_ROOT + "/tunnels"
        response = await http_client.get(url)
        response.raise_for_status()
        data = response.json()
        result = data.get("tunnels", [])  # type: ignore
        return result
示例29
def __init__(self):
    """Create ``self.client`` with a 0.3 timeout and the configured ``verify`` value."""
    verify_https = settings["VERIFY_HTTPS"]
    self.client = httpx.AsyncClient(timeout=0.3, verify=verify_https)
示例30
def __init__(self):
    """Create ``self.client`` using the configured ``verify`` value."""
    verify_https = settings["VERIFY_HTTPS"]
    self.client = httpx.AsyncClient(verify=verify_https)