examples/nc.py
author Tero Marttila <terom@fixme.fi>
Mon, 05 Oct 2009 17:12:29 +0300
changeset 58 ecd89f9605ac
parent 55 99c4344a35ce
permissions -rw-r--r--
use logging for nc.py
#!/usr/bin/env python
"""
    Simple (limited) netcat-like example, implemented using the socket/lib.event2 layer.
"""

from qmsk.net.socket import socket, address
from qmsk.net.socket.constants import *

from qmsk.net.lib.event2 import base, event
from qmsk.net.lib.event2.constants import *
from qmsk.net.lib.event2.event import CallbackEvent as cb_event

import sys, os, fcntl, errno
import optparse, logging

# parsed command-line options; assigned by parse_argv() and read by the rest of the module
options = None

# the single event_base that every event in this module is registered on; driven by ev_base.loop() in main()
ev_base = base.event_base()

# maximum number of bytes to read per readable-callback, 4k - perhaps slightly smaller
BUFSIZE = 4 * 1024

# root logger; configured via logging.basicConfig() in main()
log = logging.getLogger()


def parse_argv (argv) :
    """
        Parse the command-line arguments, storing the parsed options in the module-level `options`.

            argv    - full argument vector, with the program name as argv[0]; the list itself is left unmodified

        Returns the list of remaining positional arguments.

        Raises Exception if mutually exclusive options (-4/-6 or -c/-l) are given together. Invalid options are
        reported by optparse itself.
    """

    global options

    # read the program name without pop()'ing it off the caller's list, which is usually sys.argv itself
    prog = argv[0]

    parser = optparse.OptionParser(prog=prog)

    parser.add_option('-4', "--ipv4", help="Force AF_INET", action='store_true')
    parser.add_option('-6', "--ipv6", help="Force AF_INET6", action='store_true')
    parser.add_option('-v', "--verbose", help="Display status output", action='store_true')
    parser.add_option('-d', "--debug", help="Display extra output", action='store_true')
    parser.add_option('-c', "--connect", help="Act in client mode (default)", action='store_true')
    parser.add_option('-l', "--listen", help="Act in server mode", action='store_true')
    parser.add_option('-w', "--timeout", help="Timeout for connect()", type='float')

    # everything after the program name
    options, args = parser.parse_args(argv[1:])

    if options.ipv4 and options.ipv6 :
        raise Exception("-4 and -6 are mutually exclusive")

    if options.connect and options.listen :
        raise Exception("-c and -l are mutually exclusive")

    return args


def setnonblocking (file) :
    """
        Set the non-blocking IO flag on the given file object, keeping its other file status flags intact.

            file    - any object providing a fileno() method
    """

    fd = file.fileno()

    # F_SETFL replaces the complete set of status flags, so O_NONBLOCK must be merged into the current ones
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)

    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

def sock_connect (host, port, family, socktype) :
    """
        Try and perform a series of non-blocking connects to the given host:port using the given family and socktype,
        yielding a series of socket objects.

        This is a generator: each address returned by address.getaddrinfo() is only attempted once the consumer
        asks for the next socket. Addresses for which the socket cannot be created, or for which connect() fails
        with anything other than EINPROGRESS, are logged as a warning and skipped.

        A yielded socket has had connect() started on it; the outcome of the connect is not known yet.
    """

    for ai in address.getaddrinfo(host, port, family, socktype) :
        log.debug("Attempting connect() to: %s", ai)

        # build socket
        try :
            # construct
            log.debug("socket(%d, %d, %d)", ai.family, ai.socktype, ai.protocol)
            sock = socket.socket(ai.family, ai.socktype, ai.protocol)

            # set nonblock mode
            log.debug("sock=%s: setnonblocking()", sock)
            setnonblocking(sock)

            # start connect
            try :
                log.debug("sock=%s: connect(%s)", sock, ai.addr)
                sock.connect(ai.addr)

            # XXX: socket.error
            except OSError as e :
                # a non-blocking connect should report EINPROGRESS; anything else is handled as a failure below
                if e.errno != errno.EINPROGRESS :
                    raise

            else :
                # connect() returned without EINPROGRESS, which is not expected for a non-blocking socket
                log.warning("Connect was blocking")

        except OSError as e :
            # this address is unusable, move on to the next one
            log.warning("Failed to connect to %s: %s", ai.addr, e)

        else :
            # connect is now pending
            log.info("Connecting to %s...", ai.addr)

            yield sock

