From 63c8c82c078242935525ae90f6eb0dec7c44ed77 Mon Sep 17 00:00:00 2001
From: Laurent Ghigonis
Date: Sat, 20 Apr 2013 10:11:44 +0200
Subject: brhute: add a few more spikes

---
 toys/brhute_threaded.py           |  75 ++++++++++
 toys/brhute_twisted.py            |  75 ++++++++++
 toys/httplib_pipelining.py        |  96 ++++++++++++
 toys/pphidden_async.py            |   8 +-
 toys/threaded_resolver.py         | 302 ++++++++++++++++++++++++++++++++++++++
 toys/twisted_http.py              |  26 ++++
 toys/twisted_http_persistent.py   |  39 +++++
 toys/twisted_http_simultaneous.py |  39 +++++
 8 files changed, 657 insertions(+), 3 deletions(-)
 create mode 100644 toys/brhute_threaded.py
 create mode 100644 toys/brhute_twisted.py
 create mode 100644 toys/httplib_pipelining.py
 create mode 100644 toys/threaded_resolver.py
 create mode 100644 toys/twisted_http.py
 create mode 100644 toys/twisted_http_persistent.py
 create mode 100644 toys/twisted_http_simultaneous.py

(limited to 'toys')

diff --git a/toys/brhute_threaded.py b/toys/brhute_threaded.py
new file mode 100644
index 0000000..22ab2bb
--- /dev/null
+++ b/toys/brhute_threaded.py
@@ -0,0 +1,75 @@
+from collections import deque
+import time
+import httplib_pipelining
+import threading
+import threaded_resolver
+import Queue
+import warnings
+
+# brhute - asynchronous URL fetcher using threads and HTTP pipelining
+# Makes multiple simultaneous connections and multiple requests per connection
+# 2013, Laurent Ghigonis
+
+# XXX multiple processes, autodetect and distribute connections
+# DNS resolving with asyncdns
+
+class Brhute_connection(threading.Thread):
+    def __init__(self, ip, port, queue, req_per_connection, cb_response,
+                 interval=0, verbose=False):
+        threading.Thread.__init__(self)
+        self.queue = queue
+        self.ip = ip
+        self.port = port
+        self.req_per_connection = req_per_connection
+        self.cb_response_user = cb_response
+        self.interval = interval
+        self.verbose = verbose
+
+    def run(self):
+        conn = httplib_pipelining.HTTP_pipeline(self.ip, self.queue, self.cb_response_user)
+        conn.run()
+
+class Brhute_ip:
+    """Fetch URLs from one IP
+    url_iter is the iterator that provides the URLs.
+    cb_response should return True for the processing to continue, and False
+    to terminate.
+    If you want to integrate it in a gevent driven program, use block=False"""
+    def __init__(self, url_iter, ip, port=80, cb_response=None,
+                 nb_connections=3, req_per_connection=10, interval=0,
+                 verbose=False, block=True):
+        warnings.warn("XXX WARNING: WORK IN PROGRESS")
+        warnings.warn("XXX WARNING: Don't expect this to work")
+        self.url_iter = url_iter
+        self.ip = ip
+        self.port = port
+        self.cb_response_user = cb_response
+        self.nb_connections = nb_connections
+        self.req_per_connection = req_per_connection
+        self.interval = interval
+        self.verbose = verbose
+
+        queue = Queue.Queue()
+
+        for i in range(nb_connections):
+            c = Brhute_connection(ip, port, queue,
+                                  req_per_connection, cb_response,
+                                  interval, verbose)
+            c.setDaemon(True)
+            c.start()
+
+        for host, url in url_iter:
+            queue.put(url)
+        queue.join()
+
+class Brhute_multi_ip:
+    """Fetch URLs from multiple IPs pointing to the same content"""
+    def __init__(self):
+        warnings.warn("XXX WARNING: WORK IN PROGRESS")
+        warnings.warn("XXX WARNING: Don't expect this to work")
+
+class Brhute:
+    """Fetch URLs"""
+    def __init__(self):
+        warnings.warn("XXX WARNING: WORK IN PROGRESS")
+        warnings.warn("XXX WARNING: Don't expect this to work")
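Brhute_ip fans the URLs out to nb_connections worker threads through a shared Queue, each worker driving one pipelined connection. A minimal driver could look like the sketch below; the hostname, paths and callback are hypothetical placeholders, and note that queue.join() will not return as written, since the pipeline workers never call task_done().

import brhute_threaded

def gen_urls():
    # Brhute_ip consumes (host, url) pairs and queues the url part
    for i in range(100):
        yield 'www.example.com', '/page_%d' % i

def on_response(resp, data):
    print resp.status, resp.reason
    return True  # True means keep going, per the docstring

# HTTPConnection accepts a hostname as well as an IP literal here
brhute_threaded.Brhute_ip(gen_urls(), 'www.example.com',
                          cb_response=on_response, nb_connections=3)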
diff --git a/toys/brhute_twisted.py b/toys/brhute_twisted.py
new file mode 100644
index 0000000..22ab2bb
--- /dev/null
+++ b/toys/brhute_twisted.py
@@ -0,0 +1,75 @@
+from collections import deque
+import time
+import httplib_pipelining
+import threading
+import threaded_resolver
+import Queue
+import warnings
+
+# brhute - asynchronous URL fetcher using threads and HTTP pipelining
+# Makes multiple simultaneous connections and multiple requests per connection
+# 2013, Laurent Ghigonis
+
+# XXX multiple processes, autodetect and distribute connections
+# DNS resolving with asyncdns
+
+class Brhute_connection(threading.Thread):
+    def __init__(self, ip, port, queue, req_per_connection, cb_response,
+                 interval=0, verbose=False):
+        threading.Thread.__init__(self)
+        self.queue = queue
+        self.ip = ip
+        self.port = port
+        self.req_per_connection = req_per_connection
+        self.cb_response_user = cb_response
+        self.interval = interval
+        self.verbose = verbose
+
+    def run(self):
+        conn = httplib_pipelining.HTTP_pipeline(self.ip, self.queue, self.cb_response_user)
+        conn.run()
+
+class Brhute_ip:
+    """Fetch URLs from one IP
+    url_iter is the iterator that provides the URLs.
+    cb_response should return True for the processing to continue, and False
+    to terminate.
+    If you want to integrate it in a gevent driven program, use block=False"""
+    def __init__(self, url_iter, ip, port=80, cb_response=None,
+                 nb_connections=3, req_per_connection=10, interval=0,
+                 verbose=False, block=True):
+        warnings.warn("XXX WARNING: WORK IN PROGRESS")
+        warnings.warn("XXX WARNING: Don't expect this to work")
+        self.url_iter = url_iter
+        self.ip = ip
+        self.port = port
+        self.cb_response_user = cb_response
+        self.nb_connections = nb_connections
+        self.req_per_connection = req_per_connection
+        self.interval = interval
+        self.verbose = verbose
+
+        queue = Queue.Queue()
+
+        for i in range(nb_connections):
+            c = Brhute_connection(ip, port, queue,
+                                  req_per_connection, cb_response,
+                                  interval, verbose)
+            c.setDaemon(True)
+            c.start()
+
+        for host, url in url_iter:
+            queue.put(url)
+        queue.join()
+
+class Brhute_multi_ip:
+    """Fetch URLs from multiple IPs pointing to the same content"""
+    def __init__(self):
+        warnings.warn("XXX WARNING: WORK IN PROGRESS")
+        warnings.warn("XXX WARNING: Don't expect this to work")
+
+class Brhute:
+    """Fetch URLs"""
+    def __init__(self):
+        warnings.warn("XXX WARNING: WORK IN PROGRESS")
+        warnings.warn("XXX WARNING: Don't expect this to work")
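brhute_twisted.py is for now a byte-for-byte copy of the threaded spike. A twisted flavor of the same fan-out, built on the Agent/HTTPConnectionPool pattern exercised by the twisted_* toys below, could start out like this sketch (fetch_all and its callback protocol are hypothetical; response bodies are left unread, see IgnoreBody in twisted_http_persistent.py):

from twisted.internet import reactor
from twisted.internet.defer import DeferredList
from twisted.web.client import Agent, HTTPConnectionPool

def fetch_all(urls, cb_response):
    # one persistent pool, one Deferred per URL, reactor stops when all done
    pool = HTTPConnectionPool(reactor, persistent=True)
    agent = Agent(reactor, pool=pool)
    ds = [agent.request('GET', url).addCallback(cb_response) for url in urls]
    DeferredList(ds, consumeErrors=True).addBoth(lambda ign: reactor.stop())
    reactor.run()

def show(response):
    print 'Response code:', response.code

fetch_all(['http://www.pointerpointer.com/'], show)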
diff --git a/toys/httplib_pipelining.py b/toys/httplib_pipelining.py
new file mode 100644
index 0000000..5f5ac4e
--- /dev/null
+++ b/toys/httplib_pipelining.py
@@ -0,0 +1,96 @@
+# Based on "python-http-pipelining" by Markus J @ ActiveState Code / Recipes
+# http://code.activestate.com/recipes/576673-python-http-pipelining/
+# Rewritten by Laurent Ghigonis, 2013
+
+from httplib import HTTPConnection, _CS_IDLE
+import urlparse
+
+class HTTP_pipeline():
+    def __init__(self, domain, queue, cb_response,
+                 max_out_bound=4, debuglevel=0):
+        print "XXX HTTP_pipeline.__init__"
+        self.queue = queue
+        self.cb_response = cb_response
+        self.max_out_bound = max_out_bound
+        self.debuglevel = debuglevel
+        self.conn = HTTPConnection(domain)
+        self.conn.set_debuglevel(debuglevel)
+        self.respobjs = list()
+        self.data = list()
+        self.headers = {'Host': domain,
+                        'Content-Length': 0,
+                        'Connection': 'Keep-Alive'}
+
+    def run(self):
+        print "XXX HTTP_pipeline.run"
+        while True:
+            # Send
+            out_bound = 0
+            while out_bound < self.max_out_bound:
+                page = self.queue.get()
+                if self.debuglevel > 0:
+                    print 'Sending request for %r...' % (page,)
+                self.conn._HTTPConnection__state = _CS_IDLE # FU private variable!
+                self.conn.request("GET", page, None, self.headers)
+                res = self.conn.response_class(self.conn.sock, strict=self.conn.strict, method=self.conn._method)
+                self.respobjs.append(res)
+                self.data.append(None)
+                out_bound += 1
+            # Try to read a response
+            for i, resp in enumerate(self.respobjs):
+                if resp is None:
+                    continue
+                if self.debuglevel > 0:
+                    print 'Retrieving %r...' % (resp,)
+                out_bound -= 1
+                skip_read = False
+                resp.begin()
+                if self.debuglevel > 0:
+                    print '    %d %s' % (resp.status, resp.reason)
+                if 200 <= resp.status < 300:
+                    # Ok
+                    data = resp.read()
+                    cookie = resp.getheader('Set-Cookie')
+                    if cookie is not None:
+                        self.headers['Cookie'] = cookie
+                    skip_read = True
+                    self.respobjs.remove(resp)
+                    self.cb_response(resp, data)
+                elif 300 <= resp.status < 400:
+                    # Redirect
+                    loc = resp.getheader('Location')
+                    parsed = loc and urlparse.urlparse(loc)
+                    if not parsed:
+                        # Missing or empty location header
+                        data = (resp.status, resp.reason)
+                    elif parsed.netloc != '' and parsed.netloc != self.headers['Host']:
+                        # Redirect to another host
+                        data = (resp.status, resp.reason, loc)
+                    else:
+                        # Redirect URL
+                        path = urlparse.urlunparse(parsed._replace(scheme='', netloc='', fragment=''))
+                        #print '    Updated %r to %r' % (pages[i], path)
+                        #pages[i] = path
+                        data = (resp.status, resp.reason, path)
+                    self.respobjs.remove(resp)
+                    self.cb_response(resp, data)
+                elif resp.status >= 400:
+                    # Failed
+                    data = (resp.status, resp.reason)
+                    self.respobjs.remove(resp)
+                    self.cb_response(resp, data)
+                if resp.will_close:
+                    # Connection (will be) closed, need to resend
+                    self.conn.close()
+                    if self.debuglevel > 0:
+                        print '    Connection closed'
+                    # XXX reconnect
+                    # XXX resend
+                    break
+                elif not skip_read:
+                    resp.read() # read any data
+                if any(r is None for r in self.respobjs):
+                    # Send another pending request
+                    break
+                else:
+                    break # All respobjs are None?
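HTTP_pipeline keeps up to max_out_bound GET requests on the wire before reading the first response, forcing httplib's state machine back to _CS_IDLE between sends. A usage sketch follows; host and paths are placeholders. Note that run() loops forever and blocks in queue.get() once the queue drains, which is why brhute_threaded.py drives it from daemon threads.

import Queue
import httplib_pipelining

def on_response(resp, data):
    # data is the body for 2xx, or a (status, reason[, location]) tuple
    print resp.status, resp.reason

q = Queue.Queue()
for path in ('/', '/index.html', '/missing'):
    q.put(path)
pipe = httplib_pipelining.HTTP_pipeline('www.example.com', q, on_response,
                                        max_out_bound=3, debuglevel=1)
pipe.run()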
diff --git a/toys/pphidden_async.py b/toys/pphidden_async.py
index 7df3ad1..463364f 100644
--- a/toys/pphidden_async.py
+++ b/toys/pphidden_async.py
@@ -1,7 +1,7 @@
 import sys
 import argparse
 import grbrute
-import brhute
+import brhute_threaded
 
 # http://www.pointerpointer.com/gridPositions.json
 
@@ -67,10 +67,12 @@ if args.backend == "gbrute":
     grbrute.Grbrute(url_iter, cb_response_grbrute, verbose=args.verbose)
 elif args.backend == "brhute":
     url_iter = Pp_url(args.image, args.start_x, args.start_y)
-    brhute.Brhute_ip(url_iter, "207.171.163.203", # Amazon
-                     cb_response=cb_response_brhute, verbose=args.verbose)
+    #brhute_threaded.Brhute_ip(url_iter, "207.171.163.203", # Amazon
+    #                 cb_response=cb_response_brhute, verbose=args.verbose)
     #brhute.Brhute_ip(url_iter, "173.194.34.14", # Google
     #                 cb_response=cb_response_brhute, verbose=args.verbose)
+    brhute_threaded.Brhute_ip(url_iter, "www.pointerpointer.com",
+                              cb_response=cb_response_brhute, verbose=args.verbose)
 else:
     print "Error: Unknown backend specified"
     sys.exit(1)
diff --git a/toys/threaded_resolver.py b/toys/threaded_resolver.py
new file mode 100644
index 0000000..a2d347b
--- /dev/null
+++ b/toys/threaded_resolver.py
@@ -0,0 +1,302 @@
+# From pyxmpp2 resolver.py
+# Made standalone by laurent
+# https://raw.github.com/Jajcus/pyxmpp2/master/pyxmpp2/resolver.py
+
+# (C) Copyright 2003-2011 Jacek Konieczny
+# (C) Copyright 2013 Laurent Ghigonis
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License Version
+# 2.1 as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program; if not, write to the Free Software
+# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+#
+
+"""DNS resolver with SRV record support.
+
+Normative reference:
+  - `RFC 1035 <http://www.ietf.org/rfc/rfc1035.txt>`__
+  - `RFC 2782 <http://www.ietf.org/rfc/rfc2782.txt>`__
+"""
+
+import socket
+import random
+import logging
+import threading
+import Queue
+
+import dns.resolver
+import dns.name
+import dns.exception
+
+# module logger, used by the resolver classes below
+logger = logging.getLogger("threaded_resolver")
+
+DEFAULT_SETTINGS = {"ipv4": True, "ipv6": False, "prefer_ipv6": False}
+
+def is_ipv6_available():
+    """Check if IPv6 is available.
+
+    :Return: `True` when an IPv6 socket can be created.
+    """
+    try:
+        socket.socket(socket.AF_INET6).close()
+    except (socket.error, AttributeError):
+        return False
+    return True
+
+def is_ipv4_available():
+    """Check if IPv4 is available.
+
+    :Return: `True` when an IPv4 socket can be created.
+    """
+    try:
+        socket.socket(socket.AF_INET).close()
+    except socket.error:
+        return False
+    return True
+
+def shuffle_srv(records):
+    """Randomly reorder SRV records using their weights.
+
+    :Parameters:
+        - `records`: SRV records to shuffle.
+    :Types:
+        - `records`: sequence of :dns:`dns.rdtypes.IN.SRV`
+
+    :return: reordered records.
+    :returntype: `list` of :dns:`dns.rdtypes.IN.SRV`"""
+    if not records:
+        return []
+    ret = []
+    while len(records) > 1:
+        weight_sum = 0
+        for rrecord in records:
+            weight_sum += rrecord.weight + 0.1
+        thres = random.random() * weight_sum
+        weight_sum = 0
+        for rrecord in records:
+            weight_sum += rrecord.weight + 0.1
+            if thres < weight_sum:
+                records.remove(rrecord)
+                ret.append(rrecord)
+                break
+    ret.append(records[0])
+    return ret
+
+def reorder_srv(records):
+    """Reorder SRV records using their priorities and weights.
+
+    :Parameters:
+        - `records`: SRV records to shuffle.
+    :Types:
+        - `records`: `list` of :dns:`dns.rdtypes.IN.SRV`
+
+    :return: reordered records.
+    :returntype: `list` of :dns:`dns.rdtypes.IN.SRV`"""
+    records = list(records)
+    records.sort()
+    ret = []
+    tmp = []
+    for rrecord in records:
+        if not tmp or rrecord.priority == tmp[0].priority:
+            tmp.append(rrecord)
+            continue
+        ret += shuffle_srv(tmp)
+        tmp = [rrecord]
+    if tmp:
+        ret += shuffle_srv(tmp)
+    return ret
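shuffle_srv implements the weighted selection of RFC 2782: each pass draws a record with probability proportional to weight + 0.1 (the 0.1 keeps zero-weight records selectable), and reorder_srv applies it per priority group. The semantics can be checked with stand-in records, since only .weight, ordering and equality are used:

import collections
import threaded_resolver

# a namedtuple stands in for dns.rdtypes.IN.SRV here
Srv = collections.namedtuple('Srv', 'priority weight port target')
records = [Srv(0, 60, 5222, 'a.example.'),
           Srv(0, 30, 5222, 'b.example.'),
           Srv(0, 10, 5222, 'c.example.')]
print threaded_resolver.shuffle_srv(list(records))
# 'a.example.' lands first in roughly 60% of runs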
+class BlockingResolver():
+    """Blocking resolver using the DNSPython package.
+
+    Both `resolve_srv` and `resolve_address` will block until the
+    lookup completes or fails and then call the callback immediately.
+    """
+    def __init__(self, settings = None):
+        if settings:
+            self.settings = settings
+        else:
+            self.settings = DEFAULT_SETTINGS
+
+    def resolve_srv(self, domain, service, protocol, callback):
+        """Start looking up an SRV record for `service` at `domain`.
+
+        `callback` will be called with a properly sorted list of (hostname,
+        port) pairs on success. The list will be empty on error and it will
+        contain only (".", 0) when the service is explicitly disabled.
+
+        :Parameters:
+            - `domain`: domain name to look up
+            - `service`: service name, e.g. 'xmpp-client'
+            - `protocol`: protocol name, e.g. 'tcp'
+            - `callback`: a function to be called with a list of received
+              addresses
+        :Types:
+            - `domain`: `unicode`
+            - `service`: `unicode`
+            - `protocol`: `unicode`
+            - `callback`: function accepting a single argument
+        """
+        if isinstance(domain, unicode):
+            domain = domain.encode("idna").decode("us-ascii")
+        domain = "_{0}._{1}.{2}".format(service, protocol, domain)
+        try:
+            records = dns.resolver.query(domain, 'SRV')
+        except dns.exception.DNSException, err:
+            logger.warning("Could not resolve {0!r}: {1}"
+                           .format(domain, err.__class__.__name__))
+            callback([])
+            return
+        if not records:
+            callback([])
+            return
+
+        result = []
+        for record in reorder_srv(records):
+            hostname = record.target.to_text()
+            if hostname in (".", ""):
+                continue
+            result.append((hostname, record.port))
+
+        if not result:
+            callback([(".", 0)])
+        else:
+            callback(result)
+        return
+
+    def resolve_address(self, hostname, callback, allow_cname = True):
+        """Start looking up an A or AAAA record.
+
+        `callback` will be called with a list of (family, address) tuples
+        (each holding socket.AF_* and an IPv4 or IPv6 address literal) on
+        success. The list will be empty on error.
+
+        :Parameters:
+            - `hostname`: the host name to look up
+            - `callback`: a function to be called with a list of received
+              addresses
+            - `allow_cname`: `True` if CNAMEs should be followed
+        :Types:
+            - `hostname`: `unicode`
+            - `callback`: function accepting a single argument
+            - `allow_cname`: `bool`
+        """
+        if isinstance(hostname, unicode):
+            hostname = hostname.encode("idna").decode("us-ascii")
+        rtypes = []
+        if self.settings["ipv6"]:
+            rtypes.append(("AAAA", socket.AF_INET6))
+        if self.settings["ipv4"]:
+            rtypes.append(("A", socket.AF_INET))
+        if not self.settings["prefer_ipv6"]:
+            rtypes.reverse()
+        exception = None
+        result = []
+        for rtype, rfamily in rtypes:
+            try:
+                try:
+                    records = dns.resolver.query(hostname, rtype)
+                except dns.exception.DNSException:
+                    records = dns.resolver.query(hostname + ".", rtype)
+            except dns.exception.DNSException, err:
+                exception = err
+                continue
+            if not allow_cname and records.rrset.name != dns.name.from_text(
+                    hostname):
+                logger.warning("Unexpected CNAME record found for {0!r}"
+                               .format(hostname))
+                continue
+            if records:
+                for record in records:
+                    result.append((rfamily, record.to_text()))
+
+        if not result and exception:
+            logger.warning("Could not resolve {0!r}: {1}".format(hostname,
+                                            exception.__class__.__name__))
+        callback(result)
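Both lookups run synchronously here and invoke the callback before returning, so a smoke test needs no event loop (this assumes the dnspython package is installed and that the example names resolve):

import threaded_resolver

def on_addresses(addresses):
    # (family, address) tuples, empty list on failure
    print addresses

def on_srv(hosts):
    # (hostname, port) pairs, sorted by priority and weight
    print hosts

r = threaded_resolver.BlockingResolver()
r.resolve_address('jabber.org', on_addresses)
r.resolve_srv('jabber.org', 'xmpp-client', 'tcp', on_srv)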
+ """ + with self.lock: + if self.threads and self.queue.empty(): + return + if len(self.threads) >= self.max_threads: + return + thread_n = self.last_thread_n + 1 + self.last_thread_n = thread_n + thread = threading.Thread(target = self._run, + name = "{0!r} #{1}".format(self, thread_n), + args = (thread_n,)) + self.threads.append(thread) + thread.daemon = True + thread.start() + + def resolve_address(self, hostname, callback, allow_cname = True): + request = ("resolve_address", (hostname, callback, allow_cname)) + self._start_thread() + self.queue.put(request) + + def resolve_srv(self, domain, service, protocol, callback): + request = ("resolve_srv", (domain, service, protocol, callback)) + self._start_thread() + self.queue.put(request) + + def _run(self, thread_n): + """The thread function.""" + try: + logger.debug("{0!r}: entering thread #{1}" + .format(self, thread_n)) + resolver = self._make_resolver() + while True: + request = self.queue.get() + if request is None: + break + method, args = request + logger.debug(" calling {0!r}.{1}{2!r}" + .format(resolver, method, args)) + getattr(resolver, method)(*args) # pylint: disable=W0142 + self.queue.task_done() + logger.debug("{0!r}: leaving thread #{1}" + .format(self, thread_n)) + finally: + self.threads.remove(threading.currentThread()) + +# vi: sts=4 et sw=4 diff --git a/toys/twisted_http.py b/toys/twisted_http.py new file mode 100644 index 0000000..805c78a --- /dev/null +++ b/toys/twisted_http.py @@ -0,0 +1,26 @@ +from twisted.web import client +from twisted.internet import reactor, defer + +urls = [ + 'http://www.python.org', + 'http://stackoverflow.com', + 'http://www.twistedmatrix.com', + 'http://www.google.com', + 'http://www.google.com/toto', + 'http://www.google.com/titi', + 'http://www.google.com/tata', + 'http://launchpad.net', + 'http://github.com', + 'http://bitbucket.org', +] + +def finish(results): + for result in results: + print 'GOT PAGE', len(result), 'bytes' + reactor.stop() + +factory = client.HTTPClientFactory() +waiting = [client.getPage(url, factory) for url in urls] +defer.gatherResults(waiting).addCallback(finish) + +reactor.run() diff --git a/toys/twisted_http_persistent.py b/toys/twisted_http_persistent.py new file mode 100644 index 0000000..333bcb3 --- /dev/null +++ b/toys/twisted_http_persistent.py @@ -0,0 +1,39 @@ +from twisted.internet import reactor +from twisted.internet.defer import Deferred, DeferredList +from twisted.internet.protocol import Protocol +from twisted.web.client import Agent, HTTPConnectionPool + +class IgnoreBody(Protocol): + def __init__(self, deferred): + self.deferred = deferred + + def dataReceived(self, bytes): + pass + + def connectionLost(self, reason): + self.deferred.callback(None) + + +def cbRequest(response): + print 'Response code:', response.code + finished = Deferred() + response.deliverBody(IgnoreBody(finished)) + return finished + +pool = HTTPConnectionPool(reactor) +agent = Agent(reactor, pool=pool) + +def requestGet(url): + d = agent.request('GET', url) + d.addCallback(cbRequest) + return d + +# Two requests to the same host: +d = requestGet('http://google.com/titi').addCallback( + lambda ign: requestGet("http://google.com/tata")) +def cbShutdown(ignored): + reactor.stop() +d.addCallback(cbShutdown) + +reactor.run() + diff --git a/toys/twisted_http_simultaneous.py b/toys/twisted_http_simultaneous.py new file mode 100644 index 0000000..b9de196 --- /dev/null +++ b/toys/twisted_http_simultaneous.py @@ -0,0 +1,39 @@ +from twisted.internet import reactor +from 
diff --git a/toys/twisted_http_simultaneous.py b/toys/twisted_http_simultaneous.py
new file mode 100644
index 0000000..b9de196
--- /dev/null
+++ b/toys/twisted_http_simultaneous.py
@@ -0,0 +1,39 @@
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred, DeferredList
+from twisted.internet.protocol import Protocol
+from twisted.web.client import Agent, HTTPConnectionPool
+
+class IgnoreBody(Protocol):
+    def __init__(self, deferred):
+        self.deferred = deferred
+
+    def dataReceived(self, bytes):
+        pass
+
+    def connectionLost(self, reason):
+        self.deferred.callback(None)
+
+
+def cbRequest(response):
+    print 'Response code:', response.code
+    finished = Deferred()
+    response.deliverBody(IgnoreBody(finished))
+    return finished
+
+pool = HTTPConnectionPool(reactor, persistent=True)
+pool.maxPersistentPerHost = 1
+agent = Agent(reactor, pool=pool)
+
+def requestGet(url):
+    d = agent.request('GET', url)
+    d.addCallback(cbRequest)
+    return d
+
+# Four simultaneous requests to the same host; stop the reactor once all
+# four have completed
+requests = [requestGet('http://google.com/titi'),
+            requestGet('http://google.com/tata'),
+            requestGet('http://google.com/toto'),
+            requestGet('http://google.com/tralala')]
+DeferredList(requests).addCallback(lambda ign: reactor.stop())
+
+reactor.run()
--
cgit v1.2.3-59-g8ed1b
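A caveat on the pool semantics, as I understand HTTPConnectionPool: maxPersistentPerHost caps how many idle persistent connections are cached per host, it does not serialize concurrent requests — simultaneous requests that find no idle connection each open a fresh one, and the extras are closed rather than cached when they finish. For a clean shutdown, the cached connections can also be torn down explicitly before stopping the reactor; a hypothetical variant of the final callback above:

def shutdown(ign):
    # closeCachedConnections returns a Deferred that fires once all
    # cached persistent connections are closed
    return pool.closeCachedConnections().addBoth(lambda ign: reactor.stop())

DeferredList(requests).addCallback(shutdown)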