Skip to content

Commit

Permalink
patch: fix many signals at once reception
Browse files Browse the repository at this point in the history
  • Loading branch information
janosimas committed Jun 16, 2016
1 parent 179e1c6 commit 710b060
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 116 deletions.
4 changes: 0 additions & 4 deletions src/examples/core/ListenToTcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,6 @@ terrama2::core::DataSeriesPtr buildInputDataSeries()

int main(int argc, char* argv[])
{
//TODO: fix Listen to tcp example
std::cout << "NOT WORKING" << std::endl;
return 1;

terrama2::core::initializeTerraMA();
QCoreApplication app(argc, argv);

Expand Down
220 changes: 109 additions & 111 deletions src/terrama2/core/network/TcpManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,7 @@ class RaiiBlock
uint32_t& block_;
};

class RaiiSocket
{
public:
RaiiSocket(QTcpSocket* socket) : socket_(socket) {}
~RaiiSocket() {if(socket_) socket_->readAll();}

QTcpSocket* socket_;
};

bool terrama2::core::TcpManager::updateListeningPort(int port)
bool terrama2::core::TcpManager::updateListeningPort(uint32_t port)
{
if(serverPort() == port)
return true;
Expand Down Expand Up @@ -175,124 +166,131 @@ bool terrama2::core::TcpManager::sendLog(const QByteArray& bytearray)

void terrama2::core::TcpManager::readReadySlot()
{
RaiiBlock block(blockSize_);
RaiiSocket clearSocketOnExit(tcpSocket_.get());

QDataStream in(tcpSocket_.get());
TERRAMA2_LOG_DEBUG() << "bytes available: " << tcpSocket_->bytesAvailable();

Q_UNUSED(block)
if(blockSize_ == 0)
{
if(tcpSocket_->bytesAvailable() < static_cast<int>(sizeof(uint32_t)))
{
TERRAMA2_LOG_ERROR() << QObject::tr("Error receiving remote configuration.\nInvalid message size.");
return;
}

//Raii block
RaiiBlock block(blockSize_);

in >> blockSize_;
TERRAMA2_LOG_DEBUG() << "message size: " << blockSize_;
}
QDataStream in(tcpSocket_.get());
TERRAMA2_LOG_DEBUG() << "bytes available: " << tcpSocket_->bytesAvailable();

if(tcpSocket_->bytesAvailable() != blockSize_)
{
auto bytearray = tcpSocket_->readAll();
TERRAMA2_LOG_DEBUG() << bytearray.right(bytearray.size()-4).data();
TERRAMA2_LOG_ERROR() << QObject::tr("Error receiving remote configuration.\nWrong message size.");
return;
}
Q_UNUSED(block)
if(blockSize_ == 0)
{
if(tcpSocket_->bytesAvailable() < static_cast<int>(sizeof(uint32_t)))
{
TERRAMA2_LOG_ERROR() << QObject::tr("Error receiving remote configuration.\nInvalid message size.");
return;
}

int sigInt = -1;
in >> sigInt;

TcpSignal signal = static_cast<TcpSignal>(sigInt);
if(signal != TcpSignal::UPDATE_SERVICE_SIGNAL && !serviceManager_->serviceLoaded())
{
// wait for TcpSignals::UPDATE_SERVICE_SIGNAL
return;
}
in >> blockSize_;
TERRAMA2_LOG_DEBUG() << "message size: " << blockSize_;
}

switch(signal)
{
case TcpSignal::UPDATE_SERVICE_SIGNAL:
if(tcpSocket_->bytesAvailable() < blockSize_)
{
QByteArray bytearray = tcpSocket_->readAll();

updateService(bytearray);
break;
auto bytearray = tcpSocket_->readAll();
TERRAMA2_LOG_DEBUG() << bytearray.right(bytearray.size()-4).data();
TERRAMA2_LOG_ERROR() << QObject::tr("Error receiving remote configuration.\nWrong message size.");
return;
}
case TcpSignal::TERMINATE_SERVICE_SIGNAL:
{
TERRAMA2_LOG_DEBUG() << "TERMINATE_SERVICE_SIGNAL";

serviceManager_->setShuttingDownProcessInitiated();
int sigInt = -1;
in >> sigInt;

emit stopSignal();
break;
}
case TcpSignal::ADD_DATA_SIGNAL:
{
TERRAMA2_LOG_DEBUG() << "ADD_DATA_SIGNAL";
QByteArray bytearray = tcpSocket_->readAll();
//read signal
TcpSignal signal = static_cast<TcpSignal>(sigInt);
//update left blockSize
blockSize_-=sizeof(TcpSignal);

addData(bytearray);
break;
}
case TcpSignal::REMOVE_DATA_SIGNAL:
if(signal != TcpSignal::UPDATE_SERVICE_SIGNAL && !serviceManager_->serviceLoaded())
{
TERRAMA2_LOG_DEBUG() << "REMOVE_DATA_SIGNAL";
QByteArray bytearray = tcpSocket_->readAll();

removeData(bytearray);
break;
// wait for TcpSignals::UPDATE_SERVICE_SIGNAL
return;
}
case TcpSignal::START_PROCESS_SIGNAL:
{
TERRAMA2_LOG_DEBUG() << "START_PROCESS_SIGNAL";
int dataId;
in >> dataId;

emit startProcess(dataId);

break;
}
case TcpSignal::STATUS_SIGNAL:
switch(signal)
{
TERRAMA2_LOG_DEBUG() << "STATUS_SIGNAL";
QByteArray bytearray;
QDataStream out(&bytearray, QIODevice::WriteOnly);

auto jsonObj = ServiceManager::getInstance().status();
QJsonDocument doc(jsonObj);

out << static_cast<uint32_t>(0);
out << static_cast<uint32_t>(TcpSignal::STATUS_SIGNAL);
out << doc.toJson(QJsonDocument::Compact);
bytearray.remove(8, 4);//Remove QByteArray header
out.device()->seek(0);
out << static_cast<uint32_t>(bytearray.size() - sizeof(uint32_t));

// wait while sending message
qint64 written = tcpSocket_->write(bytearray);
if(written == -1 || !tcpSocket_->waitForBytesWritten(30000))
TERRAMA2_LOG_WARNING() << QObject::tr("Unable to establish connection with server.");

break;
case TcpSignal::UPDATE_SERVICE_SIGNAL:
{
QByteArray bytearray = tcpSocket_->read(blockSize_);

updateService(bytearray);
break;
}
case TcpSignal::TERMINATE_SERVICE_SIGNAL:
{
TERRAMA2_LOG_DEBUG() << "TERMINATE_SERVICE_SIGNAL";

serviceManager_->setShuttingDownProcessInitiated();

emit stopSignal();
break;
}
case TcpSignal::ADD_DATA_SIGNAL:
{
TERRAMA2_LOG_DEBUG() << "ADD_DATA_SIGNAL";
QByteArray bytearray = tcpSocket_->read(blockSize_);

addData(bytearray);
break;
}
case TcpSignal::REMOVE_DATA_SIGNAL:
{
TERRAMA2_LOG_DEBUG() << "REMOVE_DATA_SIGNAL";
QByteArray bytearray = tcpSocket_->read(blockSize_);

removeData(bytearray);
break;
}
case TcpSignal::START_PROCESS_SIGNAL:
{
TERRAMA2_LOG_DEBUG() << "START_PROCESS_SIGNAL";
uint32_t dataId;
in >> dataId;

emit startProcess(dataId);

break;
}
case TcpSignal::STATUS_SIGNAL:
{
TERRAMA2_LOG_DEBUG() << "STATUS_SIGNAL";
QByteArray bytearray;
QDataStream out(&bytearray, QIODevice::WriteOnly);

auto jsonObj = ServiceManager::getInstance().status();
QJsonDocument doc(jsonObj);

out << static_cast<uint32_t>(0);
out << static_cast<uint32_t>(TcpSignal::STATUS_SIGNAL);
out << doc.toJson(QJsonDocument::Compact);
bytearray.remove(8, 4);//Remove QByteArray header
out.device()->seek(0);
out << static_cast<uint32_t>(bytearray.size() - sizeof(uint32_t));

// wait while sending message
qint64 written = tcpSocket_->write(bytearray);
if(written == -1 || !tcpSocket_->waitForBytesWritten(30000))
TERRAMA2_LOG_WARNING() << QObject::tr("Unable to establish connection with server.");

break;
}
case TcpSignal::LOG_SIGNAL:
{
TERRAMA2_LOG_DEBUG() << "LOG_SIGNAL";

// removeData(bytearray);
break;
}
default:
TERRAMA2_LOG_ERROR() << QObject::tr("Error\n Unknown signal received.");
break;
}
case TcpSignal::LOG_SIGNAL:
{
TERRAMA2_LOG_DEBUG() << "LOG_SIGNAL";

// removeData(bytearray);
break;
}
default:
TERRAMA2_LOG_ERROR() << QObject::tr("Error\n Unknown signal received.");
break;
}
}//end of Raii block

blockSize_ = 0;
if(tcpSocket_.get() && !tcpSocket_->atEnd())
readReadySlot();
}

void terrama2::core::TcpManager::receiveConnection()
Expand Down
2 changes: 1 addition & 1 deletion src/terrama2/core/network/TcpManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ namespace terrama2
using QTcpServer::listen;

public slots:
bool updateListeningPort(int);
bool updateListeningPort(uint32_t);

signals:
//! Emited when the service should be terminated.
Expand Down

0 comments on commit 710b060

Please sign in to comment.