KPipewire

pipewireproduce.cpp
1/*
2 SPDX-FileCopyrightText: 2022 Aleix Pol Gonzalez <aleixpol@kde.org>
3
4 SPDX-License-Identifier: LGPL-2.1-only OR LGPL-3.0-only OR LicenseRef-KDE-Accepted-LGPL
5*/
6
7#include "pipewireproduce_p.h"
8
9#include <QMutex>
10#include <QPainter>
11#include <QThreadPool>
12#include <logging_record.h>
13
14#include <QDateTime>
15#include <memory>
16#include <qstringliteral.h>
17
18#include "h264vaapiencoder_p.h"
19#include "libopenh264encoder_p.h"
20#include "libvpxencoder_p.h"
21#include "libvpxvp9encoder_p.h"
22#include "libx264encoder_p.h"
23
24extern "C" {
25#include <fcntl.h>
26}
27
28Q_DECLARE_METATYPE(std::optional<int>);
29Q_DECLARE_METATYPE(std::optional<std::chrono::nanoseconds>);
30
31PipeWireProduce::PipeWireProduce(PipeWireBaseEncodedStream::Encoder encoderType, uint nodeId, uint fd, const Fraction &framerate)
32 : QObject()
33 , m_nodeId(nodeId)
34 , m_encoderType(encoderType)
35 , m_fd(fd)
36 , m_frameRate(framerate)
37{
38 qRegisterMetaType<std::optional<int>>();
39 qRegisterMetaType<std::optional<std::chrono::nanoseconds>>();
40}
41
42PipeWireProduce::~PipeWireProduce()
43{
44}
45
46void PipeWireProduce::initialize()
47{
48 m_stream.reset(new PipeWireSourceStream(nullptr));
49 m_stream->setMaxFramerate(m_frameRate);
50
51 // The check in supportsHardwareEncoding() is insufficient to fully
52 // determine if we actually support hardware encoding the current stream,
53 // but to determine that we need the stream size, which we don't get until
54 // after we've created the stream, but creating the stream sets important
55 // parameters that require the correct usage hint to be set. So use the
56 // insufficient check to set the hint, assuming that we still get a working
57 // stream when we use the wrong hint with software encoding.
58 m_stream->setUsageHint(Encoder::supportsHardwareEncoding() ? PipeWireSourceStream::UsageHint::EncodeHardware
59 : PipeWireSourceStream::UsageHint::EncodeSoftware);
60
61 bool created = m_stream->createStream(m_nodeId, m_fd);
62 if (!created || !m_stream->error().isEmpty()) {
63 qCWarning(PIPEWIRERECORD_LOGGING) << "failed to set up stream for" << m_nodeId << m_stream->error();
64 m_error = m_stream->error();
65 m_stream.reset(nullptr);
66 return;
67 }
68 connect(m_stream.get(), &PipeWireSourceStream::streamParametersChanged, this, &PipeWireProduce::setupStream);
69}
70
71Fraction PipeWireProduce::maxFramerate() const
72{
73 return m_maxFramerate;
74}
75
76void PipeWireProduce::setMaxFramerate(const Fraction &framerate)
77{
78 m_maxFramerate = framerate;
79 if (m_stream) {
80 m_stream->setMaxFramerate(framerate);
81 }
82}
83
84int PipeWireProduce::maxPendingFrames() const
85{
86 return m_maxPendingFrames;
87}
88
89void PipeWireProduce::setMaxPendingFrames(int newMaxBufferSize)
90{
91 if (newMaxBufferSize < 3) {
92 qCWarning(PIPEWIRERECORD_LOGGING) << "Maxmimum pending frame count of " << newMaxBufferSize << " requested. Value must be 3 or higher.";
93 newMaxBufferSize = 3;
94 }
95 m_maxPendingFrames = newMaxBufferSize;
96}
97
98void PipeWireProduce::setupStream()
99{
100 qCDebug(PIPEWIRERECORD_LOGGING) << "Setting up stream";
101 disconnect(m_stream.get(), &PipeWireSourceStream::streamParametersChanged, this, &PipeWireProduce::setupStream);
102
103 m_encoder = makeEncoder();
104 if (!m_encoder) {
105 qCWarning(PIPEWIRERECORD_LOGGING) << "No encoder could be created";
106 return;
107 }
108
109 connect(m_stream.get(), &PipeWireSourceStream::stateChanged, this, &PipeWireProduce::stateChanged);
110 if (!setupFormat()) {
111 qCWarning(PIPEWIRERECORD_LOGGING) << "Could not set up the producing thread";
112 return;
113 }
114
115 connect(m_stream.data(), &PipeWireSourceStream::frameReceived, this, &PipeWireProduce::processFrame);
116
117 m_passthroughThread = std::thread([this]() {
118 m_passthroughRunning = true;
119 while (m_passthroughRunning) {
120 std::unique_lock<std::mutex> lock(m_passthroughMutex);
121 m_passthroughCondition.wait(lock);
122
123 if (!m_passthroughRunning) {
124 break;
125 }
126
127 auto [filtered, queued] = m_encoder->encodeFrame(m_maxPendingFrames - m_pendingEncodeFrames);
128 m_pendingFilterFrames -= filtered;
129 m_pendingEncodeFrames += queued;
130
131 m_outputCondition.notify_all();
132 }
133 });
134 pthread_setname_np(m_passthroughThread.native_handle(), "PipeWireProduce::passthrough");
135
136 m_outputThread = std::thread([this]() {
137 m_outputRunning = true;
138 while (m_outputRunning) {
139 std::unique_lock<std::mutex> lock(m_outputMutex);
140 m_outputCondition.wait(lock);
141
142 if (!m_outputRunning) {
143 break;
144 }
145
146 auto received = m_encoder->receivePacket();
147 m_pendingEncodeFrames -= received;
148
149 // Notify the produce thread that the count of processed frames has
150 // changed and it can do cleanup if needed, making sure that that
151 // handling is done on the right thread.
152 QMetaObject::invokeMethod(this, &PipeWireProduce::handleEncodedFramesChanged, Qt::QueuedConnection);
153 }
154 });
155 pthread_setname_np(m_outputThread.native_handle(), "PipeWireProduce::output");
156}
157
158void PipeWireProduce::deactivate()
159{
160 m_deactivated = true;
161
162 auto streamState = PW_STREAM_STATE_PAUSED;
163 if (m_stream) {
164 streamState = m_stream->state();
165 m_stream->setActive(false);
166 }
167
168 // If we have not been initialized properly before, ensure we still run any
169 // cleanup code and exit the thread, otherwise we risk applications not closing
170 // properly.
171 if (!m_encoder || streamState != PW_STREAM_STATE_STREAMING) {
172 QMetaObject::invokeMethod(this, &PipeWireProduce::destroy, Qt::QueuedConnection);
173 }
174}
175
176void PipeWireProduce::destroy()
177{
178 // Ensure we cleanup the PipeWireSourceStream while in the same thread we
179 // created it in.
180 Q_ASSERT_X(QThread::currentThread() == thread(), "PipeWireProduce", "destroy() called from a different thread than PipeWireProduce's thread");
181
182 if (!m_stream) {
183 return;
184 }
185
186 if (m_passthroughThread.joinable()) {
187 m_passthroughRunning = false;
188 m_passthroughCondition.notify_all();
189 m_passthroughThread.join();
190 }
191
192 if (m_outputThread.joinable()) {
193 m_outputRunning = false;
194 m_outputCondition.notify_all();
195 m_outputThread.join();
196 }
197
198 m_stream.reset();
199
200 qCDebug(PIPEWIRERECORD_LOGGING) << "finished";
201 cleanup();
203}
204
205void PipeWireProduce::setQuality(const std::optional<quint8> &quality)
206{
207 m_quality = quality;
208 if (m_encoder) {
209 m_encoder->setQuality(quality);
210 }
211}
212
213void PipeWireProduce::setEncodingPreference(const PipeWireBaseEncodedStream::EncodingPreference &encodingPreference)
214{
215 m_encodingPreference = encodingPreference;
216
217 if (m_encoder) {
218 m_encoder->setEncodingPreference(encodingPreference);
219 }
220}
221
222void PipeWireProduce::processFrame(const PipeWireFrame &frame)
223{
224 auto f = frame;
225
226 if (frame.cursor) {
227 m_cursor.position = frame.cursor->position;
228 m_cursor.hotspot = frame.cursor->hotspot;
229 if (!frame.cursor->texture.isNull()) {
230 m_cursor.dirty = true;
231 m_cursor.texture = frame.cursor->texture;
232 }
233 }
234
235 auto pts = framePts(frame.presentationTimestamp);
236 if (m_previousPts >= 0 && pts <= m_previousPts) {
237 return;
238 }
239
240 auto frameTime = 1000.0 / (m_maxFramerate.numerator / m_maxFramerate.denominator);
241 if ((pts - m_previousPts) < frameTime) {
242 return;
243 }
244
245 if (m_pendingFilterFrames + 1 > m_maxPendingFrames) {
246 qCWarning(PIPEWIRERECORD_LOGGING) << "Filter queue is full, dropping frame" << pts;
247 return;
248 }
249
250 aboutToEncode(f);
251 if (!m_encoder->filterFrame(f)) {
252 return;
253 }
254
255 m_pendingFilterFrames++;
256 m_previousPts = pts;
257
258 m_passthroughCondition.notify_all();
259}
260
261void PipeWireProduce::stateChanged(pw_stream_state state)
262{
263 if (state != PW_STREAM_STATE_PAUSED || !m_deactivated) {
264 return;
265 }
266 if (!m_stream) {
267 qCDebug(PIPEWIRERECORD_LOGGING) << "finished without a stream";
268 return;
269 }
270
271 disconnect(m_stream.data(), &PipeWireSourceStream::frameReceived, this, &PipeWireProduce::processFrame);
272
273 if (m_pendingFilterFrames <= 0 && m_pendingEncodeFrames <= 0) {
274 // If we have nothing pending, cleanup immediately.
275 m_encoder->finish();
276
277 // We want to clean up the source stream while in the input thread, but we
278 // need to do so while not handling any PipeWire callback as that risks
279 // crashing because we're stil executing PipeWire handling code.
280 QMetaObject::invokeMethod(this, &PipeWireProduce::destroy, Qt::QueuedConnection);
281 } else {
282 // If we have pending frames, wait with cleanup until all frames have been processed.
283 qCDebug(PIPEWIRERECORD_LOGGING) << "Waiting for frame queues to empty, still pending filter" << m_pendingFilterFrames << "encode"
284 << m_pendingEncodeFrames;
285 m_passthroughCondition.notify_all();
286 }
287}
288
289void PipeWireProduce::handleEncodedFramesChanged()
290{
291 if (!m_deactivated) {
292 return;
293 }
294
295 // If we're deactivating but still have frames in the queue, we want to
296 // flush everything. Since at that point we are not receiving new frames, we
297 // need a different trigger to make the filtering thread process frames.
298 // Triggering here means the filter thread runs as fast as the encode thread
299 // can process the frames.
300 m_passthroughCondition.notify_all();
301
302 if (m_pendingFilterFrames <= 0) {
303 m_encoder->finish();
304
305 if (m_pendingEncodeFrames <= 0) {
306 destroy();
307 }
308 }
309}
310
311std::unique_ptr<Encoder> PipeWireProduce::makeEncoder()
312{
313 auto forcedEncoder = qEnvironmentVariable("KPIPEWIRE_FORCE_ENCODER");
314 if (!forcedEncoder.isNull()) {
315 qCWarning(PIPEWIRERECORD_LOGGING) << "Forcing encoder to" << forcedEncoder;
316 }
317
318 auto size = m_stream->size();
319
320 switch (m_encoderType) {
321 case PipeWireBaseEncodedStream::H264Baseline:
322 case PipeWireBaseEncodedStream::H264Main: {
323 auto profile = m_encoderType == PipeWireBaseEncodedStream::H264Baseline ? Encoder::H264Profile::Baseline : Encoder::H264Profile::Main;
324
325 if (forcedEncoder.isNull() || forcedEncoder == u"h264_vaapi") {
326 auto hardwareEncoder = std::make_unique<H264VAAPIEncoder>(profile, this);
327 hardwareEncoder->setQuality(m_quality);
328 hardwareEncoder->setEncodingPreference(m_encodingPreference);
329 if (hardwareEncoder->initialize(size)) {
330 return hardwareEncoder;
331 }
332 }
333
334 if (forcedEncoder.isNull() || forcedEncoder == u"libx264") {
335 auto softwareEncoder = std::make_unique<LibX264Encoder>(profile, this);
336 softwareEncoder->setQuality(m_quality);
337 softwareEncoder->setEncodingPreference(m_encodingPreference);
338 if (softwareEncoder->initialize(size)) {
339 return softwareEncoder;
340 }
341 }
342
343 // Try libopenh264 last, it's slower and has less features.
344 if (forcedEncoder.isNull() || forcedEncoder == u"libopenh264") {
345 auto softwareEncoder = std::make_unique<LibOpenH264Encoder>(profile, this);
346 softwareEncoder->setQuality(m_quality);
347 softwareEncoder->setEncodingPreference(m_encodingPreference);
348 if (softwareEncoder->initialize(size)) {
349 return softwareEncoder;
350 }
351 }
352 break;
353 }
354 case PipeWireBaseEncodedStream::VP8: {
355 if (forcedEncoder.isNull() || forcedEncoder == u"libvpx") {
356 auto encoder = std::make_unique<LibVpxEncoder>(this);
357 encoder->setQuality(m_quality);
358 if (encoder->initialize(size)) {
359 return encoder;
360 }
361 }
362 break;
363 }
364 case PipeWireBaseEncodedStream::VP9: {
365 if (forcedEncoder.isNull() || forcedEncoder == u"libvpx-vp9") {
366 auto encoder = std::make_unique<LibVpxVp9Encoder>(this);
367 encoder->setQuality(m_quality);
368 if (encoder->initialize(size)) {
369 return encoder;
370 }
371 }
372 break;
373 }
374 default:
375 qCWarning(PIPEWIRERECORD_LOGGING) << "Unknown encoder type" << m_encoderType;
376 }
377
378 return nullptr;
379}
380
381#include "moc_pipewireproduce_p.cpp"
bool invokeMethod(QObject *context, Functor &&function, FunctorReturnType *ret)
QueuedConnection
QFuture< typename qValueType< Iterator >::value_type > filtered(Iterator begin, Iterator end, KeepFunctor &&filterFunction)
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
QThread * currentThread()
void quit()
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Fri Nov 22 2024 12:08:09 by doxygen 1.12.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.