ThreadWeaver

queue.cpp
1/* -*- C++ -*-
2 The Queue class in ThreadWeaver.
3
4 SPDX-FileCopyrightText: 2005-2013 Mirko Boehm <mirko@kde.org>
5
6 SPDX-License-Identifier: LGPL-2.0-or-later
7*/
8
9#include <QCoreApplication>
10#include <QList>
11#include <QMutex>
12
13#include "queue.h"
14#include "weaver.h"
15
16using namespace ThreadWeaver;
17
18namespace
19{
20static Queue::GlobalQueueFactory *globalQueueFactory;
21}
22
23class Q_DECL_HIDDEN Queue::Private
24{
25public:
26 Private(Queue *q, QueueSignals *queue)
27 : implementation(queue)
28 {
29 Q_ASSERT_X(qApp != nullptr, Q_FUNC_INFO, "Cannot create global ThreadWeaver instance before QApplication!");
30 Q_ASSERT(queue);
31 queue->setParent(q);
32 q->connect(implementation, SIGNAL(finished()), SIGNAL(finished()));
33 q->connect(implementation, SIGNAL(suspended()), SIGNAL(suspended()));
34 }
35
36 QueueSignals *implementation;
37 void init(QueueSignals *implementation);
38};
39
40/** @brief Construct a Queue. */
42 : QueueSignals(parent)
43 , d(new Private(this, new Weaver))
44{
45}
46
47/** @brief Construct a Queue, specifying the QueueSignals implementation to use.
48 *
49 * The QueueSignals instance is usually a Weaver object, which may be customized for specific
50 * application needs. The Weaver instance will take ownership of the implementation object and
51 * deletes it when destructed.
52 * @see Weaver
53 * @see GlobalQueueFactory
54 */
55Queue::Queue(QueueSignals *implementation, QObject *parent)
56 : QueueSignals(parent)
57 , d(new Private(this, implementation))
58{
59}
60
61/** @brief Destruct the Queue object.
62 *
63 * If the queue is not already in Destructed state, the destructor will call shutDown() to make sure
64 * enqueued jobs are completed and the queue is idle.
65 * The queue implementation will be destroyed.
66 * @see shutDown()
67 * @see ThreadWeaver::Destructed
68 */
70{
71 if (d->implementation->state()->stateId() != Destructed) {
72 d->implementation->shutDown();
73 }
74 delete d->implementation;
75 delete d;
76}
77
78/** @brief Create a QueueStream to enqueue jobs into this queue. */
80{
81 return QueueStream(this);
82}
83
85{
86 d->implementation->shutDown();
87}
88
89/** @brief Set the factory object that will create the global queue.
90 *
91 * Once set, the global queue factory will be deleted when the global ThreadWeaver pool is deleted.
92 * The factory object needs to be set before the global ThreadWeaver pool is instantiated. Call this
93 * method before Q(Core)Application is constructed. */
95{
96 if (globalQueueFactory) {
97 delete globalQueueFactory;
98 }
99 globalQueueFactory = factory;
100}
101
102const State *Queue::state() const
103{
104 return d->implementation->state();
105}
106
107namespace
108{
109class StaticThreadWeaverInstanceGuard : public QObject
110{
111 Q_OBJECT
112public:
113 explicit StaticThreadWeaverInstanceGuard(QAtomicPointer<Queue> &instance, QCoreApplication *app)
114 : QObject(app)
115 , instance_(instance)
116 {
117 Q_ASSERT_X(app != nullptr, Q_FUNC_INFO, "Calling ThreadWeaver::Weaver::instance() requires a QCoreApplication!");
118 QObject *impl = instance.loadRelaxed()->findChild<QueueSignals *>();
119 Q_ASSERT(impl);
120 impl->setObjectName(QStringLiteral("GlobalQueue"));
121 qAddPostRoutine(shutDownGlobalQueue);
122 }
123
124 ~StaticThreadWeaverInstanceGuard() override
125 {
126 instance_.fetchAndStoreOrdered(nullptr);
127 delete globalQueueFactory;
128 globalQueueFactory = nullptr;
129 }
130
131private:
132 static void shutDownGlobalQueue()
133 {
135 Q_ASSERT(Queue::instance()->state()->stateId() == Destructed);
136 }
137
138 QAtomicPointer<Queue> &instance_;
139};
140
141}
142
143/** @brief Access the application-global Queue.
144 *
145 * In some cases, the global queue is sufficient for the applications purpose. The global queue will only be
146 * created if this method is actually called in the lifetime of the application.
147 *
148 * The Q(Core)Application object must exist when instance() is called for the first time.
149 * The global queue will be destroyed when Q(Core)Application is destructed. After that, the instance() method
150 * returns zero.
151 */
153{
154 static QAtomicPointer<Queue> s_instance(globalQueueFactory ? globalQueueFactory->create(qApp) : new Queue(qApp));
155 // Order is of importance here:
156 // When s_instanceGuard is destructed (first, before s_instance), it sets the value of s_instance to zero. Next, qApp will delete
157 // the object s_instance pointed to.
158 static StaticThreadWeaverInstanceGuard *s_instanceGuard = new StaticThreadWeaverInstanceGuard(s_instance, qApp);
159 Q_UNUSED(s_instanceGuard);
160 Q_ASSERT_X(s_instance.loadRelaxed() == nullptr //
161 || s_instance.loadRelaxed()->thread() == QCoreApplication::instance()->thread(),
162 Q_FUNC_INFO,
163 "The global ThreadWeaver queue needs to be instantiated (accessed first) from the main thread!");
164 return s_instance.loadAcquire();
165}
166
168{
169 d->implementation->enqueue(jobs);
170}
171
172void Queue::enqueue(const JobPointer &job)
173{
174 enqueue(QList<JobPointer>() << job);
175}
176
178{
179 return d->implementation->dequeue(job);
180}
181
183{
184 return d->implementation->dequeue();
185}
186
188{
189 return d->implementation->finish();
190}
191
193{
194 return d->implementation->suspend();
195}
196
198{
199 return d->implementation->resume();
200}
201
202bool Queue::isEmpty() const
203{
204 return d->implementation->isEmpty();
205}
206
207bool Queue::isIdle() const
208{
209 return d->implementation->isIdle();
210}
211
213{
214 return d->implementation->queueLength();
215}
216
218{
219 d->implementation->setMaximumNumberOfThreads(cap);
220}
221
223{
224 return d->implementation->currentNumberOfThreads();
225}
226
228{
229 return d->implementation->maximumNumberOfThreads();
230}
231
233{
234 d->implementation->requestAbort();
235}
236
238{
239 d->implementation->reschedule();
240}
241
242#include "moc_queue.cpp"
243#include "queue.moc"
virtual bool isEmpty() const =0
Is the queue empty? The queue is empty if no more jobs are queued.
virtual void requestAbort()=0
Request aborts of the currently executed jobs.
virtual void enqueue(const QList< JobPointer > &jobs)=0
Queue a vector of jobs.
virtual bool isIdle() const =0
Is the weaver idle? The weaver is idle if no jobs are queued and no jobs are processed by the threads...
virtual int currentNumberOfThreads() const =0
Returns the current number of threads in the inventory.
virtual void finish()=0
Finish all queued operations, then return.
virtual bool dequeue(const JobPointer &job)=0
Remove a job from the queue.
virtual int maximumNumberOfThreads() const =0
Get the maximum number of threads this Weaver may start.
virtual int queueLength() const =0
Returns the number of pending jobs.
virtual void shutDown()=0
Shut down the queue.
virtual void setMaximumNumberOfThreads(int cap)=0
Set the maximum number of threads this Weaver object may start.
virtual const State * state() const =0
Return the state of the weaver object.
virtual void suspend()=0
Suspend job execution.
virtual void reschedule()=0
Reschedule the jobs in the queue.
virtual void resume()=0
Resume job queueing.
QueueSignals declares the Qt signals shared by the Queue and Weaver classes.
QueueStream implements a stream based API to access ThreadWeaver queues.
Definition queuestream.h:22
Queue implements a ThreadWeaver job queue.
Definition queue.h:36
void resume() override
Resume job queueing.
Definition queue.cpp:197
void dequeue() override
Remove all queued jobs.
Definition queue.cpp:182
void setMaximumNumberOfThreads(int cap) override
Set the maximum number of threads this Weaver object may start.
Definition queue.cpp:217
void shutDown() override
Shut down the queue.
Definition queue.cpp:84
int maximumNumberOfThreads() const override
Get the maximum number of threads this Weaver may start.
Definition queue.cpp:227
void suspend() override
Suspend job execution.
Definition queue.cpp:192
void requestAbort() override
Request aborts of the currently executed jobs.
Definition queue.cpp:232
static ThreadWeaver::Queue * instance()
Access the application-global Queue.
Definition queue.cpp:152
void finish() override
Finish all queued operations, then return.
Definition queue.cpp:187
~Queue() override
Destruct the Queue object.
Definition queue.cpp:69
int currentNumberOfThreads() const override
Returns the current number of threads in the inventory.
Definition queue.cpp:222
Queue(QObject *parent=nullptr)
Construct a Queue.
Definition queue.cpp:41
bool isEmpty() const override
Is the queue empty? The queue is empty if no more jobs are queued.
Definition queue.cpp:202
void reschedule() override
Reschedule the jobs in the queue.
Definition queue.cpp:237
QueueStream stream()
Create a QueueStream to enqueue jobs into this queue.
Definition queue.cpp:79
int queueLength() const override
Returns the number of pending jobs.
Definition queue.cpp:212
const State * state() const override
Return the state of the weaver object.
Definition queue.cpp:102
bool isIdle() const override
Is the weaver idle? The weaver is idle if no jobs are queued and no jobs are processed by the threads...
Definition queue.cpp:207
static void setGlobalQueueFactory(GlobalQueueFactory *factory)
Set the factory object that will create the global queue.
Definition queue.cpp:94
void enqueue(const QList< JobPointer > &jobs) override
Queue a vector of jobs.
Definition queue.cpp:167
We use a State pattern to handle the system state in ThreadWeaver.
Definition state.h:56
virtual StateId stateId() const =0
The state Id.
A Weaver manages worker threads.
Definition weaver.h:35
QCA_EXPORT void init()
T * loadAcquire() const const
T * loadRelaxed() const const
QCoreApplication * instance()
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
void setObjectName(QAnyStringView name)
void setParent(QObject *parent)
QThread * thread() const const
Interface for the global queue factory.
Definition queue.h:69
This file is part of the KDE documentation.
Documentation copyright © 1996-2025 The KDE developers.
Generated on Fri Jan 3 2025 11:48:59 by doxygen 1.12.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.