7#include "pipewireproduce_p.h"
12#include <logging_record.h>
16#include <qstringliteral.h>
18#include "gifencoder_p.h"
19#include "h264vaapiencoder_p.h"
20#include "libopenh264encoder_p.h"
21#include "libvpxencoder_p.h"
22#include "libvpxvp9encoder_p.h"
23#include "libwebpencoder_p.h"
24#include "libx264encoder_p.h"
26#include "logging_frame_statistics.h"
// Declare the two std::optional instantiations to Qt's meta-type system.
// The constructor additionally calls qRegisterMetaType() for both types.
32Q_DECLARE_METATYPE(std::optional<int>);
33Q_DECLARE_METATYPE(std::optional<std::chrono::nanoseconds>);
// Constructor. Takes the encoder type to produce, a node id and a file
// descriptor (presumably the values later handed to createStream() in
// initialize()), and the requested frame rate.
// The visible initialisers store encoderType in m_encoderType and framerate in
// m_frameRate; the visible body statements register std::optional<int> and
// std::optional<std::chrono::nanoseconds> with Qt's meta-type system.
// NOTE(review): the embedded line numbers have gaps, so further initialisers
// (presumably for nodeId and fd) are not shown in this listing.
// NOTE(review): framerate is stored in m_frameRate, whereas maxFramerate(),
// setMaxFramerate() and processFrame() work on m_maxFramerate. Confirm that
// m_maxFramerate holds a sensible value before the first frame is processed.
35PipeWireProduce::PipeWireProduce(PipeWireBaseEncodedStream::Encoder encoderType, uint nodeId, uint fd,
const Fraction &framerate)
38 , m_encoderType(encoderType)
40 , m_frameRate(framerate)
42 qRegisterMetaType<std::optional<int>>();
43 qRegisterMetaType<std::optional<std::chrono::nanoseconds>>();
// Destructor. NOTE(review): its body is not part of this listing.
46PipeWireProduce::~PipeWireProduce()
// Creates and configures the PipeWireSourceStream this producer reads from and
// prepares the helper timers.
// NOTE(review): this listing omits lines (braces, returns, lambda/connect
// headers), so the nesting of the statements below is not visible. The
// comments describe the individual statements only.
50void PipeWireProduce::initialize()
// Replace any previous stream object with a fresh PipeWireSourceStream.
52 m_stream.reset(
new PipeWireSourceStream(
nullptr));
// The stream is limited to the frame rate given to the constructor.
53 m_stream->setMaxFramerate(m_frameRate);
// Tell the stream whether its frames will feed a hardware or a software encoder.
62 m_stream->setUsageHint(Encoder::supportsHardwareEncoding() ? PipeWireSourceStream::UsageHint::EncodeHardware
63 : PipeWireSourceStream::UsageHint::EncodeSoftware);
// Create the stream for the stored node id and fd. A false result or a
// non-empty error string counts as failure: log a warning, keep the error
// text in m_error and drop the stream object again.
65 bool created = m_stream->createStream(m_nodeId, m_fd);
66 if (!created || !m_stream->error().isEmpty()) {
67 qCWarning(PIPEWIRERECORD_LOGGING) <<
"failed to set up stream for" << m_nodeId << m_stream->error();
68 m_error = m_stream->error();
69 m_stream.reset(
nullptr);
// NOTE(review): the failure branch presumably returns here (line not shown);
// otherwise the connect() below would run on a null stream.
// Run setupStream() once the stream reports its parameters.
72 connect(m_stream.get(), &PipeWireSourceStream::streamParametersChanged,
this, &PipeWireProduce::setupStream);
// Frame statistics are only set up when the statistics logging category has
// debug output enabled: a QTimer with a one second interval is created.
74 if (PIPEWIRERECORDFRAMESTATS_LOGGING().isDebugEnabled()) {
75 m_frameStatisticsTimer = std::make_unique<QTimer>();
76 m_frameStatisticsTimer->setInterval(std::chrono::seconds(1));
// Log the three frame counters and reset the processed-frame counter
// (presumably the body of the timer's timeout handler; its header is not shown).
78 qCDebug(PIPEWIRERECORDFRAMESTATS_LOGGING) <<
"Processed" << m_processedFrames <<
"frames in the last second.";
79 qCDebug(PIPEWIRERECORDFRAMESTATS_LOGGING) << m_pendingFilterFrames <<
"frames pending for filter.";
80 qCDebug(PIPEWIRERECORDFRAMESTATS_LOGGING) << m_pendingEncodeFrames <<
"frames pending for encode.";
81 m_processedFrames = 0;
// Single-shot timer with an initial interval of 100 (QTimer intervals are in
// milliseconds). processFrame() calls start() on it and setMaxFramerate()
// adjusts its interval.
93 m_frameRepeatTimer.reset(
new QTimer);
94 m_frameRepeatTimer->setSingleShot(
true);
95 m_frameRepeatTimer->setInterval(100);
// Hand a frame `f` to the encoder's filter stage, count it as pending and wake
// the passthrough thread. The branch taken when filterFrame() returns false
// and the origin of `f` are not shown; presumably this is the repeat timer's
// timeout handler re-submitting a frame.
100 if (!m_encoder->filterFrame(f)) {
104 m_pendingFilterFrames++;
105 m_passthroughCondition.notify_all();
// Returns the currently stored maximum frame rate (m_maxFramerate).
109Fraction PipeWireProduce::maxFramerate()
const
111 return m_maxFramerate;
// Stores a new maximum frame rate and propagates it to the frame repeat timer
// and to the stream.
114void PipeWireProduce::setMaxFramerate(
const Fraction &framerate)
116 m_maxFramerate = framerate;
// Convert the fraction to frames per second in floating point; the cast is
// applied to the numerator before the division.
// NOTE(review): assuming arithmetic Fraction members, a zero denominator or a
// zero numerator is not checked here and would produce a non-finite value in
// this line or in the interval computation below.
118 const double framesPerSecond =
static_cast<double>(framerate.numerator) / framerate.denominator;
// The repeat timer, when it exists, fires after twice the duration of one
// frame, expressed in milliseconds.
119 if (m_frameRepeatTimer) {
120 m_frameRepeatTimer->setInterval((1000 / framesPerSecond) * 2);
// NOTE(review): the line preceding this call is missing from the listing; it
// may hold a null check for m_stream, which initialize() can reset to nullptr.
123 m_stream->setMaxFramerate(framerate);
// Returns the configured upper bound for pending frames (m_maxPendingFrames).
127int PipeWireProduce::maxPendingFrames()
const
129 return m_maxPendingFrames;
// Sets the upper bound for pending frames. The stored value limits the filter
// queue in processFrame() and the encode budget in the passthrough thread.
// Requests below 3 are reported with a warning and replaced by 3.
132void PipeWireProduce::setMaxPendingFrames(
int newMaxBufferSize)
134 if (newMaxBufferSize < 3) {
135 qCWarning(PIPEWIRERECORD_LOGGING) <<
"Maxmimum pending frame count of " << newMaxBufferSize <<
" requested. Value must be 3 or higher.";
// Clamp to the minimum instead of rejecting the call.
136 newMaxBufferSize = 3;
138 m_maxPendingFrames = newMaxBufferSize;
// Slot connected to streamParametersChanged in initialize(). It creates the
// encoder, wires up the stream signals and starts the two worker threads.
// NOTE(review): this listing omits lines (braces, returns, some guards), so
// the exact nesting of the statements below is not visible.
141void PipeWireProduce::setupStream()
143 qCDebug(PIPEWIRERECORD_LOGGING) <<
"Setting up stream";
// Disconnect this slot again so the setup is not repeated by the same signal.
144 disconnect(m_stream.get(), &PipeWireSourceStream::streamParametersChanged,
this, &PipeWireProduce::setupStream);
// Pick an encoder for the configured type; the warning below belongs to the
// failure case (its guard line is not shown).
146 m_encoder = makeEncoder();
148 qCWarning(PIPEWIRERECORD_LOGGING) <<
"No encoder could be created";
152 connect(m_stream.get(), &PipeWireSourceStream::stateChanged,
this, &PipeWireProduce::stateChanged);
153 if (!setupFormat()) {
154 qCWarning(PIPEWIRERECORD_LOGGING) <<
"Could not set up the producing thread";
// From here on every frame delivered by the stream goes to processFrame().
158 connect(m_stream.data(), &PipeWireSourceStream::frameReceived,
this, &PipeWireProduce::processFrame);
// Passthrough thread: waits on m_passthroughCondition, then asks the encoder
// to encode frames, passing the remaining room in the encode queue
// (m_maxPendingFrames - m_pendingEncodeFrames). The number of queued frames is
// added to m_pendingEncodeFrames and the output thread is woken up.
// NOTE(review): the running flag is set to true inside the thread itself, so a
// stop request issued before the thread starts running could be overwritten.
// NOTE(review): wait() is called without a predicate; a notification sent
// while the thread is not waiting is lost and spurious wake-ups run an
// iteration. Confirm this is acceptable for the callers.
160 m_passthroughThread = std::thread([
this]() {
161 m_passthroughRunning =
true;
162 while (m_passthroughRunning) {
163 std::unique_lock<std::mutex> lock(m_passthroughMutex);
164 m_passthroughCondition.wait(lock);
// Re-check the flag after waking up, since destroy() clears it and notifies.
166 if (!m_passthroughRunning) {
170 auto [
filtered, queued] = m_encoder->encodeFrame(m_maxPendingFrames - m_pendingEncodeFrames);
172 m_pendingEncodeFrames += queued;
174 m_outputCondition.notify_all();
// NOTE(review): on Linux/glibc thread names are limited to 16 bytes including
// the terminator; this name is longer, so the call presumably fails with
// ERANGE. The return value is not checked in the visible code.
177 pthread_setname_np(m_passthroughThread.native_handle(),
"PipeWireProduce::passthrough");
// Output thread: waits on m_outputCondition, then collects encoded packets
// from the encoder. The number received is subtracted from the pending encode
// counter and added to the processed-frame counter. The same notes about the
// running flag and the predicate-less wait() apply here.
179 m_outputThread = std::thread([
this]() {
180 m_outputRunning =
true;
181 while (m_outputRunning) {
182 std::unique_lock<std::mutex> lock(m_outputMutex);
183 m_outputCondition.wait(lock);
185 if (!m_outputRunning) {
189 auto received = m_encoder->receivePacket();
190 m_pendingEncodeFrames -= received;
191 m_processedFrames += received;
// NOTE(review): this name also exceeds the 16 byte Linux limit (see above).
199 pthread_setname_np(m_outputThread.native_handle(),
"PipeWireProduce::output");
// The statistics timer only exists when statistics debug logging is enabled.
201 if (m_frameStatisticsTimer) {
202 m_frameStatisticsTimer->start();
// Marks the producer as deactivated and deactivates the stream.
// NOTE(review): several lines of this function are missing from the listing.
206void PipeWireProduce::deactivate()
// stateChanged() and handleEncodedFramesChanged() test this flag.
208 m_deactivated =
true;
// Default to "paused"; the real state is read from the stream below
// (presumably only when a stream exists; the guard line is not shown).
210 auto streamState = PW_STREAM_STATE_PAUSED;
212 streamState = m_stream->state();
213 m_stream->setActive(
false);
// Special case: no encoder was created, or the stream was not streaming when
// it was deactivated. The body of this branch is not shown.
219 if (!m_encoder || streamState != PW_STREAM_STATE_STREAMING) {
// Tears down timers and worker threads. Must be called from the thread this
// object lives in, which is asserted below.
// NOTE(review): several lines (guards, braces) are missing from the listing.
224void PipeWireProduce::destroy()
228 Q_ASSERT_X(
QThread::currentThread() == thread(),
"PipeWireProduce",
"destroy() called from a different thread than PipeWireProduce's thread");
// Stop the repeat timer and drop the statistics timer.
234 m_frameRepeatTimer->stop();
236 m_frameStatisticsTimer =
nullptr;
// Stop each worker thread: clear its running flag, wake it so it can observe
// the flag, then wait for it to finish.
238 if (m_passthroughThread.joinable()) {
239 m_passthroughRunning =
false;
240 m_passthroughCondition.notify_all();
241 m_passthroughThread.join();
244 if (m_outputThread.joinable()) {
245 m_outputRunning =
false;
246 m_outputCondition.notify_all();
247 m_outputThread.join();
252 qCDebug(PIPEWIRERECORD_LOGGING) <<
"finished";
// Forwards the optional quality value to the current encoder.
// NOTE(review): lines between the signature and this call are missing from the
// listing; presumably they store the value in m_quality (read by
// makeEncoder()) and check that an encoder exists. Confirm against the full file.
257void PipeWireProduce::setQuality(
const std::optional<quint8> &quality)
261 m_encoder->setQuality(quality);
// Remembers the encoding preference (makeEncoder() applies it to newly created
// H.264 encoders) and forwards it to the current encoder.
// NOTE(review): a line before the forwarding call is missing from the listing;
// presumably it checks that an encoder exists.
265void PipeWireProduce::setEncodingPreference(
const PipeWireBaseEncodedStream::EncodingPreference &encodingPreference)
267 m_encodingPreference = encodingPreference;
270 m_encoder->setEncodingPreference(encodingPreference);
// Slot for PipeWireSourceStream::frameReceived (connected in setupStream()).
// Updates the cached cursor state, applies timestamp and rate checks and, when
// the filter queue has room, hands the frame to the encoder's filter stage.
// NOTE(review): this listing omits lines (guards, returns, braces); the bodies
// of the early-exit branches below are not shown.
274void PipeWireProduce::processFrame(
const PipeWireFrame &frame)
// (Re)start the frame repeat timer for this frame.
279 m_frameRepeatTimer->start();
// Copy cursor position and hotspot from the frame; the texture is only taken
// over, and the cursor marked dirty, when the frame carries a non-null texture.
// The guard that checks frame.cursor is presumably on an omitted line.
282 m_cursor.position = frame.cursor->position;
283 m_cursor.hotspot = frame.cursor->hotspot;
284 if (!frame.cursor->texture.isNull()) {
285 m_cursor.dirty =
true;
286 m_cursor.texture = frame.cursor->texture;
// Frames whose timestamp does not advance past the previous one take the
// branch below (body not shown, presumably the frame is skipped).
290 auto pts = framePts(frame.presentationTimestamp);
291 if (m_previousPts >= 0 && pts <= m_previousPts) {
// Minimum time between two frames, in milliseconds, derived from the maximum
// frame rate; frames arriving sooner take the branch below (body not shown).
// NOTE(review): the inner division happens on the Fraction members before the
// conversion to double. If those are integers, it truncates (30000/1001
// becomes 29) and yields 0 for rates below 1 fps, making frameTime infinite.
// setMaxFramerate() casts to double first; this line presumably should too.
295 auto frameTime = 1000.0 / (m_maxFramerate.numerator / m_maxFramerate.denominator);
296 if ((pts - m_previousPts) < frameTime) {
// Back-pressure: do not accept more frames than m_maxPendingFrames.
300 if (m_pendingFilterFrames + 1 > m_maxPendingFrames) {
301 qCWarning(PIPEWIRERECORD_LOGGING) <<
"Filter queue is full, dropping frame" << pts;
// NOTE(review): `f` is not declared in the visible lines; presumably it is
// derived from `frame` on an omitted line.
306 if (!m_encoder->filterFrame(f)) {
310 m_pendingFilterFrames++;
// Wake the passthrough thread so it can pick up the filtered frame.
313 m_passthroughCondition.notify_all();
// Slot for PipeWireSourceStream::stateChanged (connected in setupStream()).
// NOTE(review): the branch bodies are missing from the listing.
316void PipeWireProduce::stateChanged(pw_stream_state state)
// Branch for every case other than "stream paused after deactivate()"
// (body not shown, presumably an early return).
318 if (state != PW_STREAM_STATE_PAUSED || !m_deactivated) {
// Logged for the case that there is no stream (the guard line is not shown).
322 qCDebug(PIPEWIRERECORD_LOGGING) <<
"finished without a stream";
// Stop feeding new frames into processFrame().
326 disconnect(m_stream.data(), &PipeWireSourceStream::frameReceived,
this, &PipeWireProduce::processFrame);
// Both queues already empty: handled in this branch (body not shown).
328 if (m_pendingFilterFrames <= 0 && m_pendingEncodeFrames <= 0) {
// Otherwise report the remaining frames and wake the passthrough thread so
// that the queues keep draining.
338 qCDebug(PIPEWIRERECORD_LOGGING) <<
"Waiting for frame queues to empty, still pending filter" << m_pendingFilterFrames <<
"encode"
339 << m_pendingEncodeFrames;
340 m_passthroughCondition.notify_all();
// Reacts to a change in the number of encoded frames.
// NOTE(review): most of this function is missing from the listing; only the
// conditions and one notification are visible.
344void PipeWireProduce::handleEncodedFramesChanged()
// Branch for the case that deactivate() has not been called (body not shown).
346 if (!m_deactivated) {
// Wake the passthrough thread so that remaining frames keep being processed.
355 m_passthroughCondition.notify_all();
// Checks for an empty filter queue and an empty encode queue; what happens in
// either case is not shown.
357 if (m_pendingFilterFrames <= 0) {
360 if (m_pendingEncodeFrames <= 0) {
// Creates an encoder matching m_encoderType for the current stream size.
// Each candidate is configured and returned only if its initialize(size) call
// succeeds. The KPIPEWIRE_FORCE_ENCODER environment variable restricts the
// candidates to the one whose name equals the variable's value.
// NOTE(review): this listing omits lines (braces, breaks, some returns), so
// what happens after a failed initialize() is not fully visible.
366std::unique_ptr<Encoder> PipeWireProduce::makeEncoder()
// qEnvironmentVariable() yields a null QString when the variable is unset, so
// isNull() means "no encoder forced".
368 auto forcedEncoder = qEnvironmentVariable(
"KPIPEWIRE_FORCE_ENCODER");
369 if (!forcedEncoder.isNull()) {
370 qCWarning(PIPEWIRERECORD_LOGGING) <<
"Forcing encoder to" << forcedEncoder;
373 auto size = m_stream->size();
375 switch (m_encoderType) {
// Both H.264 variants share the candidate list and differ only in the profile.
376 case PipeWireBaseEncodedStream::H264Baseline:
377 case PipeWireBaseEncodedStream::H264Main: {
378 auto profile = m_encoderType == PipeWireBaseEncodedStream::H264Baseline ? Encoder::H264Profile::Baseline : Encoder::H264Profile::Main;
// First candidate: the VAAPI hardware encoder.
380 if (forcedEncoder.isNull() || forcedEncoder == u
"h264_vaapi") {
381 auto hardwareEncoder = std::make_unique<H264VAAPIEncoder>(profile,
this);
382 hardwareEncoder->setQuality(m_quality);
383 hardwareEncoder->setEncodingPreference(m_encodingPreference);
384 if (hardwareEncoder->initialize(size)) {
385 return hardwareEncoder;
// Second candidate: the libx264 software encoder.
389 if (forcedEncoder.isNull() || forcedEncoder == u
"libx264") {
390 auto softwareEncoder = std::make_unique<LibX264Encoder>(profile,
this);
391 softwareEncoder->setQuality(m_quality);
392 softwareEncoder->setEncodingPreference(m_encodingPreference);
393 if (softwareEncoder->initialize(size)) {
394 return softwareEncoder;
// Third candidate: the libopenh264 software encoder.
399 if (forcedEncoder.isNull() || forcedEncoder == u
"libopenh264") {
400 auto softwareEncoder = std::make_unique<LibOpenH264Encoder>(profile,
this);
401 softwareEncoder->setQuality(m_quality);
402 softwareEncoder->setEncodingPreference(m_encodingPreference);
403 if (softwareEncoder->initialize(size)) {
404 return softwareEncoder;
// VP8 has a single candidate; only the quality setting is applied to it.
409 case PipeWireBaseEncodedStream::VP8: {
410 if (forcedEncoder.isNull() || forcedEncoder == u
"libvpx") {
411 auto encoder = std::make_unique<LibVpxEncoder>(
this);
412 encoder->setQuality(m_quality);
413 if (encoder->initialize(size)) {
// VP9: single candidate, configured like VP8.
419 case PipeWireBaseEncodedStream::VP9: {
420 if (forcedEncoder.isNull() || forcedEncoder == u
"libvpx-vp9") {
421 auto encoder = std::make_unique<LibVpxVp9Encoder>(
this);
422 encoder->setQuality(m_quality);
423 if (encoder->initialize(size)) {
// GIF: single candidate; no quality setting is applied in the visible lines.
429 case PipeWireBaseEncodedStream::Gif: {
430 if (forcedEncoder.isNull() || forcedEncoder == u
"gif") {
431 auto encoder = std::make_unique<GifEncoder>(
this);
432 if (encoder->initialize(size)) {
// WebP: single candidate with the quality setting applied.
438 case PipeWireBaseEncodedStream::WebP: {
439 if (forcedEncoder.isNull() || forcedEncoder == u
"libwebp") {
440 auto encoder = std::make_unique<LibWebPEncoder>(
this);
441 encoder->setQuality(m_quality);
442 if (encoder->initialize(size)) {
// Reached for encoder types not handled above (the case label is not shown).
449 qCWarning(PIPEWIRERECORD_LOGGING) <<
"Unknown encoder type" << m_encoderType;
455#include "moc_pipewireproduce_p.cpp"
QFuture< typename qValueType< Iterator >::value_type > filtered(Iterator begin, Iterator end, KeepFunctor &&filterFunction)
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
QThread * currentThread()