11#include "collection.h"
13#include "itemcreatejob.h"
14#include "itemdeletejob.h"
15#include "itemfetchjob.h"
16#include "itemfetchscope.h"
18#include "protocol_p.h"
19#include "transactionsequence.h"
21#include "akonadicore_debug.h"
28class Akonadi::ItemSyncPrivate :
public JobPrivate
31 explicit ItemSyncPrivate(ItemSync *parent)
33 , mTransactionMode(ItemSync::SingleTransaction)
34 , mCurrentTransaction(nullptr)
39 , mTotalItemsProcessed(0)
42 , mDeliveryDone(false)
44 , mFullListingDone(false)
45 , mProcessingBatch(false)
46 , mDisableAutomaticDeliveryDone(false)
48 , mMergeMode(Akonadi::ItemSync::RIDMerge)
52 void createOrMerge(
const Item &item);
54 void slotItemsReceived(
const Item::List &items);
55 void slotLocalListDone(KJob *job);
56 void slotLocalDeleteDone(KJob *job);
57 void slotLocalChangeDone(KJob *job);
62 void slotTransactionResult(KJob *job);
63 void requestTransaction();
64 Job *subjobParent()
const;
65 void fetchLocalItemsToDelete();
66 QString jobDebuggingString()
const override;
67 bool allProcessed()
const;
69 Q_DECLARE_PUBLIC(ItemSync)
70 Collection mSyncCollection;
71 QSet<QString> mListedItems;
74 TransactionSequence *mCurrentTransaction =
nullptr;
83 QDateTime mItemSyncStart;
89 int mTotalItemsProcessed;
95 bool mFullListingDone;
96 bool mProcessingBatch;
97 bool mDisableAutomaticDeliveryDone;
100 Akonadi::ItemSync::MergeMode mMergeMode;
103void ItemSyncPrivate::createOrMerge(
const Item &item)
111 Item modifiedItem = item;
112 if (mItemSyncStart.isValid()) {
113 modifiedItem.setModificationTime(mItemSyncStart);
115 auto create =
new ItemCreateJob(modifiedItem, mSyncCollection, subjobParent());
117 if (mMergeMode == ItemSync::GIDMerge && !item.gid().isEmpty()) {
124 slotLocalChangeDone(job);
128bool ItemSyncPrivate::allProcessed()
const
130 return mDeliveryDone && mCurrentBatchRemoteItems.isEmpty() && mRemoteItemQueue.isEmpty() && mRemovedRemoteItemQueue.isEmpty()
131 && mCurrentBatchRemovedRemoteItems.isEmpty();
134void ItemSyncPrivate::checkDone()
138 if (mPendingJobs > 0) {
142 if (mTransactionJobs > 0) {
146 if (mCurrentTransaction) {
149 Q_EMIT q->transactionCommitted();
150 mCurrentTransaction->commit();
151 mCurrentTransaction =
nullptr;
156 mProcessingBatch =
false;
159 qCDebug(AKONADICORE_LOG) <<
"ItemSync of collection" << mSyncCollection.id() <<
"finished due to user cancelling";
165 if (!mRemoteItemQueue.isEmpty()) {
168 if (!mProcessingBatch) {
169 Q_EMIT q->readyForNextBatch(mBatchSize - mRemoteItemQueue.size());
173 Q_EMIT q->readyForNextBatch(mBatchSize);
175 if (allProcessed() && !mFinished) {
177 qCDebug(AKONADICORE_LOG) <<
"ItemSync of collection" << mSyncCollection.id() <<
"finished";
186 qCDebug(AKONADICORE_LOG) <<
"Created ItemSync(colId=" << collection.id() <<
", timestamp=" << timestamp <<
")";
188 d->mSyncCollection = collection;
190 d->mItemSyncStart = timestamp;
209 Q_ASSERT(!d->mIncremental);
210 if (!d->mStreaming) {
211 d->mDeliveryDone =
true;
213 d->mRemoteItemQueue += items;
214 d->mTotalItemsProcessed += items.
count();
215 qCDebug(AKONADICORE_LOG) <<
"Received batch: " << items.
count() <<
"Already processed: " << d->mTotalItemsProcessed
216 <<
"Expected total amount: " << d->mTotalItems;
217 if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) {
218 d->mDeliveryDone =
true;
226 Q_ASSERT(!d->mIncremental);
227 Q_ASSERT(amount >= 0);
229 qCDebug(AKONADICORE_LOG) <<
"Expected total amount:" << amount;
230 d->mTotalItems = amount;
232 if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItems == 0)) {
233 d->mDeliveryDone =
true;
241 d->mDisableAutomaticDeliveryDone = disable;
254 d->mIncremental =
true;
255 if (!d->mStreaming) {
256 d->mDeliveryDone =
true;
258 d->mRemoteItemQueue += changedItems;
259 d->mRemovedRemoteItemQueue += removedItems;
260 d->mTotalItemsProcessed += changedItems.
count() + removedItems.
count();
261 qCDebug(AKONADICORE_LOG) <<
"Received: " << changedItems.
count() <<
"Removed: " << removedItems.
count() <<
"In total: " << d->mTotalItemsProcessed
262 <<
" Wanted: " << d->mTotalItems;
263 if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) {
264 d->mDeliveryDone =
true;
273void ItemSyncPrivate::fetchLocalItemsToDelete()
277 qFatal(
"This must not be called while in incremental mode");
280 auto job =
new ItemFetchJob(mSyncCollection, subjobParent());
281 job->fetchScope().setFetchRemoteIdentification(
true);
282 job->fetchScope().setFetchModificationTime(
false);
285 job->fetchScope().setCacheOnly(
true);
288 slotItemsReceived(lst);
291 slotLocalListDone(job);
296void ItemSyncPrivate::slotItemsReceived(
const Item::List &items)
298 for (
const Akonadi::Item &item : items) {
300 if (item.remoteId().isEmpty()) {
303 if (!mListedItems.contains(item.remoteId())) {
304 mItemsToDelete << Item(item.id());
309void ItemSyncPrivate::slotLocalListDone(KJob *job)
315 deleteItems(mItemsToDelete);
319QString ItemSyncPrivate::jobDebuggingString()
const
323 return QStringLiteral(
"Collection %1 (%2)").arg(mSyncCollection.id()).arg(mSyncCollection.name());
326void ItemSyncPrivate::execute()
330 qCWarning(AKONADICORE_LOG) <<
"Call to execute() on finished job.";
335 if (!mProcessingBatch) {
336 if (mRemoteItemQueue.size() >= mBatchSize || mDeliveryDone) {
338 const int num = qMin(mBatchSize, mRemoteItemQueue.size());
339 mCurrentBatchRemoteItems.reserve(mBatchSize);
340 std::move(mRemoteItemQueue.begin(), mRemoteItemQueue.begin() + num, std::back_inserter(mCurrentBatchRemoteItems));
341 mRemoteItemQueue.erase(mRemoteItemQueue.begin(), mRemoteItemQueue.begin() + num);
343 mCurrentBatchRemovedRemoteItems += mRemovedRemoteItemQueue;
344 mRemovedRemoteItemQueue.clear();
349 mProcessingBatch =
true;
357void ItemSyncPrivate::processBatch()
360 if (mCurrentBatchRemoteItems.isEmpty() && !mDeliveryDone) {
369 requestTransaction();
374 if (!mIncremental && allProcessed()) {
376 fetchLocalItemsToDelete();
378 deleteItems(mCurrentBatchRemovedRemoteItems);
379 mCurrentBatchRemovedRemoteItems.clear();
385void ItemSyncPrivate::processItems()
388 for (
const Item &remoteItem : std::as_const(mCurrentBatchRemoteItems)) {
389 if (remoteItem.remoteId().isEmpty()) {
390 qCWarning(AKONADICORE_LOG) <<
"Item " << remoteItem.id() <<
" does not have a remote identifier";
394 mListedItems << remoteItem.remoteId();
396 createOrMerge(remoteItem);
398 mCurrentBatchRemoteItems.clear();
401void ItemSyncPrivate::deleteItems(
const Item::List &itemsToDelete)
414 auto job =
new ItemDeleteJob(itemsToDelete, subjobParent());
416 slotLocalDeleteDone(job);
423 auto transaction = qobject_cast<TransactionSequence *>(subjobParent());
425 transaction->setIgnoreJobFailure(job);
429void ItemSyncPrivate::slotLocalDeleteDone(KJob *job)
432 qCWarning(AKONADICORE_LOG) <<
"Deleting items from the akonadi database failed:" << job->
errorString();
440void ItemSyncPrivate::slotLocalChangeDone(KJob *job)
442 if (job->
error() && job->
error() != Job::KilledJobError) {
443 qCWarning(AKONADICORE_LOG) <<
"Creating/updating items from the akonadi database failed:" << job->
errorString();
444 mRemoteItemQueue.
clear();
452void ItemSyncPrivate::slotTransactionResult(KJob *job)
455 if (mCurrentTransaction == job) {
456 mCurrentTransaction =
nullptr;
462void ItemSyncPrivate::requestTransaction()
466 if (!mCurrentTransaction) {
468 mCurrentTransaction =
new TransactionSequence(q);
469 mCurrentTransaction->setAutomaticCommittingEnabled(
false);
471 slotTransactionResult(job);
476Job *ItemSyncPrivate::subjobParent()
const
480 return mCurrentTransaction;
482 return const_cast<ItemSync *
>(q);
488 d->mStreaming = enable;
494 Q_ASSERT(d->mStreaming);
495 d->mDeliveryDone =
true;
499void ItemSync::slotResult(
KJob *job)
502 qCWarning(AKONADICORE_LOG) <<
"Error during ItemSync: " << job->
errorString();
511 Akonadi::Job::slotResult(job);
518 qCDebug(AKONADICORE_LOG) <<
"The item sync is being rolled-back.";
520 if (d->mCurrentTransaction) {
521 d->mCurrentTransaction->rollback();
523 d->mDeliveryDone =
true;
530 d->mTransactionMode = mode;
536 return d->mBatchSize;
542 d->mBatchSize = size;
548 return d->mMergeMode;
557#include "moc_itemsync.cpp"
Represents a collection of PIM items.
@ Silent
Only return the id of the merged/created item.
Job that fetches items from the Akonadi storage.
void itemsReceived(const Akonadi::Item::List &items)
This signal is emitted whenever new items have been fetched completely.
@ EmitItemsIndividually
emitted via signal upon reception
Syncs between items known to a client (usually a resource) and the Akonadi storage.
ItemSync(const Collection &collection, const QDateTime &timestamp={}, QObject *parent=nullptr)
Creates a new item synchronizer.
void setTotalItems(int amount)
Set the amount of items which you are going to return in total by using the setFullSyncItems()/setIncrementalSyncItems() methods.
TransactionMode
Transaction mode used by ItemSync.
@ NoTransaction
Use no transaction at all, provides highest responsiveness (might therefore feel faster even when actually taking slightly longer), no rollback though.
@ MultipleTransactions
Use one transaction per chunk of delivered items, good compromise between the other two when using streaming.
void setDisableAutomaticDeliveryDone(bool disable)
Disables the automatic completion of the item sync, based on the number of delivered items.
void setIncrementalSyncItems(const Item::List &changedItems, const Item::List &removedItems)
Sets the item lists for incrementally syncing the collection.
~ItemSync() override
Destroys the item synchronizer.
MergeMode mergeMode() const
Returns current merge mode.
void setBatchSize(int)
Set the batch size.
void setFullSyncItems(const Item::List &items)
Sets the full item list for the collection.
void doStart() override
This method must be reimplemented in the concrete jobs.
void rollback()
Aborts the sync process and rolls back all not yet committed transactions.
void deliveryDone()
Notify ItemSync that all remote items have been delivered.
int batchSize() const
Minimum number of items required to start processing in streaming mode.
void setStreamingEnabled(bool enable)
Enable item streaming.
void setMergeMode(MergeMode mergeMode)
Set what merge method should be used for next ItemSync run.
void setTransactionMode(TransactionMode mode)
Set the transaction mode to use for this sync.
QList< Item > List
Describes a list of items.
Base class for all actions in the Akonadi storage.
Job(QObject *parent=nullptr)
Creates a new job.
@ UserCanceled
The user canceled this job.
bool removeSubjob(KJob *job) override
Removes the given subjob of this job.
void setErrorText(const QString &errorText)
virtual QString errorString() const
void setTotalAmount(Unit unit, qulonglong amount)
void setError(int errorCode)
QString errorText() const
Helper integration between Akonadi and Qt.
QStringView merge(QStringView lhs, QStringView rhs)
QAction * create(StandardAction id, const QObject *recvr, const char *slot, QObject *parent)
bool isValid() const
qsizetype count() const
bool isEmpty() const
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
QObject * parent() const