diff options
author | Rob Austein <sra@hactrn.net> | 2015-10-26 06:29:00 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2015-10-26 06:29:00 +0000 |
commit | b46deb1417dc3596e9ac9fe2fe8cc0b7f42457e7 (patch) | |
tree | ca0dc0276d1adc168bc3337ce0564c4ec4957c1b /rpki/rcynic.py | |
parent | 397beaf6d9900dc3b3cb612c89ebf1d57b1d16f6 (diff) |
"Any programmer who fails to comply with the standard naming, formatting,
or commenting conventions should be shot. If it so happens that it is
inconvenient to shoot him, then he is to be politely requested to recode
his program in adherence to the above standard."
-- Michael Spier, Digital Equipment Corporation
svn path=/branches/tk705/; revision=6152
Diffstat (limited to 'rpki/rcynic.py')
-rw-r--r-- | rpki/rcynic.py | 434 |
1 files changed, 217 insertions, 217 deletions
diff --git a/rpki/rcynic.py b/rpki/rcynic.py index a36e4a4e..3307e926 100644 --- a/rpki/rcynic.py +++ b/rpki/rcynic.py @@ -25,142 +25,142 @@ import rpki.resource_set from xml.etree.ElementTree import ElementTree class UnknownObject(rpki.exceptions.RPKI_Exception): - """ - Unrecognized object in rcynic result cache. - """ + """ + Unrecognized object in rcynic result cache. + """ class NotRsyncURI(rpki.exceptions.RPKI_Exception): - """ - URI is not an rsync URI. - """ + """ + URI is not an rsync URI. + """ class rcynic_object(object): - """ - An object read from rcynic cache. - """ + """ + An object read from rcynic cache. + """ - def __init__(self, filename, **kwargs): - self.filename = filename - for k, v in kwargs.iteritems(): - setattr(self, k, v) - self.obj = self.obj_class(DER_file = filename) + def __init__(self, filename, **kwargs): + self.filename = filename + for k, v in kwargs.iteritems(): + setattr(self, k, v) + self.obj = self.obj_class(DER_file = filename) - def __repr__(self): - return "<%s %s %s at 0x%x>" % (self.__class__.__name__, self.uri, self.resources, id(self)) + def __repr__(self): + return "<%s %s %s at 0x%x>" % (self.__class__.__name__, self.uri, self.resources, id(self)) - def show_attrs(self, *attrs): - """ - Print a bunch of object attributes, quietly ignoring any that - might be missing. - """ + def show_attrs(self, *attrs): + """ + Print a bunch of object attributes, quietly ignoring any that + might be missing. + """ - for a in attrs: - try: - print "%s: %s" % (a.capitalize(), getattr(self, a)) - except AttributeError: - pass + for a in attrs: + try: + print "%s: %s" % (a.capitalize(), getattr(self, a)) + except AttributeError: + pass - def show(self): - """ - Print common object attributes. - """ + def show(self): + """ + Print common object attributes. + """ - self.show_attrs("filename", "uri", "status", "timestamp") + self.show_attrs("filename", "uri", "status", "timestamp") class rcynic_certificate(rcynic_object): - """ - A certificate from rcynic cache. - """ - - obj_class = rpki.x509.X509 - - def __init__(self, filename, **kwargs): - rcynic_object.__init__(self, filename, **kwargs) - self.notBefore = self.obj.getNotBefore() - self.notAfter = self.obj.getNotAfter() - self.aia_uri = self.obj.get_aia_uri() - self.sia_directory_uri = self.obj.get_sia_directory_uri() - self.manifest_uri = self.obj.get_sia_manifest_uri() - self.resources = self.obj.get_3779resources() - self.is_ca = self.obj.is_CA() - self.serial = self.obj.getSerial() - self.issuer = self.obj.getIssuer() - self.subject = self.obj.getSubject() - self.ski = self.obj.hSKI() - self.aki = self.obj.hAKI() - - def show(self): """ - Print certificate attributes. + A certificate from rcynic cache. """ - rcynic_object.show(self) - self.show_attrs("notBefore", "notAfter", "aia_uri", "sia_directory_uri", "resources") + obj_class = rpki.x509.X509 + + def __init__(self, filename, **kwargs): + rcynic_object.__init__(self, filename, **kwargs) + self.notBefore = self.obj.getNotBefore() + self.notAfter = self.obj.getNotAfter() + self.aia_uri = self.obj.get_aia_uri() + self.sia_directory_uri = self.obj.get_sia_directory_uri() + self.manifest_uri = self.obj.get_sia_manifest_uri() + self.resources = self.obj.get_3779resources() + self.is_ca = self.obj.is_CA() + self.serial = self.obj.getSerial() + self.issuer = self.obj.getIssuer() + self.subject = self.obj.getSubject() + self.ski = self.obj.hSKI() + self.aki = self.obj.hAKI() + + def show(self): + """ + Print certificate attributes. + """ + + rcynic_object.show(self) + self.show_attrs("notBefore", "notAfter", "aia_uri", "sia_directory_uri", "resources") class rcynic_roa(rcynic_object): - """ - A ROA from rcynic cache. - """ - - obj_class = rpki.x509.ROA - - def __init__(self, filename, **kwargs): - rcynic_object.__init__(self, filename, **kwargs) - self.obj.extract() - self.asID = self.obj.get_POW().getASID() - self.prefix_sets = [] - v4, v6 = self.obj.get_POW().getPrefixes() - if v4: - self.prefix_sets.append(rpki.resource_set.roa_prefix_set_ipv4([ - rpki.resource_set.roa_prefix_ipv4(p[0], p[1], p[2]) for p in v4])) - if v6: - self.prefix_sets.append(rpki.resource_set.roa_prefix_set_ipv6([ - rpki.resource_set.roa_prefix_ipv6(p[0], p[1], p[2]) for p in v6])) - self.ee = rpki.x509.X509(POW = self.obj.get_POW().certs()[0]) - self.notBefore = self.ee.getNotBefore() - self.notAfter = self.ee.getNotAfter() - self.aia_uri = self.ee.get_aia_uri() - self.resources = self.ee.get_3779resources() - self.issuer = self.ee.getIssuer() - self.serial = self.ee.getSerial() - self.subject = self.ee.getSubject() - self.aki = self.ee.hAKI() - self.ski = self.ee.hSKI() - - def show(self): """ - Print ROA attributes. + A ROA from rcynic cache. """ - rcynic_object.show(self) - self.show_attrs("notBefore", "notAfter", "aia_uri", "resources", "asID") - if self.prefix_sets: - print "Prefixes:", ",".join(str(i) for i in self.prefix_sets) + obj_class = rpki.x509.ROA + + def __init__(self, filename, **kwargs): + rcynic_object.__init__(self, filename, **kwargs) + self.obj.extract() + self.asID = self.obj.get_POW().getASID() + self.prefix_sets = [] + v4, v6 = self.obj.get_POW().getPrefixes() + if v4: + self.prefix_sets.append(rpki.resource_set.roa_prefix_set_ipv4([ + rpki.resource_set.roa_prefix_ipv4(p[0], p[1], p[2]) for p in v4])) + if v6: + self.prefix_sets.append(rpki.resource_set.roa_prefix_set_ipv6([ + rpki.resource_set.roa_prefix_ipv6(p[0], p[1], p[2]) for p in v6])) + self.ee = rpki.x509.X509(POW = self.obj.get_POW().certs()[0]) + self.notBefore = self.ee.getNotBefore() + self.notAfter = self.ee.getNotAfter() + self.aia_uri = self.ee.get_aia_uri() + self.resources = self.ee.get_3779resources() + self.issuer = self.ee.getIssuer() + self.serial = self.ee.getSerial() + self.subject = self.ee.getSubject() + self.aki = self.ee.hAKI() + self.ski = self.ee.hSKI() + + def show(self): + """ + Print ROA attributes. + """ + + rcynic_object.show(self) + self.show_attrs("notBefore", "notAfter", "aia_uri", "resources", "asID") + if self.prefix_sets: + print "Prefixes:", ",".join(str(i) for i in self.prefix_sets) class rcynic_ghostbuster(rcynic_object): - """ - Ghostbuster record from the rcynic cache. - """ - - obj_class = rpki.x509.Ghostbuster - - def __init__(self, *args, **kwargs): - rcynic_object.__init__(self, *args, **kwargs) - self.obj.extract() - self.vcard = self.obj.get_content() - self.ee = rpki.x509.X509(POW = self.obj.get_POW().certs()[0]) - self.notBefore = self.ee.getNotBefore() - self.notAfter = self.ee.getNotAfter() - self.aia_uri = self.ee.get_aia_uri() - self.issuer = self.ee.getIssuer() - self.serial = self.ee.getSerial() - self.subject = self.ee.getSubject() - self.aki = self.ee.hAKI() - self.ski = self.ee.hSKI() - - def show(self): - rcynic_object.show(self) - self.show_attrs("notBefore", "notAfter", "vcard") + """ + Ghostbuster record from the rcynic cache. + """ + + obj_class = rpki.x509.Ghostbuster + + def __init__(self, *args, **kwargs): + rcynic_object.__init__(self, *args, **kwargs) + self.obj.extract() + self.vcard = self.obj.get_content() + self.ee = rpki.x509.X509(POW = self.obj.get_POW().certs()[0]) + self.notBefore = self.ee.getNotBefore() + self.notAfter = self.ee.getNotAfter() + self.aia_uri = self.ee.get_aia_uri() + self.issuer = self.ee.getIssuer() + self.serial = self.ee.getSerial() + self.subject = self.ee.getSubject() + self.aki = self.ee.hAKI() + self.ski = self.ee.hSKI() + + def show(self): + rcynic_object.show(self) + self.show_attrs("notBefore", "notAfter", "vcard") file_name_classes = { ".cer" : rcynic_certificate, @@ -168,112 +168,112 @@ file_name_classes = { ".roa" : rcynic_roa } class rcynic_file_iterator(object): - """ - Iterate over files in an rcynic output tree, yielding a Python - representation of each object found. - """ - - def __init__(self, rcynic_root, - authenticated_subdir = "authenticated"): - self.rcynic_dir = os.path.join(rcynic_root, authenticated_subdir) - - def __iter__(self): - for root, dirs, files in os.walk(self.rcynic_dir): # pylint: disable=W0612 - for filename in files: - filename = os.path.join(root, filename) - ext = os.path.splitext(filename)[1] - if ext in file_name_classes: - yield file_name_classes[ext](filename) + """ + Iterate over files in an rcynic output tree, yielding a Python + representation of each object found. + """ + + def __init__(self, rcynic_root, + authenticated_subdir = "authenticated"): + self.rcynic_dir = os.path.join(rcynic_root, authenticated_subdir) + + def __iter__(self): + for root, dirs, files in os.walk(self.rcynic_dir): # pylint: disable=W0612 + for filename in files: + filename = os.path.join(root, filename) + ext = os.path.splitext(filename)[1] + if ext in file_name_classes: + yield file_name_classes[ext](filename) class validation_status_element(object): - def __init__(self, *args, **kwargs): - self.attrs = [] - for k, v in kwargs.iteritems(): - setattr(self, k, v) - # attribute names are saved so that the __repr__ method can - # display the subset of attributes the user specified - self.attrs.append(k) - self._obj = None - - def get_obj(self): - if not self._obj: - self._obj = self.file_class(filename=self.filename, uri=self.uri) - return self._obj - - def __repr__(self): - v = [self.__class__.__name__, 'id=%s' % str(id(self))] - v.extend(['%s=%s' % (x, getattr(self, x)) for x in self.attrs]) - return '<%s>' % (' '.join(v),) - - obj = property(get_obj) + def __init__(self, *args, **kwargs): + self.attrs = [] + for k, v in kwargs.iteritems(): + setattr(self, k, v) + # attribute names are saved so that the __repr__ method can + # display the subset of attributes the user specified + self.attrs.append(k) + self._obj = None + + def get_obj(self): + if not self._obj: + self._obj = self.file_class(filename=self.filename, uri=self.uri) + return self._obj + + def __repr__(self): + v = [self.__class__.__name__, 'id=%s' % str(id(self))] + v.extend(['%s=%s' % (x, getattr(self, x)) for x in self.attrs]) + return '<%s>' % (' '.join(v),) + + obj = property(get_obj) class rcynic_xml_iterator(object): - """ - Iterate over validation_status entries in the XML output from an - rcynic run. Yields a tuple for each entry: - - timestamp, generation, status, object - - where URI, status, and timestamp are the corresponding values from - the XML element, OK is a boolean indicating whether validation was - considered succesful, and object is a Python representation of the - object in question. If OK is True, object will be from rcynic's - authenticated output tree; otherwise, object will be from rcynic's - unauthenticated output tree. - - Note that it is possible for the same URI to appear in more than one - validation_status element; in such cases, the succesful case (OK - True) should be the last entry (as rcynic will stop trying once it - gets a good copy), but there may be multiple failures, which might - or might not have different status codes. - """ - - def __init__(self, rcynic_root, xml_file, - authenticated_old_subdir = "authenticated.old", - unauthenticated_subdir = "unauthenticated"): - self.rcynic_root = rcynic_root - self.xml_file = xml_file - self.authenticated_subdir = os.path.join(rcynic_root, 'authenticated') - self.authenticated_old_subdir = os.path.join(rcynic_root, authenticated_old_subdir) - self.unauthenticated_subdir = os.path.join(rcynic_root, unauthenticated_subdir) - - base_uri = "rsync://" - - def uri_to_filename(self, uri): - if uri.startswith(self.base_uri): - return uri[len(self.base_uri):] - else: - raise NotRsyncURI("Not an rsync URI %r" % uri) - - def __iter__(self): - for validation_status in ElementTree(file=self.xml_file).getroot().getiterator("validation_status"): - timestamp = validation_status.get("timestamp") - status = validation_status.get("status") - uri = validation_status.text.strip() - generation = validation_status.get("generation") - - # determine the path to this object - if status == 'object_accepted': - d = self.authenticated_subdir - elif generation == 'backup': - d = self.authenticated_old_subdir - else: - d = self.unauthenticated_subdir - - filename = os.path.join(d, self.uri_to_filename(uri)) - - ext = os.path.splitext(filename)[1] - if ext in file_name_classes: - yield validation_status_element(timestamp = timestamp, generation = generation, - uri=uri, status = status, filename = filename, - file_class = file_name_classes[ext]) + """ + Iterate over validation_status entries in the XML output from an + rcynic run. Yields a tuple for each entry: + + timestamp, generation, status, object + + where URI, status, and timestamp are the corresponding values from + the XML element, OK is a boolean indicating whether validation was + considered succesful, and object is a Python representation of the + object in question. If OK is True, object will be from rcynic's + authenticated output tree; otherwise, object will be from rcynic's + unauthenticated output tree. + + Note that it is possible for the same URI to appear in more than one + validation_status element; in such cases, the succesful case (OK + True) should be the last entry (as rcynic will stop trying once it + gets a good copy), but there may be multiple failures, which might + or might not have different status codes. + """ + + def __init__(self, rcynic_root, xml_file, + authenticated_old_subdir = "authenticated.old", + unauthenticated_subdir = "unauthenticated"): + self.rcynic_root = rcynic_root + self.xml_file = xml_file + self.authenticated_subdir = os.path.join(rcynic_root, 'authenticated') + self.authenticated_old_subdir = os.path.join(rcynic_root, authenticated_old_subdir) + self.unauthenticated_subdir = os.path.join(rcynic_root, unauthenticated_subdir) + + base_uri = "rsync://" + + def uri_to_filename(self, uri): + if uri.startswith(self.base_uri): + return uri[len(self.base_uri):] + else: + raise NotRsyncURI("Not an rsync URI %r" % uri) + + def __iter__(self): + for validation_status in ElementTree(file=self.xml_file).getroot().getiterator("validation_status"): + timestamp = validation_status.get("timestamp") + status = validation_status.get("status") + uri = validation_status.text.strip() + generation = validation_status.get("generation") + + # determine the path to this object + if status == 'object_accepted': + d = self.authenticated_subdir + elif generation == 'backup': + d = self.authenticated_old_subdir + else: + d = self.unauthenticated_subdir + + filename = os.path.join(d, self.uri_to_filename(uri)) + + ext = os.path.splitext(filename)[1] + if ext in file_name_classes: + yield validation_status_element(timestamp = timestamp, generation = generation, + uri=uri, status = status, filename = filename, + file_class = file_name_classes[ext]) def label_iterator(xml_file): - """ - Returns an iterator which contains all defined labels from an rcynic XML - output file. Each item is a tuple of the form - (label, kind, description). - """ - - for label in ElementTree(file=xml_file).find("labels"): - yield label.tag, label.get("kind"), label.text.strip() + """ + Returns an iterator which contains all defined labels from an rcynic XML + output file. Each item is a tuple of the form + (label, kind, description). + """ + + for label in ElementTree(file=xml_file).find("labels"): + yield label.tag, label.get("kind"), label.text.strip() |