--- a/examples/nc.py Sat Sep 26 23:51:31 2009 +0300
+++ b/examples/nc.py Mon Oct 05 17:12:29 2009 +0300
@@ -11,7 +11,7 @@
from qmsk.net.lib.event2.event import CallbackEvent as cb_event
import sys, os, fcntl, errno
-import optparse
+import optparse, logging
# global options
options = None
@@ -22,6 +22,10 @@
# buffer size to use, 4k - perhaps slightly smaller
BUFSIZE = 4 * 1024
+# root logger
+log = logging.getLogger()
+
+
def parse_argv (argv) :
global options
@@ -33,39 +37,20 @@
parser.add_option('-6', "--ipv6", help="Force AF_INET6", action='store_true')
parser.add_option('-v', "--verbose", help="Display status output", action='store_true')
parser.add_option('-d', "--debug", help="Display extra output", action='store_true')
+ parser.add_option('-c', "--connect", help="Act in client mode (default)", action='store_true')
+ parser.add_option('-l', "--listen", help="Act in server mode", action='store_true')
parser.add_option('-w', "--timeout", help="Timeout for connect()", type='float')
options, args = parser.parse_args(argv)
if options.ipv4 and options.ipv6 :
- raise Exception("-4 and -6 are mutually exclusive!")
+ raise Exception("-4 and -6 are mutually exclusive")
- if options.debug :
- # enable both
- options.verbose = True
+ if options.connect and options.listen :
+ raise Exception("-c and -l are mutually exclusive")
return args
-def log_msg (prefix, msg, *args) :
- if args :
- msg = msg % args
-
- sys.stderr.write("%s %s\n" % (prefix, msg))
-
-def log_err (msg, *args) :
- log_msg('!!!', msg, *args)
-
-def log_warn (msg, *args) :
- log_msg('+++', msg, *args)
-
-def log_info (msg, *args) :
- if options.verbose :
- log_msg('***', msg, *args)
-
-def log_debug (msg, *args) :
- if options.debug :
- log_msg('---', msg, *args)
-
def setnonblocking (file) :
"""
@@ -81,53 +66,86 @@
"""
for ai in address.getaddrinfo(host, port, family, socktype) :
- log_info("sock_connect: ai: %s", ai)
+ log.debug("Attempting connect() to: %s", ai)
# build socket
try :
# construct
- log_debug("sock_connect: socket(%d, %d, %d)", ai.family, ai.socktype, ai.protocol)
+ log.debug("socket(%d, %d, %d)", ai.family, ai.socktype, ai.protocol)
sock = socket.socket(ai.family, ai.socktype, ai.protocol)
- log_debug("sock_connect: socket=%s", sock)
# set nonblock mode
- log_debug("sock_connect: setnonblocking(%s)", sock)
+ log.debug("sock=%s: setnonblocking()", sock)
setnonblocking(sock)
# start connect
try :
- log_debug("sock_connect: connect(%s)", ai.addr)
+ log.debug("sock=%s: connect(%s)", sock, ai.addr)
sock.connect(ai.addr)
-
+
+ # XXX: socket.error
except OSError, e :
+ # this should return EINPROGRESS
if e.errno != errno.EINPROGRESS :
raise
else :
- # XXX: wut???
- log_warn("sock_connect: connect: didn't return EINPROGRESS")
+ # wut???
+ log.warning("Connect was blocking")
except OSError, e :
# fsck
- log_warn("sock_connect: %s: %s", ai.addr, e)
+ log.warning("Failed to connect to %s: %s", ai.addr, e)
else :
# yay
+ log.info("Connecting to %s...", ai.addr)
+
yield sock
+def sock_listen (port, family, socktype) :
+ """
+ Try and bind onto the given local service port, yielding a series of listen()'ing socket objects
+ """
+
+ for ai in address.getaddrinfo(None, port, family, socktype) :
+ log.debug("Binding on: %s", ai)
+
+ # build socket
+ try :
+ sock = socket.socket(ai.family, ai.socktype, ai.protocol)
+
+ setnonblocking(sock)
+
+ # bind local
+ sock.bind(ai.addr)
+
+ # listen
+ sock.listen(5)
+
+ except OSError, e :
+ # fsck, next
+ log.warning("Failed to listen on %s: %s", ai.addr, e)
+
+ else :
+ # yay
+ log.debug("Listening on %s", ai.addr)
+
+ yield sock
def client_connect_next (sock_iter) :
"""
- Attempt to run the given connect operation, on the given iterable of sockets.
+ Given an interable of sockets in the non-blocking connect() state, will attempt to wait for each, proceeding to
+ client_connected when one succeeds.
"""
for sock in sock_iter :
# pend for writing
- log_debug("client_connect_next: cb_event(%d, EV_WRITE, on_connect, %r)", sock.fd, sock)
+ log.debug("cb_event(%d, EV_WRITE, on_connect, %r)", sock.fd, sock)
ev = cb_event(ev_base, sock.fd, EV_WRITE, on_connect, sock, sock_iter)
# wait specified timeout
- log_debug("client_connect_next: %r: add(%s)", ev, options.timeout)
+ log.debug("ev=%r: add(timeout=%s)", ev, options.timeout)
ev.add(options.timeout)
# ok
@@ -135,19 +153,18 @@
else :
# fail, ran out of addresses to try
- log_err("client_connect_next: ran out of addresses to try")
-
+ log.error("Unable to connect")
def on_connect (ev, events, sock, sock_iter) :
"""
Outbound connect EV_WRITE callback, i.e. connection failed or was established
"""
- log_debug("on_connect: ev=%r, events=%#x", ev, events)
+ log.debug("ev=%r: events=%#04x", ev, events)
# test for timeout
if events & EV_TIMEOUT :
- log_warn("on_connect: connect failed, timeout")
+ log.warning("Connection to %s timed out", sock.getpeername())
# keep trying
return client_connect_next(sock_iter)
@@ -158,7 +175,7 @@
if err :
# fail
- log_warn("on_connect: connect failed, errno=%d", err)
+ log.warning("Connection to %s failed: %d", sock.getpeername(), err)
# keep trying
return client_connect_next(sock_iter)
@@ -173,20 +190,52 @@
Connection established, start shovelling data
"""
- log_info("Client connected to %s (from %s)", sock.getpeername(), sock.getsockname())
+ log.info("Connected to %s (from %s)", sock.getpeername(), sock.getsockname())
# go!
- start_sock(sock, sys.stdin.fileno(), sys.stdout.fileno())
+ start_sock(sock)
-def start_sock (sock, fd_in, fd_out) :
+def server_start_accept (sock) :
+ """
+ Start waiting for connections on the given socket
+ """
+
+ log.info("Listening on %s", sock.getsockname())
+
+ ev = cb_event(ev_base, sock.fd, EV_READ | EV_PERSIST, on_server_accept, sock)
+ ev.add()
+
+def on_server_accept (ev, what, sock) :
+ """
+ Server socket is ready for accept()
+ """
+
+ # no timeout support yet
+ assert what == EV_READ
+
+ log.debug("Incoming connection on %s", sock.getsockname())
+
+ # accept new client socket
+ client, addr = sock.accept()
+
+ # say so
+ log.info("Client connected from %s (to %s)", client.getpeername(), client.getsockname())
+
+ # handle it
+ start_sock(client)
+
+
+def start_sock (sock, fd_in=0, fd_out=1) :
"""
Start shipping data between sock and fin/fout.
"""
+
+ log.debug("Starting pipeline: %s -> %s -> %s", fd_in, sock, fd_out)
# event for each
- ev_in = cb_event(ev_base, fd_in, EV_READ, on_read_in, sock, fd_in)
- ev_out = cb_event(ev_base, sock.fd, EV_READ, on_read_sock, sock, fd_out)
+ ev_in = cb_event(ev_base, fd_in, EV_READ | EV_PERSIST, on_read_in, sock, fd_in)
+ ev_out = cb_event(ev_base, sock.fd, EV_READ | EV_PERSIST, on_read_sock, sock, fd_out)
ev_in.add()
ev_out.add()
@@ -207,13 +256,13 @@
# pass on
assert sock.write(buf) == len(buf)
- # keep going
- ev.add()
-
else :
+ ev.delete()
+
# EOF
sock.shutdown(SHUT_WR)
-
+
+ log.info("EOF on input")
def on_read_sock (ev, events, sock, fd_out) :
"""
@@ -230,20 +279,18 @@
# pass on
assert os.write(fd_out, buf) == len(buf)
- # keep going
- ev.add()
+ else :
+ ev.delete()
- else :
# EOF
os.close(fd_out)
- log_info("EOF from %s", sock.getpeername())
+ log.info("EOF from %s", sock.getpeername())
-def run_client (host, port) :
+def make_sockargs () :
"""
- Execute in client-mode
+ Returns (family, socktype) pair to use
"""
-
# figure out AF to use
if options.ipv4 :
family = AF_INET
@@ -257,29 +304,75 @@
# fixed socktype
socktype = SOCK_STREAM
+ return family, socktype
+
+def run_client (host, port) :
+ """
+ Execute in client-mode
+ """
+
+ family, socktype = make_sockargs()
+
# look up the address and start a non-blocking connect
sock_iter = sock_connect(host, port, family, socktype)
# start waiting
client_connect_next(sock_iter)
+def run_server (port) :
+ """
+ Execute in server-mode
+ """
+
+ family, socktype = make_sockargs()
+
+ # get the first valid socket
+ for sock in sock_listen(port, family, socktype) :
+ break
+
+ else :
+ return log.error("Unable to listen on local service: %s", port)
+
+ # wait
+ server_start_accept(sock)
+
def main (argv) :
+ # setup logging
+ logging.basicConfig(
+ format = "[%(levelname)7s] %(funcName)15s: %(message)s",
+ level = logging.WARNING,
+ )
+
# parse args
args = parse_argv(argv)
- # XXX: support listen mode
- host, port = args
+ # pre-options
+ if options.debug :
+ log.setLevel(logging.DEBUG)
- run_client(host, port)
-
+ elif options.verbose :
+ log.setLevel(logging.INFO)
+
+ # setup
+ if options.listen :
+ port, = args
+
+ run_server(port)
+
+ else : # options.connect
+ host, port = args
+
+ run_client(host, port)
+
+
# run mainloop
- log_debug("main: entering event loop")
+ log.debug("Entering event loop")
if ev_base.loop() :
- log_debug("main: event loop done")
+ log.debug("Event loop done")
else :
- log_err("main: event loop was idle!")
+ log.warning("Nothing to do!")
if __name__ == '__main__' :
main(sys.argv)