Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Visual dicom browser enh #1218

Merged
merged 1 commit into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 94 additions & 57 deletions Libs/Core/ctkJobScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
// Qt includes
#include <QCoreApplication>
#include <QDebug>
#include <QMutexLocker>
#include <QReadLocker>
#include <QWriteLocker>
#include <QSharedPointer>
#include <QThreadPool>
#include <QUuid>
Expand Down Expand Up @@ -68,20 +69,18 @@ void ctkJobSchedulerPrivate::init()
void ctkJobSchedulerPrivate::queueJobsInThreadPool()
{
Q_Q(ctkJobScheduler);
// NOTE: No need to queue jobs with a signal/slot mechanism, since the mutex makes
// sure that concurrent threads append/clean/delete the jobs map.

if (this->FreezeJobsScheduling)
{
return;
}

// No need to queue jobs with a signal/slot mechanism, since the mutex makes
// sure that concurrent threads append/clean/delete the jobs map.

{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&this->QueueMutex);

// The QWriteLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QWriteLockers within the scheduler's methods.
QWriteLocker locker(&this->QueueLock);
foreach (QThread::Priority priority, (QList<QThread::Priority>()
<< QThread::Priority::HighestPriority
<< QThread::Priority::HighPriority
Expand Down Expand Up @@ -109,21 +108,24 @@ void ctkJobSchedulerPrivate::queueJobsInThreadPool()
int numberOfRunningJobsWithSameType = this->getSameTypeJobsInThreadPoolQueueOrRunning(job);
if (numberOfRunningJobsWithSameType >= job->maximumConcurrentJobsPerType())
{
continue;
// When the maximum number of concurrent jobs of the same type is reached,
// return early instead of adding more jobs to an already crowded queue.
// This allows the scheduler time to finish the currently running jobs,
// preventing a jobs traffic jam.
return;
}

logger.debug(QString("ctkDICOMScheduler: creating worker for job %1 in thread %2.\n")
.arg(job->jobUID())
.arg(QString::number(reinterpret_cast<quint64>(QThread::currentThreadId())), 16));
.arg(job->jobUID())
.arg(QString::number(reinterpret_cast<quint64>(QThread::currentThreadId())), 16));

QSharedPointer<ctkAbstractWorker> worker = QSharedPointer<ctkAbstractWorker>(job->createWorker());
worker->setScheduler(*q);
this->Workers.insert(job->jobUID(), worker);

job->setStatus(ctkAbstractJob::JobStatus::Queued);
emit q->jobQueued(job->toVariant());

QSharedPointer<ctkAbstractWorker> worker =
QSharedPointer<ctkAbstractWorker>(job->createWorker());
worker->setScheduler(*q);

this->Workers.insert(job->jobUID(), worker);
this->ThreadPool->start(worker.data(), job->priority());
}
}
Expand Down Expand Up @@ -184,16 +186,40 @@ bool ctkJobSchedulerPrivate::insertJob(QSharedPointer<ctkAbstractJob> job)
{"progress", progressConnection},
};

emit q->jobInitialized(job->toVariant());

logger.debug(QString("ctkDICOMScheduler: creating worker for job %1 in thread %2.\n")
.arg(job->jobUID())
.arg(QString::number(reinterpret_cast<quint64>(QThread::currentThreadId())), 16));

QSharedPointer<ctkAbstractWorker> worker;
{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&this->QueueMutex);
// The QWriteLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QWriteLockers within the scheduler's methods.
QWriteLocker locker(&this->QueueLock);
this->JobsQueue.insert(job->jobUID(), job);
this->JobsConnections.insert(job->jobUID(), connections);

int numberOfRunningJobsWithSameType = this->getSameTypeJobsInThreadPoolQueueOrRunning(job);
if (numberOfRunningJobsWithSameType >= job->maximumConcurrentJobsPerType())
{
return false;
}

logger.debug(QString("ctkDICOMScheduler: creating worker for job %1 in thread %2.\n")
.arg(job->jobUID())
.arg(QString::number(reinterpret_cast<quint64>(QThread::currentThreadId())), 16));

QSharedPointer<ctkAbstractWorker> worker = QSharedPointer<ctkAbstractWorker>(job->createWorker());
worker->setScheduler(*q);
this->Workers.insert(job->jobUID(), worker);

job->setStatus(ctkAbstractJob::JobStatus::Queued);
emit q->jobQueued(job->toVariant());

this->ThreadPool->start(worker.data(), job->priority());
}

emit q->jobInitialized(job->toVariant());
this->queueJobsInThreadPool();
return true;
}

Expand All @@ -205,11 +231,11 @@ bool ctkJobSchedulerPrivate::cleanJob(const QString &jobUID)
.arg(QString::number(reinterpret_cast<quint64>(QThread::currentThreadId()), 16)));

