Skip to content

Commit

Permalink
v1.0.4
Browse files Browse the repository at this point in the history
  • Loading branch information
xxnet committed Jan 31, 2015
1 parent 9ad3787 commit 9f43428
Show file tree
Hide file tree
Showing 17 changed files with 567 additions and 242 deletions.
8 changes: 3 additions & 5 deletions data/goagent/config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
modules:
goagent: {auto_start: 1, current_version: 3.1.31, ignore_version: 3.1.31}
launcher: {current_version: 1.0.0, ignore_version: 1.0.0}
update: {check_update: 0, last_path: /media/release/XX-Net/launcher/1.0.0, node_id: !!python/long '8796754053427',
uuid: 63b44f9f-8256-4f44-a512-1da2bdebb62b}
socks_proxy:
{host:"", port:1080, user:"", password:""}
appids: {appids: "", password:""}
2 changes: 1 addition & 1 deletion goagent/3.1.32/local/cert_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,5 +375,5 @@ def check_ca():


if __name__ == '__main__':
capath = os.path.join(os.path.dirname(os.path.abspath(__file__)), CertUtil.ca_keyfile)
#capath = os.path.join(os.path.dirname(os.path.abspath(__file__)), CertUtil.ca_keyfile)
CertUtil.check_ca()
2 changes: 1 addition & 1 deletion goagent/3.1.32/local/check_ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def test_main():

if __name__ == "__main__":
#test("203.165.14.230", 10) #gws
test('208.117.224.213', 10)
test('210.139.253.39', 100)
#test("218.176.242.24")
#test_main()

Expand Down
159 changes: 132 additions & 27 deletions goagent/3.1.32/local/connect_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import logging
from config import config
import threading
import operator

current_path = os.path.dirname(os.path.abspath(__file__))
python_path = os.path.abspath( os.path.join(current_path, os.pardir, os.pardir, os.pardir, 'python27', '1.0'))
Expand All @@ -41,6 +42,60 @@
g_cacertfile = os.path.join(current_path, "cacert.pem")


class Connect_pool(object):
pool_lock = threading.Lock()
not_empty = threading.Condition(pool_lock)
pool = {}

def qsize(self):
return len(self.pool)

def put(self, item):
speed, sock = item
self.pool_lock.acquire()
try:
self.pool[sock] = speed
self.not_empty.notify()
finally:
self.pool_lock.release()

def get(self, block=True):
self.not_empty.acquire()
try:
if not block:
if not self.qsize():
raise
else:
while not self.qsize():
self.not_empty.wait()
item = self._get()
return item
finally:
self.not_empty.release()

def get_nowait(self):
return self.get(False)

def _get(self):
#pool = sorted(self.pool.items(), key=operator.itemgetter(1))
#k,v = pool[0]
#self.pool.pop(k)
#return (v, k)

fastest_time = 9999
fastest_sock = None
for sock in self.pool:
time = self.pool[sock]
if time < fastest_time:
fastest_time = time
fastest_sock = sock

self.pool.pop(fastest_sock)
return (fastest_time, fastest_sock)




class Https_connection_manager(object):

thread_num_lock = threading.Lock()
Expand Down Expand Up @@ -102,10 +157,10 @@ def __init__(self):
self.max_retry = 3
self.timeout = 3
self.max_timeout = 5
self.max_thread_num = 30
self.min_connection_num = 30
self.max_thread_num = 20
self.min_connection_num = 20

self.conn_pool = Queue.Queue()
self.conn_pool = Connect_pool() #Queue.PriorityQueue()


# set_ciphers as Modern Browsers
Expand All @@ -119,8 +174,15 @@ def __init__(self):
if hasattr(OpenSSL.SSL, 'SESS_CACHE_BOTH'):
self.openssl_context.set_session_cache_mode(OpenSSL.SSL.SESS_CACHE_BOTH)

def save_ssl_connection_for_reuse(self, socket):
self.conn_pool.put( (time.time(), socket) )
def save_ssl_connection_for_reuse(self, ssl_sock):
if self.conn_pool.qsize() > 5:
if ssl_sock.handshake_time > 200 and ssl_sock.create_time - time.time() > 10:
return
if self.conn_pool.qsize() > self.min_connection_num - 5:
if ssl_sock.handshake_time > 300:
return
ssl_sock.last_use_time = time.time()
self.conn_pool.put( (ssl_sock.handshake_time, ssl_sock) )

def create_ssl_connection(self):

Expand Down Expand Up @@ -160,9 +222,12 @@ def _create_ssl_connection(ip_port):
handshake_time = int((time_handshaked - time_connected) * 1000)

google_ip.update_ip(ip, handshake_time)

#logging.debug("create_ssl update ip:%s time:%d", ip, handshake_time)
# sometimes, we want to use raw tcp socket directly(select/epoll), so setattr it to ssl socket.
ssl_sock.ip = ip
ssl_sock.sock = sock
ssl_sock.create_time = time_begin
ssl_sock.handshake_time = handshake_time

# verify SSL certificate issuer.
def check_ssl_cert(ssl_sock):
Expand All @@ -178,9 +243,10 @@ def check_ssl_cert(ssl_sock):

