-rw-r--r--  toys/brhute_threaded.py             75
-rw-r--r--  toys/brhute_twisted.py              75
-rw-r--r--  toys/httplib_pipelining.py          96
-rw-r--r--  toys/pphidden_async.py               8
-rw-r--r--  toys/threaded_resolver.py          302
-rw-r--r--  toys/twisted_http.py                26
-rw-r--r--  toys/twisted_http_persistent.py     39
-rw-r--r--  toys/twisted_http_simultaneous.py   39
8 files changed, 657 insertions, 3 deletions
diff --git a/toys/brhute_threaded.py b/toys/brhute_threaded.py
new file mode 100644
index 0000000..22ab2bb
--- /dev/null
+++ b/toys/brhute_threaded.py
@@ -0,0 +1,75 @@
+from collections import deque
+import time
+import httplib_pipelining
+import threading
+import threaded_resolver
+import Queue
+import warnings
+
+# brhute - asynchronous URL fetcher using threads and HTTP pipelining
+# Makes multiple simultaneous connections and multiple requests per connection
+# 2013, Laurent Ghigonis <laurent@p1sec.com>
+
+# XXX multiple processes, autodetect and distribute connections
+# DNS resolving with asyncdns
+
+class Brhute_connection(threading.Thread):
+ def __init__(self, ip, port, queue, req_per_connection, cb_response,
+ interval=0, verbose=False):
+ threading.Thread.__init__(self)
+ self.queue = queue
+ self.ip = ip
+ self.port = port
+ self.req_per_connection = req_per_connection
+ self.cb_response_user = cb_response
+ self.interval = interval
+ self.verbose = verbose
+
+ def run(self):
+        conn = httplib_pipelining.HTTP_pipeline(self.ip, self.queue, self.cb_response_user, max_out_bound=self.req_per_connection)
+ conn.run()
+
+class Brhute_ip:
+ """ Fetch URLs from one IP
+ url_iter is the iterator that provides the URLs.
+ cb_response should return True for the processing to continue, and False
+ to terminate.
+ If you want to integrate it in a gevent driven program, use block=False"""
+ def __init__(self, url_iter, ip, port=80, cb_response=None,
+ nb_connections=3, req_per_connection=10, interval=0,
+ verbose=False, block=True):
+ warnings.warn("XXX WARNING: WORK IN PROGRESS")
+ warnings.warn("XXX WARNING: Don't expect this to work")
+ self.url_iter = url_iter
+ self.ip = ip
+ self.port = port
+ self.cb_response_user = cb_response
+ self.nb_connections = nb_connections
+ self.req_per_connection = req_per_connection
+ self.interval = interval
+ self.verbose = verbose
+
+ queue = Queue.Queue()
+
+ for i in range(nb_connections):
+ c = Brhute_connection(ip, port, queue,
+ req_per_connection, cb_response,
+ interval, verbose)
+            c.daemon = True
+ c.start()
+
+        for host, url in url_iter:
+            queue.put(url)
+        if block:
+            queue.join()
+
+class Brhute_multi_ip:
+ """Fetch URLs from multiple IPs pointing to the same content"""
+ def __init__(self):
+ warnings.warn("XXX WARNING: WORK IN PROGRESS")
+ warnings.warn("XXX WARNING: Don't excpect this to work")
+
+class Brhute:
+ """Fetch URLs"""
+ def __init__(self):
+ warnings.warn("XXX WARNING: WORK IN PROGRESS")
+ warnings.warn("XXX WARNING: Don't excpect this to work")
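+
+# Example (sketch): driving Brhute_ip with a static (host, url) iterator.
+# The host and paths below are placeholders, not part of the original code;
+# block=False returns immediately, so a crude sleep keeps the daemon
+# threads alive long enough to see responses from this WIP code.
+if __name__ == '__main__':
+    def example_cb(resp, data):
+        print resp.status, resp.reason
+        return True
+    urls = iter([('example.com', '/'), ('example.com', '/index.html')])
+    Brhute_ip(urls, 'example.com', cb_response=example_cb,
+              verbose=True, block=False)
+    time.sleep(5)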
diff --git a/toys/brhute_twisted.py b/toys/brhute_twisted.py
new file mode 100644
index 0000000..22ab2bb
--- /dev/null
+++ b/toys/brhute_twisted.py
@@ -0,0 +1,75 @@
+from collections import deque
+import time
+import httplib_pipelining
+import threading
+import threaded_resolver
+import Queue
+import warnings
+
+# brhute - asynchronous URL fetcher using threads and HTTP pipelining
+# Makes multiple simultaneous connections and multiple requests per connection
+# 2013, Laurent Ghigonis <laurent@p1sec.com>
+
+# XXX multiple processes, autodetect and distribute connections
+# DNS resolving with asyncdns
+
+class Brhute_connection(threading.Thread):
+ def __init__(self, ip, port, queue, req_per_connection, cb_response,
+ interval=0, verbose=False):
+ threading.Thread.__init__(self)
+ self.queue = queue
+ self.ip = ip
+ self.port = port
+ self.req_per_connection = req_per_connection
+ self.cb_response_user = cb_response
+ self.interval = interval
+ self.verbose = verbose
+
+ def run(self):
+        conn = httplib_pipelining.HTTP_pipeline(self.ip, self.queue, self.cb_response_user, max_out_bound=self.req_per_connection)
+ conn.run()
+
+class Brhute_ip:
+ """ Fetch URLs from one IP
+ url_iter is the iterator that provides the URLs.
+ cb_response should return True for the processing to continue, and False
+ to terminate.
+ If you want to integrate it in a gevent driven program, use block=False"""
+ def __init__(self, url_iter, ip, port=80, cb_response=None,
+ nb_connections=3, req_per_connection=10, interval=0,
+ verbose=False, block=True):
+ warnings.warn("XXX WARNING: WORK IN PROGRESS")
+ warnings.warn("XXX WARNING: Don't expect this to work")
+ self.url_iter = url_iter
+ self.ip = ip
+ self.port = port
+ self.cb_response_user = cb_response
+ self.nb_connections = nb_connections
+ self.req_per_connection = req_per_connection
+ self.interval = interval
+ self.verbose = verbose
+
+ queue = Queue.Queue()
+
+ for i in range(nb_connections):
+ c = Brhute_connection(ip, port, queue,
+ req_per_connection, cb_response,
+ interval, verbose)
+            c.daemon = True
+ c.start()
+
+        for host, url in url_iter:
+            queue.put(url)
+        if block:
+            queue.join()
+
+class Brhute_multi_ip:
+ """Fetch URLs from multiple IPs pointing to the same content"""
+ def __init__(self):
+ warnings.warn("XXX WARNING: WORK IN PROGRESS")
+ warnings.warn("XXX WARNING: Don't excpect this to work")
+
+class Brhute:
+ """Fetch URLs"""
+ def __init__(self):
+ warnings.warn("XXX WARNING: WORK IN PROGRESS")
+ warnings.warn("XXX WARNING: Don't excpect this to work")
diff --git a/toys/httplib_pipelining.py b/toys/httplib_pipelining.py
new file mode 100644
index 0000000..5f5ac4e
--- /dev/null
+++ b/toys/httplib_pipelining.py
@@ -0,0 +1,96 @@
+# Based on "python-http-pipelining" by Markus J @ ActiveState Code / Recipes
+# http://code.activestate.com/recipes/576673-python-http-pipelining/
+# Rewritten by: 2013 Laurent Ghigonis <laurent@p1sec.com>
+
+from httplib import HTTPConnection, _CS_IDLE
+import urlparse
+
+class HTTP_pipeline():
+ def __init__(self, domain, queue, cb_response,
+ max_out_bound=4, debuglevel=0):
+ print "XXX HTTP_pipeline.__init__"
+ self.queue = queue
+ self.cb_response = cb_response
+ self.max_out_bound = max_out_bound
+ self.debuglevel = debuglevel
+ self.conn = HTTPConnection(domain)
+ self.conn.set_debuglevel(debuglevel)
+ self.respobjs = list()
+ self.data = list()
+ self.headers = {'Host':domain,
+ 'Content-Length':0,
+ 'Connection':'Keep-Alive'}
+
+ def run(self):
+ print "XXX HTTP_pipeline.run"
+ while True:
+ # Send
+ out_bound = 0
+ while out_bound < self.max_out_bound:
+ page = self.queue.get()
+ if self.debuglevel > 0:
+ print 'Sending request for %r...' % (page,)
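+            # Reset httplib's connection state to idle so request() can be
+            # issued while earlier responses are still outstanding; this is
+            # what makes the pipelining possible.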
+ self.conn._HTTPConnection__state = _CS_IDLE # FU private variable!
+ self.conn.request("GET", page, None, self.headers)
+ res = self.conn.response_class(self.conn.sock, strict=self.conn.strict, method=self.conn._method)
+ self.respobjs.append(res)
+ self.data.append(None)
+ out_bound += 1
+ # Try to read a response
+ for i,resp in enumerate(self.respobjs):
+ if resp is None:
+ continue
+ if self.debuglevel > 0:
+ print 'Retrieving %r...' % (resp)
+ out_bound -= 1
+ skip_read = False
+ resp.begin()
+ if self.debuglevel > 0:
+ print ' %d %s' % (resp.status, resp.reason)
+ if 200 <= resp.status < 300:
+ # Ok
+ data = resp.read()
+ cookie = resp.getheader('Set-Cookie')
+ if cookie is not None:
+ self.headers['Cookie'] = cookie
+ skip_read = True
+ self.respobjs.remove(resp)
+ self.cb_response(resp, data)
+ elif 300 <= resp.status < 400:
+ # Redirect
+ loc = resp.getheader('Location')
+ parsed = loc and urlparse.urlparse(loc)
+ if not parsed:
+ # Missing or empty location header
+ data = (resp.status, resp.reason)
+                elif parsed.netloc != '' and parsed.netloc != self.headers['Host']:
+ # Redirect to another host
+ data = (resp.status, resp.reason, loc)
+ else:
+ # Redirect URL
+ path = urlparse.urlunparse(parsed._replace(scheme='',netloc='',fragment=''))
+ #print ' Updated %r to %r' % (pages[i],path)
+ #pages[i] = path
+ data = (resp.status, resp.reason, path)
+ self.respobjs.remove(resp)
+ self.cb_response(resp, data)
+ elif resp.status >= 400:
+ # Failed
+ data = (resp.status, resp.reason)
+ self.respobjs.remove(resp)
+ self.cb_response(resp, data)
+ if resp.will_close:
+ # Connection (will be) closed, need to resend
+ self.conn.close()
+ if self.debuglevel > 0:
+ print ' Connection closed'
+ # XXX reconnect
+ # XXX resend
+ break
+ elif not skip_read:
+ resp.read() # read any data
+            if any(r is None for r in self.respobjs):
+ # Send another pending request
+ break
+ else:
+ break # All respobjs are None?
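+
+# Example (sketch): driving HTTP_pipeline directly. The host and paths are
+# placeholders, not part of the original code; run() is a worker loop that
+# blocks on the queue forever, so expect to interrupt this WIP code with
+# Ctrl-C once the responses have been printed.
+if __name__ == '__main__':
+    import Queue
+    def example_cb(resp, data):
+        print resp.status, resp.reason
+    q = Queue.Queue()
+    for path in ('/', '/a', '/b', '/c'):
+        q.put(path)
+    HTTP_pipeline('example.com', q, example_cb, max_out_bound=2).run()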
diff --git a/toys/pphidden_async.py b/toys/pphidden_async.py
index 7df3ad1..463364f 100644
--- a/toys/pphidden_async.py
+++ b/toys/pphidden_async.py
@@ -1,7 +1,7 @@
import sys
import argparse
import grbrute
-import brhute
+import brhute_threaded
# http://www.pointerpointer.com/gridPositions.json
@@ -67,10 +67,12 @@ if args.backend == "gbrute":
grbrute.Grbrute(url_iter, cb_response_grbrute, verbose=args.verbose)
elif args.backend == "brhute":
url_iter = Pp_url(args.image, args.start_x, args.start_y)
- brhute.Brhute_ip(url_iter, "207.171.163.203", # Amazon
- cb_response=cb_response_brhute, verbose=args.verbose)
+ #brhute_threaded.Brhute_ip(url_iter, "207.171.163.203", # Amazon
+ # cb_response=cb_response_brhute, verbose=args.verbose)
#brhute.Brhute_ip(url_iter, "173.194.34.14", # Google
# cb_response=cb_response_brhute, verbose=args.verbose)
+    brhute_threaded.Brhute_ip(url_iter, "www.pointerpointer.com", # hostname
+ cb_response=cb_response_brhute, verbose=args.verbose)
else:
print "Error: Unknown backend specified"
sys.exit(1)
diff --git a/toys/threaded_resolver.py b/toys/threaded_resolver.py
new file mode 100644
index 0000000..a2d347b
--- /dev/null
+++ b/toys/threaded_resolver.py
@@ -0,0 +1,302 @@
+# From pyxmpp2 resolver.py
+# Made standalone by laurent
+# https://raw.github.com/Jajcus/pyxmpp2/master/pyxmpp2/resolver.py
+
+# (C) Copyright 2003-2011 Jacek Konieczny <jajcus@jajcus.net>
+# (C) Copyright 2013 Laurent Ghigonis <laurent@p1sec.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License Version
+# 2.1 as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program; if not, write to the Free Software
+# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+#
+
+"""DNS resolever with SRV record support.
+
+Normative reference:
+ - `RFC 1035 <http://www.ietf.org/rfc/rfc1035.txt>`__
+ - `RFC 2782 <http://www.ietf.org/rfc/rfc2782.txt>`__
+"""
+
+import socket
+import random
+import logging
+import threading
+import Queue
+
+import dns.resolver
+import dns.name
+import dns.exception
+
+logger = logging.getLogger("threaded_resolver")
+
+DEFAULT_SETTINGS = {"ipv4": True, "ipv6": False, "prefer_ipv6": False}
+
+def is_ipv6_available():
+ """Check if IPv6 is available.
+
+ :Return: `True` when an IPv6 socket can be created.
+ """
+ try:
+ socket.socket(socket.AF_INET6).close()
+ except (socket.error, AttributeError):
+ return False
+ return True
+
+def is_ipv4_available():
+ """Check if IPv4 is available.
+
+ :Return: `True` when an IPv4 socket can be created.
+ """
+ try:
+ socket.socket(socket.AF_INET).close()
+ except socket.error:
+ return False
+ return True
+
+def shuffle_srv(records):
+ """Randomly reorder SRV records using their weights.
+
+ :Parameters:
+ - `records`: SRV records to shuffle.
+ :Types:
+ - `records`: sequence of :dns:`dns.rdtypes.IN.SRV`
+
+ :return: reordered records.
+ :returntype: `list` of :dns:`dns.rdtypes.IN.SRV`"""
+ if not records:
+ return []
+ ret = []
+ while len(records) > 1:
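+        # Weighted random pick: the +0.1 below keeps zero-weight records
+        # selectable, as RFC 2782 recommends.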
+ weight_sum = 0
+ for rrecord in records:
+ weight_sum += rrecord.weight + 0.1
+ thres = random.random() * weight_sum
+ weight_sum = 0
+ for rrecord in records:
+ weight_sum += rrecord.weight + 0.1
+ if thres < weight_sum:
+ records.remove(rrecord)
+ ret.append(rrecord)
+ break
+ ret.append(records[0])
+ return ret
+
+def reorder_srv(records):
+ """Reorder SRV records using their priorities and weights.
+
+ :Parameters:
+ - `records`: SRV records to shuffle.
+ :Types:
+ - `records`: `list` of :dns:`dns.rdtypes.IN.SRV`
+
+ :return: reordered records.
+ :returntype: `list` of :dns:`dns.rdtypes.IN.SRV`"""
+ records = list(records)
+ records.sort()
+ ret = []
+ tmp = []
+ for rrecord in records:
+ if not tmp or rrecord.priority == tmp[0].priority:
+ tmp.append(rrecord)
+ continue
+ ret += shuffle_srv(tmp)
+ tmp = [rrecord]
+ if tmp:
+ ret += shuffle_srv(tmp)
+ return ret
+
+class BlockingResolver():
+ """Blocking resolver using the DNSPython package.
+
+ Both `resolve_srv` and `resolve_address` will block until the
+    lookup completes or fails, then call the callback immediately.
+ """
+ def __init__(self, settings = None):
+ if settings:
+ self.settings = settings
+ else:
+ self.settings = DEFAULT_SETTINGS
+
+ def resolve_srv(self, domain, service, protocol, callback):
+ """Start looking up an SRV record for `service` at `domain`.
+
+ `callback` will be called with a properly sorted list of (hostname,
+ port) pairs on success. The list will be empty on error and it will
+        contain only (".", 0) when the service is explicitly disabled.
+
+ :Parameters:
+ - `domain`: domain name to look up
+ - `service`: service name e.g. 'xmpp-client'
+ - `protocol`: protocol name, e.g. 'tcp'
+ - `callback`: a function to be called with a list of received
+ addresses
+ :Types:
+ - `domain`: `unicode`
+ - `service`: `unicode`
+ - `protocol`: `unicode`
+ - `callback`: function accepting a single argument
+ """
+ if isinstance(domain, unicode):
+ domain = domain.encode("idna").decode("us-ascii")
+ domain = "_{0}._{1}.{2}".format(service, protocol, domain)
+ try:
+ records = dns.resolver.query(domain, 'SRV')
+ except dns.exception.DNSException, err:
+ logger.warning("Could not resolve {0!r}: {1}"
+ .format(domain, err.__class__.__name__))
+ callback([])
+ return
+ if not records:
+ callback([])
+ return
+
+ result = []
+ for record in reorder_srv(records):
+ hostname = record.target.to_text()
+ if hostname in (".", ""):
+ continue
+ result.append((hostname, record.port))
+
+ if not result:
+ callback([(".", 0)])
+ else:
+ callback(result)
+ return
+
+ def resolve_address(self, hostname, callback, allow_cname = True):
+ """Start looking up an A or AAAA record.
+
+ `callback` will be called with a list of (family, address) tuples
+        (each holding socket.AF_* and an IPv4 or IPv6 address literal) on
+ success. The list will be empty on error.
+
+ :Parameters:
+ - `hostname`: the host name to look up
+ - `callback`: a function to be called with a list of received
+ addresses
+ - `allow_cname`: `True` if CNAMEs should be followed
+ :Types:
+ - `hostname`: `unicode`
+ - `callback`: function accepting a single argument
+ - `allow_cname`: `bool`
+ """
+ if isinstance(hostname, unicode):
+ hostname = hostname.encode("idna").decode("us-ascii")
+ rtypes = []
+ if self.settings["ipv6"]:
+ rtypes.append(("AAAA", socket.AF_INET6))
+ if self.settings["ipv4"]:
+ rtypes.append(("A", socket.AF_INET))
+ if not self.settings["prefer_ipv6"]:
+ rtypes.reverse()
+ exception = None
+ result = []
+ for rtype, rfamily in rtypes:
+ try:
+ try:
+ records = dns.resolver.query(hostname, rtype)
+ except dns.exception.DNSException:
+ records = dns.resolver.query(hostname + ".", rtype)
+ except dns.exception.DNSException, err:
+ exception = err
+ continue
+ if not allow_cname and records.rrset.name != dns.name.from_text(
+ hostname):
+ logger.warning("Unexpected CNAME record found for {0!r}"
+ .format(hostname))
+ continue
+ if records:
+ for record in records:
+ result.append((rfamily, record.to_text()))
+
+ if not result and exception:
+ logger.warning("Could not resolve {0!r}: {1}".format(hostname,
+ exception.__class__.__name__))
+ callback(result)
+
+class ThreadedResolver():
+ """Base class for threaded resolvers.
+
+    Starts worker threads, each running a blocking resolver implementation,
+    and communicates with them to provide a non-blocking asynchronous API.
+ """
+ def __init__(self, settings = None, max_threads = 1):
+ if settings:
+ self.settings = settings
+ else:
+ self.settings = DEFAULT_SETTINGS
+ self.threads = []
+ self.queue = Queue.Queue()
+ self.lock = threading.RLock()
+ self.max_threads = max_threads
+ self.last_thread_n = 0
+
+    def _make_resolver(self):
+        """Make the blocking resolver run by each worker thread, using
+        the DNSPython :dns:`dns.resolver` module.
+        """
+        return BlockingResolver(self.settings)
+
+ def stop(self):
+ """Stop the resolver threads.
+ """
+ with self.lock:
+ for dummy in self.threads:
+ self.queue.put(None)
+
+ def _start_thread(self):
+ """Start a new working thread unless the maximum number of threads
+ has been reached or the request queue is empty.
+ """
+ with self.lock:
+ if self.threads and self.queue.empty():
+ return
+ if len(self.threads) >= self.max_threads:
+ return
+ thread_n = self.last_thread_n + 1
+ self.last_thread_n = thread_n
+ thread = threading.Thread(target = self._run,
+ name = "{0!r} #{1}".format(self, thread_n),
+ args = (thread_n,))
+ self.threads.append(thread)
+ thread.daemon = True
+ thread.start()
+
+ def resolve_address(self, hostname, callback, allow_cname = True):
+ request = ("resolve_address", (hostname, callback, allow_cname))
+ self._start_thread()
+ self.queue.put(request)
+
+ def resolve_srv(self, domain, service, protocol, callback):
+ request = ("resolve_srv", (domain, service, protocol, callback))
+ self._start_thread()
+ self.queue.put(request)
+
+ def _run(self, thread_n):
+ """The thread function."""
+ try:
+ logger.debug("{0!r}: entering thread #{1}"
+ .format(self, thread_n))
+ resolver = self._make_resolver()
+ while True:
+ request = self.queue.get()
+ if request is None:
+ break
+ method, args = request
+ logger.debug(" calling {0!r}.{1}{2!r}"
+ .format(resolver, method, args))
+ getattr(resolver, method)(*args) # pylint: disable=W0142
+ self.queue.task_done()
+ logger.debug("{0!r}: leaving thread #{1}"
+ .format(self, thread_n))
+ finally:
+ self.threads.remove(threading.currentThread())
+
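+# Example (sketch): non-blocking lookups through the worker pool. The
+# hostnames are placeholders, not part of the original code; callbacks run
+# on a resolver worker thread.
+if __name__ == '__main__':
+    def print_result(addresses):
+        print addresses
+    logging.basicConfig(level=logging.DEBUG)
+    resolver = ThreadedResolver(max_threads=2)
+    resolver.resolve_address("www.example.com", print_result)
+    resolver.resolve_srv("example.com", "xmpp-client", "tcp", print_result)
+    resolver.queue.join()  # wait until both requests have been serviced
+    resolver.stop()
+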
+# vi: sts=4 et sw=4
diff --git a/toys/twisted_http.py b/toys/twisted_http.py
new file mode 100644
index 0000000..805c78a
--- /dev/null
+++ b/toys/twisted_http.py
@@ -0,0 +1,26 @@
+from twisted.web import client
+from twisted.internet import reactor, defer
+
+urls = [
+ 'http://www.python.org',
+ 'http://stackoverflow.com',
+ 'http://www.twistedmatrix.com',
+ 'http://www.google.com',
+ 'http://www.google.com/toto',
+ 'http://www.google.com/titi',
+ 'http://www.google.com/tata',
+ 'http://launchpad.net',
+ 'http://github.com',
+ 'http://bitbucket.org',
+]
+
+def finish(results):
+ for result in results:
+ print 'GOT PAGE', len(result), 'bytes'
+ reactor.stop()
+
+waiting = [client.getPage(url) for url in urls]
+defer.gatherResults(waiting).addCallback(finish)
+
+reactor.run()
diff --git a/toys/twisted_http_persistent.py b/toys/twisted_http_persistent.py
new file mode 100644
index 0000000..333bcb3
--- /dev/null
+++ b/toys/twisted_http_persistent.py
@@ -0,0 +1,39 @@
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred, DeferredList
+from twisted.internet.protocol import Protocol
+from twisted.web.client import Agent, HTTPConnectionPool
+
+class IgnoreBody(Protocol):
+ def __init__(self, deferred):
+ self.deferred = deferred
+
+ def dataReceived(self, bytes):
+ pass
+
+ def connectionLost(self, reason):
+ self.deferred.callback(None)
+
+
+def cbRequest(response):
+ print 'Response code:', response.code
+ finished = Deferred()
+ response.deliverBody(IgnoreBody(finished))
+ return finished
+
+pool = HTTPConnectionPool(reactor)
+agent = Agent(reactor, pool=pool)
+
+def requestGet(url):
+ d = agent.request('GET', url)
+ d.addCallback(cbRequest)
+ return d
+
+# Two requests to the same host:
+d = requestGet('http://google.com/titi').addCallback(
+ lambda ign: requestGet("http://google.com/tata"))
+def cbShutdown(ignored):
+ reactor.stop()
+d.addCallback(cbShutdown)
+
+reactor.run()
+
diff --git a/toys/twisted_http_simultaneous.py b/toys/twisted_http_simultaneous.py
new file mode 100644
index 0000000..b9de196
--- /dev/null
+++ b/toys/twisted_http_simultaneous.py
@@ -0,0 +1,39 @@
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred, DeferredList
+from twisted.internet.protocol import Protocol
+from twisted.web.client import Agent, HTTPConnectionPool
+
+class IgnoreBody(Protocol):
+ def __init__(self, deferred):
+ self.deferred = deferred
+
+ def dataReceived(self, bytes):
+ pass
+
+ def connectionLost(self, reason):
+ self.deferred.callback(None)
+
+
+def cbRequest(response):
+ print 'Response code:', response.code
+ finished = Deferred()
+ response.deliverBody(IgnoreBody(finished))
+ return finished
+
+pool = HTTPConnectionPool(reactor, persistent=True)
+pool.maxPersistentPerHost = 1
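+# The pool caches at most one idle persistent connection per host:port;
+# extra connections opened for simultaneous requests are closed after use.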
+agent = Agent(reactor, pool=pool)
+
+def requestGet(url):
+ d = agent.request('GET', url)
+ d.addCallback(cbRequest)
+ return d
+
+# Four simultaneous requests to the same host; stop the reactor once all
+# of them have finished.
+dl = DeferredList([requestGet('http://google.com/titi'),
+                   requestGet('http://google.com/tata'),
+                   requestGet('http://google.com/toto'),
+                   requestGet('http://google.com/tralala')])
+dl.addCallback(lambda ign: reactor.stop())
+
+reactor.run()
+