# Source code for the lexisnexis scraper module.

import random
import re
import urllib
import urlparse

import requests
import selenium.common.exceptions
import selenium.webdriver
import selenium.webdriver.common.by
import selenium.webdriver.common.desired_capabilities
import selenium.webdriver.support.ui
from selenium.webdriver.support import expected_conditions


[docs]class LexisNexisScraper: """ Class for downloading documents given a query string to Lexis Nexis academic (http://www.lexisnexis.com/hottopics/lnacademic/). Example:: downloader = LexisNexisScraper(mass_download_mode=True) for (content, (doc_index, doc_count)) in downloader.iter_search_results(6318, 'DATE(=1987)'): print doc_id This code uses `PhantomJS <http://phantomjs.org>`__ and `Selenium Webdriver <http://www.seleniumhq.org/>`__ to scrape LexisNexis pages. """ _RE_STYLESHEET = re.compile(ur'\<STYLE TYPE\=\"text\/css\"\>(\<\!\-\-)?(?P<css_string>.+?)(\-\-\>)?\<\/STYLE\>', flags=re.S | re.U | re.I) _RE_LEXIS_DOC = re.compile(ur'\<DOC NUMBER\=(?P<docid>\d+)\>\s+\<DOCFULL\>(?P<doc>.+?)\<\/DOCFULL\>', flags=re.S | re.U | re.I)
[docs] def __init__(self, wait_timeouts=(15, 180), documents_per_download=(250, 500), user_agent_string=u'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0', mass_download_mode=False): """ Constructs a downloader object. :param float,float wait_timeouts: tuple of `(short, long)` where `short` and `long` are the no. of seconds to wait while page elements are loaded (for Webdriver). `long` timeout is used when waiting for LexisNexis to format documents for mass downloads. :param int,int documents_per_download: a range specifying the number of documents to download each time when using :attr:`mass_download_mode`. :param str user_agent_string: the user agent string that PhantomJS should declare itself to be. :param bool mass_download_mode: whether to mass download articles using the download link or page through each document one by one and download. """ self._USER_AGENT_STRING = user_agent_string self._DOCUMENTS_PER_DOWNLOAD = documents_per_download desired_capabilities = dict(selenium.webdriver.common.desired_capabilities.DesiredCapabilities.PHANTOMJS) desired_capabilities['phantomjs.page.settings.userAgent'] = self._USER_AGENT_STRING self._driver = selenium.webdriver.PhantomJS(desired_capabilities=desired_capabilities) self._driver.set_window_size(800, 600) self._short_wait = selenium.webdriver.support.ui.WebDriverWait(self._driver, wait_timeouts[0], poll_frequency=0.05) self._long_wait = selenium.webdriver.support.ui.WebDriverWait(self._driver, wait_timeouts[1], poll_frequency=1) self.mass_download_mode_ = mass_download_mode
#end def def __del__(self): try: self._driver.quit() except: pass
[docs] def iter_search_results(self, csi, search_query, start_from=1): """ A generator function that executes LexisNexis search query on source data CSI (:attr:`csi`), with query :attr:`search_query` and downloads all documents returned by search. :param str csi: LexisNexis CSI (see `<http://amdev.net/rpt_download.php>`_ for full list). :param str search_query: execute search query string. :param int start_from: document index to start downloading from. :returns: a tuple `(doc_content, (index, results_count))`, where `doc_content` is the HTML content of the `index`th document, and `results_count` is the number of documents returned by specified search query. """ self._driver.get('http://www.lexisnexis.com/hottopics/lnacademic/?' + urllib.urlencode({'verb': 'sr', 'csi': csi, 'sr': search_query})) if not self._have_results(): return [] if self.mass_download_mode_: return self._mass_download(start_from) return self._sequential_download(start_from)
#end def def _have_results(self): # todo: kinda slow, due to having wait for multiple timeouts self._switch_to_frame('main') if self._wait_for_element('//td[text()[contains(., \'No Documents Found\')]]', raise_error=False) is not None: return False if self._wait_for_element('//frame[@title=\'Results Content Frame\']', raise_error=False) is not None: return True if self._wait_for_element('//frame[@title=\'Results Document Content Frame\']', raise_error=False) is not None: return True raise Exception('Page loaded improperly while checking for results frame.') #end def def _mass_download(self, start_from=1): # Returns documents as a list of strings containing HTML self._switch_to_frame('navigation') try: documents_count = int(self._driver.find_element_by_xpath('//form[@name=\'results_docview_DocumentForm\']/input[@name=\'totalDocsInResult\']').get_attribute('value')) except: documents_count = -1 def download_sequence(start, end): docs_left = end - start + 1 cur = start while docs_left > self._DOCUMENTS_PER_DOWNLOAD[1]: download_count = random.randint(*self._DOCUMENTS_PER_DOWNLOAD) yield (cur, cur + download_count - 1) docs_left -= download_count cur += download_count #end while yield (cur, cur + docs_left - 1) #end def def lexis_nexis_download_window_appears(current_handle): def f(driver): for handle in driver.window_handles: if current_handle != handle: driver.switch_to.window(handle) # switch first to check window title if driver.title.endswith('Download Documents'): return True # this is our new window! 
#end if #end for return False #end def return f #end class for download_start, download_end in download_sequence(start_from, documents_count): self._switch_to_frame('navigation') parent_window_handle = self._driver.current_window_handle # check for download icon and click it self._wait_for_element('//img[@title=\'Download Documents\']').click() # wait for download window to appear self._short_wait.until(lexis_nexis_download_window_appears(parent_window_handle)) self._wait_for_element('//img[@title=\'Download\']') # get all the form items selenium.webdriver.support.ui.Select(self._driver.find_element_by_xpath('//select[@name=\'delFmt\']')).select_by_value('QDS_EF_HTML') selenium.webdriver.support.ui.Select(self._driver.find_element_by_xpath('//select[@name=\'delView\']')).select_by_value('GNBFI') selenium.webdriver.support.ui.Select(self._driver.find_element_by_xpath('//select[@name=\'delFontType\']')).select_by_value('COURIER') # i like courier search_term_bold = self._driver.find_element_by_xpath('//input[@type=\'checkbox\'][@id=\'termBold\']') if not search_term_bold.is_selected(): search_term_bold.click() doc_new_page = self._driver.find_element_by_xpath('//input[@type=\'checkbox\'][@id=\'docnewpg\']') if not doc_new_page.is_selected(): doc_new_page.click() self._driver.find_element_by_xpath('//input[@type=\'radio\'][@id=\'sel\']').click() self._driver.find_element_by_xpath('//input[@type=\'text\'][@id=\'rangetextbox\']').send_keys('{}-{}'.format(download_start, download_end)) self._driver.find_element_by_xpath('//img[@title=\'Download\']').click() download_url = self._long_wait.until(expected_conditions.presence_of_element_located((selenium.webdriver.common.by.By.XPATH, '//center[@class=\'suspendbox\']/p/a'))).get_attribute('href') # set up cookies and use requests library to do download cookies = dict([(cookie['name'], cookie['value']) for cookie in self._driver.get_cookies()]) response = requests.get(download_url, cookies=cookies, headers={'User-Agent': 
self._USER_AGENT_STRING}) html_content = response.text m = self._RE_STYLESHEET.search(html_content) css_string = m.group('css_string').strip() for i, m in enumerate(self._RE_LEXIS_DOC.finditer(html_content)): page_content = m.group('doc').replace(u'<!-- Hide XML section from browser', '').replace(u'-->', '').strip() page_content = u'\n'.join([u'<HTML>', u'<HEAD>', u'<STYLE TYPE=\"text/css\">', css_string, u'</STYLE>', u'</HEAD>', u'<BODY>', page_content, u'</BODY>', u'</HTML>']) yield (page_content, (download_start + i, documents_count)) #end for self._driver.close() self._driver.switch_to.window(parent_window_handle) #end for #end def def _sequential_download(self, start_from=1): self._switch_to_frame('navigation') try: documents_count = int(self._driver.find_element_by_xpath('//form[@name=\'results_docview_DocumentForm\']/input[@name=\'totalDocsInResult\']').get_attribute('value')) except: documents_count = -1 if documents_count <= 0: return if start_from > documents_count: return if documents_count == 1: self._switch_to_frame('content') page_content = self._driver.page_source yield (page_content, (1, 1)) return #end if self._switch_to_frame('results') # go to results list and grab the first link first_document_url = self._wait_for_element('//td/a[contains(@href, \'/lnacui2api/results/docview/docview.do\')]').get_attribute('href') url_obj = urlparse.urlparse(first_document_url) qs_dict = dict(urlparse.parse_qsl(url_obj.query)) qs_dict['docNo'] = start_from doc_url = urlparse.urlunparse((url_obj.scheme, url_obj.netloc, url_obj.path, url_obj.params, urllib.urlencode(qs_dict), url_obj.fragment)) self._driver.get(doc_url) # jump to the page we want # qs_dict['RELEVANCE'] = 'BOOLEAN' # doesnt seem to work # http://www.lexisnexis.com/lnacui2api/results/docview/docview.do?docLinkInd=true&risb=21_T21153102977&format=GNBFI&sort=RELEVANCE&startDocNo=1&resultsUrlKey=29_T21153102981&cisb=22_T21153102980&treeMax=true&treeWidth=0&csi=6318&docNo=1 for doc_index in 
xrange(start_from, documents_count + 1): self._switch_to_frame('content', in_iframe=False) page_content = self._driver.page_source yield (page_content, (doc_index, documents_count)) self._switch_to_frame('navigation', in_iframe=False) next_page_elem = self._wait_for_element('//img[@title=\'View next document\']', raise_error=False) if next_page_elem is None: if doc_index != documents_count: raise Exception('Next page icon could not be found: doc_index={}, documents_count={}'.format(doc_index, documents_count)) else: next_page_elem.click() #end while #end def def _switch_to_frame(self, frame_name, in_iframe=True): self._driver.switch_to.default_content() if in_iframe: frame = self._safe_wait(expected_conditions.frame_to_be_available_and_switch_to_it('mainFrame')) if not frame: raise SwitchFrameException(frame_name) #end if try: if frame_name == 'main': return frame elif frame_name == 'results': frame = self._wait_for_element('//frame[@title=\'Results Content Frame\']') elif frame_name == 'navigation': frame = self._wait_for_element('//frame[@title=\'Results Navigation Frame\']') elif frame_name == 'content': frame = self._wait_for_element('//frame[@title=\'Results Document Content Frame\']') except selenium.common.exceptions.TimeoutException: raise SwitchFrameException(frame_name) self._safe_wait(expected_conditions.frame_to_be_available_and_switch_to_it(frame)) return frame #end def def _safe_wait(self, poll_func): try: return self._short_wait.until(poll_func) except selenium.common.exceptions.TimeoutException: return None #end def def _wait_for_element(self, xpath, raise_error=True): elem = self._safe_wait(expected_conditions.presence_of_element_located((selenium.webdriver.common.by.By.XPATH, xpath))) if raise_error and elem is None: raise selenium.common.exceptions.TimeoutException(msg='XPath \'{}\' presence wait timeout.'.format(xpath)) return elem
#end def #end class
[docs]class SwitchFrameException(Exception): """ Exception class when we are unable to load the require page properly. This is usually due to #. Page taking too long to load. This happens sometimes when loading LexisNexis for the first time. #. Improper page loading. """ def __init__(self, frame_name): self.frame_name = frame_name def __str__(self): return u'Exception while switching to frame \'{}\'.'.format(self.frame_name)
#end class