examples/nc.py
changeset 58 ecd89f9605ac
parent 55 99c4344a35ce
equal deleted inserted replaced
57:8c4032265c8c 58:ecd89f9605ac
     9 from qmsk.net.lib.event2 import base, event
     9 from qmsk.net.lib.event2 import base, event
    10 from qmsk.net.lib.event2.constants import *
    10 from qmsk.net.lib.event2.constants import *
    11 from qmsk.net.lib.event2.event import CallbackEvent as cb_event
    11 from qmsk.net.lib.event2.event import CallbackEvent as cb_event
    12 
    12 
    13 import sys, os, fcntl, errno
    13 import sys, os, fcntl, errno
    14 import optparse
    14 import optparse, logging
    15 
    15 
    16 # global options
    16 # global options
    17 options = None
    17 options = None
    18 
    18 
    19 # global event_base
    19 # global event_base
    20 ev_base = base.event_base()
    20 ev_base = base.event_base()
    21 
    21 
    22 # buffer size to use, 4k - perhaps slightly smaller
    22 # buffer size to use, 4k - perhaps slightly smaller
    23 BUFSIZE = 4 * 1024
    23 BUFSIZE = 4 * 1024
       
    24 
       
    25 # root logger
       
    26 log = logging.getLogger()
       
    27 
    24 
    28 
    25 def parse_argv (argv) :
    29 def parse_argv (argv) :
    26     global options
    30     global options
    27 
    31 
    28     prog = argv.pop(0)
    32     prog = argv.pop(0)
    31 
    35 
    32     parser.add_option('-4', "--ipv4", help="Force AF_INET", action='store_true')
    36     parser.add_option('-4', "--ipv4", help="Force AF_INET", action='store_true')
    33     parser.add_option('-6', "--ipv6", help="Force AF_INET6", action='store_true')
    37     parser.add_option('-6', "--ipv6", help="Force AF_INET6", action='store_true')
    34     parser.add_option('-v', "--verbose", help="Display status output", action='store_true')
    38     parser.add_option('-v', "--verbose", help="Display status output", action='store_true')
    35     parser.add_option('-d', "--debug", help="Display extra output", action='store_true')
    39     parser.add_option('-d', "--debug", help="Display extra output", action='store_true')
       
    40     parser.add_option('-c', "--connect", help="Act in client mode (default)", action='store_true')
       
    41     parser.add_option('-l', "--listen", help="Act in server mode", action='store_true')
    36     parser.add_option('-w', "--timeout", help="Timeout for connect()", type='float')
    42     parser.add_option('-w', "--timeout", help="Timeout for connect()", type='float')
    37 
    43 
    38     options, args = parser.parse_args(argv)
    44     options, args = parser.parse_args(argv)
    39 
    45 
    40     if options.ipv4 and options.ipv6 :
    46     if options.ipv4 and options.ipv6 :
    41         raise Exception("-4 and -6 are mutually exclusive!")
    47         raise Exception("-4 and -6 are mutually exclusive")
    42 
    48 
    43     if options.debug :
    49     if options.connect and options.listen :
    44         # enable both
    50         raise Exception("-c and -l are mutually exclusive")
    45         options.verbose = True
       
    46 
    51 
    47     return args
    52     return args
    48 
       
    49 def log_msg (prefix, msg, *args) :
       
    50     if args :
       
    51         msg = msg % args
       
    52     
       
    53     sys.stderr.write("%s %s\n" % (prefix, msg))
       
    54 
       
    55 def log_err (msg, *args) :
       
    56     log_msg('!!!', msg, *args)
       
    57 
       
    58 def log_warn (msg, *args) :
       
    59     log_msg('+++', msg, *args)
       
    60 
       
    61 def log_info (msg, *args) :
       
    62     if options.verbose :
       
    63         log_msg('***', msg, *args)
       
    64 
       
    65 def log_debug (msg, *args) :
       
    66     if options.debug :
       
    67         log_msg('---', msg, *args)
       
    68 
    53 
    69 
    54 
    70 def setnonblocking (file) :
    55 def setnonblocking (file) :
    71     """
    56     """
    72         Set the non-blocking IO flag on the given file object
    57         Set the non-blocking IO flag on the given file object
    79         Try and perform a series of non-blocking connects to the given host:port using the given family and socktype,
    64         Try and perform a series of non-blocking connects to the given host:port using the given family and socktype,
    80         yielding a series of socket objects.
    65         yielding a series of socket objects.
    81     """
    66     """
    82 
    67 
    83     for ai in address.getaddrinfo(host, port, family, socktype) :
    68     for ai in address.getaddrinfo(host, port, family, socktype) :
    84         log_info("sock_connect: ai: %s", ai)
    69         log.debug("Attempting connect() to: %s", ai)
    85 
    70 
    86         # build socket
    71         # build socket
    87         try :
    72         try :
    88             # construct
    73             # construct
    89             log_debug("sock_connect: socket(%d, %d, %d)", ai.family, ai.socktype, ai.protocol)
    74             log.debug("socket(%d, %d, %d)", ai.family, ai.socktype, ai.protocol)
    90             sock = socket.socket(ai.family, ai.socktype, ai.protocol)
    75             sock = socket.socket(ai.family, ai.socktype, ai.protocol)
    91             log_debug("sock_connect: socket=%s", sock)
       
    92             
    76             
    93             # set nonblock mode
    77             # set nonblock mode
    94             log_debug("sock_connect: setnonblocking(%s)", sock)
    78             log.debug("sock=%s: setnonblocking()", sock)
    95             setnonblocking(sock)
    79             setnonblocking(sock)
    96             
    80             
    97             # start connect
    81             # start connect
    98             try :
    82             try :
    99                 log_debug("sock_connect: connect(%s)", ai.addr)
    83                 log.debug("sock=%s: connect(%s)", sock, ai.addr)
   100                 sock.connect(ai.addr)
    84                 sock.connect(ai.addr)
   101 
    85             
       
    86             # XXX: socket.error
   102             except OSError, e :
    87             except OSError, e :
       
    88                 # this should return EINPROGRESS
   103                 if e.errno != errno.EINPROGRESS :
    89                 if e.errno != errno.EINPROGRESS :
   104                     raise
    90                     raise
   105             
    91             
   106             else :
    92             else :
   107                 # XXX: wut???
    93                 # wut???
   108                 log_warn("sock_connect: connect: didn't return EINPROGRESS")
    94                 log.warning("Connect was blocking")
   109 
    95 
   110         except OSError, e :
    96         except OSError, e :
   111             # fsck
    97             # fsck
   112             log_warn("sock_connect: %s: %s", ai.addr, e)
    98             log.warning("Failed to connect to %s: %s", ai.addr, e)
   113 
    99 
   114         else :
   100         else :
   115             # yay
   101             # yay
       
   102             log.info("Connecting to %s...", ai.addr)
       
   103 
   116             yield sock
   104             yield sock
   117 
   105 
       
   106 def sock_listen (port, family, socktype) :
       
   107     """
       
   108         Try and bind onto the given local service port, yielding a series of listen()'ing socket objects
       
   109     """
       
   110 
       
   111     for ai in address.getaddrinfo(None, port, family, socktype) :
       
   112         log.debug("Binding on: %s", ai)
       
   113 
       
   114         # build socket
       
   115         try :
       
   116             sock = socket.socket(ai.family, ai.socktype, ai.protocol)
       
   117 
       
   118             setnonblocking(sock)
       
   119 
       
   120             # bind local
       
   121             sock.bind(ai.addr)
       
   122 
       
   123             # listen
       
   124             sock.listen(5)
       
   125 
       
   126         except OSError, e :
       
   127             # fsck, next
       
   128             log.warning("Failed to listen on %s: %s", ai.addr, e)
       
   129 
       
   130         else :
       
   131             # yay
       
   132             log.debug("Listening on %s", ai.addr)
       
   133 
       
   134             yield sock
   118 
   135 
   119 def client_connect_next (sock_iter) :
   136 def client_connect_next (sock_iter) :
   120     """
   137     """
   121         Attempt to run the given connect operation, on the given iterable of sockets.
   138         Given an interable of sockets in the non-blocking connect() state, will attempt to wait for each, proceeding to 
       
   139         client_connected when one succeeds.
   122     """
   140     """
   123 
   141 
   124     for sock in sock_iter :
   142     for sock in sock_iter :
   125         # pend for writing
   143         # pend for writing
   126         log_debug("client_connect_next: cb_event(%d, EV_WRITE, on_connect, %r)", sock.fd, sock)
   144         log.debug("cb_event(%d, EV_WRITE, on_connect, %r)", sock.fd, sock)
   127         ev = cb_event(ev_base, sock.fd, EV_WRITE, on_connect, sock, sock_iter)
   145         ev = cb_event(ev_base, sock.fd, EV_WRITE, on_connect, sock, sock_iter)
   128 
   146 
   129         # wait specified timeout
   147         # wait specified timeout
   130         log_debug("client_connect_next: %r: add(%s)", ev, options.timeout)
   148         log.debug("ev=%r: add(timeout=%s)", ev, options.timeout)
   131         ev.add(options.timeout)
   149         ev.add(options.timeout)
   132 
   150 
   133         # ok
   151         # ok
   134         break
   152         break
   135 
   153 
   136     else :
   154     else :
   137         # fail, ran out of addresses to try
   155         # fail, ran out of addresses to try
   138         log_err("client_connect_next: ran out of addresses to try")
   156         log.error("Unable to connect")
   139 
       
   140 
   157 
   141 def on_connect (ev, events, sock, sock_iter) :
   158 def on_connect (ev, events, sock, sock_iter) :
   142     """
   159     """
   143         Outbound connect EV_WRITE callback, i.e. connection failed or was established
   160         Outbound connect EV_WRITE callback, i.e. connection failed or was established
   144     """
   161     """
   145 
   162 
   146     log_debug("on_connect: ev=%r, events=%#x", ev, events)
   163     log.debug("ev=%r: events=%#04x", ev, events)
   147 
   164 
   148     # test for timeout
   165     # test for timeout
   149     if events & EV_TIMEOUT :
   166     if events & EV_TIMEOUT :
   150         log_warn("on_connect: connect failed, timeout")
   167         log.warning("Connection to %s timed out", sock.getpeername())
   151 
   168 
   152         # keep trying
   169         # keep trying
   153         return client_connect_next(sock_iter)
   170         return client_connect_next(sock_iter)
   154 
   171 
   155 
   172 
   156     # test for errno
   173     # test for errno
   157     err = sock.getsockopt_int(SOL_SOCKET, SO_ERROR)
   174     err = sock.getsockopt_int(SOL_SOCKET, SO_ERROR)
   158 
   175 
   159     if err :
   176     if err :
   160         # fail
   177         # fail
   161         log_warn("on_connect: connect failed, errno=%d", err)
   178         log.warning("Connection to %s failed: %d", sock.getpeername(), err)
   162         
   179         
   163         # keep trying
   180         # keep trying
   164         return client_connect_next(sock_iter)
   181         return client_connect_next(sock_iter)
   165 
   182 
   166 
   183 
   171 def client_connected (sock) :
   188 def client_connected (sock) :
   172     """
   189     """
   173         Connection established, start shovelling data
   190         Connection established, start shovelling data
   174     """
   191     """
   175 
   192 
   176     log_info("Client connected to %s (from %s)", sock.getpeername(), sock.getsockname())
   193     log.info("Connected to %s (from %s)", sock.getpeername(), sock.getsockname())
   177     
   194     
   178     # go!
   195     # go!
   179     start_sock(sock, sys.stdin.fileno(), sys.stdout.fileno())
   196     start_sock(sock)
   180 
   197 
   181 
   198 
   182 def start_sock (sock, fd_in, fd_out) :
   199 def server_start_accept (sock) :
       
   200     """
       
   201        Start waiting for connections on the given socket
       
   202     """
       
   203 
       
   204     log.info("Listening on %s", sock.getsockname())
       
   205     
       
   206     ev = cb_event(ev_base, sock.fd, EV_READ | EV_PERSIST, on_server_accept, sock)
       
   207     ev.add()
       
   208 
       
   209 def on_server_accept (ev, what, sock) :
       
   210     """
       
   211         Server socket is ready for accept()
       
   212     """
       
   213 
       
   214     # no timeout support yet
       
   215     assert what == EV_READ
       
   216     
       
   217     log.debug("Incoming connection on %s", sock.getsockname())
       
   218 
       
   219     # accept new client socket
       
   220     client, addr = sock.accept()
       
   221 
       
   222     # say so
       
   223     log.info("Client connected from %s (to %s)", client.getpeername(), client.getsockname())
       
   224     
       
   225     # handle it
       
   226     start_sock(client)
       
   227 
       
   228 
       
   229 def start_sock (sock, fd_in=0, fd_out=1) :
   183     """
   230     """
   184         Start shipping data between sock and fin/fout.
   231         Start shipping data between sock and fin/fout.
   185     """
   232     """
       
   233     
       
   234     log.debug("Starting pipeline: %s -> %s -> %s", fd_in, sock, fd_out)
   186 
   235 
   187     # event for each
   236     # event for each
   188     ev_in = cb_event(ev_base, fd_in, EV_READ, on_read_in, sock, fd_in)
   237     ev_in = cb_event(ev_base, fd_in, EV_READ | EV_PERSIST, on_read_in, sock, fd_in)
   189     ev_out = cb_event(ev_base, sock.fd, EV_READ, on_read_sock, sock, fd_out)
   238     ev_out = cb_event(ev_base, sock.fd, EV_READ | EV_PERSIST, on_read_sock, sock, fd_out)
   190 
   239 
   191     ev_in.add()
   240     ev_in.add()
   192     ev_out.add()
   241     ev_out.add()
   193 
   242 
   194 
   243 
   205 
   254 
   206     if buf :
   255     if buf :
   207         # pass on
   256         # pass on
   208         assert sock.write(buf) == len(buf)
   257         assert sock.write(buf) == len(buf)
   209 
   258 
   210         # keep going
   259     else :
   211         ev.add()
   260         ev.delete()
   212 
   261         
   213     else :
       
   214         # EOF
   262         # EOF
   215         sock.shutdown(SHUT_WR)
   263         sock.shutdown(SHUT_WR)
   216 
   264         
       
   265         log.info("EOF on input")
   217 
   266 
   218 def on_read_sock (ev, events, sock, fd_out) :
   267 def on_read_sock (ev, events, sock, fd_out) :
   219     """
   268     """
   220         Data readable on sock, ship to fd_out
   269         Data readable on sock, ship to fd_out
   221     """
   270     """
   228 
   277 
   229     if buf :
   278     if buf :
   230         # pass on
   279         # pass on
   231         assert os.write(fd_out, buf) == len(buf)
   280         assert os.write(fd_out, buf) == len(buf)
   232 
   281 
   233         # keep going
   282     else :
   234         ev.add()
   283         ev.delete()
   235 
   284 
   236     else :
       
   237         # EOF
   285         # EOF
   238         os.close(fd_out)
   286         os.close(fd_out)
   239         
   287         
   240         log_info("EOF from %s", sock.getpeername())
   288         log.info("EOF from %s", sock.getpeername())
   241 
   289 
   242 def run_client (host, port) :
   290 def make_sockargs () :
   243     """
   291     """
   244         Execute in client-mode
   292         Returns (family, socktype) pair to use
   245     """
   293     """
   246 
       
   247     # figure out AF to use
   294     # figure out AF to use
   248     if options.ipv4 :
   295     if options.ipv4 :
   249         family = AF_INET
   296         family = AF_INET
   250 
   297 
   251     elif options.ipv6 :
   298     elif options.ipv6 :
   255         family = AF_UNSPEC
   302         family = AF_UNSPEC
   256 
   303 
   257     # fixed socktype
   304     # fixed socktype
   258     socktype = SOCK_STREAM
   305     socktype = SOCK_STREAM
   259 
   306 
       
   307     return family, socktype
       
   308 
       
   309 def run_client (host, port) :
       
   310     """
       
   311         Execute in client-mode
       
   312     """
       
   313 
       
   314     family, socktype = make_sockargs()
       
   315 
   260     # look up the address and start a non-blocking connect
   316     # look up the address and start a non-blocking connect
   261     sock_iter = sock_connect(host, port, family, socktype)
   317     sock_iter = sock_connect(host, port, family, socktype)
   262     
   318     
   263     # start waiting
   319     # start waiting
   264     client_connect_next(sock_iter)
   320     client_connect_next(sock_iter)
   265 
   321 
       
   322 def run_server (port) :
       
   323     """
       
   324         Execute in server-mode
       
   325     """
       
   326 
       
   327     family, socktype = make_sockargs()
       
   328 
       
   329     # get the first valid socket
       
   330     for sock in sock_listen(port, family, socktype) :
       
   331         break
       
   332 
       
   333     else :
       
   334         return log.error("Unable to listen on local service: %s", port)
       
   335         
       
   336     # wait
       
   337     server_start_accept(sock)
       
   338 
   266 def main (argv) :
   339 def main (argv) :
       
   340     # setup logging
       
   341     logging.basicConfig(
       
   342         format  = "[%(levelname)7s] %(funcName)15s: %(message)s",
       
   343         level   = logging.WARNING,
       
   344     )
       
   345 
   267     # parse args
   346     # parse args
   268     args = parse_argv(argv)
   347     args = parse_argv(argv)
   269     
   348     
   270     # XXX: support listen mode
   349     # pre-options
   271     host, port = args
   350     if options.debug :
   272 
   351         log.setLevel(logging.DEBUG)
   273     run_client(host, port)
   352 
   274         
   353     elif options.verbose :
       
   354         log.setLevel(logging.INFO)
       
   355 
       
   356     # setup 
       
   357     if options.listen :
       
   358         port, = args
       
   359 
       
   360         run_server(port)
       
   361 
       
   362     else : # options.connect
       
   363         host, port = args
       
   364 
       
   365         run_client(host, port)
       
   366     
       
   367 
   275     # run mainloop
   368     # run mainloop
   276     log_debug("main: entering event loop")
   369     log.debug("Entering event loop")
   277 
   370 
   278     if ev_base.loop() :
   371     if ev_base.loop() :
   279         log_debug("main: event loop done")
   372         log.debug("Event loop done")
   280 
   373 
   281     else :
   374     else :
   282         log_err("main: event loop was idle!")
   375         log.warning("Nothing to do!")
   283 
   376 
   284 if __name__ == '__main__' :
   377 if __name__ == '__main__' :
   285     main(sys.argv)
   378     main(sys.argv)
   286 
   379