forked from dougbinks/enkiTS
-
Notifications
You must be signed in to change notification settings - Fork 0
/
TaskScheduler.h
214 lines (176 loc) · 8.97 KB
/
TaskScheduler.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
// Copyright (c) 2013 Doug Binks
//
// This software is provided 'as-is', without any express or implied
// warranty. In no event will the authors be held liable for any damages
// arising from the use of this software.
//
// Permission is granted to anyone to use this software for any purpose,
// including commercial applications, and to alter it and redistribute it
// freely, subject to the following restrictions:
//
// 1. The origin of this software must not be misrepresented; you must not
// claim that you wrote the original software. If you use this software
// in a product, an acknowledgement in the product documentation would be
// appreciated but is not required.
// 2. Altered source versions must be plainly marked as such, and must not be
// misrepresented as being the original software.
// 3. This notice may not be removed or altered from any source distribution.
#pragma once
#include <assert.h>   // assert (IPinnedTask::Execute)
#include <stddef.h>   // NULL (IPinnedTask constructors)
#include <stdint.h>
#include "Threads.h"
#include "Atomics.h"
namespace enki
{
// A contiguous slice of an ITaskSet's index space, handed to ExecuteRange.
// The range is half-open: indices start .. end-1 are to be processed.
struct TaskSetPartition
{
    uint32_t start;   // first index of the slice (inclusive)
    uint32_t end;     // one past the last index of the slice (exclusive)
};
class TaskScheduler;
class TaskPipe;
class PinnedTaskList;
struct ThreadArgs;
struct SubTaskSet;
// ICompletable is a base class used to check for completion.
// Do not use this class directly, instead derive from ITaskSet or IPinnedTask.
class ICompletable
{
public:
    ICompletable() : m_RunningCount(0) {}

    // Virtual so that deleting a task through an ICompletable* (the pointer type
    // TaskScheduler::WaitforTask accepts) runs the derived destructor instead of
    // being undefined behaviour.
    virtual ~ICompletable() {}

    // Returns true when m_RunningCount is zero, i.e. the task has completed.
    // const so it can be queried through a const ICompletable*.
    bool GetIsComplete() const {
        bool bRet = ( 0 == m_RunningCount );
        // Acquire barrier after reading the counter (BASE_MEMORYBARRIER_ACQUIRE,
        // from Atomics.h) so that subsequent reads of the task's results are not
        // reordered before the completion check.
        BASE_MEMORYBARRIER_ACQUIRE();
        return bRet;
    }

private:
    friend class TaskScheduler;
    // Outstanding work count for this task; 0 means complete.
    // NOTE(review): presumably incremented/decremented by TaskScheduler (a friend) -
    // volatile plus explicit barriers is the pre-C++11 synchronization scheme used here.
    volatile int32_t m_RunningCount;
};
// Subclass ITaskSet to create tasks.
// TaskSets can be re-used, but check completion (GetIsComplete()) first.
class ITaskSet : public ICompletable
{
public:
    ITaskSet()
        : m_SetSize(1)
        , m_MinRange(1)
        , m_RangeToRun(1)
    {}

    ITaskSet( uint32_t setSize_ )
        : m_SetSize( setSize_ )
        , m_MinRange(1)
        , m_RangeToRun(1)
    {}

    ITaskSet( uint32_t setSize_, uint32_t minRange_ )
        : m_SetSize( setSize_ )
        , m_MinRange( minRange_ )
        , m_RangeToRun(minRange_)
    {}

    // Virtual so deleting a derived task set through an ITaskSet* is well defined.
    virtual ~ITaskSet() {}

    // ExecuteRange must be overridden to process tasks. It is called with a
    // half-open range [range.start, range.end) where range.start >= 0,
    // range.start < range.end, and range.end <= m_SetSize (end is exclusive,
    // so a set of size 1 is executed with the range {0, 1}).
    // The range values should be mapped so that linearly processing them in order
    // is cache friendly, i.e. neighbouring values should be close together.
    // threadnum should not be used for changing processing of data; its intended
    // purpose is to allow per-thread data buckets for output.
    virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) = 0;

    // Size of set - usually the number of data items to be processed, see
    // ExecuteRange. Defaults to 1.
    uint32_t m_SetSize;

    // Minimum size of a TaskSetPartition range when splitting a task set into
    // partitions. This should be set to a value which results in computation
    // effort of at least 10k clock cycles to minimize task scheduler overhead.
    // NOTE: The last partition will be smaller than m_MinRange if m_SetSize is
    // not a multiple of m_MinRange.
    // Also known as grain size in the literature.
    uint32_t m_MinRange;

private:
    friend class TaskScheduler;
    // Internal, managed by TaskScheduler (a friend); seeded here with m_MinRange (or 1).
    // NOTE(review): presumably the partition size the scheduler splits this set
    // into - confirm in TaskScheduler.cpp.
    uint32_t m_RangeToRun;
};
// Subclass IPinnedTask to create tasks which can be run on a given thread only.
class IPinnedTask : public ICompletable
{
public:
    // Default is to run the task on the main thread (thread 0).
    IPinnedTask() : threadNum(0), pNext(NULL) {}
    // Run the task on thread threadNum_ (0 is the main thread).
    IPinnedTask( uint32_t threadNum_ ) : threadNum(threadNum_), pNext(NULL) {}

    // Virtual so deleting a derived pinned task through an IPinnedTask* is well defined.
    virtual ~IPinnedTask() {}

