1 """ |
|
2 A simple TCP client, in the same style as syslog.fifo/file. |
|
3 |
|
4 Interface: fileno(), __iter__, __call__ |
|
5 """ |
|
6 |
|
7 # XXX: absolute import plz |
|
8 socket = __import__('socket') |
|
9 |
|
10 import select |
|
11 import errno |
|
12 |
|
13 import urlparse |
|
14 |
|
15 import logging; log = logging.getLogger('pvl.socket') |
|
16 |
|
# Known URL schemes, each paired with the (family, socktype) it selects.
#
# Order matters: socket_str() walks this table front to back and uses the
# first entry that matches a socket. A false value (0 / None) in either
# column acts as a wildcard there.
URL = (
    # scheme     (family,           socktype)
    ('unix',    (socket.AF_UNIX,    None)),                 # socktype is given by the caller
    ('tcp',     (0,                 socket.SOCK_STREAM)),   # family 0: AF_UNSPEC
    ('udp',     (0,                 socket.SOCK_DGRAM)),    # family 0: AF_UNSPEC
)

# scheme -> (family, socktype) lookup built from the table above
URL_SCHEMES = dict(URL)
|
26 |
|
def parse (str, port=None, scheme='tcp', unix=socket.SOCK_DGRAM) :
    """
        Parse given string into (AF_*, SOCK_*, host, port).

        Accepted forms:
            scheme://host[:port]    - scheme must be a key of URL_SCHEMES
            host:port
            host

            str     - the connect string to parse
            port    - port to return when the string does not carry one
            scheme  - scheme to assume when the string is not a full URL
            unix    - socktype to return for schemes that do not fix one themselves

        For AF_UNIX, the path is in host, and port is empty, and the socktype is the given unix=... value.

        Raises KeyError for a scheme that is not in URL_SCHEMES.
        Raises ValueError for an empty string, or a non-numeric port in the host:port form.
    """

    family, socktype = URL_SCHEMES[scheme]
    url = urlparse.urlparse(str)

    # TODO: UNIX?
    if url.scheme and url.netloc :
        # proper url: the scheme in the string overrides the default one
        family, socktype = URL_SCHEMES[url.scheme]

        host = url.hostname
        port = url.port or port

    elif url.scheme and url.path :
        # host:port, which this urlparse split as scheme:path
        host = url.scheme
        port = int(url.path)

    elif url.path :
        # host
        host = url.path

        # Some urlparse versions refuse to treat "name:1234" as scheme:path and
        # leave it whole in path, so split off a numeric port here as well.
        # A name that itself contains ':' (bare IPv6 literal) is left alone,
        # and so is anything parsed as a unix path.
        name, colon, service = url.path.rpartition(':')

        if family != socket.AF_UNIX and colon and name and ':' not in name and service.isdigit() :
            host = name
            port = int(service)

    else :
        # format the message here; ValueError does not interpolate extra args
        raise ValueError("unparseable connect URL: %s" % (str, ))

    if socktype is None :
        # the scheme leaves the socktype open (unix): use the caller's choice
        socktype = unix

    return family, socktype, host, port
|
54 |
|
def connect (str, *args, **kwargs) :
    """
        Parse the given connect string and return a socket connected to it.

        Any further arguments are handed to parse() unchanged.

        Raises ValueError if the string parses to an AF_UNIX address, which is not supported yet.
    """

    family, socktype, host, port = parse(str, *args, **kwargs)

    if family != socket.AF_UNIX :
        # AF_UNSPEC
        return connect_inet(host, port, family=family, socktype=socktype)

    raise ValueError("XXX: AF_UNIX is not yet supported", str)
