1
1

Compare commits

...

9 Commits

Author SHA1 Message Date
fe226dc5b3 Do less nanosleep loops.
tested that already without much change yesterday, but for some reasons
today it gives me another 2-3% speedup in both test files.

And it should also mitigate the (supposed) almost-starving situation,
hopefully.
2017-03-03 17:42:33 +01:00
9dab894e64 Inline all task-pushing helpers.
Those are small enough to be worth it, and it does give me ~2% speedup here...
2017-03-03 17:42:33 +01:00
08b0b1c439 Revert "Attempt to address nearly-starving cases."
This reverts commit 32959917ee112200125e3e742afb528fc2196072.

Definitively gives worse performances. Looks like every overhead we add
to task management is always worse than potential better scheduling it
might give us...
2017-03-03 17:42:33 +01:00
0c451bc530 Attempt to address nearly-starving cases.
Idea here is to reduce number of threads a pool is allowed to work on,
in case it does not get tasks quickly enough.

This does not seem to be really great result (have to only do the checks
once every 200 tasks pushed to avoid too much overhead), but I cannot
reproduce that nearly-starving case here so far. @sergey, curious if it
gives any difference on your 12cores with 14_03_G?
2017-03-03 17:42:33 +01:00
1ea1b04e54 Never starve main thread (run_and_wait) from tasks! 2017-03-03 17:42:33 +01:00
1659d42bf7 Fix use-after-free concurrent issues. 2017-03-03 17:42:33 +01:00
fbe84185de Some minor changes from review. 2017-03-03 17:42:33 +01:00
599f314f1d Cleanup, factorization, comments, and some fixes for potential issues. 2017-03-03 17:42:33 +01:00
e0d486eb36 Attempt to address performances issues of task scheduler with lots of very small tasks.
This is partially  based on Sergey's work from D2421, but pushing the things
a bit further. Basically:
- We keep a sheduler-counter of TODO tasks, which avoids us to do any
locking (even of the spinlock) when queue is empty, in workers.
- We spin/nanosleep a bit (less than a ms) when we cannot find a task,
before going into real condition-waiting sleep.
- We keep a counter of condition-sleeping threads, and only use
condition notifications in case we know some are waiting on it.

In other words, when no tasks are available, we spend a bit of time in a
rather high-activity but very cheap and totally lock-free loop, before
going into more expansive real condition-waiting sleep.

No noticeable speedup in complex production scene (barbershop one), here
master, D2421 and this code give roughly same performances (about 30%
slower in new than in old despgraph).

But with testfile from T50027 and new depsgraph, after initial bake,
with master I have ~14fps, with D2421 ~14.5fps, and with this code ~19.5fps.

Note that in theory, we could get completely rid of condition and stay
in the nanosleep loop, but this implies some rather high 'noise' (about
3% of CPU usage here with 8 cores), and going into condition-waiting
state after a few hundreds of micro-seconds does not give any measurable
slow down for me.

Also note that this code is only working on POSIX systems (so no Windows, not
sure how to do our nanosleeps on this OS :/ ).

Reviewers: sergey

Differential Revision: https://developer.blender.org/D2426
2017-03-03 17:42:33 +01:00
2 changed files with 171 additions and 111 deletions

View File

@@ -97,7 +97,7 @@ void BLI_task_pool_work_and_wait(TaskPool *pool);
void BLI_task_pool_cancel(TaskPool *pool);
/* set number of threads allowed to be used by this pool */
void BLI_pool_set_num_threads(TaskPool *pool, int num_threads);
void BLI_pool_set_num_threads(TaskPool *pool, size_t num_threads);
/* for worker threads, test if canceled */
bool BLI_task_pool_canceled(TaskPool *pool);

View File

