keenetic-dpr-bypass/main.py
2023-09-07 11:30:22 +03:00

196 lines
5.4 KiB
Python

import httplib
import hashlib
import json
import argparse
import os
import socket
import urllib
import ConfigParser
from os.path import abspath
from netaddr import IPAddress, IPNetwork
class KeeneticAPI(object):
    """Small client for the router's HTTP API (``/auth`` and ``/rci/...``).

    One ``httplib.HTTPConnection`` is reused for every request, and the
    cookie handed out by ``/auth`` is replayed in the ``Cookie`` header of
    each call.
    """

    def __init__(self, destination="192.168.1.1"):
        """Create the client for the router reachable at *destination*."""
        self._conn = httplib.HTTPConnection(destination)
        self._cookie = ""

    def _request(self, method, url, body="", content_type=None):
        """Send one request and return ``(response, body_bytes)``.

        The response body is always read in full so the shared connection
        is ready for the next request.
        """
        headers = {"Cookie": self._cookie}
        if content_type is not None:
            headers["Content-type"] = content_type
        self._conn.request(method, url, body, headers)
        res = self._conn.getresponse()
        data = res.read()
        return res, data

    def _get(self, url):
        """Issue a GET request; returns ``(response, body)``."""
        return self._request("GET", url)

    def _post(self, url, body):
        """Issue a POST request with a JSON *body*; returns ``(response, body)``."""
        return self._request("POST", url, body, "application/json")

    def _delete(self, url):
        """Issue a DELETE request; returns ``(response, body)``."""
        return self._request("DELETE", url)

    @staticmethod
    def _challenge_response(login, passw, realm, challenge):
        """Return ``sha256(challenge + md5("login:realm:passw"))`` as hex."""
        credentials = login + ":" + realm + ":" + passw
        inner = hashlib.md5(credentials.encode('utf-8')).hexdigest()
        return hashlib.sha256((challenge + inner).encode('utf-8')).hexdigest()

    @staticmethod
    def _utf8_pairs(params):
        """Return *params* as ``(key, value)`` pairs with text UTF-8 encoded.

        Python 2's ``urllib.urlencode`` calls ``str()`` on every key and
        value, which raises ``UnicodeEncodeError`` for non-ASCII unicode
        (e.g. a route comment written in Cyrillic).  Encoding up front
        avoids that; non-text values (bools, numbers) pass through as-is.
        """
        text_type = type(u"")

        def encode(value):
            if isinstance(value, text_type):
                return value.encode("utf-8")
            return value

        return [(encode(key), encode(value)) for key, value in params.items()]

    def _post_json(self, url, payload):
        """POST *payload* serialized as JSON; True when the answer is 200."""
        res, _ = self._post(url, json.dumps(payload))
        return res.status == 200

    def auth(self, login, passw):
        """Log in with the router's challenge-response scheme.

        ``GET /auth`` yields the session cookie.  When the router answers
        401, the ``X-NDM-Realm`` and ``X-NDM-Challenge`` response headers
        are combined with the credentials and posted back to ``/auth``.

        Returns True when the router answers 200 (either straight away or
        after the credentials were posted), False otherwise.
        """
        res, _ = self._get("/auth")
        cookie = res.getheader("Set-Cookie")
        # Keep the previous cookie if the router did not issue a new one,
        # instead of replacing it with None.
        if cookie:
            self._cookie = cookie
        if res.status != 401:
            return res.status == 200
        digest = self._challenge_response(
            login,
            passw,
            res.getheader("X-NDM-Realm"),
            res.getheader("X-NDM-Challenge"),
        )
        return self._post_json("/auth", {"login": login, "password": digest})

    def get_ip_route(self):
        """Return the decoded JSON answer of ``GET /rci/ip/route``."""
        _, data = self._get("/rci/ip/route")
        return json.loads(data)

    def set_ip_route(self, route_data):
        """POST *route_data* to ``/rci/ip/route``; True on HTTP 200."""
        return self._post_json("/rci/ip/route", route_data)

    def remove_ip_route(self, route):
        """DELETE the route described by the dict *route*; True on HTTP 200.

        The route's fields are sent as the query string of the request.
        """
        query = urllib.urlencode(self._utf8_pairs(route))
        res, _ = self._delete("/rci/ip/route?" + query)
        return res.status == 200

    def get_interfaces(self):
        """Return the decoded JSON answer of ``GET /rci/show/interface``."""
        _, data = self._get("/rci/show/interface")
        # No ``encoding`` argument: UTF-8 is already the default on
        # Python 2 and the parameter no longer exists on Python 3.9+.
        return json.loads(data)

    def set_ip_routes(self, routes):
        """POST a list of routes to ``/rci/ip/route``; True on HTTP 200."""
        return self._post_json("/rci/ip/route", routes)
