1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
|
# $Id$
#
# Copyright (C) 2013--2014 Dragon Research Labs ("DRL")
# Portions copyright (C) 2009--2012 Internet Systems Consortium ("ISC")
# Portions copyright (C) 2007--2008 American Registry for Internet Numbers ("ARIN")
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notices and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND DRL, ISC, AND ARIN DISCLAIM ALL
# WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL DRL,
# ISC, OR ARIN BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""
HTTP utilities, both client and server.
"""
import time
import socket
import asyncore
import asynchat
import urlparse
import sys
import random
import logging
import rpki.async
import rpki.sundial
import rpki.x509
import rpki.exceptions
import rpki.log
import rpki.POW
logger = logging.getLogger(__name__)
## @var default_content_type
# HTTP content type used for RPKI messages.
# Can be overriden on a per-client or per-server basis.
default_content_type = "application/x-rpki"
## @var want_persistent_client
# Whether we want persistent HTTP client streams, when server also supports them.
want_persistent_client = False
## @var want_persistent_server
# Whether we want persistent HTTP server streams, when client also supports them.
want_persistent_server = False
## @var default_client_timeout
# Default HTTP client connection timeout.
default_client_timeout = rpki.sundial.timedelta(minutes = 5)
## @var default_server_timeout
# Default HTTP server connection timeouts. Given our druthers, we'd
# prefer that the client close the connection, as this avoids the
# problem of client starting to reuse connection just as server closes
# it, so this should be longer than the client timeout.
default_server_timeout = rpki.sundial.timedelta(minutes = 10)
## @var default_http_version
# Preferred HTTP version.
default_http_version = (1, 0)
## @var default_tcp_port
# Default port for clients and servers that don't specify one.
default_tcp_port = 80
## @var enable_ipv6_servers
# Whether to enable IPv6 listeners. Enabled by default, as it should
# be harmless. Has no effect if kernel doesn't support IPv6.
enable_ipv6_servers = True
## @var enable_ipv6_clients
# Whether to consider IPv6 addresses when making connections.
# Disabled by default, as IPv6 connectivity is still a bad joke in
# far too much of the world.
enable_ipv6_clients = False
## @var have_ipv6
# Whether the current machine claims to support IPv6. Note that just
# because the kernel supports it doesn't mean that the machine has
# usable IPv6 connectivity. I don't know of a simple portable way to
# probe for connectivity at runtime (the old test of "can you ping
# SRI-NIC.ARPA?" seems a bit dated...). Don't set this, it's set
# automatically by probing using the socket() system call at runtime.
try:
# pylint: disable=W0702,W0104
socket.socket(socket.AF_INET6).close()
socket.IPPROTO_IPV6
socket.IPV6_V6ONLY
except:
have_ipv6 = False
else:
have_ipv6 = True
## @var use_adns
# Whether to use rpki.adns code. This is still experimental, so it's
# not (yet) enabled by default.
use_adns = False
try:
import rpki.adns
except ImportError:
pass
def supported_address_families(enable_ipv6):
"""
IP address families on which servers should listen, and to consider
when selecting addresses for client connections.
"""
if enable_ipv6 and have_ipv6:
return (socket.AF_INET, socket.AF_INET6)
else:
return (socket.AF_INET,)
def localhost_addrinfo():
"""
Return pseudo-getaddrinfo results for localhost.
"""
result = [(socket.AF_INET, "127.0.0.1")]
if enable_ipv6_clients and have_ipv6:
result.append((socket.AF_INET6, "::1"))
return result
class http_message(object):
"""
Virtual class representing of one HTTP message.
"""
software_name = "ISC RPKI library"
def __init__(self, version = None, body = None, headers = None):
self.version = version
self.body = body
self.headers = headers
self.normalize_headers()
def normalize_headers(self, headers = None):
"""
Clean up (some of) the horrible messes that HTTP allows in its
headers.
"""
if headers is None:
headers = () if self.headers is None else self.headers.items()
translate_underscore = True
else:
translate_underscore = False
result = {}
for k, v in headers:
if translate_underscore:
k = k.replace("_", "-")
k = "-".join(s.capitalize() for s in k.split("-"))
v = v.strip()
if k in result:
result[k] += ", " + v
else:
result[k] = v
self.headers = result
@classmethod
def parse_from_wire(cls, headers):
"""
Parse and normalize an incoming HTTP message.
"""
self = cls()
headers = headers.split("\r\n")
self.parse_first_line(*headers.pop(0).split(None, 2))
for i in xrange(len(headers) - 2, -1, -1):
if headers[i + 1][0].isspace():
headers[i] += headers[i + 1]
del headers[i + 1]
self.normalize_headers([h.split(":", 1) for h in headers])
return self
def format(self):
"""
Format an outgoing HTTP message.
"""
s = self.format_first_line()
if self.body is not None:
assert isinstance(self.body, str)
self.headers["Content-Length"] = len(self.body)
for kv in self.headers.iteritems():
s += "%s: %s\r\n" % kv
s += "\r\n"
if self.body is not None:
s += self.body
return s
def __str__(self):
return self.format()
def parse_version(self, version):
"""
Parse HTTP version, raise an exception if we can't.
"""
if version[:5] != "HTTP/":
raise rpki.exceptions.HTTPBadVersion("Couldn't parse version %s" % version)
self.version = tuple(int(i) for i in version[5:].split("."))
@property
def persistent(self):
"""
Figure out whether this HTTP message encourages a persistent connection.
"""
c = self.headers.get("Connection")
if self.version == (1, 1):
return c is None or "close" not in c.lower()
elif self.version == (1, 0):
return c is not None and "keep-alive" in c.lower()
else:
return False
class http_request(http_message):
"""
HTTP request message.
"""
def __init__(self, cmd = None, path = None, version = default_http_version, body = None, callback = None, errback = None, **headers):
assert cmd == "POST" or body is None
http_message.__init__(self, version = version, body = body, headers = headers)
self.cmd = cmd
self.path = path
self.callback = callback
self.errback = errback
self.retried = False
def parse_first_line(self, cmd, path, version):
"""
Parse first line of HTTP request message.
"""
self.parse_version(version)
self.cmd = cmd
self.path = path
def format_first_line(self):
"""
Format first line of HTTP request message, and set up the
User-Agent header.
"""
self.headers.setdefault("User-Agent", self.software_name)
return "%s %s HTTP/%d.%d\r\n" % (self.cmd, self.path, self.version[0], self.version[1])
def __repr__(self):
return rpki.log.log_repr(self, self.cmd, self.path)
class http_response(http_message):
"""
HTTP response message.
"""
def __init__(self, code = None, reason = None, version = default_http_version, body = None, **headers):
http_message.__init__(self, version = version, body = body, headers = headers)
self.code = code
self.reason = reason
def parse_first_line(self, version, code, reason):
"""
Parse first line of HTTP response message.
"""
self.parse_version(version)
self.code = int(code)
self.reason = reason
def format_first_line(self):
"""
Format first line of HTTP response message, and set up Date and
Server headers.
"""
self.headers.setdefault("Date", time.strftime("%a, %d %b %Y %T GMT"))
self.headers.setdefault("Server", self.software_name)
return "HTTP/%d.%d %s %s\r\n" % (self.version[0], self.version[1], self.code, self.reason)
def __repr__(self):
return rpki.log.log_repr(self, self.code, self.reason)
def addr_to_string(addr):
"""
Convert socket addr tuple to printable string. Assumes 2-element
tuple is IPv4, 4-element tuple is IPv6, throws TypeError for
anything else.
"""
if len(addr) == 2:
return "%s:%d" % (addr[0], addr[1])
if len(addr) == 4:
return "%s.%d" % (addr[0], addr[1])
raise TypeError
@rpki.log.class_logger(logger)
class http_stream(asynchat.async_chat):
"""
Virtual class representing an HTTP message stream.
"""
# Keep pylint happy; @class_logger overwrites this.
logger = None
def __repr__(self):
status = ["connected"] if self.connected else []
try:
status.append(addr_to_string(self.addr))
except TypeError:
pass
return rpki.log.log_repr(self, *status)
def __init__(self, sock = None):
self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
asynchat.async_chat.__init__(self, sock)
self.buffer = []
self.timer = rpki.async.timer(self.handle_timeout)
self.restart()
def restart(self):
"""
(Re)start HTTP message parser, reset timer.
"""
assert not self.buffer
self.chunk_handler = None
self.set_terminator("\r\n\r\n")
self.update_timeout()
def update_timeout(self):
"""
Put this stream's timer in known good state: set it to the
stream's timeout value if we're doing timeouts, otherwise clear
it.
"""
if self.timeout is not None:
self.logger.debug("Setting timeout %s", self.timeout)
self.timer.set(self.timeout)
else:
self.logger.debug("Clearing timeout")
self.timer.cancel()
def collect_incoming_data(self, data):
"""
Buffer incoming data from asynchat.
"""
self.buffer.append(data)
self.update_timeout()
def get_buffer(self):
"""
Consume data buffered from asynchat.
"""
val = "".join(self.buffer)
self.buffer = []
return val
def found_terminator(self):
"""
Asynchat reported that it found whatever terminator we set, so
figure out what to do next. This can be messy, because we can be
in any of several different states:
@li We might be handling chunked HTTP, in which case we have to
initialize the chunk decoder;
@li We might have found the end of the message body, in which case
we can (finally) process it; or
@li We might have just gotten to the end of the message headers,
in which case we have to parse them to figure out which of three
separate mechanisms (chunked, content-length, TCP close) is going
to tell us how to find the end of the message body.
"""
self.update_timeout()
if self.chunk_handler:
self.chunk_handler()
elif not isinstance(self.get_terminator(), str):
self.handle_body()
else:
self.msg = self.parse_type.parse_from_wire(self.get_buffer())
if self.msg.version == (1, 1) and "chunked" in self.msg.headers.get("Transfer-Encoding", "").lower():
self.msg.body = []
self.chunk_handler = self.chunk_header
self.set_terminator("\r\n")
elif "Content-Length" in self.msg.headers:
self.set_terminator(int(self.msg.headers["Content-Length"]))
else:
self.handle_no_content_length()
def chunk_header(self):
"""
Asynchat just handed us what should be the header of one chunk of
a chunked encoding stream. If this chunk has a body, set the
stream up to read it; otherwise, this is the last chunk, so start
the process of exiting the chunk decoder.
"""
n = int(self.get_buffer().partition(";")[0], 16)
self.logger.debug("Chunk length %s", n)
if n:
self.chunk_handler = self.chunk_body
self.set_terminator(n)
else:
self.msg.body = "".join(self.msg.body)
self.chunk_handler = self.chunk_discard_trailer
def chunk_body(self):
"""
Asynchat just handed us what should be the body of a chunk of the
body of a chunked message (sic). Save it, and prepare to move on
to the next chunk.
"""
self.logger.debug("Chunk body")
self.msg.body += self.buffer
self.buffer = []
self.chunk_handler = self.chunk_discard_crlf
self.set_terminator("\r\n")
def chunk_discard_crlf(self):
"""
Consume the CRLF that terminates a chunk, reinitialize chunk
decoder to be ready for the next chunk.
"""
self.logger.debug("Chunk CRLF")
s = self.get_buffer()
assert s == "", "%r: Expected chunk CRLF, got '%s'" % (self, s)
self.chunk_handler = self.chunk_header
def chunk_discard_trailer(self):
"""
Consume chunk trailer, which should be empty, then (finally!) exit
the chunk decoder and hand complete message off to the application.
"""
self.logger.debug("Chunk trailer")
s = self.get_buffer()
assert s == "", "%r: Expected end of chunk trailers, got '%s'" % (self, s)
self.chunk_handler = None
self.handle_message()
def handle_body(self):
"""
Hand normal (not chunked) message off to the application.
"""
self.msg.body = self.get_buffer()
self.handle_message()
def handle_error(self):
"""
Asynchat (or asyncore, or somebody) raised an exception. See
whether it's one we should just pass along, otherwise log a stack
trace and close the stream.
"""
self.timer.cancel()
etype = sys.exc_info()[0]
if etype in (SystemExit, rpki.async.ExitNow):
raise
if etype is not rpki.exceptions.HTTPClientAborted:
self.logger.exception("Closing due to error")
self.close()
def handle_timeout(self):
"""
Inactivity timer expired, close connection with prejudice.
"""
self.logger.debug("Timeout, closing")
self.close()
def handle_close(self):
"""
Wrapper around asynchat connection close handler, so that we can
log the event, cancel timer, and so forth.
"""
self.logger.debug("Close event in HTTP stream handler")
self.timer.cancel()
asynchat.async_chat.handle_close(self)
@rpki.log.class_logger(logger)
class http_server(http_stream):
"""
HTTP server stream.
"""
## @var parse_type
# Stream parser should look for incoming HTTP request messages.
parse_type = http_request
## @var timeout
# Use the default server timeout value set in the module header.
timeout = default_server_timeout
def __init__(self, sock, handlers):
self.handlers = handlers
self.received_content_type = None
http_stream.__init__(self, sock = sock)
self.expect_close = not want_persistent_server
self.logger.debug("Starting")
def handle_no_content_length(self):
"""
Handle an incoming message that used neither chunking nor a
Content-Length header (that is: this message will be the last one
in this server stream). No special action required.
"""
self.handle_message()
def find_handler(self, path):
"""
Helper method to search self.handlers.
"""
for h in self.handlers:
if path.startswith(h[0]):
return h[1], h[2] if len(h) > 2 else (default_content_type,)
return None, None
def handle_message(self):
"""
HTTP layer managed to deliver a complete HTTP request to
us, figure out what to do with it. Check the command and
Content-Type, look for a handler, and if everything looks right,
pass the message body, path, and a reply callback to the handler.
"""
self.logger.debug("Received request %r", self.msg)
if not self.msg.persistent:
self.expect_close = True
handler, allowed_content_types = self.find_handler(self.msg.path)
self.received_content_type = self.msg.headers["Content-Type"]
error = None
if self.msg.cmd != "POST":
error = 501, "No handler for method %s" % self.msg.cmd
elif self.received_content_type not in allowed_content_types:
error = 415, "No handler for Content-Type %s" % self.received_content_type
elif handler is None:
error = 404, "No handler for URL %s" % self.msg.path
if error is None:
try:
handler(self.msg.body, self.msg.path, self.send_reply)
except (rpki.async.ExitNow, SystemExit):
raise
except Exception, e:
self.logger.exception("Unhandled exception while handling HTTP request")
self.send_error(500, reason = "Unhandled exception %s: %s" % (e.__class__.__name__, e))
else:
self.send_error(code = error[0], reason = error[1])
def send_error(self, code, reason):
"""
Send an error response to this request.
"""
self.send_message(code = code, reason = reason)
def send_reply(self, code, body = None, reason = "OK"):
"""
Send a reply to this request.
"""
self.send_message(code = code, body = body, reason = reason)
def send_message(self, code, reason = "OK", body = None):
"""
Queue up reply message. If both parties agree that connection is
persistant, and if no error occurred, restart this stream to
listen for next message; otherwise, queue up a close event for
this stream so it will shut down once the reply has been sent.
"""
self.logger.debug("Sending response %s %s", code, reason)
if code >= 400:
self.expect_close = True
msg = http_response(code = code, reason = reason, body = body,
Content_Type = self.received_content_type,
Connection = "Close" if self.expect_close else "Keep-Alive")
self.push(msg.format())
if self.expect_close:
self.logger.debug("Closing")
self.timer.cancel()
self.close_when_done()
else:
self.logger.debug("Listening for next message")
self.restart()
@rpki.log.class_logger(logger)
class http_listener(asyncore.dispatcher):
"""
Listener for incoming HTTP connections.
"""
def __repr__(self):
try:
status = (addr_to_string(self.addr),)
except TypeError:
status = ()
return rpki.log.log_repr(self, *status)
def __init__(self, handlers, addrinfo):
self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
asyncore.dispatcher.__init__(self)
self.handlers = handlers
try:
af, socktype, proto, canonname, sockaddr = addrinfo # pylint: disable=W0612
self.create_socket(af, socktype)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except AttributeError:
pass
if have_ipv6 and af == socket.AF_INET6:
self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
self.bind(sockaddr)
self.listen(5)
except Exception:
self.logger.exception("Couldn't set up HTTP listener")
self.close()
for h in handlers:
self.logger.debug("Handling %s", h[0])
def handle_accept(self):
"""
Asyncore says we have an incoming connection, spawn an http_server
stream for it and pass along all of our handler data.
"""
try:
res = self.accept()
if res is None:
raise
sock, addr = res # pylint: disable=W0633
self.logger.debug("Accepting connection from %s", addr_to_string(addr))
http_server(sock = sock, handlers = self.handlers)
except (rpki.async.ExitNow, SystemExit):
raise
except Exception:
self.logger.exception("Unable to accept connection")
def handle_error(self):
"""
Asyncore signaled an error, pass it along or log it.
"""
if sys.exc_info()[0] in (SystemExit, rpki.async.ExitNow):
raise
self.logger.exception("Error in HTTP listener")
@rpki.log.class_logger(logger)
class http_client(http_stream):
"""
HTTP client stream.
"""
## @var parse_type
# Stream parser should look for incoming HTTP response messages.
parse_type = http_response
## @var timeout
# Use the default client timeout value set in the module header.
timeout = default_client_timeout
## @var state
# Application layer connection state.
state = None
def __init__(self, queue, hostport):
http_stream.__init__(self)
self.logger.debug("Creating new connection to %s", addr_to_string(hostport))
self.queue = queue
self.host = hostport[0]
self.port = hostport[1]
self.set_state("opening")
self.expect_close = not want_persistent_client
def start(self):
"""
Create socket and request a connection.
"""
if not use_adns:
self.logger.debug("Not using ADNS")
self.gotaddrinfo([(socket.AF_INET, self.host)])
elif self.host == "localhost":
self.logger.debug("Bypassing DNS for localhost")
self.gotaddrinfo(localhost_addrinfo())
else:
families = supported_address_families(enable_ipv6_clients)
self.logger.debug("Starting ADNS lookup for %s in families %r", self.host, families)
rpki.adns.getaddrinfo(self.gotaddrinfo, self.dns_error, self.host, families)
def dns_error(self, e):
"""
Handle DNS lookup errors. For now, just whack the connection.
Undoubtedly we should do something better with diagnostics here.
"""
self.handle_error()
def gotaddrinfo(self, addrinfo):
"""
Got address data from DNS, create socket and request connection.
"""
try:
self.af, self.address = random.choice(addrinfo)
self.logger.debug("Connecting to AF %s host %s port %s addr %s", self.af, self.host, self.port, self.address)
self.create_socket(self.af, socket.SOCK_STREAM)
self.connect((self.address, self.port))
if self.addr is None:
self.addr = (self.host, self.port)
self.update_timeout()
except (rpki.async.ExitNow, SystemExit):
raise
except Exception:
self.handle_error()
def handle_connect(self):
"""
Asyncore says socket has connected.
"""
self.logger.debug("Socket connected")
self.set_state("idle")
assert self.queue.client is self
self.queue.send_request()
def set_state(self, state):
"""
Set HTTP client connection state.
"""
self.logger.debug("State transition %s => %s", self.state, state)
self.state = state
def handle_no_content_length(self):
"""
Handle response message that used neither chunking nor a
Content-Length header (that is: this message will be the last one
in this server stream). In this case we want to read until we
reach the end of the data stream.
"""
self.set_terminator(None)
def send_request(self, msg):
"""
Queue up request message and kickstart connection.
"""
self.logger.debug("Sending request %r", msg)
assert self.state == "idle", "%r: state should be idle, is %s" % (self, self.state)
self.set_state("request-sent")
msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive"
self.push(msg.format())
self.restart()
def handle_message(self):
"""
Handle incoming HTTP response message. Make sure we're in a state
where we expect to see such a message (and allow the mysterious
empty messages that Apache sends during connection close, no idea
what that is supposed to be about). If everybody agrees that the
connection should stay open, put it into an idle state; otherwise,
arrange for the stream to shut down.
"""
self.logger.debug("Message received, state %s", self.state)
if not self.msg.persistent:
self.expect_close = True
if self.state != "request-sent":
if self.state == "closing":
assert not self.msg.body
self.logger.debug("Ignoring empty response received while closing")
return
raise rpki.exceptions.HTTPUnexpectedState("%r received message while in unexpected state %s" % (self, self.state))
if self.expect_close:
self.logger.debug("Closing")
self.set_state("closing")
self.close_when_done()
else:
self.logger.debug("Idling")
self.set_state("idle")
self.update_timeout()
if self.msg.code != 200:
errmsg = "HTTP request failed"
if self.msg.code is not None:
errmsg += " with status %s" % self.msg.code
if self.msg.reason:
errmsg += ", reason %s" % self.msg.reason
if self.msg.body:
errmsg += ", response %s" % self.msg.body
raise rpki.exceptions.HTTPRequestFailed(errmsg)
self.queue.return_result(self, self.msg, detach = self.expect_close)
def handle_close(self):
"""
Asyncore signaled connection close. If we were waiting for that
to find the end of a response message, process the resulting
message now; if we were waiting for the response to a request we
sent, signal the error.
"""
http_stream.handle_close(self)
self.logger.debug("State %s", self.state)
if self.get_terminator() is None:
self.handle_body()
elif self.state == "request-sent":
raise rpki.exceptions.HTTPClientAborted("HTTP request aborted by close event")
else:
self.queue.detach(self)
def handle_timeout(self):
"""
Connection idle timer has expired. Shut down connection in any
case, noisily if we weren't idle.
"""
bad = self.state not in ("idle", "closing")
if bad:
self.logger.warning("Timeout while in state %s", self.state)
http_stream.handle_timeout(self)
if bad:
try:
raise rpki.exceptions.HTTPTimeout
except: # pylint: disable=W0702
self.handle_error()
else:
self.queue.detach(self)
def handle_error(self):
"""
Asyncore says something threw an exception. Log it, then shut
down the connection and pass back the exception.
"""
eclass, edata = sys.exc_info()[0:2]
self.logger.warning("Error on HTTP client connection %s:%s %s %s", self.host, self.port, eclass, edata)
http_stream.handle_error(self)
self.queue.return_result(self, edata, detach = True)
@rpki.log.class_logger(logger)
class http_queue(object):
"""
Queue of pending HTTP requests for a single destination. This class
is very tightly coupled to http_client; http_client handles the HTTP
stream itself, this class provides a slightly higher-level API.
"""
def __repr__(self):
return rpki.log.log_repr(self, addr_to_string(self.hostport))
def __init__(self, hostport):
self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
self.hostport = hostport
self.client = None
self.logger.debug("Created")
self.queue = []
def request(self, *requests):
"""
Append http_request object(s) to this queue.
"""
self.logger.debug("Adding requests %r", requests)
self.queue.extend(requests)
def restart(self):
"""
Send next request for this queue, if we can. This may involve
starting a new http_client stream, reusing an existing idle
stream, or just ignoring this request if there's an active client
stream already; in the last case, handling of the response (or
exception, or timeout) for the query currently in progress will
call this method when it's time to kick out the next query.
"""
try:
if self.client is None:
self.client = http_client(self, self.hostport)
self.logger.debug("Attached client %r", self.client)
self.client.start()
elif self.client.state == "idle":
self.logger.debug("Sending request to existing client %r", self.client)
self.send_request()
else:
self.logger.debug("Client %r exists in state %r", self.client, self.client.state)
except (rpki.async.ExitNow, SystemExit):
raise
except Exception, e:
self.return_result(self.client, e, detach = True)
def send_request(self):
"""
Kick out the next query in this queue, if any.
"""
if self.queue:
self.client.send_request(self.queue[0])
def detach(self, client_):
"""
Detatch a client from this queue. Silently ignores attempting to
detach a client that is not attached to this queue, to simplify
handling of what otherwise would be a nasty set of race
conditions.
"""
if client_ is self.client:
self.logger.debug("Detaching client %r", client_)
self.client = None
def return_result(self, client, result, detach = False): # pylint: disable=W0621
"""
Client stream has returned a result, which we need to pass along
to the original caller. Result may be either an HTTP response
message or an exception. In either case, once we're done
processing this result, kick off next message in the queue, if any.
"""
if client is not self.client:
self.logger.warning("Wrong client trying to return result. THIS SHOULD NOT HAPPEN. Dropping result %r", result)
return
if detach:
self.detach(client)
try:
req = self.queue.pop(0)
self.logger.debug("Dequeuing request %r", req)
except IndexError:
self.logger.warning("No caller. THIS SHOULD NOT HAPPEN. Dropping result %r", result)
return
assert isinstance(result, http_response) or isinstance(result, Exception)
if isinstance(result, http_response):
try:
self.logger.debug("Returning result %r to caller", result)
req.callback(result.body)
except (rpki.async.ExitNow, SystemExit):
raise
except Exception, e:
result = e
if isinstance(result, Exception):
try:
self.logger.warning("Returning exception %r to caller: %s", result, result)
req.errback(result)
except (rpki.async.ExitNow, SystemExit):
raise
except Exception:
self.logger.exception("Exception in exception callback, may have lost event chain")
self.logger.debug("Queue: %r", self.queue)
if self.queue:
self.restart()
## @var client_queues
# Map of (host, port) tuples to http_queue objects.
client_queues = {}
def client(msg, url, callback, errback, content_type = default_content_type):
"""
Open client HTTP connection, send a message, set up callbacks to
handle response.
"""
u = urlparse.urlparse(url)
if (u.scheme not in ("", "http") or
u.username is not None or
u.password is not None or
u.params != "" or
u.query != "" or
u.fragment != ""):
raise rpki.exceptions.BadClientURL("Unusable URL %s" % url)
logger.debug("Contacting %s", url)
request = http_request(
cmd = "POST",
path = u.path,
body = msg,
callback = callback,
errback = errback,
Host = u.hostname,
Content_Type = content_type)
hostport = (u.hostname or "localhost", u.port or default_tcp_port)
logger.debug("Created request %r for %s", request, addr_to_string(hostport))
if hostport not in client_queues:
client_queues[hostport] = http_queue(hostport)
client_queues[hostport].request(request)
# Defer connection attempt until after we've had time to process any
# pending I/O events, in case connections have closed.
logger.debug("Scheduling connection startup for %r", request)
rpki.async.event_defer(client_queues[hostport].restart)
def server(handlers, port, host = ""):
"""
Run an HTTP server and wait (forever) for connections.
"""
if not isinstance(handlers, (tuple, list)):
handlers = (("/", handlers),)
# Yes, this is sick. So is getaddrinfo() returning duplicate
# records, which RedHat has the gall to claim is a feature.
ai = []
for af in supported_address_families(enable_ipv6_servers):
try:
if host:
h = host
elif have_ipv6 and af == socket.AF_INET6:
h = "::"
else:
h = "0.0.0.0"
for a in socket.getaddrinfo(h, port, af, socket.SOCK_STREAM):
if a not in ai:
ai.append(a)
except socket.gaierror:
pass
for a in ai:
http_listener(addrinfo = a, handlers = handlers)
rpki.async.event_loop()
class caller(object):
"""
Handle client-side mechanics for protocols based on HTTP, CMS, and
rpki.xml_utils. Calling sequence is intended to nest within
rpki.async.sync_wrapper.
"""
debug = False
def __init__(self, proto, client_key, client_cert, server_ta, server_cert, url, debug = None):
self.proto = proto
self.client_key = client_key
self.client_cert = client_cert
self.server_ta = server_ta
self.server_cert = server_cert
self.url = url
self.cms_timestamp = None
if debug is not None:
self.debug = debug
def __call__(self, cb, eb, *pdus):
def done(r_der):
"""
Handle CMS-wrapped XML response message.
"""
try:
r_cms = self.proto.cms_msg(DER = r_der)
r_msg = r_cms.unwrap((self.server_ta, self.server_cert))
self.cms_timestamp = r_cms.check_replay(self.cms_timestamp, self.url)
if self.debug:
print "<!-- Reply -->"
print r_cms.pretty_print_content()
cb(r_msg)
except (rpki.async.ExitNow, SystemExit):
raise
except Exception, e:
eb(e)
q_msg = self.proto.msg.query(*pdus)
q_cms = self.proto.cms_msg()
q_der = q_cms.wrap(q_msg, self.client_key, self.client_cert)
if self.debug:
print "<!-- Query -->"
print q_cms.pretty_print_content()
client(url = self.url, msg = q_der, callback = done, errback = eb)
|