8#include "notificationmanager.h"
9#include "aggregatedfetchscope.h"
10#include "akonadiserver_debug.h"
11#include "handlerhelper.h"
12#include "notificationsubscriber.h"
13#include "storage/collectionstatistics.h"
14#include "storage/notificationcollector.h"
17#include "private/scope_p.h"
18#include "private/standarddirs_p.h"
19#include "shared/akranges.h"
27using namespace Akonadi::Server;
28using namespace AkRanges;
30NotificationManager::NotificationManager(StartMode startMode)
31 : AkThread(QStringLiteral(
"NotificationManager"), startMode)
33 , mNotifyThreadPool(nullptr)
34 , mDebugNotifications(0)
38NotificationManager::~NotificationManager()
43void NotificationManager::init()
47 const QString serverConfigFile = StandardDirs::serverConfigFile(StandardDirs::ReadWrite);
50 mTimer =
new QTimer(
this);
51 mTimer->setInterval(settings.value(QStringLiteral(
"NotificationManager/Interval"), 50).toInt());
52 mTimer->setSingleShot(
true);
55 mNotifyThreadPool =
new QThreadPool(
this);
56 mNotifyThreadPool->setMaxThreadCount(5);
58 mCollectionFetchScope =
new AggregatedCollectionFetchScope();
59 mItemFetchScope =
new AggregatedItemFetchScope();
60 mTagFetchScope =
new AggregatedTagFetchScope();
63void NotificationManager::quit()
70 mNotifyThreadPool->clear();
71 mNotifyThreadPool->waitForDone();
72 delete mNotifyThreadPool;
74 qDeleteAll(mSubscribers);
76 delete mCollectionFetchScope;
77 delete mItemFetchScope;
78 delete mTagFetchScope;
83void NotificationManager::registerConnection(quintptr socketDescriptor)
87 auto subscriber =
new NotificationSubscriber(
this, socketDescriptor);
88 qCInfo(AKONADISERVER_LOG) <<
"New notification connection (registered as" << subscriber <<
")";
89 connect(subscriber, &NotificationSubscriber::notificationDebuggingChanged,
this, [
this](
bool enabled) {
91 ++mDebugNotifications;
93 --mDebugNotifications;
95 Q_ASSERT(mDebugNotifications >= 0);
96 Q_ASSERT(mDebugNotifications <= mSubscribers.count());
99 mSubscribers.push_back(subscriber);
101void NotificationManager::forgetSubscriber(NotificationSubscriber *subscriber)
104 mSubscribers.removeAll(subscriber);
107void NotificationManager::slotNotify(
const Protocol::ChangeNotificationList &msgs)
110 for (
const auto &msg : msgs) {
111 switch (msg->type()) {
112 case Protocol::Command::CollectionChangeNotification:
113 Protocol::CollectionChangeNotification::appendAndCompress(mNotifications, msg);
115 case Protocol::Command::ItemChangeNotification:
116 case Protocol::Command::TagChangeNotification:
117 case Protocol::Command::SubscriptionChangeNotification:
118 case Protocol::Command::DebugChangeNotification:
119 mNotifications.push_back(msg);
123 Q_ASSERT_X(
false,
"slotNotify",
"Invalid notification type!");
128 if (!mTimer->isActive()) {
133class NotifyRunnable :
public QRunnable
136 explicit NotifyRunnable(NotificationSubscriber *subscriber,
const Protocol::ChangeNotificationList ¬ifications)
137 : mSubscriber(subscriber)
138 , mNotifications(notifications)
142 ~NotifyRunnable()
override =
default;
146 for (
const auto &ntf : std::as_const(mNotifications)) {
148 mSubscriber->notify(ntf);
156 Q_DISABLE_COPY_MOVE(NotifyRunnable)
158 QPointer<NotificationSubscriber> mSubscriber;
159 Protocol::ChangeNotificationList mNotifications;
162void NotificationManager::emitPendingNotifications()
166 if (mNotifications.isEmpty()) {
170 if (mDebugNotifications == 0) {
171 mSubscribers | Views::filter(IsNotNull) | Actions::forEach([
this](
const auto &subscriber) {
172 mNotifyThreadPool->start(
new NotifyRunnable(subscriber, mNotifications));
177 for (
const auto ¬ification : std::as_const(mNotifications)) {
178 QList<QByteArray> listeners;
179 for (NotificationSubscriber *subscriber : std::as_const(mSubscribers)) {
180 if (subscriber && subscriber->notify(notification)) {
181 listeners.
push_back(subscriber->subscriber());
185 emitDebugNotification(notification, listeners);
189 mNotifications.clear();
192void NotificationManager::emitDebugNotification(
const Protocol::ChangeNotificationPtr &ntf,
const QList<QByteArray> &listeners)
194 auto debugNtf = Protocol::DebugChangeNotificationPtr::create();
195 debugNtf->setNotification(ntf);
196 debugNtf->setListeners(listeners);
198 mSubscribers | Views::filter(IsNotNull) | Actions::forEach([
this, &debugNtf](
const auto &subscriber) {
199 mNotifyThreadPool->start(
new NotifyRunnable(subscriber, {debugNtf}));
203#include "moc_notificationmanager.cpp"
Helper integration between Akonadi and Qt.
qint64 currentMSecsSinceEpoch()
void push_back(parameter_type value)
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
QThread * thread() const const
QFuture< T > run(Function function,...)
QThread * currentThread()