aboutsummaryrefslogtreecommitdiff
path: root/rpki/http.py
blob: e41b00801e9d56b015b63b2bba23a72731cd7f53 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
# $Id$
#
# Copyright (C) 2013--2014  Dragon Research Labs ("DRL")
# Portions copyright (C) 2009--2012  Internet Systems Consortium ("ISC")
# Portions copyright (C) 2007--2008  American Registry for Internet Numbers ("ARIN")
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notices and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND DRL, ISC, AND ARIN DISCLAIM ALL
# WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS.  IN NO EVENT SHALL DRL,
# ISC, OR ARIN BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

"""
HTTP utilities, both client and server.
"""

import time
import socket
import asyncore
import asynchat
import urlparse
import sys
import random
import logging
import rpki.async
import rpki.sundial
import rpki.x509
import rpki.exceptions
import rpki.log
import rpki.POW

logger = logging.getLogger(__name__)

## @var rpki_content_type
# HTTP content type used for all RPKI messages.
rpki_content_type = "application/x-rpki"

## @var want_persistent_client
# Whether we want persistent HTTP client streams, when server also supports them.
want_persistent_client = False

## @var want_persistent_server
# Whether we want persistent HTTP server streams, when client also supports them.
want_persistent_server = False

## @var default_client_timeout
# Default HTTP client connection timeout.
default_client_timeout = rpki.sundial.timedelta(minutes = 5)

## @var default_server_timeout
# Default HTTP server connection timeouts.  Given our druthers, we'd
# prefer that the client close the connection, as this avoids the
# problem of client starting to reuse connection just as server closes
# it, so this should be longer than the client timeout.
default_server_timeout = rpki.sundial.timedelta(minutes = 10)

## @var default_http_version
# Preferred HTTP version.
default_http_version = (1, 0)

## @var default_tcp_port
# Default port for clients and servers that don't specify one.
default_tcp_port = 80

## @var enable_ipv6_servers
# Whether to enable IPv6 listeners.  Enabled by default, as it should
# be harmless.  Has no effect if kernel doesn't support IPv6.
enable_ipv6_servers = True

## @var enable_ipv6_clients
# Whether to consider IPv6 addresses when making connections.
# Disabled by default, as IPv6 connectivity is still a bad joke in
# far too much of the world.
enable_ipv6_clients = False

## @var have_ipv6
# Whether the current machine claims to support IPv6.  Note that just
# because the kernel supports it doesn't mean that the machine has
# usable IPv6 connectivity.  I don't know of a simple portable way to
# probe for connectivity at runtime (the old test of "can you ping
# SRI-NIC.ARPA?" seems a bit dated...).  Don't set this, it's set
# automatically by probing using the socket() system call at runtime.
try:
  # pylint: disable=W0702,W0104
  socket.socket(socket.AF_INET6).close()
  socket.IPPROTO_IPV6
  socket.IPV6_V6ONLY
except:
  have_ipv6 = False
else:
  have_ipv6 = True

## @var use_adns

# Whether to use rpki.adns code.  This is still experimental, so it's
# not (yet) enabled by default.
use_adns = False
try:
  import rpki.adns
except ImportError:
  pass

def supported_address_families(enable_ipv6):
  """
  IP address families on which servers should listen, and to consider
  when selecting addresses for client connections.
  """

  if enable_ipv6 and have_ipv6:
    return (socket.AF_INET, socket.AF_INET6)
  else:
    return (socket.AF_INET,)

def localhost_addrinfo():
  """
  Return pseudo-getaddrinfo results for localhost.
  """

  result = [(socket.AF_INET, "127.0.0.1")]
  if enable_ipv6_clients and have_ipv6:
    result.append((socket.AF_INET6, "::1"))
  return result

