11#include "scheduler_p.h"
15#include "workerconfig.h"
17#include <kprotocolinfo.h>
18#include <kprotocolmanager.h>
21#include <QDBusConnection>
22#include <QDBusMessage>
26#include <QThreadStorage>
29static const int s_idleWorkerLifetime = 3 * 60;
33static inline Worker *jobSWorker(
SimpleJob *job)
35 return SimpleJobPrivate::get(job)->m_worker;
38static inline void startJob(
SimpleJob *job, Worker *worker)
40 SimpleJobPrivate::get(job)->start(worker);
43class KIO::SchedulerPrivate
56 qDeleteAll(m_protocols);
59 SchedulerPrivate(
const SchedulerPrivate &) =
delete;
60 SchedulerPrivate &operator=(
const SchedulerPrivate &) =
delete;
64 Worker *m_workerOnHold =
nullptr;
66 bool m_ignoreConfigReparse =
false;
72 void removeWorkerOnHold();
74 bool isWorkerOnHoldFor(
const QUrl &url);
75 void updateInternalMetaData(
SimpleJob *job);
78 void setupWorker(KIO::Worker *worker,
const QUrl &url,
const QString &protocol,
bool newWorker,
const KIO::MetaData *config =
nullptr);
80 void slotWorkerDied(KIO::Worker *worker);
93static SchedulerPrivate *schedulerPrivate()
95 if (!s_storage.hasLocalData()) {
96 s_storage.setLocalData(
new SchedulerPrivate);
98 return s_storage.localData();
101Scheduler *Scheduler::self()
103 return schedulerPrivate()->q;
106SchedulerPrivate *Scheduler::d_func()
108 return schedulerPrivate();
112Scheduler *scheduler()
114 return schedulerPrivate()->q;
119WorkerManager::WorkerManager()
121 m_grimTimer.setSingleShot(
true);
125WorkerManager::~WorkerManager()
130void WorkerManager::returnWorker(Worker *worker)
134 m_idleWorkers.insert(worker->host(), worker);
135 scheduleGrimReaper();
138Worker *WorkerManager::takeWorkerForJob(
SimpleJob *job)
140 Worker *worker = schedulerPrivate()->heldWorkerForJob(job);
145 QUrl url = SimpleJobPrivate::get(job)->m_url;
148 if (it == m_idleWorkers.end()) {
149 it = m_idleWorkers.
begin();
151 if (it == m_idleWorkers.end()) {
155 m_idleWorkers.erase(it);
159bool WorkerManager::removeWorker(Worker *worker)
163 for (; it != m_idleWorkers.
end(); ++it) {
164 if (it.
value() == worker) {
165 m_idleWorkers.erase(it);
172void WorkerManager::clear()
174 m_idleWorkers.clear();
179 return m_idleWorkers.values();
182void WorkerManager::scheduleGrimReaper()
184 if (!m_grimTimer.isActive()) {
185 m_grimTimer.start((s_idleWorkerLifetime / 2) * 1000);
190void WorkerManager::grimReaper()
193 while (it != m_idleWorkers.end()) {
194 Worker *worker = it.
value();
195 if (worker->idleTime() >= s_idleWorkerLifetime) {
196 it = m_idleWorkers.erase(it);
206 if (!m_idleWorkers.isEmpty()) {
207 scheduleGrimReaper();
211int HostQueue::lowestSerial()
const
214 if (first != m_queuedJobs.constEnd()) {
217 return SerialPicker::maxSerial;
222 const int serial = SimpleJobPrivate::get(job)->m_schedSerial;
223 Q_ASSERT(serial != 0);
224 Q_ASSERT(!m_queuedJobs.contains(serial));
225 Q_ASSERT(!m_runningJobs.contains(job));
226 m_queuedJobs.insert(serial, job);
231 Q_ASSERT(!m_queuedJobs.isEmpty());
234 m_queuedJobs.erase(first);
235 m_runningJobs.insert(job);
241 const int serial = SimpleJobPrivate::get(job)->m_schedSerial;
242 if (m_runningJobs.remove(job)) {
243 Q_ASSERT(!m_queuedJobs.contains(serial));
246 if (m_queuedJobs.remove(serial)) {
255 ret.
reserve(m_runningJobs.size());
257 Worker *worker = jobSWorker(job);
266 Q_UNUSED(queuesBySerial);
267#ifdef SCHEDULER_DEBUG
270 auto it = queuesBySerial->
cbegin();
271 for (; it != queuesBySerial->
cend(); ++it) {
281 Q_UNUSED(runningJobsCount);
282#ifdef SCHEDULER_DEBUG
283 int realRunningJobsCount = 0;
284 auto it = queues->
cbegin();
285 for (; it != queues->
cend(); ++it) {
286 realRunningJobsCount += it.
value().runningJobsCount();
288 Q_ASSERT(realRunningJobsCount == runningJobsCount);
292 auto it2 = queues->
cbegin();
293 for (; it2 != queues->
cend(); ++it2) {
294 for (
SimpleJob *job : it2.value().runningJobs()) {
302ProtoQueue::ProtoQueue(
int maxWorkers,
int maxWorkersPerHost)
303 : m_maxConnectionsPerHost(maxWorkersPerHost ? maxWorkersPerHost : maxWorkers)
304 , m_maxConnectionsTotal(qMax(maxWorkers, maxWorkersPerHost))
305 , m_runningJobsCount(0)
310 Q_ASSERT(m_maxConnectionsPerHost >= 1);
311 Q_ASSERT(maxWorkers >= maxWorkersPerHost);
312 m_startJobTimer.setSingleShot(
true);
316ProtoQueue::~ProtoQueue()
321 m_workerManager.clear();
322 for (Worker *worker : workers) {
331 HostQueue &hq = m_queuesByHostname[
hostname];
332 const int prevLowestSerial = hq.lowestSerial();
333 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);
336 Q_ASSERT(SimpleJobPrivate::get(job)->m_schedSerial == 0);
337 SimpleJobPrivate::get(job)->m_schedSerial = m_serialPicker.next();
339 const bool wasQueueEmpty = hq.isQueueEmpty();
344 if (prevLowestSerial != hq.lowestSerial()) {
345 if (hq.runningJobsCount() < m_maxConnectionsPerHost) {
347 if (m_queuesBySerial.remove(prevLowestSerial) == 0) {
348 Q_UNUSED(wasQueueEmpty);
349 Q_ASSERT(wasQueueEmpty);
351 m_queuesBySerial.insert(hq.lowestSerial(), &hq);
353#ifdef SCHEDULER_DEBUG
357 Q_ASSERT(!m_queuesBySerial.contains(prevLowestSerial));
362 m_startJobTimer.start();
364 ensureNoDuplicates(&m_queuesBySerial);
367void ProtoQueue::removeJob(
SimpleJob *job)
369 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job);
370 HostQueue &hq = m_queuesByHostname[jobPriv->m_url.host()];
371 const int prevLowestSerial = hq.lowestSerial();
372 const int prevRunningJobs = hq.runningJobsCount();
374 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);
376 if (hq.removeJob(job)) {
377 if (hq.lowestSerial() != prevLowestSerial) {
379 Q_ASSERT(!jobPriv->m_worker);
380 Q_ASSERT(prevRunningJobs == hq.runningJobsCount());
381 if (m_queuesBySerial.remove(prevLowestSerial) == 0) {
383 Q_ASSERT(hq.runningJobsCount() == m_maxConnectionsPerHost);
386 if (prevRunningJobs != hq.runningJobsCount()) {
388 Q_ASSERT(prevRunningJobs - 1 == hq.runningJobsCount());
389 m_runningJobsCount--;
390 Q_ASSERT(m_runningJobsCount >= 0);
393 if (!hq.isQueueEmpty() && hq.runningJobsCount() < m_maxConnectionsPerHost) {
395 m_queuesBySerial.insert(hq.lowestSerial(), &hq);
400 m_queuesByHostname.remove(jobPriv->m_url.host());
403 if (jobPriv->m_worker && jobPriv->m_worker->isAlive()) {
404 m_workerManager.returnWorker(jobPriv->m_worker);
407 m_startJobTimer.start();
410 ensureNoDuplicates(&m_queuesBySerial);
417 Worker *worker = Worker::createWorker(protocol, url, error, errortext);
419 connect(worker, &Worker::workerDied, scheduler(), [](KIO::Worker *worker) {
420 schedulerPrivate()->slotWorkerDied(worker);
423 qCWarning(KIO_CORE) <<
"couldn't create worker:" << errortext;
431bool ProtoQueue::removeWorker(KIO::Worker *worker)
433 const bool removed = m_workerManager.removeWorker(worker);
440 auto it = m_queuesByHostname.
cbegin();
441 for (; it != m_queuesByHostname.
cend(); ++it) {
449void ProtoQueue::startAJob()
451 ensureNoDuplicates(&m_queuesBySerial);
452 verifyRunningJobsCount(&m_queuesByHostname, m_runningJobsCount);
454#ifdef SCHEDULER_DEBUG
456 auto it = m_queuesByHostname.
cbegin();
457 for (; it != m_queuesByHostname.
cend(); ++it) {
464 if (m_runningJobsCount >= m_maxConnectionsTotal) {
465#ifdef SCHEDULER_DEBUG
472 if (first != m_queuesBySerial.end()) {
474 HostQueue *hq = first.value();
475 const int prevLowestSerial = first.key();
476 Q_UNUSED(prevLowestSerial);
477 Q_ASSERT(hq->lowestSerial() == prevLowestSerial);
480 Q_ASSERT(hq->runningJobsCount() < m_maxConnectionsPerHost);
481 SimpleJob *startingJob = hq->takeFirstInQueue();
482 Q_ASSERT(hq->runningJobsCount() <= m_maxConnectionsPerHost);
483 Q_ASSERT(hq->lowestSerial() != prevLowestSerial);
485 m_queuesBySerial.erase(first);
488 if (!hq->isQueueEmpty() && hq->runningJobsCount() < m_maxConnectionsPerHost) {
489 m_queuesBySerial.insert(hq->lowestSerial(), hq);
497 m_runningJobsCount++;
499 bool isNewWorker =
false;
500 Worker *worker = m_workerManager.takeWorkerForJob(startingJob);
501 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(startingJob);
504 worker = createWorker(jobPriv->m_protocol, startingJob, jobPriv->m_url);
508 jobPriv->m_worker = worker;
509 schedulerPrivate()->setupWorker(worker, jobPriv->m_url, jobPriv->m_protocol, isNewWorker);
510 startJob(startingJob, worker);
516 if (jobPriv->m_schedSerial) {
517 removeJob(startingJob);
518 jobPriv->m_schedSerial = 0;
522#ifdef SCHEDULER_DEBUG
527 if (!m_queuesBySerial.isEmpty()) {
528 m_startJobTimer.start();
532Scheduler::Scheduler()
537 const QString dbusPath = QStringLiteral(
"/KIO/Scheduler");
538 const QString dbusInterface = QStringLiteral(
"org.kde.KIO.Scheduler");
546 QStringLiteral(
"reparseSlaveConfiguration"),
552Scheduler::~Scheduler()
558 schedulerPrivate()->doJob(job);
564 schedulerPrivate()->cancelJob(job);
567void Scheduler::jobFinished(
KIO::SimpleJob *job, KIO::Worker *worker)
569 schedulerPrivate()->jobFinished(job, worker);
574 schedulerPrivate()->putWorkerOnHold(job, url);
577void Scheduler::removeWorkerOnHold()
579 schedulerPrivate()->removeWorkerOnHold();
582bool Scheduler::isWorkerOnHoldFor(
const QUrl &url)
584 return schedulerPrivate()->isWorkerOnHoldFor(url);
587void Scheduler::updateInternalMetaData(
SimpleJob *job)
589 schedulerPrivate()->updateInternalMetaData(job);
592void Scheduler::emitReparseSlaveConfiguration()
600 schedulerPrivate()->m_ignoreConfigReparse =
true;
605void SchedulerPrivate::slotReparseSlaveConfiguration(
const QString &proto,
const QDBusMessage &)
607 if (m_ignoreConfigReparse) {
609 m_ignoreConfigReparse =
false;
615 WorkerConfig::self()->reset();
630 for (; it != endIt; ++it) {
632 for (Worker *worker : list) {
633 worker->send(CMD_REPARSECONFIGURATION);
640void SchedulerPrivate::doJob(
SimpleJob *job)
643 KIO::SimpleJobPrivate *
const jobPriv = SimpleJobPrivate::get(job);
644 jobPriv->m_protocol = job->
url().
scheme();
646 ProtoQueue *proto = protoQ(jobPriv->m_protocol, job->
url().
host());
647 proto->queueJob(job);
650void SchedulerPrivate::cancelJob(
SimpleJob *job)
652 KIO::SimpleJobPrivate *
const jobPriv = SimpleJobPrivate::get(job);
655 if (jobPriv->m_schedSerial == 0) {
659 Worker *worker = jobSWorker(job);
661 jobFinished(job, worker);
663 ProtoQueue *pq = m_protocols.
value(jobPriv->m_protocol);
665 pq->removeWorker(worker);
671void SchedulerPrivate::jobFinished(
SimpleJob *job, Worker *worker)
674 KIO::SimpleJobPrivate *
const jobPriv = SimpleJobPrivate::get(job);
677 Q_ASSERT(jobPriv->m_schedSerial);
679 ProtoQueue *pq = m_protocols.
value(jobPriv->m_protocol);
687 if (jobPriv->m_internalMetaData.count()) {
689 ProtoQueue *queue = m_protocols.
value(worker->protocol());
692 for (
auto *runningWorker : workers) {
693 if (worker->host() == runningWorker->host()) {
694 worker->setConfig(metaDataFor(worker->protocol(), job->
url()));
701 worker->setJob(
nullptr);
702 worker->disconnect(job);
704 jobPriv->m_schedSerial = 0;
705 jobPriv->m_worker =
nullptr;
708 jobPriv->m_internalMetaData.clear();
714 MetaData configData = WorkerConfig::self()->configData(protocol, host);
719void SchedulerPrivate::setupWorker(KIO::Worker *worker,
const QUrl &url,
const QString &protocol,
bool newWorker,
const KIO::MetaData *config)
721 int port = url.
port();
729 if (newWorker || worker->host() != host || worker->port() != port || worker->user() != user || worker->passwd() != passwd) {
730 MetaData configData = metaDataFor(protocol, url);
732 configData += *config;
735 worker->setConfig(configData);
736 worker->setProtocol(url.
scheme());
737 worker->setHost(host, port, user, passwd);
741void SchedulerPrivate::slotWorkerDied(KIO::Worker *worker)
745 Q_ASSERT(!worker->isAlive());
746 ProtoQueue *pq = m_protocols.
value(worker->protocol());
749 pq->removeJob(worker->job());
752 pq->removeWorker(worker);
754 if (worker == m_workerOnHold) {
755 m_workerOnHold =
nullptr;
759 worker->aboutToDelete();
760 worker->deleteLater();
765 Worker *worker = jobSWorker(job);
767 worker->disconnect(job);
770 worker->setJob(
nullptr);
771 SimpleJobPrivate::get(job)->m_worker =
nullptr;
773 if (m_workerOnHold) {
774 m_workerOnHold->kill();
776 m_workerOnHold = worker;
778 m_workerOnHold->suspend();
781bool SchedulerPrivate::isWorkerOnHoldFor(
const QUrl &url)
783 if (url.
isValid() && m_urlOnHold.
isValid() && url == m_urlOnHold) {
790Worker *SchedulerPrivate::heldWorkerForJob(
SimpleJob *job)
792 Worker *worker =
nullptr;
793 KIO::SimpleJobPrivate *
const jobPriv = SimpleJobPrivate::get(job);
795 if (m_workerOnHold) {
797 const int cmd = jobPriv->m_command;
798 bool canJobReuse = (cmd == CMD_GET);
801 canJobReuse = (canJobReuse || cmd == CMD_SPECIAL);
804 const QString resume = outgoing.
value(QStringLiteral(
"resume"));
805 const QString rangeStart = outgoing.
value(QStringLiteral(
"range-start"));
811 if (job->
url() == m_urlOnHold) {
814 worker = m_workerOnHold;
817 m_workerOnHold->kill();
819 m_workerOnHold =
nullptr;
827void SchedulerPrivate::removeWorkerOnHold()
830 if (m_workerOnHold) {
831 m_workerOnHold->kill();
833 m_workerOnHold =
nullptr;
837ProtoQueue *SchedulerPrivate::protoQ(
const QString &protocol,
const QString &host)
839 ProtoQueue *pq = m_protocols.
value(protocol,
nullptr);
844 int maxWorkersPerHost = -1;
847 const int value = WorkerConfig::self()->configData(protocol, host, QStringLiteral(
"MaxConnections")).toInt(&ok);
849 maxWorkersPerHost = value;
852 if (maxWorkersPerHost == -1) {
856 pq =
new ProtoQueue(maxWorkers, qMin(maxWorkers, maxWorkersPerHost));
857 m_protocols.
insert(protocol, pq);
862void SchedulerPrivate::updateInternalMetaData(
SimpleJob *job)
864 KIO::SimpleJobPrivate *
const jobPriv = SimpleJobPrivate::get(job);
867 const QUrl jobUrl = job->
url();
873 while (it.hasNext()) {
876 WorkerConfig::self()->setConfigData(jobUrl.
scheme(), jobUrl.
host(), it.
key().mid(currHostToken.size()), it.
value());
878 WorkerConfig::self()->setConfigData(jobUrl.
scheme(),
QString(), it.
key().mid(allHostsToken.size()), it.
value());
883#include "moc_scheduler.cpp"
884#include "moc_scheduler_p.cpp"
A simple job (one url and one command).
void slotError(int, const QString &)
const QUrl & url() const
Returns the SimpleJob's URL.
The transfer job pumps data into and/or out of a KIO worker.
static int maxWorkersPerHost(const QString &protocol)
Returns the limit on the number of KIO workers for this protocol per host.
static int maxWorkers(const QString &protocol)
Returns the soft limit on the number of KIO workers for this protocol.
static void reparseConfiguration()
Force a reload of the general config file of KIO workers ( kioslaverc).
A namespace for KIO globals.
void error(QWidget *parent, const QString &text, const QString &title, const KGuiItem &buttonOk, Options options=Notify)
KIOCORE_EXPORT QStringList list(const QString &fileClass)
Returns a list of directories associated with this file-class.
NETWORKMANAGERQT_EXPORT QString hostname()
bool connect(const QString &service, const QString &path, const QString &interface, const QString &name, QObject *receiver, const char *slot)
QDBusConnection sessionBus()
const_iterator cbegin() const const
const_iterator cend() const const
const_iterator constBegin() const const
const_iterator constEnd() const const
const_iterator constFind(const Key &key) const const
iterator insert(const Key &key, const T &value)
Key key(const T &value) const const
T value(const Key &key) const const
void append(QList< T > &&value)
void reserve(qsizetype size)
const_iterator cbegin() const const
const_iterator cend() const const
Key key(const T &value, const Key &defaultKey) const const
T value(const Key &key, const T &defaultValue) const const
const_iterator cbegin() const const
const_iterator cend() const const
iterator find(const Key &key, const T &value)
T value(const Key &key) const const
void setObjectName(QAnyStringView name)
bool contains(const QSet< T > &other) const const
iterator insert(const T &value)
bool isEmpty() const const
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
QString host(ComponentFormattingOptions options) const const
bool isValid() const const
QString password(ComponentFormattingOptions options) const const
int port(int defaultPort) const const
QString scheme() const const
QString userName(ComponentFormattingOptions options) const const