|
67 |
|
def connect_inet (host=None, port=None, family=socket.AF_UNSPEC, socktype=socket.SOCK_STREAM) :
    """
        Return a TCP/UDP socket connected to the given host/port using getaddrinfo.

        Each address returned by getaddrinfo is tried in order, and the first socket that
        connects is returned. Sockets that fail to connect are closed before moving on.

            host        - name or address to resolve; may be None
            port        - port or service to resolve
            family      - address family passed to getaddrinfo
            socktype    - socket type passed to getaddrinfo

        Raises Exception if getaddrinfo returns no results, or if no address could be connected.

        TODO: timeout?
    """

    log.debug("%s:%s: family=%s, socktype=%s", host, port, family, socktype)

    # only ask for the canonical name when there is a host to resolve
    if host :
        flags = socket.AI_CANONNAME
    else :
        flags = 0

    addrinfo = socket.getaddrinfo(host, port, family, socktype, 0, flags)

    if not addrinfo :
        raise Exception("getaddrinfo: %s:%s: no results" % (host, port))

    # Kept in a separate variable: the name bound by "except ... as" is not
    # reliably available after the handler on all Python versions.
    last_error = None

    for af, st, proto, name, addr in addrinfo :
        try :
            sock = socket.socket(af, st, proto)

        except socket.error as error :
            log.warning("%s:%s: socket: %s", host, port, error)
            last_error = error
            continue

        log.debug("%s:%s: socket: %s", host, port, sock)

        try :
            sock.connect(addr)

        except socket.error as error :
            log.warning("%s:%s: connect: %s", host, port, error)
            last_error = error

            # do not leak the descriptor of a socket we are giving up on
            sock.close()
            continue

        log.debug("%s:%s: connect", host, port)
        log.info("%s", name)

        return sock

    # every address failed; report the most recent error
    raise Exception("Unable to connect: %s:%s: %s" % (host, port, last_error))
|
111 |
|
def reverse (sockaddr, numeric_host=False, numeric_port=True) :
    """
        Resolve given sockaddr via getnameinfo, returning (host, port).

            numeric_host    - set NI_NUMERICHOST for the lookup
            numeric_port    - set NI_NUMERICSERV for the lookup
    """

    host_flag = socket.NI_NUMERICHOST if numeric_host else 0
    port_flag = socket.NI_NUMERICSERV if numeric_port else 0

    return socket.getnameinfo(sockaddr, host_flag | port_flag)
|
126 |
|
def socket_str (sock) :
    """
        Describe the peer of the given socket as a string.

        Returns "scheme://host:port" when an entry in URL matches the socket's family/type,
        and "host:port" otherwise. If the peer address cannot be read, the text of the
        socket error is returned instead.
    """

    # get connected peer
    try :
        peer = sock.getpeername()

    except socket.error as ex :
        # fails if socket is not connected XXX: even after EOF on read..?
        return str(ex)

    # lookup scheme: first URL entry whose family and socktype both fit,
    # where a false value in the table matches anything
    scheme = next(
        (
            name for name, (family, socktype) in URL
            if not (family and family != sock.family)
            and not (socktype and socktype != sock.type)
        ),
        None
    )

    host, port = reverse(peer)

    if not scheme :
        return "{host}:{port}".format(host=host, port=port)

    return "{scheme}://{host}:{port}".format(scheme=scheme, host=host, port=port)
|
153 |
|
def nonblocking (call, *args, **kwargs) :
    """
        Run call(*args, **kwargs), a read/write on a nonblocking file, and return its result.

        Returns None instead if the call failed with EAGAIN/EWOULDBLOCK, i.e. it would have blocked.

        Raises EOFError on EPIPE; any other socket error is re-raised as-is.

        # XXX: does python handle SIGPIPE for us?
    """

    try :
        return call(*args, **kwargs)

    except socket.error as ex :
        code = ex.errno

        if code in (errno.EAGAIN, errno.EWOULDBLOCK) :
            # would block: nothing to report
            return None

        if code == errno.EPIPE :
            # XXX: write-eof?
            raise EOFError()

        raise