class http_message(object):
  """
  Virtual class representing of one HTTP message.
  """

  software_name = "ISC RPKI library"

  def __init__(self, version = None, body = None, headers = None):
    self.version = version
    self.body = body
    self.headers = headers
    self.normalize_headers()

  def normalize_headers(self, headers = None):
    """
    Clean up (some of) the horrible messes that HTTP allows in its
    headers.
    """

    if headers is None:
      headers = () if self.headers is None else self.headers.items()
      translate_underscore = True
    else:
      translate_underscore = False
    result = {}
    for k, v in headers:
      if translate_underscore:
        k = k.replace("_", "-")
      k = "-".join(s.capitalize() for s in k.split("-"))
      v = v.strip()
      if k in result:
        result[k] += ", " + v
      else:
        result[k] = v
    self.headers = result

  @classmethod
  def parse_from_wire(cls, headers):
    """
    Parse and normalize an incoming HTTP message.
    """

    self = cls()
    headers = headers.split("\r\n")
    self.parse_first_line(*headers.pop(0).split(None, 2))
    for i in xrange(len(headers) - 2, -1, -1):
      if headers[i + 1][0].isspace():
        headers[i] += headers[i + 1]
        del headers[i + 1]
    self.normalize_headers([h.split(":", 1) for h in headers])
    return self

  def format(self):
    """
    Format an outgoing HTTP message.
    """

    s = self.format_first_line()
    if self.body is not None:
      assert isinstance(self.body, str)
      self.headers["Content-Length"] = len(self.body)
    for kv in self.headers.iteritems():
      s += "%s: %s\r\n" % kv
    s += "\r\n"
    if self.body is not None:
      s += self.body
    return s

  def __str__(self):
    return self.format()

  def parse_version(self, version):
    """
    Parse HTTP version, raise an exception if we can't.
    """

    if version[:5] != "HTTP/":
      raise rpki.exceptions.HTTPBadVersion("Couldn't parse version %s" % version)
    self.version = tuple(int(i) for i in version[5:].split("."))

  @property
  def persistent(self):
    """
    Figure out whether this HTTP message encourages a persistent connection.
    """

    c = self.headers.get("Connection")
    if self.version == (1, 1):
      return c is None or "close" not in c.lower()
    elif self.version == (1, 0):
      return c is not None and "keep-alive" in c.lower()
    else:
      return False

class http_request(http_message):
  """
  HTTP request message.
  """

  def __init__(self, cmd = None, path = None, version = default_http_version, body = None, callback = None, errback = None, **headers):
    assert cmd == "POST" or body is None
    http_message.__init__(self, version = version, body = body, headers = headers)
    self.cmd = cmd
    self.path = path
    self.callback = callback
    self.errback = errback
    self.retried = False

  def parse_first_line(self, cmd, path, version):
    """
    Parse first line of HTTP request message.
    """

    self.parse_version(version)
    self.cmd = cmd
    self.path = path

  def format_first_line(self):
    """
    Format first line of HTTP request message, and set up the
    User-Agent header.
    """

    self.headers.setdefault("User-Agent", self.software_name)
    return "%s %s HTTP/%d.%d\r\n" % (self.cmd, self.path, self.version[0], self.version[1])

  def __repr__(self):
    return rpki.log.log_repr(self, self.cmd, self.path)

class http_response(http_message):
  """
  HTTP response message.
  """

  def __init__(self, code = None, reason = None, version = default_http_version, body = None, **headers):
    http_message.__init__(self, version = version, body = body, headers = headers)
    self.code = code
    self.reason = reason

  def parse_first_line(self, version, code, reason):
    """
    Parse first line of HTTP response message.
    """

    self.parse_version(version)
    self.code = int(code)
    self.reason = reason

  def format_first_line(self):
    """
    Format first line of HTTP response message, and set up Date and
    Server headers.
    """

    self.headers.setdefault("Date", time.strftime("%a, %d %b %Y %T GMT"))
    self.headers.setdefault("Server", self.software_name)
    return "HTTP/%d.%d %s %s\r\n" % (self.version[0], self.version[1], self.code, self.reason)

  def __repr__(self):
    return rpki.log.log_repr(self, self.code, self.reason)

