KPipewire

pipewireproduce.cpp
1/*
2 SPDX-FileCopyrightText: 2022 Aleix Pol Gonzalez <aleixpol@kde.org>
3
4 SPDX-License-Identifier: LGPL-2.1-only OR LGPL-3.0-only OR LicenseRef-KDE-Accepted-LGPL
5*/
6
7#include "pipewireproduce_p.h"
8
9#include <QMutex>
10#include <QPainter>
11#include <QThreadPool>
12#include <logging_record.h>
13
14#include <QDateTime>
15#include <memory>
16#include <qstringliteral.h>
17
18#include "gifencoder_p.h"
19#include "h264vaapiencoder_p.h"
20#include "libopenh264encoder_p.h"
21#include "libvpxencoder_p.h"
22#include "libvpxvp9encoder_p.h"
23#include "libwebpencoder_p.h"
24#include "libx264encoder_p.h"
25
26#include "logging_frame_statistics.h"
27
28extern "C" {
29#include <fcntl.h>
30}
31
32Q_DECLARE_METATYPE(std::optional<int>);
33Q_DECLARE_METATYPE(std::optional<std::chrono::nanoseconds>);
34
35PipeWireProduce::PipeWireProduce(PipeWireBaseEncodedStream::Encoder encoderType, uint nodeId, uint fd, const Fraction &framerate)
36 : QObject()
37 , m_nodeId(nodeId)
38 , m_encoderType(encoderType)
39 , m_fd(fd)
40 , m_frameRate(framerate)
41{
42 qRegisterMetaType<std::optional<int>>();
43 qRegisterMetaType<std::optional<std::chrono::nanoseconds>>();
44}
45
46PipeWireProduce::~PipeWireProduce()
47{
48}
49
50void PipeWireProduce::initialize()
51{
52 m_stream.reset(new PipeWireSourceStream(nullptr));
53 m_stream->setMaxFramerate(m_frameRate);
54
55 // The check in supportsHardwareEncoding() is insufficient to fully
56 // determine if we actually support hardware encoding the current stream,
57 // but to determine that we need the stream size, which we don't get until
58 // after we've created the stream, but creating the stream sets important
59 // parameters that require the correct usage hint to be set. So use the
60 // insufficient check to set the hint, assuming that we still get a working
61 // stream when we use the wrong hint with software encoding.
62 m_stream->setUsageHint(Encoder::supportsHardwareEncoding() ? PipeWireSourceStream::UsageHint::EncodeHardware
63 : PipeWireSourceStream::UsageHint::EncodeSoftware);
64
65 bool created = m_stream->createStream(m_nodeId, m_fd);
66 if (!created || !m_stream->error().isEmpty()) {
67 qCWarning(PIPEWIRERECORD_LOGGING) << "failed to set up stream for" << m_nodeId << m_stream->error();
68 m_error = m_stream->error();
69 m_stream.reset(nullptr);
70 return;
71 }
72 connect(m_stream.get(), &PipeWireSourceStream::streamParametersChanged, this, &PipeWireProduce::setupStream);
73
74 if (PIPEWIRERECORDFRAMESTATS_LOGGING().isDebugEnabled()) {
75 m_frameStatisticsTimer = std::make_unique<QTimer>();
76 m_frameStatisticsTimer->setInterval(std::chrono::seconds(1));
77 connect(m_frameStatisticsTimer.get(), &QTimer::timeout, this, [this]() {
78 qCDebug(PIPEWIRERECORDFRAMESTATS_LOGGING) << "Processed" << m_processedFrames << "frames in the last second.";
79 qCDebug(PIPEWIRERECORDFRAMESTATS_LOGGING) << m_pendingFilterFrames << "frames pending for filter.";
80 qCDebug(PIPEWIRERECORDFRAMESTATS_LOGGING) << m_pendingEncodeFrames << "frames pending for encode.";
81 m_processedFrames = 0;
82 });
83 }
84
85 /**
86 * Kwin only sends a new frame when there's damage on screen
87 * The encoder does not flush all frames whilst a stream is active
88 * it will keep one frame in the queue waiting for more input until the stream is closed
89 *
90 * If there's no update this timer bumps the last frame through the stack again
91 * to flush the last frame.
92 */
93 m_frameRepeatTimer.reset(new QTimer);
94 m_frameRepeatTimer->setSingleShot(true);
95 m_frameRepeatTimer->setInterval(100);
96 connect(m_frameRepeatTimer.data(), &QTimer::timeout, this, [this]() {
97 auto f = m_lastFrame;
98 m_lastFrame = {};
99 aboutToEncode(f);
100 if (!m_encoder->filterFrame(f)) {
101 return;
102 }
103
104 m_pendingFilterFrames++;
105 m_passthroughCondition.notify_all();
106 });
107}
108
109Fraction PipeWireProduce::maxFramerate() const
110{
111 return m_maxFramerate;
112}
113
114void PipeWireProduce::setMaxFramerate(const Fraction &framerate)
115{
116 m_maxFramerate = framerate;
117
118 const double framesPerSecond = static_cast<double>(framerate.numerator) / framerate.denominator;
119 if (m_frameRepeatTimer) {
120 m_frameRepeatTimer->setInterval((1000 / framesPerSecond) * 2);
121 }
122 if (m_stream) {
123 m_stream->setMaxFramerate(framerate);
124 }
125}
126
127int PipeWireProduce::maxPendingFrames() const
128{
129 return m_maxPendingFrames;
130}
131
132void PipeWireProduce::setMaxPendingFrames(int newMaxBufferSize)
133{
134 if (newMaxBufferSize < 3) {
135 qCWarning(PIPEWIRERECORD_LOGGING) << "Maxmimum pending frame count of " << newMaxBufferSize << " requested. Value must be 3 or higher.";
136 newMaxBufferSize = 3;
137 }
138 m_maxPendingFrames = newMaxBufferSize;
139}
140
141void PipeWireProduce::setupStream()
142{
143 qCDebug(PIPEWIRERECORD_LOGGING) << "Setting up stream";
144 disconnect(m_stream.get(), &PipeWireSourceStream::streamParametersChanged, this, &PipeWireProduce::setupStream);
145
146 m_encoder = makeEncoder();
147 if (!m_encoder) {
148 qCWarning(PIPEWIRERECORD_LOGGING) << "No encoder could be created";
149 return;
150 }
151
152 connect(m_stream.get(), &PipeWireSourceStream::stateChanged, this, &PipeWireProduce::stateChanged);
153 if (!setupFormat()) {
154 qCWarning(PIPEWIRERECORD_LOGGING) << "Could not set up the producing thread";
155 return;
156 }
157
158 connect(m_stream.data(), &PipeWireSourceStream::frameReceived, this, &PipeWireProduce::processFrame);
159
160 m_passthroughThread = std::thread([this]() {
161 m_passthroughRunning = true;
162 while (m_passthroughRunning) {
163 std::unique_lock<std::mutex> lock(m_passthroughMutex);
164 m_passthroughCondition.wait(lock);
165
166 if (!m_passthroughRunning) {
167 break;
168 }
169
170 auto [filtered, queued] = m_encoder->encodeFrame(m_maxPendingFrames - m_pendingEncodeFrames);
171 m_pendingFilterFrames -= filtered;
172 m_pendingEncodeFrames += queued;
173
174 m_outputCondition.notify_all();
175 }
176 });
177 pthread_setname_np(m_passthroughThread.native_handle(), "PipeWireProduce::passthrough");
178
179 m_outputThread = std::thread([this]() {
180 m_outputRunning = true;
181 while (m_outputRunning) {
182 std::unique_lock<std::mutex> lock(m_outputMutex);
183 m_outputCondition.wait(lock);
184
185 if (!m_outputRunning) {
186 break;
187 }
188
189 auto received = m_encoder->receivePacket();
190 m_pendingEncodeFrames -= received;
191 m_processedFrames += received;
192
193 // Notify the produce thread that the count of processed frames has
194 // changed and it can do cleanup if needed, making sure that that
195 // handling is done on the right thread.
196 QMetaObject::invokeMethod(this, &PipeWireProduce::handleEncodedFramesChanged, Qt::QueuedConnection);
197 }
198 });
199 pthread_setname_np(m_outputThread.native_handle(), "PipeWireProduce::output");
200
201 if (m_frameStatisticsTimer) {
202 m_frameStatisticsTimer->start();
203 }
204}
205
206void PipeWireProduce::deactivate()
207{
208 m_deactivated = true;
209
210 auto streamState = PW_STREAM_STATE_PAUSED;
211 if (m_stream) {
212 streamState = m_stream->state();
213 m_stream->setActive(false);
214 }
215
216 // If we have not been initialized properly before, ensure we still run any
217 // cleanup code and exit the thread, otherwise we risk applications not closing
218 // properly.
219 if (!m_encoder || streamState != PW_STREAM_STATE_STREAMING) {
220 QMetaObject::invokeMethod(this, &PipeWireProduce::destroy, Qt::QueuedConnection);
221 }
222}
223
224void PipeWireProduce::destroy()
225{
226 // Ensure we cleanup the PipeWireSourceStream while in the same thread we
227 // created it in.
228 Q_ASSERT_X(QThread::currentThread() == thread(), "PipeWireProduce", "destroy() called from a different thread than PipeWireProduce's thread");
229
230 if (!m_stream) {
231 return;
232 }
233
234 m_frameRepeatTimer->stop();
235
236 m_frameStatisticsTimer = nullptr;
237
238 if (m_passthroughThread.joinable()) {
239 m_passthroughRunning = false;
240 m_passthroughCondition.notify_all();
241 m_passthroughThread.join();
242 }
243
244 if (m_outputThread.joinable()) {
245 m_outputRunning = false;
246 m_outputCondition.notify_all();
247 m_outputThread.join();
248 }
249
250 m_stream.reset();
251
252 qCDebug(PIPEWIRERECORD_LOGGING) << "finished";
253 cleanup();
255}
256
257void PipeWireProduce::setQuality(const std::optional<quint8> &quality)
258{
259 m_quality = quality;
260 if (m_encoder) {
261 m_encoder->setQuality(quality);
262 }
263}
264
265void PipeWireProduce::setEncodingPreference(const PipeWireBaseEncodedStream::EncodingPreference &encodingPreference)
266{
267 m_encodingPreference = encodingPreference;
268
269 if (m_encoder) {
270 m_encoder->setEncodingPreference(encodingPreference);
271 }
272}
273
274void PipeWireProduce::processFrame(const PipeWireFrame &frame)
275{
276 auto f = frame;
277
278 m_lastFrame = frame;
279 m_frameRepeatTimer->start();
280
281 if (frame.cursor) {
282 m_cursor.position = frame.cursor->position;
283 m_cursor.hotspot = frame.cursor->hotspot;
284 if (!frame.cursor->texture.isNull()) {
285 m_cursor.dirty = true;
286 m_cursor.texture = frame.cursor->texture;
287 }
288 }
289
290 auto pts = framePts(frame.presentationTimestamp);
291 if (m_previousPts >= 0 && pts <= m_previousPts) {
292 return;
293 }
294
295 auto frameTime = 1000.0 / (m_maxFramerate.numerator / m_maxFramerate.denominator);
296 if ((pts - m_previousPts) < frameTime) {
297 return;
298 }
299
300 if (m_pendingFilterFrames + 1 > m_maxPendingFrames) {
301 qCWarning(PIPEWIRERECORD_LOGGING) << "Filter queue is full, dropping frame" << pts;
302 return;
303 }
304
305 aboutToEncode(f);
306 if (!m_encoder->filterFrame(f)) {
307 return;
308 }
309
310 m_pendingFilterFrames++;
311 m_previousPts = pts;
312
313 m_passthroughCondition.notify_all();
314}
315
316void PipeWireProduce::stateChanged(pw_stream_state state)
317{
318 if (state != PW_STREAM_STATE_PAUSED || !m_deactivated) {
319 return;
320 }
321 if (!m_stream) {
322 qCDebug(PIPEWIRERECORD_LOGGING) << "finished without a stream";
323 return;
324 }
325
326 disconnect(m_stream.data(), &PipeWireSourceStream::frameReceived, this, &PipeWireProduce::processFrame);
327
328 if (m_pendingFilterFrames <= 0 && m_pendingEncodeFrames <= 0) {
329 // If we have nothing pending, cleanup immediately.
330 m_encoder->finish();
331
332 // We want to clean up the source stream while in the input thread, but we
333 // need to do so while not handling any PipeWire callback as that risks
334 // crashing because we're stil executing PipeWire handling code.
335 QMetaObject::invokeMethod(this, &PipeWireProduce::destroy, Qt::QueuedConnection);
336 } else {
337 // If we have pending frames, wait with cleanup until all frames have been processed.
338 qCDebug(PIPEWIRERECORD_LOGGING) << "Waiting for frame queues to empty, still pending filter" << m_pendingFilterFrames << "encode"
339 << m_pendingEncodeFrames;
340 m_passthroughCondition.notify_all();
341 }
342}
343
344void PipeWireProduce::handleEncodedFramesChanged()
345{
346 if (!m_deactivated) {
347 return;
348 }
349
350 // If we're deactivating but still have frames in the queue, we want to
351 // flush everything. Since at that point we are not receiving new frames, we
352 // need a different trigger to make the filtering thread process frames.
353 // Triggering here means the filter thread runs as fast as the encode thread
354 // can process the frames.
355 m_passthroughCondition.notify_all();
356
357 if (m_pendingFilterFrames <= 0) {
358 m_encoder->finish();
359
360 if (m_pendingEncodeFrames <= 0) {
361 destroy();
362 }
363 }
364}
365
366std::unique_ptr<Encoder> PipeWireProduce::makeEncoder()
367{
368 auto forcedEncoder = qEnvironmentVariable("KPIPEWIRE_FORCE_ENCODER");
369 if (!forcedEncoder.isNull()) {
370 qCWarning(PIPEWIRERECORD_LOGGING) << "Forcing encoder to" << forcedEncoder;
371 }
372
373 auto size = m_stream->size();
374
375 switch (m_encoderType) {
376 case PipeWireBaseEncodedStream::H264Baseline:
377 case PipeWireBaseEncodedStream::H264Main: {
378 auto profile = m_encoderType == PipeWireBaseEncodedStream::H264Baseline ? Encoder::H264Profile::Baseline : Encoder::H264Profile::Main;
379
380 if (forcedEncoder.isNull() || forcedEncoder == u"h264_vaapi") {
381 auto hardwareEncoder = std::make_unique<H264VAAPIEncoder>(profile, this);
382 hardwareEncoder->setQuality(m_quality);
383 hardwareEncoder->setEncodingPreference(m_encodingPreference);
384 if (hardwareEncoder->initialize(size)) {
385 return hardwareEncoder;
386 }
387 }
388
389 if (forcedEncoder.isNull() || forcedEncoder == u"libx264") {
390 auto softwareEncoder = std::make_unique<LibX264Encoder>(profile, this);
391 softwareEncoder->setQuality(m_quality);
392 softwareEncoder->setEncodingPreference(m_encodingPreference);
393 if (softwareEncoder->initialize(size)) {
394 return softwareEncoder;
395 }
396 }
397
398 // Try libopenh264 last, it's slower and has less features.
399 if (forcedEncoder.isNull() || forcedEncoder == u"libopenh264") {
400 auto softwareEncoder = std::make_unique<LibOpenH264Encoder>(profile, this);
401 softwareEncoder->setQuality(m_quality);
402 softwareEncoder->setEncodingPreference(m_encodingPreference);
403 if (softwareEncoder->initialize(size)) {
404 return softwareEncoder;
405 }
406 }
407 break;
408 }
409 case PipeWireBaseEncodedStream::VP8: {
410 if (forcedEncoder.isNull() || forcedEncoder == u"libvpx") {
411 auto encoder = std::make_unique<LibVpxEncoder>(this);
412 encoder->setQuality(m_quality);
413 if (encoder->initialize(size)) {
414 return encoder;
415 }
416 }
417 break;
418 }
419 case PipeWireBaseEncodedStream::VP9: {
420 if (forcedEncoder.isNull() || forcedEncoder == u"libvpx-vp9") {
421 auto encoder = std::make_unique<LibVpxVp9Encoder>(this);
422 encoder->setQuality(m_quality);
423 if (encoder->initialize(size)) {
424 return encoder;
425 }
426 }
427 break;
428 }
429 case PipeWireBaseEncodedStream::Gif: {
430 if (forcedEncoder.isNull() || forcedEncoder == u"gif") {
431 auto encoder = std::make_unique<GifEncoder>(this);
432 if (encoder->initialize(size)) {
433 return encoder;
434 }
435 }
436 break;
437 }
438 case PipeWireBaseEncodedStream::WebP: {
439 if (forcedEncoder.isNull() || forcedEncoder == u"libwebp") {
440 auto encoder = std::make_unique<LibWebPEncoder>(this);
441 encoder->setQuality(m_quality);
442 if (encoder->initialize(size)) {
443 return encoder;
444 }
445 }
446 break;
447 }
448 default:
449 qCWarning(PIPEWIRERECORD_LOGGING) << "Unknown encoder type" << m_encoderType;
450 }
451
452 return nullptr;
453}
454
455#include "moc_pipewireproduce_p.cpp"
bool invokeMethod(QObject *context, Functor &&function, FunctorReturnType *ret)
QueuedConnection
QFuture< typename qValueType< Iterator >::value_type > filtered(Iterator begin, Iterator end, KeepFunctor &&filterFunction)
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
QThread * currentThread()
void quit()
void timeout()
This file is part of the KDE documentation.
Documentation copyright © 1996-2025 The KDE developers.
Generated on Fri Apr 11 2025 11:54:43 by doxygen 1.13.2 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.