|
1 """ |
|
2 A simple TCP client in the kind of syslog.fifo/file. |
|
3 |
|
4 Interface: fileno(), __iter__, __call__ |
|
5 """ |
|
6 |
|
7 # XXX: absolute import plz |
|
8 socket = __import__('socket') |
|
9 |
|
10 import select |
|
11 |
|
12 import urlparse |
|
13 |
|
14 import logging; log = logging.getLogger('pvl.socket') |
|
15 |
|
16 URL = { |
|
17 'tcp': (0, socket.SOCK_STREAM ), # AF_UNSPEC |
|
18 'udp': (0, socket.SOCK_DGRAM ), # AF_UNSPEC |
|
19 'unix': (socket.AF_UNIX, None ), # socktype is given |
|
20 } |
|
21 |
|
22 def parse (str, port=None, scheme='tcp', unix=socket.SOCK_DGRAM) : |
|
23 """ |
|
24 Parse given string into (AF_*, SOCK_*, host, port). |
|
25 |
|
26 For AF_UNIX, the path is in host, and port is empty, and the socktype is the given unix=... value. |
|
27 """ |
|
28 |
|
29 family, socktype = URL[scheme] |
|
30 url = urlparse.urlparse(str) |
|
31 |
|
32 # TODO: UNIX? |
|
33 if url.scheme and url.netloc : |
|
34 # proper url |
|
35 family, socktype = URL[url.scheme] |
|
36 |
|
37 return family, socktype, url.hostname, url.port or port |
|
38 |
|
39 elif url.scheme and url.path : |
|
40 # host:port |
|
41 return family, socktype, url.scheme, int(url.path) |
|
42 |
|
43 elif url.path : |
|
44 # host |
|
45 return family, socktype, url.path, port |
|
46 |
|
47 else : |
|
48 raise ValueError("unparseable connect URL: %s", str) |
|
49 |
|
50 def connect (str, *args, **kwargs) : |
|
51 """ |
|
52 Returns a connected socket for given parse()'d string. |
|
53 """ |
|
54 |
|
55 family, socktype, host, port = parse(str, *args, **kwargs) |
|
56 |
|
57 if family == socket.AF_UNIX : |
|
58 raise ValueError("XXX: AF_UNIX is not yet supported", str) |
|
59 else : # AF_UNSPEC |
|
60 return connect_inet(host, port, family=family, socktype=socktype) |
|
61 |
|
62 def connect_inet (host=None, port=None, family=socket.AF_UNSPEC, socktype=socket.SOCK_STREAM) : |
|
63 """ |
|
64 Return a TCP/UDP socket connected to the given host/port using getaddrinfo. |
|
65 |
|
66 TODO: timeout? |
|
67 """ |
|
68 |
|
69 log.debug("%s:%s: family=%s, socktype=%s", host, port, family, socktype) |
|
70 |
|
71 if host : |
|
72 flags = socket.AI_CANONNAME |
|
73 else : |
|
74 flags = 0 |
|
75 |
|
76 addrinfo = socket.getaddrinfo(host, port, family, socktype, 0, flags) |
|
77 |
|
78 if not addrinfo : |
|
79 raise Exception("getaddrinfo: %s:%s: no results" % (host, port)) |
|
80 |
|
81 for af, st, proto, name, addr in addrinfo : |
|
82 try : |
|
83 sock = socket.socket(af, st, proto) |
|
84 |
|
85 except socket.error as error : |
|
86 log.warning("%s:%s: socket: %s", host, port, error) |
|
87 continue |
|
88 |
|
89 log.debug("%s:%s: socket: %s", host, port, sock) |
|
90 |
|
91 try : |
|
92 sock.connect(addr) |
|
93 |
|
94 except socket.error as error : |
|
95 log.warning("%s:%s: connect: %s", host, port, error) |
|
96 continue |
|
97 |
|
98 log.debug("%s:%s: connect", host, port) |
|
99 log.info("%s", name) |
|
100 |
|
101 return sock |
|
102 |
|
103 else : |
|
104 raise Exception("Unable to connect: %s:%s: %s" % (host, port, error)) |
|
105 |
|
106 def reverse (self, sockaddr, numeric_host=False, numeric_port=True) : |
|
107 """ |
|
108 Resolve given sockaddr, returning (host, port). |
|
109 """ |
|
110 |
|
111 flags = 0 |
|
112 |
|
113 if numeric_host : |
|
114 flags |= socket.NI_NUMERICHOST |
|
115 |
|
116 if numeric_port : |
|
117 flags |= socket.NI_NUMERICSERV |
|
118 |
|
119 return socket.getnameinfo(sockaddr, flags) |
|
120 |
|
121 def nonblocking (call, *args, **kwargs) : |
|
122 """ |
|
123 Call the given function, which read/writes on a nonblocking file, and return None if it would have blocked. |
|
124 """ |
|
125 |
|
126 try : |
|
127 return call(*args, **kwargs) |
|
128 |
|
129 except socket.error as ex : |
|
130 # block? |
|
131 if ex.errno == errno.EAGAIN or ex.errno == errno.EWOULDBLOCK: |
|
132 # empty |
|
133 return None |
|
134 |
|
135 else : |
|
136 raise |
|
137 |
|
138 class ReadStream (object) : |
|
139 """ |
|
140 Buffered stream, supporting non-blocking/line-based reads. |
|
141 """ |
|
142 |
|
143 BLOCK=512 |
|
144 |
|
145 def __init__ (self, sock, buffer=None) : |
|
146 """ |
|
147 TODO: buffer - maximum line length |
|
148 """ |
|
149 |
|
150 self.sock = sock |
|
151 self._buf = '' |
|
152 |
|
153 def fileno (self) : |
|
154 return self.sock.fileno() |
|
155 |
|
156 def _read (self, block=BLOCK) : |
|
157 """ |
|
158 Read up to n bytes from socket. |
|
159 |
|
160 Returns None if we would block. |
|
161 Raises EOFError on EOF. |
|
162 """ |
|
163 |
|
164 buf = nonblocking(self.sock.recv, block) |
|
165 |
|
166 # eof? |
|
167 if not buf : |
|
168 raise EOFError() |
|
169 |
|
170 # ok |
|
171 return buf |
|
172 |
|
173 def peek (self) : |
|
174 """ |
|
175 Peek at data in buffer. |
|
176 """ |
|
177 |
|
178 return self._buf |
|
179 |
|
180 def read (self) : |
|
181 """ |
|
182 Read and return any available input. |
|
183 |
|
184 Returns None if blocking. |
|
185 """ |
|
186 |
|
187 if self._buf : |
|
188 buf, self._buf = self._buf, '' |
|
189 |
|
190 else : |
|
191 buf = self._read() |
|
192 |
|
193 return buf |
|
194 |
|
195 def readline (self) : |
|
196 """ |
|
197 Read and return next waiting line from input. |
|
198 |
|
199 Line is returned without trailing '\r\n' or '\n'. |
|
200 |
|
201 Returns None if there is no line available. |
|
202 """ |
|
203 |
|
204 while '\n' not in self._buf : |
|
205 # read chunk |
|
206 read = self._read() |
|
207 |
|
208 if read is None : |
|
209 return None |
|
210 |
|
211 self._buf += read |
|
212 |
|
213 # split out one line |
|
214 line, self._buf = self._buf.split('\n', 1) |
|
215 |
|
216 # in case we had \r\n |
|
217 line = line.rstrip('\r') |
|
218 |
|
219 return line |
|
220 |
|
221 def readlines (self) : |
|
222 """ |
|
223 Read any available input, yielding lines. |
|
224 |
|
225 Returns None if thre is no more input available. |
|
226 |
|
227 Raises EOFError in the socket was closed. |
|
228 """ |
|
229 |
|
230 while True : |
|
231 line = self.readline() |
|
232 |
|
233 if line is None : |
|
234 # no more |
|
235 return |
|
236 else : |
|
237 yield line |
|
238 |
|
239 __iter__ = readlines |
|
240 |
|
241 class WriteStream (object) : |
|
242 """ |
|
243 Writable stream, supporting non-blocking/buffered writes. |
|
244 |
|
245 XXX: buffering is completely untested |
|
246 """ |
|
247 |
|
248 EOL = '\n' |
|
249 |
|
250 def __init__ (self, sock, buffer=None) : |
|
251 """ |
|
252 TODO: buffer - maximum outgoing buffer length |
|
253 """ |
|
254 |
|
255 self.sock = sock |
|
256 self._buf = buffer |
|
257 |
|
258 def _write (self, buf) : |
|
259 """ |
|
260 Write given data to socket, returning the number of bytes written, or None, if buffering is enabled. |
|
261 """ |
|
262 |
|
263 send = nonblocking(self.sock.send, buf) |
|
264 |
|
265 # eof on write? |
|
266 if send : |
|
267 # ok, message (partially) written |
|
268 return send |
|
269 |
|
270 else : |
|
271 # XXX: how do we handle this? What does it actually mean? |
|
272 # handle as a wouldblock... |
|
273 return None |
|
274 |
|
275 |
|
276 def write (self, data) : |
|
277 """ |
|
278 Write given data to socket. |
|
279 |
|
280 TODO: buffer small chunks -> select writable -> write? |
|
281 |
|
282 Buffers if not able to write, or raises EOFError (hah!) |
|
283 """ |
|
284 |
|
285 if not self._buf : |
|
286 # write directly |
|
287 while data : |
|
288 write = self._write(data) |
|
289 |
|
290 if write : |
|
291 # remaining data |
|
292 data = data[write:] |
|
293 |
|
294 else : |
|
295 # cannot write more |
|
296 break |
|
297 |
|
298 if not data : |
|
299 # sent |
|
300 return |
|
301 |
|
302 if self._buf is None : |
|
303 # no write buffering, and socket buffer full! |
|
304 raise EOFError() |
|
305 |
|
306 # append to outgoing buffer |
|
307 self._buf += data |
|
308 |
|
309 def writeline (self, line, eol=EOL) : |
|
310 """ |
|
311 Write out line. |
|
312 """ |
|
313 |
|
314 self.write(str(line) + eol) |
|
315 |
|
316 def __call__ (self, *lines) : |
|
317 for line in lines : |
|
318 self.writeline(line) |
|
319 |
|
320 # TODO: flush |