def addr_to_string(addr):
  """
  Convert socket addr tuple to printable string.  Assumes 2-element
  tuple is IPv4, 4-element tuple is IPv6, throws TypeError for
  anything else.
  """

  if len(addr) == 2:
    return "%s:%d" % (addr[0], addr[1])
  if len(addr) == 4:
    return "%s.%d" % (addr[0], addr[1])
  raise TypeError

@rpki.log.class_logger(logger)
class http_stream(asynchat.async_chat):
  """
  Virtual class representing an HTTP message stream.
  """

  # Keep pylint happy; @class_logger overwrites this.
  logger = None

  def __repr__(self):
    status = ["connected"] if self.connected else []
    try:
      status.append(addr_to_string(self.addr))
    except TypeError:
      pass
    return rpki.log.log_repr(self, *status)

  def __init__(self, sock = None):
    self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
    asynchat.async_chat.__init__(self, sock)
    self.buffer = []
    self.timer = rpki.async.timer(self.handle_timeout)
    self.restart()

  def restart(self):
    """
    (Re)start HTTP message parser, reset timer.
    """

    assert not self.buffer
    self.chunk_handler = None
    self.set_terminator("\r\n\r\n")
    self.update_timeout()

  def update_timeout(self):
    """
    Put this stream's timer in known good state: set it to the
    stream's timeout value if we're doing timeouts, otherwise clear
    it.
    """

    if self.timeout is not None:
      self.logger.debug("Setting timeout %s", self.timeout)
      self.timer.set(self.timeout)
    else:
      self.logger.debug("Clearing timeout")
      self.timer.cancel()

  def collect_incoming_data(self, data):
    """
    Buffer incoming data from asynchat.
    """

    self.buffer.append(data)
    self.update_timeout()

  def get_buffer(self):
    """
    Consume data buffered from asynchat.
    """

    val = "".join(self.buffer)
    self.buffer = []
    return val

  def found_terminator(self):
    """
    Asynchat reported that it found whatever terminator we set, so
    figure out what to do next.  This can be messy, because we can be
    in any of several different states:

    @li We might be handling chunked HTTP, in which case we have to
    initialize the chunk decoder;

    @li We might have found the end of the message body, in which case
    we can (finally) process it; or

    @li We might have just gotten to the end of the message headers,
    in which case we have to parse them to figure out which of three
    separate mechanisms (chunked, content-length, TCP close) is going
    to tell us how to find the end of the message body.
    """

    self.update_timeout()
    if self.chunk_handler:
      self.chunk_handler()
    elif not isinstance(self.get_terminator(), str):
      self.handle_body()
    else:
      self.msg = self.parse_type.parse_from_wire(self.get_buffer())
      if self.msg.version == (1, 1) and "chunked" in self.msg.headers.get("Transfer-Encoding", "").lower():
        self.msg.body = []
        self.chunk_handler = self.chunk_header
        self.set_terminator("\r\n")
      elif "Content-Length" in self.msg.headers:
        self.set_terminator(int(self.msg.headers["Content-Length"]))
      else:
        self.handle_no_content_length()

  def chunk_header(self):
    """
    Asynchat just handed us what should be the header of one chunk of
    a chunked encoding stream.  If this chunk has a body, set the
    stream up to read it; otherwise, this is the last chunk, so start
    the process of exiting the chunk decoder.
    """

    n = int(self.get_buffer().partition(";")[0], 16)
    self.logger.debug("Chunk length %s", n)
    if n:
      self.chunk_handler = self.chunk_body
      self.set_terminator(n)
    else:
      self.msg.body = "".join(self.msg.body)
      self.chunk_handler = self.chunk_discard_trailer

  def chunk_body(self):
    """
    Asynchat just handed us what should be the body of a chunk of the
    body of a chunked message (sic).  Save it, and prepare to move on
    to the next chunk.
    """

    self.logger.debug("Chunk body")
    self.msg.body += self.buffer
    self.buffer = []
    self.chunk_handler = self.chunk_discard_crlf
    self.set_terminator("\r\n")

  def chunk_discard_crlf(self):
    """
    Consume the CRLF that terminates a chunk, reinitialize chunk
    decoder to be ready for the next chunk.
    """

    self.logger.debug("Chunk CRLF")
    s = self.get_buffer()
    assert s == "", "%r: Expected chunk CRLF, got '%s'" % (self, s)
    self.chunk_handler = self.chunk_header

  def chunk_discard_trailer(self):
    """
    Consume chunk trailer, which should be empty, then (finally!) exit
    the chunk decoder and hand complete message off to the application.
    """

    self.logger.debug("Chunk trailer")
    s = self.get_buffer()
    assert s == "", "%r: Expected end of chunk trailers, got '%s'" % (self, s)
    self.chunk_handler = None
    self.handle_message()

  def handle_body(self):
    """
    Hand normal (not chunked) message off to the application.
    """

    self.msg.body = self.get_buffer()
    self.handle_message()

  def handle_error(self):
    """
    Asynchat (or asyncore, or somebody) raised an exception.  See
    whether it's one we should just pass along, otherwise log a stack
    trace and close the stream.
    """

    self.timer.cancel()
    etype = sys.exc_info()[0]
    if etype in (SystemExit, rpki.async.ExitNow):
      raise
    if etype is not rpki.exceptions.HTTPClientAborted:
      self.logger.exception("Closing due to error")
      self.close()

  def handle_timeout(self):
    """
    Inactivity timer expired, close connection with prejudice.
    """

    self.logger.debug("Timeout, closing")
    self.close()

  def handle_close(self):
    """
    Wrapper around asynchat connection close handler, so that we can
    log the event, cancel timer, and so forth.
    """

    self.logger.debug("Close event in HTTP stream handler")
    self.timer.cancel()
    asynchat.async_chat.handle_close(self)

