KUnifiedPush

serversenteventsstream.cpp
1/*
2 SPDX-FileCopyrightText: 2022 Volker Krause <vkrause@kde.org>
3 SPDX-License-Identifier: LGPL-2.0-or-later
4*/
5
6#include "serversenteventsstream.h"
7#include "logging.h"
8
9#include <QIODevice>
10
11#include <algorithm>
12#include <cstring>
13
14using namespace KUnifiedPush;
15
16ServerSentEventsStream::ServerSentEventsStream(QObject *parent)
17 : QObject(parent)
18{
19}
20
21ServerSentEventsStream::~ServerSentEventsStream() = default;
22
23void ServerSentEventsStream::read(QIODevice *device)
24{
25 m_buffer.clear();
26 connect(device, &QIODevice::readyRead, this, [device, this]() {
27 m_buffer.append(device->read(device->bytesAvailable()));
28 processBuffer();
29 });
30}
31
32QByteArray ServerSentEventsStream::buffer() const
33{
34 return m_buffer;
35}
36
// Returns true if @p c is a line feed or a carriage return.
static bool isLineBreak(char c)
{
    switch (c) {
    case '\n':
    case '\r':
        return true;
    default:
        return false;
    }
}
41
42static QByteArray::ConstIterator findLineBreak(const QByteArray::const_iterator &begin, const QByteArray::const_iterator &end)
43{
44 return std::find_if(begin, end, isLineBreak);
45}
46
47static QByteArray::const_iterator consumeLineBreak(const QByteArray::const_iterator &begin, const QByteArray::const_iterator &end)
48{
49 auto it = begin;
50 if (it == end) {
51 } else if ((*it) == '\n') {
52 ++it;
53 } else if ((*it) == '\r') {
54 ++it;
55 if (it != end && (*it) == '\n') {
56 ++it;
57 }
58 }
59
60 return it;
61}
62
63static QByteArray::const_iterator findMessageEnd(const QByteArray::const_iterator &begin, const QByteArray::const_iterator &end)
64{
65 for (auto it = findLineBreak(begin, end); it != end; it = findLineBreak(it, end)) {
66 auto lookAhead = consumeLineBreak(it, end);
67 if (lookAhead != end && isLineBreak(*lookAhead)) {
68 return it;
69 }
70 it = lookAhead;
71 }
72
73 return end;
74}
75
76void ServerSentEventsStream::processBuffer()
77{
78 qCDebug(Log) << m_buffer;
79 auto msgEnd = findMessageEnd(m_buffer.begin(), m_buffer.end());
80 if (msgEnd == m_buffer.end()) {
81 qCDebug(Log) << "buffer incomplete, waiting for more";
82 return;
83 }
84
85 SSEMessage msg;
86 for (auto it = m_buffer.constBegin(); it != msgEnd;) {
87 auto lineBegin = it;
88 auto lineEnd = findLineBreak(lineBegin, msgEnd);
89 it = consumeLineBreak(lineEnd, msgEnd);
90 Q_ASSERT(lineBegin != it);
91
92 auto colonIt = std::find(lineBegin, lineEnd, ':');
93 auto valueBegin = colonIt;
94 if (valueBegin != lineEnd) {
95 ++valueBegin;
96 if (valueBegin != lineEnd && (*valueBegin) == ' ') {
97 ++valueBegin;
98 }
99 }
100
101 if (colonIt == lineBegin || valueBegin == lineEnd) {
102 continue; // comment or value-less field
103 }
104
105 const auto fieldNameLen = std::distance(lineBegin, colonIt);
106 if (fieldNameLen == 4 && std::strncmp(lineBegin, "data", fieldNameLen) == 0) {
107 msg.data.append(valueBegin, std::distance(valueBegin, lineEnd));
108 } else if (fieldNameLen == 5 && std::strncmp(lineBegin, "event", fieldNameLen) == 0) {
109 msg.event = QByteArray(valueBegin, std::distance(valueBegin, lineEnd));
110 } else {
111 msg.metaData.insert(QByteArray(lineBegin, std::distance(lineBegin, colonIt)), QByteArray(valueBegin, std::distance(valueBegin, lineEnd)));
112 }
113 }
114
115 // defer emission of messages until the below is finished as well
116 // this avoids reaction to this pulling things out under our feet here
117 QMetaObject::invokeMethod(this, [msg, this]() { Q_EMIT messageReceived(msg); }, Qt::QueuedConnection);
118
119 msgEnd = consumeLineBreak(msgEnd, m_buffer.end());
120 msgEnd = consumeLineBreak(msgEnd, m_buffer.end());
121 if (msgEnd == m_buffer.end()) {
122 m_buffer.clear();
123 } else {
124 const auto remainingLen = m_buffer.size() - std::distance(m_buffer.constBegin(), msgEnd);
125 std::memmove(m_buffer.begin(), msgEnd, remainingLen);
126 m_buffer.truncate(remainingLen);
127 processBuffer();
128 }
129}
130
131#include "moc_serversenteventsstream.cpp"
const QList< QKeySequence > & begin()
const QList< QKeySequence > & end()
Client-side integration with UnifiedPush.
Definition connector.h:14
bool isLineBreak(const typename Trait::String &s)
QByteArray & append(QByteArrayView data)
typedef const_iterator
iterator insert(const Key &key, const T &value)
virtual qint64 bytesAvailable() const const
QByteArray read(qint64 maxSize)
void readyRead()
bool invokeMethod(QObject *context, Functor &&function, FunctorReturnType *ret)
Q_EMIT
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
QueuedConnection
This file is part of the KDE documentation.
Documentation copyright © 1996-2025 The KDE developers.
Generated on Fri Apr 25 2025 12:05:39 by doxygen 1.13.2 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.