def sock_listen (port, family, socktype) :
    """
        Try and bind onto the given local service port, yielding a series of listen()'ing socket objects.

        This is a generator: each address returned by address.getaddrinfo() for the port (with no host given) is
        only attempted once the consumer asks for the next socket. Addresses that fail with OSError while creating,
        binding or listening are logged as a warning and skipped.

        Yielded sockets are in non-blocking mode.
    """

    for ai in address.getaddrinfo(None, port, family, socktype) :
        log.debug("Binding on: %s", ai)

        # build socket
        try :
            sock = socket.socket(ai.family, ai.socktype, ai.protocol)

            setnonblocking(sock)

            # bind local
            sock.bind(ai.addr)

            # listen
            sock.listen(5)

        except OSError as e :
            # this address is unusable, move on to the next one
            log.warning("Failed to listen on %s: %s", ai.addr, e)

        else :
            # bound and listening
            log.debug("Listening on %s", ai.addr)

            yield sock

def client_connect_next (sock_iter) :
    """
        Take the next socket out of the given iterable of sockets in the non-blocking connect() state, and register
        an EV_WRITE event for it that calls on_connect, using options.timeout as the timeout.

        If the iterable has no more sockets, an error is logged and nothing is registered.
    """

    # fetch at most one socket
    for sock in sock_iter :
        break

    else :
        # fail, ran out of addresses to try
        log.error("Unable to connect")

        return

    # pend for writing; on_connect also gets the iterable so that it can move on to the next socket
    log.debug("cb_event(%d, EV_WRITE, on_connect, %r)", sock.fd, sock)
    connect_ev = cb_event(ev_base, sock.fd, EV_WRITE, on_connect, sock, sock_iter)

    # wait specified timeout
    timeout = options.timeout

    log.debug("ev=%r: add(timeout=%s)", connect_ev, timeout)
    connect_ev.add(timeout)

def _peer_name (sock) :
    """
        Return something to describe the remote end of sock in a log message: the peer address if it is available,
        otherwise the socket object itself.
    """

    try :
        return sock.getpeername()

    except OSError :
        # no peer address, as expected for a socket whose connect has not completed (ENOTCONN)
        return sock


def on_connect (ev, events, sock, sock_iter) :
    """
        Outbound connect EV_WRITE callback, i.e. connection failed, timed out or was established.

            ev          - the event that fired
            events      - bitmask of the events that occurred
            sock        - the socket being connected
            sock_iter   - remaining sockets to try, handed to client_connect_next on timeout/failure

        On success the socket is handed to client_connected.
    """

    log.debug("ev=%r: events=%#04x", ev, events)

    # test for timeout
    if events & EV_TIMEOUT :
        # the socket is not connected here, so its peer address may not be available
        log.warning("Connection to %s timed out", _peer_name(sock))

        # keep trying
        return client_connect_next(sock_iter)


    # test for errno; the result of the non-blocking connect is reported via SO_ERROR
    err = sock.getsockopt_int(SOL_SOCKET, SO_ERROR)

    if err :
        # fail
        log.warning("Connection to %s failed: %d", _peer_name(sock), err)

        # keep trying
        return client_connect_next(sock_iter)


    # ok, connected
    client_connected(sock)


def client_connected (sock) :
    """
        The outbound connection on sock is established: log both endpoints and start shovelling data.
    """

    remote = sock.getpeername()
    local = sock.getsockname()

    log.info("Connected to %s (from %s)", remote, local)

    # hand over to the data pipeline
    start_sock(sock)


def server_start_accept (sock) :
    """
        Start waiting for connections on the given socket, by registering a persistent EV_READ event that calls
        on_server_accept.
    """

    local = sock.getsockname()
    log.info("Listening on %s", local)

    # persistent, as the callback should fire for every readable notification on the socket
    what = EV_READ | EV_PERSIST

    accept_ev = cb_event(ev_base, sock.fd, what, on_server_accept, sock)
    accept_ev.add()

def on_server_accept (ev, what, sock) :
    """
        The listening socket is readable: accept() the pending connection and start shipping data for it.

            ev      - the event that fired
            what    - bitmask of the events that occurred, must be exactly EV_READ
            sock    - the listening socket
    """

    # no timeout support yet
    assert what == EV_READ

    listen_addr = sock.getsockname()
    log.debug("Incoming connection on %s", listen_addr)

    # accept new client socket; the address returned alongside is not used
    client, _client_addr = sock.accept()

    # say so
    remote = client.getpeername()
    local = client.getsockname()

    log.info("Client connected from %s (to %s)", remote, local)

    # handle it
    start_sock(client)