return ssl_sock
except Exception as e:
logging.debug("create_ssl %s fail:%s", ip, e)
#logging.debug("create_ssl %s fail:%s", ip, e)
google_ip.report_connect_fail(ip)


if ssl_sock:
ssl_sock.close()
if sock:
Expand All @@ -197,10 +263,11 @@ def connect_thread():
break

port = 443
logging.debug("create ssl conn %s", ip_str)
#logging.debug("create ssl conn %s", ip_str)
ssl_sock = _create_ssl_connection( (ip_str, port) )
if ssl_sock:
self.conn_pool.put((time.time(), ssl_sock))
ssl_sock.last_use_time = time.time()
self.conn_pool.put((ssl_sock.handshake_time, ssl_sock))
finally:
self.thread_num_lock.acquire()
self.thread_num -= 1
Expand All @@ -219,30 +286,31 @@ def create_more_connection():

while True:
try:
ctime, sock = self.conn_pool.get_nowait()
handshake_time, ssl_sock = self.conn_pool.get_nowait()
except:
sock = None
ssl_sock = None
break

if time.time() - ctime < 210: # gws ssl connection can keep for 230s after created
if time.time() - ssl_sock.last_use_time < 210: # gws ssl connection can keep for 230s after created
#logging.debug("ssl_pool.get:%s handshake:%d", ssl_sock.ip, handshake_time)
break
else:
sock.close()
ssl_sock.close()
continue

conn_num = self.conn_pool.qsize()
logging.debug("ssl conn_num:%d", conn_num)
#logging.debug("ssl conn_num:%d", conn_num)
if conn_num < self.min_connection_num:
create_more_connection()

if sock:
return sock
if ssl_sock:
return ssl_sock
else:
try:
ctime, sock = self.conn_pool.get()
return sock
handshake_time, ssl_sock = self.conn_pool.get()
return ssl_sock
except Exception as e:
logging.warning("get ssl_pool err:%s", e)
logging.error("get ssl_pool err:%s", e)
return None


Expand Down Expand Up @@ -317,7 +385,9 @@ class Forward_connection_manager():

def create_connection(self, sock_life=5, cache_key=None):
connection_cache_key = cache_key
def _create_connection(ip_port, timeout, queobj):
def _create_connection(ip_port, timeout, queobj, delay=0):
if delay != 0:
time.sleep(delay)
ip = ip_port[0]
sock = None
try:
Expand All @@ -339,14 +409,17 @@ def _create_connection(ip_port, timeout, queobj):
# record TCP connection time
conn_time = time.time() - start_time
google_ip.update_ip(ip, conn_time * 2000)
logging.info("create_tcp update ip:%s time:%d", ip, conn_time * 2000)
logging.debug("tcp conn %s time:%d", ip, conn_time * 1000)

# put ssl socket object to output queobj
queobj.put(sock)
except (socket.error, OSError) as e:
# any socket.error, put Excpetions to output queobj.
queobj.put(e)

logging.debug("tcp conn %s fail", ip)
google_ip.report_connect_fail(ip)
logging.info("create_ssl report fail ip:%s", ip)
if sock:
sock.close()

Expand All @@ -362,7 +435,7 @@ def recycle_connection(count, queobj):
if connection_cache_key:
try:
ctime, sock = self.tcp_connection_cache[connection_cache_key].get_nowait()
if time.time() - ctime < 5:
if time.time() - ctime < sock_life:
return sock
except Queue.Empty:
pass
Expand All @@ -376,17 +449,16 @@ def recycle_connection(count, queobj):

addrs = addresses
queobj = Queue.Queue()
delay = 0
for addr in addrs:
thread.start_new_thread(_create_connection, (addr, timeout, queobj))
thread.start_new_thread(_create_connection, (addr, timeout, queobj, delay))
delay += 0.01
for i in range(len(addrs)):
result = queobj.get()
if not isinstance(result, (socket.error, OSError)):
thread.start_new_thread(recycle_connection, (len(addrs)-i-1, queobj))
return result
else:
if i == 0:
# only output first error
logging.warning('create_connection to %s return %r, try again.', addrs, result)
logging.warning('create_connection to %s fail.', addrs)


def forward_socket(self, local, remote, timeout=60, tick=2, bufsize=8192):
Expand Down Expand Up @@ -424,5 +496,38 @@ def forward_socket(self, local, remote, timeout=60, tick=2, bufsize=8192):
logging.debug("forward closed.")



https_manager = Https_connection_manager()
forwork_manager = Forward_connection_manager()


def test_pool():
pool = Connect_pool()
pool.put((3, "c"))
pool.put((1, "a"))
pool.put((2, "b"))

t, s = pool.get()
print s

t, s = pool.get()
print s

t, s = pool.get()
print s


def test_pool_speed():
pool = Connect_pool()
for i in range(100):
pool.put((i, "%d"%i))

start = time.time()
t, s = pool.get()
print time.time() - start
print s
# sort time is 5ms for 10000
# sort time is 0ms for 100

if __name__ == "__main__":
test_pool_speed()
Loading

0 comments on commit 9f43428

Please sign in to comment.