Akonadi

notificationmanager.cpp
1/*
2 SPDX-FileCopyrightText: 2006-2007 Volker Krause <vkrause@kde.org>
3 SPDX-FileCopyrightText: 2010 Michael Jansen <kde@michael-jansen>
4
5 SPDX-License-Identifier: LGPL-2.0-or-later
6*/
7
8#include "notificationmanager.h"
9#include "aggregatedfetchscope.h"
10#include "akonadiserver_debug.h"
11#include "handlerhelper.h"
12#include "notificationsubscriber.h"
13#include "storage/collectionstatistics.h"
14#include "storage/notificationcollector.h"
15#include "tracer.h"
16
17#include "private/scope_p.h"
18#include "private/standarddirs_p.h"
19#include "shared/akranges.h"
20
21#include <QDateTime>
22#include <QSettings>
23#include <QThreadPool>
24#include <QTimer>
25
26using namespace Akonadi;
27using namespace Akonadi::Server;
28using namespace AkRanges;
29
30NotificationManager::NotificationManager(StartMode startMode)
31 : AkThread(QStringLiteral("NotificationManager"), startMode)
32 , mTimer(nullptr)
33 , mNotifyThreadPool(nullptr)
34 , mDebugNotifications(0)
35{
36}
37
38NotificationManager::~NotificationManager()
39{
40 quitThread();
41}
42
43void NotificationManager::init()
44{
45 AkThread::init();
46
47 const QString serverConfigFile = StandardDirs::serverConfigFile(StandardDirs::ReadWrite);
48 QSettings settings(serverConfigFile, QSettings::IniFormat);
49
50 mTimer = new QTimer(this);
51 mTimer->setInterval(settings.value(QStringLiteral("NotificationManager/Interval"), 50).toInt());
52 mTimer->setSingleShot(true);
53 connect(mTimer, &QTimer::timeout, this, &NotificationManager::emitPendingNotifications);
54
55 mNotifyThreadPool = new QThreadPool(this);
56 mNotifyThreadPool->setMaxThreadCount(5);
57
58 mCollectionFetchScope = new AggregatedCollectionFetchScope();
59 mItemFetchScope = new AggregatedItemFetchScope();
60 mTagFetchScope = new AggregatedTagFetchScope();
61}
62
63void NotificationManager::quit()
64{
65 mQuitting = true;
66
67 mTimer->stop();
68 delete mTimer;
69
70 mNotifyThreadPool->clear();
71 mNotifyThreadPool->waitForDone();
72 delete mNotifyThreadPool;
73
74 qDeleteAll(mSubscribers);
75
76 delete mCollectionFetchScope;
77 delete mItemFetchScope;
78 delete mTagFetchScope;
79
80 AkThread::quit();
81}
82
83void NotificationManager::registerConnection(quintptr socketDescriptor)
84{
85 Q_ASSERT(thread() == QThread::currentThread());
86
87 auto subscriber = new NotificationSubscriber(this, socketDescriptor);
88 qCInfo(AKONADISERVER_LOG) << "New notification connection (registered as" << subscriber << ")";
89 connect(subscriber, &NotificationSubscriber::notificationDebuggingChanged, this, [this](bool enabled) {
90 if (enabled) {
91 ++mDebugNotifications;
92 } else {
93 --mDebugNotifications;
94 }
95 Q_ASSERT(mDebugNotifications >= 0);
96 Q_ASSERT(mDebugNotifications <= mSubscribers.count());
97 });
98
99 mSubscribers.push_back(subscriber);
100}
101void NotificationManager::forgetSubscriber(NotificationSubscriber *subscriber)
102{
103 Q_ASSERT(QThread::currentThread() == thread());
104 mSubscribers.removeAll(subscriber);
105}
106
107void NotificationManager::slotNotify(const Protocol::ChangeNotificationList &msgs)
108{
109 Q_ASSERT(QThread::currentThread() == thread());
110 for (const auto &msg : msgs) {
111 switch (msg->type()) {
112 case Protocol::Command::CollectionChangeNotification:
113 Protocol::CollectionChangeNotification::appendAndCompress(mNotifications, msg);
114 continue;
115 case Protocol::Command::ItemChangeNotification:
116 case Protocol::Command::TagChangeNotification:
117 case Protocol::Command::SubscriptionChangeNotification:
118 case Protocol::Command::DebugChangeNotification:
119 mNotifications.push_back(msg);
120 continue;
121
122 default:
123 Q_ASSERT_X(false, "slotNotify", "Invalid notification type!");
124 continue;
125 }
126 }
127
128 if (!mTimer->isActive()) {
129 mTimer->start();
130 }
131}
132
133class NotifyRunnable : public QRunnable
134{
135public:
136 explicit NotifyRunnable(NotificationSubscriber *subscriber, const Protocol::ChangeNotificationList &notifications)
137 : mSubscriber(subscriber)
138 , mNotifications(notifications)
139 {
140 }
141
142 ~NotifyRunnable() override = default;
143
144 void run() override
145 {
146 for (const auto &ntf : std::as_const(mNotifications)) {
147 if (mSubscriber) {
148 mSubscriber->notify(ntf);
149 } else {
150 break;
151 }
152 }
153 }
154
155private:
156 Q_DISABLE_COPY_MOVE(NotifyRunnable)
157
158 QPointer<NotificationSubscriber> mSubscriber;
159 Protocol::ChangeNotificationList mNotifications;
160};
161
162void NotificationManager::emitPendingNotifications()
163{
164 Q_ASSERT(QThread::currentThread() == thread());
165
166 if (mNotifications.isEmpty()) {
167 return;
168 }
169
170 if (mDebugNotifications == 0) {
171 mSubscribers | Views::filter(IsNotNull) | Actions::forEach([this](const auto &subscriber) {
172 mNotifyThreadPool->start(new NotifyRunnable(subscriber, mNotifications));
173 });
174 } else {
175 // When debugging notification we have to use a non-threaded approach
176 // so that we can work with return value of notify()
177 for (const auto &notification : std::as_const(mNotifications)) {
178 QList<QByteArray> listeners;
179 for (NotificationSubscriber *subscriber : std::as_const(mSubscribers)) {
180 if (subscriber && subscriber->notify(notification)) {
181 listeners.push_back(subscriber->subscriber());
182 }
183 }
184
185 emitDebugNotification(notification, listeners);
186 }
187 }
188
189 mNotifications.clear();
190}
191
192void NotificationManager::emitDebugNotification(const Protocol::ChangeNotificationPtr &ntf, const QList<QByteArray> &listeners)
193{
194 auto debugNtf = Protocol::DebugChangeNotificationPtr::create();
195 debugNtf->setNotification(ntf);
196 debugNtf->setListeners(listeners);
197 debugNtf->setTimestamp(QDateTime::currentMSecsSinceEpoch());
198 mSubscribers | Views::filter(IsNotNull) | Actions::forEach([this, &debugNtf](const auto &subscriber) {
199 mNotifyThreadPool->start(new NotifyRunnable(subscriber, {debugNtf}));
200 });
201}
202
203#include "moc_notificationmanager.cpp"
Helper integration between Akonadi and Qt.
qint64 currentMSecsSinceEpoch()
void push_back(parameter_type value)
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
QThread * thread() const
QFuture< T > run(Function function,...)
QThread * currentThread()
void timeout()
This file is part of the KDE documentation.
Documentation copyright © 1996-2025 The KDE developers.
Generated on Fri Jan 24 2025 11:49:57 by doxygen 1.13.2 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.