Skip to content

Commit

Permalink
OPC DA: Single thread runner, update executions logic
Browse files Browse the repository at this point in the history
Still not perfect but more in line to expected external event
handler behavior.
  • Loading branch information
kumajaya authored and azoitl committed Apr 17, 2024
1 parent 7f5e478 commit 892aa20
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 36 deletions.
4 changes: 4 additions & 0 deletions src/com/opc/opcconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ bool COpcConnection::ifInGroupList(const std::string& paGroupName){

int COpcConnection::send_connect(const std::string& paGroupName, unsigned long paReqUpdateRate, float paDeadBand,
forte::com_infra::CComLayer* paComCallback, std::vector<COpcProcessVar*> paNewItems){
if(mConnectionState == e_Disconnected){
mEventHandler->enableHandler();
}

if(ifLetEventPass(e_Connect,paGroupName)){
mConnectionState = e_Connecting;
addGroup(paGroupName, paReqUpdateRate, paDeadBand, paComCallback);
Expand Down
57 changes: 23 additions & 34 deletions src/com/opc/opceventhandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,10 @@ bool COpcEventHandler::mIsSemaphoreEmpty = true;
COpcEventHandler::TCallbackDescriptor COpcEventHandler::mCallbackDescCount = 0;

COpcEventHandler::COpcEventHandler(CDeviceExecution& paDeviceExecution) : CExternalEventHandler(paDeviceExecution) {
if (!isAlive()) {
this->start();
// Sleep to allow new thread to start
CThread::sleepThread(100);
}
}

COpcEventHandler::~COpcEventHandler(){
if (!mCommandQueue.isEmpty()) {
resumeSelfSuspend(); //wake-up and execute all commands in queue if exist
}

if (isAlive()) {
setAlive(false);
resumeSelfSuspend();
end();
}

clearCommandQueue(); //re-check and delete all commands in queue if still exist
clearCommandQueue(); //check and delete all commands in queue if not empty
}

void COpcEventHandler::clearCommandQueue(){
Expand All @@ -59,6 +44,22 @@ void COpcEventHandler::clearCommandQueue(){
delete nextCommand;
}
}

for(TCallbackList::iterator itRunner = mComCallbacks.begin(); itRunner != mComCallbacks.end(); ++itRunner){
DEVLOG_ERROR("erase from command callback\n");
mComCallbacks.erase(itRunner);
}
}

void COpcEventHandler::executeCommandQueue(){
while(!mCommandQueue.isEmpty()) {
ICmd* nextCommand = getNextCommand();
if(nextCommand != nullptr) {
nextCommand->runCommand();
delete nextCommand;
nextCommand = nullptr;
}
}
}

void COpcEventHandler::sendCommand(ICmd *paCmd){
Expand All @@ -69,26 +70,14 @@ void COpcEventHandler::sendCommand(ICmd *paCmd){
}

void COpcEventHandler::run(){
HRESULT result = CoInitializeEx(nullptr, COINIT_MULTITHREADED);

if(result == S_OK){
while(isAlive()){
if (!isAlive()) {
break;
}
while(!mCommandQueue.isEmpty()) {
ICmd* nextCommand = getNextCommand();
if(nextCommand != nullptr) {
nextCommand->runCommand();
delete nextCommand;
nextCommand = nullptr;
}
}
selfSuspend();
while(isAlive()){
executeCommandQueue();
if (!isAlive()) {
break;
}
selfSuspend();
}

CoUninitialize();
executeCommandQueue(); // immediately try to clear the command queue after a stop
}

COpcEventHandler::TCallbackDescriptor COpcEventHandler::addComCallback(forte::com_infra::CComLayer* paComCallback){
Expand Down
25 changes: 23 additions & 2 deletions src/com/opc/opceventhandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,31 @@ class COpcEventHandler : public CExternalEventHandler, private CThread{

/* functions needed for the external event handler interface */
void enableHandler() override {
//do nothing, start thread in the constructor
if (!isAlive()) {
start();
DEVLOG_INFO("COpcEventHandler: handler enabled\n");
}
}

void disableHandler() override {
//do nothing, end thread in the destructor
if (!mComCallbacks.empty()) {
DEVLOG_INFO("COpcEventHandler: command callback not empty\n");
resumeSelfSuspend();
return; // refuse, wait the second trigger
}

if (!mCommandQueue.isEmpty()) {
DEVLOG_INFO("COpcEventHandler: command queue not empty\n");
resumeSelfSuspend(); //wake-up, execute all commands in queue and continue
}

mStateSemaphore.timedWait(100000000); // wait 100ms to back wake-up
if (isAlive()) {
resumeSelfSuspend();
setAlive(false);
end();
DEVLOG_INFO("COpcEventHandler: handler disabled\n");
}
}

void setPriority(int) override {
Expand All @@ -65,6 +85,7 @@ class COpcEventHandler : public CExternalEventHandler, private CThread{
private:
ICmd* getNextCommand();
void clearCommandQueue();
void executeCommandQueue();

struct TComContainer{
TCallbackDescriptor mCallbackDesc;
Expand Down

0 comments on commit 892aa20

Please sign in to comment.