diff options
author | Rob Austein <sra@hactrn.net> | 2012-03-01 00:47:30 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2012-03-01 00:47:30 +0000 |
commit | ece2557835748dddb880b0b7574f47af9e2d1f3d (patch) | |
tree | fd9f6d4ca9fc3b8271503ce183d98fa8e938a657 | |
parent | 3ba401cb2df54893cb33a1627cbc7bff09ede6d8 (diff) |
Generation of new torrents almost working, except that SFTP doesn't
support atomic rename without a protocol extension which paramiko
doesn't support (yet?). Yeargh.
svn path=/trunk/; revision=4379
-rw-r--r-- | rcynic/rpki-torrent.py | 203 |
1 files changed, 179 insertions, 24 deletions
diff --git a/rcynic/rpki-torrent.py b/rcynic/rpki-torrent.py index b5a329df..e533a264 100644 --- a/rcynic/rpki-torrent.py +++ b/rcynic/rpki-torrent.py @@ -34,6 +34,9 @@ import subprocess import syslog import traceback import ConfigParser +import stat +import time +import errno import transmissionrpc @@ -66,6 +69,8 @@ class TorrentDoesNotMatchManifest(Exception): class TorrentNameDoesNotMatchURL(Exception): "Torrent name doesn't uniquely match a URL." +debug_config = True + def main(): try: syslog_flags = syslog.LOG_PID; @@ -74,10 +79,15 @@ def main(): syslog.openlog("rpki-torrent", syslog_flags) global cfg cfg = MyConfigParser() - cfg.read([os.path.join(dn, fn) - for fn in ("rcynic.conf", "rpki.conf") - for dn in ("/var/rcynic/etc", "/usr/local/etc", "/etc")]) - if all(v in os.environ for v in tr_env_vars): + if debug_config: + cfg.read("rcynic.conf") + else: + cfg.read([os.path.join(dn, fn) + for fn in ("rcynic.conf", "rpki.conf") + for dn in ("/var/rcynic/etc", "/usr/local/etc", "/etc")]) + if cfg.act_as_generator: + generator_main() + elif all(v in os.environ for v in tr_env_vars): torrent_completion_main() elif not any(v in os.environ for v in tr_env_vars): cronjob_main() @@ -89,14 +99,103 @@ def main(): sys.exit(1) +def generator_main(): + import paramiko + + paramiko.util.log_to_file("sftp_debug.log") + + z = ZipFile(url = cfg.generate_url, dir = cfg.zip_dir) + client = TransmissionClient() + + client.remove_torrents(z.torrent_name) + + unauthenticated_dir = cfg.get("rcynic", "unauthenticated") + download_dir = client.get_session().download_dir + torrent_dir = os.path.join(download_dir, z.torrent_name) + torrent_file = os.path.join(cfg.zip_dir, z.torrent_name + ".torrent") + + + syslog.syslog("Synchronizing local data from %s to %s" % (unauthenticated_dir, torrent_dir)) + subprocess.check_call((cfg.rsync_prog, "--archive", "--delete", + os.path.normpath(unauthenticated_dir) + "/", + os.path.normpath(torrent_dir) + "/")) + + syslog.syslog("Creating %s" % torrent_file) + try: + os.unlink(torrent_file) + except OSError, e: + if e.errno != errno.ENOENT: + raise + subprocess.check_call((cfg.mktorrent_prog, + "-a", cfg.tracker_url, + "-c", "RPKI unauthenticated data snapshot generated by rpki-torrent", + "-o", torrent_file, + torrent_dir)) + + syslog.syslog("Generating manifest") + manifest = create_manifest(download_dir, z.torrent_name) + + # We might be able to use client.add_url() here instead, check later if we care. + syslog.syslog("Loading %s" % torrent_file) + f = open(torrent_file, "rb") + client.add(base64.b64encode(f.read())) + f.close() + + syslog.syslog("Creating upload connection") + ssh = paramiko.Transport((cfg.sftp_host, cfg.sftp_port)) + ssh.connect( + username = cfg.sftp_user, + hostkey = paramiko.util.load_host_keys(cfg.sftp_hostkey_file)[cfg.sftp_host]["ssh-rsa"], + pkey = paramiko.RSAKey.from_private_key_file(cfg.sftp_private_key_file)) + sftp = paramiko.SFTPClient.from_transport(ssh) + + zip_filename = os.path.join("data", z.filename) + zip_tempname = zip_filename + ".new" + + syslog.syslog("Creating %s" % zip_tempname) + f = sftp.open(zip_tempname, "wb") + z.set_output_stream(f) + + syslog.syslog("Writing %s to zip" % torrent_file) + z.write( + torrent_file, + arcname = os.path.basename(torrent_file), + compress_type = zipfile.ZIP_DEFLATED) + + manifest_name = z.torrent_name + ".manifest" + + syslog.syslog("Writing %s to zip" % manifest_name) + zi = zipfile.ZipInfo(manifest_name, time.gmtime()[:6]) + zi.external_attr = (stat.S_IFREG | 0644) << 16 + zi.internal_attr = 1 # Text, not binary + z.writestr(zi, + "".join("%s %s\n" % (v, k) for k, v in manifest.iteritems()), + zipfile.ZIP_DEFLATED) + + syslog.syslog("Closing %s and renaming to %s" % (zip_tempname, zip_filename)) + z.close() + f.close() + + # Yearg! It breaks here, because paramiko doesn't use the + # posix-rename@openssh.com extension and the default sftp behavior + # is to fail if the file already exists! So much for atomic + # installation via this hack. I wonder how hard it would be to add + # support for the posix-rename extension? + + sftp.rename(zip_tempname, zip_filename) + + syslog.syslog("Closing upload connection") + sftp.close() + + def cronjob_main(): for zip_url in cfg.zip_urls: - z = ZipFile(url = zip_url, dir = cfg.zip_dir, ta = cfg.zip_ta) - client = transmissionrpc.client.Client() + z = ZipFile(url = zip_url, dir = cfg.zip_dir, ta = cfg.zip_ta) + client = TransmissionClient() if z.fetch(): - remove_torrents(client, z.torrent_name) + client.remove_torrents(z.torrent_name) syslog.syslog("Adding torrent %s" % z.torrent_name) client.add(z.get_torrent()) @@ -109,7 +208,7 @@ def torrent_completion_main(): torrent_id = int(os.getenv("TR_TORRENT_ID")) z = ZipFile(url = cfg.find_url(torrent_name), dir = cfg.zip_dir, ta = cfg.zip_ta) - client = transmissionrpc.client.Client() + client = TransmissionClient() torrent = client.info([torrent_id]).popitem()[1] if torrent.name != torrent_name: @@ -176,12 +275,11 @@ class ZipFile(object): may first need to be fetched via HTTPS. """ - def __init__(self, url, dir, ta, verbose = True, mode = "r"): + def __init__(self, url, dir, ta = None, verbose = True): self.url = url self.dir = dir self.ta = ta self.verbose = verbose - self.mode = mode self.filename = os.path.join(dir, os.path.basename(url)) self.changed = False self.zf = None @@ -193,8 +291,7 @@ class ZipFile(object): def __getattr__(self, name): if self.zf is None: - self.zf = zipfile.ZipFile(self.filename, mode = self.mode, - compression = zipfile.ZIP_DEFLATED) + self.zf = zipfile.ZipFile(self.filename) return getattr(self.zf, name) @@ -207,6 +304,8 @@ class ZipFile(object): You probably don't want to look at this if you can avoid it. """ + assert self.ta is not None + # Yes, we're constructing one-off classes. Look away, look away. class HTTPSConnection(httplib.HTTPSConnection): @@ -262,15 +361,20 @@ class ZipFile(object): os.rename(tempname, self.filename) + def set_output_stream(self, stream): + """ + Set up this zip file for writing to a network stream. + """ + + assert self.zf is None + self.zf = zipfile.ZipFile(stream, "w") + + def fetch(self): """ Fetch zip file from URL given to constructor. - This only works in read mode, makes no sense in write mode. """ - if self.mode != "r": - raise WrongMode - headers = { "User-Agent" : "rpki-torrent" } try: headers["If-Modified-Since"] = email.utils.formatdate( @@ -362,17 +466,22 @@ def create_manifest(topdir, torrent_name): return result -def remove_torrents(client, name): +class TransmissionClient(transmissionrpc.client.Client): """ - Remove any torrents with the given name. In theory there should - never be more than one, but it doesn't cost much to check. + Extension of transmissionrpc.client.Client. """ - ids = [i for i, t in client.list().iteritems() if t.name == name] - if ids: - syslog.syslog("Removing torrent%s %s (%s)" % ( - "" if len(ids) == 1 else "s", name, ", ".join("#%s" % i for i in ids))) - client.remove(ids) + def remove_torrents(self, name): + """ + Remove any torrents with the given name. In theory there should + never be more than one, but it doesn't cost much to check. + """ + + ids = [i for i, t in self.list().iteritems() if t.name == name] + if ids: + syslog.syslog("Removing torrent%s %s (%s)" % ( + "" if len(ids) == 1 else "s", name, ", ".join("#%s" % i for i in ids))) + self.remove(ids) class MyConfigParser(ConfigParser.RawConfigParser): @@ -399,6 +508,52 @@ class MyConfigParser(ConfigParser.RawConfigParser): def run_rcynic_anyway(self): return self.getboolean(self.rpki_torrent_section, "run_rcynic_anyway") + @property + def generate_url(self): + return self.get(self.rpki_torrent_section, "generate_url") + + @property + def act_as_generator(self): + try: + return self.get(self.rpki_torrent_section, "generate_url") != "" + except ConfigParser.Error: + return False + + @property + def rsync_prog(self): + return self.get(self.rpki_torrent_section, "rsync_prog") + + @property + def mktorrent_prog(self): + return self.get(self.rpki_torrent_section, "mktorrent_prog") + + @property + def tracker_url(self): + return self.get(self.rpki_torrent_section, "tracker_url") + + @property + def sftp_host(self): + return self.get(self.rpki_torrent_section, "sftp_host") + + @property + def sftp_port(self): + try: + return self.getint(self.rpki_torrent_section, "sftp_port") + except ConfigParser.Error: + return 22 + + @property + def sftp_user(self): + return self.get(self.rpki_torrent_section, "sftp_user") + + @property + def sftp_hostkey_file(self): + return self.get(self.rpki_torrent_section, "sftp_hostkey_file") + + @property + def sftp_private_key_file(self): + return self.get(self.rpki_torrent_section, "sftp_private_key_file") + def multioption_iter(self, name, getter = None): if getter is None: getter = self.get |