Skip to content

Commit

Permalink
Replace ThreadPoolExecutor.submit with ThreadPoolExecutor.map function
Browse files Browse the repository at this point in the history
  • Loading branch information
adl1995 committed Jan 18, 2019
1 parent e24a5e3 commit 6c8874c
Showing 1 changed file with 5 additions and 29 deletions.
34 changes: 5 additions & 29 deletions hips/tiles/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,24 +75,15 @@ def fetch_tiles(tile_metas: List[HipsTileMeta], hips_survey: HipsSurveyPropertie
else:
raise ValueError(f'Invalid package name: {fetch_package}')

_tile_urls = tile_urls(tile_metas, hips_survey)
response_all = fetch_fct(_tile_urls, hips_survey, progress_bar, n_parallel, timeout)
tile_urls = [hips_survey.tile_url(meta) for meta in tile_metas]
response_all = fetch_fct(tile_urls, hips_survey, progress_bar, n_parallel, timeout)

# Sort tiles to match the tile_meta list
# TODO: this doesn't seem like a great solution.
# Use OrderedDict instead?
tiles = []
for tile_url in _tile_urls:
for idx, response in enumerate(response_all):
if response['url'] == tile_url:
tiles.append(HipsTile(tile_metas[idx], response['raw_data']))
for idx, response in enumerate(response_all):
tiles.append(HipsTile(tile_metas[idx], response['raw_data']))

return tiles

def tile_urls(tile_metas: List[HipsTileMeta], hips_survey: HipsSurveyProperties) -> List[str]:
"""Retrun list of tile URLs"""
return [hips_survey.tile_url(meta) for meta in tile_metas]

def fetch_tile_urllib(url: str, timeout: float) -> dict:
"""Fetch a HiPS tile asynchronously."""
with urllib.request.urlopen(url, timeout=timeout) as conn:
Expand All @@ -103,22 +94,7 @@ def tiles_urllib(tile_urls: List[str], hips_survey: HipsSurveyProperties,
progress_bar: bool, n_parallel, timeout: float) -> List[dict]:
"""Generator function to fetch HiPS tiles from a remote URL."""
with concurrent.futures.ThreadPoolExecutor(max_workers=n_parallel) as executor:
futures = []
for url in tile_urls:
future = executor.submit(fetch_tile_urllib, url, timeout)
futures.append(future)

futures = concurrent.futures.as_completed(futures)
if progress_bar:
from tqdm import tqdm
futures = tqdm(futures, total=len(tile_urls), desc='Fetching tiles')

response_all = []
for future in futures:
response_all.append(future.result())

return response_all

return list(executor.map(fetch_tile_urllib, tile_urls, [timeout] * len(tile_urls)))

async def fetch_tile_aiohttp(url: str, session, timeout: float) -> dict:
"""Fetch a HiPS tile asynchronously using aiohttp."""
Expand Down

0 comments on commit 6c8874c

Please sign in to comment.