449 lines
14 KiB
Python
449 lines
14 KiB
Python
|
#!/usr/bin/env python
|
||
|
#-----------------------------------------------------------------------------
|
||
|
# Copyright (c) 2008 by David P. D. Moss. All rights reserved.
|
||
|
#
|
||
|
# Released under the BSD license. See the LICENSE file for details.
|
||
|
#-----------------------------------------------------------------------------
|
||
|
#
|
||
|
# DISCLAIMER
|
||
|
#
|
||
|
# netaddr is not sponsored nor endorsed by IANA.
|
||
|
#
|
||
|
# Use of data from IANA (Internet Assigned Numbers Authority) is subject to
|
||
|
# copyright and is provided with prior written permission.
|
||
|
#
|
||
|
# IANA data files included with netaddr are not modified in any way but are
|
||
|
# parsed and made available to end users through an API.
|
||
|
#
|
||
|
# See README file and source code for URLs to latest copies of the relevant
|
||
|
# files.
|
||
|
#
|
||
|
#-----------------------------------------------------------------------------
|
||
|
"""
|
||
|
Routines for accessing data published by IANA (Internet Assigned Numbers
|
||
|
Authority).
|
||
|
|
||
|
More details can be found at the following URLs :-
|
||
|
|
||
|
- IANA Home Page - http://www.iana.org/
|
||
|
- IEEE Protocols Information Home Page - http://www.iana.org/protocols/
|
||
|
"""
|
||
|
|
||
|
import sys as _sys
|
||
|
from xml.sax import make_parser, handler
|
||
|
|
||
|
from netaddr.core import Publisher, Subscriber
|
||
|
from netaddr.ip import IPAddress, IPNetwork, IPRange, cidr_abbrev_to_verbose
|
||
|
from netaddr.compat import _dict_items, _callable, _importlib_resources
|
||
|
|
||
|
|
||
|
|
||
|
#: Topic based lookup dictionary for IANA information.
|
||
|
IANA_INFO = {
|
||
|
'IPv4': {},
|
||
|
'IPv6': {},
|
||
|
'IPv6_unicast': {},
|
||
|
'multicast': {},
|
||
|
}
|
||
|
|
||
|
|
||
|
class SaxRecordParser(handler.ContentHandler):
|
||
|
def __init__(self, callback=None):
|
||
|
self._level = 0
|
||
|
self._is_active = False
|
||
|
self._record = None
|
||
|
self._tag_level = None
|
||
|
self._tag_payload = None
|
||
|
self._tag_feeding = None
|
||
|
self._callback = callback
|
||
|
|
||
|
def startElement(self, name, attrs):
|
||
|
self._level += 1
|
||
|
|
||
|
if self._is_active is False:
|
||
|
if name == 'record':
|
||
|
self._is_active = True
|
||
|
self._tag_level = self._level
|
||
|
self._record = {}
|
||
|
if 'date' in attrs:
|
||
|
self._record['date'] = attrs['date']
|
||
|
elif self._level == self._tag_level + 1:
|
||
|
if name == 'xref':
|
||
|
if 'type' in attrs and 'data' in attrs:
|
||
|
l = self._record.setdefault(attrs['type'], [])
|
||
|
l.append(attrs['data'])
|
||
|
else:
|
||
|
self._tag_payload = []
|
||
|
self._tag_feeding = True
|
||
|
else:
|
||
|
self._tag_feeding = False
|
||
|
|
||
|
def endElement(self, name):
|
||
|
if self._is_active is True:
|
||
|
if name == 'record' and self._tag_level == self._level:
|
||
|
self._is_active = False
|
||
|
self._tag_level = None
|
||
|
if _callable(self._callback):
|
||
|
self._callback(self._record)
|
||
|
self._record = None
|
||
|
elif self._level == self._tag_level + 1:
|
||
|
if name != 'xref':
|
||
|
self._record[name] = ''.join(self._tag_payload)
|
||
|
self._tag_payload = None
|
||
|
self._tag_feeding = False
|
||
|
|
||
|
self._level -= 1
|
||
|
|
||
|
def characters(self, content):
|
||
|
if self._tag_feeding is True:
|
||
|
self._tag_payload.append(content)
|
||
|
|
||
|
|
||
|
class XMLRecordParser(Publisher):
|
||
|
"""
|
||
|
A configurable Parser that understands how to parse XML based records.
|
||
|
"""
|
||
|
|
||
|
def __init__(self, fh, **kwargs):
|
||
|
"""
|
||
|
Constructor.
|
||
|
|
||
|
fh - a valid, open file handle to XML based record data.
|
||
|
"""
|
||
|
super(XMLRecordParser, self).__init__()
|
||
|
|
||
|
self.xmlparser = make_parser()
|
||
|
self.xmlparser.setContentHandler(SaxRecordParser(self.consume_record))
|
||
|
|
||
|
self.fh = fh
|
||
|
|
||
|
self.__dict__.update(kwargs)
|
||
|
|
||
|
def process_record(self, rec):
|
||
|
"""
|
||
|
This is the callback method invoked for every record. It is usually
|
||
|
over-ridden by base classes to provide specific record-based logic.
|
||
|
|
||
|
Any record can be vetoed (not passed to registered Subscriber objects)
|
||
|
by simply returning None.
|
||
|
"""
|
||
|
return rec
|
||
|
|
||
|
def consume_record(self, rec):
|
||
|
record = self.process_record(rec)
|
||
|
if record is not None:
|
||
|
self.notify(record)
|
||
|
|
||
|
def parse(self):
|
||
|
"""
|
||
|
Parse and normalises records, notifying registered subscribers with
|
||
|
record data as it is encountered.
|
||
|
"""
|
||
|
self.xmlparser.parse(self.fh)
|
||
|
|
||
|
|
||
|
class IPv4Parser(XMLRecordParser):
|
||
|
"""
|
||
|
A XMLRecordParser that understands how to parse and retrieve data records
|
||
|
from the IANA IPv4 address space file.
|
||
|
|
||
|
It can be found online here :-
|
||
|
|
||
|
- http://www.iana.org/assignments/ipv4-address-space/ipv4-address-space.xml
|
||
|
"""
|
||
|
|
||
|
def __init__(self, fh, **kwargs):
|
||
|
"""
|
||
|
Constructor.
|
||
|
|
||
|
fh - a valid, open file handle to an IANA IPv4 address space file.
|
||
|
|
||
|
kwargs - additional parser options.
|
||
|
"""
|
||
|
super(IPv4Parser, self).__init__(fh)
|
||
|
|
||
|
def process_record(self, rec):
|
||
|
"""
|
||
|
Callback method invoked for every record.
|
||
|
|
||
|
See base class method for more details.
|
||
|
"""
|
||
|
|
||
|
record = {}
|
||
|
for key in ('prefix', 'designation', 'date', 'whois', 'status'):
|
||
|
record[key] = str(rec.get(key, '')).strip()
|
||
|
|
||
|
# Strip leading zeros from octet.
|
||
|
if '/' in record['prefix']:
|
||
|
(octet, prefix) = record['prefix'].split('/')
|
||
|
record['prefix'] = '%d/%d' % (int(octet), int(prefix))
|
||
|
|
||
|
record['status'] = record['status'].capitalize()
|
||
|
|
||
|
return record
|
||
|
|
||
|
|
||
|
class IPv6Parser(XMLRecordParser):
|
||
|
"""
|
||
|
A XMLRecordParser that understands how to parse and retrieve data records
|
||
|
from the IANA IPv6 address space file.
|
||
|
|
||
|
It can be found online here :-
|
||
|
|
||
|
- http://www.iana.org/assignments/ipv6-address-space/ipv6-address-space.xml
|
||
|
"""
|
||
|
|
||
|
def __init__(self, fh, **kwargs):
|
||
|
"""
|
||
|
Constructor.
|
||
|
|
||
|
fh - a valid, open file handle to an IANA IPv6 address space file.
|
||
|
|
||
|
kwargs - additional parser options.
|
||
|
"""
|
||
|
super(IPv6Parser, self).__init__(fh)
|
||
|
|
||
|
def process_record(self, rec):
|
||
|
"""
|
||
|
Callback method invoked for every record.
|
||
|
|
||
|
See base class method for more details.
|
||
|
"""
|
||
|
|
||
|
record = {
|
||
|
'prefix': str(rec.get('prefix', '')).strip(),
|
||
|
'allocation': str(rec.get('description', '')).strip(),
|
||
|
# HACK: -1 instead of 0 is a hacky hack to get 4291 instead of 3513 from
|
||
|
#
|
||
|
# <xref type="rfc" data="rfc3513"/> was later obsoleted by <xref type="rfc" data="rfc4291"/>
|
||
|
#
|
||
|
# I imagine there's no way to solve this in a general way, maybe we should start returning a list
|
||
|
# of RFC-s here?
|
||
|
'reference': str(rec.get('rfc', [''])[-1]).strip(),
|
||
|
}
|
||
|
|
||
|
return record
|
||
|
|
||
|
|
||
|
class IPv6UnicastParser(XMLRecordParser):
|
||
|
"""
|
||
|
A XMLRecordParser that understands how to parse and retrieve data records
|
||
|
from the IANA IPv6 unicast address assignments file.
|
||
|
|
||
|
It can be found online here :-
|
||
|
|
||
|
- http://www.iana.org/assignments/ipv6-unicast-address-assignments/ipv6-unicast-address-assignments.xml
|
||
|
"""
|
||
|
def __init__(self, fh, **kwargs):
|
||
|
"""
|
||
|
Constructor.
|
||
|
|
||
|
fh - a valid, open file handle to an IANA IPv6 address space file.
|
||
|
|
||
|
kwargs - additional parser options.
|
||
|
"""
|
||
|
super(IPv6UnicastParser, self).__init__(fh)
|
||
|
|
||
|
def process_record(self, rec):
|
||
|
"""
|
||
|
Callback method invoked for every record.
|
||
|
|
||
|
See base class method for more details.
|
||
|
"""
|
||
|
record = {
|
||
|
'status': str(rec.get('status', '')).strip(),
|
||
|
'description': str(rec.get('description', '')).strip(),
|
||
|
'prefix': str(rec.get('prefix', '')).strip(),
|
||
|
'date': str(rec.get('date', '')).strip(),
|
||
|
'whois': str(rec.get('whois', '')).strip(),
|
||
|
}
|
||
|
|
||
|
return record
|
||
|
|
||
|
|
||
|
class MulticastParser(XMLRecordParser):
|
||
|
"""
|
||
|
A XMLRecordParser that knows how to process the IANA IPv4 multicast address
|
||
|
allocation file.
|
||
|
|
||
|
It can be found online here :-
|
||
|
|
||
|
- http://www.iana.org/assignments/multicast-addresses/multicast-addresses.xml
|
||
|
"""
|
||
|
|
||
|
def __init__(self, fh, **kwargs):
|
||
|
"""
|
||
|
Constructor.
|
||
|
|
||
|
fh - a valid, open file handle to an IANA IPv4 multicast address
|
||
|
allocation file.
|
||
|
|
||
|
kwargs - additional parser options.
|
||
|
"""
|
||
|
super(MulticastParser, self).__init__(fh)
|
||
|
|
||
|
def normalise_addr(self, addr):
|
||
|
"""
|
||
|
Removes variations from address entries found in this particular file.
|
||
|
"""
|
||
|
if '-' in addr:
|
||
|
(a1, a2) = addr.split('-')
|
||
|
o1 = a1.strip().split('.')
|
||
|
o2 = a2.strip().split('.')
|
||
|
return '%s-%s' % ('.'.join([str(int(i)) for i in o1]),
|
||
|
'.'.join([str(int(i)) for i in o2]))
|
||
|
else:
|
||
|
o1 = addr.strip().split('.')
|
||
|
return '.'.join([str(int(i)) for i in o1])
|
||
|
|
||
|
def process_record(self, rec):
|
||
|
"""
|
||
|
Callback method invoked for every record.
|
||
|
|
||
|
See base class method for more details.
|
||
|
"""
|
||
|
|
||
|
if 'addr' in rec:
|
||
|
record = {
|
||
|
'address': self.normalise_addr(str(rec['addr'])),
|
||
|
'descr': str(rec.get('description', '')),
|
||
|
}
|
||
|
return record
|
||
|
|
||
|
|
||
|
class DictUpdater(Subscriber):
|
||
|
"""
|
||
|
Concrete Subscriber that inserts records received from a Publisher into a
|
||
|
dictionary.
|
||
|
"""
|
||
|
|
||
|
def __init__(self, dct, topic, unique_key):
|
||
|
"""
|
||
|
Constructor.
|
||
|
|
||
|
dct - lookup dict or dict like object to insert records into.
|
||
|
|
||
|
topic - high-level category name of data to be processed.
|
||
|
|
||
|
unique_key - key name in data dict that uniquely identifies it.
|
||
|
"""
|
||
|
self.dct = dct
|
||
|
self.topic = topic
|
||
|
self.unique_key = unique_key
|
||
|
|
||
|
def update(self, data):
|
||
|
"""
|
||
|
Callback function used by Publisher to notify this Subscriber about
|
||
|
an update. Stores topic based information into dictionary passed to
|
||
|
constructor.
|
||
|
"""
|
||
|
data_id = data[self.unique_key]
|
||
|
|
||
|
if self.topic == 'IPv4':
|
||
|
cidr = IPNetwork(cidr_abbrev_to_verbose(data_id))
|
||
|
self.dct[cidr] = data
|
||
|
elif self.topic == 'IPv6':
|
||
|
cidr = IPNetwork(cidr_abbrev_to_verbose(data_id))
|
||
|
self.dct[cidr] = data
|
||
|
elif self.topic == 'IPv6_unicast':
|
||
|
cidr = IPNetwork(data_id)
|
||
|
self.dct[cidr] = data
|
||
|
elif self.topic == 'multicast':
|
||
|
iprange = None
|
||
|
if '-' in data_id:
|
||
|
# See if we can manage a single CIDR.
|
||
|
(first, last) = data_id.split('-')
|
||
|
iprange = IPRange(first, last)
|
||
|
cidrs = iprange.cidrs()
|
||
|
if len(cidrs) == 1:
|
||
|
iprange = cidrs[0]
|
||
|
else:
|
||
|
iprange = IPAddress(data_id)
|
||
|
self.dct[iprange] = data
|
||
|
|
||
|
|
||
|
def load_info():
|
||
|
"""
|
||
|
Parse and load internal IANA data lookups with the latest information from
|
||
|
data files.
|
||
|
"""
|
||
|
ipv4 = IPv4Parser(_importlib_resources.open_binary(__package__, 'ipv4-address-space.xml'))
|
||
|
ipv4.attach(DictUpdater(IANA_INFO['IPv4'], 'IPv4', 'prefix'))
|
||
|
ipv4.parse()
|
||
|
|
||
|
ipv6 = IPv6Parser(_importlib_resources.open_binary(__package__, 'ipv6-address-space.xml'))
|
||
|
ipv6.attach(DictUpdater(IANA_INFO['IPv6'], 'IPv6', 'prefix'))
|
||
|
ipv6.parse()
|
||
|
|
||
|
ipv6ua = IPv6UnicastParser(
|
||
|
_importlib_resources.open_binary(__package__, 'ipv6-unicast-address-assignments.xml'),
|
||
|
)
|
||
|
ipv6ua.attach(DictUpdater(IANA_INFO['IPv6_unicast'], 'IPv6_unicast', 'prefix'))
|
||
|
ipv6ua.parse()
|
||
|
|
||
|
mcast = MulticastParser(_importlib_resources.open_binary(__package__, 'multicast-addresses.xml'))
|
||
|
mcast.attach(DictUpdater(IANA_INFO['multicast'], 'multicast', 'address'))
|
||
|
mcast.parse()
|
||
|
|
||
|
|
||
|
def pprint_info(fh=None):
|
||
|
"""
|
||
|
Pretty prints IANA information to filehandle.
|
||
|
"""
|
||
|
if fh is None:
|
||
|
fh = _sys.stdout
|
||
|
|
||
|
for category in sorted(IANA_INFO):
|
||
|
fh.write('-' * len(category) + "\n")
|
||
|
fh.write(category + "\n")
|
||
|
fh.write('-' * len(category) + "\n")
|
||
|
ipranges = IANA_INFO[category]
|
||
|
for iprange in sorted(ipranges):
|
||
|
details = ipranges[iprange]
|
||
|
fh.write('%-45r' % (iprange) + details + "\n")
|
||
|
|
||
|
|
||
|
def _within_bounds(ip, ip_range):
|
||
|
# Boundary checking for multiple IP classes.
|
||
|
if hasattr(ip_range, 'first'):
|
||
|
# IP network or IP range.
|
||
|
return ip in ip_range
|
||
|
elif hasattr(ip_range, 'value'):
|
||
|
# IP address.
|
||
|
return ip == ip_range
|
||
|
|
||
|
raise Exception('Unsupported IP range or address: %r!' % (ip_range,))
|
||
|
|
||
|
|
||
|
def query(ip_addr):
|
||
|
"""Returns informational data specific to this IP address."""
|
||
|
info = {}
|
||
|
|
||
|
if ip_addr.version == 4:
|
||
|
for cidr, record in _dict_items(IANA_INFO['IPv4']):
|
||
|
if _within_bounds(ip_addr, cidr):
|
||
|
info.setdefault('IPv4', [])
|
||
|
info['IPv4'].append(record)
|
||
|
|
||
|
if ip_addr.is_multicast():
|
||
|
for iprange, record in _dict_items(IANA_INFO['multicast']):
|
||
|
if _within_bounds(ip_addr, iprange):
|
||
|
info.setdefault('Multicast', [])
|
||
|
info['Multicast'].append(record)
|
||
|
|
||
|
elif ip_addr.version == 6:
|
||
|
for cidr, record in _dict_items(IANA_INFO['IPv6']):
|
||
|
if _within_bounds(ip_addr, cidr):
|
||
|
info.setdefault('IPv6', [])
|
||
|
info['IPv6'].append(record)
|
||
|
|
||
|
for cidr, record in _dict_items(IANA_INFO['IPv6_unicast']):
|
||
|
if _within_bounds(ip_addr, cidr):
|
||
|
info.setdefault('IPv6_unicast', [])
|
||
|
info['IPv6_unicast'].append(record)
|
||
|
|
||
|
return info
|
||
|
|
||
|
# On module import, read IANA data files and populate lookups dict.
|
||
|
load_info()
|