#!/usr/bin/env python
"""
    Simple (limited) netcat-like example, implemented using the socket/lib.event2 layer.

    Only client mode is implemented: the given host/port is resolved, and a
    non-blocking connect is attempted against each resulting address in turn
    until one succeeds or the addresses run out.
"""

from qmsk.net.socket import socket, address
from qmsk.net.socket.constants import *

from qmsk.net.lib.event2 import base, event
from qmsk.net.lib.event2.constants import *
from qmsk.net.lib.event2.event import CallbackEvent as cb_event

import sys, os, fcntl, errno
import optparse

# global options, set by parse_argv(); the log_info/log_debug helpers read it
options = None

# global event_base
ev_base = base.event_base()


def parse_argv(argv):
    """
        Parse the command line into the global `options`.

        argv is the full argument vector, including the program name at
        index 0. It is not modified.

        Returns the list of remaining positional arguments.

        Raises Exception if both -4 and -6 are given.
    """

    global options

    # index/slice instead of pop(0), so that the caller's list (sys.argv) is left intact
    prog = argv[0]

    parser = optparse.OptionParser(prog=prog)

    parser.add_option('-4', "--ipv4", help="Force AF_INET", action='store_true')
    parser.add_option('-6', "--ipv6", help="Force AF_INET6", action='store_true')
    parser.add_option('-v', "--verbose", help="Display status output", action='store_true')
    parser.add_option('-d', "--debug", help="Display extra output", action='store_true')
    parser.add_option('-w', "--timeout", help="Timeout for connect()", type='float')

    options, args = parser.parse_args(argv[1:])

    if options.ipv4 and options.ipv6:
        raise Exception("-4 and -6 are mutually exclusive!")

    if options.debug:
        # enable both
        options.verbose = True

    return args


def log_msg(prefix, msg, *args):
    """
        Write a single '<prefix> <msg>' line to stderr.

        If any args are given, msg is first %-formatted with them.
    """

    if args:
        msg = msg % args

    sys.stderr.write("%s %s\n" % (prefix, msg))


def log_err(msg, *args):
    """
        Log an error message; always shown.
    """

    log_msg('!!!', msg, *args)


def log_warn(msg, *args):
    """
        Log a warning message; always shown.
    """

    log_msg('+++', msg, *args)


def log_info(msg, *args):
    """
        Log a status message; only shown when options.verbose is set.
    """

    if options.verbose:
        log_msg('***', msg, *args)


def log_debug(msg, *args):
    """
        Log a debug message; only shown when options.debug is set.
    """

    if options.debug:
        log_msg('---', msg, *args)


def setnonblocking(file):
    """
        Set the non-blocking IO flag on the given file object

        file may be any object providing a fileno() method.
    """

    fd = file.fileno()

    # F_SETFL replaces the whole set of file status flags, so merge O_NONBLOCK
    # into the current flags instead of overwriting them
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)


def sock_connect(host, port, family, socktype):
    """
        Try and perform a series of non-blocking connects to the given host:port using the given family and socktype,
        yielding a series of socket objects.

        This is a generator: the next address is only attempted once the
        caller asks for the next socket. Addresses for which creating the
        socket or starting the connect fails with an OSError (other than
        EINPROGRESS from connect) are logged as a warning and skipped.
    """

    for ai in address.getaddrinfo(host, port, family, socktype):
        log_info("sock_connect: ai: %s", ai)

        # build socket
        try:
            # construct
            log_debug("sock_connect: socket(%d, %d, %d)", ai.family, ai.socktype, ai.protocol)
            sock = socket.socket(ai.family, ai.socktype, ai.protocol)
            log_debug("sock_connect: socket=%s", sock)

            # set nonblock mode
            log_debug("sock_connect: setnonblocking(%s)", sock)
            setnonblocking(sock)

            # start connect
            try:
                log_debug("sock_connect: connect(%s)", ai.addr)
                sock.connect(ai.addr)

            except OSError as e:
                # EINPROGRESS is the expected outcome for a non-blocking
                # connect; anything else is handled as a failure below
                if e.errno != errno.EINPROGRESS:
                    raise

            else:
                # XXX: wut???
                log_warn("sock_connect: connect: didn't return EINPROGRESS")

        except OSError as e:
            # NOTE(review): a socket whose connect() failed is dropped here
            # without an explicit close - confirm that the socket type
            # releases its fd when garbage-collected
            log_warn("sock_connect: %s: %s", ai.addr, e)

        else:
            # connect is pending (or completed right away), hand it out
            yield sock


