#!/usr/bin/env python
"""
Simple (limited) netcat-like example, implemented using the socket/lib.event2 layer.
"""
from qmsk.net.socket import socket, address
from qmsk.net.socket.constants import *
from qmsk.net.lib.event2 import base, event
from qmsk.net.lib.event2.constants import *
from qmsk.net.lib.event2.event import CallbackEvent as cb_event
import sys, os, fcntl, errno
import optparse, logging
# Parsed command-line options; stays None until parse_argv() stores the optparse result here
options = None
# Event base from qmsk.net.lib.event2 that every event in this script is created on, and whose
# loop() is run by main()
ev_base = base.event_base()
# Maximum number of bytes requested per read call (4 KiB)
BUFSIZE = 4 * 1024
# Root logger; main() configures it via logging.basicConfig() and adjusts its level for -v/-d
log = logging.getLogger()
def parse_argv(argv):
    """
    Parse the command line, storing the resulting option values in the module-level `options`.

        argv    - full argument list, with the program name as its first item (e.g. sys.argv).
                  The list is only read, never modified.

    Returns the list of remaining positional arguments.

    Raises Exception if -4/-6 or -c/-l are given together.
    """
    global options

    # read the program name instead of pop()'ing it, so the caller's list (usually sys.argv itself)
    # is left intact
    prog = argv[0]

    parser = optparse.OptionParser(prog=prog)

    parser.add_option('-4', "--ipv4", help="Force AF_INET", action='store_true')
    parser.add_option('-6', "--ipv6", help="Force AF_INET6", action='store_true')
    parser.add_option('-v', "--verbose", help="Display status output", action='store_true')
    parser.add_option('-d', "--debug", help="Display extra output", action='store_true')
    parser.add_option('-c', "--connect", help="Act in client mode (default)", action='store_true')
    parser.add_option('-l', "--listen", help="Act in server mode", action='store_true')
    parser.add_option('-w', "--timeout", help="Timeout for connect()", type='float')

    # everything after the program name
    options, args = parser.parse_args(argv[1:])

    if options.ipv4 and options.ipv6:
        raise Exception("-4 and -6 are mutually exclusive")

    if options.connect and options.listen:
        raise Exception("-c and -l are mutually exclusive")

    return args
def setnonblocking(file):
    """
    Set the non-blocking IO flag on the given file object.

        file    - any object with a fileno() method

    The other file status flags already set on the descriptor are preserved.
    """
    fd = file.fileno()

    # F_SETFL replaces the whole set of status flags, so start from the current ones rather than
    # clearing things like O_APPEND
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)

    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
def sock_connect(host, port, family, socktype):
    """
    Start a non-blocking connect() towards each address that host:port resolves to.

    This is a generator: the addresses come from address.getaddrinfo(), and for each one a socket
    is created, switched to non-blocking mode and connect()'d. Every socket whose connect() was
    started without a fatal error is yielded; addresses that fail with OSError are logged and
    skipped. Work for the next address only happens once the consumer asks for another socket.
    """
    for ai in address.getaddrinfo(host, port, family, socktype):
        log.debug("Attempting connect() to: %s", ai)

        # build socket
        try:
            # construct
            log.debug("socket(%d, %d, %d)", ai.family, ai.socktype, ai.protocol)
            sock = socket.socket(ai.family, ai.socktype, ai.protocol)

            # set nonblock mode
            log.debug("sock=%s: setnonblocking()", sock)
            setnonblocking(sock)

            # start connect
            try:
                log.debug("sock=%s: connect(%s)", sock, ai.addr)
                sock.connect(ai.addr)

            # XXX: socket.error
            except OSError as e:
                # EINPROGRESS is the expected outcome of connect() on a non-blocking socket;
                # anything else is a real failure for the outer handler
                if e.errno != errno.EINPROGRESS:
                    raise

            else:
                # connect() completed right away instead of going into the background
                log.warning("Connect was blocking")

        except OSError as e:
            # this address is unusable, move on to the next one
            log.warning("Failed to connect to %s: %s", ai.addr, e)

        else:
            # connect is underway; hand the socket to the caller to wait on
            log.info("Connecting to %s...", ai.addr)

            yield sock