def interfaces(args):
    """Print a two-column table of interface IDs and their descriptions.

    Credentials are read from the ``[auth]`` section of the config file
    named by ``args.config_file``.  Interfaces without a description are
    shown with ``-``.
    """
    cfg = ConfigParser.ConfigParser()
    cfg.read(args.config_file)

    client = KeeneticAPI()
    client.auth(cfg.get('auth', 'login'), cfg.get('auth', 'password'))

    rows = [
        (name, info.get('description') or '-')
        for name, info in client.get_interfaces().items()
    ]

    # Both columns share one width: the longest cell of either column.
    width = max([0] + [len(cell) for row in rows for cell in row])

    row_format = "{:^{width}} {:^{width}}"
    print(row_format.format("ID", "DESCRIPTION", width=width))
    for name, description in rows:
        print(row_format.format(name, description, width=width))
def _read_route_groups(path):
    """Parse one routes file into a list of ``(hosts, comment)`` groups.

    A line starting with ``#`` opens a new group and supplies its comment
    (leading ``#`` and space characters removed); every other non-empty
    line is a host entry of the current group.  Groups without hosts are
    dropped.
    """
    groups = []
    comment = ''
    hosts = []
    with open(path) as f:
        for line in f:
            entry = line.strip()
            if not entry:
                continue
            if entry.startswith('#'):
                if hosts:
                    groups.append((hosts, comment))
                comment = entry.lstrip('# ')
                hosts = []
            else:
                hosts.append(entry)
    if hosts:
        groups.append((hosts, comment))
    return groups


def _resolve_network(host):
    """Return the ``IPNetwork`` for *host*, or None if DNS lookup fails.

    Entries starting with a digit are handed to ``IPNetwork`` unchanged;
    anything else is treated as a host name and resolved first.
    """
    import sys

    if host[0].isdigit():
        return IPNetwork(host)
    try:
        address = socket.gethostbyname(host)
    except (socket.gaierror, socket.herror) as err:
        # One unresolvable name must not abort the run: by this point the
        # previous automatic routes have already been deleted.
        sys.stderr.write(
            "warning: cannot resolve {} ({}), skipping\n".format(host, err)
        )
        return None
    return IPNetwork(address)


def start(args):
    """Replace the router's automatic routes with those from the routes dir.

    Steps:
      1. Delete every existing route whose comment starts with ``[A]``
         (the marker this function puts on the routes it creates).
      2. Read each regular file in the directory named by ``[app] dir``
         in the config; the file name without extension is the interface
         name, the content is parsed by ``_read_route_groups``.
      3. Post one route per resolvable host entry, commented
         ``[A] <group comment>``.

    Credentials come from the ``[auth]`` section of ``args.config_file``.
    """
    config = ConfigParser.ConfigParser()
    config.read(args.config_file)
    # Read all settings before touching the router, so a broken config
    # fails before any route is deleted.
    login = config.get('auth', 'login')
    password = config.get('auth', 'password')
    routes_dir = abspath(config.get('app', 'dir'))

    api = KeeneticAPI()
    api.auth(login, password)
    for route in api.get_ip_route():
        # Routes may have no comment at all; those are never ours.
        if (route.get('comment') or '').startswith('[A]'):
            api.remove_ip_route(route)

    routes = []
    for filename in os.listdir(routes_dir):
        path = os.path.join(routes_dir, filename)
        if not os.path.isfile(path):
            continue
        interface_name = os.path.splitext(filename)[0]
        for hosts, description in _read_route_groups(path):
            for host in hosts:
                network = _resolve_network(host)
                if network is None:
                    continue
                routes.append(
                    {
                        'network': str(network.network),
                        'mask': str(network.netmask),
                        'interface': interface_name,
                        'auto': True,
                        'comment': '[A] {}'.format(description),
                    }
                )

    # NOTE(review): a fresh client is created and authenticated here, as in
    # the original code -- presumably because name resolution above can
    # outlast the first session; confirm before merging the two clients.
    api = KeeneticAPI()
    api.auth(login, password)
    api.set_ip_routes(routes)
def main():
    """Parse the command line and run the chosen sub-command.

    Sub-commands: ``interfaces`` and ``start``.  The global option
    ``--config-file`` (default ``app.cfg``) is passed on to the handler
    through the parsed arguments.
    """
    parser = argparse.ArgumentParser(description="a script to do stuff")
    parser.add_argument("--config-file", default="app.cfg")

    commands = parser.add_subparsers()
    for name, handler in (('interfaces', interfaces), ('start', start)):
        # Each sub-parser records its handler as ``func``.
        commands.add_parser(name).set_defaults(func=handler)

    options = parser.parse_args()
    options.func(options)
# Run the command-line interface only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()