# HG changeset patch # User Tero Marttila # Date 1254751949 -10800 # Node ID ecd89f9605ac8f77c8ab09e445bd406a0f22d57d # Parent 8c4032265c8c1daa33932a2bfb01ad7a7dc50ee0 use logging for nc.py diff -r 8c4032265c8c -r ecd89f9605ac examples/nc.py --- a/examples/nc.py Sat Sep 26 23:51:31 2009 +0300 +++ b/examples/nc.py Mon Oct 05 17:12:29 2009 +0300 @@ -11,7 +11,7 @@ from qmsk.net.lib.event2.event import CallbackEvent as cb_event import sys, os, fcntl, errno -import optparse +import optparse, logging # global options options = None @@ -22,6 +22,10 @@ # buffer size to use, 4k - perhaps slightly smaller BUFSIZE = 4 * 1024 +# root logger +log = logging.getLogger() + + def parse_argv (argv) : global options @@ -33,39 +37,20 @@ parser.add_option('-6', "--ipv6", help="Force AF_INET6", action='store_true') parser.add_option('-v', "--verbose", help="Display status output", action='store_true') parser.add_option('-d', "--debug", help="Display extra output", action='store_true') + parser.add_option('-c', "--connect", help="Act in client mode (default)", action='store_true') + parser.add_option('-l', "--listen", help="Act in server mode", action='store_true') parser.add_option('-w', "--timeout", help="Timeout for connect()", type='float') options, args = parser.parse_args(argv) if options.ipv4 and options.ipv6 : - raise Exception("-4 and -6 are mutually exclusive!") + raise Exception("-4 and -6 are mutually exclusive") - if options.debug : - # enable both - options.verbose = True + if options.connect and options.listen : + raise Exception("-c and -l are mutually exclusive") return args -def log_msg (prefix, msg, *args) : - if args : - msg = msg % args - - sys.stderr.write("%s %s\n" % (prefix, msg)) - -def log_err (msg, *args) : - log_msg('!!!', msg, *args) - -def log_warn (msg, *args) : - log_msg('+++', msg, *args) - -def log_info (msg, *args) : - if options.verbose : - log_msg('***', msg, *args) - -def log_debug (msg, *args) : - if options.debug : - log_msg('---', msg, *args) - def setnonblocking (file) : """ @@ -81,53 +66,86 @@ """ for ai in address.getaddrinfo(host, port, family, socktype) : - log_info("sock_connect: ai: %s", ai) + log.debug("Attempting connect() to: %s", ai) # build socket try : # construct - log_debug("sock_connect: socket(%d, %d, %d)", ai.family, ai.socktype, ai.protocol) + log.debug("socket(%d, %d, %d)", ai.family, ai.socktype, ai.protocol) sock = socket.socket(ai.family, ai.socktype, ai.protocol) - log_debug("sock_connect: socket=%s", sock) # set nonblock mode - log_debug("sock_connect: setnonblocking(%s)", sock) + log.debug("sock=%s: setnonblocking()", sock) setnonblocking(sock) # start connect try : - log_debug("sock_connect: connect(%s)", ai.addr) + log.debug("sock=%s: connect(%s)", sock, ai.addr) sock.connect(ai.addr) - + + # XXX: socket.error except OSError, e : + # this should return EINPROGRESS if e.errno != errno.EINPROGRESS : raise else : - # XXX: wut??? - log_warn("sock_connect: connect: didn't return EINPROGRESS") + # wut??? + log.warning("Connect was blocking") except OSError, e : # fsck - log_warn("sock_connect: %s: %s", ai.addr, e) + log.warning("Failed to connect to %s: %s", ai.addr, e) else : # yay + log.info("Connecting to %s...", ai.addr) + yield sock +def sock_listen (port, family, socktype) : + """ + Try and bind onto the given local service port, yielding a series of listen()'ing socket objects + """ + + for ai in address.getaddrinfo(None, port, family, socktype) : + log.debug("Binding on: %s", ai) + + # build socket + try : + sock = socket.socket(ai.family, ai.socktype, ai.protocol) + + setnonblocking(sock) + + # bind local + sock.bind(ai.addr) + + # listen + sock.listen(5) + + except OSError, e : + # fsck, next + log.warning("Failed to listen on %s: %s", ai.addr, e) + + else : + # yay + log.debug("Listening on %s", ai.addr) + + yield sock def client_connect_next (sock_iter) : """ - Attempt to run the given connect operation, on the given iterable of sockets. + Given an interable of sockets in the non-blocking connect() state, will attempt to wait for each, proceeding to + client_connected when one succeeds. """ for sock in sock_iter : # pend for writing - log_debug("client_connect_next: cb_event(%d, EV_WRITE, on_connect, %r)", sock.fd, sock) + log.debug("cb_event(%d, EV_WRITE, on_connect, %r)", sock.fd, sock) ev = cb_event(ev_base, sock.fd, EV_WRITE, on_connect, sock, sock_iter) # wait specified timeout - log_debug("client_connect_next: %r: add(%s)", ev, options.timeout) + log.debug("ev=%r: add(timeout=%s)", ev, options.timeout) ev.add(options.timeout) # ok @@ -135,19 +153,18 @@ else : # fail, ran out of addresses to try - log_err("client_connect_next: ran out of addresses to try") - + log.error("Unable to connect") def on_connect (ev, events, sock, sock_iter) : """ Outbound connect EV_WRITE callback, i.e. connection failed or was established """ - log_debug("on_connect: ev=%r, events=%#x", ev, events) + log.debug("ev=%r: events=%#04x", ev, events) # test for timeout if events & EV_TIMEOUT : - log_warn("on_connect: connect failed, timeout") + log.warning("Connection to %s timed out", sock.getpeername()) # keep trying return client_connect_next(sock_iter) @@ -158,7 +175,7 @@ if err : # fail - log_warn("on_connect: connect failed, errno=%d", err) + log.warning("Connection to %s failed: %d", sock.getpeername(), err) # keep trying return client_connect_next(sock_iter) @@ -173,20 +190,52 @@ Connection established, start shovelling data """ - log_info("Client connected to %s (from %s)", sock.getpeername(), sock.getsockname()) + log.info("Connected to %s (from %s)", sock.getpeername(), sock.getsockname()) # go! - start_sock(sock, sys.stdin.fileno(), sys.stdout.fileno()) + start_sock(sock) -def start_sock (sock, fd_in, fd_out) : +def server_start_accept (sock) : + """ + Start waiting for connections on the given socket + """ + + log.info("Listening on %s", sock.getsockname()) + + ev = cb_event(ev_base, sock.fd, EV_READ | EV_PERSIST, on_server_accept, sock) + ev.add() + +def on_server_accept (ev, what, sock) : + """ + Server socket is ready for accept() + """ + + # no timeout support yet + assert what == EV_READ + + log.debug("Incoming connection on %s", sock.getsockname()) + + # accept new client socket + client, addr = sock.accept() + + # say so + log.info("Client connected from %s (to %s)", client.getpeername(), client.getsockname()) + + # handle it + start_sock(client) + + +def start_sock (sock, fd_in=0, fd_out=1) : """ Start shipping data between sock and fin/fout. """ + + log.debug("Starting pipeline: %s -> %s -> %s", fd_in, sock, fd_out) # event for each - ev_in = cb_event(ev_base, fd_in, EV_READ, on_read_in, sock, fd_in) - ev_out = cb_event(ev_base, sock.fd, EV_READ, on_read_sock, sock, fd_out) + ev_in = cb_event(ev_base, fd_in, EV_READ | EV_PERSIST, on_read_in, sock, fd_in) + ev_out = cb_event(ev_base, sock.fd, EV_READ | EV_PERSIST, on_read_sock, sock, fd_out) ev_in.add() ev_out.add() @@ -207,13 +256,13 @@ # pass on assert sock.write(buf) == len(buf) - # keep going - ev.add() - else : + ev.delete() + # EOF sock.shutdown(SHUT_WR) - + + log.info("EOF on input") def on_read_sock (ev, events, sock, fd_out) : """ @@ -230,20 +279,18 @@ # pass on assert os.write(fd_out, buf) == len(buf) - # keep going - ev.add() + else : + ev.delete() - else : # EOF os.close(fd_out) - log_info("EOF from %s", sock.getpeername()) + log.info("EOF from %s", sock.getpeername()) -def run_client (host, port) : +def make_sockargs () : """ - Execute in client-mode + Returns (family, socktype) pair to use """ - # figure out AF to use if options.ipv4 : family = AF_INET @@ -257,29 +304,75 @@ # fixed socktype socktype = SOCK_STREAM + return family, socktype + +def run_client (host, port) : + """ + Execute in client-mode + """ + + family, socktype = make_sockargs() + # look up the address and start a non-blocking connect sock_iter = sock_connect(host, port, family, socktype) # start waiting client_connect_next(sock_iter) +def run_server (port) : + """ + Execute in server-mode + """ + + family, socktype = make_sockargs() + + # get the first valid socket + for sock in sock_listen(port, family, socktype) : + break + + else : + return log.error("Unable to listen on local service: %s", port) + + # wait + server_start_accept(sock) + def main (argv) : + # setup logging + logging.basicConfig( + format = "[%(levelname)7s] %(funcName)15s: %(message)s", + level = logging.WARNING, + ) + # parse args args = parse_argv(argv) - # XXX: support listen mode - host, port = args + # pre-options + if options.debug : + log.setLevel(logging.DEBUG) - run_client(host, port) - + elif options.verbose : + log.setLevel(logging.INFO) + + # setup + if options.listen : + port, = args + + run_server(port) + + else : # options.connect + host, port = args + + run_client(host, port) + + # run mainloop - log_debug("main: entering event loop") + log.debug("Entering event loop") if ev_base.loop() : - log_debug("main: event loop done") + log.debug("Event loop done") else : - log_err("main: event loop was idle!") + log.warning("Nothing to do!") if __name__ == '__main__' : main(sys.argv)