equal
deleted
inserted
replaced
1 # read a stream from a fifo |
|
2 |
|
3 from twisted.internet import reactor, interfaces |
|
4 from twisted.python import log |
|
5 from zope.interface import implements |
|
6 |
|
7 import os, fcntl, errno |
|
8 |
|
9 class EOF (Exception) : pass |
|
10 |
|
11 BUF_SIZE = 2048 |
|
12 |
|
13 class Fifo (object) : |
|
14 implements(interfaces.IReadDescriptor) |
|
15 |
|
16 def __init__ (self, path) : |
|
17 self.path = path |
|
18 self.fd = None |
|
19 |
|
20 self._open() |
|
21 |
|
22 def _open (self) : |
|
23 self.fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK) |
|
24 |
|
25 reactor.addReader(self) |
|
26 |
|
27 def close (self) : |
|
28 if self.fd : |
|
29 reactor.removeReader(self) |
|
30 os.close(self.fd) |
|
31 |
|
32 self.fd = None |
|
33 |
|
34 def reopen (self) : |
|
35 """ |
|
36 Close and re-open the fifo. This is useful for handling EOF |
|
37 """ |
|
38 self.close() |
|
39 self._open() |
|
40 |
|
41 def _read (self, length) : |
|
42 |
|
43 try : |
|
44 data = os.read(self.fd, length) |
|
45 |
|
46 except OSError, e : |
|
47 if e.errno == errno.EAGAIN : |
|
48 return None |
|
49 else : |
|
50 raise |
|
51 |
|
52 if not data : |
|
53 raise EOF() |
|
54 |
|
55 return data |
|
56 |
|
57 def fileno (self) : |
|
58 return self.fd |
|
59 |
|
60 def doRead (self) : |
|
61 while True : |
|
62 try : |
|
63 data = self._read(BUF_SIZE) |
|
64 except EOF : |
|
65 self.handleEOF() |
|
66 return |
|
67 |
|
68 if data : |
|
69 self.dataReceived(data) |
|
70 else : |
|
71 break |
|
72 |
|
73 def dataReceived (self, data) : |
|
74 pass |
|
75 |
|
76 def handleEOF (self) : |
|
77 pass |
|
78 |
|
79 def connectionLost (self, reason) : |
|
80 self.close() |
|
81 |
|
82 def logPrefix (self) : |
|
83 return "fifo(%s)" % (self.path, ) |
|
84 |
|
85 def __del__ (self) : |
|
86 """ |
|
87 !!! this is important |
|
88 """ |
|
89 self.close() |
|
90 |
|