@rpki.log.class_logger(logger)
class http_server(http_stream):
  """
  HTTP server stream.
  """

  ## @var parse_type
  # Stream parser should look for incoming HTTP request messages.
  parse_type = http_request

  ## @var timeout
  # Use the default server timeout value set in the module header.
  timeout = default_server_timeout

  def __init__(self, sock, handlers):
    self.handlers = handlers
    http_stream.__init__(self, sock = sock)
    self.expect_close = not want_persistent_server
    self.logger.debug("Starting")

  def handle_no_content_length(self):
    """
    Handle an incoming message that used neither chunking nor a
    Content-Length header (that is: this message will be the last one
    in this server stream).  No special action required.
    """

    self.handle_message()

  def find_handler(self, path):
    """
    Helper method to search self.handlers.
    """

    for s, h in self.handlers:
      if path.startswith(s):
        return h
    return None

  def handle_message(self):
    """
    HTTP layer managed to deliver a complete HTTP request to
    us, figure out what to do with it.  Check the command and
    Content-Type, look for a handler, and if everything looks right,
    pass the message body, path, and a reply callback to the handler.
    """

    self.logger.debug("Received request %r", self.msg)
    if not self.msg.persistent:
      self.expect_close = True
    handler = self.find_handler(self.msg.path)
    error = None
    if self.msg.cmd != "POST":
      error = 501, "No handler for method %s" % self.msg.cmd
    elif self.msg.headers["Content-Type"] != rpki_content_type:
      error = 415, "No handler for Content-Type %s" % self.headers["Content-Type"]
    elif handler is None:
      error = 404, "No handler for URL %s" % self.msg.path
    if error is None:
      try:
        handler(self.msg.body, self.msg.path, self.send_reply)
      except (rpki.async.ExitNow, SystemExit):
        raise
      except Exception, e:
        self.logger.exception("Unhandled exception while handling HTTP request")
        self.send_error(500, reason = "Unhandled exception %s: %s" % (e.__class__.__name__, e))
    else:
      self.send_error(code = error[0], reason = error[1])

  def send_error(self, code, reason):
    """
    Send an error response to this request.
    """

    self.send_message(code = code, reason = reason)

  def send_reply(self, code, body = None, reason = "OK"):
    """
    Send a reply to this request.
    """

    self.send_message(code = code, body = body, reason = reason)

  def send_message(self, code, reason = "OK", body = None):
    """
    Queue up reply message.  If both parties agree that connection is
    persistant, and if no error occurred, restart this stream to
    listen for next message; otherwise, queue up a close event for
    this stream so it will shut down once the reply has been sent.
    """

    self.logger.debug("Sending response %s %s", code, reason)
    if code >= 400:
      self.expect_close = True
    msg = http_response(code = code, reason = reason, body = body,
                        Content_Type = rpki_content_type,
                        Connection = "Close" if self.expect_close else "Keep-Alive")
    self.push(msg.format())
    if self.expect_close:
      self.logger.debug("Closing")
      self.timer.cancel()
      self.close_when_done()
    else:
      self.logger.debug("Listening for next message")
      self.restart()

