Akonadi

notificationmanager.cpp
1/*
2 SPDX-FileCopyrightText: 2006-2007 Volker Krause <vkrause@kde.org>
3 SPDX-FileCopyrightText: 2010 Michael Jansen <kde@michael-jansen>
4
5 SPDX-License-Identifier: LGPL-2.0-or-later
6*/
7
#include "notificationmanager.h"
#include "aggregatedfetchscope.h"
#include "akonadiserver_debug.h"
#include "handlerhelper.h"
#include "notificationsubscriber.h"
#include "storage/collectionstatistics.h"
#include "storage/notificationcollector.h"
#include "tracer.h"

#include "private/scope_p.h"
#include "private/standarddirs_p.h"
#include "shared/akranges.h"

#include <QDateTime>
#include <QPointer>
#include <QSettings>
#include <QThreadPool>
#include <QTimer>
25
26using namespace Akonadi;
27using namespace Akonadi::Server;
28using namespace AkRanges;
29
30NotificationManager::NotificationManager(StartMode startMode)
31 : AkThread(QStringLiteral("NotificationManager"), startMode)
32 , mTimer(nullptr)
33 , mNotifyThreadPool(nullptr)
34 , mDebugNotifications(0)
35{
36}
37
38NotificationManager::~NotificationManager()
39{
40 quitThread();
41}
42
43void NotificationManager::init()
44{
45 AkThread::init();
46
47 const QString serverConfigFile = StandardDirs::serverConfigFile(StandardDirs::ReadWrite);
48 QSettings settings(serverConfigFile, QSettings::IniFormat);
49
50 mTimer = new QTimer(this);
51 mTimer->setInterval(settings.value(QStringLiteral("NotificationManager/Interval"), 50).toInt());
52 mTimer->setSingleShot(true);
53 connect(mTimer, &QTimer::timeout, this, &NotificationManager::emitPendingNotifications);
54
55 mNotifyThreadPool = new QThreadPool(this);
56 mNotifyThreadPool->setMaxThreadCount(5);
57
58 mCollectionFetchScope = new AggregatedCollectionFetchScope();
59 mItemFetchScope = new AggregatedItemFetchScope();
60 mTagFetchScope = new AggregatedTagFetchScope();
61}
62
63void NotificationManager::quit()
64{
65 mQuitting = true;
66
67 mTimer->stop();
68 delete mTimer;
69
70 mNotifyThreadPool->clear();
71 mNotifyThreadPool->waitForDone();
72 delete mNotifyThreadPool;
73
74 qDeleteAll(mSubscribers);
75
76 delete mCollectionFetchScope;
77 delete mItemFetchScope;
78 delete mTagFetchScope;
79
80 AkThread::quit();
81}
82
83void NotificationManager::registerConnection(quintptr socketDescriptor)
84{
85 Q_ASSERT(thread() == QThread::currentThread());
86
87 auto subscriber = new NotificationSubscriber(this, socketDescriptor);
88 qCInfo(AKONADISERVER_LOG) << "New notification connection (registered as" << subscriber << ")";
89 connect(subscriber, &NotificationSubscriber::notificationDebuggingChanged, this, [this](bool enabled) {
90 if (enabled) {
91 ++mDebugNotifications;
92 } else {
93 --mDebugNotifications;
94 }
95 Q_ASSERT(mDebugNotifications >= 0);
96 Q_ASSERT(mDebugNotifications <= mSubscribers.count());
97 });
98
99 mSubscribers.push_back(subscriber);
100}
101void NotificationManager::forgetSubscriber(NotificationSubscriber *subscriber)
102{
103 Q_ASSERT(QThread::currentThread() == thread());
104 mSubscribers.removeAll(subscriber);
105}
106
107void NotificationManager::slotNotify(const Protocol::ChangeNotificationList &msgs)
108{
109 Q_ASSERT(QThread::currentThread() == thread());
110 for (const auto &msg : msgs) {
111 switch (msg->type()) {
112 case Protocol::Command::CollectionChangeNotification:
113 Protocol::CollectionChangeNotification::appendAndCompress(mNotifications, msg);
114 continue;
115 case Protocol::Command::ItemChangeNotification:
116 case Protocol::Command::TagChangeNotification:
117 case Protocol::Command::SubscriptionChangeNotification:
118 case Protocol::Command::DebugChangeNotification:
119 mNotifications.push_back(msg);
120 continue;
121
122 default:
123 Q_ASSERT_X(false, "slotNotify", "Invalid notification type!");
124 continue;
125 }
126 }
127
128 if (!mTimer->isActive()) {
129 mTimer->start();
130 }
131}
132
133class NotifyRunnable : public QRunnable
134{
135public:
136 explicit NotifyRunnable(NotificationSubscriber *subscriber, const Protocol::ChangeNotificationList &notifications)
137 : mSubscriber(subscriber)
138 , mNotifications(notifications)
139 {
140 }
141
142 ~NotifyRunnable() override = default;
143
144 void run() override
145 {
146 for (const auto &ntf : std::as_const(mNotifications)) {
147 if (mSubscriber) {
148 mSubscriber->notify(ntf);
149 } else {
150 break;
151 }
152 }
153 }
154
155private:
156 Q_DISABLE_COPY_MOVE(NotifyRunnable)
157
159 Protocol::ChangeNotificationList mNotifications;
160};
161
162void NotificationManager::emitPendingNotifications()
163{
164 Q_ASSERT(QThread::currentThread() == thread());
165
166 if (mNotifications.isEmpty()) {
167 return;
168 }
169
170 if (mDebugNotifications == 0) {
171 mSubscribers | Views::filter(IsNotNull) | Actions::forEach([this](const auto &subscriber) {
172 mNotifyThreadPool->start(new NotifyRunnable(subscriber, mNotifications));
173 });
174 } else {
175 // When debugging notification we have to use a non-threaded approach
176 // so that we can work with return value of notify()
177 for (const auto &notification : std::as_const(mNotifications)) {
178 QList<QByteArray> listeners;
179 for (NotificationSubscriber *subscriber : std::as_const(mSubscribers)) {
180 if (subscriber && subscriber->notify(notification)) {
181 listeners.push_back(subscriber->subscriber());
182 }
183 }
184
185 emitDebugNotification(notification, listeners);
186 }
187 }
188
189 mNotifications.clear();
190}
191
192void NotificationManager::emitDebugNotification(const Protocol::ChangeNotificationPtr &ntf, const QList<QByteArray> &listeners)
193{
194 auto debugNtf = Protocol::DebugChangeNotificationPtr::create();
195 debugNtf->setNotification(ntf);
196 debugNtf->setListeners(listeners);
197 debugNtf->setTimestamp(QDateTime::currentMSecsSinceEpoch());
198 mSubscribers | Views::filter(IsNotNull) | Actions::forEach([this, &debugNtf](const auto &subscriber) {
199 mNotifyThreadPool->start(new NotifyRunnable(subscriber, {debugNtf}));
200 });
201}
202
203#include "moc_notificationmanager.cpp"
Helper integration between Akonadi and Qt.
qint64 currentMSecsSinceEpoch()
qsizetype count() const
void push_back(parameter_type value)
qsizetype removeAll(const AT &t)
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
QThread * thread() const
QFuture< T > run(Function function,...)
QThread * currentThread()
void clear()
void setMaxThreadCount(int maxThreadCount)
void start(Callable &&callableToRun, int priority)
bool waitForDone(int msecs)
void setInterval(int msec)
bool isActive() const
void setSingleShot(bool singleShot)
void start()
void stop()
void timeout()
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Sat Dec 21 2024 17:01:42 by doxygen 1.12.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.