aboutsummaryrefslogtreecommitdiff
path: root/rpki/rcynic.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpki/rcynic.py')
-rw-r--r--rpki/rcynic.py447
1 files changed, 227 insertions, 220 deletions
diff --git a/rpki/rcynic.py b/rpki/rcynic.py
index 10ad7516..c6ad60d5 100644
--- a/rpki/rcynic.py
+++ b/rpki/rcynic.py
@@ -25,251 +25,258 @@ import rpki.resource_set
from xml.etree.ElementTree import ElementTree
class UnknownObject(rpki.exceptions.RPKI_Exception):
- """
- Unrecognized object in rcynic result cache.
- """
+ """
+ Unrecognized object in rcynic result cache.
+ """
class NotRsyncURI(rpki.exceptions.RPKI_Exception):
- """
- URI is not an rsync URI.
- """
+ """
+ URI is not an rsync URI.
+ """
class rcynic_object(object):
- """
- An object read from rcynic cache.
- """
+ """
+ An object read from rcynic cache.
+ """
- def __init__(self, filename, **kwargs):
- self.filename = filename
- for k, v in kwargs.iteritems():
- setattr(self, k, v)
- self.obj = self.obj_class(DER_file = filename)
+ def __init__(self, filename, **kwargs):
+ self.filename = filename
+ for k, v in kwargs.iteritems():
+ setattr(self, k, v)
+ self.obj = self.obj_class(DER_file = filename) # pylint: disable=E1101
- def __repr__(self):
- return "<%s %s %s at 0x%x>" % (self.__class__.__name__, self.uri, self.resources, id(self))
+ def __repr__(self):
+ # pylint: disable=E1101
+ return "<%s %s %s at 0x%x>" % (self.__class__.__name__, self.uri, self.resources, id(self))
- def show_attrs(self, *attrs):
- """
- Print a bunch of object attributes, quietly ignoring any that
- might be missing.
- """
- for a in attrs:
- try:
- print "%s: %s" % (a.capitalize(), getattr(self, a))
- except AttributeError:
- pass
+ def show_attrs(self, *attrs):
+ """
+ Print a bunch of object attributes, quietly ignoring any that
+ might be missing.
+ """
- def show(self):
- """
- Print common object attributes.
- """
- self.show_attrs("filename", "uri", "status", "timestamp")
+ for a in attrs:
+ try:
+ print "%s: %s" % (a.capitalize(), getattr(self, a))
+ except AttributeError:
+ pass
+
+ def show(self):
+ """
+ Print common object attributes.
+ """
+
+ self.show_attrs("filename", "uri", "status", "timestamp")
class rcynic_certificate(rcynic_object):
- """
- A certificate from rcynic cache.
- """
-
- obj_class = rpki.x509.X509
-
- def __init__(self, filename, **kwargs):
- rcynic_object.__init__(self, filename, **kwargs)
- self.notBefore = self.obj.getNotBefore()
- self.notAfter = self.obj.getNotAfter()
- self.aia_uri = self.obj.get_aia_uri()
- self.sia_directory_uri = self.obj.get_sia_directory_uri()
- self.manifest_uri = self.obj.get_sia_manifest_uri()
- self.resources = self.obj.get_3779resources()
- self.is_ca = self.obj.is_CA()
- self.serial = self.obj.getSerial()
- self.issuer = self.obj.getIssuer()
- self.subject = self.obj.getSubject()
- self.ski = self.obj.hSKI()
- self.aki = self.obj.hAKI()
-
- def show(self):
"""
- Print certificate attributes.
+ A certificate from rcynic cache.
"""
- rcynic_object.show(self)
- self.show_attrs("notBefore", "notAfter", "aia_uri", "sia_directory_uri", "resources")
+
+ obj_class = rpki.x509.X509
+
+ def __init__(self, filename, **kwargs):
+ rcynic_object.__init__(self, filename, **kwargs)
+ self.notBefore = self.obj.getNotBefore()
+ self.notAfter = self.obj.getNotAfter()
+ self.aia_uri = self.obj.get_aia_uri()
+ self.sia_directory_uri = self.obj.get_sia_directory_uri()
+ self.manifest_uri = self.obj.get_sia_manifest_uri()
+ self.resources = self.obj.get_3779resources()
+ self.is_ca = self.obj.is_CA()
+ self.serial = self.obj.getSerial()
+ self.issuer = self.obj.getIssuer()
+ self.subject = self.obj.getSubject()
+ self.ski = self.obj.hSKI()
+ self.aki = self.obj.hAKI()
+
+ def show(self):
+ """
+ Print certificate attributes.
+ """
+
+ rcynic_object.show(self)
+ self.show_attrs("notBefore", "notAfter", "aia_uri", "sia_directory_uri", "resources")
class rcynic_roa(rcynic_object):
- """
- A ROA from rcynic cache.
- """
-
- obj_class = rpki.x509.ROA
-
- def __init__(self, filename, **kwargs):
- rcynic_object.__init__(self, filename, **kwargs)
- self.obj.extract()
- self.asID = self.obj.get_POW().getASID()
- self.prefix_sets = []
- v4, v6 = self.obj.get_POW().getPrefixes()
- if v4:
- self.prefix_sets.append(rpki.resource_set.roa_prefix_set_ipv4([
- rpki.resource_set.roa_prefix_ipv4(p[0], p[1], p[2]) for p in v4]))
- if v6:
- self.prefix_sets.append(rpki.resource_set.roa_prefix_set_ipv6([
- rpki.resource_set.roa_prefix_ipv6(p[0], p[1], p[2]) for p in v6]))
- self.ee = rpki.x509.X509(POW = self.obj.get_POW().certs()[0])
- self.notBefore = self.ee.getNotBefore()
- self.notAfter = self.ee.getNotAfter()
- self.aia_uri = self.ee.get_aia_uri()
- self.resources = self.ee.get_3779resources()
- self.issuer = self.ee.getIssuer()
- self.serial = self.ee.getSerial()
- self.subject = self.ee.getSubject()
- self.aki = self.ee.hAKI()
- self.ski = self.ee.hSKI()
-
- def show(self):
"""
- Print ROA attributes.
+ A ROA from rcynic cache.
"""
- rcynic_object.show(self)
- self.show_attrs("notBefore", "notAfter", "aia_uri", "resources", "asID")
- if self.prefix_sets:
- print "Prefixes:", ",".join(str(i) for i in self.prefix_sets)
+
+ obj_class = rpki.x509.ROA
+
+ def __init__(self, filename, **kwargs):
+ rcynic_object.__init__(self, filename, **kwargs)
+ self.obj.extract()
+ self.asID = self.obj.get_POW().getASID()
+ self.prefix_sets = []
+ v4, v6 = self.obj.get_POW().getPrefixes()
+ if v4:
+ self.prefix_sets.append(rpki.resource_set.roa_prefix_set_ipv4([
+ rpki.resource_set.roa_prefix_ipv4(p[0], p[1], p[2]) for p in v4]))
+ if v6:
+ self.prefix_sets.append(rpki.resource_set.roa_prefix_set_ipv6([
+ rpki.resource_set.roa_prefix_ipv6(p[0], p[1], p[2]) for p in v6]))
+ self.ee = rpki.x509.X509(POW = self.obj.get_POW().certs()[0])
+ self.notBefore = self.ee.getNotBefore()
+ self.notAfter = self.ee.getNotAfter()
+ self.aia_uri = self.ee.get_aia_uri()
+ self.resources = self.ee.get_3779resources()
+ self.issuer = self.ee.getIssuer()
+ self.serial = self.ee.getSerial()
+ self.subject = self.ee.getSubject()
+ self.aki = self.ee.hAKI()
+ self.ski = self.ee.hSKI()
+
+ def show(self):
+ """
+ Print ROA attributes.
+ """
+
+ rcynic_object.show(self)
+ self.show_attrs("notBefore", "notAfter", "aia_uri", "resources", "asID")
+ if self.prefix_sets:
+ print "Prefixes:", ",".join(str(i) for i in self.prefix_sets)
class rcynic_ghostbuster(rcynic_object):
- """
- Ghostbuster record from the rcynic cache.
- """
-
- obj_class = rpki.x509.Ghostbuster
-
- def __init__(self, *args, **kwargs):
- rcynic_object.__init__(self, *args, **kwargs)
- self.obj.extract()
- self.vcard = self.obj.get_content()
- self.ee = rpki.x509.X509(POW = self.obj.get_POW().certs()[0])
- self.notBefore = self.ee.getNotBefore()
- self.notAfter = self.ee.getNotAfter()
- self.aia_uri = self.ee.get_aia_uri()
- self.issuer = self.ee.getIssuer()
- self.serial = self.ee.getSerial()
- self.subject = self.ee.getSubject()
- self.aki = self.ee.hAKI()
- self.ski = self.ee.hSKI()
-
- def show(self):
- rcynic_object.show(self)
- self.show_attrs("notBefore", "notAfter", "vcard")
+ """
+ Ghostbuster record from the rcynic cache.
+ """
+
+ obj_class = rpki.x509.Ghostbuster
+
+ def __init__(self, *args, **kwargs):
+ rcynic_object.__init__(self, *args, **kwargs)
+ self.obj.extract()
+ self.vcard = self.obj.get_content()
+ self.ee = rpki.x509.X509(POW = self.obj.get_POW().certs()[0])
+ self.notBefore = self.ee.getNotBefore()
+ self.notAfter = self.ee.getNotAfter()
+ self.aia_uri = self.ee.get_aia_uri()
+ self.issuer = self.ee.getIssuer()
+ self.serial = self.ee.getSerial()
+ self.subject = self.ee.getSubject()
+ self.aki = self.ee.hAKI()
+ self.ski = self.ee.hSKI()
+
+ def show(self):
+ rcynic_object.show(self)
+ self.show_attrs("notBefore", "notAfter", "vcard")
file_name_classes = {
- ".cer" : rcynic_certificate,
- ".gbr" : rcynic_ghostbuster,
- ".roa" : rcynic_roa }
+ ".cer" : rcynic_certificate,
+ ".gbr" : rcynic_ghostbuster,
+ ".roa" : rcynic_roa }
class rcynic_file_iterator(object):
- """
- Iterate over files in an rcynic output tree, yielding a Python
- representation of each object found.
- """
-
- def __init__(self, rcynic_root,
- authenticated_subdir = "authenticated"):
- self.rcynic_dir = os.path.join(rcynic_root, authenticated_subdir)
-
- def __iter__(self):
- for root, dirs, files in os.walk(self.rcynic_dir): # pylint: disable=W0612
- for filename in files:
- filename = os.path.join(root, filename)
- ext = os.path.splitext(filename)[1]
- if ext in file_name_classes:
- yield file_name_classes[ext](filename)
+ """
+ Iterate over files in an rcynic output tree, yielding a Python
+ representation of each object found.
+ """
+
+ def __init__(self, rcynic_root,
+ authenticated_subdir = "authenticated"):
+ self.rcynic_dir = os.path.join(rcynic_root, authenticated_subdir)
+
+ def __iter__(self):
+ # pylint: disable=W0612
+ for root, dirs, files in os.walk(self.rcynic_dir):
+ for filename in files:
+ filename = os.path.join(root, filename)
+ ext = os.path.splitext(filename)[1]
+ if ext in file_name_classes:
+ yield file_name_classes[ext](filename)
class validation_status_element(object):
- def __init__(self, *args, **kwargs):
- self.attrs = []
- for k, v in kwargs.iteritems():
- setattr(self, k, v)
- # attribute names are saved so that the __repr__ method can
- # display the subset of attributes the user specified
- self.attrs.append(k)
- self._obj = None
-
- def get_obj(self):
- if not self._obj:
- self._obj = self.file_class(filename=self.filename, uri=self.uri)
- return self._obj
-
- def __repr__(self):
- v = [self.__class__.__name__, 'id=%s' % str(id(self))]
- v.extend(['%s=%s' % (x, getattr(self, x)) for x in self.attrs])
- return '<%s>' % (' '.join(v),)
-
- obj = property(get_obj)
+ def __init__(self, *args, **kwargs):
+ self.attrs = []
+ for k, v in kwargs.iteritems():
+ setattr(self, k, v)
+ # attribute names are saved so that the __repr__ method can
+ # display the subset of attributes the user specified
+ self.attrs.append(k)
+ self._obj = None
+
+ def get_obj(self):
+ # pylint: disable=E1101
+ if not self._obj:
+ self._obj = self.file_class(filename=self.filename, uri=self.uri)
+ return self._obj
+
+ def __repr__(self):
+ v = [self.__class__.__name__, 'id=%s' % str(id(self))]
+ v.extend(['%s=%s' % (x, getattr(self, x)) for x in self.attrs])
+ return '<%s>' % (' '.join(v),)
+
+ obj = property(get_obj)
class rcynic_xml_iterator(object):
- """
- Iterate over validation_status entries in the XML output from an
- rcynic run. Yields a tuple for each entry:
-
- timestamp, generation, status, object
-
- where URI, status, and timestamp are the corresponding values from
- the XML element, OK is a boolean indicating whether validation was
- considered succesful, and object is a Python representation of the
- object in question. If OK is True, object will be from rcynic's
- authenticated output tree; otherwise, object will be from rcynic's
- unauthenticated output tree.
-
- Note that it is possible for the same URI to appear in more than one
- validation_status element; in such cases, the succesful case (OK
- True) should be the last entry (as rcynic will stop trying once it
- gets a good copy), but there may be multiple failures, which might
- or might not have different status codes.
- """
-
- def __init__(self, rcynic_root, xml_file,
- authenticated_old_subdir = "authenticated.old",
- unauthenticated_subdir = "unauthenticated"):
- self.rcynic_root = rcynic_root
- self.xml_file = xml_file
- self.authenticated_subdir = os.path.join(rcynic_root, 'authenticated')
- self.authenticated_old_subdir = os.path.join(rcynic_root, authenticated_old_subdir)
- self.unauthenticated_subdir = os.path.join(rcynic_root, unauthenticated_subdir)
-
- base_uri = "rsync://"
-
- def uri_to_filename(self, uri):
- if uri.startswith(self.base_uri):
- return uri[len(self.base_uri):]
- else:
- raise NotRsyncURI("Not an rsync URI %r" % uri)
-
- def __iter__(self):
- for validation_status in ElementTree(file=self.xml_file).getroot().getiterator("validation_status"):
- timestamp = validation_status.get("timestamp")
- status = validation_status.get("status")
- uri = validation_status.text.strip()
- generation = validation_status.get("generation")
-
- # determine the path to this object
- if status == 'object_accepted':
- d = self.authenticated_subdir
- elif generation == 'backup':
- d = self.authenticated_old_subdir
- else:
- d = self.unauthenticated_subdir
-
- filename = os.path.join(d, self.uri_to_filename(uri))
-
- ext = os.path.splitext(filename)[1]
- if ext in file_name_classes:
- yield validation_status_element(timestamp = timestamp, generation = generation,
- uri=uri, status = status, filename = filename,
- file_class = file_name_classes[ext])
+ """
+ Iterate over validation_status entries in the XML output from an
+ rcynic run. Yields a tuple for each entry:
+
+ timestamp, generation, status, object
+
+ where URI, status, and timestamp are the corresponding values from
+ the XML element, OK is a boolean indicating whether validation was
+ considered succesful, and object is a Python representation of the
+ object in question. If OK is True, object will be from rcynic's
+ authenticated output tree; otherwise, object will be from rcynic's
+ unauthenticated output tree.
+
+ Note that it is possible for the same URI to appear in more than one
+ validation_status element; in such cases, the succesful case (OK
+ True) should be the last entry (as rcynic will stop trying once it
+ gets a good copy), but there may be multiple failures, which might
+ or might not have different status codes.
+ """
+
+ def __init__(self, rcynic_root, xml_file,
+ authenticated_old_subdir = "authenticated.old",
+ unauthenticated_subdir = "unauthenticated"):
+ self.rcynic_root = rcynic_root
+ self.xml_file = xml_file
+ self.authenticated_subdir = os.path.join(rcynic_root, 'authenticated')
+ self.authenticated_old_subdir = os.path.join(rcynic_root, authenticated_old_subdir)
+ self.unauthenticated_subdir = os.path.join(rcynic_root, unauthenticated_subdir)
+
+ base_uri = "rsync://"
+
+ def uri_to_filename(self, uri):
+ if uri.startswith(self.base_uri):
+ return uri[len(self.base_uri):]
+ else:
+ raise NotRsyncURI("Not an rsync URI %r" % uri)
+
+ def __iter__(self):
+ for validation_status in ElementTree(file=self.xml_file).getroot().getiterator("validation_status"):
+ timestamp = validation_status.get("timestamp")
+ status = validation_status.get("status")
+ uri = validation_status.text.strip()
+ generation = validation_status.get("generation")
+
+ # determine the path to this object
+ if status == 'object_accepted':
+ d = self.authenticated_subdir
+ elif generation == 'backup':
+ d = self.authenticated_old_subdir
+ else:
+ d = self.unauthenticated_subdir
+
+ filename = os.path.join(d, self.uri_to_filename(uri))
+
+ ext = os.path.splitext(filename)[1]
+ if ext in file_name_classes:
+ yield validation_status_element(timestamp = timestamp, generation = generation,
+ uri=uri, status = status, filename = filename,
+ file_class = file_name_classes[ext])
def label_iterator(xml_file):
- """
- Returns an iterator which contains all defined labels from an rcynic XML
- output file. Each item is a tuple of the form
- (label, kind, description).
- """
-
- for label in ElementTree(file=xml_file).find("labels"):
- yield label.tag, label.get("kind"), label.text.strip()
+ """
+ Returns an iterator which contains all defined labels from an rcynic XML
+ output file. Each item is a tuple of the form
+ (label, kind, description).
+ """
+
+ for label in ElementTree(file=xml_file).find("labels"):
+ yield label.tag, label.get("kind"), label.text.strip()