Skip to content

Commit

Permalink
Merge pull request #358 from Tendrl/develop
Browse files Browse the repository at this point in the history
Fix Message socket read/write
  • Loading branch information
r0h4n authored Mar 29, 2017
2 parents 444b06d + ae9486f commit a376096
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 147 deletions.
1 change: 0 additions & 1 deletion etc/create_ceph_cluster_sample_job.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
payload = {
"integration_id": "89604c6b-2eff-4221-96b4-e41319240240",
"run": "tendrl.flows.CreateCluster",
"type": "node",
"created_from": "API",
Expand Down
1 change: 0 additions & 1 deletion etc/create_gluster_cluster_sample_job.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
payload = {
"integration_id": "89604c6b-2eff-4221-96b4-e41319240240",
"run": "tendrl.flows.CreateCluster",
"type": "node",
"created_from": "API",
Expand Down
1 change: 0 additions & 1 deletion tendrl/node_agent/manager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ def main():
NS.node_agent.config.save()
NS.publisher_id = "node_agent"
NS.message_handler_thread = MessageHandler()
NS.publisher_id = "node_agent"

NS.ceph_provisioner = CephProvisioningManager(
NS.tendrl.definitions.get_parsed_defs()["namespace.tendrl"]['ceph_provisioner']
Expand Down
57 changes: 42 additions & 15 deletions tendrl/node_agent/message/handler.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
import os
from io import BlockingIOError
import sys
import struct
import traceback


import gevent.event
import gevent.greenlet
from gevent.server import StreamServer
from gevent import socket
from gevent.socket import error as socket_error
from gevent.socket import timeout as socket_timeout
from io import BlockingIOError
import os
import sys


from tendrl.commons.message import Message
from tendrl.node_agent.message.logger import Logger
import traceback
from tendrl.commons.logger import Logger


RECEIVE_DATA_SIZE = 4096
MESSAGE_SOCK_PATH = "/var/run/tendrl/message.sock"


Expand All @@ -25,8 +30,11 @@ def __init__(self):

def read_socket(self, sock, *args):
try:
self.data = sock.recv(RECEIVE_DATA_SIZE)
message = Message.from_json(self.data)
size = self._msgLength(sock)
data = self._read(sock, size)
frmt = "=%ds" % size
msg = struct.unpack(frmt, data)
message = Message.from_json(msg[0])
Logger(message)
except (socket_error, socket_timeout):
exc_type, exc_value, exc_tb = sys.exc_info()
Expand All @@ -38,7 +46,21 @@ def read_socket(self, sock, *args):
exc_type, exc_value, exc_tb = sys.exc_info()
traceback.print_exception(
exc_type, exc_value, exc_tb, file=sys.stderr)


def _read(self, sock, size):
data = ''
while len(data) < size:
dataTmp = sock.recv(size-len(data))
data += dataTmp
if dataTmp == '':
raise RuntimeError("Message socket connection broken")
return data

def _msgLength(self, sock):
d = self._read(sock, 4)
s = struct.unpack('=I', d)
return s[0]

def _run(self):
try:
self.server.serve_forever()
Expand All @@ -59,16 +81,21 @@ def bind_unix_listener(self):
socket.SOCK_STREAM)
self.sock.setblocking(0)
self.sock.listen(50)
return self.sock
except (TypeError, BlockingIOError, socket_error, ValueError):
exc_type, exc_value, exc_tb = sys.exc_info()
traceback.print_exception(exc_type, exc_value, exc_tb,
file=sys.stderr)
exc_type, exc_value, exc_tb = sys.exc_info()
traceback.print_exception(exc_type, exc_value, exc_tb,
file=sys.stderr)
pass
try:
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
if os.path.exists(MESSAGE_SOCK_PATH):
os.remove(socket_path)
os.remove(MESSAGE_SOCK_PATH)
self.sock.setblocking(0)
self.sock.bind(MESSAGE_SOCK_PATH)
self.sock.listen(50)
finally:
return self.sock

except:
exc_type, exc_value, exc_tb = sys.exc_info()
traceback.print_exception(exc_type, exc_value, exc_tb,
file=sys.stderr)
116 changes: 0 additions & 116 deletions tendrl/node_agent/message/logger.py

This file was deleted.

11 changes: 6 additions & 5 deletions tendrl/node_agent/objects/cluster_message/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,21 @@
from tendrl.commons import objects


class ClusterMessage(message, objects.BaseObject):
class ClusterMessage(objects.BaseObject, message):
internal = True
def __init__(self, **cluster_message):
self._defs = {}
super(ClusterMessage, self).__init__(**cluster_message)

self.value = 'clusters/%s/Messages/%s'
message.__init__(self, **cluster_message)
objects.BaseObject.__init__(self)

self.value = 'clusters/%s/messages/%s'
self._etcd_cls = _ClusterMessageEtcd

class _ClusterMessageEtcd(etcdobj.EtcdObj):
"""Cluster message object, lazily updated
"""
__name__ = 'clusters/%s/Messages/%s'
__name__ = 'clusters/%s/messages/%s'
_tendrl_cls = ClusterMessage

def render(self):
Expand Down
9 changes: 5 additions & 4 deletions tendrl/node_agent/objects/message/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,20 @@
from tendrl.commons import objects


class Message(message, objects.BaseObject):
class Message(objects.BaseObject, message):
internal = True
def __init__(self, **message_arg):
self._defs = {}
super(Message, self).__init__(**message_arg)
self.value = 'Messages/events/%s'
message.__init__(self, **message_arg)
objects.BaseObject.__init__(self)
self.value = 'messages/events/%s'
self._etcd_cls = _MessageEtcd

class _MessageEtcd(etcdobj.EtcdObj):
"""Message object, lazily updated
"""
__name__ = 'Messages/events/%s'
__name__ = 'messages/events/%s'
_tendrl_cls = Message

def render(self):
Expand Down
9 changes: 5 additions & 4 deletions tendrl/node_agent/objects/node_message/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,21 @@
from tendrl.commons import objects


class NodeMessage(message, objects.BaseObject):
class NodeMessage(objects.BaseObject, message):
internal = True
def __init__(self, **node_message):
self._defs = {}
super(NodeMessage, self).__init__(**node_message)
message.__init__(self, **node_message)
objects.BaseObject.__init__(self)

self.value = 'nodes/%s/Messages/%s'
self.value = 'nodes/%s/messages/%s'
self._etcd_cls = _NodeMessageEtcd

class _NodeMessageEtcd(etcdobj.EtcdObj):
"""Node message object, lazily updated
"""
__name__ = 'nodes/%s/Messages/%s'
__name__ = 'nodes/%s/messages/%s'
_tendrl_cls = NodeMessage

def render(self):
Expand Down

0 comments on commit a376096

Please sign in to comment.