Akonadi

core/session.cpp
1/*
2 SPDX-FileCopyrightText: 2007 Volker Krause <vkrause@kde.org>
3
4 SPDX-License-Identifier: LGPL-2.0-or-later
5*/
6
7#include "session.h"
8#include "session_p.h"
9
10#include "job.h"
11#include "job_p.h"
12#include "private/protocol_p.h"
13#include "protocolhelper_p.h"
14#include "servermanager.h"
15#include "servermanager_p.h"
16#include "sessionthread_p.h"
17
18#include "akonadicore_debug.h"
19
20#include <KLocalizedString>
21
22#include <QCoreApplication>
23#include <QPointer>
24#include <QRandomGenerator>
25#include <QThread>
26#include <QThreadStorage>
27#include <QTimer>
28
29#include <QApplication>
30#include <QHostAddress>
31
// ### FIXME pipelining got broken by switching result emission in JobPrivate::handleResponse to delayed emission
// in order to work around exec() deadlocks. As a result of that Session knows too late about a finished job and still
// sends responses for the next one to the already finished one
35#define PIPELINE_LENGTH 0
36// #define PIPELINE_LENGTH 2
37
38using namespace Akonadi;
39using namespace std::chrono_literals;
40/// @cond PRIVATE
41
42void SessionPrivate::startNext()
43{
44 QTimer::singleShot(0, mParent, [this]() {
45 doStartNext();
46 });
47}
48
49void SessionPrivate::reconnect()
50{
51 if (!connection) {
52 connection = new Connection(Connection::CommandConnection, sessionId, &mCommandBuffer);
53 sessionThread()->addConnection(connection);
54 mParent->connect(connection, &Connection::reconnected, mParent, &Session::reconnected, Qt::QueuedConnection);
55 mParent->connect(
56 connection,
57 &Connection::socketDisconnected,
58 mParent,
59 [this]() {
60 socketDisconnected();
61 },
63 mParent->connect(
64 connection,
65 &Connection::socketError,
66 mParent,
67 [this](const QString &error) {
68 socketError(error);
69 },
71 }
72
73 connection->reconnect();
74}
75
76void SessionPrivate::socketError(const QString &error)
77{
78 qCWarning(AKONADICORE_LOG) << "Socket error occurred:" << error;
79 socketDisconnected();
80}
81
82void SessionPrivate::socketDisconnected()
83{
84 if (currentJob) {
85 currentJob->d_ptr->lostConnection();
86 }
87 connected = false;
88}
89
90bool SessionPrivate::handleCommands()
91{
92 CommandBufferLocker lock(&mCommandBuffer);
93 CommandBufferNotifyBlocker notify(&mCommandBuffer);
94 while (!mCommandBuffer.isEmpty()) {
95 const auto command = mCommandBuffer.dequeue();
96 lock.unlock();
97 const auto cmd = command.command;
98 const auto tag = command.tag;
99
100 // Handle Hello response -> send Login
101 if (cmd->type() == Protocol::Command::Hello) {
102 const auto &hello = Protocol::cmdCast<Protocol::HelloResponse>(cmd);
103 if (hello.isError()) {
104 qCWarning(AKONADICORE_LOG) << "Error when establishing connection with Akonadi server:" << hello.errorMessage();
105 connection->closeConnection();
106 QTimer::singleShot(1s, connection, &Connection::reconnect);
107 return false;
108 }
109
110 qCDebug(AKONADICORE_LOG) << "Connected to" << hello.serverName() << ", using protocol version" << hello.protocolVersion();
111 qCDebug(AKONADICORE_LOG) << "Server generation:" << hello.generation();
112 qCDebug(AKONADICORE_LOG) << "Server says:" << hello.message();
113 // Version mismatch is handled in SessionPrivate::startJob() so that
114 // we can report the error out via KJob API
115 protocolVersion = hello.protocolVersion();
116 Internal::setServerProtocolVersion(protocolVersion);
117 Internal::setGeneration(hello.generation());
118
119 sendCommand(nextTag(), Protocol::LoginCommandPtr::create(sessionId));
120 } else if (cmd->type() == Protocol::Command::Login) {
121 const auto &login = Protocol::cmdCast<Protocol::LoginResponse>(cmd);
122 if (login.isError()) {
123 qCWarning(AKONADICORE_LOG) << "Unable to login to Akonadi server:" << login.errorMessage();
124 connection->closeConnection();
125 QTimer::singleShot(1s, mParent, [this]() {
126 reconnect();
127 });
128 return false;
129 }
130
131 connected = true;
132 startNext();
133 } else if (currentJob) {
134 currentJob->d_ptr->handleResponse(tag, cmd);
135 }
136
137 lock.relock();
138 }
139
140 return true;
141}
142
143bool SessionPrivate::canPipelineNext()
144{
145 if (queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH) {
146 return false;
147 }
148 if (pipeline.isEmpty() && currentJob) {
149 return currentJob->d_ptr->mWriteFinished;
150 }
151 if (!pipeline.isEmpty()) {
152 return pipeline.last()->d_ptr->mWriteFinished;
153 }
154 return false;
155}
156
157void SessionPrivate::doStartNext()
158{
159 if (!connected || (queue.isEmpty() && pipeline.isEmpty())) {
160 return;
161 }
162 if (canPipelineNext()) {
163 Akonadi::Job *nextJob = queue.dequeue();
164 pipeline.enqueue(nextJob);
165 startJob(nextJob);
166 }
167 if (jobRunning) {
168 return;
169 }
170 jobRunning = true;
171 if (!pipeline.isEmpty()) {
172 currentJob = pipeline.dequeue();
173 } else {
174 currentJob = queue.dequeue();
175 startJob(currentJob);
176 }
177}
178
179void SessionPrivate::startJob(Job *job)
180{
181 if (protocolVersion != Protocol::version()) {
183 if (protocolVersion < Protocol::version()) {
184 job->setErrorText(
185 i18n("Protocol version mismatch. Server version is older (%1) than ours (%2). "
186 "If you updated your system recently please restart the Akonadi server.",
187 protocolVersion,
188 Protocol::version()));
189 qCWarning(AKONADICORE_LOG) << "Protocol version mismatch. Server version is older (" << protocolVersion << ") than ours (" << Protocol::version()
190 << "). "
191 "If you updated your system recently please restart the Akonadi server.";
192 } else {
193 job->setErrorText(
194 i18n("Protocol version mismatch. Server version is newer (%1) than ours (%2). "
195 "If you updated your system recently please restart all KDE PIM applications.",
196 protocolVersion,
197 Protocol::version()));
198 qCWarning(AKONADICORE_LOG) << "Protocol version mismatch. Server version is newer (" << protocolVersion << ") than ours (" << Protocol::version()
199 << "). "
200 "If you updated your system recently please restart all KDE PIM applications.";
201 }
202 job->emitResult();
203 } else {
204 job->d_ptr->startQueued();
205 }
206}
207
208void SessionPrivate::endJob(Job *job)
209{
210 job->emitResult();
211}
212
213void SessionPrivate::jobDone(KJob *job)
214{
215 // ### careful, this method can be called from the QObject dtor of job (see jobDestroyed() below)
216 // so don't call any methods on job itself
217 if (job == currentJob) {
218 if (pipeline.isEmpty()) {
219 jobRunning = false;
220 currentJob = nullptr;
221 } else {
222 currentJob = pipeline.dequeue();
223 }
224 startNext();
225 } else {
226 // non-current job finished, likely canceled while still in the queue
227 queue.removeAll(static_cast<Akonadi::Job *>(job));
228 // ### likely not enough to really cancel already running jobs
229 pipeline.removeAll(static_cast<Akonadi::Job *>(job));
230 }
231}
232
233void SessionPrivate::jobWriteFinished(Akonadi::Job *job)
234{
235 Q_ASSERT((job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()));
236 Q_UNUSED(job)
237
238 startNext();
239}
240
241void SessionPrivate::jobDestroyed(QObject *job)
242{
243 // careful, accessing non-QObject methods of job will fail here already
244 jobDone(static_cast<KJob *>(job));
245}
246
247void SessionPrivate::addJob(Job *job)
248{
249 queue.append(job);
250 QObject::connect(job, &KJob::result, mParent, [this](KJob *job) {
251 jobDone(job);
252 });
253 QObject::connect(job, &Job::writeFinished, mParent, [this](Job *job) {
254 jobWriteFinished(job);
255 });
256 QObject::connect(job, &QObject::destroyed, mParent, [this](QObject *o) {
257 jobDestroyed(o);
258 });
259 startNext();
260}
261
262void SessionPrivate::publishOtherJobs(Job *thanThisJob)
263{
264 int count = 0;
265 for (const auto &job : std::as_const(queue)) {
266 if (job != thanThisJob) {
267 job->d_ptr->publishJob();
268 ++count;
269 }
270 }
271 if (count > 0) {
272 qCDebug(AKONADICORE_LOG) << "published" << count << "pending jobs to the job tracker";
273 }
274 if (currentJob && currentJob != thanThisJob) {
275 currentJob->d_ptr->signalStartedToJobTracker();
276 }
277}
278
279qint64 SessionPrivate::nextTag()
280{
281 return theNextTag++;
282}
283
284void SessionPrivate::sendCommand(qint64 tag, const Protocol::CommandPtr &command)
285{
286 connection->sendCommand(tag, command);
287}
288
289void SessionPrivate::serverStateChanged(ServerManager::State state)
290{
291 if (state == ServerManager::Running && !connected) {
292 reconnect();
293 } else if (!connected && state == ServerManager::Broken) {
294 // If the server is broken, cancel all pending jobs, otherwise they will be
295 // blocked forever and applications waiting for them to finish would be stuck
296 auto q = queue;
297 for (Job *job : q) {
299 job->kill(KJob::EmitResult);
300 }
301 } else if (state == ServerManager::Stopping) {
302 sessionThread()->destroyConnection(connection);
303 connection = nullptr;
304 }
305}
306
307void SessionPrivate::itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
308{
309 // only deal with the queue, for the guys in the pipeline it's too late already anyway
310 // and they shouldn't have gotten there if they depend on a preceding job anyway.
311 for (Job *job : std::as_const(queue)) {
312 job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision);
313 }
314}
315
316/// @endcond
317
318SessionPrivate::SessionPrivate(Session *parent)
319 : mParent(parent)
320 , mSessionThread(new SessionThread)
321 , connection(nullptr)
322 , protocolVersion(0)
323 , mCommandBuffer(parent, "handleCommands")
324 , currentJob(nullptr)
325{
326 // Shutdown the thread before QApplication event loop quits - the
327 // thread()->wait() mechanism in Connection dtor crashes sometimes
328 // when called from QApplication destructor
329 connThreadCleanUp = QObject::connect(qApp, &QCoreApplication::aboutToQuit, qApp, [this]() {
330 socketDisconnected();
331 connection = nullptr;
332
333 delete mSessionThread;
334 mSessionThread = nullptr;
335 });
336}
337
338SessionPrivate::~SessionPrivate()
339{
340 QObject::disconnect(connThreadCleanUp);
341 delete mSessionThread;
342}
343
344void SessionPrivate::init(const QByteArray &id)
345{
346 if (!id.isEmpty()) {
347 sessionId = id;
348 } else {
349 sessionId = QCoreApplication::instance()->applicationName().toUtf8() + '-' + QByteArray::number(QRandomGenerator::global()->generate());
350 }
351
352 qCDebug(AKONADICORE_LOG) << "Initializing session with ID" << id;
353
354 connected = false;
355 theNextTag = 2;
356 jobRunning = false;
357
360 }
362 serverStateChanged(state);
363 });
364 reconnect();
365}
366
367void SessionPrivate::forceReconnect()
368{
369 jobRunning = false;
370 connected = false;
371 if (connection) {
372 connection->forceReconnect();
373 }
375 mParent,
376 [this]() {
377 reconnect();
378 },
380}
381
383 : QObject(parent)
384 , d(new SessionPrivate(this))
385{
386 d->init(sessionId);
387}
388
389Session::Session(SessionPrivate *dd, const QByteArray &sessionId, QObject *parent)
390 : QObject(parent)
391 , d(dd)
392{
393 d->mParent = this;
394 d->init(sessionId);
395}
396
398{
399 d->clear(false);
400}
401
403{
404 return d->sessionId;
405}
406
407Q_GLOBAL_STATIC(QThreadStorage<QPointer<Session>>, instances) // NOLINT(readability-redundant-member-init)
408
409void SessionPrivate::createDefaultSession(const QByteArray &sessionId)
410{
411 Q_ASSERT_X(!sessionId.isEmpty(), "SessionPrivate::createDefaultSession", "You tried to create a default session with empty session id!");
412 Q_ASSERT_X(!instances()->hasLocalData(), "SessionPrivate::createDefaultSession", "You tried to create a default session twice!");
413
414 auto session = new Session(sessionId);
415 setDefaultSession(session);
416}
417
418void SessionPrivate::setDefaultSession(Session *session)
419{
420 instances()->setLocalData({session});
422 instances()->setLocalData({});
423 });
424}
425
427{
428 if (!instances()->hasLocalData()) {
429 auto session = new Session();
430 SessionPrivate::setDefaultSession(session);
431 }
432 return instances()->localData().data();
433}
434
436{
437 d->clear(true);
438}
439
440void SessionPrivate::clear(bool forceReconnect)
441{
442 auto q = queue;
443 for (Job *job : q) {
444 job->kill(KJob::EmitResult); // safe, not started yet
445 }
446 queue.clear();
447 auto p = pipeline;
448 for (Job *job : p) {
449 job->d_ptr->mStarted = false; // avoid killing/reconnect loops
450 job->kill(KJob::EmitResult);
451 }
452 pipeline.clear();
453 if (currentJob) {
454 currentJob->d_ptr->mStarted = false; // avoid killing/reconnect loops
455 currentJob->kill(KJob::EmitResult);
456 }
457
458 if (forceReconnect) {
459 this->forceReconnect();
460 }
461}
462
463#include "moc_session.cpp"
qint64 Id
Describes the unique id type.
Definition item.h:105
Base class for all actions in the Akonadi storage.
Definition job.h:81
@ ProtocolVersionMismatch
The server protocol version is too old or too new.
Definition job.h:100
@ ConnectionFailed
The connection to the Akonadi server failed.
Definition job.h:99
void writeFinished(Akonadi::Job *job)
This signal is emitted if the job has finished all write operations, ie.
static State state()
Returns the state of the server.
static bool start()
Starts the server.
State
Enum for the various states the server can be in.
@ Running
Server is running and operational.
@ Broken
Server is not operational and an error has been detected.
@ NotRunning
Server is not running, could be no one started it yet or it failed to start.
@ Stopping
Server is shutting down.
void stateChanged(Akonadi::ServerManager::State state)
Emitted whenever the server state changes.
static ServerManager * self()
Returns the singleton instance of this class, for connecting to its signals.
A communication session with the Akonadi storage.
~Session() override
Destroys the session.
void clear()
Stops all jobs queued for execution.
static Session * defaultSession()
Returns the default session for this thread.
Session(const QByteArray &sessionId=QByteArray(), QObject *parent=nullptr)
Creates a new session.
QByteArray sessionId() const
Returns the session identifier.
void reconnected()
This signal is emitted whenever the session has been reconnected to the server (e....
void setErrorText(const QString &errorText)
void emitResult()
void result(KJob *job)
void setError(int errorCode)
bool kill(KJob::KillVerbosity verbosity=KJob::Quietly)
ASAP CLI session.
QString i18n(const char *text, const TYPE &arg...)
Helper integration between Akonadi and Qt.
void error(QWidget *parent, const QString &text, const QString &title, const KGuiItem &buttonOk, Options options=Notify)
KLEO_EXPORT std::unique_ptr< GpgME::DefaultAssuanTransaction > sendCommand(std::shared_ptr< GpgME::Context > &assuanContext, const std::string &command, GpgME::Error &err)
bool isEmpty() const const
QByteArray number(double n, char format, int precision)
QCoreApplication * instance()
bool invokeMethod(QObject *context, Functor &&function, FunctorReturnType *ret)
QObject(QObject *parent)
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
void destroyed(QObject *obj)
bool disconnect(const QMetaObject::Connection &connection)
QObject * parent() const const
QRandomGenerator * global()
QueuedConnection
This file is part of the KDE documentation.
Documentation copyright © 1996-2025 The KDE developers.
Generated on Fri Jan 24 2025 11:49:57 by doxygen 1.13.2 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.