diff options
Diffstat (limited to 'rpki/rcynic.py')
-rw-r--r-- | rpki/rcynic.py | 447 |
1 files changed, 227 insertions, 220 deletions
diff --git a/rpki/rcynic.py b/rpki/rcynic.py index 10ad7516..c6ad60d5 100644 --- a/rpki/rcynic.py +++ b/rpki/rcynic.py @@ -25,251 +25,258 @@ import rpki.resource_set from xml.etree.ElementTree import ElementTree class UnknownObject(rpki.exceptions.RPKI_Exception): - """ - Unrecognized object in rcynic result cache. - """ + """ + Unrecognized object in rcynic result cache. + """ class NotRsyncURI(rpki.exceptions.RPKI_Exception): - """ - URI is not an rsync URI. - """ + """ + URI is not an rsync URI. + """ class rcynic_object(object): - """ - An object read from rcynic cache. - """ + """ + An object read from rcynic cache. + """ - def __init__(self, filename, **kwargs): - self.filename = filename - for k, v in kwargs.iteritems(): - setattr(self, k, v) - self.obj = self.obj_class(DER_file = filename) + def __init__(self, filename, **kwargs): + self.filename = filename + for k, v in kwargs.iteritems(): + setattr(self, k, v) + self.obj = self.obj_class(DER_file = filename) # pylint: disable=E1101 - def __repr__(self): - return "<%s %s %s at 0x%x>" % (self.__class__.__name__, self.uri, self.resources, id(self)) + def __repr__(self): + # pylint: disable=E1101 + return "<%s %s %s at 0x%x>" % (self.__class__.__name__, self.uri, self.resources, id(self)) - def show_attrs(self, *attrs): - """ - Print a bunch of object attributes, quietly ignoring any that - might be missing. - """ - for a in attrs: - try: - print "%s: %s" % (a.capitalize(), getattr(self, a)) - except AttributeError: - pass + def show_attrs(self, *attrs): + """ + Print a bunch of object attributes, quietly ignoring any that + might be missing. + """ - def show(self): - """ - Print common object attributes. - """ - self.show_attrs("filename", "uri", "status", "timestamp") + for a in attrs: + try: + print "%s: %s" % (a.capitalize(), getattr(self, a)) + except AttributeError: + pass + + def show(self): + """ + Print common object attributes. + """ + + self.show_attrs("filename", "uri", "status", "timestamp") class rcynic_certificate(rcynic_object): - """ - A certificate from rcynic cache. - """ - - obj_class = rpki.x509.X509 - - def __init__(self, filename, **kwargs): - rcynic_object.__init__(self, filename, **kwargs) - self.notBefore = self.obj.getNotBefore() - self.notAfter = self.obj.getNotAfter() - self.aia_uri = self.obj.get_aia_uri() - self.sia_directory_uri = self.obj.get_sia_directory_uri() - self.manifest_uri = self.obj.get_sia_manifest_uri() - self.resources = self.obj.get_3779resources() - self.is_ca = self.obj.is_CA() - self.serial = self.obj.getSerial() - self.issuer = self.obj.getIssuer() - self.subject = self.obj.getSubject() - self.ski = self.obj.hSKI() - self.aki = self.obj.hAKI() - - def show(self): """ - Print certificate attributes. + A certificate from rcynic cache. """ - rcynic_object.show(self) - self.show_attrs("notBefore", "notAfter", "aia_uri", "sia_directory_uri", "resources") + + obj_class = rpki.x509.X509 + + def __init__(self, filename, **kwargs): + rcynic_object.__init__(self, filename, **kwargs) + self.notBefore = self.obj.getNotBefore() + self.notAfter = self.obj.getNotAfter() + self.aia_uri = self.obj.get_aia_uri() + self.sia_directory_uri = self.obj.get_sia_directory_uri() + self.manifest_uri = self.obj.get_sia_manifest_uri() + self.resources = self.obj.get_3779resources() + self.is_ca = self.obj.is_CA() + self.serial = self.obj.getSerial() + self.issuer = self.obj.getIssuer() + self.subject = self.obj.getSubject() + self.ski = self.obj.hSKI() + self.aki = self.obj.hAKI() + + def show(self): + """ + Print certificate attributes. + """ + + rcynic_object.show(self) + self.show_attrs("notBefore", "notAfter", "aia_uri", "sia_directory_uri", "resources") class rcynic_roa(rcynic_object): - """ - A ROA from rcynic cache. - """ - - obj_class = rpki.x509.ROA - - def __init__(self, filename, **kwargs): - rcynic_object.__init__(self, filename, **kwargs) - self.obj.extract() - self.asID = self.obj.get_POW().getASID() - self.prefix_sets = [] - v4, v6 = self.obj.get_POW().getPrefixes() - if v4: - self.prefix_sets.append(rpki.resource_set.roa_prefix_set_ipv4([ - rpki.resource_set.roa_prefix_ipv4(p[0], p[1], p[2]) for p in v4])) - if v6: - self.prefix_sets.append(rpki.resource_set.roa_prefix_set_ipv6([ - rpki.resource_set.roa_prefix_ipv6(p[0], p[1], p[2]) for p in v6])) - self.ee = rpki.x509.X509(POW = self.obj.get_POW().certs()[0]) - self.notBefore = self.ee.getNotBefore() - self.notAfter = self.ee.getNotAfter() - self.aia_uri = self.ee.get_aia_uri() - self.resources = self.ee.get_3779resources() - self.issuer = self.ee.getIssuer() - self.serial = self.ee.getSerial() - self.subject = self.ee.getSubject() - self.aki = self.ee.hAKI() - self.ski = self.ee.hSKI() - - def show(self): """ - Print ROA attributes. + A ROA from rcynic cache. """ - rcynic_object.show(self) - self.show_attrs("notBefore", "notAfter", "aia_uri", "resources", "asID") - if self.prefix_sets: - print "Prefixes:", ",".join(str(i) for i in self.prefix_sets) + + obj_class = rpki.x509.ROA + + def __init__(self, filename, **kwargs): + rcynic_object.__init__(self, filename, **kwargs) + self.obj.extract() + self.asID = self.obj.get_POW().getASID() + self.prefix_sets = [] + v4, v6 = self.obj.get_POW().getPrefixes() + if v4: + self.prefix_sets.append(rpki.resource_set.roa_prefix_set_ipv4([ + rpki.resource_set.roa_prefix_ipv4(p[0], p[1], p[2]) for p in v4])) + if v6: + self.prefix_sets.append(rpki.resource_set.roa_prefix_set_ipv6([ + rpki.resource_set.roa_prefix_ipv6(p[0], p[1], p[2]) for p in v6])) + self.ee = rpki.x509.X509(POW = self.obj.get_POW().certs()[0]) + self.notBefore = self.ee.getNotBefore() + self.notAfter = self.ee.getNotAfter() + self.aia_uri = self.ee.get_aia_uri() + self.resources = self.ee.get_3779resources() + self.issuer = self.ee.getIssuer() + self.serial = self.ee.getSerial() + self.subject = self.ee.getSubject() + self.aki = self.ee.hAKI() + self.ski = self.ee.hSKI() + + def show(self): + """ + Print ROA attributes. + """ + + rcynic_object.show(self) + self.show_attrs("notBefore", "notAfter", "aia_uri", "resources", "asID") + if self.prefix_sets: + print "Prefixes:", ",".join(str(i) for i in self.prefix_sets) class rcynic_ghostbuster(rcynic_object): - """ - Ghostbuster record from the rcynic cache. - """ - - obj_class = rpki.x509.Ghostbuster - - def __init__(self, *args, **kwargs): - rcynic_object.__init__(self, *args, **kwargs) - self.obj.extract() - self.vcard = self.obj.get_content() - self.ee = rpki.x509.X509(POW = self.obj.get_POW().certs()[0]) - self.notBefore = self.ee.getNotBefore() - self.notAfter = self.ee.getNotAfter() - self.aia_uri = self.ee.get_aia_uri() - self.issuer = self.ee.getIssuer() - self.serial = self.ee.getSerial() - self.subject = self.ee.getSubject() - self.aki = self.ee.hAKI() - self.ski = self.ee.hSKI() - - def show(self): - rcynic_object.show(self) - self.show_attrs("notBefore", "notAfter", "vcard") + """ + Ghostbuster record from the rcynic cache. + """ + + obj_class = rpki.x509.Ghostbuster + + def __init__(self, *args, **kwargs): + rcynic_object.__init__(self, *args, **kwargs) + self.obj.extract() + self.vcard = self.obj.get_content() + self.ee = rpki.x509.X509(POW = self.obj.get_POW().certs()[0]) + self.notBefore = self.ee.getNotBefore() + self.notAfter = self.ee.getNotAfter() + self.aia_uri = self.ee.get_aia_uri() + self.issuer = self.ee.getIssuer() + self.serial = self.ee.getSerial() + self.subject = self.ee.getSubject() + self.aki = self.ee.hAKI() + self.ski = self.ee.hSKI() + + def show(self): + rcynic_object.show(self) + self.show_attrs("notBefore", "notAfter", "vcard") file_name_classes = { - ".cer" : rcynic_certificate, - ".gbr" : rcynic_ghostbuster, - ".roa" : rcynic_roa } + ".cer" : rcynic_certificate, + ".gbr" : rcynic_ghostbuster, + ".roa" : rcynic_roa } class rcynic_file_iterator(object): - """ - Iterate over files in an rcynic output tree, yielding a Python - representation of each object found. - """ - - def __init__(self, rcynic_root, - authenticated_subdir = "authenticated"): - self.rcynic_dir = os.path.join(rcynic_root, authenticated_subdir) - - def __iter__(self): - for root, dirs, files in os.walk(self.rcynic_dir): # pylint: disable=W0612 - for filename in files: - filename = os.path.join(root, filename) - ext = os.path.splitext(filename)[1] - if ext in file_name_classes: - yield file_name_classes[ext](filename) + """ + Iterate over files in an rcynic output tree, yielding a Python + representation of each object found. + """ + + def __init__(self, rcynic_root, + authenticated_subdir = "authenticated"): + self.rcynic_dir = os.path.join(rcynic_root, authenticated_subdir) + + def __iter__(self): + # pylint: disable=W0612 + for root, dirs, files in os.walk(self.rcynic_dir): + for filename in files: + filename = os.path.join(root, filename) + ext = os.path.splitext(filename)[1] + if ext in file_name_classes: + yield file_name_classes[ext](filename) class validation_status_element(object): - def __init__(self, *args, **kwargs): - self.attrs = [] - for k, v in kwargs.iteritems(): - setattr(self, k, v) - # attribute names are saved so that the __repr__ method can - # display the subset of attributes the user specified - self.attrs.append(k) - self._obj = None - - def get_obj(self): - if not self._obj: - self._obj = self.file_class(filename=self.filename, uri=self.uri) - return self._obj - - def __repr__(self): - v = [self.__class__.__name__, 'id=%s' % str(id(self))] - v.extend(['%s=%s' % (x, getattr(self, x)) for x in self.attrs]) - return '<%s>' % (' '.join(v),) - - obj = property(get_obj) + def __init__(self, *args, **kwargs): + self.attrs = [] + for k, v in kwargs.iteritems(): + setattr(self, k, v) + # attribute names are saved so that the __repr__ method can + # display the subset of attributes the user specified + self.attrs.append(k) + self._obj = None + + def get_obj(self): + # pylint: disable=E1101 + if not self._obj: + self._obj = self.file_class(filename=self.filename, uri=self.uri) + return self._obj + + def __repr__(self): + v = [self.__class__.__name__, 'id=%s' % str(id(self))] + v.extend(['%s=%s' % (x, getattr(self, x)) for x in self.attrs]) + return '<%s>' % (' '.join(v),) + + obj = property(get_obj) class rcynic_xml_iterator(object): - """ - Iterate over validation_status entries in the XML output from an - rcynic run. Yields a tuple for each entry: - - timestamp, generation, status, object - - where URI, status, and timestamp are the corresponding values from - the XML element, OK is a boolean indicating whether validation was - considered succesful, and object is a Python representation of the - object in question. If OK is True, object will be from rcynic's - authenticated output tree; otherwise, object will be from rcynic's - unauthenticated output tree. - - Note that it is possible for the same URI to appear in more than one - validation_status element; in such cases, the succesful case (OK - True) should be the last entry (as rcynic will stop trying once it - gets a good copy), but there may be multiple failures, which might - or might not have different status codes. - """ - - def __init__(self, rcynic_root, xml_file, - authenticated_old_subdir = "authenticated.old", - unauthenticated_subdir = "unauthenticated"): - self.rcynic_root = rcynic_root - self.xml_file = xml_file - self.authenticated_subdir = os.path.join(rcynic_root, 'authenticated') - self.authenticated_old_subdir = os.path.join(rcynic_root, authenticated_old_subdir) - self.unauthenticated_subdir = os.path.join(rcynic_root, unauthenticated_subdir) - - base_uri = "rsync://" - - def uri_to_filename(self, uri): - if uri.startswith(self.base_uri): - return uri[len(self.base_uri):] - else: - raise NotRsyncURI("Not an rsync URI %r" % uri) - - def __iter__(self): - for validation_status in ElementTree(file=self.xml_file).getroot().getiterator("validation_status"): - timestamp = validation_status.get("timestamp") - status = validation_status.get("status") - uri = validation_status.text.strip() - generation = validation_status.get("generation") - - # determine the path to this object - if status == 'object_accepted': - d = self.authenticated_subdir - elif generation == 'backup': - d = self.authenticated_old_subdir - else: - d = self.unauthenticated_subdir - - filename = os.path.join(d, self.uri_to_filename(uri)) - - ext = os.path.splitext(filename)[1] - if ext in file_name_classes: - yield validation_status_element(timestamp = timestamp, generation = generation, - uri=uri, status = status, filename = filename, - file_class = file_name_classes[ext]) + """ + Iterate over validation_status entries in the XML output from an + rcynic run. Yields a tuple for each entry: + + timestamp, generation, status, object + + where URI, status, and timestamp are the corresponding values from + the XML element, OK is a boolean indicating whether validation was + considered succesful, and object is a Python representation of the + object in question. If OK is True, object will be from rcynic's + authenticated output tree; otherwise, object will be from rcynic's + unauthenticated output tree. + + Note that it is possible for the same URI to appear in more than one + validation_status element; in such cases, the succesful case (OK + True) should be the last entry (as rcynic will stop trying once it + gets a good copy), but there may be multiple failures, which might + or might not have different status codes. + """ + + def __init__(self, rcynic_root, xml_file, + authenticated_old_subdir = "authenticated.old", + unauthenticated_subdir = "unauthenticated"): + self.rcynic_root = rcynic_root + self.xml_file = xml_file + self.authenticated_subdir = os.path.join(rcynic_root, 'authenticated') + self.authenticated_old_subdir = os.path.join(rcynic_root, authenticated_old_subdir) + self.unauthenticated_subdir = os.path.join(rcynic_root, unauthenticated_subdir) + + base_uri = "rsync://" + + def uri_to_filename(self, uri): + if uri.startswith(self.base_uri): + return uri[len(self.base_uri):] + else: + raise NotRsyncURI("Not an rsync URI %r" % uri) + + def __iter__(self): + for validation_status in ElementTree(file=self.xml_file).getroot().getiterator("validation_status"): + timestamp = validation_status.get("timestamp") + status = validation_status.get("status") + uri = validation_status.text.strip() + generation = validation_status.get("generation") + + # determine the path to this object + if status == 'object_accepted': + d = self.authenticated_subdir + elif generation == 'backup': + d = self.authenticated_old_subdir + else: + d = self.unauthenticated_subdir + + filename = os.path.join(d, self.uri_to_filename(uri)) + + ext = os.path.splitext(filename)[1] + if ext in file_name_classes: + yield validation_status_element(timestamp = timestamp, generation = generation, + uri=uri, status = status, filename = filename, + file_class = file_name_classes[ext]) def label_iterator(xml_file): - """ - Returns an iterator which contains all defined labels from an rcynic XML - output file. Each item is a tuple of the form - (label, kind, description). - """ - - for label in ElementTree(file=xml_file).find("labels"): - yield label.tag, label.get("kind"), label.text.strip() + """ + Returns an iterator which contains all defined labels from an rcynic XML + output file. Each item is a tuple of the form + (label, kind, description). + """ + + for label in ElementTree(file=xml_file).find("labels"): + yield label.tag, label.get("kind"), label.text.strip() |