Akonadi

itemsync.cpp
1/*
2 SPDX-FileCopyrightText: 2007 Tobias Koenig <tokoe@kde.org>
3 SPDX-FileCopyrightText: 2007 Volker Krause <vkrause@kde.org>
4 SPDX-FileCopyrightText: 2014 Christian Mollekopf <mollekopf@kolabsys.com>
5
6 SPDX-License-Identifier: LGPL-2.0-or-later
7*/
8
9#include "itemsync.h"
10
11#include "collection.h"
12#include "item_p.h"
13#include "itemcreatejob.h"
14#include "itemdeletejob.h"
15#include "itemfetchjob.h"
16#include "itemfetchscope.h"
17#include "job_p.h"
18#include "protocol_p.h"
19#include "transactionsequence.h"
20
21#include "akonadicore_debug.h"
22
23using namespace Akonadi;
24
25/**
26 * @internal
27 */
28class Akonadi::ItemSyncPrivate : public JobPrivate
29{
30public:
31 explicit ItemSyncPrivate(ItemSync *parent)
32 : JobPrivate(parent)
33 , mTransactionMode(ItemSync::SingleTransaction)
34 , mCurrentTransaction(nullptr)
35 , mTransactionJobs(0)
36 , mPendingJobs(0)
37 , mProgress(0)
38 , mTotalItems(-1)
39 , mTotalItemsProcessed(0)
40 , mStreaming(false)
41 , mIncremental(false)
42 , mDeliveryDone(false)
43 , mFinished(false)
44 , mFullListingDone(false)
45 , mProcessingBatch(false)
46 , mDisableAutomaticDeliveryDone(false)
47 , mBatchSize(10)
48 , mMergeMode(Akonadi::ItemSync::RIDMerge)
49 {
50 }
51
52 void createOrMerge(const Item &item);
53 void checkDone();
54 void slotItemsReceived(const Item::List &items);
55 void slotLocalListDone(KJob *job);
56 void slotLocalDeleteDone(KJob *job);
57 void slotLocalChangeDone(KJob *job);
58 void execute();
59 void processItems();
60 void processBatch();
61 void deleteItems(const Item::List &items);
62 void slotTransactionResult(KJob *job);
63 void requestTransaction();
64 Job *subjobParent() const;
65 void fetchLocalItemsToDelete();
66 QString jobDebuggingString() const override;
67 bool allProcessed() const;
68
69 Q_DECLARE_PUBLIC(ItemSync)
70 Collection mSyncCollection;
71 QSet<QString> mListedItems;
72
73 ItemSync::TransactionMode mTransactionMode;
74 TransactionSequence *mCurrentTransaction = nullptr;
75 int mTransactionJobs;
76
77 Akonadi::Item::List mRemoteItemQueue;
78 Akonadi::Item::List mRemovedRemoteItemQueue;
79 Akonadi::Item::List mCurrentBatchRemoteItems;
80 Akonadi::Item::List mCurrentBatchRemovedRemoteItems;
81 Akonadi::Item::List mItemsToDelete;
82
83 QDateTime mItemSyncStart;
84
85 // create counter
86 int mPendingJobs;
87 int mProgress;
88 int mTotalItems;
89 int mTotalItemsProcessed;
90
91 bool mStreaming;
92 bool mIncremental;
93 bool mDeliveryDone;
94 bool mFinished;
95 bool mFullListingDone;
96 bool mProcessingBatch;
97 bool mDisableAutomaticDeliveryDone;
98
99 int mBatchSize;
100 Akonadi::ItemSync::MergeMode mMergeMode;
101};
102
103void ItemSyncPrivate::createOrMerge(const Item &item)
104{
105 Q_Q(ItemSync);
106 // don't try to do anything in error state
107 if (q->error()) {
108 return;
109 }
110 mPendingJobs++;
111 Item modifiedItem = item;
112 if (mItemSyncStart.isValid()) {
113 modifiedItem.setModificationTime(mItemSyncStart);
114 }
115 auto create = new ItemCreateJob(modifiedItem, mSyncCollection, subjobParent());
117 if (mMergeMode == ItemSync::GIDMerge && !item.gid().isEmpty()) {
119 } else {
121 }
122 create->setMerge(merge);
123 q->connect(create, &ItemCreateJob::result, q, [this](KJob *job) {
124 slotLocalChangeDone(job);
125 });
126}
127
128bool ItemSyncPrivate::allProcessed() const
129{
130 return mDeliveryDone && mCurrentBatchRemoteItems.isEmpty() && mRemoteItemQueue.isEmpty() && mRemovedRemoteItemQueue.isEmpty()
131 && mCurrentBatchRemovedRemoteItems.isEmpty();
132}
133
134void ItemSyncPrivate::checkDone()
135{
136 Q_Q(ItemSync);
137 q->setProcessedAmount(KJob::Bytes, mProgress);
138 if (mPendingJobs > 0) {
139 return;
140 }
141
142 if (mTransactionJobs > 0) {
143 // Commit the current transaction if we're in batch processing mode or done
144 // and wait until the transaction is committed to process the next batch
145 if (mTransactionMode == ItemSync::MultipleTransactions || (mDeliveryDone && mRemoteItemQueue.isEmpty())) {
146 if (mCurrentTransaction) {
147 // Note that mCurrentTransaction->commit() is a no-op if we're already rolling back
148 // so this signal is a bit misleading (but it's only used by unittests it seems)
149 Q_EMIT q->transactionCommitted();
150 mCurrentTransaction->commit();
151 mCurrentTransaction = nullptr;
152 }
153 return;
154 }
155 }
156 mProcessingBatch = false;
157
158 if (q->error() == Job::UserCanceled && mTransactionJobs == 0 && !mFinished) {
159 qCDebug(AKONADICORE_LOG) << "ItemSync of collection" << mSyncCollection.id() << "finished due to user cancelling";
160 mFinished = true;
161 q->emitResult();
162 return;
163 }
164
165 if (!mRemoteItemQueue.isEmpty()) {
166 execute();
167 // We don't have enough items, request more
168 if (!mProcessingBatch) {
169 Q_EMIT q->readyForNextBatch(mBatchSize - mRemoteItemQueue.size());
170 }
171 return;
172 }
173 Q_EMIT q->readyForNextBatch(mBatchSize);
174
175 if (allProcessed() && !mFinished) {
176 // prevent double result emission, can happen since checkDone() is called from all over the place
177 qCDebug(AKONADICORE_LOG) << "ItemSync of collection" << mSyncCollection.id() << "finished";
178 mFinished = true;
179 q->emitResult();
180 }
181}
182
183ItemSync::ItemSync(const Collection &collection, const QDateTime &timestamp, QObject *parent)
184 : Job(new ItemSyncPrivate(this), parent)
185{
186 qCDebug(AKONADICORE_LOG) << "Created ItemSync(colId=" << collection.id() << ", timestamp=" << timestamp << ")";
187 Q_D(ItemSync);
188 d->mSyncCollection = collection;
189 if (timestamp.isValid()) {
190 d->mItemSyncStart = timestamp;
191 }
192}
193
197
199{
200 /*
201 * We received a list of items from the server:
202 * * fetch all local id's + rid's only
203 * * check each full sync item whether it's locally available
204 * * if it is modify the item
205 * * if it's not create it
206 * * delete all superfluous items
207 */
208 Q_D(ItemSync);
209 Q_ASSERT(!d->mIncremental);
210 if (!d->mStreaming) {
211 d->mDeliveryDone = true;
212 }
213 d->mRemoteItemQueue += items;
214 d->mTotalItemsProcessed += items.count();
215 qCDebug(AKONADICORE_LOG) << "Received batch: " << items.count() << "Already processed: " << d->mTotalItemsProcessed
216 << "Expected total amount: " << d->mTotalItems;
217 if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) {
218 d->mDeliveryDone = true;
219 }
220 d->execute();
221}
222
224{
225 Q_D(ItemSync);
226 Q_ASSERT(!d->mIncremental);
227 Q_ASSERT(amount >= 0);
229 qCDebug(AKONADICORE_LOG) << "Expected total amount:" << amount;
230 d->mTotalItems = amount;
232 if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItems == 0)) {
233 d->mDeliveryDone = true;
234 d->execute();
235 }
236}
237
239{
240 Q_D(ItemSync);
241 d->mDisableAutomaticDeliveryDone = disable;
242}
243
244void ItemSync::setIncrementalSyncItems(const Item::List &changedItems, const Item::List &removedItems)
245{
246 /*
247 * We received an incremental listing of items:
248 * * for each changed item:
249 * ** If locally available => modify
250 * ** else => create
251 * * removed items can be removed right away
252 */
253 Q_D(ItemSync);
254 d->mIncremental = true;
255 if (!d->mStreaming) {
256 d->mDeliveryDone = true;
257 }
258 d->mRemoteItemQueue += changedItems;
259 d->mRemovedRemoteItemQueue += removedItems;
260 d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
261 qCDebug(AKONADICORE_LOG) << "Received: " << changedItems.count() << "Removed: " << removedItems.count() << "In total: " << d->mTotalItemsProcessed
262 << " Wanted: " << d->mTotalItems;
263 if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) {
264 d->mDeliveryDone = true;
265 }
266 d->execute();
267}
268
270{
271}
272
273void ItemSyncPrivate::fetchLocalItemsToDelete()
274{
275 Q_Q(ItemSync);
276 if (mIncremental) {
277 qFatal("This must not be called while in incremental mode");
278 return;
279 }
280 auto job = new ItemFetchJob(mSyncCollection, subjobParent());
281 job->fetchScope().setFetchRemoteIdentification(true);
282 job->fetchScope().setFetchModificationTime(false);
283 job->setDeliveryOption(ItemFetchJob::EmitItemsIndividually);
284 // we only can fetch parts already in the cache, otherwise this will deadlock
285 job->fetchScope().setCacheOnly(true);
286
288 slotItemsReceived(lst);
289 });
290 QObject::connect(job, &ItemFetchJob::result, q, [this](KJob *job) {
291 slotLocalListDone(job);
292 });
293 mPendingJobs++;
294}
295
296void ItemSyncPrivate::slotItemsReceived(const Item::List &items)
297{
298 for (const Akonadi::Item &item : items) {
299 // Don't delete items that have not yet been synchronized
300 if (item.remoteId().isEmpty()) {
301 continue;
302 }
303 if (!mListedItems.contains(item.remoteId())) {
304 mItemsToDelete << Item(item.id());
305 }
306 }
307}
308
309void ItemSyncPrivate::slotLocalListDone(KJob *job)
310{
311 mPendingJobs--;
312 if (job->error()) {
313 qCWarning(AKONADICORE_LOG) << job->errorString();
314 }
315 deleteItems(mItemsToDelete);
316 checkDone();
317}
318
319QString ItemSyncPrivate::jobDebuggingString() const
320{
321 // TODO: also print out mIncremental and mTotalItemsProcessed, but they are set after the job
322 // started, so this requires passing jobDebuggingString to jobEnded().
323 return QStringLiteral("Collection %1 (%2)").arg(mSyncCollection.id()).arg(mSyncCollection.name());
324}
325
326void ItemSyncPrivate::execute()
327{
328 // shouldn't happen
329 if (mFinished) {
330 qCWarning(AKONADICORE_LOG) << "Call to execute() on finished job.";
331 Q_ASSERT(false);
332 return;
333 }
334 // not doing anything, start processing
335 if (!mProcessingBatch) {
336 if (mRemoteItemQueue.size() >= mBatchSize || mDeliveryDone) {
337 // we have a new batch to process
338 const int num = qMin(mBatchSize, mRemoteItemQueue.size());
339 mCurrentBatchRemoteItems.reserve(mBatchSize);
340 std::move(mRemoteItemQueue.begin(), mRemoteItemQueue.begin() + num, std::back_inserter(mCurrentBatchRemoteItems));
341 mRemoteItemQueue.erase(mRemoteItemQueue.begin(), mRemoteItemQueue.begin() + num);
342
343 mCurrentBatchRemovedRemoteItems += mRemovedRemoteItemQueue;
344 mRemovedRemoteItemQueue.clear();
345 } else {
346 // nothing to do, let's wait for more data
347 return;
348 }
349 mProcessingBatch = true;
350 processBatch();
351 return;
352 }
353 checkDone();
354}
355
356// process the current batch of items
357void ItemSyncPrivate::processBatch()
358{
359 Q_Q(ItemSync);
360 if (mCurrentBatchRemoteItems.isEmpty() && !mDeliveryDone) {
361 return;
362 }
363 if (q->error() == Job::UserCanceled) {
364 checkDone();
365 return;
366 }
367
368 // request a transaction, there are items that require processing
369 requestTransaction();
370
371 processItems();
372
373 // removed
374 if (!mIncremental && allProcessed()) {
375 // the full listing is done and we know which items to remove
376 fetchLocalItemsToDelete();
377 } else {
378 deleteItems(mCurrentBatchRemovedRemoteItems);
379 mCurrentBatchRemovedRemoteItems.clear();
380 }
381
382 checkDone();
383}
384
385void ItemSyncPrivate::processItems()
386{
387 // added / updated
388 for (const Item &remoteItem : std::as_const(mCurrentBatchRemoteItems)) {
389 if (remoteItem.remoteId().isEmpty()) {
390 qCWarning(AKONADICORE_LOG) << "Item " << remoteItem.id() << " does not have a remote identifier";
391 continue;
392 }
393 if (!mIncremental) {
394 mListedItems << remoteItem.remoteId();
395 }
396 createOrMerge(remoteItem);
397 }
398 mCurrentBatchRemoteItems.clear();
399}
400
401void ItemSyncPrivate::deleteItems(const Item::List &itemsToDelete)
402{
403 Q_Q(ItemSync);
404 // if in error state, better not change anything anymore
405 if (q->error()) {
406 return;
407 }
408
409 if (itemsToDelete.isEmpty()) {
410 return;
411 }
412
413 mPendingJobs++;
414 auto job = new ItemDeleteJob(itemsToDelete, subjobParent());
415 q->connect(job, &ItemDeleteJob::result, q, [this](KJob *job) {
416 slotLocalDeleteDone(job);
417 });
418
419 // It can happen that the groupware servers report us deleted items
420 // twice, in this case this item delete job will fail on the second try.
421 // To avoid a rollback of the complete transaction we gracefully allow the job
422 // to fail :)
423 auto transaction = qobject_cast<TransactionSequence *>(subjobParent());
424 if (transaction) {
425 transaction->setIgnoreJobFailure(job);
426 }
427}
428
429void ItemSyncPrivate::slotLocalDeleteDone(KJob *job)
430{
431 if (job->error()) {
432 qCWarning(AKONADICORE_LOG) << "Deleting items from the akonadi database failed:" << job->errorString();
433 }
434 mPendingJobs--;
435 mProgress++;
436
437 checkDone();
438}
439
440void ItemSyncPrivate::slotLocalChangeDone(KJob *job)
441{
442 if (job->error() && job->error() != Job::KilledJobError) {
443 qCWarning(AKONADICORE_LOG) << "Creating/updating items from the akonadi database failed:" << job->errorString();
444 mRemoteItemQueue.clear(); // don't try to process any more items after a rollback
445 }
446 mPendingJobs--;
447 mProgress++;
448
449 checkDone();
450}
451
452void ItemSyncPrivate::slotTransactionResult(KJob *job)
453{
454 --mTransactionJobs;
455 if (mCurrentTransaction == job) {
456 mCurrentTransaction = nullptr;
457 }
458
459 checkDone();
460}
461
462void ItemSyncPrivate::requestTransaction()
463{
464 Q_Q(ItemSync);
465 // we never want parallel transactions, single transaction just makes one big transaction, and multi transaction uses multiple transaction sequentially
466 if (!mCurrentTransaction) {
467 ++mTransactionJobs;
468 mCurrentTransaction = new TransactionSequence(q);
469 mCurrentTransaction->setAutomaticCommittingEnabled(false);
470 QObject::connect(mCurrentTransaction, &TransactionSequence::result, q, [this](KJob *job) {
471 slotTransactionResult(job);
472 });
473 }
474}
475
476Job *ItemSyncPrivate::subjobParent() const
477{
478 Q_Q(const ItemSync);
479 if (mCurrentTransaction && mTransactionMode != ItemSync::NoTransaction) {
480 return mCurrentTransaction;
481 }
482 return const_cast<ItemSync *>(q);
483}
484
486{
487 Q_D(ItemSync);
488 d->mStreaming = enable;
489}
490
492{
493 Q_D(ItemSync);
494 Q_ASSERT(d->mStreaming);
495 d->mDeliveryDone = true;
496 d->execute();
497}
498
499void ItemSync::slotResult(KJob *job)
500{
501 if (job->error()) {
502 qCWarning(AKONADICORE_LOG) << "Error during ItemSync: " << job->errorString();
503 // pretend there were no errors
505 // propagate the first error we got but continue, we might still be fed with stuff from a resource
506 if (!error()) {
507 setError(job->error());
508 setErrorText(job->errorText());
509 }
510 } else {
511 Akonadi::Job::slotResult(job);
512 }
513}
514
516{
517 Q_D(ItemSync);
518 qCDebug(AKONADICORE_LOG) << "The item sync is being rolled-back.";
520 if (d->mCurrentTransaction) {
521 d->mCurrentTransaction->rollback();
522 }
523 d->mDeliveryDone = true; // user won't deliver more data
524 d->execute(); // end this in an ordered way, since we have an error set no real change will be done
525}
526
528{
529 Q_D(ItemSync);
530 d->mTransactionMode = mode;
531}
532
534{
535 Q_D(const ItemSync);
536 return d->mBatchSize;
537}
538
540{
541 Q_D(ItemSync);
542 d->mBatchSize = size;
543}
544
545ItemSync::MergeMode ItemSync::mergeMode() const
546{
547 Q_D(const ItemSync);
548 return d->mMergeMode;
549}
550
551void ItemSync::setMergeMode(MergeMode mergeMode)
552{
553 Q_D(ItemSync);
554 d->mMergeMode = mergeMode;
555}
556
557#include "moc_itemsync.cpp"
Represents a collection of PIM items.
Definition collection.h:62
Job that creates a new item in the Akonadi storage.
@ Silent
Only return the id of the merged/created item.
@ RID
Merge by remote id.
Job that deletes items from the Akonadi storage.
Job that fetches items from the Akonadi storage.
void itemsReceived(const Akonadi::Item::List &items)
This signal is emitted whenever new items have been fetched completely.
@ EmitItemsIndividually
emitted via signal upon reception
Syncs between items known to a client (usually a resource) and the Akonadi storage.
Definition itemsync.h:41
ItemSync(const Collection &collection, const QDateTime &timestamp={}, QObject *parent=nullptr)
Creates a new item synchronizer.
Definition itemsync.cpp:183
void setTotalItems(int amount)
Set the amount of items which you are going to return in total by using the setFullSyncItems()/setIncrementalSyncItems() methods.
Definition itemsync.cpp:223
TransactionMode
Transaction mode used by ItemSync.
Definition itemsync.h:129
@ NoTransaction
Use no transaction at all, provides highest responsiveness (might therefore feel faster even when act...
Definition itemsync.h:133
@ MultipleTransactions
Use one transaction per chunk of delivered items, good compromise between the other two when using st...
Definition itemsync.h:132
void setDisableAutomaticDeliveryDone(bool disable)
Disables the automatic completion of the item sync, based on the number of delivered items.
Definition itemsync.cpp:238
void setIncrementalSyncItems(const Item::List &changedItems, const Item::List &removedItems)
Sets the item lists for incrementally syncing the collection.
Definition itemsync.cpp:244
~ItemSync() override
Destroys the item synchronizer.
Definition itemsync.cpp:194
MergeMode mergeMode() const
Returns current merge mode.
Definition itemsync.cpp:545
void setBatchSize(int)
Set the batch size.
Definition itemsync.cpp:539
void setFullSyncItems(const Item::List &items)
Sets the full item list for the collection.
Definition itemsync.cpp:198
void doStart() override
This method must be reimplemented in the concrete jobs.
Definition itemsync.cpp:269
void rollback()
Aborts the sync process and rolls back all not yet committed transactions.
Definition itemsync.cpp:515
void deliveryDone()
Notify ItemSync that all remote items have been delivered.
Definition itemsync.cpp:491
int batchSize() const
Minimum number of items required to start processing in streaming mode.
Definition itemsync.cpp:533
void setStreamingEnabled(bool enable)
Enable item streaming.
Definition itemsync.cpp:485
void setMergeMode(MergeMode mergeMode)
Set what merge method should be used for next ItemSync run.
Definition itemsync.cpp:551
void setTransactionMode(TransactionMode mode)
Set the transaction mode to use for this sync.
Definition itemsync.cpp:527
Represents a PIM item stored in Akonadi storage.
Definition item.h:100
Base class for all actions in the Akonadi storage.
Definition job.h:81
@ UserCanceled
The user canceled this job.
Definition job.h:101
bool removeSubjob(KJob *job) override
Removes the given subjob of this job.
Definition job.cpp:369
Base class for jobs that need to run a sequence of sub-jobs in a transaction.
void commit()
Commits the transaction as soon as all pending sub-jobs finished successfully.
void setAutomaticCommittingEnabled(bool enable)
Disable automatic committing.
void setErrorText(const QString &errorText)
virtual QString errorString() const
void setTotalAmount(Unit unit, qulonglong amount)
int error() const
void result(KJob *job)
void setError(int errorCode)
QString errorText() const
Helper integration between Akonadi and Qt.
QStringView merge(QStringView lhs, QStringView rhs)
QAction * create(StandardAction id, const Receiver *recvr, Func slot, QObject *parent, std::optional< Qt::ConnectionType > connectionType=std::nullopt)
bool isValid() const const
iterator begin()
void clear()
qsizetype count() const const
iterator erase(const_iterator begin, const_iterator end)
bool isEmpty() const const
void reserve(qsizetype size)
qsizetype size() const const
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
bool contains(const QSet< T > &other) const const
QString arg(Args &&... args) const const
Q_D(Todo)
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Mon Nov 18 2024 12:08:29 by doxygen 1.12.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.