13#include <QCoreApplication>
16#include <QDeadlineTimer>
17#include "debuggingaids.h"
18#include "destructedstate.h"
20#include "inconstructionstate.h"
22#include "managedjobpointer.h"
23#include "queuepolicy.h"
24#include "shuttingdownstate.h"
26#include "suspendedstate.h"
27#include "suspendingstate.h"
29#include "threadweaver.h"
31#include "workinghardstate.h"
33using namespace ThreadWeaver;
37 : QueueAPI(new Private::Weaver_Private(), parent)
39 qRegisterMetaType<ThreadWeaver::JobPointer>(
"ThreadWeaver::JobPointer");
44 setState_p(InConstruction);
51 setState_p(WorkingHard);
57 Q_ASSERT_X(
state()->stateId() == Destructed, Q_FUNC_INFO,
"shutDown() method was not called before Weaver destructor!");
69void Weaver::shutDown_p()
75 TWDEBUG(3,
"WeaverImpl::shutDown: destroying inventory.\n");
76 d()->semaphore.acquire(d()->createdThreads.loadAcquire());
81 d()->jobFinished.wakeAll();
95 if (d()->inventory.isEmpty()) {
98 th = d()->inventory.takeFirst();
102 Q_ASSERT(
state()->stateId() == ShuttingDown);
108 "WeaverImpl::shutDown: thread %i did not exit as expected, "
116 Q_ASSERT(d()->inventory.isEmpty());
117 TWDEBUG(3,
"WeaverImpl::shutDown: done\n");
133void Weaver::setState_p(StateId
id)
135 Q_ASSERT(!d()->mutex->tryLock());
136 State *newState = d()->states[id].data();
137 State *previous = d()->state.fetchAndStoreOrdered(newState);
138 if (previous ==
nullptr || previous->
stateId() !=
id) {
141 if (
id == Suspended) {
150 return d()->state.loadAcquire();
155 return d()->state.loadAcquire();
160 Q_ASSERT_X(cap >= 0,
"Weaver Impl",
"Thread inventory size has to be larger than or equal to zero.");
167void Weaver::setMaximumNumberOfThreads_p(
int cap)
169 Q_ASSERT(!d()->mutex->tryLock());
170 const bool createInitialThread = (d()->inventoryMax == 0 && cap > 0);
171 d()->inventoryMax = cap;
172 if (createInitialThread) {
184int Weaver::maximumNumberOfThreads_p()
const
186 Q_ASSERT(!d()->mutex->tryLock());
187 return d()->inventoryMax;
197int Weaver::currentNumberOfThreads_p()
const
199 Q_ASSERT(!d()->mutex->tryLock());
200 return d()->inventory.count();
212 Q_ASSERT(!d()->mutex->tryLock());
218 Q_ASSERT(job->status() == Job::Status_New);
220 TWDEBUG(3,
"WeaverImpl::enqueue: queueing job %p.\n", (
void *)job.data());
221 job->aboutToBeQueued(
this);
223 int i = d()->assignments.size();
225 while (i > 0 && d()->assignments.at(i - 1)->priority() < job->priority()) {
228 d()->assignments.insert(i, job);
230 d()->assignments.append(job);
232 job->setStatus(Job::Status_Queued);
247 Q_ASSERT(!d()->mutex->tryLock());
248 int position = d()->assignments.indexOf(job);
249 if (position != -1) {
250 job->aboutToBeDequeued(
this);
251 int newPosition = d()->assignments.indexOf(job);
252 JobPointer job = d()->assignments.takeAt(newPosition);
253 job->setStatus(Job::Status_New);
254 Q_ASSERT(!d()->assignments.contains(job));
255 TWDEBUG(3,
"WeaverImpl::dequeue: job %p dequeued, %i jobs left.\n", (
void *)job.
data(), queueLength_p());
257 d()->jobFinished.wakeAll();
258 Q_ASSERT(!d()->assignments.contains(job));
261 TWDEBUG(3,
"WeaverImpl::dequeue: job %p not found in queue.\n", (
void *)job.
data());
273void Weaver::dequeue_p()
275 Q_ASSERT(!d()->mutex->tryLock());
276 TWDEBUG(3,
"WeaverImpl::dequeue: dequeueing all jobs.\n");
277 for (
int index = 0; index < d()->assignments.size(); ++index) {
278 d()->assignments.at(index)->aboutToBeDequeued(
this);
280 d()->assignments.clear();
281 ENSURE(d()->assignments.isEmpty());
291void Weaver::finish_p()
293 Q_ASSERT(!d()->mutex->tryLock());
295 const int MaxWaitMilliSeconds = 50;
297 const int MaxWaitMilliSeconds = 500;
299 while (!isIdle_p()) {
300 Q_ASSERT_X(
state()->stateId() == WorkingHard, Q_FUNC_INFO, qPrintable(
state()->stateName()));
301 TWDEBUG(2,
"WeaverImpl::finish: not done, waiting.\n");
302 if (d()->jobFinished.wait(d()->mutex,
QDeadlineTimer(MaxWaitMilliSeconds)) ==
false) {
303 TWDEBUG(2,
"WeaverImpl::finish: wait timed out, %i jobs left, waking threads.\n", queueLength_p());
307 TWDEBUG(2,
"WeaverImpl::finish: done.\n\n\n");
317void Weaver::suspend_p()
329void Weaver::resume_p()
341bool Weaver::isEmpty_p()
const
343 Q_ASSERT(!d()->mutex->tryLock());
344 return d()->assignments.isEmpty();
354bool Weaver::isIdle_p()
const
356 Q_ASSERT(!d()->mutex->tryLock());
357 return isEmpty_p() && d()->active == 0;
367int Weaver::queueLength_p()
const
369 Q_ASSERT(!d()->mutex->tryLock());
370 return d()->assignments.count();
382 d()->jobAvailable.wakeAll();
385void Weaver::requestAbort_p()
387 Q_ASSERT(!d()->mutex->tryLock());
388 for (
int i = 0; i < d()->inventory.size(); ++i) {
389 d()->inventory[i]->requestAbort();
405 Q_ASSERT(!d()->mutex->tryLock());
407 const int reserve = d()->inventoryMax - d()->inventory.count();
410 for (
int i = 0; i < qMin(reserve, numberOfNewJobs); ++i) {
413 d()->inventory.append(th);
415 d()->createdThreads.ref();
417 "WeaverImpl::adjustInventory: thread created, "
418 "%i threads in inventory.\n",
419 currentNumberOfThreads_p());
424Private::Weaver_Private *Weaver::d()
426 return reinterpret_cast<Private::Weaver_Private *
>(QueueSignals::d());
429const Private::Weaver_Private *Weaver::d()
const
431 return reinterpret_cast<const Private::Weaver_Private *
>(QueueSignals::d());
455 d()->jobFinished.wakeAll();
464 Q_ASSERT(!d()->mutex->tryLock());
467 "WeaverImpl::adjustActiveThreadCount: %i active threads (%i jobs"
472 if (d()->assignments.isEmpty() && d()->active == 0) {
484 Q_ASSERT(!d()->mutex->tryLock());
491 d()->semaphore.release(1);
509 Q_ASSERT(threadWasBusy ==
false || (threadWasBusy ==
true && d()->active > 0));
510 TWDEBUG(3,
"WeaverImpl::takeFirstAvailableJobOrWait: trying to assign new job to thread %i (%s state).\n", th->
id(), qPrintable(
state()->stateName()));
512 "WeaverImpl::takeFirstAvailableJobOrWait: %i active threads, was busy: %s, suspend: %s, assign new job: %s.\n",
514 threadWasBusy ?
"yes" :
"no",
515 suspendIfInactive ?
"yes" :
"no",
516 !justReturning ?
"yes" :
"no");
517 d()->deleteExpiredThreads();
524 Q_ASSERT(d()->active >= 0);
526 if (suspendIfInactive && d()->active == 0 &&
state()->stateId() == Suspending) {
527 setState_p(Suspended);
531 if (
state()->stateId() != WorkingHard || justReturning) {
535 if (
state()->stateId() == WorkingHard && d()->inventory.size() > d()->inventoryMax) {
536 const int count = d()->inventory.removeAll(th);
537 Q_ASSERT(count == 1);
538 d()->expiredThreads.append(th);
539 throw AbortThread(QStringLiteral(
"Inventory size exceeded"));
543 for (
int index = 0; index < d()->assignments.size(); ++index) {
544 const JobPointer &candidate = d()->assignments.at(index);
545 if (d()->canBeExecuted(candidate)) {
547 d()->assignments.removeAt(index);
554 "WeaverImpl::takeFirstAvailableJobOrWait: job %p assigned to thread %i (%s state).\n",
557 qPrintable(
state()->stateName()));
575 return state()->applyForWork(th, wasBusy);
581 state()->waitForAvailableJob(th);
598 Q_ASSERT(!d()->mutex->tryLock());
599 TWDEBUG(4,
"WeaverImpl::blockThreadUntilJobsAreBeingAssigned_locked: thread %i blocked (%s state).\n", th->
id(), qPrintable(
state()->stateName()));
601 d()->jobAvailable.wait(d()->mutex);
602 TWDEBUG(4,
"WeaverImpl::blockThreadUntilJobsAreBeingAssigned_locked: thread %i resumed (%s state).\n", th->
id(), qPrintable(
state()->stateName()));
605#include "moc_weaver.cpp"
DestructedState is only active after the threads have been destroyed by the destructor,...
InConstructionState handles the calls to the Weaver object until the constructor has finished.
virtual bool isEmpty() const =0
Is the queue empty? The queue is empty if no more jobs are queued.
virtual void requestAbort()=0
Request aborts of the currently executed jobs.
virtual void enqueue(const QList< JobPointer > &jobs)=0
Queue a vector of jobs.
virtual bool isIdle() const =0
Is the weaver idle? The weaver is idle if no jobs are queued and no jobs are processed by the threads...
virtual int currentNumberOfThreads() const =0
Returns the current number of threads in the inventory.
virtual void finish()=0
Finish all queued operations, then return.
virtual bool dequeue(const JobPointer &job)=0
Remove a job from the queue.
virtual int maximumNumberOfThreads() const =0
Get the maximum number of threads this Weaver may start.
virtual int queueLength() const =0
Returns the number of pending jobs.
virtual void shutDown()=0
Shut down the queue.
virtual void setMaximumNumberOfThreads(int cap)=0
Set the maximum number of threads this Weaver object may start.
virtual void suspend()=0
Suspend job execution.
virtual void resume()=0
Resume job queueing.
void finished()
Emitted when the Queue has completed all jobs currently queued.
void suspended()
The Queue has been suspended.
void stateChanged(ThreadWeaver::State *)
Emitted when the processing state of the Queue has changed.
ShuttingDownState is enabled when the Weaver destructor is entered.
We use a State pattern to handle the system state in ThreadWeaver.
virtual void activated()
The state has been changed so that this object is responsible for state handling.
QString stateName() const
The name of the current state.
virtual StateId stateId() const =0
The state Id.
In SuspendedState, jobs are queued, but will not be executed.
SuspendingState is the state after suspend() has been called, but before all threads finished executi...
Thread represents a worker thread in a Queue's inventory.
unsigned int id() const
Returns the thread id.
int queueLength() const override
Returns the number of pending jobs.
void enqueue(const QList< JobPointer > &jobs) override
Queue a vector of jobs.
void finish() override
Finish all queued operations, then return.
void decActiveThreadCount()
Decrement the count of active threads.
void threadEnteredRun(Thread *thread)
Called from a new thread when entering the run method.
virtual Thread * createThread()
Factory method to create the threads.
bool isEmpty() const override
Is the queue empty? The queue is empty if no more jobs are queued.
void incActiveThreadCount()
Increment the count of active threads.
void adjustInventory(int noOfNewJobs)
Adjust the inventory size.
void threadSuspended(ThreadWeaver::Thread *)
A thread has been suspended.
void blockThreadUntilJobsAreBeingAssigned_locked(Thread *th)
Blocks the calling thread until jobs can be assigned.
void reschedule() override
Reschedule the jobs in the queue.
void shutDown() override
Enter Destructed state.
const State * state() const override
Return the state of the weaver object.
void adjustActiveThreadCount(int diff)
Adjust active thread count.
JobPointer takeFirstAvailableJobOrSuspendOrWait(Thread *th, bool threadWasBusy, bool suspendIfAllThreadsInactive, bool justReturning)
Take the first available job out of the queue and return it.
~Weaver() override
Destructs a Weaver object.
int activeThreadCount()
Returns the number of active threads.
void dequeue() override
Remove all queued jobs.
void waitForAvailableJob(Thread *th) override
Wait for a job to become available.
int maximumNumberOfThreads() const override
Get the maximum number of threads this Weaver may start.
void threadExited(ThreadWeaver::Thread *)
A thread has exited.
void setState(StateId)
Set the Weaver state.
void suspend() override
Suspend job execution.
int currentNumberOfThreads() const override
Returns the current number of threads in the inventory.
void setMaximumNumberOfThreads(int cap) override
Set the maximum number of threads this Weaver object may start.
Weaver(QObject *parent=nullptr)
Constructs a Weaver object.
bool isIdle() const override
Is the weaver idle? The weaver is idle if no jobs are queued and no jobs are processed by the threads...
void threadStarted(ThreadWeaver::Thread *)
A Thread has been created.
void blockThreadUntilJobsAreBeingAssigned(Thread *th)
Blocks the calling thread until jobs can be assigned.
JobPointer applyForWork(Thread *thread, bool wasBusy) override
Assign a job to the calling thread.
void resume() override
Resume job queueing.
void requestAbort() override
Request aborts of the currently executed jobs.
const char * constData() const
bool isEmpty() const
void moveToThread(QThread *targetThread)
QThread * thread() const
QByteArray toLatin1() const
QThread * currentThread()
bool isFinished() const
void start(Priority priority)
bool wait(QDeadlineTimer deadline)