Skip to content

Commit

Permalink
Add 'test' directory.
Browse files Browse the repository at this point in the history
  • Loading branch information
montag451 committed Mar 15, 2012
1 parent c80091d commit 50f5b7f
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 0 deletions.
87 changes: 87 additions & 0 deletions test/test_tap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import sys
import optparse
import socket
import select
import errno
import pytun

class TunnelServer(object):

def __init__(self, taddr, tmask, tmtu, laddr, lport, raddr, rport):
self._tap = pytun.TunTapDevice(flags=pytun.IFF_TAP|pytun.IFF_NO_PI)
self._tap.addr = taddr
self._tap.netmask = tmask
self._tap.mtu = tmtu
self._tap.up()
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock.bind((laddr, lport))
self._raddr = raddr
self._rport = rport

def run(self):
mtu = self._tap.mtu
r = [self._tap, self._sock]; w = []; x = []
to_tap = ''
to_sock = ''
while True:
try:
r, w, x = select.select(r, w, x)
if self._tap in r:
to_sock = self._tap.read(mtu)
if self._sock in r:
to_tap, addr = self._sock.recvfrom(65535)
if addr[0] != self._raddr or addr[1] != self._rport:
to_tap = '' # drop packet
if self._tap in w:
self._tap.write(to_tap)
to_tap = ''
if self._sock in w:
self._sock.sendto(to_sock, (self._raddr, self._rport))
to_sock = ''
r = []; w = []
if to_tap:
w.append(self._tap)
else:
r.append(self._sock)
if to_sock:
w.append(self._sock)
else:
r.append(self._tap)
except (select.error, socket.error, pytun.Error), e:
if e[0] == errno.EINTR:
continue
print >> sys.stderr, str(e)
break

def main():
parser = optparse.OptionParser()
parser.add_option('--tap-addr', dest='taddr',
help='set tunnel local address')
parser.add_option('--tap-netmask', default='255.255.255.0',dest='tmask',
help='set tunnel netmask')
parser.add_option('--tap-mtu', type='int', default=1500,dest='tmtu',
help='set tunnel MTU')
parser.add_option('--local-addr', default='0.0.0.0', dest='laddr',
help='set local address [%default]')
parser.add_option('--local-port', type='int', default=12000, dest='lport',
help='set local port [%default]')
parser.add_option('--remote-addr', dest='raddr',
help='set remote address')
parser.add_option('--remote-port', type='int', dest='rport',
help='set remote port')
opt, args = parser.parse_args()
if not (opt.taddr and opt.raddr and opt.rport):
parser.print_help()
return 1
try:
server = TunnelServer(opt.taddr, opt.tmask, opt.tmtu, opt.laddr,
opt.lport, opt.raddr, opt.rport)
except (pytun.Error, socket.error), e:
print >> sys.stderr, str(e)
return 1
server.run()
return 0

if __name__ == '__main__':
sys.exit(main())

90 changes: 90 additions & 0 deletions test/test_tun.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import sys
import optparse
import socket
import select
import errno
import pytun

class TunnelServer(object):

def __init__(self, taddr, tdstaddr, tmask, tmtu, laddr, lport, raddr, rport):
self._tun = pytun.TunTapDevice(flags=pytun.IFF_TUN|pytun.IFF_NO_PI)
self._tun.addr = taddr
self._tun.dstaddr = tdstaddr
self._tun.netmask = tmask
self._tun.mtu = tmtu
self._tun.up()
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock.bind((laddr, lport))
self._raddr = raddr
self._rport = rport

def run(self):
mtu = self._tun.mtu
r = [self._tun, self._sock]; w = []; x = []
to_tun = ''
to_sock = ''
while True:
try:
r, w, x = select.select(r, w, x)
if self._tun in r:
to_sock = self._tun.read(mtu)
if self._sock in r:
to_tun, addr = self._sock.recvfrom(65535)
if addr[0] != self._raddr or addr[1] != self._rport:
to_tun = '' # drop packet
if self._tun in w:
self._tun.write(to_tun)
to_tun = ''
if self._sock in w:
self._sock.sendto(to_sock, (self._raddr, self._rport))
to_sock = ''
r = []; w = []
if to_tun:
w.append(self._tun)
else:
r.append(self._sock)
if to_sock:
w.append(self._sock)
else:
r.append(self._tun)
except (select.error, socket.error, pytun.Error), e:
if e[0] == errno.EINTR:
continue
print >> sys.stderr, str(e)
break

def main():
parser = optparse.OptionParser()
parser.add_option('--tun-addr', dest='taddr',
help='set tunnel local address')
parser.add_option('--tun-dstaddr', dest='tdstaddr',
help='set tunnel destination address')
parser.add_option('--tun-netmask', default='255.255.255.0',dest='tmask',
help='set tunnel netmask')
parser.add_option('--tun-mtu', type='int', default=1500,dest='tmtu',
help='set tunnel MTU')
parser.add_option('--local-addr', default='0.0.0.0', dest='laddr',
help='set local address [%default]')
parser.add_option('--local-port', type='int', default=12000, dest='lport',
help='set local port [%default]')
parser.add_option('--remote-addr', dest='raddr',
help='set remote address')
parser.add_option('--remote-port', type='int', dest='rport',
help='set remote port')
opt, args = parser.parse_args()
if not (opt.taddr and opt.tdstaddr and opt.raddr and opt.rport):
parser.print_help()
return 1
try:
server = TunnelServer(opt.taddr, opt.tdstaddr, opt.tmask, opt.tmtu,
opt.laddr, opt.lport, opt.raddr, opt.rport)
except (pytun.Error, socket.error), e:
print >> sys.stderr, str(e)
return 1
server.run()
return 0

if __name__ == '__main__':
sys.exit(main())

0 comments on commit 50f5b7f

Please sign in to comment.