{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&this->QueueMutex);
// The QWriteLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QWriteLockers within the scheduler's methods.
QWriteLocker locker(&this->QueueLock);
QSharedPointer<ctkAbstractJob> job = this->JobsQueue.value(jobUID);
if (!job || !this->JobsConnections.contains(jobUID))
if (!job)
{
return false;
}
Expand All @@ -228,9 +254,9 @@ void ctkJobSchedulerPrivate::cleanJobs(const QStringList &jobUIDs)

QList<QVariant> dataObjects;
{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&this->QueueMutex);
// The QReadLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QReadLockers within the scheduler's methods.
QReadLocker locker(&this->QueueLock);

foreach (QString jobUID, jobUIDs)
{
Expand All @@ -256,9 +282,9 @@ bool ctkJobSchedulerPrivate::removeJob(const QString& jobUID)
.arg(QString::number(reinterpret_cast<quint64>(QThread::currentThreadId()), 16)));

{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&this->QueueMutex);
// The QWriteLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QWriteLockers within the scheduler's methods.
QWriteLocker locker(&this->QueueLock);
QSharedPointer<ctkAbstractJob> job = this->JobsQueue.value(jobUID);
if (!job || !this->JobsConnections.contains(jobUID))
{
Expand All @@ -284,11 +310,10 @@ bool ctkJobSchedulerPrivate::removeJob(const QString& jobUID)
//------------------------------------------------------------------------------
void ctkJobSchedulerPrivate::removeJobs(const QStringList &jobUIDs)
{
QList<QVariant> dataObjects;
{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&this->QueueMutex);
// The QWriteLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QWriteLockers within the scheduler's methods.
QWriteLocker locker(&this->QueueLock);

foreach (QString jobUID, jobUIDs)
{
Expand All @@ -298,8 +323,6 @@ void ctkJobSchedulerPrivate::removeJobs(const QStringList &jobUIDs)
continue;
}

dataObjects.append(job->toVariant());

QMap<QString, QMetaObject::Connection> connections = this->JobsConnections.value(jobUID);
QObject::disconnect(connections.value("started"));
QObject::disconnect(connections.value("userStopped"));
Expand Down Expand Up @@ -377,7 +400,6 @@ ctkJobScheduler::ctkJobScheduler(ctkJobSchedulerPrivate* pimpl, QObject* parent)
// --------------------------------------------------------------------------
ctkJobScheduler::~ctkJobScheduler()
{
this->setFreezeJobsScheduling(true);
this->stopAllJobs(true);
// stopAllJobs is not main thread blocking. Therefore we need actually
// to wait the jobs to end (either finished or stopped) before closing the application.
Expand All @@ -401,9 +423,9 @@ int ctkJobScheduler::numberOfJobs()
Q_D(ctkJobScheduler);
int numberOfJobs = 0;
{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&d->QueueMutex);
// The QReadLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QReadLockers within the scheduler's methods.
QReadLocker locker(&d->QueueLock);
numberOfJobs = d->JobsQueue.count();
}
return numberOfJobs;
Expand All @@ -415,9 +437,9 @@ int ctkJobScheduler::numberOfPersistentJobs()
Q_D(ctkJobScheduler);
int numberOfPersistentJobs = 0;
{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&d->QueueMutex);
// The QReadLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QReadLockers within the scheduler's methods.
QReadLocker locker(&d->QueueLock);
foreach (QSharedPointer<ctkAbstractJob> job, d->JobsQueue)
{
if (job->isPersistent())
Expand All @@ -437,9 +459,9 @@ int ctkJobScheduler::numberOfRunningJobs()

int numberOfRunningJobs = 0;
{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&d->QueueMutex);
// The QReadLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QReadLockers within the scheduler's methods.
QReadLocker locker(&d->QueueLock);
foreach (QSharedPointer<ctkAbstractJob> job, d->JobsQueue)
{
if (job->status() <= ctkAbstractJob::JobStatus::Running)
Expand Down Expand Up @@ -503,9 +525,9 @@ QSharedPointer<ctkAbstractJob> ctkJobScheduler::getJobSharedByUID(const QString&

QSharedPointer<ctkAbstractJob> job = nullptr;
{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&d->QueueMutex);
// The QReadLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QReadLockers within the scheduler's methods.
QReadLocker locker(&d->QueueLock);
QMap<QString, QSharedPointer<ctkAbstractJob>>::iterator it = d->JobsQueue.find(jobUID);
if (it == d->JobsQueue.end())
{
Expand Down Expand Up @@ -566,11 +588,12 @@ QStringList ctkJobScheduler::stopAllJobs(bool stopPersistentJobs, bool removeJob
{
Q_D(ctkJobScheduler);

d->FreezeJobsScheduling = true;
QStringList stoppedJobsUIDs;
{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&d->QueueMutex);
// The QReadLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QReadLockers within the scheduler's methods.
QReadLocker locker(&d->QueueLock);

// Stops jobs without a worker (in waiting, still in main thread).
foreach (QSharedPointer<ctkAbstractJob> job, d->JobsQueue)
Expand All @@ -586,7 +609,7 @@ QStringList ctkJobScheduler::stopAllJobs(bool stopPersistentJobs, bool removeJob
}

QString jobUID = job->jobUID();
if (!d->JobsConnections.contains(jobUID))
if (jobUID.isEmpty() || !d->JobsConnections.contains(jobUID))
{
continue;
}
Expand All @@ -600,6 +623,7 @@ QStringList ctkJobScheduler::stopAllJobs(bool stopPersistentJobs, bool removeJob
QMap<QString, QMetaObject::Connection> connections = d->JobsConnections.value(jobUID);
QObject::disconnect(connections.value("userStopped"));
job->setStatus(ctkAbstractJob::JobStatus::UserStopped);
d->BatchedJobsUserStopped.append(job->toVariant());
stoppedJobsUIDs.append(jobUID);
}
}
Expand All @@ -626,6 +650,13 @@ QStringList ctkJobScheduler::stopAllJobs(bool stopPersistentJobs, bool removeJob
worker->requestCancel();
}

d->FreezeJobsScheduling = false;

if (!d->ThrottleTimer->isActive())
{
d->ThrottleTimer->start(d->ThrottleTimeInterval);
}

return stoppedJobsUIDs;
}

Expand All @@ -641,9 +672,9 @@ void ctkJobScheduler::stopJobsByJobUIDs(const QStringList &jobUIDs, bool removeJ

QStringList initializedStoppedJobsUIDs;
{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&d->QueueMutex);
// The QWriteLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QWriteLockers within the scheduler's methods.
QWriteLocker locker(&d->QueueLock);

// Stops jobs without a worker (in waiting, still in main thread)
foreach (QSharedPointer<ctkAbstractJob> job, d->JobsQueue)
Expand Down Expand Up @@ -674,6 +705,7 @@ void ctkJobScheduler::stopJobsByJobUIDs(const QStringList &jobUIDs, bool removeJ
QMap<QString, QMetaObject::Connection> connections = d->JobsConnections.value(jobUID);
QObject::disconnect(connections.value("userStopped"));
job->setStatus(ctkAbstractJob::JobStatus::UserStopped);
d->BatchedJobsUserStopped.append(job->toVariant());
initializedStoppedJobsUIDs.append(job->jobUID());
}
}
Expand Down Expand Up @@ -702,6 +734,11 @@ void ctkJobScheduler::stopJobsByJobUIDs(const QStringList &jobUIDs, bool removeJ
worker->requestCancel();
}
}

if (!d->ThrottleTimer->isActive())
{
d->ThrottleTimer->start(d->ThrottleTimeInterval);
}
}

//----------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion Libs/Core/ctkJobScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class CTK_CORE_EXPORT ctkJobScheduler : public QObject
int numberOfJobs();
int numberOfPersistentJobs();
int numberOfRunningJobs();
Q_INVOKABLE void addJob(ctkAbstractJob* job);
Q_INVOKABLE virtual void addJob(ctkAbstractJob* job);
Q_INVOKABLE virtual void resetJob(const QString& jobUID);
Q_INVOKABLE virtual void deleteJob(const QString& jobUID);
Q_INVOKABLE virtual void deleteJobs(const QStringList& jobUIDs);
Expand Down
5 changes: 3 additions & 2 deletions Libs/Core/ctkJobScheduler_p.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#define __ctkJobSchedulerPrivate_h

// Qt includes
#include <QMutex>
#include <QReadWriteLock>
#include <QSharedPointer>
#include <QTimer>
class QThreadPool;
Expand Down Expand Up @@ -61,7 +61,7 @@ class CTK_CORE_EXPORT ctkJobSchedulerPrivate : public QObject
virtual void queueJobsInThreadPool();
virtual void clearBactchedJobsLists();

QMutex QueueMutex;
QReadWriteLock QueueLock;

int RetryDelay{100};
int MaximumNumberOfRetry{3};
Expand All @@ -71,6 +71,7 @@ class CTK_CORE_EXPORT ctkJobSchedulerPrivate : public QObject
QMap<QString, QSharedPointer<ctkAbstractJob>> JobsQueue;
QMap<QString, QMap<QString, QMetaObject::Connection>> JobsConnections;
QMap<QString, QSharedPointer<ctkAbstractWorker>> Workers;
QMap<QString, int> RunningJobsByJobClass;
QList<QVariant> BatchedJobsStarted;
QList<QVariant> BatchedJobsUserStopped;
QList<QVariant> BatchedJobsFinished;
Expand Down
Loading
Loading