tl;dr: a super-fast and easy HN client, based on Python 3.6+
As this is my first blog post, I’ll focus on downloading a set of Hackernews posts and user data to use as the data set for the machine learning exercises we’ll look at in future posts.
We’ll start by exploring the different options to create a local copy of HN’s data (or of any other site), and will continue to improve and optimize our solution.
All of the benchmarking was done on my laptop – a MacBook Pro (16 GB RAM, i7) with a 100 Mb/sec download speed.
Setup:
# You need Python 3.6+
# Create a Python virtualenv
python3 -m venv ./venv
# Activate it so the installs below go into the venv
source ./venv/bin/activate
# Great way of working with Python from the shell
pip install jupyter
pip install haxor
pip install aiohttp
Official API
We’ll start by looking at the official hacker-news API.
While it is quite neat and simple (and supports Firebase out of the box),
there is no official Python SDK.
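That said, the underlying REST endpoints are easy to call directly. Here is a minimal sketch of my own (not part of any SDK), assuming you have requests installed and using the same item/maxitem URLs that appear later in this post:

import requests

BASE_URL = "https://hacker-news.firebaseio.com/v0"

def get_max_item():
    # The id of the newest item on HN
    return requests.get("{}/maxitem.json".format(BASE_URL)).json()

def get_item(item_id):
    # Every post/comment/job is a single JSON document
    return requests.get("{}/item/{}.json".format(BASE_URL, item_id)).json()

print(get_item(get_max_item()))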
Semi-official (Most popular)
from hackernews import HackerNews

hn = HackerNews()

def get_last_n_stories(n=10):
    max_item = hn.get_max_item()
    return [hn.get_item(max_item - i) for i in range(n)]

last_n_stories = get_last_n_stories()
Using the time command to measure the performance:
time python downloadHackerNews/sync_hn_python_download.py
python downloadHackerNews/sync_hn_python_download.py  0.32s user 0.04s system 3% cpu 10.630 total
As hn.get_max_item() was around 13,000,000 when I checked, it would have taken me (13,000,000 / 10) * 10.6 ≈ 13,780,000 seconds (or roughly 3,800 hours) to download all the posts (not including user data).
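The back-of-the-envelope math, for reference:

# Rough estimate for the synchronous client (numbers from the run above)
max_item = 13_000_000        # approximate value of hn.get_max_item()
secs_per_10_items = 10.6     # measured wall time for 10 items
total_secs = (max_item / 10) * secs_per_10_items
print(total_secs, total_secs / 3600)  # ~13,780,000 seconds, ~3,800 hours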
Using asyncio – first try (Naive)
At this stage, I decided to try using Python 3’s asyncio and aiohttp in lieu of attempting to improve the haxor code (using requests + sessions + threads).
I based my initial work on this great post: Making 1 million requests with python-aiohttp.
I won’t delve into how asyncio works here, so I definitely recommend that post as a starting point.
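Still, to give a quick taste of the model before the real code, here is a tiny toy example of my own (not from that post): ten coroutines each await a one-second sleep, and because the event loop switches between them while they wait, the whole thing finishes in about one second rather than ten.

import asyncio

async def work(i):
    # While this coroutine awaits, the event loop runs the other ones
    await asyncio.sleep(1)
    return i

async def main():
    # Schedule 10 coroutines and wait for all of them (~1 second total, not ~10)
    return await asyncio.gather(*[work(i) for i in range(10)])

loop = asyncio.get_event_loop()
print(loop.run_until_complete(main()))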
import asyncio
from aiohttp import ClientSession

GET_POST_URL = "https://hacker-news.firebaseio.com/v0/item/{}.json"
GET_MAX_ITEM_URL = 'https://hacker-news.firebaseio.com/v0/maxitem.json'


# A simple async fetch function
async def fetch(url, session):
    # (a)waiting for the server response to come back (during that time the event loop is free)
    async with session.get(url) as response:
        # reading and parsing the response is also an async operation
        return await response.json()


# The main download function, gets the number of posts to download as n
async def run(n):
    # fetch all responses within one client session,
    # keep the connection alive for all requests
    async with ClientSession() as session:
        async with session.get(GET_MAX_ITEM_URL) as resp:
            max_item = await resp.json()
        # init the futures, each future is a url request
        tasks = [asyncio.ensure_future(fetch(GET_POST_URL.format(max_item - i), session))
                 for i in range(n)]
        # wait for all responses to come back
        return await asyncio.gather(*tasks)


def get_last_n_stories(n=10):
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(run(n))
    return loop.run_until_complete(future)


responses = get_last_n_stories()
sample response:
{'by': 'adrienne', 'id': 13430261, 'parent': 13425436, 'text': 'Except she has <i>very clearly stated her preference</i> in this regard, and that is trivial to discover. Why are you people so afraid to Google things?', 'time': 1484773488, 'type': 'comment'}
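All item types (stories, comments, jobs, polls) share this schema, so it is easy to split the dump for the ML work later on. A small helper of my own (not part of the client), applied to the responses list from the snippet above:

from collections import defaultdict

def split_by_type(items):
    # Group downloaded items by their 'type' field ('story', 'comment', 'job', ...)
    grouped = defaultdict(list)
    for item in items:
        if item:  # ids that don't resolve come back as None
            grouped[item.get('type', 'unknown')].append(item)
    return grouped

print({k: len(v) for k, v in split_by_type(responses).items()})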
And the performance:
python downloadHackerNews/async_hn_python_download.py 0.20s user 0.02s system 12% cpu 1.718 total
Right off the bat we got a speed boost of roughly 6x (1.7 seconds vs. 10.6). But can we improve it further?
The answer is: sure!
asyncio – second try (Consumer/Producer)
But first, let's try to understand the limitations of the naive solution:
- The number of open coroutines scales linearly with the number of requests (if we are fetching 100,000 posts we'll generate 100,000 futures) – not scalable.
- The number of open TCP connections is 20 by default – too low.
- When we reach high rates (1,000 requests/sec) we sometimes fail to process the response due to server slowdowns – we need some retry mechanism.
In order to solve all of these problems we’ll use a different approach: producer -> queue -> consumers.
The design is simple: we have n consumers that read from the queue and one producer that fills it. This way we have only n open coroutines, no matter how many requests we make.
We will also use a dead-letter queue (DLQ), which will store failed requests so they can be retried.
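The pattern itself is plain asyncio. Here is a toy version of my own with numbers instead of URLs, just to show the moving parts (the real consumer and producer follow below):

import asyncio

async def toy_consumer(queue, results):
    while True:
        item = await queue.get()    # blocks until the producer puts something
        results.append(item * 2)    # "process" the item
        queue.task_done()           # lets queue.join() know this item is handled

async def toy_producer(queue, n):
    for i in range(n):
        await queue.put(i)          # blocks if the queue is full (maxsize reached)

async def toy_main():
    queue, results = asyncio.Queue(maxsize=10), []
    consumers = [asyncio.ensure_future(toy_consumer(queue, results)) for _ in range(3)]
    await toy_producer(queue, 100)
    await queue.join()              # wait until every item got task_done()
    for c in consumers:
        c.cancel()
    return results

print(len(asyncio.get_event_loop().run_until_complete(toy_main())))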
Let’s start with the consumer code:
async def consumer(main_queue, dlq, session, responses):
    while True:
        try:
            # Fetch the url from the queue, blocking until the queue has an item
            url = await main_queue.get()
            # Try to get a response from the server
            async with session.get(url, timeout=10) as response:
                # Check we got a valid response
                response.raise_for_status()
                # Append it to the responses list
                responses.append(await response.json())
            # Tell the queue we finished processing the message
            main_queue.task_done()
        # In case of a timeout in our GET request / a problem with the response code
        except (HttpProcessingError, asyncio.TimeoutError):
            logger.debug("Problem with %s, moving to DLQ" % url)
            # Put the url in the dlq, so another consumer will handle it
            await dlq.put(url)
            # Lower the pace
            await asyncio.sleep(5)
            # Tell the queue we finished processing the message
            main_queue.task_done()
The consumer reads a URL from the main queue -> performs an HTTP request -> appends the response to the responses list.
If a consumer encounters a problem, it writes the URL to the DLQ, sleeps for a bit, and then reads the next URL from the queue.
async def produce(queue, itr_items, base_url):
    for item in itr_items:
        # If the queue is full (reached maxsize) this line will block
        # until a consumer finishes processing a url
        await queue.put(base_url.format(item))
The producer iterates over the iterable and puts its items in the queue. If the queue is full, it blocks until a consumer reads messages from the queue.
async def download_last_n_posts(session, n, consumer_num):
    # We init the main_queue with a fixed size, and the dlq with unlimited size
    main_queue, dlq, responses = asyncio.Queue(maxsize=2000), asyncio.Queue(), []
    # We init the consumers; as the queues are empty at first,
    # they will all block on main_queue.get()
    consumers = [asyncio.ensure_future(
        consumer(main_queue, dlq, session, responses))
        for _ in range(consumer_num)]
    # Init the dlq consumers, same as the base consumers,
    # only their main_queue is the dlq
    dlq_consumers = [asyncio.ensure_future(
        consumer(main_queue=dlq, dlq=dlq, session=session, responses=responses))
        for _ in range(10)]
    # Get the max item id from the API
    async with session.get(URL_MAX_ITEM) as resp:
        max_item = await resp.json()
    # Run the producer; it returns only after all urls were put in the queue
    await produce(queue=main_queue,
                  itr_items=range(max_item, max_item - n, -1),
                  base_url=URL_GET_POST)
    # Wait for every item inside the main_queue to get task_done
    await main_queue.join()
    # Wait for every item inside the dlq to get task_done
    await dlq.join()
    # Cancel all consumer coroutines
    for consumer_future in consumers + dlq_consumers:
        consumer_future.cancel()
    return responses
- First we initialize the queues and the consumers – both the main consumers and the DLQ consumers; since the queues start empty, they all block on main_queue.get().
- Then we fetch the id of the latest item from HN.
- Next we run the producer – it returns only after all the URLs have been inserted into the queue.
- We wait for a task_done call on every URL in the main queue (this means we made at least one attempt per URL).
- We wait for a task_done call on every URL that entered the DLQ (a message can be processed more than once).
- Finally we cancel all the consumers (no need for them once we have all the posts) and return the responses.
Full code:
import asyncio
import json
import logging

import aiohttp
from aiohttp import HttpProcessingError, ClientSession

# setting up the logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
console = logging.StreamHandler()
logger.addHandler(console)

MAX_CONNECTION = 1000
URL_GET_POST = "https://hacker-news.firebaseio.com/v0/item/{}.json"
URL_MAX_ITEM = 'https://hacker-news.firebaseio.com/v0/maxitem.json'


# asyncio http consumer
async def consumer(main_queue, dlq, session, responses):
    while True:
        try:
            # Fetch the url from the queue, blocking until the queue has an item
            url = await main_queue.get()
            # Try to get a response from the server
            async with session.get(url, timeout=10) as response:
                # Check we got a valid response
                response.raise_for_status()
                # Append it to the responses list
                responses.append(await response.json())
            # Tell the queue we finished processing the message
            main_queue.task_done()
        # In case of a timeout in our GET request / a problem with the response code
        except (HttpProcessingError, asyncio.TimeoutError):
            logger.debug("Problem with %s, moving to DLQ. main_queue: (%s), dlq: (%s)"
                         % (url, str(main_queue.qsize()), str(dlq.qsize())))
            # Put the url in the dlq, so another consumer will handle it
            await dlq.put(url)
            # Lower the pace
            await asyncio.sleep(5)
            # Tell the queue we finished processing the message
            main_queue.task_done()


async def produce(queue, itr_items, base_url):
    for item in itr_items:
        # If the queue is full (reached maxsize) this line will block
        # until a consumer finishes processing a url
        await queue.put(base_url.format(item))


async def download_last_n_posts(session, n, consumer_num):
    # We init the main_queue with a fixed size, and the dlq with unlimited size
    main_queue, dlq, responses = asyncio.Queue(maxsize=2000), asyncio.Queue(), []
    # We init the consumers; as the queues are empty at first,
    # they will all block on main_queue.get()
    consumers = [asyncio.ensure_future(
        consumer(main_queue, dlq, session, responses))
        for _ in range(consumer_num)]
    # Init the dlq consumers, same as the base consumers,
    # only their main_queue is the dlq
    dlq_consumers = [asyncio.ensure_future(
        consumer(main_queue=dlq, dlq=dlq, session=session, responses=responses))
        for _ in range(10)]
    # Get the max item id from the API
    async with session.get(URL_MAX_ITEM) as resp:
        max_item = await resp.json()
    # Run the producer; it returns only after all urls were put in the queue
    await produce(queue=main_queue,
                  itr_items=range(max_item, max_item - n, -1),
                  base_url=URL_GET_POST)
    # Wait for every item inside the main_queue to get task_done
    await main_queue.join()
    # Wait for every item inside the dlq to get task_done
    await dlq.join()
    # Cancel all consumer coroutines
    for consumer_future in consumers + dlq_consumers:
        consumer_future.cancel()
    return responses


async def run(loop, n):
    conn_num = min(MAX_CONNECTION, n)
    # We raise the connection limit to get better performance
    async with ClientSession(loop=loop,
                             connector=aiohttp.TCPConnector(limit=conn_num)) as session:
        posts = await download_last_n_posts(session, n, conn_num)
    with open("hn_posts.json", "w") as f:
        json.dump(posts, f)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop, n=1000000))
Using this code we can reach 1,000–2,000 requests per second (single thread), with a total run time of 13 minutes and 11 seconds for 1,000,000 posts. Not bad for a single-threaded client :)
python downloadHackerNews/download_hn_for_blog.py 624.53s user 23.78s system 81% cpu 13:11.37 total
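Since the original goal also included user data, the same pipeline can be pointed at the official user endpoint. A sketch of how I would do it (URL_GET_USER and download_users are my own additions, everything else reuses the functions above):

URL_GET_USER = "https://hacker-news.firebaseio.com/v0/user/{}.json"

async def download_users(session, posts, consumer_num=100):
    # Collect the unique authors from the posts we already downloaded
    usernames = {post['by'] for post in posts if post and 'by' in post}
    main_queue, dlq, responses = asyncio.Queue(maxsize=2000), asyncio.Queue(), []
    consumers = [asyncio.ensure_future(consumer(main_queue, dlq, session, responses))
                 for _ in range(consumer_num)]
    dlq_consumers = [asyncio.ensure_future(
        consumer(main_queue=dlq, dlq=dlq, session=session, responses=responses))
        for _ in range(10)]
    # Same flow as download_last_n_posts, only with usernames instead of item ids
    await produce(queue=main_queue, itr_items=usernames, base_url=URL_GET_USER)
    await main_queue.join()
    await dlq.join()
    for c in consumers + dlq_consumers:
        c.cancel()
    return responses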
Summary
I pushed the full wrapper for the Hackernews API we saw here to Github: asyncio-hn.
Feel free to ask any questions (and of course, any improvements will be most appreciated).
And lastly – stay tuned for combining (basic) Sanic with this HackerNews wrapper for some neat Python 3 fun!