def start_sock (sock, fd_in=0, fd_out=1) :
    """
        Start shipping data between sock and fd_in/fd_out.

        Registers two persistent EV_READ events: one on fd_in calling on_read_in, and one on the socket calling
        on_read_sock.

            sock    - the connected socket
            fd_in   - file descriptor to read outgoing data from, defaults to 0
            fd_out  - file descriptor to write incoming data to, defaults to 1
    """

    log.debug("Starting pipeline: %s -> %s -> %s", fd_in, sock, fd_out)

    what = EV_READ | EV_PERSIST

    # one event per direction
    pipeline = (
        cb_event(ev_base, fd_in, what, on_read_in, sock, fd_in),
        cb_event(ev_base, sock.fd, what, on_read_sock, sock, fd_out),
    )

    for read_ev in pipeline :
        read_ev.add()


def on_read_in (ev, events, sock, fd_in) :
    """
        Data readable on fd_in, ship to sock.

            ev      - the event that fired; deleted once fd_in hits EOF
            events  - bitmask of the events that occurred
            sock    - socket to write the data to
            fd_in   - file descriptor to read from

        On EOF, the sending side of the socket is shut down.

        Raises IOError if the socket did not take all of the data, as pending output is not buffered.
    """

    # no timeouts yet
    assert events & EV_READ

    # try and read
    buf = os.read(fd_in, BUFSIZE)

    if not buf :
        # EOF: stop watching the input, and tell the peer that there is no more data coming
        ev.delete()

        sock.shutdown(SHUT_WR)

        log.info("EOF on input")

        return

    # pass on; this must not be done inside of an assert, as `python -O` would strip the write along with the check
    sent = sock.write(buf)

    if sent != len(buf) :
        raise IOError("Short write to %s: %s of %d bytes" % (sock, sent, len(buf)))

def on_read_sock (ev, events, sock, fd_out) :
    """
        Data readable on sock, ship to fd_out.

            ev      - the event that fired; deleted once the socket hits EOF
            events  - bitmask of the events that occurred
            sock    - socket to read from
            fd_out  - file descriptor to write the data to; closed once the socket hits EOF

        Raises IOError if fd_out did not take all of the data, as pending output is not buffered.
    """

    # no timeouts yet
    assert events & EV_READ

    # try and read
    buf = sock.read(BUFSIZE)

    if not buf :
        # EOF: stop watching the socket, and propagate the EOF to the output
        ev.delete()

        os.close(fd_out)

        log.info("EOF from %s", sock.getpeername())

        return

    # pass on; this must not be done inside of an assert, as `python -O` would strip the write along with the check
    written = os.write(fd_out, buf)

    if written != len(buf) :
        raise IOError("Short write to fd %s: %s of %d bytes" % (fd_out, written, len(buf)))

def make_sockargs () :
    """
        Returns the (family, socktype) pair to use, based on the global options.

        The family is AF_INET for -4, AF_INET6 for -6 and AF_UNSPEC otherwise; the socktype is always SOCK_STREAM.
    """

    if options.ipv4 :
        return AF_INET, SOCK_STREAM

    if options.ipv6 :
        return AF_INET6, SOCK_STREAM

    # no preference given
    return AF_UNSPEC, SOCK_STREAM

def run_client (host, port) :
    """
        Execute in client-mode: set up the connection attempts to host:port, and start waiting on the first one.
    """

    sockargs = make_sockargs()

    # generator that looks up the address and starts a non-blocking connect per result
    pending = sock_connect(host, port, *sockargs)

    # start waiting
    client_connect_next(pending)

def run_server (port) :
    """
        Execute in server-mode: listen on the first address that works for the given local service port, and
        start accepting connections on it.

        If no address can be listened on, an error is logged and nothing is set up.
    """

    family, socktype = make_sockargs()

    # get the first valid socket
    have_sock = False

    for sock in sock_listen(port, family, socktype) :
        have_sock = True
        break

    if not have_sock :
        log.error("Unable to listen on local service: %s", port)

        return

    # wait
    server_start_accept(sock)

def main (argv) :
    """
        Program entry point: set up logging, parse the given argument vector, start either the server or the
        client, and run the event loop.

            argv    - full argument vector, including the program name
    """

    # setup logging, quiet by default
    logging.basicConfig(
        level   = logging.WARNING,
        format  = "[%(levelname)7s] %(funcName)15s: %(message)s",
    )

    # parse args
    args = parse_argv(argv)

    # pre-options: --debug takes precedence over --verbose
    level = None

    if options.debug :
        level = logging.DEBUG

    elif options.verbose :
        level = logging.INFO

    if level is not None :
        log.setLevel(level)

    # setup
    if not options.listen :
        # options.connect, which is also the default
        host, port = args

        run_client(host, port)

    else :
        port, = args

        run_server(port)


    # run mainloop
    log.debug("Entering event loop")

    if not ev_base.loop() :
        log.warning("Nothing to do!")

    else :
        log.debug("Event loop done")

# when executed as a script, hand the full argument vector (including the program name) to main()
if __name__ == '__main__' :
    main(sys.argv)