Akonadi

resourcebase.cpp
1/*
2 SPDX-FileCopyrightText: 2006 Till Adam <adam@kde.org>
3 SPDX-FileCopyrightText: 2007 Volker Krause <vkrause@kde.org>
4
5 SPDX-License-Identifier: LGPL-2.0-or-later
6*/
7
8#include "resourcebase.h"
9#include "agentbase_p.h"
10
11#include "akonadifull-version.h"
12#include "collectiondeletejob.h"
13#include "collectionsync_p.h"
14#include "resourceadaptor.h"
15#include "resourcescheduler_p.h"
16#include "tagsync.h"
17#include "tracerinterface.h"
18#include <QDBusConnection>
19
20#include "changerecorder.h"
21#include "collectionfetchjob.h"
22#include "collectionfetchscope.h"
23#include "collectionmodifyjob.h"
24#include "favoritecollectionattribute.h"
25#include "invalidatecachejob_p.h"
26#include "itemcreatejob.h"
27#include "itemfetchjob.h"
28#include "itemfetchscope.h"
29#include "itemmodifyjob.h"
30#include "itemmodifyjob_p.h"
31#include "monitor_p.h"
32#include "recursivemover_p.h"
33#include "resourceselectjob_p.h"
34#include "servermanager_p.h"
35#include "session.h"
36#include "specialcollectionattribute.h"
37#include "tagmodifyjob.h"
38
39#include "akonadiagentbase_debug.h"
40
41#include "shared/akranges.h"
42#include <cstdlib>
43#include <iterator>
44
45#include <KAboutData>
46#include <KCrash>
47#include <KLocalizedString>
48
49#include <QApplication>
50#include <QHash>
51#include <QTimer>
52
53#define HAVE_KICONTHEME __has_include(<KIconTheme>)
54#if HAVE_KICONTHEME
55#include <KIconTheme>
56#endif
57
58#define HAVE_STYLE_MANAGER __has_include(<KStyleManager>)
59#if HAVE_STYLE_MANAGER
60#include <KStyleManager>
61#endif
62
63using namespace Akonadi;
64using namespace AkRanges;
65using namespace std::chrono_literals;
66class Akonadi::ResourceBasePrivate : public AgentBasePrivate
67{
68 Q_OBJECT
69 Q_CLASSINFO("D-Bus Interface", "org.kde.dfaure")
70
71public:
72 explicit ResourceBasePrivate(ResourceBase *parent)
73 : AgentBasePrivate(parent)
74 , scheduler(nullptr)
75 , mItemSyncer(nullptr)
76 , mItemTransactionMode(ItemSync::SingleTransaction)
77 , mItemMergeMode(ItemSync::RIDMerge)
78 , mCollectionSyncer(nullptr)
79 , mTagSyncer(nullptr)
80 , mHierarchicalRid(false)
81 , mUnemittedProgress(0)
82 , mAutomaticProgressReporting(true)
83 , mDisableAutomaticItemDeliveryDone(false)
84 , mItemSyncBatchSize(10)
85 , mCurrentCollectionFetchJob(nullptr)
86 , mScheduleAttributeSyncBeforeCollectionSync(false)
87 {
88 Internal::setClientType(Internal::Resource);
89 mStatusMessage = defaultReadyMessage();
90 mProgressEmissionCompressor.setInterval(1000);
91 mProgressEmissionCompressor.setSingleShot(true);
92 // HACK: skip local changes of the EntityDisplayAttribute by default. Remove this for KDE5 and adjust resource implementations accordingly.
93 mKeepLocalCollectionChanges << "ENTITYDISPLAY";
94 }
95
96 ~ResourceBasePrivate() override = default;
97
98 Q_DECLARE_PUBLIC(ResourceBase)
99
100 void delayedInit() override
101 {
102 const QString serviceId = ServerManager::agentServiceName(ServerManager::Resource, mId);
103 if (!QDBusConnection::sessionBus().registerService(serviceId)) {
105 if (reason.isEmpty()) {
106 reason = QStringLiteral("this service is probably running already.");
107 }
108 qCCritical(AKONADIAGENTBASE_LOG) << "Unable to register service" << serviceId << "at D-Bus:" << reason;
109
112 }
113 } else {
114 AgentBasePrivate::delayedInit();
115 }
116 }
117
118 void changeProcessed() override
119 {
120 if (m_recursiveMover) {
121 m_recursiveMover->changeProcessed();
122 QTimer::singleShot(0s, m_recursiveMover.data(), &RecursiveMover::replayNext);
123 return;
124 }
125
126 mChangeRecorder->changeProcessed();
127 if (!mChangeRecorder->isEmpty()) {
128 scheduler->scheduleChangeReplay();
129 }
130 scheduler->taskDone();
131 }
132
133 void slotAbortRequested();
134
135 void slotDeliveryDone(KJob *job);
136 void slotCollectionSyncDone(KJob *job);
137 void slotLocalListDone(KJob *job);
138 void slotSynchronizeCollection(const Collection &col);
139 void slotItemRetrievalCollectionFetchDone(KJob *job);
140 void slotCollectionListDone(KJob *job);
141 void slotSynchronizeCollectionAttributes(const Collection &col);
142 void slotCollectionListForAttributesDone(KJob *job);
143 void slotCollectionAttributesSyncDone(KJob *job);
144 void slotSynchronizeTags();
145 void slotAttributeRetrievalCollectionFetchDone(KJob *job);
146
147 void slotItemSyncDone(KJob *job);
148
149 void slotPercent(KJob *job, quint64 percent);
150 void slotDelayedEmitProgress();
151 void slotDeleteResourceCollection();
152 void slotDeleteResourceCollectionDone(KJob *job);
153 void slotCollectionDeletionDone(KJob *job);
154
155 void slotInvalidateCache(const Akonadi::Collection &collection);
156
157 void slotPrepareItemRetrieval(const Akonadi::Item &item);
158 void slotPrepareItemRetrievalResult(KJob *job);
159
160 void slotPrepareItemsRetrieval(const QList<Akonadi::Item> &item);
161 void slotPrepareItemsRetrievalResult(KJob *job);
162
163 void changeCommittedResult(KJob *job);
164
165 void slotRecursiveMoveReplay(RecursiveMover *mover);
166 void slotRecursiveMoveReplayResult(KJob *job);
167
168 void slotTagSyncDone(KJob *job);
169
170 void slotSessionReconnected()
171 {
172 Q_Q(ResourceBase);
173
174 new ResourceSelectJob(q->identifier());
175 }
176
177 void createItemSyncInstanceIfMissing()
178 {
179 Q_Q(ResourceBase);
180 Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::SyncCollection,
181 "createItemSyncInstance",
182 "Calling items retrieval methods although no item retrieval is in progress");
183 if (!mItemSyncer) {
184 mItemSyncer = new ItemSync(q->currentCollection(), mCollectionSyncTimestamp);
185 mItemSyncer->setTransactionMode(mItemTransactionMode);
186 mItemSyncer->setBatchSize(mItemSyncBatchSize);
187 mItemSyncer->setMergeMode(mItemMergeMode);
188 mItemSyncer->setDisableAutomaticDeliveryDone(mDisableAutomaticItemDeliveryDone);
189 mItemSyncer->setProperty("collection", QVariant::fromValue(q->currentCollection()));
190 connect(mItemSyncer, &KJob::percentChanged, this,
191 &ResourceBasePrivate::slotPercent); // NOLINT(google-runtime-int): ulong comes from KJob
192
193 connect(mItemSyncer, &KJob::result, this, &ResourceBasePrivate::slotItemSyncDone);
195 }
196 Q_ASSERT(mItemSyncer);
197 }
198
199public Q_SLOTS:
200 // Dump the state of the scheduler
201 Q_SCRIPTABLE QString dumpToString() const
202 {
203 Q_Q(const ResourceBase);
204 return scheduler->dumpToString() + QLatin1Char('\n') + q->dumpResourceToString();
205 }
206
207 Q_SCRIPTABLE void dump()
208 {
209 scheduler->dump();
210 }
211
212 Q_SCRIPTABLE void clear()
213 {
214 scheduler->clear();
215 }
216
217protected Q_SLOTS:
218 // reimplementations from AgentbBasePrivate, containing sanity checks that only apply to resources
219 // such as making sure that RIDs are present as well as translations of cross-resource moves
220 // TODO: we could possibly add recovery code for no-RID notifications by re-enquing those to the change recorder
221 // as the corresponding Add notifications, although that contains a risk of endless fail/retry loops
222
223 void itemAdded(const Akonadi::Item &item, const Akonadi::Collection &collection) override
224 {
225 if (collection.remoteId().isEmpty()) {
226 changeProcessed();
227 return;
228 }
229 AgentBasePrivate::itemAdded(item, collection);
230 }
231
232 void itemChanged(const Akonadi::Item &item, const QSet<QByteArray> &partIdentifiers) override
233 {
234 if (item.remoteId().isEmpty()) {
235 changeProcessed();
236 return;
237 }
238 AgentBasePrivate::itemChanged(item, partIdentifiers);
239 }
240
241 void itemsFlagsChanged(const Akonadi::Item::List &items, const QSet<QByteArray> &addedFlags, const QSet<QByteArray> &removedFlags) override
242 {
243 if (addedFlags.isEmpty() && removedFlags.isEmpty()) {
244 changeProcessed();
245 return;
246 }
247
248 const Item::List validItems = filterValidItems(items);
249 if (validItems.isEmpty()) {
250 changeProcessed();
251 return;
252 }
253
254 AgentBasePrivate::itemsFlagsChanged(validItems, addedFlags, removedFlags);
255 }
256
257 void itemsTagsChanged(const Akonadi::Item::List &items, const QSet<Akonadi::Tag> &addedTags, const QSet<Akonadi::Tag> &removedTags) override
258 {
259 if (addedTags.isEmpty() && removedTags.isEmpty()) {
260 changeProcessed();
261 return;
262 }
263
264 const Item::List validItems = filterValidItems(items);
265 if (validItems.isEmpty()) {
266 changeProcessed();
267 return;
268 }
269
270 AgentBasePrivate::itemsTagsChanged(validItems, addedTags, removedTags);
271 }
272
273 // TODO move the move translation code from AgentBasePrivate here, it's wrong for agents
274 void itemMoved(const Akonadi::Item &item, const Akonadi::Collection &source, const Akonadi::Collection &destination) override
275 {
276 if (item.remoteId().isEmpty() || destination.remoteId().isEmpty() || destination == source) {
277 changeProcessed();
278 return;
279 }
280 AgentBasePrivate::itemMoved(item, source, destination);
281 }
282
283 void itemsMoved(const Akonadi::Item::List &items, const Akonadi::Collection &source, const Akonadi::Collection &destination) override
284 {
285 if (destination.remoteId().isEmpty() || destination == source) {
286 changeProcessed();
287 return;
288 }
289
290 const Item::List validItems = filterValidItems(items);
291 if (validItems.isEmpty()) {
292 changeProcessed();
293 return;
294 }
295
296 AgentBasePrivate::itemsMoved(validItems, source, destination);
297 }
298
299 void itemRemoved(const Akonadi::Item &item) override
300 {
301 if (item.remoteId().isEmpty()) {
302 changeProcessed();
303 return;
304 }
305 AgentBasePrivate::itemRemoved(item);
306 }
307
308 void itemsRemoved(const Akonadi::Item::List &items) override
309 {
310 const Item::List validItems = filterValidItems(items);
311 if (validItems.isEmpty()) {
312 changeProcessed();
313 return;
314 }
315
316 AgentBasePrivate::itemsRemoved(validItems);
317 }
318
319 void collectionAdded(const Akonadi::Collection &collection, const Akonadi::Collection &parent) override
320 {
321 if (parent.remoteId().isEmpty()) {
322 changeProcessed();
323 return;
324 }
325 AgentBasePrivate::collectionAdded(collection, parent);
326 }
327
328 void collectionChanged(const Akonadi::Collection &collection) override
329 {
330 if (collection.remoteId().isEmpty()) {
331 changeProcessed();
332 return;
333 }
334 AgentBasePrivate::collectionChanged(collection);
335 }
336
337 void collectionChanged(const Akonadi::Collection &collection, const QSet<QByteArray> &partIdentifiers) override
338 {
339 if (collection.remoteId().isEmpty()) {
340 changeProcessed();
341 return;
342 }
343 AgentBasePrivate::collectionChanged(collection, partIdentifiers);
344 }
345
346 void collectionMoved(const Akonadi::Collection &collection, const Akonadi::Collection &source, const Akonadi::Collection &destination) override
347 {
348 // unknown destination or source == destination means we can't do/don't have to do anything
349 if (destination.remoteId().isEmpty() || source == destination) {
350 changeProcessed();
351 return;
352 }
353
354 // inter-resource moves, requires we know which resources the source and destination are in though
355 if (!source.resource().isEmpty() && !destination.resource().isEmpty() && source.resource() != destination.resource()) {
356 if (source.resource() == q_ptr->identifier()) { // moved away from us
357 AgentBasePrivate::collectionRemoved(collection);
358 } else if (destination.resource() == q_ptr->identifier()) { // moved to us
359 scheduler->taskDone(); // stop change replay for now
360 auto mover = new RecursiveMover(this);
361 mover->setCollection(collection, destination);
362 scheduler->scheduleMoveReplay(collection, mover);
363 }
364 return;
365 }
366
367 // intra-resource move, requires the moved collection to have a valid id though
368 if (collection.remoteId().isEmpty()) {
369 changeProcessed();
370 return;
371 }
372
373 // intra-resource move, ie. something we can handle internally
374 AgentBasePrivate::collectionMoved(collection, source, destination);
375 }
376
377 void collectionRemoved(const Akonadi::Collection &collection) override
378 {
379 if (collection.remoteId().isEmpty()) {
380 changeProcessed();
381 return;
382 }
383 AgentBasePrivate::collectionRemoved(collection);
384 }
385
386 void tagAdded(const Akonadi::Tag &tag) override
387 {
388 if (!tag.isValid()) {
389 changeProcessed();
390 return;
391 }
392
393 AgentBasePrivate::tagAdded(tag);
394 }
395
396 void tagChanged(const Akonadi::Tag &tag) override
397 {
398 if (tag.remoteId().isEmpty()) {
399 changeProcessed();
400 return;
401 }
402
403 AgentBasePrivate::tagChanged(tag);
404 }
405
406 void tagRemoved(const Akonadi::Tag &tag) override
407 {
408 if (tag.remoteId().isEmpty()) {
409 changeProcessed();
410 return;
411 }
412
413 AgentBasePrivate::tagRemoved(tag);
414 }
415
416private:
417 static Item::List filterValidItems(Item::List items)
418 {
419 items.erase(std::remove_if(items.begin(),
420 items.end(),
421 [](const auto &item) {
422 return item.remoteId().isEmpty();
423 }),
424 items.end());
425 return items;
426 }
427
428public:
429 // synchronize states
430 Collection currentCollection;
431
432 ResourceScheduler *scheduler = nullptr;
433 ItemSync *mItemSyncer = nullptr;
434 ItemSync::TransactionMode mItemTransactionMode;
435 ItemSync::MergeMode mItemMergeMode;
436 CollectionSync *mCollectionSyncer = nullptr;
437 TagSync *mTagSyncer = nullptr;
438 bool mHierarchicalRid;
439 QTimer mProgressEmissionCompressor;
440 int mUnemittedProgress;
441 QMap<Akonadi::Collection::Id, QVariantMap> mUnemittedAdvancedStatus;
442 bool mAutomaticProgressReporting;
443 bool mDisableAutomaticItemDeliveryDone;
444 QPointer<RecursiveMover> m_recursiveMover;
445 int mItemSyncBatchSize;
446 QSet<QByteArray> mKeepLocalCollectionChanges;
447 KJob *mCurrentCollectionFetchJob = nullptr;
448 bool mScheduleAttributeSyncBeforeCollectionSync;
449 QDateTime mCollectionSyncTimestamp;
450};
451
453 : AgentBase(new ResourceBasePrivate(this), id)
454{
456
457 qDBusRegisterMetaType<QByteArrayList>();
458
459 new Akonadi__ResourceAdaptor(this);
460
461 d->scheduler = new ResourceScheduler(this);
462
463 d->mChangeRecorder->setChangeRecordingEnabled(true);
464 d->mChangeRecorder->setCollectionMoveTranslationEnabled(false); // we deal with this ourselves
465 connect(d->mChangeRecorder, &ChangeRecorder::changesAdded, d->scheduler, &ResourceScheduler::scheduleChangeReplay);
466
467 d->mChangeRecorder->setResourceMonitored(d->mId.toLatin1());
468 d->mChangeRecorder->fetchCollection(true);
469
470 connect(d->scheduler, &ResourceScheduler::executeFullSync, this, &ResourceBase::retrieveCollections);
471 connect(d->scheduler, &ResourceScheduler::executeCollectionTreeSync, this, &ResourceBase::retrieveCollections);
472 connect(d->scheduler, &ResourceScheduler::executeCollectionSync, d, &ResourceBasePrivate::slotSynchronizeCollection);
473 connect(d->scheduler, &ResourceScheduler::executeCollectionAttributesSync, d, &ResourceBasePrivate::slotSynchronizeCollectionAttributes);
474 connect(d->scheduler, &ResourceScheduler::executeTagSync, d, &ResourceBasePrivate::slotSynchronizeTags);
475 connect(d->scheduler, &ResourceScheduler::executeItemFetch, d, &ResourceBasePrivate::slotPrepareItemRetrieval);
476 connect(d->scheduler, &ResourceScheduler::executeItemsFetch, d, &ResourceBasePrivate::slotPrepareItemsRetrieval);
477 connect(d->scheduler, &ResourceScheduler::executeResourceCollectionDeletion, d, &ResourceBasePrivate::slotDeleteResourceCollection);
478 connect(d->scheduler, &ResourceScheduler::executeCacheInvalidation, d, &ResourceBasePrivate::slotInvalidateCache);
479 connect(d->scheduler, &ResourceScheduler::status, this, qOverload<int, const QString &>(&ResourceBase::status));
480 connect(d->scheduler, &ResourceScheduler::executeChangeReplay, d->mChangeRecorder, &ChangeRecorder::replayNext);
481 connect(d->scheduler, &ResourceScheduler::executeRecursiveMoveReplay, d, &ResourceBasePrivate::slotRecursiveMoveReplay);
482 connect(d->scheduler, &ResourceScheduler::fullSyncComplete, this, &ResourceBase::synchronized);
483 connect(d->scheduler, &ResourceScheduler::collectionTreeSyncComplete, this, &ResourceBase::collectionTreeSynchronized);
484 connect(d->mChangeRecorder, &ChangeRecorder::nothingToReplay, d->scheduler, &ResourceScheduler::taskDone);
485 connect(d->mChangeRecorder, &Monitor::collectionRemoved, d->scheduler, &ResourceScheduler::collectionRemoved);
486 connect(this, &ResourceBase::abortRequested, d, &ResourceBasePrivate::slotAbortRequested);
487 connect(this, &ResourceBase::synchronized, d->scheduler, &ResourceScheduler::taskDone);
488 connect(this, &ResourceBase::collectionTreeSynchronized, d->scheduler, &ResourceScheduler::taskDone);
490 connect(&d->mProgressEmissionCompressor, &QTimer::timeout, d, &ResourceBasePrivate::slotDelayedEmitProgress);
491
492 d->scheduler->setOnline(d->mOnline);
493 if (!d->mChangeRecorder->isEmpty()) {
494 d->scheduler->scheduleChangeReplay();
495 }
496
497 new ResourceSelectJob(identifier());
498
499 connect(d->mChangeRecorder->session(), &Session::reconnected, d, &ResourceBasePrivate::slotSessionReconnected);
500}
501
503
505{
506 d_func()->scheduler->scheduleFullSync();
507}
508
513
515{
516 return AgentBase::agentName();
517}
518
523
528
533
538
539QString ResourceBase::parseArguments(int argc, char **argv)
540{
541 Q_UNUSED(argc)
542
543 QCommandLineOption identifierOption(QStringLiteral("identifier"), i18nc("@label command line option", "Resource identifier"), QStringLiteral("argument"));
544 QCommandLineParser parser;
545 parser.addOption(identifierOption);
546 parser.addHelpOption();
547 parser.addVersionOption();
548 parser.process(*qApp);
549 parser.setApplicationDescription(i18n("Akonadi Resource"));
550
551 if (!parser.isSet(identifierOption)) {
552 qCDebug(AKONADIAGENTBASE_LOG) << "Identifier argument missing";
553 exit(1);
554 }
555
556 const QString identifier = parser.value(identifierOption);
557
558 if (identifier.isEmpty()) {
559 qCDebug(AKONADIAGENTBASE_LOG) << "Identifier is empty";
560 exit(1);
561 }
562
564 QCoreApplication::setApplicationVersion(QStringLiteral(AKONADI_FULL_VERSION));
565
566 const QFileInfo fi(QString::fromLocal8Bit(argv[0]));
567 // strip off full path and possible .exe suffix
568 const QString catalog = fi.baseName();
569
570 auto translator = new QTranslator(qApp);
571 translator->load(catalog);
573
574 return identifier;
575}
576
578{
579#if HAVE_KICONTHEME
581#endif
582#if HAVE_STYLE_MANAGER
584#else // !HAVE_STYLE_MANAGER
585#if defined(Q_OS_MACOS) || defined(Q_OS_WIN)
586 QApplication::setStyle(QStringLiteral("breeze"));
587#endif // defined(Q_OS_MACOS) || defined(Q_OS_WIN)
588#endif // HAVE_STYLE_MANAGER
589 KLocalizedString::setApplicationDomain(QByteArrayLiteral("libakonadi6"));
590 KAboutData::setApplicationData(r.aboutData());
591
593
594 return qApp->exec();
595}
596
597void ResourceBasePrivate::slotAbortRequested()
598{
599 Q_Q(ResourceBase);
600
601 scheduler->cancelQueues();
602 q->abortActivity();
603}
604
606{
608 Q_ASSERT(d->scheduler->currentTask().type == ResourceScheduler::FetchItem);
609 if (!item.isValid()) {
610 d->scheduler->itemFetchDone(i18nc("@info", "Invalid item retrieved"));
611 return;
612 }
613
614 const QSet<QByteArray> requestedParts = d->scheduler->currentTask().itemParts;
615 for (const QByteArray &part : requestedParts) {
616 if (!item.loadedPayloadParts().contains(part)) {
617 qCWarning(AKONADIAGENTBASE_LOG) << "Item does not provide part" << part;
618 }
619 }
620
621 auto job = new ItemModifyJob(item);
622 job->d_func()->setSilent(true);
623 // FIXME: remove once the item with which we call retrieveItem() has a revision number
624 job->disableRevisionCheck();
625 connect(job, &KJob::result, d, &ResourceBasePrivate::slotDeliveryDone);
626}
627
628void ResourceBasePrivate::slotDeliveryDone(KJob *job)
629{
630 Q_Q(ResourceBase);
631 Q_ASSERT(scheduler->currentTask().type == ResourceScheduler::FetchItem);
632 if (job->error()) {
633 Q_EMIT q->error(i18nc("@info", "Error while creating item: %1", job->errorString()));
634 }
635 scheduler->itemFetchDone(QString());
636}
637
639{
641 Q_ASSERT(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes);
642 if (!collection.isValid()) {
643 Q_EMIT attributesSynchronized(d->scheduler->currentTask().collection.id());
644 d->scheduler->taskDone();
645 return;
646 }
647
648 auto job = new CollectionModifyJob(collection);
649 connect(job, &KJob::result, d, &ResourceBasePrivate::slotCollectionAttributesSyncDone);
650}
651
652void ResourceBasePrivate::slotCollectionAttributesSyncDone(KJob *job)
653{
654 Q_Q(ResourceBase);
655 Q_ASSERT(scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes);
656 if (job->error()) {
657 Q_EMIT q->error(i18nc("@info", "Error while updating collection: %1", job->errorString()));
658 }
659 Q_EMIT q->attributesSynchronized(scheduler->currentTask().collection.id());
660 scheduler->taskDone();
661}
662
663void ResourceBasePrivate::slotDeleteResourceCollection()
664{
665 Q_Q(ResourceBase);
666
668 job->fetchScope().setResource(q->identifier());
669 connect(job, &KJob::result, this, &ResourceBasePrivate::slotDeleteResourceCollectionDone);
670}
671
672void ResourceBasePrivate::slotDeleteResourceCollectionDone(KJob *job)
673{
674 Q_Q(ResourceBase);
675 if (job->error()) {
676 Q_EMIT q->error(job->errorString());
677 scheduler->taskDone();
678 } else {
679 const auto fetchJob = static_cast<const CollectionFetchJob *>(job);
680
681 if (!fetchJob->collections().isEmpty()) {
682 auto job = new CollectionDeleteJob(fetchJob->collections().at(0));
683 connect(job, &KJob::result, this, &ResourceBasePrivate::slotCollectionDeletionDone);
684 } else {
685 // there is no resource collection, so just ignore the request
686 scheduler->taskDone();
687 }
688 }
689}
690
691void ResourceBasePrivate::slotCollectionDeletionDone(KJob *job)
692{
693 Q_Q(ResourceBase);
694 if (job->error()) {
695 Q_EMIT q->error(job->errorString());
696 }
697
698 scheduler->taskDone();
699}
700
701void ResourceBasePrivate::slotInvalidateCache(const Akonadi::Collection &collection)
702{
703 Q_Q(ResourceBase);
704 auto job = new InvalidateCacheJob(collection, q);
705 connect(job, &KJob::result, scheduler, &ResourceScheduler::taskDone);
706}
707
709{
710 changesCommitted(Item::List() << item);
711}
712
714{
716 auto transaction = new TransactionSequence(this);
717 connect(transaction, &KJob::finished, d, &ResourceBasePrivate::changeCommittedResult);
718
719 // Modify the items one-by-one, because STORE does not support mass RID change
720 for (const Item &item : items) {
721 auto job = new ItemModifyJob(item, transaction);
722 job->d_func()->setClean();
723 job->disableRevisionCheck(); // TODO: remove, but where/how do we handle the error?
724 job->setIgnorePayload(true); // we only want to reset the dirty flag and update the remote id
725 }
726}
727
729{
731 auto job = new CollectionModifyJob(collection);
732 connect(job, &KJob::result, d, &ResourceBasePrivate::changeCommittedResult);
733}
734
735void ResourceBasePrivate::changeCommittedResult(KJob *job)
736{
737 if (job->error()) {
738 qCWarning(AKONADIAGENTBASE_LOG) << job->errorText();
739 }
740
741 Q_Q(ResourceBase);
742 if (qobject_cast<CollectionModifyJob *>(job)) {
743 if (job->error()) {
744 Q_EMIT q->error(i18nc("@info", "Updating local collection failed: %1.", job->errorText()));
745 }
746 mChangeRecorder->d_ptr->invalidateCache(static_cast<CollectionModifyJob *>(job)->collection());
747 } else {
748 if (job->error()) {
749 Q_EMIT q->error(i18nc("@info", "Updating local items failed: %1.", job->errorText()));
750 }
751 // Item and tag cache is invalidated by modify job
752 }
753
754 changeProcessed();
755}
756
758{
760 auto job = new TagModifyJob(tag);
761 connect(job, &KJob::result, d, &ResourceBasePrivate::changeCommittedResult);
762}
763
764void ResourceBase::requestItemDelivery(const QList<qint64> &uids, const QByteArrayList &parts)
765{
767 if (!isOnline()) {
768 const QString errorMsg = i18nc("@info", "Cannot fetch item in offline mode.");
770 Q_EMIT error(errorMsg);
771 return;
772 }
773
774 setDelayedReply(true);
775
776 const auto items = uids | Views::transform([](const auto uid) {
777 return Item{uid};
778 })
779 | Actions::toQVector;
780
781 const QSet<QByteArray> partSet = QSet<QByteArray>(parts.begin(), parts.end());
782 d->scheduler->scheduleItemsFetch(items, partSet, message());
783}
784
786{
788 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
789 "ResourceBase::collectionsRetrieved()",
790 "Calling collectionsRetrieved() although no collection retrieval is in progress");
791 if (!d->mCollectionSyncer) {
792 d->mCollectionSyncer = new CollectionSync(identifier());
793 d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
794 d->mCollectionSyncer->setKeepLocalChanges(d->mKeepLocalCollectionChanges);
795 connect(d->mCollectionSyncer, &KJob::percentChanged, d,
796 &ResourceBasePrivate::slotPercent); // NOLINT(google-runtime-int): ulong comes from KJob
797 connect(d->mCollectionSyncer, &KJob::result, d, &ResourceBasePrivate::slotCollectionSyncDone);
798 }
799 d->mCollectionSyncer->setRemoteCollections(collections);
800}
801
802void ResourceBase::collectionsRetrievedIncremental(const Collection::List &changedCollections, const Collection::List &removedCollections)
803{
805 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
806 "ResourceBase::collectionsRetrievedIncremental()",
807 "Calling collectionsRetrievedIncremental() although no collection retrieval is in progress");
808 if (!d->mCollectionSyncer) {
809 d->mCollectionSyncer = new CollectionSync(identifier());
810 d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
811 d->mCollectionSyncer->setKeepLocalChanges(d->mKeepLocalCollectionChanges);
812 connect(d->mCollectionSyncer, &KJob::percentChanged, d,
813 &ResourceBasePrivate::slotPercent); // NOLINT(google-runtime-int): ulong comes from KJob
814 connect(d->mCollectionSyncer, &KJob::result, d, &ResourceBasePrivate::slotCollectionSyncDone);
815 }
816 d->mCollectionSyncer->setRemoteCollections(changedCollections, removedCollections);
817}
818
820{
822 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
823 "ResourceBase::setCollectionStreamingEnabled()",
824 "Calling setCollectionStreamingEnabled() although no collection retrieval is in progress");
825 if (!d->mCollectionSyncer) {
826 d->mCollectionSyncer = new CollectionSync(identifier());
827 d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
828 connect(d->mCollectionSyncer, &KJob::percentChanged, d,
829 &ResourceBasePrivate::slotPercent); // NOLINT(google-runtime-int): ulong comes from KJob
830 connect(d->mCollectionSyncer, &KJob::result, d, &ResourceBasePrivate::slotCollectionSyncDone);
831 }
832 d->mCollectionSyncer->setStreamingEnabled(enable);
833}
834
836{
838 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
839 "ResourceBase::collectionsRetrievalDone()",
840 "Calling collectionsRetrievalDone() although no collection retrieval is in progress");
841 // streaming enabled, so finalize the sync
842 if (d->mCollectionSyncer) {
843 d->mCollectionSyncer->retrievalDone();
844 } else {
845 // user did the sync himself, we are done now
846 // FIXME: we need the same special case for SyncAll as in slotCollectionSyncDone here!
847 d->scheduler->taskDone();
848 }
849}
850
852{
854 d->mKeepLocalCollectionChanges = parts;
855}
856
857void ResourceBasePrivate::slotCollectionSyncDone(KJob *job)
858{
859 Q_Q(ResourceBase);
860 mCollectionSyncer = nullptr;
861 if (job->error()) {
862 if (job->error() != Job::UserCanceled) {
863 Q_EMIT q->error(job->errorString());
864 }
865 } else {
866 if (scheduler->currentTask().type == ResourceScheduler::SyncAll) {
868 list->setFetchScope(q->changeRecorder()->collectionFetchScope());
869 list->fetchScope().fetchAttribute<SpecialCollectionAttribute>();
870 list->fetchScope().fetchAttribute<FavoriteCollectionAttribute>();
871 list->fetchScope().setResource(mId);
872 list->fetchScope().setListFilter(CollectionFetchScope::Sync);
873 connect(list, &KJob::result, this, &ResourceBasePrivate::slotLocalListDone);
874 return;
875 } else if (scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree) {
876 scheduler->scheduleCollectionTreeSyncCompletion();
877 }
878 }
879 scheduler->taskDone();
880}
881
882namespace
883{
884bool sortCollectionsForSync(const Collection &l, const Collection &r)
885{
887 const bool lInbox = (lType == "inbox") || (QStringView(l.remoteId()).mid(1).compare(QLatin1StringView("inbox"), Qt::CaseInsensitive) == 0);
888 const bool lFav = l.hasAttribute<FavoriteCollectionAttribute>();
889
891 const bool rInbox = (rType == "inbox") || (QStringView(r.remoteId()).mid(1).compare(QLatin1StringView("inbox"), Qt::CaseInsensitive) == 0);
892 const bool rFav = r.hasAttribute<FavoriteCollectionAttribute>();
893
894 // inbox is always first
895 if (lInbox) {
896 return true;
897 } else if (rInbox) {
898 return false;
899 }
900
901 // favorites right after inbox
902 if (lFav) {
903 return !rInbox;
904 } else if (rFav) {
905 return lInbox;
906 }
907
908 // trash is always last (unless it's favorite)
909 if (lType == "trash") {
910 return false;
911 } else if (rType == "trash") {
912 return true;
913 }
914
915 // Fallback to sorting by id
916 return l.id() < r.id();
917}
918
919} // namespace
920
921void ResourceBasePrivate::slotLocalListDone(KJob *job)
922{
923 Q_Q(ResourceBase);
924 if (job->error()) {
925 Q_EMIT q->error(job->errorString());
926 } else {
927 Collection::List cols = static_cast<CollectionFetchJob *>(job)->collections();
928 std::sort(cols.begin(), cols.end(), sortCollectionsForSync);
929 for (const Collection &col : std::as_const(cols)) {
930 scheduler->scheduleSync(col);
931 }
932 scheduler->scheduleFullSyncCompletion();
933 }
934 scheduler->taskDone();
935}
936
937void ResourceBasePrivate::slotSynchronizeCollection(const Collection &col)
938{
939 Q_Q(ResourceBase);
940 currentCollection = col;
941 // This can happen due to FetchHelper::triggerOnDemandFetch() in the akonadi server (not an error).
942 if (!col.remoteId().isEmpty()) {
943 // check if this collection actually can contain anything
944 QStringList contentTypes = currentCollection.contentMimeTypes();
945 contentTypes.removeAll(Collection::mimeType());
947 if (!contentTypes.isEmpty() || col.isVirtual()) {
948 if (mAutomaticProgressReporting) {
949 Q_EMIT q->status(AgentBase::Running, i18nc("@info:status", "Syncing folder '%1'", currentCollection.displayName()));
950 }
951
952 qCDebug(AKONADIAGENTBASE_LOG) << "Preparing collection sync of collection" << currentCollection.id() << currentCollection.displayName();
953 auto fetchJob = new Akonadi::CollectionFetchJob(col, CollectionFetchJob::Base, this);
954 fetchJob->setFetchScope(q->changeRecorder()->collectionFetchScope());
955 connect(fetchJob, &KJob::result, this, &ResourceBasePrivate::slotItemRetrievalCollectionFetchDone);
956 mCurrentCollectionFetchJob = fetchJob;
957 return;
958 }
959 }
960 scheduler->taskDone();
961}
962
963void ResourceBasePrivate::slotItemRetrievalCollectionFetchDone(KJob *job)
964{
965 Q_Q(ResourceBase);
966 mCurrentCollectionFetchJob = nullptr;
967 if (job->error()) {
968 qCWarning(AKONADIAGENTBASE_LOG) << "Failed to retrieve collection for sync: " << job->errorString();
969 q->cancelTask(i18n("Failed to retrieve collection for sync."));
970 return;
971 }
972 auto fetchJob = static_cast<Akonadi::CollectionFetchJob *>(job);
973 const Collection::List collections = fetchJob->collections();
974 if (collections.isEmpty()) {
975 qCWarning(AKONADIAGENTBASE_LOG) << "The fetch job returned empty collection set. This is unexpected.";
976 q->cancelTask(i18n("Failed to retrieve collection for sync."));
977 return;
978 }
979 mCollectionSyncTimestamp = QDateTime::currentDateTimeUtc();
980 q->retrieveItems(collections.at(0));
981}
982
984{
985 Q_D(const ResourceBase);
986 return d->mItemSyncBatchSize;
987}
988
990{
992 d->mItemSyncBatchSize = batchSize;
993}
994
996{
998 d->mScheduleAttributeSyncBeforeCollectionSync = enable;
999}
1000
1001void ResourceBasePrivate::slotSynchronizeCollectionAttributes(const Collection &col)
1002{
1003 Q_Q(ResourceBase);
1004 auto fetchJob = new Akonadi::CollectionFetchJob(col, CollectionFetchJob::Base, this);
1005 fetchJob->setFetchScope(q->changeRecorder()->collectionFetchScope());
1006 connect(fetchJob, &KJob::result, this, &ResourceBasePrivate::slotAttributeRetrievalCollectionFetchDone);
1007 Q_ASSERT(!mCurrentCollectionFetchJob);
1008 mCurrentCollectionFetchJob = fetchJob;
1009}
1010
1011void ResourceBasePrivate::slotAttributeRetrievalCollectionFetchDone(KJob *job)
1012{
1013 mCurrentCollectionFetchJob = nullptr;
1014 Q_Q(ResourceBase);
1015 if (job->error()) {
1016 qCWarning(AKONADIAGENTBASE_LOG) << "Failed to retrieve collection for attribute sync: " << job->errorString();
1017 q->cancelTask(i18n("Failed to retrieve collection for attribute sync."));
1018 return;
1019 }
1020 auto fetchJob = static_cast<Akonadi::CollectionFetchJob *>(job);
1021 // FIXME: Why not call q-> directly?
1022 QMetaObject::invokeMethod(q, "retrieveCollectionAttributes", Q_ARG(Akonadi::Collection, fetchJob->collections().at(0)));
1023}
1024
1025void ResourceBasePrivate::slotSynchronizeTags()
1026{
1027 Q_Q(ResourceBase);
1028 QMetaObject::invokeMethod(this, [q] {
1029 q->retrieveTags();
1030 });
1031}
1032
1033void ResourceBasePrivate::slotPrepareItemRetrieval(const Item &item)
1034{
1035 Q_Q(ResourceBase);
1036 auto fetch = new ItemFetchJob(item, this);
1037 // we always need at least parent so we can use ItemCreateJob to merge
1038 fetch->fetchScope().setAncestorRetrieval(qMax(ItemFetchScope::Parent, q->changeRecorder()->itemFetchScope().ancestorRetrieval()));
1039 fetch->fetchScope().setCacheOnly(true);
1040 fetch->fetchScope().setFetchRemoteIdentification(true);
1041
1042 // copy list of attributes to fetch
1043 const QSet<QByteArray> attributes = q->changeRecorder()->itemFetchScope().attributes();
1044 for (const auto &attribute : attributes) {
1045 fetch->fetchScope().fetchAttribute(attribute);
1046 }
1047
1048 connect(fetch, &KJob::result, this, &ResourceBasePrivate::slotPrepareItemRetrievalResult);
1049}
1050
1051void ResourceBasePrivate::slotPrepareItemRetrievalResult(KJob *job)
1052{
1053 Q_Q(ResourceBase);
1054 Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::FetchItem,
1055 "ResourceBasePrivate::slotPrepareItemRetrievalResult()",
1056 "Preparing item retrieval although no item retrieval is in progress");
1057 if (job->error()) {
1058 q->cancelTask(job->errorText());
1059 return;
1060 }
1061 auto fetch = qobject_cast<ItemFetchJob *>(job);
1062 if (fetch->items().count() != 1) {
1063 q->cancelTask(i18n("The requested item no longer exists"));
1064 return;
1065 }
1066 const QSet<QByteArray> parts = scheduler->currentTask().itemParts;
1067 if (!q->retrieveItem(fetch->items().at(0), parts)) {
1068 q->cancelTask();
1069 }
1070}
1071
1072void ResourceBasePrivate::slotPrepareItemsRetrieval(const QList<Item> &items)
1073{
1074 Q_Q(ResourceBase);
1075 auto fetch = new ItemFetchJob(items, this);
1076 // we always need at least parent so we can use ItemCreateJob to merge
1077 fetch->fetchScope().setAncestorRetrieval(qMax(ItemFetchScope::Parent, q->changeRecorder()->itemFetchScope().ancestorRetrieval()));
1078 fetch->fetchScope().setCacheOnly(true);
1079 fetch->fetchScope().setFetchRemoteIdentification(true);
1080 // It's possible that one or more items were removed before this task was
1081 // executed, so ignore it and just handle the rest.
1082 fetch->fetchScope().setIgnoreRetrievalErrors(true);
1083
1084 // copy list of attributes to fetch
1085 const QSet<QByteArray> attributes = q->changeRecorder()->itemFetchScope().attributes();
1086 for (const auto &attribute : attributes) {
1087 fetch->fetchScope().fetchAttribute(attribute);
1088 }
1089
1090 connect(fetch, &KJob::result, this, &ResourceBasePrivate::slotPrepareItemsRetrievalResult);
1091}
1092
1093void ResourceBasePrivate::slotPrepareItemsRetrievalResult(KJob *job)
1094{
1095 Q_Q(ResourceBase);
1096 Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::FetchItems,
1097 "ResourceBasePrivate::slotPrepareItemsRetrievalResult()",
1098 "Preparing items retrieval although no items retrieval is in progress");
1099 if (job->error()) {
1100 q->cancelTask(job->errorText());
1101 return;
1102 }
1103 auto fetch = qobject_cast<ItemFetchJob *>(job);
1104 const auto items = fetch->items();
1105 if (items.isEmpty()) {
1106 q->cancelTask();
1107 return;
1108 }
1109
1110 const QSet<QByteArray> parts = scheduler->currentTask().itemParts;
1111 Q_ASSERT(items.first().parentCollection().isValid());
1112 if (!q->retrieveItems(items, parts)) {
1113 q->cancelTask();
1114 }
1115}
1116
1117void ResourceBasePrivate::slotRecursiveMoveReplay(RecursiveMover *mover)
1118{
1119 Q_ASSERT(mover);
1120 Q_ASSERT(!m_recursiveMover);
1121 m_recursiveMover = mover;
1122 connect(mover, &KJob::result, this, &ResourceBasePrivate::slotRecursiveMoveReplayResult);
1123 mover->start();
1124}
1125
1126void ResourceBasePrivate::slotRecursiveMoveReplayResult(KJob *job)
1127{
1128 Q_Q(ResourceBase);
1129 m_recursiveMover = nullptr;
1130
1131 if (job->error()) {
1132 q->deferTask();
1133 return;
1134 }
1135
1136 changeProcessed();
1137}
1138
1140{
1142 // streaming enabled, so finalize the sync
1143 if (d->mItemSyncer) {
1144 d->mItemSyncer->deliveryDone();
1145 } else {
1146 if (d->scheduler->currentTask().type == ResourceScheduler::FetchItems) {
1147 d->scheduler->currentTask().sendDBusReplies(QString());
1148 }
1149 // user did the sync himself, we are done now
1150 d->scheduler->taskDone();
1151 }
1152}
1153
1155{
1157 d->scheduler->scheduleResourceCollectionDeletion();
1158}
1159
1161{
1163 d->scheduler->scheduleCacheInvalidation(collection);
1164}
1165
1167{
1168 Q_D(const ResourceBase);
1169 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollection,
1170 "ResourceBase::currentCollection()",
1171 "Trying to access current collection although no item retrieval is in progress");
1172 return d->currentCollection;
1173}
1174
1176{
1177 Q_D(const ResourceBase);
1178 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::FetchItem,
1179 "ResourceBase::currentItem()",
1180 "Trying to access current item although no item retrieval is in progress");
1181 return d->scheduler->currentTask().items[0];
1182}
1183
1185{
1186 Q_D(const ResourceBase);
1187 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::FetchItems,
1188 "ResourceBase::currentItems()",
1189 "Trying to access current items although no items retrieval is in progress");
1190 return d->scheduler->currentTask().items;
1191}
1192
1194{
1195 d_func()->scheduler->scheduleCollectionTreeSync();
1196}
1197
1199{
1200 d_func()->scheduler->scheduleTagSync();
1201}
1202
1204{
1206 if (d->mCurrentCollectionFetchJob) {
1207 d->mCurrentCollectionFetchJob->kill();
1208 d->mCurrentCollectionFetchJob = nullptr;
1209 }
1210 switch (d->scheduler->currentTask().type) {
1211 case ResourceScheduler::FetchItem:
1212 itemRetrieved(Item()); // sends the error reply and
1213 break;
1214 case ResourceScheduler::FetchItems:
1216 break;
1217 case ResourceScheduler::ChangeReplay:
1218 d->changeProcessed();
1219 break;
1220 case ResourceScheduler::SyncCollectionTree:
1221 case ResourceScheduler::SyncAll:
1222 if (d->mCollectionSyncer) {
1223 d->mCollectionSyncer->rollback();
1224 } else {
1225 d->scheduler->taskDone();
1226 }
1227 break;
1228 case ResourceScheduler::SyncCollection:
1229 if (d->mItemSyncer) {
1230 d->mItemSyncer->rollback();
1231 } else {
1232 d->scheduler->taskDone();
1233 }
1234 break;
1235 default:
1236 d->scheduler->taskDone();
1237 }
1238}
1239
1241{
1242 cancelTask();
1243
1244 Q_EMIT error(msg);
1245}
1246
1248{
1250 qCDebug(AKONADIAGENTBASE_LOG) << "Deferring task" << d->scheduler->currentTask();
1251 // Deferring a CollectionSync is just not implemented.
1252 // We'd need to d->mItemSyncer->rollback() but also to NOT call taskDone in slotItemSyncDone() here...
1253 Q_ASSERT(!d->mItemSyncer);
1254 d->scheduler->deferTask();
1255}
1256
1258{
1259 d_func()->scheduler->setOnline(state);
1260}
1261
1263{
1264 synchronizeCollection(collectionId, false);
1265}
1266
1267void ResourceBase::synchronizeCollection(qint64 collectionId, bool recursive)
1268{
1270 auto job = new CollectionFetchJob(Collection(collectionId), recursive ? CollectionFetchJob::Recursive : CollectionFetchJob::Base);
1271 job->setFetchScope(changeRecorder()->collectionFetchScope());
1272 job->fetchScope().setResource(identifier());
1273 job->fetchScope().setListFilter(CollectionFetchScope::Sync);
1274 connect(job, &KJob::result, d, &ResourceBasePrivate::slotCollectionListDone);
1275}
1276
1277void ResourceBasePrivate::slotCollectionListDone(KJob *job)
1278{
1279 if (!job->error()) {
1280 const Collection::List list = static_cast<CollectionFetchJob *>(job)->collections();
1281 for (const Collection &collection : list) {
1282 // We also get collections that should not be synced but are part of the tree.
1283 if (collection.shouldList(Collection::ListSync)) {
1284 if (mScheduleAttributeSyncBeforeCollectionSync) {
1285 scheduler->scheduleAttributesSync(collection);
1286 }
1287 scheduler->scheduleSync(collection);
1288 }
1289 }
1290 } else {
1291 qCWarning(AKONADIAGENTBASE_LOG) << "Failed to fetch collection for collection sync: " << job->errorString();
1292 }
1293}
1294
1296{
1298 d->scheduler->scheduleAttributesSync(col);
1299}
1300
1302{
1304 auto job = new CollectionFetchJob(Collection(collectionId), CollectionFetchJob::Base);
1305 job->setFetchScope(changeRecorder()->collectionFetchScope());
1306 job->fetchScope().setResource(identifier());
1307 connect(job, &KJob::result, d, &ResourceBasePrivate::slotCollectionListForAttributesDone);
1308}
1309
1310void ResourceBasePrivate::slotCollectionListForAttributesDone(KJob *job)
1311{
1312 if (!job->error()) {
1313 const Collection::List list = static_cast<CollectionFetchJob *>(job)->collections();
1314 if (!list.isEmpty()) {
1315 const Collection &col = list.first();
1316 scheduler->scheduleAttributesSync(col);
1317 }
1318 }
1319 // TODO: error handling
1320}
1321
1323{
1324 qCDebug(AKONADIAGENTBASE_LOG) << amount;
1327 if (d->mItemSyncer) {
1328 d->mItemSyncer->setTotalItems(amount);
1329 }
1330}
1331
1333{
1335 if (d->mItemSyncer) {
1336 d->mItemSyncer->setDisableAutomaticDeliveryDone(disable);
1337 }
1338 d->mDisableAutomaticItemDeliveryDone = disable;
1339}
1340
1342{
1344 d->createItemSyncInstanceIfMissing();
1345 if (d->mItemSyncer) {
1346 d->mItemSyncer->setStreamingEnabled(enable);
1347 }
1348}
1349
1351{
1353 if (d->scheduler->currentTask().type == ResourceScheduler::FetchItems) {
1354 auto trx = new TransactionSequence(this);
1355 connect(trx, &KJob::result, d, &ResourceBasePrivate::slotItemSyncDone);
1356 for (const Item &item : items) {
1357 Q_ASSERT(item.parentCollection().isValid());
1358 if (item.isValid()) { // NOLINT(bugprone-branch-clone)
1359 new ItemModifyJob(item, trx);
1360 } else if (!item.remoteId().isEmpty()) {
1361 auto job = new ItemCreateJob(item, item.parentCollection(), trx);
1362 job->setMerge(ItemCreateJob::RID);
1363 } else {
1364 // This should not happen, but just to be sure...
1365 new ItemModifyJob(item, trx);
1366 }
1367 }
1368 trx->commit();
1369 } else {
1370 d->createItemSyncInstanceIfMissing();
1371 if (d->mItemSyncer) {
1372 d->mItemSyncer->setFullSyncItems(items);
1373 }
1374 }
1375}
1376
1377void ResourceBase::itemsRetrievedIncremental(const Item::List &changedItems, const Item::List &removedItems)
1378{
1380 d->createItemSyncInstanceIfMissing();
1381 if (d->mItemSyncer) {
1382 d->mItemSyncer->setIncrementalSyncItems(changedItems, removedItems);
1383 }
1384}
1385
1386void ResourceBasePrivate::slotItemSyncDone(KJob *job)
1387{
1388 mItemSyncer = nullptr;
1389 Q_Q(ResourceBase);
1390 if (job->error() && job->error() != Job::UserCanceled) {
1391 Q_EMIT q->error(job->errorString());
1392 }
1393 if (scheduler->currentTask().type == ResourceScheduler::FetchItems) {
1394 scheduler->currentTask().sendDBusReplies((job->error() && job->error() != Job::UserCanceled) ? job->errorString() : QString());
1395 }
1396 scheduler->taskDone();
1397}
1398
1399void ResourceBasePrivate::slotDelayedEmitProgress()
1400{
1401 Q_Q(ResourceBase);
1402 if (mAutomaticProgressReporting) {
1403 Q_EMIT q->percent(mUnemittedProgress);
1404
1405 for (const QVariantMap &statusMap : std::as_const(mUnemittedAdvancedStatus)) {
1406 Q_EMIT q->advancedStatus(statusMap);
1407 }
1408 }
1409 mUnemittedProgress = 0;
1410 mUnemittedAdvancedStatus.clear();
1411}
1412
1413void ResourceBasePrivate::slotPercent(KJob *job, quint64 percent)
1414{
1415 mUnemittedProgress = static_cast<int>(percent);
1416
1417 const auto collection = job->property("collection").value<Collection>();
1418 if (collection.isValid()) {
1419 QVariantMap statusMap;
1420 statusMap.insert(QStringLiteral("key"), QStringLiteral("collectionSyncProgress"));
1421 statusMap.insert(QStringLiteral("collectionId"), collection.id());
1422 statusMap.insert(QStringLiteral("percent"), static_cast<unsigned int>(percent));
1423
1424 mUnemittedAdvancedStatus[collection.id()] = statusMap;
1425 }
1426 // deliver completion right away, intermediate progress at 1s intervals
1427 if (percent == 100U) {
1428 mProgressEmissionCompressor.stop();
1429 slotDelayedEmitProgress();
1430 } else if (!mProgressEmissionCompressor.isActive()) {
1431 mProgressEmissionCompressor.start();
1432 }
1433}
1434
1436{
1438 d->mHierarchicalRid = enable;
1439}
1440
1441void ResourceBase::scheduleCustomTask(QObject *receiver, const char *method, const QVariant &argument, SchedulePriority priority)
1442{
1444 d->scheduler->scheduleCustomTask(receiver, method, argument, priority);
1445}
1446
1448{
1450 d->scheduler->taskDone();
1451}
1452
1457
1459{
1461 d->scheduler->taskDone();
1462}
1463
1465{
1466 Q_UNUSED(item)
1467 Q_UNUSED(parts)
1468 // retrieveItem() can no longer be pure virtual, because then we could not mark
1469 // it as deprecated (i.e. implementations would still be forced to implement it),
1470 // so instead we assert here.
1471 // NOTE: Don't change to Q_ASSERT_X here: while the macro can be disabled at
1472 // compile time, we want to hit this assert *ALWAYS*.
1473 qt_assert_x("Akonadi::ResourceBase::retrieveItem()",
1474 "The base implementation of retrieveItem() must never be reached. "
1475 "You must implement either retrieveItem() or retrieveItems(Akonadi::Item::List, QSet<QByteArray>) overload "
1476 "to handle item retrieval requests.",
1477 __FILE__,
1478 __LINE__);
1479 return false;
1480}
1481
1483{
1485
1486 // If we reach this implementation of retrieveItems() then it means that the
1487 // resource is still using the deprecated retrieveItem() method, so we explode
1488 // this to a myriad of tasks in scheduler and let them be processed one by one
1489
1490 const qint64 id = d->scheduler->currentTask().serial;
1491 for (const auto &item : items) {
1492 d->scheduler->scheduleItemFetch(item, parts, d->scheduler->currentTask().dbusMsgs, id);
1493 }
1494 taskDone();
1495 return true;
1496}
1497
1501
1503{
1505 d->mItemTransactionMode = mode;
1506}
1507
1508void ResourceBase::setItemMergingMode(ItemSync::MergeMode mode)
1509{
1511 d->mItemMergeMode = mode;
1512}
1513
1515{
1517 d->mAutomaticProgressReporting = enabled;
1518}
1519
1521{
1522 Q_D(const ResourceBase);
1523 return d->dumpNotificationListToString();
1524}
1525
1527{
1528 Q_D(const ResourceBase);
1529 return d->dumpToString();
1530}
1531
1533{
1534 Q_D(const ResourceBase);
1535 d->dumpMemoryInfo();
1536}
1537
1539{
1540 Q_D(const ResourceBase);
1541 return d->dumpMemoryInfoToString();
1542}
1543
1544void ResourceBase::tagsRetrieved(const Tag::List &tags, const QHash<QString, Item::List> &tagMembers)
1545{
1547 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncTags || d->scheduler->currentTask().type == ResourceScheduler::SyncAll
1548 || d->scheduler->currentTask().type == ResourceScheduler::Custom,
1549 "ResourceBase::tagsRetrieved()",
1550 "Calling tagsRetrieved() although no tag retrieval is in progress");
1551 if (!d->mTagSyncer) {
1552 d->mTagSyncer = new TagSync(this);
1553 connect(d->mTagSyncer, &KJob::percentChanged, d,
1554 &ResourceBasePrivate::slotPercent); // NOLINT(google-runtime-int): ulong comes from KJob
1555 connect(d->mTagSyncer, &KJob::result, d, &ResourceBasePrivate::slotTagSyncDone);
1556 }
1557 d->mTagSyncer->setFullTagList(tags);
1558 d->mTagSyncer->setTagMembers(tagMembers);
1559}
1560
1561void ResourceBasePrivate::slotTagSyncDone(KJob *job)
1562{
1563 Q_Q(ResourceBase);
1564 mTagSyncer = nullptr;
1565 if (job->error()) {
1566 if (job->error() != Job::UserCanceled) {
1567 qCWarning(AKONADIAGENTBASE_LOG) << "TagSync failed: " << job->errorString();
1568 Q_EMIT q->error(job->errorString());
1569 }
1570 }
1571
1572 scheduler->taskDone();
1573}
1574
1575#include "moc_resourcebase.cpp"
1576#include "resourcebase.moc"
The base class for all Akonadi agents and resources.
Definition agentbase.h:73
QStringList activities() const
Returns the activities of the agent.
virtual int status() const
This method returns the current status code of the agent.
void setActivities(const QStringList &activities)
This method is used to set the activities of the agent.
void setActivitiesEnabled(bool enabled)
This method is used to enabled the activities of the agent.
void setAgentName(const QString &name)
This method is used to set the name of the agent.
ChangeRecorder * changeRecorder() const
Returns the Akonadi::ChangeRecorder object used for monitoring.
QString agentName() const
Returns the name of the agent.
@ Running
The agent is working on something.
Definition agentbase.h:398
void abortRequested()
Emitted when another application has remotely asked the agent to abort its current operation.
bool isOnline() const
Returns whether the agent is currently online.
bool activitiesEnabled() const
Returns the activities status of the agent.
QString identifier() const
Returns the instance identifier of this agent.
void agentNameChanged(const QString &name)
This signal is emitted whenever the name of the agent has changed.
void error(const QString &message)
This signal shall be used to report errors.
void changesAdded()
Emitted when new changes are recorded.
void replayNext()
Replay the next change notification and erase the previous one from the record.
void nothingToReplay()
Emitted when replayNext() was called, but there was no valid change to replay.
Job that deletes a collection in the Akonadi storage.
Job that fetches collections from the Akonadi storage.
@ Recursive
List all sub-collections.
@ FirstLevel
Only list direct sub-collections of the base collection.
@ Base
Only fetch the base collection.
@ Sync
Only retrieve collections for synchronization, taking the local preference and enabled into account.
Job that modifies a collection in the Akonadi storage.
Represents a collection of PIM items.
Definition collection.h:62
static QString mimeType()
Returns the mimetype used for collections.
@ ListSync
Listing for synchronization.
Definition collection.h:480
bool hasAttribute(const QByteArray &name) const
Returns true if the collection has an attribute of the given type name, false otherwise.
static Collection root()
Returns the root collection.
static QString virtualMimeType()
Returns the mimetype used for virtual collections.
bool shouldList(ListPurpose purpose) const
Returns whether the collection should be listed or not for the specified purpose Takes enabled state ...
Attribute * attribute(const QByteArray &name)
Returns the attribute of the given type name if available, 0 otherwise.
QString remoteId() const
Returns the remote id of the collection.
Job that creates a new item in the Akonadi storage.
@ RID
Merge by remote id.
Job that fetches items from the Akonadi storage.
@ Parent
Only retrieve the immediate parent collection.
Job that modifies an existing item in the Akonadi storage.
Syncs between items known to a client (usually a resource) and the Akonadi storage.
Definition itemsync.h:41
void readyForNextBatch(int remainingBatchSize)
Signals the resource that new items can be delivered.
TransactionMode
Transaction mode used by ItemSync.
Definition itemsync.h:129
void setDisableAutomaticDeliveryDone(bool disable)
Disables the automatic completion of the item sync, based on the number of delivered items.
Definition itemsync.cpp:238
void setBatchSize(int)
Set the batch size.
Definition itemsync.cpp:539
void setMergeMode(MergeMode mergeMode)
Set what merge method should be used for next ItemSync run.
Definition itemsync.cpp:551
void setTransactionMode(TransactionMode mode)
Set the transaction mode to use for this sync.
Definition itemsync.cpp:527
Represents a PIM item stored in Akonadi storage.
Definition item.h:100
QString remoteId() const
Returns the remote id of the item.
Definition item.cpp:73
bool isValid() const
Returns whether the item is valid.
Definition item.cpp:88
QSet< QByteArray > loadedPayloadParts() const
Returns the list of loaded payload parts.
Definition item.cpp:283
@ UserCanceled
The user canceled this job.
Definition job.h:101
void collectionRemoved(const Akonadi::Collection &collection)
This signal is emitted if a monitored collection has been removed from the Akonadi storage.
The base class for all Akonadi resources.
void setScheduleAttributeSyncBeforeItemSync(bool)
Set to true to schedule an attribute sync before every item sync.
void setItemTransactionMode(ItemSync::TransactionMode mode)
Set transaction mode for item sync'ing.
QString dumpNotificationListToString() const
Dump the contents of the current ChangeReplay.
void dumpMemoryInfo() const
Dumps memory usage information to stdout.
void taskDone()
Indicate that the current task is finished.
void invalidateCache(const Collection &collection)
Call this method to invalidate all cached content in collection.
void setActivitiesEnabled(bool enable)
This method enables or not activities support.
void collectionAttributesRetrieved(const Collection &collection)
Call this method from retrieveCollectionAttributes() once the result is available.
bool activitiesEnabled() const
Returns true if activities is enabled.
int itemSyncBatchSize() const
Returns the batch size used during the item sync.
void setItemSyncBatchSize(int batchSize)
Set the batch size used during the item sync.
void setDisableAutomaticItemDeliveryDone(bool disable)
Disables the automatic completion of the item sync, based on the number of delivered items.
static int init(int argc, char **argv)
Use this method in the main function of your resource application to initialize your resource subclas...
void synchronizeCollection(qint64 id)
This method is called whenever the collection with the given id shall be synchronized.
virtual void retrieveTags()
Retrieve all tags from the backend.
void synchronized()
Emitted when a full synchronization has been completed.
virtual void retrieveCollections()=0
Retrieve the collection tree from the remote server and supply it via collectionsRetrieved() or colle...
QString dumpSchedulerToString() const
Dump the state of the scheduler.
QString dumpMemoryInfoToString() const
Returns a string with memory usage information.
SchedulePriority
Describes the scheduling priority of a task that has been queued for execution.
void retrieveNextItemSyncBatch(int remainingBatchSize)
Emitted when the item synchronization processed the current batch and is ready for a new one.
void setKeepLocalCollectionChanges(const QSet< QByteArray > &parts)
Allows to keep locally changed collection parts during the collection sync.
void attributesSynchronized(qlonglong collectionId)
Emitted when a collection attributes synchronization has been completed.
void itemsRetrievalDone()
Call this method to indicate you finished synchronizing the current collection.
QStringList activities() const
return list of activities.
Item::List currentItems() const
Returns the items that are currently retrieved.
void setTotalItems(int amount)
Call this method when you want to use the itemsRetrieved() method in streaming mode and indicate the ...
void setActivities(const QStringList &activities)
This method sets list of activities.
void setAutomaticProgressReporting(bool enabled)
Enable or disable automatic progress reporting.
void collectionsRetrieved(const Collection::List &collections)
Call this to supply the full folder tree retrieved from the remote server.
void deferTask()
Suspends the execution of the current task and tries again to execute it.
void setItemStreamingEnabled(bool enable)
Enable item streaming, which is disabled by default.
QString name() const
Returns the name of the resource.
void collectionsRetrievalDone()
Call this method to indicate you finished synchronizing the collection tree.
void clearCache()
Call this method to remove all items and collections of the resource from the server cache.
void scheduleCustomTask(QObject *receiver, const char *method, const QVariant &argument, SchedulePriority priority=Append)
Schedules a custom task in the internal scheduler.
void collectionsRetrievedIncremental(const Collection::List &changedCollections, const Collection::List &removedCollections)
Call this to supply incrementally retrieved collections from the remote server.
void itemRetrieved(const Item &item)
Call this method from retrieveItem() once the result is available.
virtual AKONADIAGENTBASE_DEPRECATED bool retrieveItem(const Akonadi::Item &item, const QSet< QByteArray > &parts)
Retrieve a single item from the backend.
void synchronize()
This method is called whenever the resource should start synchronize all data.
void changeCommitted(const Item &item)
Resets the dirty flag of the given item and updates the remote id.
void itemsRetrieved(const Item::List &items)
Call this method to supply the full collection listing from the remote server.
void doSetOnline(bool online) override
Inherited from AgentBase.
Collection currentCollection() const
Returns the collection that is currently synchronized.
void nameChanged(const QString &name)
This signal is emitted whenever the name of the resource has changed.
virtual void retrieveItems(const Akonadi::Collection &collection)=0
Retrieve all (new/changed) items in collection collection.
void cancelTask()
Stops the execution of the current task and continues with the next one.
void itemsRetrievedIncremental(const Item::List &changedItems, const Item::List &removedItems)
Call this method to supply incrementally retrieved items from the remote server.
void setItemMergingMode(ItemSync::MergeMode mode)
Set merge mode for item sync'ing.
void setHierarchicalRemoteIdentifiersEnabled(bool enable)
Indicate the use of hierarchical remote identifiers.
ResourceBase(const QString &id)
Creates a base resource.
virtual void retrieveCollectionAttributes(const Akonadi::Collection &collection)
Retrieve the attributes of a single collection from the backend.
void collectionTreeSynchronized()
Emitted when a collection tree synchronization has been completed.
~ResourceBase() override
Destroys the base resource.
void setName(const QString &name)
This method is used to set the name of the resource.
void synchronizeCollectionTree()
Refetches the Collections.
void synchronizeTags()
Refetches Tags.
void setCollectionStreamingEnabled(bool enable)
Enable collection streaming, that is collections don't have to be delivered at once as result of a re...
void changesCommitted(const Item::List &items)
Resets the dirty flag of all given items and updates remote ids.
virtual void abortActivity()
Abort any activity in progress in the backend.
void synchronizeCollectionAttributes(qint64 id)
This method is called whenever the collection with the given id shall have its attributes synchronize...
AKONADIAGENTBASE_DEPRECATED Item currentItem() const
Returns the item that is currently retrieved.
static QString agentServiceName(ServiceAgentType agentType, const QString &identifier)
Returns the namespaced D-Bus service name for an agent of type agentType with agent identifier identi...
static QString addNamespace(const QString &string)
Adds the multi-instance namespace to string if required (with '_' as separator).
void reconnected()
This signal is emitted whenever the session has been reconnected to the server (e....
An Attribute that stores the special collection type of a collection.
QByteArray collectionType() const
Returns the special collections type of the collection.
Job that modifies a tag in the Akonadi storage.
An Akonadi Tag.
Definition tag.h:26
Base class for jobs that need to run a sequence of sub-jobs in a transaction.
static void setApplicationData(const KAboutData &aboutData)
static void initTheme()
virtual QString errorString() const
int error() const
void result(KJob *job)
void finished(KJob *job)
QString errorText() const
void percentChanged(KJob *job, unsigned long percent)
static void setApplicationDomain(const QByteArray &domain)
QString i18nc(const char *context, const char *text, const TYPE &arg...)
QString i18n(const char *text, const TYPE &arg...)
Helper integration between Akonadi and Qt.
KCRASH_EXPORT void initialize()
KIOCORE_EXPORT QStringList list(const QString &fileClass)
void initStyle()
QStyle * setStyle(const QString &style)
bool isEmpty() const const
QCommandLineOption addHelpOption()
bool addOption(const QCommandLineOption &option)
QCommandLineOption addVersionOption()
bool isSet(const QCommandLineOption &option) const const
void process(const QCoreApplication &app)
void setApplicationDescription(const QString &description)
QString value(const QCommandLineOption &option) const const
void setApplicationName(const QString &application)
void setApplicationVersion(const QString &version)
void exit(int returnCode)
bool installTranslator(QTranslator *translationFile)
QCoreApplication * instance()
QDateTime currentDateTimeUtc()
QDBusError lastError() const const
QDBusConnection sessionBus()
const QDBusMessage & message() const const
void sendErrorReply(QDBusError::ErrorType type, const QString &msg) const const
void setDelayedReply(bool enable) const const
QString message() const const
const_reference at(qsizetype i) const const
iterator begin()
iterator end()
iterator erase(const_iterator begin, const_iterator end)
T & first()
bool isEmpty() const const
qsizetype removeAll(const AT &t)
iterator insert(const Key &key, const T &value)
bool invokeMethod(QObject *context, Functor &&function, FunctorReturnType *ret)
Q_EMITQ_EMIT
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
QVariant property(const char *name) const const
bool setProperty(const char *name, QVariant &&value)
T * data() const const
bool contains(const QSet< T > &other) const const
bool isEmpty() const const
QString fromLocal8Bit(QByteArrayView str)
bool isEmpty() const const
QStringView mid(qsizetype start, qsizetype length) const const
int compare(QChar ch) const const
CaseInsensitive
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
QThread * currentThread()
void setInterval(int msec)
bool isActive() const const
void setSingleShot(bool singleShot)
void start()
void stop()
void timeout()
QVariant fromValue(T &&value)
T value() const const
Q_D(Todo)
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Fri Nov 22 2024 12:03:33 by doxygen 1.12.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.