12#include "private/protocol_p.h"
13#include "protocolhelper_p.h"
14#include "servermanager.h"
15#include "servermanager_p.h"
16#include "sessionthread_p.h"
18#include "akonadicore_debug.h"
20#include <KLocalizedString>
22#include <QCoreApplication>
24#include <QRandomGenerator>
26#include <QThreadStorage>
29#include <QApplication>
30#include <QHostAddress>
35#define PIPELINE_LENGTH 0
39using namespace std::chrono_literals;
42void SessionPrivate::startNext()
49void SessionPrivate::reconnect()
52 connection =
new Connection(Connection::CommandConnection, sessionId, &mCommandBuffer);
53 sessionThread()->addConnection(connection);
57 &Connection::socketDisconnected,
65 &Connection::socketError,
73 connection->reconnect();
76void SessionPrivate::socketError(
const QString &error)
78 qCWarning(AKONADICORE_LOG) <<
"Socket error occurred:" <<
error;
82void SessionPrivate::socketDisconnected()
85 currentJob->d_ptr->lostConnection();
90bool SessionPrivate::handleCommands()
92 CommandBufferLocker lock(&mCommandBuffer);
93 CommandBufferNotifyBlocker notify(&mCommandBuffer);
94 while (!mCommandBuffer.isEmpty()) {
95 const auto command = mCommandBuffer.dequeue();
97 const auto cmd = command.command;
98 const auto tag = command.tag;
101 if (cmd->type() == Protocol::Command::Hello) {
102 const auto &hello = Protocol::cmdCast<Protocol::HelloResponse>(cmd);
103 if (hello.isError()) {
104 qCWarning(AKONADICORE_LOG) <<
"Error when establishing connection with Akonadi server:" << hello.errorMessage();
105 connection->closeConnection();
110 qCDebug(AKONADICORE_LOG) <<
"Connected to" << hello.serverName() <<
", using protocol version" << hello.protocolVersion();
111 qCDebug(AKONADICORE_LOG) <<
"Server generation:" << hello.generation();
112 qCDebug(AKONADICORE_LOG) <<
"Server says:" << hello.message();
115 protocolVersion = hello.protocolVersion();
116 Internal::setServerProtocolVersion(protocolVersion);
117 Internal::setGeneration(hello.generation());
119 sendCommand(nextTag(), Protocol::LoginCommandPtr::create(sessionId));
120 }
else if (cmd->type() == Protocol::Command::Login) {
121 const auto &login = Protocol::cmdCast<Protocol::LoginResponse>(cmd);
122 if (login.isError()) {
123 qCWarning(AKONADICORE_LOG) <<
"Unable to login to Akonadi server:" << login.errorMessage();
124 connection->closeConnection();
133 }
else if (currentJob) {
134 currentJob->d_ptr->handleResponse(tag, cmd);
143bool SessionPrivate::canPipelineNext()
145 if (queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH) {
148 if (pipeline.isEmpty() && currentJob) {
149 return currentJob->d_ptr->mWriteFinished;
151 if (!pipeline.isEmpty()) {
152 return pipeline.last()->d_ptr->mWriteFinished;
157void SessionPrivate::doStartNext()
159 if (!connected || (queue.isEmpty() && pipeline.isEmpty())) {
162 if (canPipelineNext()) {
164 pipeline.enqueue(nextJob);
171 if (!pipeline.isEmpty()) {
172 currentJob = pipeline.dequeue();
174 currentJob = queue.dequeue();
175 startJob(currentJob);
179void SessionPrivate::startJob(
Job *job)
181 if (protocolVersion != Protocol::version()) {
183 if (protocolVersion < Protocol::version()) {
185 i18n(
"Protocol version mismatch. Server version is older (%1) than ours (%2). "
186 "If you updated your system recently please restart the Akonadi server.",
188 Protocol::version()));
189 qCWarning(AKONADICORE_LOG) <<
"Protocol version mismatch. Server version is older (" << protocolVersion <<
") than ours (" << Protocol::version()
191 "If you updated your system recently please restart the Akonadi server.";
194 i18n(
"Protocol version mismatch. Server version is newer (%1) than ours (%2). "
195 "If you updated your system recently please restart all KDE PIM applications.",
197 Protocol::version()));
198 qCWarning(AKONADICORE_LOG) <<
"Protocol version mismatch. Server version is newer (" << protocolVersion <<
") than ours (" << Protocol::version()
200 "If you updated your system recently please restart all KDE PIM applications.";
204 job->d_ptr->startQueued();
208void SessionPrivate::endJob(
Job *job)
213void SessionPrivate::jobDone(
KJob *job)
217 if (job == currentJob) {
218 if (pipeline.isEmpty()) {
220 currentJob =
nullptr;
222 currentJob = pipeline.dequeue();
// Sanity check: the job is either the current job with nothing pipelined,
// or the job at the back of the pipeline.
// The second operand must compare (==), not assign: an assignment is truthy
// for any non-null pointer, so that alternative could never fail, and it
// overwrote `job` only in builds where Q_ASSERT is active.
// The isEmpty() guard keeps last() from being called on an empty pipeline.
235 Q_ASSERT((job == currentJob && pipeline.isEmpty()) || (!pipeline.isEmpty() && job == pipeline.last()));
241void SessionPrivate::jobDestroyed(
QObject *job)
244 jobDone(
static_cast<KJob *
>(job));
247void SessionPrivate::addJob(
Job *job)
254 jobWriteFinished(job);
262void SessionPrivate::publishOtherJobs(
Job *thanThisJob)
265 for (
const auto &job : std::as_const(queue)) {
266 if (job != thanThisJob) {
267 job->d_ptr->publishJob();
272 qCDebug(AKONADICORE_LOG) <<
"published" << count <<
"pending jobs to the job tracker";
274 if (currentJob && currentJob != thanThisJob) {
275 currentJob->d_ptr->signalStartedToJobTracker();
279qint64 SessionPrivate::nextTag()
284void SessionPrivate::sendCommand(qint64 tag,
const Protocol::CommandPtr &command)
286 connection->sendCommand(tag, command);
299 job->
kill(KJob::EmitResult);
302 sessionThread()->destroyConnection(connection);
303 connection =
nullptr;
307void SessionPrivate::itemRevisionChanged(
Akonadi::Item::Id itemId,
int oldRevision,
int newRevision)
311 for (
Job *job : std::as_const(queue)) {
312 job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision);
318SessionPrivate::SessionPrivate(
Session *parent)
320 , mSessionThread(new SessionThread)
321 , connection(nullptr)
323 , mCommandBuffer(parent,
"handleCommands")
324 , currentJob(nullptr)
330 socketDisconnected();
331 connection =
nullptr;
333 delete mSessionThread;
334 mSessionThread =
nullptr;
338SessionPrivate::~SessionPrivate()
341 delete mSessionThread;
344void SessionPrivate::init(
const QByteArray &
id)
352 qCDebug(AKONADICORE_LOG) <<
"Initializing session with ID" << id;
362 serverStateChanged(state);
367void SessionPrivate::forceReconnect()
372 connection->forceReconnect();
384 , d(new SessionPrivate(this))
409void SessionPrivate::createDefaultSession(
const QByteArray &sessionId)
411 Q_ASSERT_X(!sessionId.
isEmpty(),
"SessionPrivate::createDefaultSession",
"You tried to create a default session with empty session id!");
412 Q_ASSERT_X(!instances()->hasLocalData(),
"SessionPrivate::createDefaultSession",
"You tried to create a default session twice!");
414 auto session =
new Session(sessionId);
415 setDefaultSession(session);
418void SessionPrivate::setDefaultSession(
Session *session)
420 instances()->setLocalData({session});
422 instances()->setLocalData({});
428 if (!instances()->hasLocalData()) {
430 SessionPrivate::setDefaultSession(session);
432 return instances()->localData().data();
440void SessionPrivate::clear(
bool forceReconnect)
444 job->
kill(KJob::EmitResult);
449 job->d_ptr->mStarted =
false;
450 job->
kill(KJob::EmitResult);
454 currentJob->d_ptr->mStarted =
false;
455 currentJob->kill(KJob::EmitResult);
458 if (forceReconnect) {
459 this->forceReconnect();
463#include "moc_session.cpp"
qint64 Id
Describes the unique id type.
Base class for all actions in the Akonadi storage.
@ ProtocolVersionMismatch
The server protocol version is too old or too new.
@ ConnectionFailed
The connection to the Akonadi server failed.
void writeFinished(Akonadi::Job *job)
This signal is emitted when the job has finished all of its write operations.
static State state()
Returns the state of the server.
static bool start()
Starts the server.
State
Enum for the various states the server can be in.
@ Running
Server is running and operational.
@ Broken
Server is not operational and an error has been detected.
@ NotRunning
Server is not running; either no one has started it yet or it failed to start.
@ Stopping
Server is shutting down.
void stateChanged(Akonadi::ServerManager::State state)
Emitted whenever the server state changes.
static ServerManager * self()
Returns the singleton instance of this class, for connecting to its signals.
A communication session with the Akonadi storage.
~Session() override
Destroys the session.
void clear()
Stops all jobs queued for execution.
static Session * defaultSession()
Returns the default session for this thread.
Session(const QByteArray &sessionId=QByteArray(), QObject *parent=nullptr)
Creates a new session.
QByteArray sessionId() const
Returns the session identifier.
void reconnected()
This signal is emitted whenever the session has been reconnected to the server.
void setErrorText(const QString &errorText)
void setError(int errorCode)
bool kill(KJob::KillVerbosity verbosity=KJob::Quietly)
QString i18n(const char *text, const TYPE &arg...)
Helper integration between Akonadi and Qt.
void error(QWidget *parent, const QString &text, const QString &title, const KGuiItem &buttonOk, Options options=Notify)
KLEO_EXPORT std::unique_ptr< GpgME::DefaultAssuanTransaction > sendCommand(std::shared_ptr< GpgME::Context > &assuanContext, const std::string &command, GpgME::Error &err)
bool isEmpty() const
QByteArray number(double n, char format, int precision)
QCoreApplication * instance()
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
void destroyed(QObject *obj)
bool disconnect(const QMetaObject::Connection &connection)
QObject * parent() const
QRandomGenerator * global()