8#include "itemretriever.h"
10#include "connection.h"
11#include "storage/itemqueryhelper.h"
12#include "storage/itemretrievalmanager.h"
13#include "storage/itemretrievalrequest.h"
14#include "storage/parthelper.h"
15#include "storage/parttypehelper.h"
16#include "storage/querybuilder.h"
17#include "storage/selectquerybuilder.h"
20#include "private/protocol_p.h"
21#include "shared/akranges.h"
25#include "akonadiserver_debug.h"
28using namespace Akonadi::Server;
29using namespace AkRanges;
31Q_DECLARE_METATYPE(ItemRetrievalResult)
34 : mItemRetrievalManager(manager)
35 , mConnection(connection)
41 qRegisterMetaType<ItemRetrievalResult>(
"Akonadi::Server::ItemRetrievalResult");
43 connect(mConnection, &Connection::disconnected,
this, [
this]() {
54void ItemRetriever::setRetrieveParts(
const QList<QByteArray> &parts)
57 std::sort(mParts.begin(), mParts.end());
58 mParts.erase(std::unique(mParts.begin(), mParts.end()), mParts.end());
61 if (mFullPayload && !mParts.contains(AKONADI_PARAM_PLD_RFC822)) {
62 mParts.append(AKONADI_PARAM_PLD_RFC822);
66void ItemRetriever::setItemSet(
const QList<PimItem::Id> &set,
const Collection &collection)
69 mCollection = collection;
72void ItemRetriever::setItem(PimItem::Id
id)
77void ItemRetriever::setRetrieveFullPayload(
bool fullPayload)
79 mFullPayload = fullPayload;
81 if (fullPayload && !mParts.contains(AKONADI_PARAM_PLD_RFC822)) {
82 mParts.append(AKONADI_PARAM_PLD_RFC822);
88 mCollection = collection;
90 mRecursive = recursive;
98Scope ItemRetriever::scope()
const
103void ItemRetriever::setChangedSince(
const QDateTime &changedSince)
105 mChangedSince = changedSince;
125 QueryBuilder qb(PimItem::tableName());
127 qb.addJoin(
QueryBuilder::InnerJoin, Collection::tableName(), PimItem::collectionIdFullColumnName(), Collection::idFullColumnName());
129 qb.addJoin(
QueryBuilder::LeftJoin, Part::tableName(), PimItem::idFullColumnName(), Part::pimItemIdFullColumnName());
131 Query::Condition partTypeJoinCondition;
132 partTypeJoinCondition.
addColumnCondition(Part::partTypeIdFullColumnName(), Query::Equals, PartType::idFullColumnName());
133 if (!mFullPayload && !mParts.isEmpty()) {
136 partTypeJoinCondition.
addValueCondition(PartType::nsFullColumnName(), Query::Equals, QStringLiteral(
"PLD"));
139 qb.addColumn(PimItem::idFullColumnName());
140 qb.addColumn(PimItem::collectionIdFullColumnName());
141 qb.addColumn(Collection::resourceIdFullColumnName());
142 qb.addColumn(PartType::nameFullColumnName());
143 qb.addColumn(Part::datasizeFullColumnName());
145 if (!mItemSet.isEmpty() || mCollection.isValid()) {
153 const Resource res = Resource::retrieveByName(
QString::fromUtf8(mConnection->sessionId()));
155 qb.addValueCondition(Collection::resourceIdFullColumnName(), Query::NotEquals, res.id());
159 if (mChangedSince.isValid()) {
160 qb.addValueCondition(PimItem::datetimeFullColumnName(), Query::GreaterOrEqual, mChangedSince.toUTC());
163 qb.addSortColumn(PimItem::idFullColumnName(), Query::Ascending);
166 mLastError =
"Unable to retrieve items";
167 throw ItemRetrieverException(mLastError);
179 return std::all_of(req.parts.
begin(), req.parts.
end(), [&availableParts](
const auto &part) {
180 return availableParts.contains(part);
185bool ItemRetriever::runItemRetrievalRequests(std::list<ItemRetrievalRequest> requests)
187 QEventLoop eventLoop;
188 std::vector<ItemRetrievalRequest::Id> pendingRequests;
189 connect(&mItemRetrievalManager,
190 &ItemRetrievalManager::requestFinished,
192 [
this, &eventLoop, &pendingRequests](
const ItemRetrievalResult &result) {
193 const auto requestId = std::find(pendingRequests.begin(), pendingRequests.end(), result.request.id);
194 if (requestId != pendingRequests.end()) {
197 }
else if (result.errorMsg.has_value()) {
198 mLastError = result.errorMsg->toUtf8();
201 Q_EMIT itemsRetrieved(result.request.ids);
202 pendingRequests.erase(requestId);
203 if (pendingRequests.empty()) {
211 connect(mConnection, &Connection::connectionClosing, &eventLoop, [&eventLoop]() {
216 for (
auto &&request : requests) {
217 if ((!mFullPayload && request.parts.isEmpty()) || request.ids.isEmpty()) {
227 pendingRequests.push_back(request.id);
228 mItemRetrievalManager.requestItemDelivery(std::move(request));
229 }
catch (
const ItemRetrieverException &e) {
230 qCCritical(AKONADISERVER_LOG) << e.type() <<
": " << e.what();
231 mLastError = e.what();
236 if (!pendingRequests.empty()) {
237 if (eventLoop.
exec()) {
245std::optional<ItemRetriever::PreparedRequests> ItemRetriever::prepareRequests(QSqlQuery &query,
const QByteArrayList &parts)
247 QHash<qint64, QString> resourceIdNameCache;
248 std::list<ItemRetrievalRequest> requests;
249 QHash<qint64 ,
decltype(requests)::iterator> colRequests;
250 QHash<qint64 ,
decltype(requests)::iterator> itemRequests;
251 QList<qint64> readyItems;
252 qint64 prevPimItemId = -1;
253 QSet<QByteArray> availableParts;
254 auto lastRequest = requests.
end();
255 while (
query.isValid()) {
256 const qint64 pimItemId =
query.
value(PimItemIdColumn).toLongLong();
257 const qint64 collectionId =
query.
value(CollectionIdColumn).toLongLong();
258 const qint64 resourceId =
query.
value(ResourceIdColumn).toLongLong();
259 const auto itemIter = itemRequests.constFind(pimItemId);
261 if (Q_UNLIKELY(mCanceled)) {
265 if (pimItemId == prevPimItemId) {
266 if (
query.
value(PartTypeNameColumn).isNull()) {
274 if (lastRequest != requests.end()) {
275 if (hasAllParts(*lastRequest, availableParts)) {
279 lastRequest->ids.removeOne(prevPimItemId);
280 itemRequests.remove(prevPimItemId);
281 readyItems.push_back(prevPimItemId);
284 availableParts.
clear();
285 prevPimItemId = pimItemId;
288 if (itemIter != itemRequests.constEnd()) {
289 lastRequest = itemIter.value();
291 const auto colIt = colRequests.find(collectionId);
292 lastRequest = (colIt == colRequests.end()) ? requests.end() : colIt.value();
293 if (lastRequest == requests.end() || lastRequest->ids.size() > 100) {
294 requests.emplace_front(ItemRetrievalRequest{});
295 lastRequest = requests.begin();
296 lastRequest->ids.push_back(pimItemId);
297 auto resIter = resourceIdNameCache.
find(resourceId);
298 if (resIter == resourceIdNameCache.
end()) {
299 resIter = resourceIdNameCache.
insert(resourceId, Resource::retrieveById(resourceId).
name());
301 lastRequest->resourceId = *resIter;
302 lastRequest->parts = parts;
303 colRequests.
insert(collectionId, lastRequest);
304 itemRequests.insert(pimItemId, lastRequest);
306 lastRequest->ids.push_back(pimItemId);
307 itemRequests.insert(pimItemId, lastRequest);
308 colRequests.insert(collectionId, lastRequest);
311 Q_ASSERT(lastRequest != requests.end());
313 if (
query.
value(PartTypeNameColumn).isNull()) {
319 qint64 datasize =
query.
value(PartDatasizeColumn).toLongLong();
320 const QByteArray partName = Utils::variantToByteArray(
query.
value(PartTypeNameColumn));
321 Q_ASSERT(!partName.
startsWith(AKONADI_PARAM_PLD));
324 if (mFullPayload && !lastRequest->parts.contains(partName)) {
325 lastRequest->parts.push_back(partName);
330 availableParts.
insert(partName);
338 if (lastRequest != requests.end() && hasAllParts(*lastRequest, availableParts)) {
339 lastRequest->ids.removeOne(prevPimItemId);
340 readyItems.push_back(prevPimItemId);
344 return PreparedRequests{std::move(requests), std::move(readyItems)};
347bool ItemRetriever::exec()
349 if (mParts.isEmpty() && !mFullPayload) {
355 auto qb = buildQuery();
356 auto &
query = qb.query();
357 const auto parts = mParts | Views::filter([](
const auto &part) {
358 return part.startsWith(AKONADI_PARAM_PLD);
360 | Views::transform([](
const auto &part) {
365 auto requests = prepareRequests(query, parts);
366 if (!requests.has_value()) {
370 if (!requests->readyItems.isEmpty()) {
371 Q_EMIT itemsRetrieved(requests->readyItems);
374 if (!runItemRetrievalRequests(std::move(requests->requests))) {
380 if (mRecursive && mCollection.isValid()) {
381 const auto children = mCollection.children();
382 for (
const Collection &col :
children) {
383 ItemRetriever retriever(mItemRetrievalManager, mConnection, mContext);
384 retriever.setCollection(col, mRecursive);
385 retriever.setRetrieveParts(mParts);
386 retriever.setRetrieveFullPayload(mFullPayload);
387 connect(&retriever, &ItemRetriever::itemsRetrieved,
this, &ItemRetriever::itemsRetrieved);
388 result = retriever.exec();
398void ItemRetriever::verifyCache()
400 if (!connection() || !connection()->verifyCacheOnRetrieval()) {
404 SelectQueryBuilder<Part> qb;
406 qb.
addValueCondition(Part::storageFullColumnName(), Query::Equals, Part::External);
408 if (mScope.scope() != Scope::Invalid) {
415 mLastError = QByteArrayLiteral(
"Unable to query parts.");
416 throw ItemRetrieverException(mLastError);
419 const Part::List externalParts = qb.
result();
420 for (Part part : externalParts) {
425QByteArray ItemRetriever::lastError()
const
430#include "moc_itemretriever.cpp"
Represents a collection of PIM items.
An Connection represents one connection of a client to the server.
Manages and processes item retrieval requests.
Details of a single item retrieval request.
void setScope(const Scope &scope)
Retrieve all items matching the given item scope.
void setCollection(const Collection &collection, bool recursive=true)
Retrieve all items in the given collection.
Helper class to construct arbitrary SQL queries.
void addValueCondition(const QString &column, Query::CompareOperator op, const QVariant &value, ConditionType type=WhereCondition)
Add a WHERE or HAVING condition which compares a column with a given value.
void addJoin(JoinType joinType, const QString &table, const Query::Condition &condition)
Join a table to the query.
bool exec()
Executes the query, returns true on success.
@ InnerJoin
NOTE: only supported for UPDATE and SELECT queries.
@ LeftJoin
NOTE: only supported for SELECT queries.
void addColumnCondition(const QString &column, CompareOperator op, const QString &column2)
Add a WHERE condition which compares a column with another column.
void addValueCondition(const QString &column, CompareOperator op, const QVariant &value)
Add a WHERE condition which compares a column with a given value.
void addCondition(const Condition &condition)
Add a WHERE condition.
QList< T > result()
Returns the result of this SELECT query.
void scopeToQuery(const Scope &scope, const CommandContext &context, QueryBuilder &qb)
Add conditions to qb for the given item operation scope scope.
void itemSetToQuery(const QList< PimItem::Id > &set, QueryBuilder &qb, const Collection &collection=Collection())
Add conditions to qb for the given item set set.
bool verify(Part &part)
Verifies and if necessary fixes the external reference of this part.
Query::Condition conditionFromFqNames(const QStringList &fqNames)
Returns a query condition that matches the given part type list.
Helper integration between Akonadi and Qt.
KSERVICE_EXPORT KService::List query(FilterFunc filterFunc)
QString name(StandardAction id)
bool startsWith(QByteArrayView bv) const const
int exec(ProcessEventsFlags flags)
void exit(int returnCode)
iterator find(const Key &key)
iterator insert(const Key &key, const T &value)
iterator insert(const_iterator before, parameter_type value)
T value(qsizetype i) const const
const QObjectList & children() const const
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
iterator insert(const T &value)
QString fromUtf8(QByteArrayView str)
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)