From 2d833fc936dc6974fac53dd177cce9e67dc3f1c9 Mon Sep 17 00:00:00 2001 From: Laurent Ghigonis Date: Thu, 28 Feb 2013 00:33:20 +0100 Subject: dtube - Tube Definer, detube the intertubes to look into the right tube --- dtube/conf_dtube.py.template | 1 + dtube/dtube.py | 240 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 241 insertions(+) create mode 100644 dtube/conf_dtube.py.template create mode 100755 dtube/dtube.py (limited to 'dtube') diff --git a/dtube/conf_dtube.py.template b/dtube/conf_dtube.py.template new file mode 100644 index 0000000..83d7174 --- /dev/null +++ b/dtube/conf_dtube.py.template @@ -0,0 +1 @@ +url_root = 'http://127.0.0.1' diff --git a/dtube/dtube.py b/dtube/dtube.py new file mode 100755 index 0000000..076a0c3 --- /dev/null +++ b/dtube/dtube.py @@ -0,0 +1,240 @@ +#!/usr/bin/env python +# +# dtube - Tube Definer, detube the intertubes to look into the right tube +# 2013 Laurent Ghigonis +# +# Get info from AS +# /AS23118 +# AS Info - Route propagation - Prefixes - Peers - Whois - IRR +# +# Get info from IP range +# /net/213.186.33/24 +# Network Info - Whois - DNS - IRR +# +# Get info from IP +# /ip/173.194.34.1 +# XXX +# +# Get info from DNS name +# /dns/google.de +# DNS Info - Website Info - IP Info - Whois +# +# Get info from Country +# /country/US +# XXX + +# TODO +# * command line usage +# * cache + +import sys +import os +import requests +import pprint +import time +import random +import unittest +import re +from lxml import etree +from optparse import OptionParser + +import conf_dtube + +def usage(): + return "usage: %s [-t ranges|A|PTR|ip] [-f filter] [-v] range-cidr|AS|DNS|IP" % sys.argv[0] + +# XXX already exists in python libs ? +def merge_table(left, right): + content = "" + w = 0 + while True: + line = '' + if w < len(left): line += left[w].strip() + if w < len(right): line += "\t" + right[w].strip() + if line == '': + break + content += line + '\n' + w += 1 + return content + +class He: + url_headers = {'User-Agent': "Firefox"} + + @classmethod + def initTarget(cls, target, verbose=False): + match = re.search(r'^AS([0-9]*)$', target) + if match: + print "Target identified as AS %s" % match.group(1) + return HeAS(match.group(1), verbose=verbose) + match = re.search(r'^([0-9]+(?:\.[0-9]+){3}/[0-9]+)$', target) + if match: + print "Target identified as IPrange %s" % match.group(1) + return HeIPrange(match.group(1), verbose=verbose) + match = re.search(r'^([0-9]+(?:\.[0-9]+){3})$', target) + if match: + print "Target identified as IP %s" % match.group(1) + return HeIP(match.group(1), verbose=verbose) + match = re.search(r'(.*\..*)', target) + if match: + print "Target identified as DNS %s" % match.group(1) + return HeDNS(match.group(1), verbose=verbose) + match = re.search(r'^([^\.0-9]*)$', target) + if match: + print "Target identified as Country %s" % match.group(1) + return HeCountry(match.group(1), verbose=verbose) + raise Exception("Unable to identify target as AS / IPrange / IP / DNS / Country") + + def __init__(self, url, fromfile=None, verbose=False): + self.url = url + self.verbose = verbose + if fromfile: + f = open(fromfile) + self.tree = etree.parse(f, etree.HTMLParser()) + else: + if self.verbose: print url + self.html = requests.get(url, headers=He.url_headers) + if self.verbose: print self.html.status_code + if not self.html.ok: + if self.html.status_code == 404: + raise Exception("Target not found") + self.html.raise_for_status() + self.tree = etree.HTML(self.html.content) + time.sleep(random.random() * 3) # Forced to be kind + +class HeAS(He): + url_prefix = '/AS' + + def __init__(self, AS, fromfile=None, verbose=False): + self.AS = str(AS) + He.__init__(self, conf_dtube.url_root + HeAS.url_prefix + self.AS, fromfile, verbose=verbose) + self.parse() + + def parse(self): + # AS Info + info_it = self.tree.xpath('//div[@class="asinfotext"]')[0] + left_it = info_it.xpath('//div[@class="asleft"]/child::text()') + right_it = info_it.xpath('//div[@class="asright"]/a/child::text()') + self.info = merge_table(left_it, right_it) + # XXX Route propagation + # Prefixes + self.prefixes = dict() + prefixes_l = self.tree.xpath('//table[@id="table_prefixes4"]') + if prefixes_l: + prefixes_it = prefixes_l[0] + for p in prefixes_it.xpath('tbody/tr'): + self.prefixes[p.xpath('td/a/child::text()')[0]] = { + "href": p.xpath('td/a')[0].get('href'), + "description": p.xpath('td[position()=2]/child::text()')[0].strip(), + "country": p.xpath('td/div/img')[0].get('title'), + } + pp = pprint.PrettyPrinter() + self.prefixes_txt = pp.pformat(self.prefixes) + # XXX Peers + # XXX Whois + # XXX IRR + + def show(self): + print "====== INFOS ======" + print self.info + print "====== PREFIXES ======" + print self.prefixes_txt + +class HeIPrange(He): + url_prefix = '/net/' + + def __init__(self, IPRange_cidr, fromfile=None, verbose=False): + self.IPRange_cidr = str(IPRange_cidr) + He.__init__(self, conf_dtube.url_root + HeIPrange.url_prefix + self.IPRange_cidr, + fromfile, verbose=verbose) + self.parse() + + def parse(self): + # Network Info + info_it = self.tree.xpath('//div[@id="netinfo"]')[0] + self.info = [e.text for e in info_it.getiterator()] + # XXX + #self.info.remove(None) + #for tab in info_it.xpath('table'): + # head = '\t'.join(tab.xpath('tr/th/child::text()')) + # net = '\t'.join(tab.xpath('tbody/tr/td/a/child::text()')) + # desc = tab.xpath('tbody/tr/td/child::text()')[-1] + # self.info += head + '\n' + net + '\t' + desc + '\n' + # XXX Whois + # DNS + dns_l = self.tree.xpath('//div[@id="dns"]') + self.dns = dict() + if dns_l: + dns_it = dns_l[0] + for i in dns_it.xpath('table/tbody/tr'): + self.dns[i.xpath('td[position()=1]/a/child::text()')[0]] = { + "href": i.xpath('td[position()=1]/a')[0].get('href'), + "PTR": i.xpath('td[position()=2]/a/child::text()')[0], + "PTR_href": i.xpath('td[position()=2]/a')[0].get('href'), + "A_list": i.xpath('td[position()=2]/a/child::text()'), + "A_href_list": [it.get('href') for it in i.xpath('td[position()=3]/a')], + } + pp = pprint.PrettyPrinter() + self.dns_txt = pp.pformat(self.dns) + # XXX IRR + + def show(self): + print "====== INFOS ======" + print self.info + print "====== DNS ======" + print self.dns_txt + +class HeIP(He): + url_prefix = '/ip/' + + def __init__(self, IP, fromfile=None, verbose=False): + raise Exception("IP Not supported yet") + +class HeDNS(He): + url_prefix = '/ip/' + + def __init__(self, dns, fromfile=None, verbose=False): + raise Exception("DNS Not supported yet") + +class HeCountry(He): + url_prefix = '/ip/' + + def __init__(self, country, fromfile=None, verbose=False): + raise Exception("Country Not supported yet") + +class HeAS_unittest(unittest.TestCase): + def test_parse(self): + print "HeAS_unittest.test_parse()" + for t in os.listdir('tests/AS'): + tname = t[:-5] + h = HeAS(tname, fromfile="tests/AS/%s" % t) + self.assertTrue(h is not None) + +class HeIPRange_unittest(unittest.TestCase): + def test_parse(self): + print "HeIPRange_unittest.test_parse()" + for t in os.listdir('tests/IPRange'): + tname = t[:-5] + h = HeIPrange(tname, fromfile="tests/IPRange/%s" % t) + self.assertTrue(h is not None) + +if __name__ == '__main__': + if os.environ.has_key("UNITTEST"): + if len(sys.argv) > 1: + raise Exception("No arguments accepted in UNITTEST mode") + unittest.main() + sys.exit(0) + + parser = OptionParser(usage=usage()) + parser.add_option('-t', action="store", dest="query_type", default=0, type=str, + help='Query type: range, A, PTR, ip, country') + parser.add_option('-f', action="store", dest="filter", default=None, type=str, + help='Filter regex for company name') + parser.add_option('-v', action="store_true", dest="verbose", default=False, + help='verbose') + (options, args) = parser.parse_args() + if (len(args) != 1): + print usage() + sys.exit(1) + target = args[0] + he = He.initTarget(target, verbose=options.verbose) + he.show() -- cgit v1.2.3-59-g8ed1b