7#include "searchtaskmanager.h"
8#include "agentsearchinstance.h"
9#include "akonadiserver_search_debug.h"
10#include "connection.h"
12#include "storage/selectquerybuilder.h"
14#include "private/dbus_p.h"
16#include <QDBusConnection>
17#include <QDeadlineTimer>
22using namespace Akonadi::Server;
24SearchTaskManager::SearchTaskManager()
25 : AkThread(QStringLiteral(
"SearchTaskManager"))
31SearchTaskManager::~SearchTaskManager()
33 QMutexLocker locker(&mLock);
40 mInstancesLock.lock();
41 qDeleteAll(mInstances);
42 mInstancesLock.unlock();
45void SearchTaskManager::registerInstance(
const QString &
id)
47 QMutexLocker locker(&mInstancesLock);
49 qCDebug(AKONADISERVER_SEARCH_LOG) <<
"SearchManager::registerInstance(" <<
id <<
")";
51 AgentSearchInstance *instance = mInstances.value(
id);
56 instance =
new AgentSearchInstance(
id, *
this);
57 if (!instance->init()) {
58 qCDebug(AKONADISERVER_SEARCH_LOG) <<
"Failed to initialize Search agent";
63 qCDebug(AKONADISERVER_SEARCH_LOG) <<
"Registering search instance " << id;
64 mInstances.
insert(
id, instance);
67void SearchTaskManager::unregisterInstance(
const QString &
id)
69 QMutexLocker locker(&mInstancesLock);
72 if (it != mInstances.end()) {
73 qCDebug(AKONADISERVER_SEARCH_LOG) <<
"Unregistering search instance" << id;
74 it.
value()->deleteLater();
79void SearchTaskManager::addTask(SearchTask *task)
81 QueryBuilder qb(Collection::tableName());
82 qb.addJoin(
QueryBuilder::InnerJoin, Resource::tableName(), Collection::resourceIdFullColumnName(), Resource::idFullColumnName());
83 qb.addColumn(Collection::idFullColumnName());
84 qb.addColumn(Resource::nameFullColumnName());
86 Q_ASSERT(!
task->collections.isEmpty());
89 for (qint64 collection : std::as_const(
task->collections)) {
92 qb.addValueCondition(Collection::idFullColumnName(), Query::In, list);
95 throw SearchException(qb.query().lastError().text());
98 auto &
query = qb.query();
103 mInstancesLock.lock();
105 org::freedesktop::Akonadi::AgentManager agentManager(DBus::serviceName(DBus::Control), QStringLiteral(
"/AgentManager"),
QDBusConnection::sessionBus());
107 const QString resourceId =
query.
value(1).toString();
108 if (!mInstances.contains(resourceId)) {
109 qCDebug(AKONADISERVER_SEARCH_LOG) <<
"Resource" << resourceId <<
"does not implement Search interface, skipping";
110 }
else if (!agentManager.agentInstanceOnline(resourceId)) {
111 qCDebug(AKONADISERVER_SEARCH_LOG) <<
"Agent" << resourceId <<
"is offline, skipping";
112 }
else if (agentManager.agentInstanceStatus(resourceId) > 2) {
113 qCDebug(AKONADISERVER_SEARCH_LOG) <<
"Agent" << resourceId <<
"is broken or not configured";
115 const qint64 collectionId =
query.
value(0).toLongLong();
116 qCDebug(AKONADISERVER_SEARCH_LOG) <<
"Enqueued search query (" << resourceId <<
", " << collectionId <<
")";
117 task->queries << qMakePair(resourceId, collectionId);
119 }
while (
query.next());
120 mInstancesLock.unlock();
122 QMutexLocker locker(&mLock);
123 mTasklist.append(task);
127void SearchTaskManager::pushResults(
const QByteArray &searchId,
const QSet<qint64> &ids,
Connection *connection)
131 const auto resourceName = connection->context().resource().
name();
132 qCDebug(AKONADISERVER_SEARCH_LOG) << ids.
count() <<
"results for search" << searchId <<
"pushed from" << resourceName;
134 QMutexLocker locker(&mLock);
135 ResourceTask *
task = mRunningTasks.take(resourceName);
137 qCDebug(AKONADISERVER_SEARCH_LOG) <<
"No running task for" << resourceName <<
" - maybe it has timed out?";
141 if (
task->parentTask->id != searchId) {
142 qCDebug(AKONADISERVER_SEARCH_LOG) <<
"Received results for different search - maybe the original task has timed out?";
143 qCDebug(AKONADISERVER_SEARCH_LOG) <<
"Search is" << searchId <<
", but task is" <<
task->parentTask->id;
148 mPendingResults.append(task);
153bool SearchTaskManager::allResourceTasksCompleted(SearchTask *agentSearchTask)
const
156 if (!agentSearchTask->queries.
isEmpty()) {
161 QMap<QString, ResourceTask *>::const_iterator it = mRunningTasks.
begin();
162 QMap<QString, ResourceTask *>::const_iterator
end = mRunningTasks.end();
163 for (; it !=
end; ++it) {
164 if (it.value()->parentTask == agentSearchTask) {
174 ResourceTask *
task = iter.value();
175 SearchTask *parentTask =
task->parentTask;
176 QMutexLocker locker(&parentTask->sharedLock);
180 parentTask->complete = allResourceTasksCompleted(parentTask);
181 parentTask->notifier.
wakeAll();
187void SearchTaskManager::searchLoop()
189 qint64 timeout = ULONG_MAX;
191 QMutexLocker locker(&mLock);
194 qCDebug(AKONADISERVER_SEARCH_LOG) <<
"Search loop is waiting, will wake again in" << timeout <<
"ms";
197 for (SearchTask *task : std::as_const(mTasklist)) {
198 QMutexLocker locker(&
task->sharedLock);
199 task->queries.clear();
200 task->notifier.wakeAll();
204 for (; it != mRunningTasks.
end();) {
205 if (mTasklist.contains(it.
value()->parentTask)) {
207 it = mRunningTasks.
erase(it);
210 it = cancelRunningTask(it);
217 while (!mPendingResults.isEmpty()) {
218 ResourceTask *finishedTask = mPendingResults.first();
219 mPendingResults.remove(0);
220 qCDebug(AKONADISERVER_SEARCH_LOG) <<
"Pending results from" << finishedTask->resourceId <<
"for collection" << finishedTask->collectionId
221 <<
"for search" << finishedTask->parentTask->id <<
"available!";
222 SearchTask *parentTask = finishedTask->parentTask;
223 QMutexLocker locker(&parentTask->sharedLock);
225 parentTask->pendingResults += finishedTask->results;
226 parentTask->complete = allResourceTasksCompleted(parentTask);
227 parentTask->notifier.
wakeAll();
234 for (; it != mRunningTasks.
end();) {
236 if (now -
task->timestamp > 60 * 1000) {
238 qCDebug(AKONADISERVER_SEARCH_LOG) <<
"Resource task" <<
task->resourceId <<
"for search" <<
task->parentTask->id <<
"timed out!";
239 it = cancelRunningTask(it);
245 if (!mTasklist.isEmpty()) {
246 SearchTask *
task = mTasklist.first();
247 qCDebug(AKONADISERVER_SEARCH_LOG) <<
"Search task" <<
task->id <<
"available!";
248 if (
task->queries.isEmpty()) {
249 qCDebug(AKONADISERVER_SEARCH_LOG) <<
"nothing to do for task";
250 QMutexLocker locker(&
task->sharedLock);
252 task->complete =
true;
253 task->notifier.wakeAll();
258 for (
auto it =
task->queries.begin(); it !=
task->queries.end();) {
259 if (!mRunningTasks.contains(it->
first)) {
260 const auto &[resource, colId] = *it;
261 qCDebug(AKONADISERVER_SEARCH_LOG) <<
"\t Sending query for collection" << colId <<
"to resource" << resource;
262 auto rTask =
new ResourceTask;
263 rTask->resourceId = resource;
264 rTask->collectionId = colId;
265 rTask->parentTask =
task;
267 mRunningTasks.insert(resource, rTask);
269 mInstancesLock.lock();
270 AgentSearchInstance *instance = mInstances.value(resource);
272 mInstancesLock.unlock();
277 instance->search(
task->id,
task->query, colId);
278 mInstancesLock.unlock();
280 task->sharedLock.lock();
281 it =
task->queries.erase(it);
282 task->sharedLock.unlock();
288 if (
task->queries.isEmpty()) {
289 qCDebug(AKONADISERVER_SEARCH_LOG) <<
"All queries from task" <<
task->id <<
"dispatched!";
295 if (mRunningTasks.isEmpty()) {
302#include "moc_searchtaskmanager.cpp"
An Connection represents one connection of a client to the server.
@ InnerJoin
NOTE: only supported for UPDATE and SELECT queries.
Helper integration between Akonadi and Qt.
KSERVICE_EXPORT KService::List query(FilterFunc filterFunc)
KIOCORE_EXPORT QStringList list(const QString &fileClass)
const QList< QKeySequence > & end()
qint64 currentMSecsSinceEpoch()
QDBusConnection sessionBus()
bool isEmpty() const const
void reserve(qsizetype size)
T value(qsizetype i) const const
iterator erase(const_iterator first, const_iterator last)
T value(const Key &key, const T &defaultValue) const const
qsizetype count() const const
QString & insert(qsizetype position, QChar ch)
QTaskBuilder< Task > task(Task &&task)