Add timeout and tests
oalfonsobmat committed Oct 17, 2023
1 parent f2cbb2a commit aba152a
Showing 3 changed files with 145 additions and 10 deletions.
1 change: 0 additions & 1 deletion README.md
@@ -199,6 +199,5 @@ Response: Generator[Tuple[int, str], None, None]. For each input tuple an output

TODO:

- tests
- add a flag to specify how many requests can fail; this will also need to specify which status codes are "ok" and which are "not ok", to decide when to increment the failure count and when to stop (see the rough sketch below)
- include the missing methods like PUT, DELETE, etc.
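A rough, hypothetical sketch of the failure-budget flag described in the TODO above; none of these names exist in patata's current API:

    # Hypothetical only: illustrates the TODO, not existing patata code.
    OK_STATUS_CODES = {200, 201, 204}

    def should_stop(status_codes, max_failed_requests=10):
        # Count responses whose status code is not in the "ok" set and stop
        # once the failure budget is exceeded.
        failures = sum(1 for code in status_codes if code not in OK_STATUS_CODES)
        return failures > max_failed_requests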
18 changes: 14 additions & 4 deletions patata/client.py
@@ -7,9 +7,8 @@
import traceback
from typing import List, Generator, Optional, Iterable, Callable

# from collections.abc import Iterable # only for >=3.9

import aiohttp
from aiohttp.client import ClientTimeout
from aiohttp_retry import RetryClient, ExponentialRetry

from .models import Request, Response
@@ -41,6 +40,7 @@
    VERBOSE_LEVEL_INFO,
    VERBOSE_LEVEL_DEBUG,
]
DEFAULT_TIMEOUT = 60 * 5 # 5 minutes, the aiohttp default


class Patata:
@@ -80,6 +80,7 @@ def http(
        requests: Iterable[Request],
        callbacks: Iterable[Callable] = [],
        retries: int = 1,
        timeout: float = DEFAULT_TIMEOUT,
    ) -> Generator[Response, None, None]:
        """Uses multiprocessing and aiohttp to retrieve GET or POST requests in parallel and
        concurrently
@@ -143,6 +144,7 @@ def http(
logger.info(f" pool_submit_size: {self.pool_submit_size}")
logger.info(f" verbose_level: {self.verbose_level}")
logger.info(f" retries: {retries}")
logger.info(f" timeout: {timeout}")

        init_time = time.time()
        requests_in_queue = 0
@@ -161,6 +163,7 @@
callbacks=callbacks,
verbose_level=self.verbose_level,
retries=retries,
timeout=timeout,
)
future.add_done_callback(self._future_done_callback)
else: # run in the main thread
@@ -171,6 +174,7 @@
callbacks=callbacks,
verbose_level=self.verbose_level,
retries=retries,
timeout=timeout,
)
)

@@ -255,14 +259,17 @@ def run(
        callbacks: Iterable[Callable],
        verbose_level: int = VERBOSE_LEVEL_INFO,
        retries: int = 1,
        timeout: float = DEFAULT_TIMEOUT,
    ) -> List[Response]:
        if method.upper() not in VALID_METHODS:
            raise InvalidMethodError(
                f"The method {method} is not valid. Valid methods: {VALID_METHODS}"
            )

        responses = asyncio.run(
            cls._make_requests_async(method.lower(), requests, verbose_level, retries)
            cls._make_requests_async(
                method.lower(), requests, verbose_level, retries, timeout
            )
        )

        for response in responses:
@@ -284,8 +291,11 @@ async def _make_requests_async(
        requests: List[Request],
        verbose_level: int = VERBOSE_LEVEL_INFO,
        retries: int = 1,
        timeout: float = DEFAULT_TIMEOUT,
    ) -> List[Response]:
        async with aiohttp.ClientSession() as session:
        async with aiohttp.ClientSession(
            timeout=ClientTimeout(total=timeout)
        ) as session:
            retry_options = ExponentialRetry(attempts=retries)
            retry_client = RetryClient(session, retry_options=retry_options)
            tasks = []
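For reference, a minimal usage sketch of the new timeout parameter; the URL is a placeholder, while Patata, Request, http(), and the timeout keyword come from the changes above:

    from patata import Patata, Request

    # Placeholder URL; timeout is the total time budget per request, in seconds.
    requests = [Request(id_=i, url="http://localhost:8080/item") for i in range(3)]

    with Patata() as client:
        for response in client.http("get", requests, timeout=5.0):
            print(response.id_, response.status_code, response.data)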
136 changes: 131 additions & 5 deletions tests/test_client.py
@@ -1,7 +1,133 @@
from patata import Patata
from http.server import HTTPServer, BaseHTTPRequestHandler
import pytest
import threading
from time import sleep

from patata import Patata, Request, Response

def test_dummy():
    client = Patata()
    assert client
    client.close()

servers_urls = {
    "200": "",
    "500": "",
    "wrong_headers": "",
    "timeout": "",
}


def run_server_200():
    global servers_urls

    class WebRequestHandler200(BaseHTTPRequestHandler):
        def do_GET(self):
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write('{"status": "ok"}'.encode("utf-8"))

    server = HTTPServer(("", 0), WebRequestHandler200)
    servers_urls["200"] = f"http://{server.server_name}:{server.server_port}"
    server.serve_forever()


def run_server_500():
    global servers_urls

    class WebRequestHandler500(BaseHTTPRequestHandler):
        def do_GET(self):
            self.send_response(500)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write('{"status": "ko"}'.encode("utf-8"))

    server = HTTPServer(("", 0), WebRequestHandler500)
    servers_urls["500"] = f"http://{server.server_name}:{server.server_port}"
    server.serve_forever()


def run_server_wrong_headers():
    global servers_urls

    class WebRequestHandlerWrongHeaders(BaseHTTPRequestHandler):
        def do_GET(self):
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.wfile.write('{"status": "no_end_headers"}'.encode("utf-8"))

    server = HTTPServer(("", 0), WebRequestHandlerWrongHeaders)
    servers_urls["wrong_headers"] = f"http://{server.server_name}:{server.server_port}"
    server.serve_forever()


def run_server_timeout():
    global servers_urls

    class WebRequestHandlerTimeout(BaseHTTPRequestHandler):
        def do_GET(self):
            sleep(100_000)

    server = HTTPServer(("", 0), WebRequestHandlerTimeout)
    servers_urls["timeout"] = f"http://{server.server_name}:{server.server_port}"
    server.serve_forever()


@pytest.fixture(autouse=True, scope="session")
def http_servers():
thread_200 = threading.Thread(target=run_server_200)
thread_200.daemon = True
thread_200.start()
thread_500 = threading.Thread(target=run_server_500)
thread_500.daemon = True
thread_500.start()
thread_wrong_headers = threading.Thread(target=run_server_wrong_headers)
thread_wrong_headers.daemon = True
thread_wrong_headers.start()
thread_timeout = threading.Thread(target=run_server_timeout)
thread_timeout.daemon = True
thread_timeout.start()


def test_response_200():
    with Patata() as client:
        responses = list(client.http("get", [Request(id_=1, url=servers_urls["200"])]))
    assert responses == [Response(id_=1, status_code=200, data={"status": "ok"})]


def test_response_200__retry():  # TODO
    pass


def test_response_500():
    with Patata() as client:
        responses = list(client.http("get", [Request(id_=1, url=servers_urls["500"])]))
    assert responses == [Response(id_=1, status_code=500, data={"status": "ko"})]


def test_response_500__retry():  # TODO
    pass


def test_server_doesnt_close_headers():
    with Patata() as client:
        responses = list(
            client.http("get", [Request(id_=1, url=servers_urls["wrong_headers"])])
        )
    assert responses[0].status_code == 500


def test_server_doesnt_close_headers__retry():  # TODO
    pass


def test_server_timeout():
    with Patata() as client:
        responses = list(
            client.http(
                "get", [Request(id_=1, url=servers_urls["timeout"])], timeout=0.1
            )
        )
    assert responses[0].status_code == 500
    assert "TimeoutError" in responses[0].data["exception_traceback"]


def test_server_timeout__retry():  # TODO
    pass
