pvl/verkko/utils.py
author Tero Marttila <terom@paivola.fi>
Sun, 10 Feb 2013 13:20:29 +0200
changeset 205 f7658198c224
parent 184 eef756d892e9
permissions -rw-r--r--
pvl.verkko.hosts: refactor RealtimeHandler to use HostsTable
"""
    DHCP... stuff
"""

import re
import functools
from datetime import datetime, timedelta

TIMEDELTA_RE = re.compile(r'(\d+)([a-z]*)', re.IGNORECASE)
TIMEDELTA_UNITS = {
    'd':    'days',
    'h':    'hours',
    'm':    'minutes',
    's':    'seconds',
}

def parse_timedelta (expr) :
    """
        Parse timeout -> timedelta

        >>> parse_timedelta('1d')
        datetime.timedelta(1)
        >>> parse_timedelta('1h')
        datetime.timedelta(0, 3600)
        >>> parse_timedelta('15m')
        datetime.timedelta(0, 900)
        >>> parse_timedelta('1d1h1s')
        datetime.timedelta(1, 3601)
    """
    
    what = {}

    for (value, unit) in TIMEDELTA_RE.findall(expr) :
        unit = unit.lower()
        value = int(value)

        if unit in TIMEDELTA_UNITS :
            what[TIMEDELTA_UNITS[unit]] = value
        else :
            raise ValueError(unit)
    
    return timedelta(**what)

def format_timedelta (td):
    """
        datetime.timedelta -> str

        >>> print timedelta_str(timedelta(days=1))
        1d
        >>> print timedelta_str(timedelta(hours=6))
        6h
        >>> print timedelta_str(timedelta(days=2, hours=6, seconds=120))
        2d6h2m
    """

    # divmod
    days = td.days

    seconds = td.seconds
    minutes, seconds = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    
    # format
    data = (
        (days,      'd'),
        (hours,     'h'),
        (minutes,   'm'),
        (seconds,   's'),
    )

    return ''.join('%d%s' % (count, unit) for count, unit in data if count)

def parse_addr (addr, pad=False) :
    """
        Parse IPv4 addr -> int.

            partial     - allow partial addrs; right-pad with .0

        >>> print "%#010x/%d" % parse_addr('1.2.3.4')
        0x01020304/32
        >>> print "%#010x/%d" % parse_addr('1.2', pad=True)
        0x01020000/16

    """

    # split net
    addr = [int(part) for part in addr.split('.') if part]

    addrlen = len(addr) * 8

    # fixup base?
    if len(addr) == 4 :
        # fine
        pass
    
    elif len(addr) > 4 :
        raise ValueError("Invalid IPv4 net: {0}".format(addr))

    elif len(addr) < 4 and pad :
        # pad
        addr += [0] * (4 - len(addr))
    
    else :
        raise ValueError("Incomplete IPv4 addr: {0}".format(addr))

    # pack to int
    return functools.reduce(lambda a, b: a * 256 + b, addr), addrlen
    
def parse_net (expr) :
    """
        Parse given expr into (base, mask).

        >>> print "%#010x/%#010x" % parse_net('1.2.3.4')
        0x01020304/0xffffffff
        >>> print "%#010x/%#010x" % parse_net('1.2.0.0/16')
        0x01020000/0xffff0000
        >>> print "%#010x/%#010x" % parse_net('1.2')
        0x01020000/0xffff0000
    """

    if '/' in expr :
        net, masklen = expr.split('/', 1)
        
        masklen = int(masklen)

    else :
        net = expr
        masklen = None

    base, baselen = parse_addr(net, pad=True)

    if not masklen :
        # implicit mask, by leaving off octets in the base
        masklen = baselen

    elif masklen > 32 :
        raise ValueError("Invalid IPv4 mask: /{0:d}".format(masklen))
    
    # pack
    mask = (0xffffffff << (32 - masklen)) & 0xffffffff

    # verify
    if base & ~mask :
        raise ValueError("Invalid IPv4 net base: {base:x} & {mask:x}".format(base=base, mask=mask))

    return base, mask, masklen

def IPv4Address (addr) :
    """
        Parse IPv4 address to int.
    """

    addr, len = parse_addr(addr)

    return addr

class IPv4Network (object) :
    """
        Parse and match network masks.

        XXX: is used as a dict key
    """

    def __init__ (self, expr) :
        self.expr = expr
        self.base, self.mask, self.masklen = parse_net(expr)

    def __contains__ (self, addr) :
        return (addr & self.mask) == self.base

    def __str__ (self) :
        return self.expr

    def __repr__ (self) :
        return "IPv4Network(%r)" % (self.expr, )

if __name__ == '__main__' :
    import logging

    logging.basicConfig()

    import doctest
    doctest.testmod()