Source code for crawl.webstore_iface

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Chrome Web Store interface for dbling."""

import asyncio
import logging
from os import path
from tempfile import TemporaryDirectory, TemporaryFile
from time import sleep
from urllib.parse import urlparse, parse_qs

import requests
import uvloop
from lxml import etree
from requests import ConnectionError
from requests.exceptions import ChunkedEncodingError, HTTPError

from common.util import validate_crx_id, get_crx_version, make_download_headers
from common.const import CRX_URL

__all__ = ['DownloadCRXList', 'save_crx', 'ListDownloadFailedError', 'ExtensionUnavailable', 'BadDownloadURL',
           'VersionExtractError']

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

LOG_PATH = path.join(path.dirname(path.realpath(__file__)), '../log', 'crx.log')
DBLING_DIR = path.abspath(path.join(path.dirname(path.realpath(__file__)), '..'))
DONT_OVERWRITE_DOWNLOADED_CRX = False
CHUNK_SIZE = 512
NUM_HTTP_RETRIES = 5

TESTING = False  # Set to an int (e.g. 1000) to truncate the ID list for testing


logging.getLogger('requests').setLevel(logging.WARNING)


class ListDownloadFailedError(ConnectionError):
    """Raised when the list download fails."""


class ExtensionUnavailable(Exception):
    """Raised when an extension isn't downloadable."""


class BadDownloadURL(Exception):
    """Raised when the ID is valid but we can't download the extension."""


class VersionExtractError(Exception):
    """Raised when extracting the version number from the URL fails."""


class DownloadCRXList:
    """Generate list of extension IDs downloaded from Google.

    As a generator, this is designed to be used in a ``for`` loop. For example:

    >>> crx_list = DownloadCRXList(download_url)
    >>> for crx_id in crx_list:
    ...     print(crx_id)

    The list of CRXs will be downloaded just prior to when the first item is
    generated. In other words, instantiating this class doesn't start the
    download; iterating over the instance starts the download. This is
    significant given that downloading the list is quite time consuming.
    """

    # Namespace tag used by the downloaded list (XML file)
    _ns = '{http://www.sitemaps.org/schemas/sitemap/0.9}'
    list_list_url = 'https://chrome.google.com/webstore/sitemap'

    def __init__(self, ext_url, *, return_count=False, session=None):
        """
        :param str ext_url: Specially crafted URL that will let us download the
            list of extensions.
        :param bool return_count: When True, will return a tuple of the form:
            ``(crx_id, job_number)``, where ``job_number`` is the index of the
            ID plus 1. This way, the job number of the last ID returned will be
            the same as ``len(DownloadCRXList)``.
        :param requests.Session session: Session object to use when downloading
            the list. If None, a new :class:`requests.Session` object is
            created.
        """
        self.ext_url = ext_url
        self.session = session if isinstance(session, requests.Session) else requests.Session()
        self._downloaded_list = False
        self.ret_tup = return_count  # Return a tuple (CRX ID, num)
        self._id_list = []
        self._next_id_index = 0
        self.sitemap_dir = None

    def __iter__(self):
        if not self._downloaded_list:
            self.download_ids()
        # Reset the "next ID" index
        self._next_id_index = 0
        return self

    def __next__(self):
        try:
            crx_id = self._id_list[self._next_id_index]
        except IndexError:
            raise StopIteration
        self._next_id_index += 1
        if self.ret_tup:
            return crx_id, self._next_id_index
        return crx_id

    def download_ids(self):
        """Starting point for downloading all CRX IDs.

        This function actually creates an event loop and starts the downloads
        asynchronously.

        :rtype: None
        """
        loop = asyncio.get_event_loop_policy().new_event_loop()
        with TemporaryDirectory() as self.sitemap_dir:
            loop.run_until_complete(self._async_download_lists())
        self._downloaded_list = True

    async def _async_download_lists(self):
        """Download, loop through the list of lists, combine IDs from each.

        :rtype: None
        """
        logging.info('Downloading the list of extension lists from Google.')

        # Download the first list
        resp = _http_get(self.list_list_url, self.session, stream=False, headers=make_download_headers())
        if resp is None:
            logging.critical('Failed to download list of extensions.')
            raise ListDownloadFailedError('Unable to download list of extensions.')

        # Save the list
        local_sitemap = TemporaryFile(dir=self.sitemap_dir)
        for chunk in resp.iter_content(chunk_size=None):
            local_sitemap.write(chunk)
        resp.close()

        # Go through the list, extracting list URLs
        ids = set()
        local_sitemap.seek(0)
        xml_tree = etree.parse(local_sitemap)
        num_lists = 0
        duplicate_count = 0
        for url_tag in xml_tree.iterfind('*/' + self._ns + 'loc'):
            # Download the URL, get the IDs from it and add them to the set of IDs
            try:
                _ids = await self._dl_parse_id_list(url_tag.text)
            except ListDownloadFailedError:
                # TODO: How to handle this?
                raise
            else:
                x = len(_ids)
                y = len(ids)
                ids |= _ids
                duplicate_count += (y + x) - len(ids)
                num_lists += 1

        logging.info('Done downloading. Doing some cleanup...')

        # Close (and delete) the temporary file where the list was stored
        local_sitemap.close()

        # Convert IDs to a list, then sort it
        self._id_list = list(ids)
        self._id_list.sort()
        if TESTING:
            # Truncate the list
            self._id_list = self._id_list[:TESTING]

        logging.warning('There were {} duplicate IDs from the {} lists.'.format(duplicate_count, num_lists))

    async def _dl_parse_id_list(self, list_url):
        """Download the extension list at the given URL, return set of IDs.

        :param str list_url: URL of an individual extension list.
        :return: Set of CRX IDs.
        :rtype: set
        """
        # Get info from the list URL to indicate our progress in the log message
        url_data = parse_qs(urlparse(list_url).query)
        numshards = url_data['numshards'][0]
        shard = int(url_data['shard'][0]) + 1
        log = logging.info if not shard % 100 or shard == int(numshards) else logging.debug
        shard = ('{:0' + str(len(numshards)) + '}').format(shard)
        _hl = hl = url_data.get('hl', '')
        if isinstance(_hl, list):
            hl = ' (language: {})'.format(_hl[0])
            _hl = '_{}'.format(_hl[0])
        list_id = '{} of {}{}'.format(shard, numshards, hl)

        sitemap = TemporaryFile(prefix='sitemap{}_{}_{}'.format(_hl, shard, numshards), suffix='.xml',
                                dir=self.sitemap_dir)

        # Download the IDs list
        resp = _http_get(list_url, self.session, stream=False, headers=make_download_headers())
        if resp is None:
            msg = 'Failed to download extension list {}.'.format(list_id)
            logging.critical(msg)
            raise ListDownloadFailedError(msg)

        # Save the list
        for chunk in resp.iter_content(chunk_size=None):
            sitemap.write(chunk)
        resp.close()

        # Extract the IDs
        ids = set()
        sitemap.seek(0)
        xml_tree = etree.parse(sitemap)
        for url_tag in xml_tree.iterfind('*/' + self._ns + 'loc'):
            # Get just the URL path (strips the scheme, netloc, params, query, and fragment segments)
            crx_id = urlparse(url_tag.text).path
            # Get the ID (strips everything from the path except the last part)
            crx_id = path.basename(crx_id)
            ids.add(crx_id)

        log('Downloaded extension list {}. Qty: {}'.format(list_id, len(ids)))
        sitemap.close()
        return ids

    def __len__(self):
        return len(self._id_list)
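

# Illustrative sketch (not part of the original module): iterating with ``return_count=True``
# yields ``(crx_id, job_number)`` tuples, which is convenient for progress logging. The
# ``ext_url`` value is assumed to be the specially crafted URL described in ``__init__`` above.
#
#     >>> crx_list = DownloadCRXList(ext_url, return_count=True)
#     >>> for crx_id, job_num in crx_list:
#     ...     logging.info('Processing %s (%d of %d)', crx_id, job_num, len(crx_list))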


def save_crx(crx_obj, download_url, save_path=None, session=None):
    """Download the CRX, save in the ``save_path`` directory.

    The saved file will have the format: ``<extension ID>_<version>.crx``

    If ``save_path`` isn't given, this will default to a directory called
    "downloads" in the CWD.

    Adds the following keys to ``crx_obj``:

    - ``version``: Version number of the extension, as obtained from the final
      URL of the download. This may differ from the version listed in the
      extension's manifest.
    - ``filename``: The basename of the CRX file (not the full path)
    - ``full_path``: The location (full path) of the downloaded CRX file

    :param crx_obj: Previously collected information about the extension.
    :type crx_obj: munch.Munch
    :param download_url: The URL template that already contains the correct
        Chrome version information and ``{}`` where the ID goes.
    :type download_url: str
    :param save_path: Directory where the CRX should be saved.
    :type save_path: str or None
    :param session: Optional :class:`~requests.Session` object to use for HTTP
        requests.
    :type session: requests.Session or None
    :return: Updated version of ``crx_obj`` with ``version``, ``filename``, and
        ``full_path`` information added. If the download wasn't successful, not
        all of these may have been added, depending on when it failed.
    :rtype: munch.Munch
    """
    # Check that the ID has a valid form
    validate_crx_id(crx_obj.id)

    # Ensure the extension is still available in the Web Store
    url = CRX_URL % crx_obj.id
    resp = _http_get(url, session)
    _ensure_redirect(resp)
    resp.close()

    # If the URL we got back was the same one we requested, the download failed
    if url == resp.url:
        raise BadDownloadURL

    # Make the new request to actually download the extension
    resp = _http_get(download_url.format(crx_obj.id), session, stream=True)

    try:
        crx_obj.version = get_crx_version(resp.url.rsplit('extension', 1)[-1])
    except IndexError:
        raise VersionExtractError('{} Problem with extracting CRX version from URL\n URL: {}\n Split URL: {}'.
                                  format(crx_obj.id, resp.url, resp.url.rsplit('extension', 1)[-1]))
    crx_obj.filename = '{}_{}.crx'.format(crx_obj.id, crx_obj.version)  # <ID>_<version>

    if save_path is None:
        save_path = path.join('.', 'downloads')
    crx_obj.full_path = path.abspath(path.join(save_path, crx_obj.filename))

    if path.exists(crx_obj.full_path):
        err = FileExistsError()
        err.errno = ''
        err.strerror = 'Cannot save CRX to path that already exists'
        err.filename = crx_obj.full_path
        raise err

    with open(crx_obj.full_path, 'wb') as fout:
        # Write the binary response to the file CHUNK_SIZE bytes at a time
        for chunk in resp.iter_content(chunk_size=CHUNK_SIZE):
            fout.write(chunk)
    resp.close()

    return crx_obj
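

# Illustrative sketch (not part of the original module) of driving save_crx() from a
# DownloadCRXList. The ``munch`` import, the ``download_url`` template, and the ``ext_url``
# value are assumptions here; dbling builds the real values elsewhere.
#
#     >>> from munch import Munch
#     >>> session = requests.Session()
#     >>> for crx_id in DownloadCRXList(ext_url, session=session):
#     ...     try:
#     ...         crx = save_crx(Munch(id=crx_id), download_url, save_path='downloads', session=session)
#     ...     except (ExtensionUnavailable, BadDownloadURL, VersionExtractError, FileExistsError):
#     ...         continue
#     ...     logging.info('Saved %s', crx.full_path)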


def _ensure_redirect(resp):
    """Check that a redirect occurred.

    :param resp: The response object from GET-ting the extension's URL.
    :type resp: requests.Response
    :return: None
    :rtype: None
    """
    if not len(resp.history):
        raise ExtensionUnavailable('No redirect occurred while fetching URL %s' % resp.url)


class RetryRequest:
    """Wraps functions that make HTTP requests, retries on failure."""

    def __init__(self, f):
        self.f = f

    def __call__(self, *args, **kwargs):
        resp = None
        for i in range(NUM_HTTP_RETRIES):
            try:
                resp = self.f(*args, **kwargs)
                resp.raise_for_status()  # If there was an HTTP error, raise it
            except (ChunkedEncodingError, ConnectionError, HTTPError):
                # TODO: Are there other errors we could get that we want to retry after?
                logging.debug('Encountered error while downloading. Attempting to sleep and retry '
                              '({} of {} retries)'.format(i + 1, NUM_HTTP_RETRIES))
                sleep(10 * (i + 1))
            else:
                break
        return resp


@RetryRequest
def _http_get(url, session=None, stream=True, **kwargs):
    """Make a GET request with the URL.

    Any errors from the HTTP request (non 200 codes) will raise an HTTPError.

    :param url: The URL to GET.
    :type url: str
    :param session: Optional :class:`~requests.Session` object to use to make
        the GET request.
    :type session: requests.Session or None
    :param stream: If `False`, the response content will be immediately
        downloaded.
    :type stream: bool
    :param kwargs: Optional arguments that :func:`requests.get` takes.
    :type kwargs: dict
    :return: The :class:`~requests.Response` object containing the server's
        response to the HTTP request.
    :rtype: requests.Response
    """
    if isinstance(session, requests.Session):
        return session.get(url, stream=stream, **kwargs)
    return requests.get(url, stream=stream, **kwargs)
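

# Illustrative sketch (not part of the original module): ``RetryRequest`` is an ordinary
# decorator, so other request helpers can reuse the same retry-and-backoff behavior. The
# HEAD helper below is hypothetical.
#
#     >>> @RetryRequest
#     ... def _http_head(url, session=None, **kwargs):
#     ...     if isinstance(session, requests.Session):
#     ...         return session.head(url, **kwargs)
#     ...     return requests.head(url, **kwargs)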