KASync

job_impl.h
/*
    SPDX-FileCopyrightText: 2014-2015 Daniel Vrátil <dvratil@redhat.com>
    SPDX-FileCopyrightText: 2015-2016 Daniel Vrátil <dvratil@kde.org>
    SPDX-FileCopyrightText: 2016 Christian Mollekopf <mollekopf@kolabsystems.com>

    SPDX-License-Identifier: LGPL-2.0-or-later
*/
8
9#ifndef KASYNC_JOB_IMPL_H
10#define KASYNC_JOB_IMPL_H
11
#include "async.h"
#include "traits_p.h"

#include <QList>
#include <QTimer>
16
17//@cond PRIVATE
18
19namespace KAsync
20{
21
22template<typename Out, typename ... In>
23template<typename ... InOther>
24Job<Out, In ...>::operator std::conditional_t<std::is_void<OutType>::value, IncompleteType, Job<void>> ()
25{
26 return thenImpl<void, InOther ...>({JobContinuation<void, InOther ...>([](InOther ...){ return KAsync::null<void>(); })}, {});
27}
28
29template<typename Out, typename ... In>
30template<typename OutOther, typename ... InOther>
31Job<OutOther, In ...> Job<Out, In ...>::thenImpl(Private::ContinuationHolder<OutOther, InOther ...> workHelper,
32 Private::ExecutionFlag execFlag) const
33{
34 thenInvariants<InOther ...>();
35 return Job<OutOther, In ...>(QSharedPointer<Private::Executor<OutOther, InOther ...>>::create(
36 std::forward<Private::ContinuationHolder<OutOther, InOther ...>>(workHelper), mExecutor, execFlag));
37}
38
39template<typename Out, typename ... In>
40template<typename OutOther, typename ... InOther>
41Job<OutOther, In ...> Job<Out, In ...>::then(const Job<OutOther, InOther ...> &job) const
42{
43 thenInvariants<InOther ...>();
44 auto executor = job.mExecutor;
45 executor->prepend(mExecutor);
46 return Job<OutOther, In ...>(executor);
47}
48
49template<typename Out, typename ... In>
50Job<Out, In ...> Job<Out, In ...>::onError(SyncErrorContinuation<void> &&errorFunc) const
51{
52 return Job<Out, In...>(QSharedPointer<Private::Executor<Out, Out>>::create(
53 // Extra indirection to allow propagating the result of a previous future when no
54 // error occurs
55 Private::ContinuationHolder<Out, Out>([errorFunc = std::move(errorFunc)](const Error &error, const Out &val) {
56 errorFunc(error);
57 return val;
58 }), mExecutor, Private::ExecutionFlag::ErrorCase));
59}
60
61template<> // Specialize for void jobs
62inline Job<void> Job<void>::onError(SyncErrorContinuation<void> &&errorFunc) const
63{
64 return Job<void>(QSharedPointer<Private::Executor<void>>::create(
65 Private::ContinuationHolder<void>(std::forward<SyncErrorContinuation<void>>(errorFunc)),
66 mExecutor, Private::ExecutionFlag::ErrorCase));
67}
68
69template<typename Out, typename ... In>
70template<typename FirstIn>
71KAsync::Future<Out> Job<Out, In ...>::exec(FirstIn in)
72{
73 // Inject a fake sync executor that will return the initial value
74 Private::ExecutorBasePtr first = mExecutor;
75 while (first->mPrev) {
76 first = first->mPrev;
77 }
78
79 first->mPrev = QSharedPointer<Private::Executor<FirstIn>>::create(
80 Private::ContinuationHolder<FirstIn>([val = std::move(in)](Future<FirstIn> &future) {
81 future.setResult(val);
82 }));
83
84 auto result = exec();
85 // Remove the injected executor
86 first->mPrev.reset();
87 return result;
88}
89
90template<typename Out, typename ... In>
91KAsync::Future<Out> Job<Out, In ...>::exec()
92{
93 Private::ExecutionPtr execution = mExecutor->exec(mExecutor, Private::ExecutionContext::Ptr::create());
94 KAsync::Future<Out> result = *execution->result<Out>();
95
96 return result;
97}
98
99template<typename Out, typename ... In>
100Job<Out, In ...>::Job(Private::ExecutorBasePtr executor)
101 : JobBase(executor)
102{}
103
104template<typename Out, typename ... In>
105Job<Out, In ...>::Job(JobContinuation<Out, In ...> &&func)
106 : JobBase(new Private::Executor<Out, In ...>(std::forward<JobContinuation<Out, In ...>>(func), {}))
107{
108 qWarning() << "Creating job job";
109 static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed.");
110}
111
112template<typename Out, typename ... In>
113template<typename OutOther>
114void Job<Out, In ...>::eachInvariants() const
115{
116 static_assert(traits::isContainer<Out>::value,
117 "The 'Each' task can only be connected to a job that returns a list or an array.");
118 static_assert(std::is_void<OutOther>::value || traits::isContainer<OutOther>::value,
119 "The result type of 'Each' task must be void, a list or an array.");
120}
121
122template<typename Out, typename ... In>
123template<typename InOtherFirst, typename ... InOtherTail>
124void Job<Out, In ...>::thenInvariants() const
125{
126 static_assert(!std::is_void<Out>::value && (std::is_convertible<Out, InOtherFirst>::value || std::is_base_of<Out, InOtherFirst>::value),
127 "The return type of previous task must be compatible with input type of this task");
128}
129
130template<typename Out, typename ... In>
131template<typename ... InOther>
132auto Job<Out, In ...>::thenInvariants() const -> std::enable_if_t<(sizeof...(InOther) == 0)>
133{
134}
135
136template<template<typename> class Container>
137KAsync::Job<void> waitForCompletion(Container<KAsync::Future<void>> &futures)
138{
139 struct Context {
140 void removeWatcher(KAsync::FutureWatcher<void> *w)
141 {
142 pending.erase(std::remove_if(pending.begin(), pending.end(), [w](const auto &watcher) {
143 return w == watcher.get();
144 }));
145 }
146
147 std::vector<std::unique_ptr<KAsync::FutureWatcher<void>>> pending;
148 };
149
150 return start<Context *>([]() {
151 return new Context();
152 })
153 .template then<Context*, Context*>([futures](Context *context, KAsync::Future<Context *> &future) {
154 for (KAsync::Future<void> subFuture : futures) {
155 if (subFuture.isFinished()) {
156 continue;
157 }
158 // FIXME bind lifetime all watcher to future (respectively the main job)
159 auto watcher = std::make_unique<KAsync::FutureWatcher<void>>();
160 QObject::connect(watcher.get(), &KAsync::FutureWatcher<void>::futureReady,
161 [&future, watcher = watcher.get(), context]() {
162 context->removeWatcher(watcher);
163 if (context->pending.empty()) {
164 future.setResult(context);
165 }
166 });
167 watcher->setFuture(subFuture);
168 context->pending.push_back(std::move(watcher));
169 }
170 if (context->pending.empty()) {
171 future.setResult(context);
172 }
173 })
174 .template then<void, Context*>([](Context *context) {
175 delete context;
176 });
177 // .finally<void>([context]() { delete context; });
178}
179
180template<typename List, typename ValueType>
181Job<void, List> forEach(KAsync::Job<void, ValueType> job)
182{
183 auto cont = [job] (const List &values) mutable {
186 for (const auto &v : values) {
187 auto future = job
188 .template then<void>([error] (const KAsync::Error &e) {
189 if (e && !*error) {
190 //TODO ideally we would aggregate the errors instead of just using the first one
191 *error = e;
192 }
193 })
194 .exec(v);
195 list.push_back(future);
196 }
197 return waitForCompletion(list)
198 .then<void>([error](KAsync::Future<void> &future) {
199 if (*error) {
200 future.setError(*error);
201 } else {
202 future.setFinished();
203 }
204 });
205 };
206 return Job<void, List>(QSharedPointer<Private::Executor<void, List>>::create(
207 Private::ContinuationHolder<void, List>(JobContinuation<void, List>(std::move(cont))), nullptr, Private::ExecutionFlag::GoodCase));
208}
209
210
211template<typename List, typename ValueType>
212Job<void, List> serialForEach(KAsync::Job<void, ValueType> job)
213{
214 auto cont = [job] (const List &values) mutable {
216 auto serialJob = KAsync::null<void>();
217 for (const auto &value : values) {
218 serialJob = serialJob.then<void>([value, job, error](KAsync::Future<void> &future) {
219 job.template then<void>([&future, error] (const KAsync::Error &e) {
220 if (e && !*error) {
221 //TODO ideally we would aggregate the errors instead of just using the first one
222 *error = e;
223 }
224 future.setFinished();
225 })
226 .exec(value);
227 });
228 }
229 return serialJob
230 .then<void>([error](KAsync::Future<void> &future) {
231 if (*error) {
232 future.setError(*error);
233 } else {
234 future.setFinished();
235 }
236 });
237 };
238 return Job<void, List>(QSharedPointer<Private::Executor<void, List>>::create(
239 Private::ContinuationHolder<void, List>(JobContinuation<void, List>(std::move(cont))), nullptr, Private::ExecutionFlag::GoodCase));
240}
241
242template<typename List, typename ValueType>
243Job<void, List> forEach(JobContinuation<void, ValueType> &&func)
244{
245 return forEach<List, ValueType>(KAsync::start<void, ValueType>(std::forward<JobContinuation<void, ValueType>>(func)));
246}
247
248template<typename List, typename ValueType>
249Job<void, List> serialForEach(JobContinuation<void, ValueType> &&func)
250{
251 return serialForEach<List, ValueType>(KAsync::start<void, ValueType>(std::forward<JobContinuation<void, ValueType>>(func)));
252}
253
254template<typename Out>
255Job<Out> null()
256{
257 return KAsync::start<Out>(
258 [](KAsync::Future<Out> &future) {
259 future.setFinished();
260 });
261}
262
263template<typename Out>
264Job<Out> value(Out v)
265{
266 return KAsync::start<Out>(
267 [val = std::move(v)](KAsync::Future<Out> &future) {
268 future.setResult(val);
269 });
270}
271
272template<typename Out>
273Job<Out> error(int errorCode, const QString &errorMessage)
274{
275 return error<Out>({errorCode, errorMessage});
276}
277
278template<typename Out>
279Job<Out> error(const char *message)
280{
281 return error<Out>(Error(message));
282}
283
284template<typename Out>
285Job<Out> error(const Error &error)
286{
287 return KAsync::start<Out>(
288 [error](KAsync::Future<Out> &future) {
289 future.setError(error);
290 });
291}
292
293inline Job<void> doWhile(const Job<ControlFlowFlag> &body)
294{
295 return KAsync::start<void>([body] (KAsync::Future<void> &future) {
296 auto job = body.then<void, ControlFlowFlag>([&future, body](const KAsync::Error &error, ControlFlowFlag flag) {
297 if (error) {
298 future.setError(error);
299 future.setFinished();
300 } else if (flag == ControlFlowFlag::Continue) {
301 doWhile(body).then<void>([&future](const KAsync::Error &error) {
302 if (error) {
303 future.setError(error);
304 }
305 future.setFinished();
306 }).exec();
307 } else {
308 future.setFinished();
309 }
310 }).exec();
311 });
312}
313
314inline Job<void> doWhile(const JobContinuation<ControlFlowFlag> &body)
315{
316 return doWhile(KAsync::start<ControlFlowFlag>([body] {
317 return body();
318 }));
319}
320
321inline Job<void> wait(int delay)
322{
323 return KAsync::start<void>([delay](KAsync::Future<void> &future) {
324 QTimer::singleShot(delay, [&future]() {
325 future.setFinished();
326 });
327 });
328}
329} // namespace KAsync
330
331//@endcond
332
333#endif // KASYNC_JOB_IMPL_H
Q_SCRIPTABLE Q_NOREPLY void start()
KCALUTILS_EXPORT QString errorMessage(const KCalendarCore::Exception &exception)
void error(QWidget *parent, const QString &text, const QString &title, const KGuiItem &buttonOk, Options options=Notify)
KIOCORE_EXPORT QStringList list(const QString &fileClass)
KGuiItem cont()
KGuiItem forward(BidiMode useBidi=IgnoreRTL)
void forEach(const typename Trait::template Vector< ItemType > &types, std::shared_ptr< Document< Trait > > doc, ItemFunctor< Trait > func, unsigned int maxNestingLevel=0)
void push_back(parameter_type value)
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
QSharedPointer< T > create(Args &&... args)
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Fri Nov 29 2024 11:49:43 by doxygen 1.12.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.