8#include "itemfetchhelper.h"
11#include "connection.h"
13#include "handlerhelper.h"
14#include "shared/akranges.h"
15#include "storage/itemqueryhelper.h"
16#include "storage/itemretrievalmanager.h"
17#include "storage/parttypehelper.h"
18#include "storage/selectquerybuilder.h"
19#include "storage/transaction.h"
21#include "agentmanagerinterface.h"
22#include "akonadiserver_debug.h"
23#include "intervalcheck.h"
26#include "private/dbus_p.h"
33#include <QElapsedTimer>
36using namespace Akonadi::Server;
37using namespace AkRanges;
39#define ENABLE_FETCH_PROFILING 0
40#if ENABLE_FETCH_PROFILING
41#define BEGIN_TIMER(name) \
42 QElapsedTimer name##Timer; \
45#define END_TIMER(name) const double name##Elapsed = name##Timer.nsecsElapsed() / 1000000.0;
46#define PROF_INC(name) ++name;
48#define BEGIN_TIMER(name)
49#define END_TIMER(name)
53ItemFetchHelper::ItemFetchHelper(
Connection *connection,
55 const Protocol::ItemFetchScope &itemFetchScope,
56 const Protocol::TagFetchScope &tagFetchScope,
57 AkonadiServer &akonadi,
58 const Protocol::FetchLimit &itemsLimit)
59 : ItemFetchHelper(connection, connection->context(), scope, itemFetchScope, tagFetchScope, akonadi, itemsLimit)
63ItemFetchHelper::ItemFetchHelper(
Connection *connection,
64 const CommandContext &context,
66 const Protocol::ItemFetchScope &itemFetchScope,
67 const Protocol::TagFetchScope &tagFetchScope,
68 AkonadiServer &akonadi,
69 const Protocol::FetchLimit &itemsLimit)
70 : mConnection(connection)
73 , mItemFetchScope(itemFetchScope)
74 , mTagFetchScope(tagFetchScope)
76 , mItemsLimit(itemsLimit)
79 std::fill(mItemQueryColumnMap, mItemQueryColumnMap + ItemQueryColumnCount, -1);
82void ItemFetchHelper::disableATimeUpdates()
84 mUpdateATimeEnabled =
false;
87enum PartQueryColumns {
89 PartQueryTypeIdColumn,
91 PartQueryStorageColumn,
92 PartQueryVersionColumn,
93 PartQueryDataSizeColumn
101 if (!partList.
isEmpty() || allPayload || allAttrs) {
102 partQuery.addJoin(QueryBuilder::InnerJoin, Part::tableName(), partQuery.getTableWithColumn(PimItem::idColumn()), Part::pimItemIdFullColumnName());
103 partQuery.addColumn(partQuery.getTableWithColumn(PimItem::idColumn()));
104 partQuery.addColumn(Part::partTypeIdFullColumnName());
105 partQuery.addColumn(Part::dataFullColumnName());
106 partQuery.addColumn(Part::storageFullColumnName());
107 partQuery.addColumn(Part::versionFullColumnName());
108 partQuery.addColumn(Part::datasizeFullColumnName());
110 partQuery.addSortColumn(partQuery.getTableWithColumn(PimItem::idColumn()), Query::Descending);
112 if (!partList.isEmpty() || allPayload || allAttrs) {
113 Query::Condition cond(Query::Or);
114 for (const QByteArray &b : std::as_const(partList)) {
115 if (b.startsWith(
"PLD") || b.startsWith(
"ATR")) {
116 cond.addValueCondition(Part::partTypeIdFullColumnName(), Query::Equals, PartTypeHelper::fromFqName(b).id());
119 if (allPayload || allAttrs) {
120 partQuery.addJoin(QueryBuilder::InnerJoin, PartType::tableName(), Part::partTypeIdFullColumnName(), PartType::idFullColumnName());
122 cond.addValueCondition(PartType::nsFullColumnName(), Query::Equals, QStringLiteral(
"PLD"));
125 cond.addValueCondition(PartType::nsFullColumnName(), Query::Equals, QStringLiteral(
"ATR"));
129 partQuery.addCondition(cond);
134 if (!partQuery.
exec()) {
135 throw HandlerException(
"Unable to list item parts");
147#define ADD_COLUMN(colName, colId) \
149 itemQuery.addColumn(colName); \
150 mItemQueryColumnMap[colId] = column++; \
152 ADD_COLUMN(PimItem::idFullColumnName(), ItemQueryPimItemIdColumn);
153 if (mItemFetchScope.fetchRemoteId()) {
154 ADD_COLUMN(PimItem::remoteIdFullColumnName(), ItemQueryPimItemRidColumn)
156 ADD_COLUMN(PimItem::mimeTypeIdFullColumnName(), ItemQueryMimeTypeIdColumn)
157 ADD_COLUMN(PimItem::revFullColumnName(), ItemQueryRevColumn)
158 if (mItemFetchScope.fetchRemoteRevision()) {
159 ADD_COLUMN(PimItem::remoteRevisionFullColumnName(), ItemQueryRemoteRevisionColumn)
161 if (mItemFetchScope.fetchSize()) {
162 ADD_COLUMN(PimItem::sizeFullColumnName(), ItemQuerySizeColumn)
164 if (mItemFetchScope.fetchMTime()) {
165 ADD_COLUMN(PimItem::datetimeFullColumnName(), ItemQueryDatetimeColumn)
167 ADD_COLUMN(PimItem::collectionIdFullColumnName(), ItemQueryCollectionIdColumn)
168 if (mItemFetchScope.fetchGID()) {
169 ADD_COLUMN(PimItem::gidFullColumnName(), ItemQueryPimItemGidColumn)
173 itemQuery.addSortColumn(PimItem::idFullColumnName(),
static_cast<Query::SortOrder>(mItemsLimit.sortOrder()));
174 if (mItemsLimit.limit() > 0) {
175 itemQuery.setLimit(mItemsLimit.limit(), mItemsLimit.limitOffset());
180 if (mItemFetchScope.changedSince().isValid()) {
181 itemQuery.addValueCondition(PimItem::datetimeFullColumnName(), Query::GreaterOrEqual, mItemFetchScope.changedSince().toUTC());
184 if (!itemQuery.
exec()) {
185 throw HandlerException(
"Unable to list items");
188 itemQuery.query().
next();
193enum FlagQueryColumns {
194 FlagQueryPimItemIdColumn,
195 FlagQueryFlagIdColumn,
203 PimItemFlagRelation::tableName(),
205 PimItemFlagRelation::leftFullColumnName());
208 flagQuery.
addColumn(PimItemFlagRelation::rightFullColumnName());
213 if (!flagQuery.
exec()) {
214 throw HandlerException(
"Unable to retrieve item flags");
222enum TagQueryColumns {
223 TagQueryItemIdColumn,
230 if (mItemsLimit.limit() > 0) {
235 PimItemTagRelation::tableName(),
236 tagQuery.getTableWithColumn(PimItem::idColumn()),
237 PimItemTagRelation::leftFullColumnName());
238 tagQuery.addJoin(
QueryBuilder::InnerJoin, Tag::tableName(), Tag::idFullColumnName(), PimItemTagRelation::rightFullColumnName());
239 tagQuery.addColumn(tagQuery.getTableWithColumn(PimItem::idColumn()));
240 tagQuery.addColumn(Tag::idFullColumnName());
243 tagQuery.addSortColumn(tagQuery.getTableWithColumn(PimItem::idColumn()), Query::Descending);
245 if (!tagQuery.exec()) {
246 throw HandlerException(
"Unable to retrieve item tags");
249 tagQuery.query().next();
254enum VRefQueryColumns {
255 VRefQueryCollectionIdColumn,
256 VRefQueryItemIdColumn,
262 if (mItemsLimit.limit() > 0) {
263 vRefQuery =
QueryBuilder(itemQuery, mPimItemQueryAlias);
267 CollectionPimItemRelation::tableName(),
268 CollectionPimItemRelation::rightFullColumnName(),
269 vRefQuery.getTableWithColumn(PimItem::idColumn()));
270 vRefQuery.addColumn(CollectionPimItemRelation::leftFullColumnName());
271 vRefQuery.addColumn(CollectionPimItemRelation::rightFullColumnName());
273 vRefQuery.addSortColumn(vRefQuery.getTableWithColumn(PimItem::idColumn()), Query::Descending);
275 if (!vRefQuery.exec()) {
276 throw HandlerException(
"Unable to retrieve virtual references");
279 vRefQuery.query().next();
284bool ItemFetchHelper::isScopeLocal(
const Scope &scope)
287 if (!mConnection->sessionId().
startsWith(
"akonadi_indexing_agent")) {
292 QueryBuilder qb(PimItem::tableName(), QueryBuilder::Select);
293 qb.setDistinct(
true);
294 qb.addColumn(Resource::nameFullColumnName());
295 qb.addJoin(
QueryBuilder::LeftJoin, Collection::tableName(), PimItem::collectionIdFullColumnName(), Collection::idFullColumnName());
296 qb.addJoin(
QueryBuilder::LeftJoin, Resource::tableName(), Collection::resourceIdFullColumnName(), Resource::idFullColumnName());
298 if (mContext.resource().isValid()) {
299 qb.addValueCondition(Resource::nameFullColumnName(), Query::NotEquals, mContext.resource().
name());
303 throw HandlerException(
"Failed to query database");
320 org::freedesktop::Akonadi::AgentManager manager(DBus::serviceName(DBus::Control), QStringLiteral(
"/AgentManager"),
QDBusConnection::sessionBus());
321 const QString typeIdentifier = manager.agentInstanceType(resourceName);
323 return properties.value(QStringLiteral(
"HasLocalStorage"),
false).toBool();
326DataStore *ItemFetchHelper::storageBackend()
const
329 if (
auto store = mConnection->storageBackend()) {
337bool ItemFetchHelper::fetchItems(std::function<
void(Protocol::FetchItemsResponse &&)> &&itemCallback)
348 BEGIN_TIMER(itemRetriever)
349 BEGIN_TIMER(scopeLocal)
350#if ENABLE_FETCH_PROFILING
351 double scopeLocalElapsed = 0;
353 if (!mItemFetchScope.cacheOnly() || isScopeLocal(mScope)) {
354#if ENABLE_FETCH_PROFILING
355 scopeLocalElapsed = scopeLocalTimer.elapsed();
359 triggerOnDemandFetch();
363 ItemRetriever retriever(mAkonadi.itemRetrievalManager(), mConnection, mContext);
364 retriever.setScope(mScope);
365 retriever.setRetrieveParts(mItemFetchScope.requestedPayloads());
366 retriever.setRetrieveFullPayload(mItemFetchScope.fullPayload());
367 retriever.setChangedSince(mItemFetchScope.changedSince());
368 if (!retriever.exec() && !mItemFetchScope.ignoreErrors()) {
369 if (mContext.resource().isValid()) {
370 throw HandlerException(QStringLiteral(
"Unable to fetch item from backend (collection %1, resource %2) : %3")
371 .arg(mContext.collectionId())
372 .arg(mContext.resource().id())
375 throw HandlerException(QStringLiteral(
"Unable to fetch item from backend (collection %1) : %2")
376 .arg(mContext.collectionId())
381 END_TIMER(itemRetriever)
384 std::optional<QueryBuilder> itemQb = buildItemQuery();
385 auto &itemQuery = itemQb->query();
391 if (mItemFetchScope.ignoreErrors()) {
394 switch (mScope.scope()) {
397 case Scope::HierarchicalRid:
399 throw HandlerException(
"Item query returned empty result set");
407 std::optional<QueryBuilder> partQb;
408 if (!mItemFetchScope.requestedParts().isEmpty() || mItemFetchScope.fullPayload() || mItemFetchScope.allAttributes()) {
409 partQb = buildPartQuery(itemQuery, mItemFetchScope.requestedParts(), mItemFetchScope.fullPayload(), mItemFetchScope.allAttributes());
415 std::optional<QueryBuilder> flagQb;
416 if (mItemFetchScope.fetchFlags()) {
417 flagQb = buildFlagQuery(itemQuery);
423 std::optional<QueryBuilder> tagQb;
424 if (mItemFetchScope.fetchTags()) {
425 tagQb = buildTagQuery(itemQuery);
430 std::optional<QueryBuilder> vRefQb;
431 if (mItemFetchScope.fetchVirtualReferences()) {
432 vRefQb = buildVRefQuery(itemQuery);
436#if ENABLE_FETCH_PROFILING
444 BEGIN_TIMER(processing)
451 const qint64 pimItemId = extractQueryResult(itemQuery, ItemQueryPimItemIdColumn).
toLongLong();
452 const int pimItemRev = extractQueryResult(itemQuery, ItemQueryRevColumn).
toInt();
454 Protocol::FetchItemsResponse response;
455 response.setId(pimItemId);
456 response.setRevision(pimItemRev);
457 const qint64 mimeTypeId = extractQueryResult(itemQuery, ItemQueryMimeTypeIdColumn).
toLongLong();
458 auto mtIter = mimeTypeIdNameCache.
find(mimeTypeId);
459 if (mtIter == mimeTypeIdNameCache.
end()) {
460 mtIter = mimeTypeIdNameCache.
insert(mimeTypeId, MimeType::retrieveById(mimeTypeId).
name());
462 response.setMimeType(mtIter.value());
463 if (mItemFetchScope.fetchRemoteId()) {
464 response.setRemoteId(extractQueryResult(itemQuery, ItemQueryPimItemRidColumn).
toString());
466 response.setParentId(extractQueryResult(itemQuery, ItemQueryCollectionIdColumn).toLongLong());
468 if (mItemFetchScope.fetchSize()) {
469 response.setSize(extractQueryResult(itemQuery, ItemQuerySizeColumn).toLongLong());
471 if (mItemFetchScope.fetchMTime()) {
472 response.setMTime(extractQueryResult(itemQuery, ItemQueryDatetimeColumn).toDateTime());
474 if (mItemFetchScope.fetchRemoteRevision()) {
475 response.setRemoteRevision(extractQueryResult(itemQuery, ItemQueryRemoteRevisionColumn).
toString());
477 if (mItemFetchScope.fetchGID()) {
478 response.setGid(extractQueryResult(itemQuery, ItemQueryPimItemGidColumn).
toString());
481 if (mItemFetchScope.fetchFlags() && flagQb) {
483 auto &flagQuery = flagQb->
query();
484 while (flagQuery.isValid()) {
485 const qint64
id = flagQuery.value(FlagQueryPimItemIdColumn).toLongLong();
486 if (
id > pimItemId) {
489 }
else if (
id < pimItemId) {
492 const qint64 flagId = flagQuery.value(FlagQueryFlagIdColumn).toLongLong();
493 auto flagNameIter = flagIdNameCache.
find(flagId);
494 if (flagNameIter == flagIdNameCache.
end()) {
495 flagNameIter = flagIdNameCache.
insert(flagId, Flag::retrieveById(flagId).
name().toUtf8());
497 flags << flagNameIter.
value();
500 response.setFlags(flags);
503 if (mItemFetchScope.fetchTags() && tagQb) {
506 auto &tagQuery = tagQb->query();
507 while (tagQuery.isValid()) {
509 const qint64
id = tagQuery.value(TagQueryItemIdColumn).toLongLong();
510 if (
id > pimItemId) {
513 }
else if (
id < pimItemId) {
516 tagIds << tagQuery.
value(TagQueryTagIdColumn).toLongLong();
520 if (mTagFetchScope.fetchIdOnly()) {
521 tags = tagIds | Views::transform([](
const auto tagId) {
522 Protocol::FetchTagsResponse resp;
526 | Actions::toQVector;
528 tags = tagIds | Views::transform([
this](
const auto tagId) {
529 return HandlerHelper::fetchTagsResponse(Tag::retrieveById(tagId), mTagFetchScope, mConnection);
531 | Actions::toQVector;
533 response.setTags(tags);
536 if (mItemFetchScope.fetchVirtualReferences() && vRefQb) {
538 auto &vRefQuery = vRefQb->query();
539 while (vRefQuery.isValid()) {
541 const qint64
id = vRefQuery.value(VRefQueryItemIdColumn).toLongLong();
542 if (
id > pimItemId) {
545 }
else if (
id < pimItemId) {
548 vRefs << vRefQuery.
value(VRefQueryCollectionIdColumn).toLongLong();
551 response.setVirtualReferences(vRefs);
554 if (mItemFetchScope.ancestorDepth() != Protocol::ItemFetchScope::NoAncestor) {
555 response.setAncestors(ancestorsForItem(response.parentId()));
558 bool skipItem =
false;
563 auto &partQuery = partQb->
query();
564 while (partQuery.isValid()) {
566 const qint64
id = partQuery.value(PartQueryPimIdColumn).toLongLong();
567 if (
id > pimItemId) {
570 }
else if (
id < pimItemId) {
574 const qint64 partTypeId = partQuery.value(PartQueryTypeIdColumn).toLongLong();
575 auto ptIter = partTypeIdNameCache.
find(partTypeId);
576 if (ptIter == partTypeIdNameCache.
end()) {
579 Protocol::PartMetaData metaPart;
580 Protocol::StreamPayloadResponse partData;
581 partData.setPayloadName(ptIter.value());
582 metaPart.setName(ptIter.value());
583 metaPart.setVersion(partQuery.value(PartQueryVersionColumn).toInt());
584 metaPart.setSize(partQuery.value(PartQueryDataSizeColumn).toLongLong());
586 const QByteArray data = Utils::variantToByteArray(partQuery.value(PartQueryDataColumn));
587 if (mItemFetchScope.checkCachedPayloadPartsOnly()) {
589 cachedParts << ptIter.
value();
593 if (mItemFetchScope.ignoreErrors() && data.
isEmpty()) {
596 qCDebug(AKONADISERVER_LOG) <<
"item" <<
id <<
"has an empty payload part in parttable for part" << metaPart.name();
600 metaPart.setStorageType(
static_cast<Protocol::PartMetaData::StorageType
>(partQuery.value(PartQueryStorageColumn).toInt()));
604 partData.setData(data);
606 partData.setMetaData(metaPart);
608 if (mItemFetchScope.requestedParts().contains(ptIter.value()) || mItemFetchScope.fullPayload() || mItemFetchScope.allAttributes()) {
615 response.setParts(parts);
623 if (mItemFetchScope.checkCachedPayloadPartsOnly()) {
624 response.setCachedParts(cachedParts);
628 itemCallback(std::move(response));
630 mConnection->sendResponse(std::move(response));
642 END_TIMER(processing)
646 if (mUpdateATimeEnabled && (needsAccessTimeUpdate(mItemFetchScope.requestedParts()) || mItemFetchScope.fullPayload())) {
647 updateItemAccessTime();
652#if ENABLE_FETCH_PROFILING
653 qCDebug(AKONADISERVER_LOG) <<
"ItemFetchHelper execution stats:";
654 qCDebug(AKONADISERVER_LOG) <<
"\tItems query:" << itemsElapsed <<
"ms," << itemsCount <<
" items in total";
655 qCDebug(AKONADISERVER_LOG) <<
"\tFlags query:" << flagsElapsed <<
"ms, " << flagsCount <<
" flags in total";
656 qCDebug(AKONADISERVER_LOG) <<
"\tParts query:" << partsElapsed <<
"ms, " << partsCount <<
" parts in total";
657 qCDebug(AKONADISERVER_LOG) <<
"\tTags query: " << tagsElapsed <<
"ms, " << tagsCount <<
" tags in total";
658 qCDebug(AKONADISERVER_LOG) <<
"\tVRefs query:" << vRefsElapsed <<
"ms, " << vRefsCount <<
" vRefs in total";
659 qCDebug(AKONADISERVER_LOG) <<
"\t------------";
660 qCDebug(AKONADISERVER_LOG) <<
"\tItem retriever:" << itemRetrieverElapsed <<
"ms (scope local:" << scopeLocalElapsed <<
"ms)";
661 qCDebug(AKONADISERVER_LOG) <<
"\tTotal query:" << (itemsElapsed + flagsElapsed + partsElapsed + tagsElapsed + vRefsElapsed) <<
"ms";
662 qCDebug(AKONADISERVER_LOG) <<
"\tTotal processing: " << processingElapsed <<
"ms";
663 qCDebug(AKONADISERVER_LOG) <<
"\tATime update:" << aTimeElapsed <<
"ms";
664 qCDebug(AKONADISERVER_LOG) <<
"\t============";
665 qCDebug(AKONADISERVER_LOG) <<
"\tTotal FETCH:" << fetchElapsed <<
"ms";
666 qCDebug(AKONADISERVER_LOG);
667 qCDebug(AKONADISERVER_LOG);
679 return parts.
contains(AKONADI_PARAM_PLD_RFC822);
682void ItemFetchHelper::updateItemAccessTime()
684 Transaction transaction(storageBackend(), QStringLiteral(
"update atime"));
685 QueryBuilder qb(PimItem::tableName(), QueryBuilder::Update);
690 qCWarning(AKONADISERVER_LOG) <<
"Unable to update item access time";
692 transaction.commit();
696void ItemFetchHelper::triggerOnDemandFetch()
698 if (mContext.collectionId() <= 0 || mItemFetchScope.cacheOnly()) {
702 Collection collection = mContext.collection();
705 if (mConnection->sessionId() == collection.resource().name().
toLatin1()) {
710 if (!collection.cachePolicySyncOnDemand()) {
719 if (mItemFetchScope.ancestorDepth() == Protocol::ItemFetchScope::NoAncestor || parentColId == 0) {
722 const auto it = mAncestorCache.
constFind(parentColId);
723 if (it != mAncestorCache.
cend()) {
728 Collection col = Collection::retrieveById(parentColId);
729 const int depthNum = mItemFetchScope.ancestorDepth() == Protocol::ItemFetchScope::ParentAncestor ? 1 : INT_MAX;
730 for (
int i = 0; i < depthNum; ++i) {
731 if (!col.isValid()) {
732 Protocol::Ancestor ancestor;
734 ancestors << ancestor;
737 Protocol::Ancestor ancestor;
738 ancestor.setId(col.id());
739 ancestor.setRemoteId(col.
remoteId());
740 ancestors << ancestor;
743 mAncestorCache.
insert(parentColId, ancestors);
747QVariant ItemFetchHelper::extractQueryResult(
const QSqlQuery &query, ItemFetchHelper::ItemQueryColumns column)
const
749 const int colId = mItemQueryColumnMap[column];
750 Q_ASSERT(colId >= 0);
Represents a collection of PIM items.
qint64 Id
Describes the unique id type.
QString remoteId() const
Returns the remote id of the collection.
An Connection represents one connection of a client to the server.
This class handles all the database access.
static DataStore * self()
Per thread singleton.
virtual void activeCachePolicy(Collection &col)
Determines the active cache policy for this Collection.
void requestCollectionSync(const Collection &collection)
Requests the given collection to be synced.
Helper class for retrieving missing items parts from remote resources.
Helper class to construct arbitrary SQL queries.
void addSortColumn(const QString &column, Query::SortOrder order=Query::Ascending)
Add sort column.
QString getTableWithColumn(const QString &column) const
Returns concatenated table name with column name.
void addJoin(JoinType joinType, const QString &table, const Query::Condition &condition)
Join a table to the query.
bool exec()
Executes the query, returns true on success.
QSqlQuery & query()
Returns the query, only valid after exec().
void addColumn(const QString &col)
Adds the given column to a select query.
@ InnerJoin
NOTE: only supported for UPDATE and SELECT queries.
@ LeftJoin
NOTE: only supported for SELECT queries.
Helper class for DataStore transaction handling.
void scopeToQuery(const Scope &scope, const CommandContext &context, QueryBuilder &qb)
Add conditions to qb for the given item operation scope scope.
QString fullName(const PartType &type)
Returns full part name.
Helper integration between Akonadi and Qt.
char * toString(const EngineQuery &query)
KSERVICE_EXPORT KService::List query(FilterFunc filterFunc)
QString name(StandardAction id)
bool isEmpty() const const
bool startsWith(QByteArrayView bv) const const
QDateTime currentDateTimeUtc()
QDBusConnection sessionBus()
const_iterator cend() const const
const_iterator constFind(const Key &key) const const
iterator find(const Key &key)
iterator insert(const Key &key, const T &value)
void append(QList< T > &&value)
bool contains(const AT &value) const const
bool isEmpty() const const
qsizetype size() const const
T value(qsizetype i) const const
bool isValid() const const
QString fromLatin1(QByteArrayView str)
QByteArray toLatin1() const const
int toInt(bool *ok) const const
qlonglong toLongLong(bool *ok) const const