|
1 """ |
|
2 Non-blocking fifo reads. |
|
3 """ |
|
4 |
|
5 import os |
|
6 import errno |
|
7 import fcntl |
|
8 |
|
9 import logging |
|
10 |
|
11 log = logging.getLogger('pvl.syslog.fifo') |
|
12 |
|
13 class Pipe (object) : |
|
14 """ |
|
15 A pipe from a fd. |
|
16 |
|
17 Supports reading lines in a non-blocking fashion. |
|
18 """ |
|
19 |
|
20 @classmethod |
|
21 def file (cls, file) : |
|
22 """ |
|
23 Create Pipe from file, e.g. sys.stdin. |
|
24 |
|
25 Puts fd into nonblocking mode, which means that the given file will stop working! |
|
26 """ |
|
27 |
|
28 fd = file.fileno() |
|
29 |
|
30 log.debug("%s: %s", file, fd) |
|
31 |
|
32 fl = fcntl.fcntl(fd, fcntl.F_GETFL) |
|
33 fl |= os.O_NONBLOCK |
|
34 fcntl.fcntl(fd, fcntl.F_SETFL, fl) |
|
35 |
|
36 return cls(fd) |
|
37 |
|
38 def __init__ (self, fd) : |
|
39 """ |
|
40 May pass fd=None to open as closed. |
|
41 """ |
|
42 |
|
43 self._fd = fd |
|
44 self._buf = '' |
|
45 |
|
46 log.debug("pipe: %d", fd) |
|
47 |
|
48 def open (self, fd) : |
|
49 """ |
|
50 re-open closed pipe to use the given fd. |
|
51 |
|
52 Raises ValueError if already open. |
|
53 """ |
|
54 |
|
55 if self._fd is None : |
|
56 self._fd = fd |
|
57 else : |
|
58 raise ValueError("%s: re-opening already open pipe: %s" % (self, fd)) |
|
59 |
|
60 # XXX: good idea? |
|
61 def __nonzero__ (self) : |
|
62 """ |
|
63 Test if we are open. |
|
64 |
|
65 XXX: signal EOF as well? |
|
66 """ |
|
67 |
|
68 return self._fd is not None |
|
69 |
|
70 def fileno (self) : |
|
71 """ |
|
72 Return the internal fd. |
|
73 |
|
74 Raises ValueError if we are closed. |
|
75 XXX: EOFError? |
|
76 """ |
|
77 |
|
78 if self._fd is None : |
|
79 raise ValueError("I/O operation on closed pipe: %s" % (self, )) |
|
80 else : |
|
81 return self._fd |
|
82 |
|
83 # XXX: this is almost identical to pvl.socket.ReadStream |
|
84 def read (self, n=512) : |
|
85 """ |
|
86 Read up to n bytes. |
|
87 |
|
88 Returns None if we would block. |
|
89 Raises EOFError on EOF, or closed. |
|
90 """ |
|
91 |
|
92 try : |
|
93 buf = os.read(self.fileno(), n) |
|
94 |
|
95 except OSError as ex : |
|
96 # block? |
|
97 if ex.errno == errno.EAGAIN : |
|
98 # empty |
|
99 buf = None |
|
100 |
|
101 else : |
|
102 raise |
|
103 |
|
104 log.debug("%s: %s", self, buf) |
|
105 |
|
106 if buf is None : |
|
107 return None |
|
108 elif buf : |
|
109 return buf |
|
110 else : |
|
111 raise EOFError() |
|
112 |
|
113 def readline (self) : |
|
114 """ |
|
115 Read and return next waiting line from input. |
|
116 |
|
117 Line is returned without trailing '\n'. |
|
118 |
|
119 Returns None if there is no line available. |
|
120 Raises EOFError if the fifo write end was closed. |
|
121 """ |
|
122 |
|
123 while '\n' not in self._buf : |
|
124 # read chunk |
|
125 read = self.read() |
|
126 |
|
127 if read is None : |
|
128 return None |
|
129 |
|
130 self._buf += read |
|
131 |
|
132 # split out one line |
|
133 line, self._buf = self._buf.split('\n', 1) |
|
134 |
|
135 log.debug("%s", line) |
|
136 |
|
137 return line |
|
138 |
|
139 def readlines (self) : |
|
140 """ |
|
141 Read any available input, yielding lines. |
|
142 |
|
143 Re-opens the FIFO on EOF. |
|
144 |
|
145 Returns None if there was no more input available, or the fifo was re-opened after EOF. |
|
146 """ |
|
147 |
|
148 while True : |
|
149 # pull line |
|
150 line = self.readline() |
|
151 |
|
152 if line : |
|
153 yield line |
|
154 else : |
|
155 return # block |
|
156 |
|
157 __iter__ = readlines |
|
158 |
|
159 def close (self) : |
|
160 """ |
|
161 Close our fd, if open. |
|
162 |
|
163 May be open()'d again. Meanwhile, all operatations will raise EOFError. |
|
164 |
|
165 log.warn's if already closed. |
|
166 """ |
|
167 |
|
168 if self._fd is None : |
|
169 log.warn("%s: already closed", self) |
|
170 |
|
171 else : |
|
172 log.debug("%s: %s", self, self._fd) |
|
173 |
|
174 os.close(self._fd) |
|
175 self._fd = None |
|
176 |
|
177 def __str__ (self) : |
|
178 return "pipe({self._fd})".format(self=self) |
|
179 |
|
180 class Fifo (Pipe) : |
|
181 """ |
|
182 A named pipe(7) on the filesystem. |
|
183 |
|
184 Supports reading lines in a non-blocking fashion, and re-opening on EOF. |
|
185 """ |
|
186 |
|
187 def __init__ (self, path) : |
|
188 self.path = path |
|
189 Pipe.__init__(self, self._open()) |
|
190 |
|
191 def _open (self) : |
|
192 """ |
|
193 Open the internal fd (nonblocking). |
|
194 """ |
|
195 |
|
196 fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK) |
|
197 |
|
198 log.debug("%s: open: %s", self, fd) |
|
199 |
|
200 return fd |
|
201 |
|
202 def open (self) : |
|
203 """ |
|
204 Re-open the FIFO. |
|
205 |
|
206 Used when the writing end was closed, and read gave EOF. Opening the fifo again will clear the EOF condition, |
|
207 and resume nonblocking mode. |
|
208 |
|
209 Raises ValueError() if already open. close() first. |
|
210 """ |
|
211 |
|
212 Pipe.open(self, self._open()) |
|
213 |
|
214 def readlines (self) : |
|
215 """ |
|
216 Read any available input, yielding lines. |
|
217 |
|
218 Re-opens the FIFO on EOF. |
|
219 |
|
220 Returns None if there was no more input available, or the fifo was re-opened after EOF. |
|
221 """ |
|
222 |
|
223 while True : |
|
224 try : |
|
225 # pull line |
|
226 line = self.readline() |
|
227 |
|
228 except EOFError : |
|
229 log.debug("%s: EOF: reopen", self) |
|
230 |
|
231 # reopen and go back to waiting |
|
232 self.close() |
|
233 self.open() |
|
234 |
|
235 return |
|
236 |
|
237 if line is None : |
|
238 log.debug("%s: EOF: wait", self) |
|
239 return # wait |
|
240 else : |
|
241 yield line |
|
242 |
|
243 __iter__ = readlines |
|
244 |
|
245 def __str__ (self) : |
|
246 return self.path |
|
247 |
|
248 # XXX: we need to figure out what references we have lying around, and clean those out! |
|
249 def __del__ (self) : |
|
250 """ |
|
251 Cleanup |
|
252 """ |
|
253 |
|
254 if self._fd is not None : |
|
255 self.close() |
|
256 |