@rpki.log.class_logger(logger)
class http_listener(asyncore.dispatcher):
  """
  Listener for incoming HTTP connections.
  """

  def __repr__(self):
    try:
      status = (addr_to_string(self.addr),)
    except TypeError:
      status = ()
    return rpki.log.log_repr(self, *status)

  def __init__(self, handlers, addrinfo):
    self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
    asyncore.dispatcher.__init__(self)
    self.handlers = handlers
    try:
      af, socktype, proto, canonname, sockaddr = addrinfo # pylint: disable=W0612
      self.create_socket(af, socktype)
      self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
      try:
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
      except AttributeError:
        pass
      if have_ipv6 and af == socket.AF_INET6:
        self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
      self.bind(sockaddr)
      self.listen(5)
    except Exception:
      self.logger.exception("Couldn't set up HTTP listener")
      self.close()
    for h in handlers:
      self.logger.debug("Handling %s", h[0])

  def handle_accept(self):
    """
    Asyncore says we have an incoming connection, spawn an http_server
    stream for it and pass along all of our handler data.
    """

    try:
      res =  self.accept()
      if res is None:
        raise
      sock, addr = res                  # pylint: disable=W0633
      self.logger.debug("Accepting connection from %s", addr_to_string(addr))
      http_server(sock = sock, handlers = self.handlers)
    except (rpki.async.ExitNow, SystemExit):
      raise
    except Exception:
      self.logger.exception("Unable to accept connection")

  def handle_error(self):
    """
    Asyncore signaled an error, pass it along or log it.
    """

    if sys.exc_info()[0] in (SystemExit, rpki.async.ExitNow):
      raise
    self.logger.exception("Error in HTTP listener")