def sock_listen(port, family, socktype):
    """
    Try to bind onto the given local service port, yielding a series of listen()'ing socket objects.

    This is a generator: for every address returned by address.getaddrinfo(None, port, ...), a
    socket is created, switched to non-blocking mode, bound and put into listen(5). Addresses that
    fail with OSError are logged and skipped.
    """
    for ai in address.getaddrinfo(None, port, family, socktype):
        log.debug("Binding on: %s", ai)

        # build socket
        try:
            sock = socket.socket(ai.family, ai.socktype, ai.protocol)
            setnonblocking(sock)

            # bind local
            sock.bind(ai.addr)

            # listen
            sock.listen(5)

        except OSError as e:
            # this address is unusable, move on to the next one
            log.warning("Failed to listen on %s: %s", ai.addr, e)

        else:
            # ready to accept connections
            log.debug("Listening on %s", ai.addr)

            yield sock
def client_connect_next(sock_iter):
    """
    Take the next socket from sock_iter (sockets with a non-blocking connect() underway) and wait
    for it to become writable, with on_connect as the callback.

    If sock_iter has nothing left to offer, an error is logged and nothing is scheduled.
    """
    exhausted = object()
    sock = next(iter(sock_iter), exhausted)

    if sock is exhausted:
        # fail, ran out of addresses to try
        log.error("Unable to connect")
        return

    # writability signals the outcome of the connect; sock_iter rides along so that on_connect
    # can fall through to the next candidate
    log.debug("cb_event(%d, EV_WRITE, on_connect, %r)", sock.fd, sock)
    pending = cb_event(ev_base, sock.fd, EV_WRITE, on_connect, sock, sock_iter)

    # bounded by the -w timeout, if any
    log.debug("ev=%r: add(timeout=%s)", pending, options.timeout)
    pending.add(options.timeout)
def _peer_label(sock):
    """
    Best-effort description of the remote end of sock, for use in log messages.

    Returns sock.getpeername(), or sock itself if that raises OSError.
    """
    try:
        return sock.getpeername()

    except OSError:
        # NOTE(review): presumably getpeername() raises OSError (ENOTCONN) on a socket whose
        # connect timed out or failed - confirm against qmsk.net.socket
        return sock


def on_connect(ev, events, sock, sock_iter):
    """
    Outbound connect EV_WRITE callback, i.e. connection failed, timed out or was established.

        ev          - the event that fired
        events      - bitmask of what happened; EV_TIMEOUT means the wait ran out
        sock        - the socket being connected
        sock_iter   - remaining candidate sockets, handed to client_connect_next on failure

    On success, hands the socket over to client_connected.
    """
    log.debug("ev=%r: events=%#04x", ev, events)

    # test for timeout
    if events & EV_TIMEOUT:
        # the socket is not connected here, so the peer lookup must not be allowed to raise and
        # abort the fallback to the next address
        log.warning("Connection to %s timed out", _peer_label(sock))

        # keep trying
        return client_connect_next(sock_iter)

    # test for errno
    err = sock.getsockopt_int(SOL_SOCKET, SO_ERROR)

    if err:
        # fail; again not connected, see above
        log.warning("Connection to %s failed: %d", _peer_label(sock), err)

        # keep trying
        return client_connect_next(sock_iter)

    # ok, connected
    client_connected(sock)
def client_connected(sock):
    """
    The outbound connection on sock is up: report both endpoints and start relaying data.
    """
    remote = sock.getpeername()
    local = sock.getsockname()

    log.info("Connected to %s (from %s)", remote, local)

    # hand over to the data pump
    start_sock(sock)
def server_start_accept(sock):
    """
    Register a persistent read event on the listening socket, with on_server_accept as callback.
    """
    local = sock.getsockname()
    log.info("Listening on %s", local)

    # EV_PERSIST: stay registered after firing
    what = EV_READ | EV_PERSIST
    accept_ev = cb_event(ev_base, sock.fd, what, on_server_accept, sock)

    accept_ev.add()
def on_server_accept(ev, what, sock):
    """
    Read-event callback for the listening socket: accept() the pending client and start relaying
    data for it.
    """
    # no timeout support yet
    assert what == EV_READ

    local = sock.getsockname()
    log.debug("Incoming connection on %s", local)

    # the address returned alongside the client socket is not used
    accepted = sock.accept()
    client, _addr = accepted

    log.info("Client connected from %s (to %s)", client.getpeername(), client.getsockname())

    # hand over to the data pump
    start_sock(client)
def start_sock(sock, fd_in=0, fd_out=1):
    """
    Set up relaying in both directions: fd_in -> sock (via on_read_in) and sock -> fd_out
    (via on_read_sock).

        sock    - the connected socket
        fd_in   - file descriptor to read local input from (default 0)
        fd_out  - file descriptor to write received data to (default 1)
    """
    log.debug("Starting pipeline: %s -> %s -> %s", fd_in, sock, fd_out)

    # both directions use a persistent read event
    what = EV_READ | EV_PERSIST

    ev_in = cb_event(ev_base, fd_in, what, on_read_in, sock, fd_in)
    ev_out = cb_event(ev_base, sock.fd, what, on_read_sock, sock, fd_out)

    for pending in (ev_in, ev_out):
        pending.add()
