# Origin: toys/grbrute.py, created by commit
#   ea8bac6cb2d82ade2fa0d566fbdcd1a604c63312
#   From: Laurent Ghigonis
#   Date: Wed, 17 Apr 2013 05:52:20 +0200
#   Subject: add grbrute - asynchronous URL fetcher based on grequests

import requests
import grequests
import gevent
from collections import deque
import time
import warnings

# grbrute - asynchronous URL fetcher based on grequests
# Uses multiple simultaneous connections (HTTP/1.1) and multiple parallel
# requests per connection.
# 2013, Laurent Ghigonis

# Python grequests resources :
# http://stackoverflow.com/questions/16015749/in-what-way-is-grequests-asynchronous
# https://github.com/kennethreitz/grequests/issues/13
# http://stackoverflow.com/questions/13809650/using-grequests-to-send-a-pool-of-requests-how-can-i-get-the-response-time-of-e
# https://gist.github.com/ibrahima/3153647 - request_queue.py
# http://rubydoc.info/github/typhoeus/typhoeus/frames/Typhoeus - Ruby Typhoeus


class SessionQueue(object):
    """One requests.Session plus a counter of its unanswered requests.

    pool is the grequests pool the requests are sent on.
    cb_response is called with the response object each time one of the
    requests added through this queue gets its response.
    """

    def __init__(self, pool, cb_response):
        self.pool = pool
        self.cb_response = cb_response
        self.session = requests.Session()
        # Requests added through this queue whose response hook has not
        # fired yet.
        self.ongoing = 0

    def add(self, url):
        """Send an asynchronous GET of url through this queue's session."""
        req = grequests.get(url, session=self.session,
                            hooks={'response': self._cb_response_session})
        # NOTE(review): assumes the installed grequests.send() accepts an
        # exception_handler argument -- confirm against the version in use.
        grequests.send(req, self.pool, exception_handler=self._cb_exception)
        self.ongoing += 1

    def _cb_response_session(self, res, *args, **kwargs):
        """'response' hook: free one slot, then forward res to cb_response.

        Extra positional/keyword arguments are accepted and ignored, so the
        hook keeps working when the caller passes more than the response.
        """
        self.ongoing -= 1
        self.cb_response(res)

    def _cb_exception(self, req, e):
        """Report a failed send and submit the same request again."""
        # self.ongoing is deliberately left untouched: the request is
        # resubmitted right away, so it still occupies its slot.
        print("ERROR: sending of %s failed, retrying :\n%s" % (req.url, e))
        grequests.send(req, self.pool, exception_handler=self._cb_exception)


class Grbrute(object):
    """Fetch the URLs provided by url_iter over several sessions.

    url_iter is the iterator (or any iterable) that provides the URLs.
    cb_response, when given, is called with each response object.
    nb_sessions is the number of SessionQueue objects created, and
    req_per_session the maximum number of unanswered requests per session.
    sleep is the number of seconds slept at the end of each response
    callback.
    verbose prints every URL just before it is sent.
    If you want to integrate it in a gevent driven program, use block=False;
    with block=True the constructor returns only after the pool is joined.
    """

    def __init__(self, url_iter, cb_response=None,
                 nb_sessions=3, req_per_session=10, sleep=0,
                 verbose=False, block=True):
        # iter() is a no-op on iterators and lets callers pass a plain
        # list/tuple, which next() would otherwise reject.
        self.url_iter = iter(url_iter)
        self.cb_response_user = cb_response
        self.nb_sessions = nb_sessions
        self.req_per_session = req_per_session
        self.sleep = sleep
        self.verbose = verbose
        self.pool = grequests.Pool()
        self.sessions = deque()
        # Requests sent by _send() whose response has not come back yet,
        # summed over all sessions.
        self.ongoing_total = 0
        for _ in range(nb_sessions):
            self.sessions.append(SessionQueue(self.pool, self._cb_response))
        self._send()
        if block:
            self.pool.join()

    def _send(self):
        """Send URLs until every session is full or url_iter is exhausted."""
        while self.ongoing_total < self.nb_sessions * self.req_per_session:
            # get an URL to send
            try:
                url = next(self.url_iter)
            except StopIteration:
                return
            if self.verbose:
                print("[-] %s" % url)
            # select a session that has room to send; the outer loop
            # condition means the total is below capacity, so at least one
            # session is expected to have a free slot.
            while self.sessions[0].ongoing >= self.req_per_session:
                self.sessions.rotate(1)
            # send URL using selected session
            self.sessions[0].add(url)
            self.ongoing_total += 1
            # rotate so that the next URL starts from another session
            self.sessions.rotate(1)

    def _cb_response(self, res):
        """Handle one response: notify the user callback and refill."""
        self.ongoing_total -= 1
        if self.cb_response_user:
            self.cb_response_user(res)
        self._send()
        # NOTE(review): time.sleep() only yields to other greenlets when
        # gevent's monkey-patching of the time module is active -- confirm
        # that the installed grequests applies it on import.
        time.sleep(self.sleep)