aboutsummaryrefslogtreecommitdiffstats
path: root/autoscan/autoscan.py
diff options
context:
space:
mode:
Diffstat (limited to 'autoscan/autoscan.py')
-rwxr-xr-x[-rw-r--r--]autoscan/autoscan.py115
1 files changed, 95 insertions, 20 deletions
diff --git a/autoscan/autoscan.py b/autoscan/autoscan.py
index f614db3..f3a5734 100644..100755
--- a/autoscan/autoscan.py
+++ b/autoscan/autoscan.py
@@ -6,42 +6,59 @@
# Usage: autoscan.py [interfaces]
# by default, monitor all network interfaces
+# Should work on all Linux versions
# Each time network connectivity become available after a cut-off,
-# run some tests and store results in a file db
+# run some tests and store results in
+# YYYYMMDD_hhmmss_interface/testname/output"
# * ifconfig
# * if WIFI, iwconfig
+# * /etc/resolv.conf
+# * 15s pcap
# * route -n
# * traceroute
# * local net IP scan
# * public IP (curl ifconfig.me)
# * ping 8.8.8.8
+# TODO
+# rename log directory to YYYYMMDD_hhmmss_interface_[pubip/localip]
+
import sys
+import os
import time
import subprocess
+import traceback
+import re
+import argparse
-class Mon_iface(object):
+class Autoscan_iface(object):
PUBIP = "8.8.8.8"
- def __init__(self, iface, logpath):
+ def __init__(self, iface, logpath=".", verbose=False):
self.iface = iface
self.logpath = logpath
+ self.verbose = verbose
self.date = None # set by _do_tests()
- def run(self):
+ def monitor(self):
self._do_tests()
while True:
self._wait_down()
self._wait_up()
self._do_tests()
+ def run_now(self):
+ self._do_tests()
+
def _wait_up(self):
while True:
out, err, code = self._exec(
['ifconfig', self.iface])
up = re.search(r'UP', out)
- if up:
+ ip4 = re.search(r'inet (\S+)', out)
+ ip6 = re.search(r'inet6 (\S+)', out)
+ if up and (ip4 or ip6):
break
time.sleep(1)
@@ -52,17 +69,28 @@ class Mon_iface(object):
up = re.search(r'UP', out)
if not up:
break
+ # XXX also consider sleep as interface down
time.sleep(1)
def _do_tests(self):
self.date = time.strftime("%Y%m%d_%H%M%S", time.gmtime())
- self._test_ifconfig()
- self._test_iwconfig()
- self._test_route()
- self._test_scan()
- self._test_pubip_get()
- self._test_pubip_ping()
- self._test_pubip_traceroute()
+ self._do_tests_run(self._test_ifconfig)
+ self._do_tests_run(self._test_iwconfig)
+ self._do_tests_run(self._test_route)
+ #self._do_tests_run(self._test_resolv)
+ #self._do_tests_run(self._test_pubip_get)
+ self._do_tests_run(self._test_pubip_ping)
+ #self._do_tests_run(self._test_pubip_traceroute)
+ #self._do_tests_run(self._test_pcap)
+ #self._do_tests_run(self._test_scan)
+ # XXX rename dir
+
+ def _do_tests_run(self, func):
+ try:
+ func()
+ except Exception, e:
+ print("test %s failed: %s" % (func, e))
+ traceback.print_exc()
def _test_ifconfig(self):
out, err, code = self._exec(
@@ -75,6 +103,24 @@ class Mon_iface(object):
ip6 = re.search(r'inet6 (\S+)', out)
if ip6: self._store("ifconfig/ip6", ip6.group(1))
+ def _test_iwconfig(self):
+ out, err, code = self._exec(
+ ['iwconfig', self.iface])
+ if len(out) == 0:
+ return # not a WIFI interface
+ self._store("iwconfig/out", out)
+ essid = re.search(r'ESSID:(\S+)', out)
+ if essid: self._store("iwconfig/essid", essid.group(1))
+ ap = re.search(r'Access Point: (\S+)', out)
+ if ap: self._store("iwconfig/ap", ap.group(1))
+
+ def _test_route(self):
+ out, err, code = self._exec(
+ ['route', '-n'])
+ self._store("route/out", out)
+ gw = re.findall(r'(\S+)', out.split('\n')[2])[1]
+ if gw: self._store("route/gw", gw)
+
def _test_pubip_ping(self):
out, err, code = self._exec(
['ping', '-W', '3', '-c', '1', self.PUBIP])
@@ -87,17 +133,46 @@ class Mon_iface(object):
out, err = p.communicate()
return out, err, p.returncode
- def _store(self, suffix, val):
+ def _store(self, suffix, txt):
name = "%s/%s_%s/%s" % (self.logpath,
self.date, self.iface, suffix)
+ d = os.path.dirname(name)
+ if not os.path.isdir(d):
+ os.makedirs(d)
+ if self.verbose:
+ print("%s = %s" % (name, txt))
f = open(name, "w+")
- f.write(txt)
+ f.write(str(txt))
f.close()
-logpath = "."
-
-# XXX netifaces
-# XXX thread per interface
-Mon_iface("eth0", logpath)
-
+# XXX all ifaces by default, use netifaces
+
+parser = argparse.ArgumentParser()
+parser.add_argument("interfaces", nargs='+',
+ help="Interfaces to use")
+parser.add_argument("-f", "--foreground", action="store_true",
+ help="Run in foreground, do not daemonize")
+parser.add_argument("-o", "--outdir", action="store", default=".",
+ help="increase output verbosity")
+parser.add_argument("-r", "--runnow", action="store_true",
+ help="Run tests/scans now and exit")
+parser.add_argument("-v", "--verbose", action="store_true",
+ help="increase output verbosity")
+args = parser.parse_args()
+
+for iface in args.interfaces:
+ pid = os.fork()
+ if pid == 0:
+ autoscan = Autoscan_iface(iface, args.outdir, args.verbose)
+ if args.runnow:
+ autoscan.run_now()
+ else:
+ autoscan.monitor()
+ # UNREACHED
+
+if args.foreground:
+ while True:
+ try: os.wait() # XXX wait all pids ?
+ except: break
+