def on_read_in(ev, events, sock, fd_in):
    """
    Data readable on fd_in, ship to sock.

        ev      - the read event that fired; removed again on EOF
        events  - bitmask of what happened, expected to include EV_READ
        sock    - socket to forward the data to
        fd_in   - file descriptor to read up to BUFSIZE bytes from

    On EOF (empty read) the event is deleted and sock is shut down with SHUT_WR.

    Raises IOError if sock.write() reports a different count than the number of bytes read.
    """
    # no timeouts yet
    assert events & EV_READ

    # try and read
    buf = os.read(fd_in, BUFSIZE)

    if not buf:
        ev.delete()

        # EOF
        sock.shutdown(SHUT_WR)

        log.info("EOF on input")
        return

    # The write itself must not live inside an assert statement: `python -O` strips asserts, which
    # would silently drop all outgoing data.
    sent = sock.write(buf)

    if sent != len(buf):
        # there is no output buffering here, so the remainder cannot be queued for later
        raise IOError("Short write to socket: %s of %d bytes" % (sent, len(buf)))
def on_read_sock(ev, events, sock, fd_out):
    """
    Data readable on sock, ship to fd_out.

        ev      - the read event that fired; removed again on EOF
        events  - bitmask of what happened, expected to include EV_READ
        sock    - socket to read up to BUFSIZE bytes from
        fd_out  - file descriptor to write the data to

    On EOF (empty read) the event is deleted and fd_out is closed.
    """
    # no timeouts yet
    assert events & EV_READ

    # try and read
    buf = sock.read(BUFSIZE)

    if not buf:
        ev.delete()

        # EOF
        os.close(fd_out)

        log.info("EOF from %s", sock.getpeername())
        return

    # The write itself must not live inside an assert statement: `python -O` strips asserts, which
    # would silently drop all received data. os.write() may also accept only part of the buffer,
    # so keep going until everything is written.
    while buf:
        written = os.write(fd_out, buf)
        buf = buf[written:]
def make_sockargs():
    """
    Work out the (family, socktype) pair to use from the parsed options.

    The family follows -4 / -6 and is AF_UNSPEC when neither is given; the socktype is always
    SOCK_STREAM.
    """
    if options.ipv4:
        return AF_INET, SOCK_STREAM

    if options.ipv6:
        return AF_INET6, SOCK_STREAM

    # no preference given
    return AF_UNSPEC, SOCK_STREAM
def run_client(host, port):
    """
    Client mode: start connecting to host:port and wait for the first candidate socket.
    """
    family, socktype = make_sockargs()

    # sock_connect is a generator, so candidates are only produced as client_connect_next asks
    # for them
    candidates = sock_connect(host, port, family, socktype)

    client_connect_next(candidates)
def run_server(port):
    """
    Server mode: listen on the first local address for `port` that can be bound, and start
    accepting connections on it.

    Logs an error and does nothing further if no address could be bound.
    """
    family, socktype = make_sockargs()

    # only the first socket that sock_listen manages to set up is used
    nothing = object()
    sock = next(iter(sock_listen(port, family, socktype)), nothing)

    if sock is nothing:
        return log.error("Unable to listen on local service: %s", port)

    # wait for clients
    server_start_accept(sock)
def main(argv):
    """
    Script entry point: configure logging, parse argv, set up client or server mode and run the
    event loop.

        argv    - full argument list including the program name

    Server mode (-l) takes exactly one positional argument (port); client mode takes two
    (host, port).
    """
    logging.basicConfig(
        format="[%(levelname)7s] %(funcName)15s: %(message)s",
        level=logging.WARNING,
    )

    positional = parse_argv(argv)

    # -d takes precedence over -v; with neither, the WARNING default stays
    level = None

    if options.debug:
        level = logging.DEBUG
    elif options.verbose:
        level = logging.INFO

    if level is not None:
        log.setLevel(level)

    # client mode is the default
    if not options.listen:
        host, port = positional

        run_client(host, port)
    else:
        (port,) = positional

        run_server(port)

    log.debug("Entering event loop")

    if not ev_base.loop():
        log.warning("Nothing to do!")
    else:
        log.debug("Event loop done")
# only run when executed as a script, not when imported
if __name__ == "__main__":
    main(sys.argv)