#!/usr/bin/env python # # dtube - Tube Definer, detube the intertubes to look into the right tube # 2013 Laurent Ghigonis # # TODO # * dns country # * -a show all (href ...) # * multiple requests when result truncated # * parser.add_option('-t', action="store", dest="query_type", default=0, type=str, # help='Query type: range, A, PTR, ip, country') # * parser.add_option('-f', action="store", dest="filter", default=None, type=str, # help='Filter regex for company name') import sys import os import requests import requests_cache # pip install requests_cache import pprint import time import random import re from lxml import etree from optparse import OptionParser import conf_dtube def usage(): return """usage: %s [-h] [-v] AS | range-cidr | IP | DNS | country Examples: %s AS23118 %s 91.220.156.0/24 %s 208.67.222.222 %s ovh.net %s US""" % (sys.argv[0], sys.argv[0], sys.argv[0], sys.argv[0], sys.argv[0], sys.argv[0]) # XXX already exists in python libs ? def merge_table(left, right): content = "" w = 0 while True: line = '' if w < len(left): line += left[w].strip() if w < len(right): line += "\t" + right[w].strip() if line == '': break content += line + '\n' w += 1 return content class He: url_headers = {'User-Agent': conf_dtube.url_useragent} @classmethod def initTarget(cls, target, verbose=False): match = re.search(r'^AS([0-9]*)$', target) if match: if verbose: print "Target identified as AS %s" % match.group(1) return HeAS(match.group(1), verbose=verbose) match = re.search(r'^([0-9]+(?:\.[0-9]+){3}/[0-9]+)$', target) if match: if verbose: print "Target identified as IPrange %s" % match.group(1) try: he = HeIPrange(match.group(1), verbose=verbose) except Exception, e: if str(e)[:16] == "Target not found": print e print "Trying to get closest range announcements" ip = HeIP(match.group(1).split('/')[0], verbose=verbose) as_target = max(ip.AS.iterkeys(), key=(lambda key: ip.AS[key]['position'])) iprange = ip.AS[as_target]["IPrange"] print "New IPrange selected: %s\n" % iprange he = HeIPrange(iprange, verbose=verbose) else: raise e return he match = re.search(r'^([0-9]+(?:\.[0-9]+){3})$', target) if match: if verbose: print "Target identified as IP %s" % match.group(1) return HeIP(match.group(1), verbose=verbose) match = re.search(r'(.*\..*)', target) if match: if verbose: print "Target identified as DNS %s" % match.group(1) return HeDNS(match.group(1), verbose=verbose) match = re.search(r'^([^\.0-9]*)$', target) if match: if verbose: print "Target identified as Country %s" % match.group(1) return HeCountry(match.group(1), verbose=verbose) raise Exception("Unable to identify target as AS / IPrange / IP / DNS / Country") @classmethod def parse_AS_announces_table(cls, table_it): as_it = table_it.xpath('tbody/tr') AS = dict() position = 0 if as_it: for a in as_it: AS[a.xpath('td[position()=1]/a/child::text()')[0]] = { "position": position, "AS_href": a.xpath('td[position()=1]/a')[0].get('href'), "IPrange": a.xpath('td[position()=2]/a/child::text()')[0], "IPrange_href": a.xpath('td[position()=2]/a')[0].get('href'), "description": a.xpath('td[position()=3]/child::text()')[0] } position += 1 AS_txt = "" if len(AS.keys()) > 0: for a in AS: AS_txt += a + '\t' + AS[a]["IPrange"] \ + '\t' + AS[a]["description"] + '\n' return AS, AS_txt def __init__(self, url, fromfile=None, verbose=False): self.url = url self.verbose = verbose if fromfile: f = open(fromfile) self.tree = etree.parse(f, etree.HTMLParser()) else: if self.verbose: print url self.html = requests.get(url, headers=He.url_headers) if self.verbose: print self.html.status_code if not self.html.ok: self.html.raise_for_status() self.tree = etree.HTML(self.html.content) if not getattr(self.html, 'from_cache', False): time.sleep(random.random() * 3) # Forced to be kind if self.verbose and self.html.from_cache: print "Response from cache" err = re.search(r'ERROR: (.*) Not Found', self.tree.xpath('//title')[0].text) if err: raise Exception("Target not found : %s" % err.group(1)) class HeAS(He): url_prefix = '/AS' def __init__(self, AS, fromfile=None, verbose=False): self.AS = str(AS) He.__init__(self, conf_dtube.url_root + HeAS.url_prefix + self.AS, fromfile, verbose=verbose) self.parse() def parse(self): # AS Info info_it = self.tree.xpath('//div[@class="asinfotext"]')[0] left_it = info_it.xpath('//div[@class="asleft"]/child::text()') right_it = info_it.xpath('//div[@class="asright"]/a/child::text()') self.info = merge_table(left_it, right_it) # XXX Route propagation # Prefixes self.prefixes = dict() prefixes_l = self.tree.xpath('//table[@id="table_prefixes4"]') if prefixes_l: prefixes_it = prefixes_l[0] for p in prefixes_it.xpath('tbody/tr'): self.prefixes[p.xpath('td/a/child::text()')[0]] = { "href": p.xpath('td/a')[0].get('href'), "description": p.xpath('td[position()=2]/child::text()')[0].strip(), "country": p.xpath('td/div/img')[0].get('title') if p.xpath('td/div/img') else None, } self.prefixes_txt = "" for p in self.prefixes.keys(): self.prefixes_txt += p \ + '\t' + self.prefixes[p]["description"] \ + '\t(' + str(self.prefixes[p]["country"]) + ')' + '\n' # XXX Peers # XXX Whois # XXX IRR def show(self): print "====== INFO ======" print self.info print "====== PREFIXES (%d) ======" % len(self.prefixes) print "Range\t\tDescription\t(Country)" print self.prefixes_txt class HeIPrange(He): url_prefix = '/net/' def __init__(self, IPRange_cidr, fromfile=None, verbose=False): self.IPRange_cidr = str(IPRange_cidr) He.__init__(self, conf_dtube.url_root + HeIPrange.url_prefix + self.IPRange_cidr, fromfile, verbose=verbose) self.parse() def parse(self): # Network Info info_it = self.tree.xpath('//div[@id="netinfo"]/table')[0] (self.AS, self.AS_txt) = He.parse_AS_announces_table(info_it) # XXX Whois # DNS dns_l = self.tree.xpath('//div[@id="dns"]') self.dns = dict() if dns_l: dns_it = dns_l[0] for i in dns_it.xpath('table/tbody/tr'): self.dns[i.xpath('td[position()=1]/a/child::text()')[0]] = { "href": i.xpath('td[position()=1]/a')[0].get('href'), "PTR": i.xpath('td[position()=2]/a/child::text()')[0] if len(i.xpath('td[position()=2]/a/child::text()')) > 0 else "", "PTR_href": i.xpath('td[position()=2]/a')[0].get('href') if len(i.xpath('td[position()=2]/a')) > 0 else "", "A_list": i.xpath('td[position()=3]/a/child::text()'), "A_href_list": [it.get('href') for it in i.xpath('td[position()=3]/a')], } self.dns_txt = "" for d in self.dns.keys(): self.dns_txt += d + '\t' + self.dns[d]["PTR"] if len(self.dns[d]["A_list"]) > 0: self.dns_txt += ' | ' + ' '.join(self.dns[d]["A_list"]) self.dns_txt += '\n' # XXX IRR def show(self): print "====== AS (%d) ======" % len(self.AS) print "AS\tIPrange\t\tDescription" print self.AS_txt print "====== DNS (%d) ======" % len(self.dns) print "IP\t\tPTR\t[| A]" print self.dns_txt class HeIP(He): url_prefix = '/ip/' def __init__(self, IP, fromfile=None, verbose=False): self.IP = str(IP) He.__init__(self, conf_dtube.url_root + HeIP.url_prefix + self.IP, fromfile, verbose=verbose) self.parse() def parse(self): # IP Info info_it = self.tree.xpath('//div[@id="ipinfo"]/table')[0] (self.AS, self.AS_txt) = He.parse_AS_announces_table(info_it) # XXX Whois # DNS dns_it = self.tree.xpath('//div[@id="dns"]')[0] self.dns = dict() self.dns["PTR"] = dns_it.xpath('a[position()=1]/child::text()')[0] if len(dns_it.xpath('a[position()=1]/child::text()')) > 0 else "" self.dns["PTR_href"] = dns_it.xpath('a[position()=1]')[0].get('href') if len(dns_it.xpath('a[position()=1]')) > 0 else "" self.dns["A_list"] = dns_it.xpath('a[position()>1]/child::text()') self.dns["A_href_list"] = [a.get('href') for a in dns_it.xpath('a[position()>1]')] self.dns_txt = "%s\n" % self.dns["PTR"] if len(self.dns.keys()) > 0: self.dns_txt += ' '.join(self.dns["A_list"]) + '\n' # XXX RBL def show(self): print "====== AS (%d) ======" % len(self.AS) print "AS\tIPrange\t\tDescription" print self.AS_txt print "====== DNS (%d) ======" % len(self.dns) print "PTR and A records" print self.dns_txt class HeDNS(He): url_prefix = '/ip/' def __init__(self, dns, fromfile=None, verbose=False): raise Exception("DNS Not supported yet") class HeCountry(He): url_prefix = '/ip/' def __init__(self, country, fromfile=None, verbose=False): raise Exception("Country Not supported yet") if __name__ == '__main__': parser = OptionParser(usage=usage()) parser.add_option('-n', action="store_true", dest="url_nocache", default=False, help='Do not cache answers') parser.add_option('-v', action="store_true", dest="verbose", default=False, help='Verbose') (options, args) = parser.parse_args() if (len(args) != 1): print usage() sys.exit(1) target = args[0] if options.url_nocache is False: requests_cache.install_cache('cache_dtube') print "%s\n" % target he = He.initTarget(target, verbose=options.verbose) he.show()