@rpki.log.class_logger(logger)
class http_client(http_stream):
  """
  HTTP client stream.
  """

  ## @var parse_type
  # Stream parser should look for incoming HTTP response messages.
  parse_type = http_response

  ## @var timeout
  # Use the default client timeout value set in the module header.
  timeout = default_client_timeout

  ## @var state
  # Application layer connection state.
  state = None

  def __init__(self, queue, hostport):
    http_stream.__init__(self)
    self.logger.debug("Creating new connection to %s", addr_to_string(hostport))
    self.queue = queue
    self.host = hostport[0]
    self.port = hostport[1]
    self.set_state("opening")
    self.expect_close = not want_persistent_client

  def start(self):
    """
    Create socket and request a connection.
    """

    if not use_adns:
      self.logger.debug("Not using ADNS")
      self.gotaddrinfo([(socket.AF_INET, self.host)])
    elif self.host == "localhost":
      self.logger.debug("Bypassing DNS for localhost")
      self.gotaddrinfo(localhost_addrinfo())
    else:
      families = supported_address_families(enable_ipv6_clients)
      self.logger.debug("Starting ADNS lookup for %s in families %r", self.host, families)
      rpki.adns.getaddrinfo(self.gotaddrinfo, self.dns_error, self.host, families)

  def dns_error(self, e):
    """
    Handle DNS lookup errors.  For now, just whack the connection.
    Undoubtedly we should do something better with diagnostics here.
    """

    self.handle_error()

  def gotaddrinfo(self, addrinfo):
    """
    Got address data from DNS, create socket and request connection.
    """

    try:
      self.af, self.address = random.choice(addrinfo)
      self.logger.debug("Connecting to AF %s host %s port %s addr %s", self.af, self.host, self.port, self.address)
      self.create_socket(self.af, socket.SOCK_STREAM)
      self.connect((self.address, self.port))
      if self.addr is None:
        self.addr = (self.host, self.port)
      self.update_timeout()
    except (rpki.async.ExitNow, SystemExit):
      raise
    except Exception:
      self.handle_error()

  def handle_connect(self):
    """
    Asyncore says socket has connected.
    """

    self.logger.debug("Socket connected")
    self.set_state("idle")
    assert self.queue.client is self
    self.queue.send_request()

  def set_state(self, state):
    """
    Set HTTP client connection state.
    """

    self.logger.debug("State transition %s => %s", self.state, state)
    self.state = state

  def handle_no_content_length(self):
    """
    Handle response message that used neither chunking nor a
    Content-Length header (that is: this message will be the last one
    in this server stream).  In this case we want to read until we
    reach the end of the data stream.
    """

    self.set_terminator(None)

  def send_request(self, msg):
    """
    Queue up request message and kickstart connection.
    """

    self.logger.debug("Sending request %r", msg)
    assert self.state == "idle", "%r: state should be idle, is %s" % (self, self.state)
    self.set_state("request-sent")
    msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive"
    self.push(msg.format())
    self.restart()

  def handle_message(self):
    """
    Handle incoming HTTP response message.  Make sure we're in a state
    where we expect to see such a message (and allow the mysterious
    empty messages that Apache sends during connection close, no idea
    what that is supposed to be about).  If everybody agrees that the
    connection should stay open, put it into an idle state; otherwise,
    arrange for the stream to shut down.
    """

    self.logger.debug("Message received, state %s", self.state)

    if not self.msg.persistent:
      self.expect_close = True

    if self.state != "request-sent":
      if self.state == "closing":
        assert not self.msg.body
        self.logger.debug("Ignoring empty response received while closing")
        return
      raise rpki.exceptions.HTTPUnexpectedState("%r received message while in unexpected state %s" % (self, self.state))

    if self.expect_close:
      self.logger.debug("Closing")
      self.set_state("closing")
      self.close_when_done()
    else:
      self.logger.debug("Idling")
      self.set_state("idle")
      self.update_timeout()

    if self.msg.code != 200:
      errmsg = "HTTP request failed"
      if self.msg.code is not None:
        errmsg += " with status %s" % self.msg.code
      if self.msg.reason:
        errmsg += ", reason %s" % self.msg.reason
      if self.msg.body:
        errmsg += ", response %s" % self.msg.body
      raise rpki.exceptions.HTTPRequestFailed(errmsg)
    self.queue.return_result(self, self.msg, detach = self.expect_close)

  def handle_close(self):
    """
    Asyncore signaled connection close.  If we were waiting for that
    to find the end of a response message, process the resulting
    message now; if we were waiting for the response to a request we
    sent, signal the error.
    """

    http_stream.handle_close(self)
    self.logger.debug("State %s", self.state)
    if self.get_terminator() is None:
      self.handle_body()
    elif self.state == "request-sent":
      raise rpki.exceptions.HTTPClientAborted("HTTP request aborted by close event")
    else:
      self.queue.detach(self)

  def handle_timeout(self):
    """
    Connection idle timer has expired.  Shut down connection in any
    case, noisily if we weren't idle.
    """

    bad = self.state not in ("idle", "closing")
    if bad:
      self.logger.warning("Timeout while in state %s", self.state)
    http_stream.handle_timeout(self)
    if bad:
      try:
        raise rpki.exceptions.HTTPTimeout
      except:                           # pylint: disable=W0702
        self.handle_error()
    else:
      self.queue.detach(self)

  def handle_error(self):
    """
    Asyncore says something threw an exception.  Log it, then shut
    down the connection and pass back the exception.
    """

    eclass, edata = sys.exc_info()[0:2]
    self.logger.warning("Error on HTTP client connection %s:%s %s %s", self.host, self.port, eclass, edata)
    http_stream.handle_error(self)
    self.queue.return_result(self, edata, detach = True)

