Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
Loading...
Searching...
No Matches
_flow_graph_impl.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef __TBB_flow_graph_impl_H
18#define __TBB_flow_graph_impl_H
19
20#include "../tbb_stddef.h"
21#include "../task.h"
22#include "../task_arena.h"
23#include "../flow_graph_abstractions.h"
24
25#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
26#include "../concurrent_priority_queue.h"
27#endif
28
29#include <list>
30
31#if TBB_DEPRECATED_FLOW_ENQUEUE
32#define FLOW_SPAWN(a) tbb::task::enqueue((a))
33#else
34#define FLOW_SPAWN(a) tbb::task::spawn((a))
35#endif
36
37#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
38#define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr ) expr
39#define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority ) , priority
40#define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1, priority
41#else
42#define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr )
43#define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority )
44#define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1
45#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
46
47#if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
48#define __TBB_DEPRECATED_LIMITER_EXPR( expr ) expr
49#define __TBB_DEPRECATED_LIMITER_ARG2( arg1, arg2 ) arg1, arg2
50#define __TBB_DEPRECATED_LIMITER_ARG4( arg1, arg2, arg3, arg4 ) arg1, arg3, arg4
51#else
52#define __TBB_DEPRECATED_LIMITER_EXPR( expr )
53#define __TBB_DEPRECATED_LIMITER_ARG2( arg1, arg2 ) arg1
54#define __TBB_DEPRECATED_LIMITER_ARG4( arg1, arg2, arg3, arg4 ) arg1, arg2
55#endif // TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
56
57namespace tbb {
58namespace flow {
59
60namespace internal {
61static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1;
62#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
63typedef unsigned int node_priority_t;
65#endif
66}
67
68namespace interface10 {
69class graph;
70}
71
72namespace interface11 {
73
75
76#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
80struct graph_task : public task {
81 graph_task( node_priority_t node_priority = no_priority ) : priority( node_priority ) {}
82 node_priority_t priority;
83};
84#else
85typedef task graph_task;
86#endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
87
88class graph_node;
89
90template <typename GraphContainerType, typename GraphNodeType>
93 friend class graph_node;
94public:
95 typedef size_t size_type;
96 typedef GraphNodeType value_type;
97 typedef GraphNodeType* pointer;
98 typedef GraphNodeType& reference;
99 typedef const GraphNodeType& const_reference;
100 typedef std::forward_iterator_tag iterator_category;
101
104
108 {}
109
112 if (this != &other) {
113 my_graph = other.my_graph;
115 }
116 return *this;
117 }
118
120 reference operator*() const;
121
123 pointer operator->() const;
124
126 bool operator==(const graph_iterator& other) const {
127 return ((my_graph == other.my_graph) && (current_node == other.current_node));
128 }
129
131 bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
132
136 return *this;
137 }
138
141 graph_iterator result = *this;
142 operator++();
143 return result;
144 }
145
146private:
147 // the graph over which we are iterating
148 GraphContainerType *my_graph;
149 // pointer into my_graph's my_nodes list
151
153 graph_iterator(GraphContainerType *g, bool begin);
154 void internal_forward();
155}; // class graph_iterator
156
157// flags to modify the behavior of the graph reset(). Can be combined.
160 rf_reset_bodies = 1 << 0, // delete the current node body, reset to a copy of the initial node body.
161 rf_clear_edges = 1 << 1 // delete edges
163
164namespace internal {
165
173
174#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
176 bool operator()(const graph_task* left, const graph_task* right) {
177 return left->priority < right->priority;
178 }
179};
180
182
184public:
186 : my_priority_queue(priority_queue) {}
188 graph_task* t = NULL;
189 bool result = my_priority_queue.try_pop(t);
190 __TBB_ASSERT_EX( result, "Number of critical tasks for scheduler and tasks"
191 " in graph's priority queue mismatched" );
192 __TBB_ASSERT( t && t != SUCCESSFULLY_ENQUEUED,
193 "Incorrect task submitted to graph priority queue" );
195 "Tasks from graph's priority queue must have priority" );
196 task* t_next = t->execute();
197 task::destroy(*t);
198 return t_next;
199 }
200private:
202};
203#endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
204
205}
206
207} // namespace interfaceX
208namespace interface10 {
210
213
214 template< typename Body >
216 public:
217 run_task(Body& body
219 , tbb::flow::interface11::node_priority_t node_priority = tbb::flow::interface11::no_priority
220 ) : tbb::flow::interface11::graph_task(node_priority),
221#else
222 ) :
223#endif
224 my_body(body) { }
226 my_body();
227 return NULL;
228 }
229 private:
231 };
232
233 template< typename Receiver, typename Body >
235 public:
236 run_and_put_task(Receiver &r, Body& body) : my_receiver(r), my_body(body) {}
238 tbb::task *res = my_receiver.try_put_task(my_body());
239 if (res == tbb::flow::interface11::SUCCESSFULLY_ENQUEUED) res = NULL;
240 return res;
241 }
242 private:
243 Receiver &my_receiver;
245 };
246 typedef std::list<tbb::task *> task_list_type;
247
250 public:
253 };
254
258 public:
260 void operator()() const {
262 }
263 };
264
265 void prepare_task_arena(bool reinit = false) {
266 if (reinit) {
267 __TBB_ASSERT(my_task_arena, "task arena is NULL");
270 }
271 else {
272 __TBB_ASSERT(my_task_arena == NULL, "task arena is not NULL");
274 }
275 if (!my_task_arena->is_active()) // failed to attach
276 my_task_arena->initialize(); // create a new, default-initialized arena
277 __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active");
278 }
279
280public:
282 graph();
283
285 explicit graph(tbb::task_group_context& use_this_context);
286
288
289 ~graph();
290
291#if TBB_PREVIEW_FLOW_GRAPH_TRACE
292 void set_name(const char *name);
293#endif
294
296 reserve_wait();
297 }
298
300 release_wait();
301 }
302
304
307
309
312
314
316 template< typename Receiver, typename Body >
317 __TBB_DEPRECATED void run(Receiver &r, Body body) {
319 task* rtask = new (task::allocate_additional_child_of(*root_task()))
322 }
323 }
324
326
328 template< typename Body >
329 __TBB_DEPRECATED void run(Body body) {
331 task* rtask = new (task::allocate_additional_child_of(*root_task())) run_task< Body >(body);
333 }
334 }
335
337
339 cancelled = false;
340 caught_exception = false;
341 if (my_root_task) {
342#if TBB_USE_EXCEPTIONS
343 try {
344#endif
346#if __TBB_TASK_GROUP_CONTEXT
348#endif
349#if TBB_USE_EXCEPTIONS
350 }
351 catch (...) {
353 my_context->reset();
354 caught_exception = true;
355 cancelled = true;
356 throw;
357 }
358#endif
359#if __TBB_TASK_GROUP_CONTEXT
360 // TODO: the "if" condition below is just a work-around to support the concurrent wait
361 // mode. The cancellation and exception mechanisms are still broken in this mode.
362 // Consider using task group not to re-implement the same functionality.
364 my_context->reset(); // consistent with behavior in catch()
365#endif
367#if __TBB_TASK_GROUP_CONTEXT
368 }
369#endif
370 }
371 }
372
375 return my_root_task;
376 }
377
378 // ITERATORS
379 template<typename C, typename N>
381
382 // Graph iterator typedefs
385
386 // Graph iterator constructors
388 iterator begin();
390 iterator end();
392 const_iterator begin() const;
394 const_iterator end() const;
396 const_iterator cbegin() const;
398 const_iterator cend() const;
399
401 bool is_cancelled() { return cancelled; }
403
404 // thread-unsafe state reset.
406
407private:
409#if __TBB_TASK_GROUP_CONTEXT
411#endif
417
419
423
425
426#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
428#endif
429
437
439
440}; // class graph
441} // namespace interface10
442
443namespace interface11 {
444
446
447#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
448namespace internal{
449class get_graph_helper;
450}
451#endif
452
455 friend class graph;
456 template<typename C, typename N>
457 friend class graph_iterator;
458
459#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
460 friend class internal::get_graph_helper;
461#endif
462
463protected:
466public:
467 explicit graph_node(graph& g);
468
469 virtual ~graph_node();
470
471#if TBB_PREVIEW_FLOW_GRAPH_TRACE
472 virtual void set_name(const char *name) = 0;
473#endif
474
475#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
476 virtual void extract() = 0;
477#endif
478
479protected:
480 // performs the reset on an individual node.
482}; // class graph_node
483
484namespace internal {
485
486inline void activate_graph(graph& g) {
487 g.my_is_active = true;
488}
489
490inline void deactivate_graph(graph& g) {
491 g.my_is_active = false;
492}
493
494inline bool is_graph_active(graph& g) {
495 return g.my_is_active;
496}
497
498#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
500 task* critical_task = &t;
501 // TODO: change flow graph's interfaces to work with graph_task type instead of tbb::task.
502 graph_task* gt = static_cast<graph_task*>(&t);
503 if( gt->priority != no_priority ) {
508 critical_task = new( gt->allocate_continuation() ) priority_task_selector(g.my_priority_queue);
509 tbb::internal::make_critical( *critical_task );
511 }
512 return *critical_task;
513}
514#else
516 return t;
517}
518#endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
519
521inline void spawn_in_graph_arena(graph& g, tbb::task& arena_task) {
522 if (is_graph_active(g)) {
523 graph::spawn_functor s_fn(prioritize_task(g, arena_task));
525 g.my_task_arena->execute(s_fn);
526 }
527}
528
530inline void enqueue_in_graph_arena(graph &g, tbb::task& arena_task) {
531 if (is_graph_active(g)) {
532 __TBB_ASSERT( g.my_task_arena && g.my_task_arena->is_active(), "Is graph's arena initialized and active?" );
533 task::enqueue(prioritize_task(g, arena_task), *g.my_task_arena);
534 }
535}
536
538 g.my_reset_task_list.push_back(tp);
539}
540
541} // namespace internal
542
543} // namespace interfaceX
544} // namespace flow
545} // namespace tbb
546
547#endif // __TBB_flow_graph_impl_H
#define FLOW_SPAWN(a)
Definition: flow_graph.h:65
#define __TBB_DEPRECATED
Definition: tbb_config.h:636
#define __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
Definition: tbb_config.h:861
#define __TBB_ASSERT_EX(predicate, comment)
"Extended" version is useful to suppress warnings if a variable is only used with an assert
Definition: tbb_stddef.h:167
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
#define __TBB_override
Definition: tbb_stddef.h:240
void const char const char int ITT_FORMAT __itt_group_sync x void const char * name
The graph class.
void make_critical(task &t)
Definition: task.h:1013
void spawn_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
void deactivate_graph(tbb::flow::interface10::graph &g)
void enqueue_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Enqueues a task inside graph arena.
bool is_graph_active(tbb::flow::interface10::graph &g)
tbb::task & prioritize_task(tbb::flow::interface10::graph &g, tbb::task &arena_task)
tbb::concurrent_priority_queue< graph_task *, graph_task_comparator > graph_task_priority_queue_t
void add_task_to_graph_reset_list(tbb::flow::interface10::graph &g, tbb::task *tp)
void activate_graph(tbb::flow::interface10::graph &g)
static const node_priority_t no_priority
unsigned int node_priority_t
static tbb::task *const SUCCESSFULLY_ENQUEUED
void push(const_reference elem)
Pushes elem onto the queue, increasing capacity of queue if necessary.
bool try_pop(reference elem)
Gets a reference to and removes highest priority element.
Pure virtual template classes that define interfaces for async communication.
Base class for tasks generated by graph nodes.
graph_task(node_priority_t node_priority=no_priority)
graph_iterator & operator++()
Pre-increment.
reference operator*() const
Dereference.
Definition: flow_graph.h:755
graph_iterator operator++(int)
Post-increment.
graph_iterator(const graph_iterator &other)
Copy constructor.
bool operator!=(const graph_iterator &other) const
Inequality.
pointer operator->() const
Dereference.
Definition: flow_graph.h:761
graph_iterator(GraphContainerType *g, bool begin)
Private initializing constructor for begin() and end() iterators.
std::forward_iterator_tag iterator_category
bool operator==(const graph_iterator &other) const
Equality.
graph_iterator & operator=(const graph_iterator &other)
Assignment.
bool operator()(const graph_task *left, const graph_task *right)
priority_task_selector(graph_task_priority_queue_t &priority_queue)
task * execute() __TBB_override
Should be overridden by derived classes.
void register_node(tbb::flow::interface11::graph_node *n)
Definition: flow_graph.h:820
tbb::flow::interface11::graph_iterator< const graph, const tbb::flow::interface11::graph_node > const_iterator
const_iterator cbegin() const
start const iterator
Definition: flow_graph.h:874
bool is_cancelled()
return status of graph execution
__TBB_DEPRECATED void run(Receiver &r, Body body)
Spawns a task that runs a body and puts its output to a specific receiver.
std::list< tbb::task * > task_list_type
tbb::task_group_context * my_context
tbb::flow::interface11::graph_node * my_nodes
void reset(tbb::flow::interface11::reset_flags f=tbb::flow::interface11::rf_reset_protocol)
Definition: flow_graph.h:843
iterator end()
end iterator
Definition: flow_graph.h:868
graph()
Constructs a graph with isolated task_group_context.
Definition: flow_graph.h:774
const_iterator cend() const
end const iterator
Definition: flow_graph.h:876
void remove_node(tbb::flow::interface11::graph_node *n)
Definition: flow_graph.h:831
__TBB_DEPRECATED void run(Body body)
Spawns a task that runs a function object.
void reserve_wait() __TBB_override
Used to register that an external entity may still interact with the graph.
Definition: flow_graph.h:806
tbb::flow::interface11::graph_iterator< graph, tbb::flow::interface11::graph_node > iterator
__TBB_DEPRECATED void decrement_wait_count()
tbb::flow::interface11::internal::graph_task_priority_queue_t my_priority_queue
__TBB_DEPRECATED void increment_wait_count()
void wait_for_all()
Wait until graph is idle and decrement_wait_count calls equals increment_wait_count calls.
void prepare_task_arena(bool reinit=false)
tbb::flow::interface11::graph_node * my_nodes_last
__TBB_DEPRECATED tbb::task * root_task()
Returns the root task of the graph.
iterator begin()
start iterator
Definition: flow_graph.h:866
void release_wait() __TBB_override
Deregisters an external entity that may have interacted with the graph.
Definition: flow_graph.h:813
~graph()
Destroys the graph.
Definition: flow_graph.h:798
run_task(Body &body, tbb::flow::interface11::node_priority_t node_priority=tbb::flow::interface11::no_priority)
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
The base of all graph nodes.
virtual void reset_node(reset_flags f=rf_reset_protocol)=0
A lock that occupies a single byte.
Definition: spin_mutex.h:39
Used to form groups of tasks.
Definition: task.h:358
bool __TBB_EXPORTED_METHOD is_group_execution_cancelled() const
Returns true if the context received cancellation request.
uintptr_t traits() const
Returns the context's trait.
Definition: task.h:578
void __TBB_EXPORTED_METHOD reset()
Forcefully reinitializes the context after the task tree it was associated with is completed.
Base class for user-defined tasks.
Definition: task.h:615
virtual task * execute()=0
Should be overridden by derived classes.
static void enqueue(task &t)
Enqueue task for starvation-resistant execution.
Definition: task.h:836
internal::allocate_continuation_proxy & allocate_continuation()
Returns proxy for overloaded new that allocates a continuation task of *this.
Definition: task.h:676
void wait_for_all()
Wait for reference count to become one, and set reference count to zero.
Definition: task.h:819
void set_ref_count(int count)
Set reference count.
Definition: task.h:761
internal::return_type_or_void< F >::type execute(F &f)
Definition: task_arena.h:423
void initialize()
Forces allocation of the resources for the task_arena as specified in constructor arguments.
Definition: task_arena.h:315
Tag class used to indicate the "attaching" constructor.
Definition: task_arena.h:304
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.