KPipewire

pipewireproduce.cpp
1/*
2 SPDX-FileCopyrightText: 2022 Aleix Pol Gonzalez <aleixpol@kde.org>
3
4 SPDX-License-Identifier: LGPL-2.1-only OR LGPL-3.0-only OR LicenseRef-KDE-Accepted-LGPL
5*/
6
7#include "pipewireproduce_p.h"
8
9#include <QMutex>
10#include <QPainter>
11#include <QThreadPool>
12#include <logging_record.h>
13
14#include <QDateTime>
15#include <memory>
16#include <qstringliteral.h>
17
18#include "gifencoder_p.h"
19#include "h264vaapiencoder_p.h"
20#include "libopenh264encoder_p.h"
21#include "libvpxencoder_p.h"
22#include "libvpxvp9encoder_p.h"
23#include "libwebpencoder_p.h"
24#include "libx264encoder_p.h"
25
26extern "C" {
27#include <fcntl.h>
28}
29
30Q_DECLARE_METATYPE(std::optional<int>);
31Q_DECLARE_METATYPE(std::optional<std::chrono::nanoseconds>);
32
33PipeWireProduce::PipeWireProduce(PipeWireBaseEncodedStream::Encoder encoderType, uint nodeId, uint fd, const Fraction &framerate)
34 : QObject()
35 , m_nodeId(nodeId)
36 , m_encoderType(encoderType)
37 , m_fd(fd)
38 , m_frameRate(framerate)
39{
40 qRegisterMetaType<std::optional<int>>();
41 qRegisterMetaType<std::optional<std::chrono::nanoseconds>>();
42}
43
44PipeWireProduce::~PipeWireProduce()
45{
46}
47
48void PipeWireProduce::initialize()
49{
50 m_stream.reset(new PipeWireSourceStream(nullptr));
51 m_stream->setMaxFramerate(m_frameRate);
52
53 // The check in supportsHardwareEncoding() is insufficient to fully
54 // determine if we actually support hardware encoding the current stream,
55 // but to determine that we need the stream size, which we don't get until
56 // after we've created the stream, but creating the stream sets important
57 // parameters that require the correct usage hint to be set. So use the
58 // insufficient check to set the hint, assuming that we still get a working
59 // stream when we use the wrong hint with software encoding.
60 m_stream->setUsageHint(Encoder::supportsHardwareEncoding() ? PipeWireSourceStream::UsageHint::EncodeHardware
61 : PipeWireSourceStream::UsageHint::EncodeSoftware);
62
63 bool created = m_stream->createStream(m_nodeId, m_fd);
64 if (!created || !m_stream->error().isEmpty()) {
65 qCWarning(PIPEWIRERECORD_LOGGING) << "failed to set up stream for" << m_nodeId << m_stream->error();
66 m_error = m_stream->error();
67 m_stream.reset(nullptr);
68 return;
69 }
70 connect(m_stream.get(), &PipeWireSourceStream::streamParametersChanged, this, &PipeWireProduce::setupStream);
71
72 /**
73 * Kwin only sends a new frame when there's damage on screen
74 * The encoder does not flush all frames whilst a stream is active
75 * it will keep one frame in the queue waiting for more input until the stream is closed
76 *
77 * If there's no update this timer bumps the last frame through the stack again
78 * to flush the last frame.
79 */
80 m_frameRepeatTimer.reset(new QTimer);
81 m_frameRepeatTimer->setSingleShot(true);
82 m_frameRepeatTimer->setInterval(100);
83 connect(m_frameRepeatTimer.data(), &QTimer::timeout, this, [this]() {
84 auto f = m_lastFrame;
85 m_lastFrame = {};
86 aboutToEncode(f);
87 if (!m_encoder->filterFrame(f)) {
88 return;
89 }
90
91 m_pendingFilterFrames++;
92 m_passthroughCondition.notify_all();
93 });
94}
95
96Fraction PipeWireProduce::maxFramerate() const
97{
98 return m_maxFramerate;
99}
100
101void PipeWireProduce::setMaxFramerate(const Fraction &framerate)
102{
103 m_maxFramerate = framerate;
104
105 const double framesPerSecond = static_cast<double>(framerate.numerator) / framerate.denominator;
106 if (m_frameRepeatTimer) {
107 m_frameRepeatTimer->setInterval((1000 / framesPerSecond) * 2);
108 }
109 if (m_stream) {
110 m_stream->setMaxFramerate(framerate);
111 }
112}
113
114int PipeWireProduce::maxPendingFrames() const
115{
116 return m_maxPendingFrames;
117}
118
119void PipeWireProduce::setMaxPendingFrames(int newMaxBufferSize)
120{
121 if (newMaxBufferSize < 3) {
122 qCWarning(PIPEWIRERECORD_LOGGING) << "Maxmimum pending frame count of " << newMaxBufferSize << " requested. Value must be 3 or higher.";
123 newMaxBufferSize = 3;
124 }
125 m_maxPendingFrames = newMaxBufferSize;
126}
127
128void PipeWireProduce::setupStream()
129{
130 qCDebug(PIPEWIRERECORD_LOGGING) << "Setting up stream";
131 disconnect(m_stream.get(), &PipeWireSourceStream::streamParametersChanged, this, &PipeWireProduce::setupStream);
132
133 m_encoder = makeEncoder();
134 if (!m_encoder) {
135 qCWarning(PIPEWIRERECORD_LOGGING) << "No encoder could be created";
136 return;
137 }
138
139 connect(m_stream.get(), &PipeWireSourceStream::stateChanged, this, &PipeWireProduce::stateChanged);
140 if (!setupFormat()) {
141 qCWarning(PIPEWIRERECORD_LOGGING) << "Could not set up the producing thread";
142 return;
143 }
144
145 connect(m_stream.data(), &PipeWireSourceStream::frameReceived, this, &PipeWireProduce::processFrame);
146
147 m_passthroughThread = std::thread([this]() {
148 m_passthroughRunning = true;
149 while (m_passthroughRunning) {
150 std::unique_lock<std::mutex> lock(m_passthroughMutex);
151 m_passthroughCondition.wait(lock);
152
153 if (!m_passthroughRunning) {
154 break;
155 }
156
157 auto [filtered, queued] = m_encoder->encodeFrame(m_maxPendingFrames - m_pendingEncodeFrames);
158 m_pendingFilterFrames -= filtered;
159 m_pendingEncodeFrames += queued;
160
161 m_outputCondition.notify_all();
162 }
163 });
164 pthread_setname_np(m_passthroughThread.native_handle(), "PipeWireProduce::passthrough");
165
166 m_outputThread = std::thread([this]() {
167 m_outputRunning = true;
168 while (m_outputRunning) {
169 std::unique_lock<std::mutex> lock(m_outputMutex);
170 m_outputCondition.wait(lock);
171
172 if (!m_outputRunning) {
173 break;
174 }
175
176 auto received = m_encoder->receivePacket();
177 m_pendingEncodeFrames -= received;
178
179 // Notify the produce thread that the count of processed frames has
180 // changed and it can do cleanup if needed, making sure that that
181 // handling is done on the right thread.
182 QMetaObject::invokeMethod(this, &PipeWireProduce::handleEncodedFramesChanged, Qt::QueuedConnection);
183 }
184 });
185 pthread_setname_np(m_outputThread.native_handle(), "PipeWireProduce::output");
186}
187
188void PipeWireProduce::deactivate()
189{
190 m_deactivated = true;
191
192 auto streamState = PW_STREAM_STATE_PAUSED;
193 if (m_stream) {
194 streamState = m_stream->state();
195 m_stream->setActive(false);
196 }
197
198 // If we have not been initialized properly before, ensure we still run any
199 // cleanup code and exit the thread, otherwise we risk applications not closing
200 // properly.
201 if (!m_encoder || streamState != PW_STREAM_STATE_STREAMING) {
202 QMetaObject::invokeMethod(this, &PipeWireProduce::destroy, Qt::QueuedConnection);
203 }
204}
205
206void PipeWireProduce::destroy()
207{
208 // Ensure we cleanup the PipeWireSourceStream while in the same thread we
209 // created it in.
210 Q_ASSERT_X(QThread::currentThread() == thread(), "PipeWireProduce", "destroy() called from a different thread than PipeWireProduce's thread");
211
212 if (!m_stream) {
213 return;
214 }
215
216 m_frameRepeatTimer->stop();
217
218 if (m_passthroughThread.joinable()) {
219 m_passthroughRunning = false;
220 m_passthroughCondition.notify_all();
221 m_passthroughThread.join();
222 }
223
224 if (m_outputThread.joinable()) {
225 m_outputRunning = false;
226 m_outputCondition.notify_all();
227 m_outputThread.join();
228 }
229
230 m_stream.reset();
231
232 qCDebug(PIPEWIRERECORD_LOGGING) << "finished";
233 cleanup();
235}
236
237void PipeWireProduce::setQuality(const std::optional<quint8> &quality)
238{
239 m_quality = quality;
240 if (m_encoder) {
241 m_encoder->setQuality(quality);
242 }
243}
244
245void PipeWireProduce::setEncodingPreference(const PipeWireBaseEncodedStream::EncodingPreference &encodingPreference)
246{
247 m_encodingPreference = encodingPreference;
248
249 if (m_encoder) {
250 m_encoder->setEncodingPreference(encodingPreference);
251 }
252}
253
254void PipeWireProduce::processFrame(const PipeWireFrame &frame)
255{
256 auto f = frame;
257
258 m_lastFrame = frame;
259 m_frameRepeatTimer->start();
260
261 if (frame.cursor) {
262 m_cursor.position = frame.cursor->position;
263 m_cursor.hotspot = frame.cursor->hotspot;
264 if (!frame.cursor->texture.isNull()) {
265 m_cursor.dirty = true;
266 m_cursor.texture = frame.cursor->texture;
267 }
268 }
269
270 auto pts = framePts(frame.presentationTimestamp);
271 if (m_previousPts >= 0 && pts <= m_previousPts) {
272 return;
273 }
274
275 auto frameTime = 1000.0 / (m_maxFramerate.numerator / m_maxFramerate.denominator);
276 if ((pts - m_previousPts) < frameTime) {
277 return;
278 }
279
280 if (m_pendingFilterFrames + 1 > m_maxPendingFrames) {
281 qCWarning(PIPEWIRERECORD_LOGGING) << "Filter queue is full, dropping frame" << pts;
282 return;
283 }
284
285 aboutToEncode(f);
286 if (!m_encoder->filterFrame(f)) {
287 return;
288 }
289
290 m_pendingFilterFrames++;
291 m_previousPts = pts;
292
293 m_passthroughCondition.notify_all();
294}
295
296void PipeWireProduce::stateChanged(pw_stream_state state)
297{
298 if (state != PW_STREAM_STATE_PAUSED || !m_deactivated) {
299 return;
300 }
301 if (!m_stream) {
302 qCDebug(PIPEWIRERECORD_LOGGING) << "finished without a stream";
303 return;
304 }
305
306 disconnect(m_stream.data(), &PipeWireSourceStream::frameReceived, this, &PipeWireProduce::processFrame);
307
308 if (m_pendingFilterFrames <= 0 && m_pendingEncodeFrames <= 0) {
309 // If we have nothing pending, cleanup immediately.
310 m_encoder->finish();
311
312 // We want to clean up the source stream while in the input thread, but we
313 // need to do so while not handling any PipeWire callback as that risks
314 // crashing because we're stil executing PipeWire handling code.
315 QMetaObject::invokeMethod(this, &PipeWireProduce::destroy, Qt::QueuedConnection);
316 } else {
317 // If we have pending frames, wait with cleanup until all frames have been processed.
318 qCDebug(PIPEWIRERECORD_LOGGING) << "Waiting for frame queues to empty, still pending filter" << m_pendingFilterFrames << "encode"
319 << m_pendingEncodeFrames;
320 m_passthroughCondition.notify_all();
321 }
322}
323
324void PipeWireProduce::handleEncodedFramesChanged()
325{
326 if (!m_deactivated) {
327 return;
328 }
329
330 // If we're deactivating but still have frames in the queue, we want to
331 // flush everything. Since at that point we are not receiving new frames, we
332 // need a different trigger to make the filtering thread process frames.
333 // Triggering here means the filter thread runs as fast as the encode thread
334 // can process the frames.
335 m_passthroughCondition.notify_all();
336
337 if (m_pendingFilterFrames <= 0) {
338 m_encoder->finish();
339
340 if (m_pendingEncodeFrames <= 0) {
341 destroy();
342 }
343 }
344}
345
346std::unique_ptr<Encoder> PipeWireProduce::makeEncoder()
347{
348 auto forcedEncoder = qEnvironmentVariable("KPIPEWIRE_FORCE_ENCODER");
349 if (!forcedEncoder.isNull()) {
350 qCWarning(PIPEWIRERECORD_LOGGING) << "Forcing encoder to" << forcedEncoder;
351 }
352
353 auto size = m_stream->size();
354
355 switch (m_encoderType) {
356 case PipeWireBaseEncodedStream::H264Baseline:
357 case PipeWireBaseEncodedStream::H264Main: {
358 auto profile = m_encoderType == PipeWireBaseEncodedStream::H264Baseline ? Encoder::H264Profile::Baseline : Encoder::H264Profile::Main;
359
360 if (forcedEncoder.isNull() || forcedEncoder == u"h264_vaapi") {
361 auto hardwareEncoder = std::make_unique<H264VAAPIEncoder>(profile, this);
362 hardwareEncoder->setQuality(m_quality);
363 hardwareEncoder->setEncodingPreference(m_encodingPreference);
364 if (hardwareEncoder->initialize(size)) {
365 return hardwareEncoder;
366 }
367 }
368
369 if (forcedEncoder.isNull() || forcedEncoder == u"libx264") {
370 auto softwareEncoder = std::make_unique<LibX264Encoder>(profile, this);
371 softwareEncoder->setQuality(m_quality);
372 softwareEncoder->setEncodingPreference(m_encodingPreference);
373 if (softwareEncoder->initialize(size)) {
374 return softwareEncoder;
375 }
376 }
377
378 // Try libopenh264 last, it's slower and has less features.
379 if (forcedEncoder.isNull() || forcedEncoder == u"libopenh264") {
380 auto softwareEncoder = std::make_unique<LibOpenH264Encoder>(profile, this);
381 softwareEncoder->setQuality(m_quality);
382 softwareEncoder->setEncodingPreference(m_encodingPreference);
383 if (softwareEncoder->initialize(size)) {
384 return softwareEncoder;
385 }
386 }
387 break;
388 }
389 case PipeWireBaseEncodedStream::VP8: {
390 if (forcedEncoder.isNull() || forcedEncoder == u"libvpx") {
391 auto encoder = std::make_unique<LibVpxEncoder>(this);
392 encoder->setQuality(m_quality);
393 if (encoder->initialize(size)) {
394 return encoder;
395 }
396 }
397 break;
398 }
399 case PipeWireBaseEncodedStream::VP9: {
400 if (forcedEncoder.isNull() || forcedEncoder == u"libvpx-vp9") {
401 auto encoder = std::make_unique<LibVpxVp9Encoder>(this);
402 encoder->setQuality(m_quality);
403 if (encoder->initialize(size)) {
404 return encoder;
405 }
406 }
407 break;
408 }
409 case PipeWireBaseEncodedStream::Gif: {
410 if (forcedEncoder.isNull() || forcedEncoder == u"gif") {
411 auto encoder = std::make_unique<GifEncoder>(this);
412 if (encoder->initialize(size)) {
413 return encoder;
414 }
415 }
416 break;
417 }
418 case PipeWireBaseEncodedStream::WebP: {
419 if (forcedEncoder.isNull() || forcedEncoder == u"libwebp") {
420 auto encoder = std::make_unique<LibWebPEncoder>(this);
421 encoder->setQuality(m_quality);
422 if (encoder->initialize(size)) {
423 return encoder;
424 }
425 }
426 break;
427 }
428 default:
429 qCWarning(PIPEWIRERECORD_LOGGING) << "Unknown encoder type" << m_encoderType;
430 }
431
432 return nullptr;
433}
434
435#include "moc_pipewireproduce_p.cpp"
bool invokeMethod(QObject *context, Functor &&function, FunctorReturnType *ret)
QueuedConnection
QFuture< typename qValueType< Iterator >::value_type > filtered(Iterator begin, Iterator end, KeepFunctor &&filterFunction)
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
QThread * currentThread()
void quit()
void timeout()
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Sat Dec 21 2024 17:05:20 by doxygen 1.12.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.