|
178 |
|
class ReadStream (object) :
    """
        Buffered reader on top of a socket, supporting non-blocking/line-based reads.

        Iterating the stream yields the lines that are currently available.
    """

    # default number of bytes to ask for per recv()
    BLOCK = 512

    def __init__ (self, sock, buffer=None) :
        """
            sock    - the socket to read from
            buffer  - not used yet

            TODO: buffer - maximum line length
        """

        self.sock = sock

        # received data that has not been handed out yet
        self._buf = ''

    def fileno (self) :
        """
            File descriptor of the underlying socket.
        """

        return self.sock.fileno()

    def _read (self, block=BLOCK) :
        """
            Read up to `block` bytes from socket.

            Returns None if we would block.
            Raises EOFError on EOF.
        """

        data = nonblocking(self.sock.recv, block)

        log.debug("%s: %s", self, data)

        if data is None :
            # would block
            return None

        if not data :
            # empty read
            raise EOFError()

        return data

    def peek (self) :
        """
            Return the data currently held in the buffer, without consuming it.
        """

        return self._buf

    def read (self) :
        """
            Read and return any available input: the buffered data if there is any,
            otherwise one fresh read from the socket.

            Returns None if blocking.
        """

        if not self._buf :
            return self._read()

        pending = self._buf
        self._buf = ''

        return pending

    def readline (self) :
        """
            Read and return next waiting line from input.

            Line is returned without trailing '\\r\\n' or '\\n'.

            Returns None if there is no line available.

            XXX: when _read() raises EOFError, any unterminated data stays behind in the buffer.
        """

        # keep reading chunks until the buffer holds a complete line
        while '\n' not in self._buf :
            chunk = self._read()

            if chunk is None :
                return None

            self._buf += chunk

        # split out one line, keeping the rest buffered
        head, rest = self._buf.split('\n', 1)
        self._buf = rest

        # in case we had \r\n
        line = head.rstrip('\r')

        log.debug("%s: %s", self, line)

        return line

    def readlines (self) :
        """
            Read any available input, yielding lines.

            Stops when there is no more input available.

            Raises EOFError if the socket was closed.
        """

        # readline() returning None ends the iteration
        for line in iter(self.readline, None) :
            yield line

    __iter__ = readlines

    def __str__ (self) :
        return socket_str(self.sock)
|
289 |
|
class WriteStream (object) :
    """
        Writable stream on top of a socket, supporting non-blocking/buffered writes.

        Calling the stream with lines writes each of them out with writeline().

        XXX: buffering is completely untested, and nothing writes out buffered data yet.
    """

    # default line terminator for writeline()
    EOL = '\n'

    def __init__ (self, sock, buffer=None) :
        """
            sock    - the socket to write to
            buffer  - initial outgoing buffer; None (the default) disables buffering,
                      a string such as '' enables it

            TODO: buffer - maximum outgoing buffer length
        """

        self.sock = sock
        self._buf = buffer

    def _write (self, buf) :
        """
            Write given data to socket, returning the number of bytes written.

            Returns None if the send would have blocked, or if nothing was written.
        """

        sent = nonblocking(self.sock.send, buf)

        if sent :
            # ok, message (partially) written
            return sent

        # either the send would block (None), or it was zero-length
        # XXX: zero-length send? how do we handle this? What does it actually mean?
        #      handle as a wouldblock...
        return None

    def write (self, data) :
        """
            Write given data to socket.

            TODO: buffer small chunks -> select writable -> write?

            Buffers if not able to write, or raises EOFError (hah!)
        """

        if not self._buf :
            # nothing is queued ahead of this data: write directly
            while data :
                sent = self._write(data)

                if not sent :
                    # cannot write more
                    break

                # remaining data
                data = data[sent:]

            if not data :
                # sent
                return

        if self._buf is None :
            # no write buffering, and socket buffer full!
            raise EOFError()

        # append to outgoing buffer
        self._buf += data

    def writeline (self, line, eol=EOL) :
        """
            Write out line, followed by eol.

            The line and its terminator go out in one write(), so they are not split
            across two sends (which on a datagram socket would be two datagrams).

            Raises EOFError from write() if the data could not be written or buffered.
        """

        log.debug("%s: %s", self, line)

        data = str(line)

        if eol :
            # a false eol (None, '') means no terminator is written
            data += eol

        self.write(data)

    def __call__ (self, *lines) :
        """
            Write out each of the given lines.
        """

        for line in lines :
            self.writeline(line)

    # TODO: flush

    def __str__ (self) :
        return socket_str(self.sock)
|
378 |
|
379 |
|