@@ -48,6 +48,10 @@
*/
#define MEMPOOL_SIZE 256
/* Parameters controlling how much we spin in nanosleeps before switching to real condition-controlled sleeping. */
#define NANOSLEEP_MAX_SPINNING 200 /* Number of failed attempt to get a task before going to condition waiting. */
#define NANOSLEEP_DURATION (const struct timespec[]){{0, 200L}} /* Nanosleep duration (in nano-seconds). */
typedef struct Task {
struct Task *next, *prev;
@@ -105,11 +109,9 @@ typedef struct TaskMemPoolStats {
struct TaskPool {
TaskScheduler *scheduler;
volatile size_t num;
size_t num;
size_t num_threads;
size_t currently_running_tasks;
ThreadMutex num_mutex;
ThreadCondition num_cond;
void *userdata;
ThreadMutex user_mutex;
@@ -146,10 +148,14 @@ struct TaskScheduler {
bool background_thread_only;
ListBase queue;
ThreadMutex queue_mutex;
ThreadCondition queue_cond;
// size_t num_queued;
SpinLock queue_spinlock;
volatile bool do_exit;
ThreadMutex workers_mutex;
ThreadCondition workers_condition;
size_t num_workers_sleeping;
uint8_t do_exit;
};
typedef struct TaskThread {
@@ -229,42 +235,123 @@ static void task_free(TaskPool *pool, Task *task, const int thread_id)
/* Task Scheduler */
static void task_pool_num_decrease(TaskPool *pool, size_t done)
BLI_INLINE void task_pool_num_decrease(TaskPool *pool, size_t done)
{
BLI_mutex_lock(&pool->num_mutex);
BLI_assert(pool->num >= done);
TaskScheduler *scheduler = pool->scheduler;
pool->num -= done;
atomic_sub_and_fetch_z(&pool->currently_running_tasks, done);
const size_t num = atomic_sub_and_fetch_z(&pool->num, done);
if (pool->num == 0)
BLI_condition_notify_all(&pool->num_cond);
/* WARNING! do not use pool anymore, it might be already freed by concurrent thread! */
BLI_mutex_unlock(&pool->num_mutex);
/* This is needed for several things:
* - Wake up all sleeping threads on exit, before we join them.
* - Wake up 'main' thread itself in case it called BLI_task_pool_work_and_wait() and ended up sleeping there.
* - Wake up 'main' thread itself in case it called BLI_task_pool_cancel() and ended up sleeping there. */
if (num == 0 && scheduler->num_workers_sleeping != 0) {
BLI_mutex_lock(&scheduler->workers_mutex);
BLI_condition_notify_all(&scheduler->workers_condition);
BLI_mutex_unlock(&scheduler->workers_mutex);
}
}
static void task_pool_num_increase(TaskPool *pool)
BLI_INLINE void task_pool_num_increase(TaskPool *pool)
{
BLI_mutex_lock(&pool->num_mutex);
atomic_add_and_fetch_z(&pool->num, 1);
pool->num++;
BLI_condition_notify_all(&pool->num_cond);
BLI_mutex_unlock(&pool->num_mutex);
if (pool->scheduler->num_workers_sleeping != 0) {
BLI_mutex_lock(&pool->scheduler->workers_mutex);
/* NOTE: Even tho it's only single task added here we notify all threads.
* The reason for that is because there might be much more tasks coming
* right after this one, so waking up all threads earlier seems to give
* a bit less of threading overhead.
*/
BLI_condition_notify_all(&pool->scheduler->workers_condition);
BLI_mutex_unlock(&pool->scheduler->workers_mutex);
}
}
static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task)
BLI_INLINE bool task_find(
TaskScheduler * restrict scheduler, Task ** restrict task, TaskPool * restrict pool, const bool is_main)
{
Task *current_task;
bool found_task = false;
/* This check allows us to completely avoid a spinlock in case the queue is reported empty.
* There is a possibility of race condition here (check being done after task has been added to queue,
* and before counter is increased), but this should not be an issue in practice, quite unlikely and
* would just delay a bit that thread going back to work. */
// if (scheduler->num_queued != 0) {
if (scheduler->queue.first != NULL) {
/* NOTE: We almost always do single iteration here, so spin time is most of the time is really low. */
BLI_spin_lock(&scheduler->queue_spinlock);
for (current_task = scheduler->queue.first;
current_task != NULL;
current_task = current_task->next)
{
TaskPool *current_pool = current_task->pool;
if (!ELEM(pool, NULL, current_pool)) {
continue;
}
if (!is_main && scheduler->background_thread_only && !current_pool->run_in_background) {
continue;
}
if (atomic_add_and_fetch_z(&current_pool->currently_running_tasks, 1) <= current_pool->num_threads ||
is_main || current_pool->num_threads == 0)
{
*task = current_task;
found_task = true;
BLI_remlink(&scheduler->queue, *task);
// atomic_sub_and_fetch_z(&scheduler->num_queued, 1);
break;
}
else {
atomic_sub_and_fetch_z(&current_pool->currently_running_tasks, 1);
}
}
BLI_spin_unlock(&scheduler->queue_spinlock);
}
return found_task;
}
BLI_INLINE bool task_wait(TaskScheduler * restrict scheduler, int * restrict loop_count)
{
/* If we have iterated NANOSLEEP_MAX_SPINNING times without finding a task, go into real sleep. */
if (++(*loop_count) > NANOSLEEP_MAX_SPINNING) {
BLI_mutex_lock(&scheduler->workers_mutex);
/* Check again inside the mutex, bad race condition is possible here (though unlikely),
* leading to undying thread... */
if (scheduler->do_exit) {
BLI_mutex_unlock(&scheduler->workers_mutex);
return true;
}
/* Even though this is read outside of mutex lock, there is no real need to use atomic ops here,
* changing the value inside mutex should be enough to ensure safety. */
scheduler->num_workers_sleeping++;
BLI_condition_wait(&scheduler->workers_condition, &scheduler->workers_mutex);
scheduler->num_workers_sleeping--;
BLI_mutex_unlock(&scheduler->workers_mutex);
}
else {
nanosleep(NANOSLEEP_DURATION, NULL);
}
return false;
}
BLI_INLINE bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task)
{
bool found_task = false;
BLI_mutex_lock(&scheduler->queue_mutex);
while (!scheduler->queue.first && !scheduler->do_exit)
BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
int loop_count = 0;
do {
Task *current_task;
/* Assuming we can only have a void queue in 'exit' case here seems logical (we should only be here after
* our worker thread has been woken up from a condition_wait(), which only happens after a new task was
* added to the queue), but it is wrong.
@@ -276,38 +363,16 @@ static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task
* So we only abort here if do_exit is set.
*/
if (scheduler->do_exit) {
BLI_mutex_unlock(&scheduler->queue_mutex);
return false;
}
for (current_task = scheduler->queue.first;
current_task != NULL;
current_task = current_task->next)
{
TaskPool *pool = current_task->pool;
if (scheduler->background_thread_only && !pool->run_in_background) {
continue;
}
if (atomic_add_and_fetch_z(&pool->currently_running_tasks, 1) <= pool->num_threads ||
pool->num_threads == 0)
{
*task = current_task;
found_task = true;
BLI_remlink(&scheduler->queue, *task);
break;
}
else {
atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1);
if (!(found_task = task_find(scheduler, task, NULL, false))) {
if (task_wait(scheduler, &loop_count)) {
return false;
}
}
if (!found_task)
BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
} while (!found_task);
BLI_mutex_unlock(&scheduler->queue_mutex);
return true;
}
@@ -328,6 +393,8 @@ static void *task_scheduler_thread_run(void *thread_p)
/* delete task */
task_free(pool, task, thread_id);
atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1);
/* notify pool task was done */
task_pool_num_decrease(pool, 1);
}
@@ -341,11 +408,13 @@ TaskScheduler *BLI_task_scheduler_create(int num_threads)
/* multiple places can use this task scheduler, sharing the same
* threads, so we keep track of the number of users. */
scheduler->do_exit = false;
scheduler->do_exit = 0;
BLI_listbase_clear(&scheduler->queue);
BLI_mutex_init(&scheduler->queue_mutex);
BLI_condition_init(&scheduler->queue_cond);
BLI_spin_init(&scheduler->queue_spinlock);
BLI_mutex_init(&scheduler->workers_mutex);
BLI_condition_init(&scheduler->workers_condition);
if (num_threads == 0) {
/* automatic number of threads will be main thread + num cores */
@@ -391,10 +460,12 @@ void BLI_task_scheduler_free(TaskScheduler *scheduler)
Task *task;
/* stop all waiting threads */
BLI_mutex_lock(&scheduler->queue_mutex);
scheduler->do_exit = true;
BLI_condition_notify_all(&scheduler->queue_cond);
BLI_mutex_unlock(&scheduler->queue_mutex);
atomic_fetch_and_or_uint8(&scheduler->do_exit, 1);
if (scheduler->num_workers_sleeping != 0) {
BLI_mutex_lock(&scheduler->workers_mutex);
BLI_condition_notify_all(&scheduler->workers_condition);
BLI_mutex_unlock(&scheduler->workers_mutex);
}
/* delete threads */
if (scheduler->threads) {
@@ -430,8 +501,10 @@ void BLI_task_scheduler_free(TaskScheduler *scheduler)
BLI_freelistN(&scheduler->queue);
/* delete mutex/condition */
BLI_mutex_end(&scheduler->queue_mutex);
BLI_condition_end(&scheduler->queue_cond);
BLI_spin_end(&scheduler->queue_spinlock);
BLI_mutex_end(&scheduler->workers_mutex);
BLI_condition_end(&scheduler->workers_condition);
MEM_freeN(scheduler);
}
@@ -441,20 +514,24 @@ int BLI_task_scheduler_num_threads(TaskScheduler *scheduler)
return scheduler->num_threads + 1;
}
static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriority priority)
BLI_INLINE void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriority priority)
{
task_pool_num_increase(task->pool);
TaskPool *pool = task->pool;
/* add task to queue */
BLI_mutex_lock(&scheduler->queue_mutex);
BLI_spin_lock(&scheduler->queue_spinlock);
if (priority == TASK_PRIORITY_HIGH)
BLI_addhead(&scheduler->queue, task);
else
BLI_addtail(&scheduler->queue, task);
BLI_condition_notify_one(&scheduler->queue_cond);
BLI_mutex_unlock(&scheduler->queue_mutex);
BLI_spin_unlock(&scheduler->queue_spinlock);
/* WARNING! do not use task anymore, it might be already processed and freed by concurrent thread! */
task_pool_num_increase(pool);
// atomic_add_and_fetch_z(&scheduler->num_queued, 1);
}
static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
@@ -462,7 +539,7 @@ static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
Task *task, *nexttask;
size_t done = 0;
BLI_mutex_lock(&scheduler->queue_mutex);
BLI_spin_lock(&scheduler->queue_spinlock);
/* free all tasks from this pool from the queue */
for (task = scheduler->queue.first; task; task = nexttask) {
@@ -476,7 +553,9 @@ static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
}
}
BLI_mutex_unlock(&scheduler->queue_mutex);
BLI_spin_unlock(&scheduler->queue_spinlock);
// atomic_sub_and_fetch_z(&scheduler->num_queued, done);
/* notify done */
task_pool_num_decrease(pool, done);
@@ -507,9 +586,6 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, c
pool->do_cancel = false;
pool->run_in_background = is_background;
BLI_mutex_init(&pool->num_mutex);
BLI_condition_init(&pool->num_cond);
pool->userdata = userdata;
BLI_mutex_init(&pool->user_mutex);
@@ -567,9 +643,6 @@ void BLI_task_pool_free(TaskPool *pool)
{
BLI_task_pool_cancel(pool);
BLI_mutex_end(&pool->num_mutex);
BLI_condition_end(&pool->num_cond);
BLI_mutex_end(&pool->user_mutex);
/* Free local memory pool, those pointers are lost forever. */
@@ -634,60 +707,43 @@ void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run,
void BLI_task_pool_work_and_wait(TaskPool *pool)
{
TaskScheduler *scheduler = pool->scheduler;
BLI_mutex_lock(&pool->num_mutex);
int loop_count = 0;
while (pool->num != 0) {
Task *task, *work_task = NULL;
Task *task;
bool found_task = false;
BLI_mutex_unlock(&pool->num_mutex);
BLI_mutex_lock(&scheduler->queue_mutex);
/* find task from this pool. if we get a task from another pool,
* we can get into deadlock */
if (pool->num_threads == 0 ||
pool->currently_running_tasks < pool->num_threads)
{
for (task = scheduler->queue.first; task; task = task->next) {
if (task->pool == pool) {
work_task = task;
found_task = true;
BLI_remlink(&scheduler->queue, task);
break;
}
}
}
BLI_mutex_unlock(&scheduler->queue_mutex);
found_task = task_find(scheduler, &task, pool, true);
/* if found task, do it, otherwise wait until other tasks are done */
if (found_task) {
/* run task */
atomic_add_and_fetch_z(&pool->currently_running_tasks, 1);
work_task->run(pool, work_task->taskdata, 0);
task->run(pool, task->taskdata, 0);
/* delete task */
task_free(pool, task, 0);
atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1);
/* notify pool task was done */
task_pool_num_decrease(pool, 1);
/* Reset the 'failed' counter to zero. */
loop_count = 0;
}
BLI_mutex_lock(&pool->num_mutex);
if (pool->num == 0)
else if (pool->num == 0) {
break;
if (!found_task)
BLI_condition_wait(&pool->num_cond, &pool->num_mutex);
}
else {
task_wait(scheduler, &loop_count); /* We do not care about return value here. */
}
}
BLI_mutex_unlock(&pool->num_mutex);
}
void BLI_pool_set_num_threads(TaskPool *pool, int num_threads)
void BLI_pool_set_num_threads(TaskPool *pool, size_t num_threads)
{
/* NOTE: Don't try to modify threads while tasks are running! */
pool->num_threads = num_threads;
@@ -700,10 +756,14 @@ void BLI_task_pool_cancel(TaskPool *pool)
task_scheduler_clear(pool->scheduler, pool);
/* wait until all entries are cleared */
BLI_mutex_lock(&pool->num_mutex);
while (pool->num)
BLI_condition_wait(&pool->num_cond, &pool->num_mutex);
BLI_mutex_unlock(&pool->num_mutex);
while (pool->num) {
/* No real point in spinning here... */
BLI_mutex_lock(&pool->scheduler->workers_mutex);
pool->scheduler->num_workers_sleeping++;
BLI_condition_wait(&pool->scheduler->workers_condition, &pool->scheduler->workers_mutex);
pool->scheduler->num_workers_sleeping--;
BLI_mutex_unlock(&pool->scheduler->workers_mutex);
}
pool->do_cancel = false;
}