import requests
import grequests
import gevent
from collections import deque
import time
import warnings

# grbrute - asynchronous URL fetcher based on grequests
# Uses multiple simultaneous connections and reuses them (HTTP/1.1)
# 2013, Laurent Ghigonis

# XXX urllib3 and therefore requests do not support pipelining ...

# Python grequests resources :
# http://stackoverflow.com/questions/16015749/in-what-way-is-grequests-asynchronous
# https://github.com/kennethreitz/grequests/issues/13
# http://stackoverflow.com/questions/13809650/using-grequests-to-send-a-pool-of-requests-how-can-i-get-the-response-time-of-e
# https://gist.github.com/ibrahima/3153647 - request_queue.py
# http://rubydoc.info/github/typhoeus/typhoeus/frames/Typhoeus - Ruby Typhoeus

# NOTE(review): requests is imported before grequests here; recent gevent
# releases are reported to warn when ssl is monkey-patched late. The original
# import order is kept on purpose -- confirm before reordering.


class SessionQueue:
    """Send GET requests through one shared requests.Session.

    Tracks in ``ongoing`` how many requests were handed to the pool and have
    not yet produced a response.
    """

    def __init__(self, pool, cb_response):
        """Create the queue.

        pool        -- pool object passed to grequests.send() for each request
        cb_response -- callable invoked with each response; when it returns
                       False the whole pool is killed
        """
        self.pool = pool
        self.cb_response = cb_response
        self.session = requests.Session()
        self.ongoing = 0

    def add(self, url):
        """Schedule a GET of `url` on this session and count it as ongoing."""
        req = grequests.get(
            url,
            session=self.session,
            hooks={'response': self._cb_response_session},
        )
        grequests.send(req, self.pool, exception_handler=self._cb_exception)
        self.ongoing += 1

    def _cb_response_session(self, res, verify=None, cert=None, proxies=None,
                             timeout=None, stream=None, **kwargs):
        """requests 'response' hook: account for the reply and forward it.

        The keyword parameters only absorb what requests hands to hooks; they
        are not used. **kwargs keeps the hook working if additional keywords
        are ever passed.
        """
        self.ongoing -= 1
        # Only an explicit False stops everything; None/True keep going.
        if self.cb_response(res) is False:
            self.pool.kill()

    def _cb_exception(self, req, e):
        """Report a failed send and resubmit the same request.

        `ongoing` is left untouched because the request is still pending.
        NOTE(review): there is no retry limit, so a URL that always fails is
        retried forever.
        """
        print("ERROR: sending of %s failed, retrying :\n%s" % (req.url, e))
        grequests.send(req, self.pool, exception_handler=self._cb_exception)


class Grbrute:
    """Fetch the URLs produced by an iterator over several reused sessions.

    url_iter        -- iterator that provides the URLs (consumed with next())
    cb_response     -- optional callable receiving each response. It should
                       return True for the processing to continue, and False
                       to terminate.
    nb_sessions     -- number of SessionQueue objects (one requests.Session
                       each) created
    req_per_session -- maximum number of ongoing requests per session
    sleep           -- value passed to time.sleep() after each response
    verbose         -- when true, print every URL before it is scheduled
    block           -- when true, the constructor joins the pool before
                       returning. If you want to integrate it in a gevent
                       driven program, use block=False.

    Note that the constructor itself starts sending requests.
    """

    def __init__(self, url_iter, cb_response=None, nb_sessions=3,
                 req_per_session=10, sleep=0, verbose=False, block=True):
        self.url_iter = url_iter
        self.cb_response_user = cb_response
        self.nb_sessions = nb_sessions
        self.req_per_session = req_per_session
        self.sleep = sleep
        self.verbose = verbose
        self.pool = grequests.Pool()
        self.sessions = deque()
        self.ongoing_total = 0
        for _ in range(nb_sessions):
            self.sessions.append(SessionQueue(self.pool, self._cb_response))
        self._send()
        if block:
            self.pool.join()

    def _send(self):
        """Schedule URLs until every session is full or the iterator ends."""
        while self.ongoing_total < self.nb_sessions * self.req_per_session:
            # get an URL to send
            try:
                url = next(self.url_iter)
            except StopIteration:
                return
            if self.verbose:
                print("[-] %s" % url)
            # select a session that has room to send; the loop condition
            # above guarantees at least one session is below its limit
            while self.sessions[0].ongoing == self.req_per_session:
                self.sessions.rotate(1)
            # send URL using selected session
            self.sessions[0].add(url)
            self.ongoing_total += 1
            # rotate so consecutive URLs are spread over the sessions
            self.sessions.rotate(1)

    def _cb_response(self, res):
        """Handle one response: run the user callback, then refill the queues.

        Returns the user callback's result (True when no callback was given);
        SessionQueue kills the pool when that result is False.
        """
        self.ongoing_total -= 1
        cont = True
        if self.cb_response_user:
            cont = self.cb_response_user(res)
        # Do not pull more URLs once termination was requested: they would
        # be consumed from the iterator and spawned only to be killed.
        if cont is not False:
            self._send()
        time.sleep(self.sleep)
        return cont