BLI: let the weighted parallel function take advantage of prefix sums #120322

Closed
Iliya Katushenock wants to merge 5 commits from mod_moder:parallel_for_weighted into main

3 changed files with 131 additions and 22 deletions

View File

@@ -73,6 +73,10 @@ void parallel_for_weighted_impl(IndexRange range,
int64_t grain_size,
FunctionRef<void(IndexRange)> function,
FunctionRef<void(IndexRange, MutableSpan<int64_t>)> task_sizes_fn);
void parallel_for_weighted_impl(IndexRange range,
int64_t grain_size,
FunctionRef<void(IndexRange)> function,
FunctionRef<int64_t(IndexRange)> task_sizes_fn);
void memory_bandwidth_bound_task_impl(FunctionRef<void()> function);
} // namespace detail
@@ -102,7 +106,12 @@ inline void parallel_for(IndexRange range, int64_t grain_size, const Function &f
*
* \param task_size_fn: Gets the task index as input and computes that task's size.
* \param grain_size: Determines approximately how large a combined task should be. For example, if
* the grain size is 100, then 5 tasks of size 20 fit into it.
* the grain size is 100, then 5 tasks of size 20 fit into it. Depending on the use case,
* `task_size_fn` can have one of two signatures:
* - `int64_t (int64_t element)`: returns the size of a single task.
* - `int64_t (IndexRange elements)`: returns the combined size of a range of tasks.
* If the per-element weights are available as a prefix sum, prefer the range variant: the total
* weight of a range is then a single subtraction instead of a query per element (see the usage
* sketch after the function below).
*/
template<typename Function, typename TaskSizeFn>
inline void parallel_for_weighted(IndexRange range,
@@ -113,14 +122,27 @@ inline void parallel_for_weighted(IndexRange range,
if (range.is_empty()) {
return;
}
detail::parallel_for_weighted_impl(
range, grain_size, function, [&](const IndexRange sub_range, MutableSpan<int64_t> r_sizes) {
for (const int64_t i : sub_range.index_range()) {
const int64_t task_size = task_size_fn(sub_range[i]);
BLI_assert(task_size >= 0);
r_sizes[i] = task_size;
}
});
if constexpr (std::is_invocable_r_v<int64_t, TaskSizeFn, int64_t>) {
detail::parallel_for_weighted_impl(
range,
grain_size,
function,
[&](const IndexRange sub_range, MutableSpan<int64_t> r_sizes) {
for (const int64_t i : sub_range.index_range()) {
const int64_t task_size = task_size_fn(sub_range[i]);
BLI_assert(task_size >= 0);
r_sizes[i] = task_size;
}
});
}
else {
detail::parallel_for_weighted_impl(
range, grain_size, function, [&](const IndexRange sub_range) -> int64_t {
const int64_t range_total = task_size_fn(sub_range);
BLI_assert(range_total >= 0);
return range_total;
});
}
}
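
A minimal usage sketch (not part of the diff): `sizes` and `prefix_sum` are hypothetical spans, with `prefix_sum[i]` holding the total weight of the first `i` elements, so that both `task_size_fn` signatures can be shown.

static void example_usage(blender::Span<int64_t> sizes, blender::Span<int64_t> prefix_sum)
{
  using namespace blender;
  /* Per-element variant: the size of every element is queried individually. */
  threading::parallel_for_weighted(
      sizes.index_range(),
      1024,
      [&](const IndexRange range) { /* Process `range`. */ },
      [&](const int64_t i) -> int64_t { return sizes[i]; });
  /* Range variant: with a prefix sum, the total weight of a whole range is one subtraction. */
  threading::parallel_for_weighted(
      sizes.index_range(),
      1024,
      [&](const IndexRange range) { /* Process `range`. */ },
      [&](const IndexRange range) -> int64_t {
        return prefix_sum[range.one_after_last()] - prefix_sum[range.first()];
      });
}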
/**

View File

@@ -8,11 +8,14 @@
* Task parallel range functions.
*/
#include <iostream>
#include <cstdlib>
#include "MEM_guardedalloc.h"
#include "BLI_array.hh"
#include "BLI_binary_search.hh"
#include "BLI_lazy_threading.hh"
#include "BLI_offset_indices.hh"
#include "BLI_task.h"
@@ -223,6 +226,37 @@ void parallel_for_weighted_impl(
});
}
void parallel_for_weighted_impl(const IndexRange range,
const int64_t grain_size,
const FunctionRef<void(IndexRange)> function,
const FunctionRef<int64_t(IndexRange)> task_sizes_fn)
{
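/* Greedily split `range` into segments whose total weight just exceeds `grain_size`. Task
* sizes are non-negative, so the weight of a growing slice is non-decreasing, which is what
* makes the binary search below valid. */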
Vector<int64_t, 256> offsets_vec;
offsets_vec.append(0);
IndexRange next_range = range;
while (!next_range.is_empty()) {
const int64_t size_of_current_segment = binary_search::find_predicate_begin(
next_range.begin(), next_range.end(), [&](const int64_t i) {
const IndexRange slice = IndexRange::from_begin_end_inclusive(next_range.first(), i);
return task_sizes_fn(slice) > grain_size;
});
if (size_of_current_segment == next_range.size()) {
/* The remaining elements fit into the grain size together, so they form the last segment. */
offsets_vec.append(next_range.one_after_last());
break;
}
offsets_vec.append(next_range[size_of_current_segment] + 1);
next_range = next_range.drop_front(size_of_current_segment + 1);
}
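/* Fallback: make sure the final boundary reaches the end of the range. */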
if (offsets_vec.last() < range.last()) {
offsets_vec.append(range.size());
}
const OffsetIndices<int64_t> offsets = offsets_vec.as_span();
threading::parallel_for(offsets.index_range(), 1, [&](const IndexRange offsets_range) {
for (const int64_t i : offsets_range) {
function(offsets[i]);
}
});
}
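
For intuition, the greedy splitting above can be reproduced with just the standard library over an explicit prefix-sum array; all names here are illustrative and not part of the patch:

#include <algorithm>
#include <cstdint>
#include <vector>

/* `prefix[i]` holds the total weight of the first `i` elements; returns segment boundaries. */
static std::vector<int64_t> split_by_grain(const std::vector<int64_t> &prefix,
                                           const int64_t grain_size)
{
  std::vector<int64_t> offsets = {0};
  const int64_t total = int64_t(prefix.size()) - 1;
  int64_t begin = 0;
  while (begin < total) {
    /* First boundary whose segment weight exceeds the grain size; the prefix sum is
     * non-decreasing, so a binary search applies. */
    const auto it = std::upper_bound(
        prefix.begin() + begin + 1, prefix.end(), prefix[begin] + grain_size);
    const int64_t end = std::min<int64_t>(it - prefix.begin(), total);
    offsets.push_back(end);
    begin = end;
  }
  return offsets;
}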
void memory_bandwidth_bound_task_impl(const FunctionRef<void()> function)
{
#ifdef WITH_TBB

View File

@@ -2,8 +2,12 @@
*
* SPDX-License-Identifier: GPL-2.0-or-later */
#include <iostream>
#include <atomic>
#include "BLI_timeit.hh"
#include "BKE_attribute.hh"
#include "BKE_instances.hh"
@@ -32,6 +36,8 @@ static void node_declare(NodeDeclarationBuilder &b)
b.add_input<decl::Int>("Group ID").field_on_all().hide_value();
b.add_input<decl::Float>("Sort Weight").field_on_all().hide_value();
b.add_input<decl::Bool>("New");
b.add_output<decl::Geometry>("Geometry").propagate_all();
}
@@ -45,10 +51,11 @@ static void node_init(bNodeTree * /*tree*/, bNode *node)
node->custom1 = int(bke::AttrDomain::Point);
}
static void grouped_sort(const OffsetIndices<int> offsets,
const Span<float> weights,
MutableSpan<int> indices)
static void grouped_sort_by_element(const OffsetIndices<int> offsets,
const Span<float> weights,
MutableSpan<int> indices)
{
SCOPED_TIMER_AVERAGED(__func__);
const auto comparator = [&](const int index_a, const int index_b) {
const float weight_a = weights[index_a];
const float weight_b = weights[index_b];
@@ -59,12 +66,43 @@ static void grouped_sort(const OffsetIndices<int> offsets,
return weight_a < weight_b;
};
threading::parallel_for(offsets.index_range(), 250, [&](const IndexRange range) {
for (const int group_index : range) {
MutableSpan<int> group = indices.slice(offsets[group_index]);
parallel_sort(group.begin(), group.end(), comparator);
threading::parallel_for_weighted(
offsets.index_range(),
1024,
[&](const IndexRange range) {
for (const int group_index : range) {
MutableSpan<int> group = indices.slice(offsets[group_index]);
parallel_sort(group.begin(), group.end(), comparator);
}
},
[&](const int64_t element) -> int64_t { return offsets[element].size(); });
}
static void grouped_sort_by_range(const OffsetIndices<int> offsets,
const Span<float> weights,
MutableSpan<int> indices)
{
SCOPED_TIMER_AVERAGED(__func__);
const auto comparator = [&](const int index_a, const int index_b) {
const float weight_a = weights[index_a];
const float weight_b = weights[index_b];
if (UNLIKELY(weight_a == weight_b)) {
/* Compare the indices to keep the sort stable. */
return index_a < index_b;
}
});
return weight_a < weight_b;
};
threading::parallel_for_weighted(
offsets.index_range(),
1024,
[&](const IndexRange range) {
for (const int group_index : range) {
MutableSpan<int> group = indices.slice(offsets[group_index]);
parallel_sort(group.begin(), group.end(), comparator);
}
},
[&](const IndexRange range) -> int64_t { return offsets[range].size(); });
}
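
The weight callback is where the prefix sum pays off: `OffsetIndices` already stores one, so the combined size of all groups in a range costs two lookups rather than a loop. A sketch of what the callback computes (illustrative helper, not part of the diff):

#include "BLI_offset_indices.hh"

static int64_t total_group_size(const blender::OffsetIndices<int> offsets,
                                const blender::IndexRange groups)
{
  /* `offsets[groups]` spans from the start of the first group to the end of the last one,
   * so its size is the combined element count of all groups in `groups`. */
  return offsets[groups].size();
}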
static void find_points_by_group_index(const Span<int> indices,
@@ -123,7 +161,8 @@ static std::optional<Array<int>> sorted_indices(const fn::FieldContext &field_co
const int domain_size,
const Field<bool> selection_field,
const Field<int> group_id_field,
const Field<float> weight_field)
const Field<float> weight_field,
const bool is_new)
{
if (domain_size == 0) {
return std::nullopt;
@@ -151,7 +190,12 @@ static std::optional<Array<int>> sorted_indices(const fn::FieldContext &field_co
mask.to_indices<int>(gathered_indices);
Array<float> weight_span(domain_size);
array_utils::copy(weight, mask, weight_span.as_mutable_span());
grouped_sort(Span({0, int(mask.size())}), weight_span, gathered_indices);
if (is_new) {
grouped_sort_by_range(Span({0, int(mask.size())}), weight_span, gathered_indices);
}
else {
grouped_sort_by_element(Span({0, int(mask.size())}), weight_span, gathered_indices);
}
}
else {
Array<int> gathered_group_id(mask.size());
@@ -162,7 +206,12 @@ static std::optional<Array<int>> sorted_indices(const fn::FieldContext &field_co
if (!weight.is_single()) {
Array<float> weight_span(mask.size());
array_utils::gather(weight, mask, weight_span.as_mutable_span());
grouped_sort(offsets_to_sort.as_span(), weight_span, gathered_indices);
if (is_new) {
grouped_sort_by_range(offsets_to_sort.as_span(), weight_span, gathered_indices);
}
else {
grouped_sort_by_element(offsets_to_sort.as_span(), weight_span, gathered_indices);
}
}
parallel_transform<int>(gathered_indices, 2048, [&](const int pos) { return mask[pos]; });
}
@@ -199,6 +248,8 @@ static void node_geo_exec(GeoNodeExecParams params)
const Field<float> weight_field = params.extract_input<Field<float>>("Sort Weight");
const bke::AttrDomain domain = bke::AttrDomain(params.node().custom1);
const bool is_new = params.extract_input<bool>("New");
const bke::AnonymousAttributePropagationInfo propagation_info =
params.get_output_propagation_info("Geometry");
@@ -213,7 +264,8 @@ static void node_geo_exec(GeoNodeExecParams params)
instances->instances_num(),
selection_field,
group_id_field,
weight_field))
weight_field,
is_new))
{
bke::Instances *result = geometry::reorder_instaces(
*instances, *indices, propagation_info);
@@ -239,7 +291,8 @@ static void node_geo_exec(GeoNodeExecParams params)
src_component->attribute_domain_size(domain),
selection_field,
group_id_field,
weight_field);
weight_field,
is_new);
if (!indices.has_value()) {
continue;
}