keenetic-dpr-bypass/main.py

202 lines
5.5 KiB
Python

import httplib
import hashlib
import json
import argparse
import socket
import urllib
import ConfigParser
import os
import sys
parent_dir = os.path.abspath(os.path.dirname(__file__))
vendor_dir = os.path.join(parent_dir, 'vendor')
sys.path.append(vendor_dir)
from netaddr import IPNetwork
class KeeneticAPI:
    """Small HTTP client for a Keenetic router's ``/auth`` and ``/rci`` endpoints.

    A single ``httplib.HTTPConnection`` is created per instance and reused for
    every request. The cookie obtained by :meth:`auth` is sent with each call.
    """

    def __init__(self, destination="192.168.1.1"):
        """Create a client for the router at *destination*.

        :param destination: host string handed to ``httplib.HTTPConnection``.
        """
        self._conn = httplib.HTTPConnection(destination)
        # Value sent in the ``Cookie`` header of every request; set by auth().
        self._cookie = ""

    def _request(self, method, url, body="", content_type=None):
        """Send one request on the shared connection and read the whole reply.

        :param method: HTTP verb, e.g. ``"GET"``.
        :param url: request path (with query string, if any).
        :param body: request body; an empty string when there is none.
        :param content_type: value for the ``Content-type`` header, or ``None``
            to omit the header.
        :returns: ``(response, data)`` where *data* is the raw response body.
        """
        headers = {"Cookie": self._cookie}
        if content_type is not None:
            headers["Content-type"] = content_type
        self._conn.request(method, url, body, headers)
        res = self._conn.getresponse()
        # The response has to be read completely before the same connection
        # can be used for the next request.
        data = res.read()
        return res, data

    def _get(self, url):
        """Issue a GET for *url*; returns ``(response, data)``."""
        return self._request("GET", url)

    def _post(self, url, body):
        """POST the JSON string *body* to *url*; returns ``(response, data)``."""
        return self._request("POST", url, body, content_type="application/json")

    def _delete(self, url):
        """Issue a DELETE for *url*; returns ``(response, data)``."""
        return self._request("DELETE", url)

    def auth(self, login, passw):
        """Log in using the challenge sent with a 401 reply to ``GET /auth``.

        The password sent to the router is
        ``sha256(challenge + md5(login:realm:passw))``, both digests in hex.

        :param login: account name.
        :param passw: clear-text password.
        :returns: ``True`` if the router answered 200 to the initial request or
            to the login POST, ``False`` otherwise.
        """
        res, _ = self._get("/auth")
        # Keep the cookie we already have when the reply carries no
        # ``Set-Cookie``; storing ``None`` would corrupt later Cookie headers.
        self._cookie = res.getheader("Set-Cookie") or self._cookie
        if res.status != 401:
            return res.status == 200

        realm = res.getheader("X-NDM-Realm")
        challenge = res.getheader("X-NDM-Challenge")
        if realm is None or challenge is None:
            # Without both headers the password hash cannot be computed.
            return False

        md5 = hashlib.md5((login + ":" + realm + ":" + passw).encode('utf-8'))
        sha = hashlib.sha256((challenge + md5.hexdigest()).encode('utf-8'))
        res, _ = self._post(
            "/auth",
            json.dumps({"login": login, "password": sha.hexdigest()})
        )
        return res.status == 200

    def get_ip_route(self):
        """Return the decoded JSON reply of ``GET /rci/ip/route``."""
        _, data = self._get("/rci/ip/route")
        return json.loads(data)

    def set_ip_route(self, route_data):
        """POST *route_data* as JSON to ``/rci/ip/route``.

        :returns: ``True`` if the router answered with status 200.
        """
        res, _ = self._post("/rci/ip/route", json.dumps(route_data))
        return res.status == 200

    def remove_ip_route(self, route):
        """DELETE ``/rci/ip/route`` with *route* url-encoded as the query string.

        :param route: mapping of route fields.
        :returns: ``True`` if the router answered with status 200.
        """
        res, _ = self._delete("/rci/ip/route?" + urllib.urlencode(route))
        return res.status == 200

    def get_interfaces(self):
        """Return the decoded JSON reply of ``GET /rci/show/interface``."""
        _, data = self._get("/rci/show/interface")
        # No ``encoding`` argument: UTF-8 is already the default on Python 2
        # and the parameter no longer exists in Python 3.9+.
        return json.loads(data)

    def set_ip_routes(self, routes):
        """POST a list of routes; same request as :meth:`set_ip_route`."""
        return self.set_ip_route(routes)