    // IPinnedTask needs to be non abstract for intrusive list functionality.
    // Should never be called as it should be overridden: it asserts in debug
    // builds and does nothing when NDEBUG is defined.
    virtual void Execute() { assert(false); }

    uint32_t threadNum;           // thread to run this pinned task on
    IPinnedTask* volatile pNext;  // Do not use. For intrusive list only.
};
// Signature shared by every profiler hook: the callback receives a thread number.
typedef void ( *ProfilerCallbackFunc )( uint32_t threadNum );

// Hooks the TaskScheduler exposes for profilers. Obtain a pointer to the
// scheduler's instance with TaskScheduler::GetProfilerCallbacks() and assign
// the members you need.
// NOTE(review): presumably a NULL member means "no callback" - confirm in
// TaskScheduler.cpp before relying on it.
struct ProfilerCallbacks
{
    ProfilerCallbackFunc threadStart;   // presumably: a task thread has started
    ProfilerCallbackFunc threadStop;    // presumably: a task thread is stopping
    ProfilerCallbackFunc waitStart;     // presumably: a thread begins waiting for work
    ProfilerCallbackFunc waitStop;      // presumably: a thread finished waiting
};
// TaskScheduler creates and manages the task threads and runs the ITaskSet and
// IPinnedTask work submitted to it.
class TaskScheduler
{
public:
TaskScheduler();
~TaskScheduler();
// Call either Initialize() or Initialize( numThreads_ ) before adding tasks.
// Initialize() will create GetNumHardwareThreads()-1 threads, which is
// sufficient to fill the system when including the main thread.
// Initialize can be called multiple times - it will wait for completion
// before re-initializing.
void Initialize();
// Initialize( numThreads_ ) - numThreads_ (must be > 0)
// will create numThreads_-1 threads, as thread 0 is
// the thread on which Initialize was called.
void Initialize( uint32_t numThreads_ );
// Adds pTaskSet to the pipe and returns if the pipe is not full.
// If the pipe is full, pTaskSet is run.
// Should only be called from the main thread, or from within a task.
void AddTaskSetToPipe( ITaskSet* pTaskSet );
// Queues pTask_ to run on thread pTask_->threadNum (thread 0 is the main thread).
void AddPinnedTask( IPinnedTask* pTask_ );
// Runs any IPinnedTask* queued for the current thread, but does not run other tasks.
// The main thread should call this or use a wait to ensure its pinned tasks are run.
void RunPinnedTasks();
// Runs the TaskSets in the pipe until true == pCompletable_->GetIsComplete().
// Should only be called from the thread which created the TaskScheduler, or from within a task.
// If called with NULL it will try to run tasks, and return if none are available.
void WaitforTask( const ICompletable* pCompletable_ );
// WaitforTaskSet is a deprecated interface that forwards to WaitforTask; use WaitforTask.
inline void WaitforTaskSet( const ICompletable* pCompletable_ ) { WaitforTask( pCompletable_ ); }
// Waits for all task sets to complete - not guaranteed to work unless we know we
// are in a situation where tasks aren't being continuously added.
void WaitforAll();
// Waits for all task sets to complete and shuts down the threads - not guaranteed to work
// unless we know we are in a situation where tasks aren't being continuously added.
void WaitforAllAndShutdown();
// Returns the number of threads created for running tasks + 1
// to account for the main thread.
uint32_t GetNumTaskThreads() const;
// Returns the ProfilerCallbacks structure so that it can be modified to
// set the callbacks.
ProfilerCallbacks* GetProfilerCallbacks();
private:
// Entry point for the created task threads.
// NOTE(review): pArgs presumably points at an entry of m_pThreadArgStore - confirm in TaskScheduler.cpp.
static THREADFUNC_DECL TaskingThreadFunction( void* pArgs );
void WaitForTasks( uint32_t threadNum );
// Overload of the public RunPinnedTasks() taking an explicit thread number.
void RunPinnedTasks( uint32_t threadNum );
// hintPipeToCheck_io_ is an in/out parameter (non-const reference).
// NOTE(review): presumably the next pipe to try taking work from - confirm.
bool TryRunTask( uint32_t threadNum, uint32_t& hintPipeToCheck_io_ );
void StartThreads();
void StopThreads( bool bWait_ );
void SplitAndAddTask( uint32_t threadNum_, SubTaskSet subTask_, uint32_t rangeToSplit_ );
// NOTE(review): confirm what the default maxToWake_ of 0 means (e.g. "wake all") in TaskScheduler.cpp.
void WakeThreads( int32_t maxToWake_ = 0 );
// NOTE(review): the *PerThread / *Store / *IDs pointers presumably reference arrays with
// one entry per thread (m_NumThreads) - confirm in TaskScheduler.cpp.
// Member order is kept as-is: it fixes initialization order in the constructor.
TaskPipe* m_pPipesPerThread;
PinnedTaskList* m_pPinnedTaskListPerThread;
uint32_t m_NumThreads;
ThreadArgs* m_pThreadArgStore;
threadid_t* m_pThreadIDs;
volatile bool m_bRunning;
volatile int32_t m_NumThreadsRunning;
volatile int32_t m_NumThreadsWaiting;
uint32_t m_NumPartitions;
uint32_t m_NumInitialPartitions;
semaphoreid_t m_NewTaskSemaphore;
bool m_bHaveThreads;
ProfilerCallbacks m_ProfilerCallbacks;
// Declared private to make TaskScheduler non-copyable (pre-C++11 idiom).
TaskScheduler( const TaskScheduler& nocopy );
TaskScheduler& operator=( const TaskScheduler& nocopy );
};
}