@rpki.log.class_logger(logger)
class http_queue(object):
  """
  Queue of pending HTTP requests for a single destination.  This class
  is very tightly coupled to http_client; http_client handles the HTTP
  stream itself, this class provides a slightly higher-level API.
  """

  def __repr__(self):
    return rpki.log.log_repr(self, addr_to_string(self.hostport))

  def __init__(self, hostport):
    self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
    self.hostport = hostport
    self.client = None
    self.logger.debug("Created")
    self.queue = []

  def request(self, *requests):
    """
    Append http_request object(s) to this queue.
    """

    self.logger.debug("Adding requests %r", requests)
    self.queue.extend(requests)

  def restart(self):
    """
    Send next request for this queue, if we can.  This may involve
    starting a new http_client stream, reusing an existing idle
    stream, or just ignoring this request if there's an active client
    stream already; in the last case, handling of the response (or
    exception, or timeout) for the query currently in progress will
    call this method when it's time to kick out the next query.
    """

    try:
      if self.client is None:
        self.client = http_client(self, self.hostport)
        self.logger.debug("Attached client %r", self.client)
        self.client.start()
      elif self.client.state == "idle":
        self.logger.debug("Sending request to existing client %r", self.client)
        self.send_request()
      else:
        self.logger.debug("Client %r exists in state %r", self.client, self.client.state)
    except (rpki.async.ExitNow, SystemExit):
      raise
    except Exception, e:
      self.return_result(self.client, e, detach = True)

  def send_request(self):
    """
    Kick out the next query in this queue, if any.
    """

    if self.queue:
      self.client.send_request(self.queue[0])

  def detach(self, client_):
    """
    Detatch a client from this queue.  Silently ignores attempting to
    detach a client that is not attached to this queue, to simplify
    handling of what otherwise would be a nasty set of race
    conditions.
    """

    if client_ is self.client:
      self.logger.debug("Detaching client %r", client_)
      self.client = None

  def return_result(self, client, result, detach = False): # pylint: disable=W0621
    """
    Client stream has returned a result, which we need to pass along
    to the original caller.  Result may be either an HTTP response
    message or an exception.  In either case, once we're done
    processing this result, kick off next message in the queue, if any.
    """

    if client is not self.client:
      self.logger.warning("Wrong client trying to return result.  THIS SHOULD NOT HAPPEN.  Dropping result %r", result)
      return

    if detach:
      self.detach(client)

    try:
      req = self.queue.pop(0)
      self.logger.debug("Dequeuing request %r", req)
    except IndexError:
      self.logger.warning("No caller.  THIS SHOULD NOT HAPPEN.  Dropping result %r", result)
      return

    assert isinstance(result, http_response) or isinstance(result, Exception)

    if isinstance(result, http_response):
      try:
        self.logger.debug("Returning result %r to caller", result)
        req.callback(result.body)
      except (rpki.async.ExitNow, SystemExit):
        raise
      except Exception, e:
        result = e

    if isinstance(result, Exception):
      try:
        self.logger.warning("Returning exception %r to caller: %s", result, result)
        req.errback(result)
      except (rpki.async.ExitNow, SystemExit):
        raise
      except Exception:
        self.logger.exception("Exception in exception callback, may have lost event chain")

    self.logger.debug("Queue: %r", self.queue)

    if self.queue:
      self.restart()