def on_connect(ev, events, sock, sock_iter):
    """
        Outbound connect EV_WRITE callback, i.e. connection failed or was established

        ev is the event that fired, events is the bitmask of triggered event
        flags, and sock/sock_iter are the extra arguments that were given to
        cb_event() by client_connect_next(). On timeout or connect error the
        next socket from sock_iter is tried.
    """

    log_debug("on_connect: ev=%r, events=%#x", ev, events)

    # test for timeout
    if events & EV_TIMEOUT:
        log_warn("on_connect: connect failed, timeout")

        # NOTE(review): the timed-out sock is not explicitly closed before
        # moving on - confirm it is released elsewhere

        # keep trying
        return client_connect_next(sock_iter)

    # test for errno; the outcome of the connect is read from SO_ERROR
    err = sock.getsockopt_int(SOL_SOCKET, SO_ERROR)

    if err:
        # fail
        log_warn("on_connect: connect failed, errno=%d", err)

        # keep trying
        return client_connect_next(sock_iter)

    # ok, connected
    log_info("on_connect: connected")


def client_connect_next(sock_iter):
    """
        Attempt to run the given connect operation, on the given iterable of sockets.

        Takes the next socket from sock_iter and registers an EV_WRITE event
        for it with on_connect as the callback. Logs an error if sock_iter
        has no sockets left.
    """

    for sock in sock_iter:
        # pend for writing
        log_debug("client_connect_next: cb_event(%d, EV_WRITE, on_connect, %r)", sock.fd, sock)
        ev = cb_event(ev_base, sock.fd, EV_WRITE, on_connect, sock, sock_iter)

        # wait specified timeout
        # (options.timeout is None unless -w was given; presumably that means
        # waiting without a timeout - TODO confirm against the event API)
        log_debug("client_connect_next: %r: add(%s)", ev, options.timeout)
        ev.add(options.timeout)

        # ok, only one socket is pending at a time
        break

    else:
        # fail, ran out of addresses to try
        log_err("client_connect_next: ran out of addresses to try")


def run_client(host, port):
    """
        Execute in client-mode

        Picks the address family from the -4/-6 options and starts the first
        connect attempt to host:port. The connect itself completes from the
        event loop.
    """

    # figure out AF to use
    if options.ipv4:
        family = AF_INET

    elif options.ipv6:
        family = AF_INET6

    else:
        family = AF_UNSPEC

    # fixed socktype
    socktype = SOCK_STREAM

    # look up the address and start a non-blocking connect
    sock_iter = sock_connect(host, port, family, socktype)

    # start waiting
    client_connect_next(sock_iter)


def main(argv):
    """
        Program entry point; argv is the full argument vector including the program name.

        Returns 2 if the positional arguments are not exactly <host> <port>,
        None otherwise.
    """

    # parse args
    args = parse_argv(argv)

    # XXX: support listen mode
    if len(args) != 2:
        # report bad usage instead of failing with an unpacking ValueError
        log_err("usage: %s [options] <host> <port>", argv[0])
        return 2

    host, port = args

    run_client(host, port)

    # run mainloop
    log_debug("main: entering event loop")

    if ev_base.loop():
        log_debug("main: event loop done")

    else:
        log_err("main: event loop was idle!")


if __name__ == '__main__':
    sys.exit(main(sys.argv))