def interfaces(args):
    """Print a centred two-column table of interface IDs and descriptions.

    Credentials are read from the ``[auth]`` section (``login``, ``password``)
    of the file named by ``args.config_file``. Interfaces without a
    description are shown with ``-``.

    :param args: parsed command-line namespace providing ``config_file``.
    """
    cfg = ConfigParser.ConfigParser()
    cfg.read(args.config_file)

    client = KeeneticAPI()
    client.auth(
        cfg.get('auth', 'login'),
        cfg.get('auth', 'password')
    )

    # Resolve each row's label once so sizing and printing use the same text.
    rows = [
        (name, info.get('description') or '-')
        for name, info in client.get_interfaces().items()
    ]

    # Both columns share one width: the longest ID or description.
    width = 0
    for name, label in rows:
        width = max(width, len(name), len(label))

    row_format = "{:^{width}} {:^{width}}"
    # A single parenthesised argument prints the same text as the bare
    # ``print`` statement.
    print(row_format.format("ID", "DESCRIPTION", width=width))
    for name, label in rows:
        print(row_format.format(name, label, width=width))
def _parse_route_file(path):
    """Split a route list file into ``(hosts, comment)`` groups.

    Blank lines are ignored. A line starting with ``#`` closes the current
    group (if it holds any hosts) and supplies the comment for the hosts that
    follow; every other line is one host entry.
    """
    groups = []
    comment = ''
    hosts = []
    with open(path) as f:
        for line in f:
            entry = line.strip()
            if not entry:
                continue
            if entry.startswith('#'):
                if hosts:
                    groups.append((hosts, comment))
                comment = entry.lstrip('# ')
                hosts = []
            else:
                hosts.append(entry)
    if hosts:
        groups.append((hosts, comment))
    return groups


def _build_routes(routes_dir):
    """Build the route dicts described by the regular files in *routes_dir*.

    The file name without its extension is used as the route's ``interface``;
    every host entry in the file becomes one route whose comment is prefixed
    with ``[A]``.
    """
    routes = []
    for filename in os.listdir(routes_dir):
        fp = os.path.join(routes_dir, filename)
        if not os.path.isfile(fp):
            continue
        interface_name = os.path.splitext(filename)[0]
        for hosts, description in _parse_route_file(fp):
            for host in hosts:
                # Entries not starting with a digit are taken to be host names
                # and resolved to a single IPv4 address.
                # NOTE(review): a host name that begins with a digit is passed
                # to IPNetwork unresolved - confirm whether that can occur.
                if not host[0].isdigit():
                    ipnet = socket.gethostbyname(host)
                else:
                    ipnet = host
                ipnetwork = IPNetwork(ipnet)
                routes.append(
                    {
                        'network': str(ipnetwork.network),
                        'mask': str(ipnetwork.netmask),
                        'interface': interface_name,
                        'auto': True,
                        'comment': '[A] {}'.format(description)
                    }
                )
    return routes


def start(args):
    """Replace the router's ``[A]``-tagged routes with those from the route files.

    Reads ``[auth] login/password`` and ``[app] dir`` from the file named by
    ``args.config_file``, builds the new route list from the files in that
    directory, removes every existing route whose comment starts with ``[A]``
    and uploads the new list.

    :param args: parsed command-line namespace providing ``config_file``.
    """
    config = ConfigParser.ConfigParser()
    config.read(args.config_file)
    login = config.get('auth', 'login')
    password = config.get('auth', 'password')

    # Parse the files and resolve host names before touching the router, so a
    # bad config, unreadable file or DNS failure cannot leave it with the old
    # routes deleted and no new ones installed.
    routes = _build_routes(os.path.abspath(config.get('app', 'dir')))

    api = KeeneticAPI()
    api.auth(login, password)

    for route in api.get_ip_route():
        # Routes without a comment must not raise; they are simply not ours.
        if (route.get('comment') or '').startswith('[A]'):
            api.remove_ip_route(route)

    api.set_ip_routes(routes)
def main():
    """Parse the command line and run the chosen sub-command.

    Accepts ``--config-file`` (default ``app.cfg``) and the sub-commands
    ``interfaces`` and ``start``; the matching function is called with the
    parsed arguments.
    """
    cli = argparse.ArgumentParser(description="a script to do stuff")
    cli.add_argument("--config-file", default="app.cfg")

    commands = cli.add_subparsers()
    # Each sub-command stores its handler under ``func`` for the dispatch below.
    for name, handler in (('interfaces', interfaces), ('start', start)):
        commands.add_parser(name).set_defaults(func=handler)

    options = cli.parse_args()
    options.func(options)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()