## @var client_queues
# Map of (host, port) tuples to http_queue objects.
client_queues = {}

def client(msg, url, callback, errback):
  """
  Open client HTTP connection, send a message, set up callbacks to
  handle response.
  """

  u = urlparse.urlparse(url)

  if (u.scheme not in ("", "http") or
      u.username is not None or
      u.password is not None or
      u.params   != "" or
      u.query    != "" or
      u.fragment != ""):
    raise rpki.exceptions.BadClientURL("Unusable URL %s" % url)

  logger.debug("Contacting %s", url)

  request = http_request(
    cmd                 = "POST",
    path                = u.path,
    body                = msg,
    callback            = callback,
    errback             = errback,
    Host                = u.hostname,
    Content_Type        = rpki_content_type)

  hostport = (u.hostname or "localhost", u.port or default_tcp_port)

  logger.debug("Created request %r for %s", request, addr_to_string(hostport))
  if hostport not in client_queues:
    client_queues[hostport] = http_queue(hostport)
  client_queues[hostport].request(request)

  # Defer connection attempt until after we've had time to process any
  # pending I/O events, in case connections have closed.

  logger.debug("Scheduling connection startup for %r", request)
  rpki.async.event_defer(client_queues[hostport].restart)

def server(handlers, port, host = ""):
  """
  Run an HTTP server and wait (forever) for connections.
  """

  if not isinstance(handlers, (tuple, list)):
    handlers = (("/", handlers),)

  # Yes, this is sick.  So is getaddrinfo() returning duplicate
  # records, which RedHat has the gall to claim is a feature.
  ai = []
  for af in supported_address_families(enable_ipv6_servers):
    try:
      if host:
        h = host
      elif have_ipv6 and af == socket.AF_INET6:
        h = "::"
      else:
        h = "0.0.0.0"
      for a in socket.getaddrinfo(h, port, af, socket.SOCK_STREAM):
        if a not in ai:
          ai.append(a)
    except socket.gaierror:
      pass

  for a in ai:
    http_listener(addrinfo = a, handlers = handlers)

  rpki.async.event_loop()

class caller(object):
  """
  Handle client-side mechanics for protocols based on HTTP, CMS, and
  rpki.xml_utils.  Calling sequence is intended to nest within
  rpki.async.sync_wrapper.
  """

  debug = False

  def __init__(self, proto, client_key, client_cert, server_ta, server_cert, url, debug = None):
    self.proto = proto
    self.client_key = client_key
    self.client_cert = client_cert
    self.server_ta = server_ta
    self.server_cert = server_cert
    self.url = url
    self.cms_timestamp = None
    if debug is not None:
      self.debug = debug

  def __call__(self, cb, eb, *pdus):

    def done(r_der):
      """
      Handle CMS-wrapped XML response message.
      """

      try:
        r_cms = self.proto.cms_msg(DER = r_der)
        r_msg = r_cms.unwrap((self.server_ta, self.server_cert))
        self.cms_timestamp = r_cms.check_replay(self.cms_timestamp, self.url)
        if self.debug:
          print "<!-- Reply -->"
          print r_cms.pretty_print_content()
        cb(r_msg)
      except (rpki.async.ExitNow, SystemExit):
        raise
      except Exception, e:
        eb(e)

    q_msg = self.proto.msg.query(*pdus)
    q_cms = self.proto.cms_msg()
    q_der = q_cms.wrap(q_msg, self.client_key, self.client_cert)
    if self.debug:
      print "<!-- Query -->"
      print q_cms.pretty_print_content()

    client(url = self.url, msg = q_der, callback = done, errback = eb)