Functions: improve handling of thread-local data in lazy functions
The main goal here is to reduce the number of times thread-local data has to be looked up using e.g. `EnumerableThreadSpecific.local()`. While this isn't a bottleneck in many cases, it is when the action performed on the local data is very short and that happens very often (e.g. logging used sockets during geometry nodes evaluation). The solution is to simply pass the thread-local data as parameter to many functions that use it, instead of looking it up in those functions which generally is more costly. The lazy-function graph executor now only looks up the local data if it knows that it might be on a new thread, otherwise it uses the local data retrieved earlier. Alongside with `UserData` there is `LocalUserData` now. This allows users of the lazy-function evaluation (such as geometry nodes) to have custom thread-local data that is passed to all the lazy-functions automatically. This is used for logging now.
This commit is contained in:
@@ -165,6 +165,11 @@ struct NodeState {
|
||||
* computed.
|
||||
*/
|
||||
bool has_side_effects = false;
|
||||
/**
|
||||
* Whether this node has enabled multi-threading. If this is true, the node is allowed to call
|
||||
* methods on #Params from multiple threads.
|
||||
*/
|
||||
bool enabled_multi_threading = false;
|
||||
/**
|
||||
* A node is always in one specific schedule state. This helps to ensure that the same node does
|
||||
* not run twice at the same time accidentally.
|
||||
@@ -283,14 +288,16 @@ class Executor {
|
||||
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
|
||||
std::thread::id current_main_thread_;
|
||||
#endif
|
||||
/**
|
||||
* A separate linear allocator for every thread. We could potentially reuse some memory, but that
|
||||
* doesn't seem worth it yet.
|
||||
*/
|
||||
struct ThreadLocalData {
|
||||
|
||||
struct ThreadLocalStorage {
|
||||
/**
|
||||
* A separate linear allocator for every thread. We could potentially reuse some memory, but
|
||||
* that doesn't seem worth it yet.
|
||||
*/
|
||||
LinearAllocator<> allocator;
|
||||
std::optional<destruct_ptr<LocalUserData>> local_user_data;
|
||||
};
|
||||
std::unique_ptr<threading::EnumerableThreadSpecific<ThreadLocalData>> thread_locals_;
|
||||
std::unique_ptr<threading::EnumerableThreadSpecific<ThreadLocalStorage>> thread_locals_;
|
||||
LinearAllocator<> main_allocator_;
|
||||
/**
|
||||
* Set to false when the first execution ends.
|
||||
@@ -299,6 +306,17 @@ class Executor {
|
||||
|
||||
friend GraphExecutorLFParams;
|
||||
|
||||
/**
|
||||
* Data that is local to the current thread. It is passed around in many places to avoid
|
||||
* retrieving it too often which would be more costly. If this evaluator does not use
|
||||
* multi-threading, this may use the `main_allocator_` and the local user data passed in by the
|
||||
* caller.
|
||||
*/
|
||||
struct LocalData {
|
||||
LinearAllocator<> *allocator;
|
||||
LocalUserData *local_user_data;
|
||||
};
|
||||
|
||||
public:
|
||||
Executor(const GraphExecutor &self) : self_(self), loaded_inputs_(self.graph_inputs_.size())
|
||||
{
|
||||
@@ -341,6 +359,8 @@ class Executor {
|
||||
};
|
||||
BLI_SCOPED_DEFER(deferred_func);
|
||||
|
||||
const LocalData local_data = this->get_local_data();
|
||||
|
||||
CurrentTask current_task;
|
||||
if (is_first_execution_) {
|
||||
this->initialize_node_states();
|
||||
@@ -349,12 +369,12 @@ class Executor {
|
||||
memset(static_cast<void *>(loaded_inputs_.data()), 0, loaded_inputs_.size() * sizeof(bool));
|
||||
|
||||
this->set_always_unused_graph_inputs();
|
||||
this->set_defaulted_graph_outputs();
|
||||
this->set_defaulted_graph_outputs(local_data);
|
||||
|
||||
/* Retrieve and tag side effect nodes. */
|
||||
Vector<const FunctionNode *> side_effect_nodes;
|
||||
if (self_.side_effect_provider_ != nullptr) {
|
||||
side_effect_nodes = self_.side_effect_provider_->get_nodes_with_side_effects(*context_);
|
||||
side_effect_nodes = self_.side_effect_provider_->get_nodes_with_side_effects(context);
|
||||
for (const FunctionNode *node : side_effect_nodes) {
|
||||
const int node_index = node->index_in_graph();
|
||||
NodeState &node_state = *node_states_[node_index];
|
||||
@@ -363,13 +383,13 @@ class Executor {
|
||||
}
|
||||
|
||||
this->initialize_static_value_usages(side_effect_nodes);
|
||||
this->schedule_side_effect_nodes(side_effect_nodes, current_task);
|
||||
this->schedule_side_effect_nodes(side_effect_nodes, current_task, local_data);
|
||||
}
|
||||
|
||||
this->schedule_for_new_output_usages(current_task);
|
||||
this->forward_newly_provided_inputs(current_task);
|
||||
this->schedule_for_new_output_usages(current_task, local_data);
|
||||
this->forward_newly_provided_inputs(current_task, local_data);
|
||||
|
||||
this->run_task(current_task);
|
||||
this->run_task(current_task, local_data);
|
||||
|
||||
if (TaskPool *task_pool = task_pool_.load()) {
|
||||
BLI_task_pool_work_and_wait(task_pool);
|
||||
@@ -433,7 +453,7 @@ class Executor {
|
||||
/**
|
||||
* When the usage of output values changed, propagate that information backwards.
|
||||
*/
|
||||
void schedule_for_new_output_usages(CurrentTask ¤t_task)
|
||||
void schedule_for_new_output_usages(CurrentTask ¤t_task, const LocalData &local_data)
|
||||
{
|
||||
for (const int graph_output_index : self_.graph_outputs_.index_range()) {
|
||||
if (params_->output_was_set(graph_output_index)) {
|
||||
@@ -446,18 +466,19 @@ class Executor {
|
||||
const InputSocket &socket = *self_.graph_outputs_[graph_output_index];
|
||||
const Node &node = socket.node();
|
||||
NodeState &node_state = *node_states_[node.index_in_graph()];
|
||||
this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
|
||||
if (output_usage == ValueUsage::Used) {
|
||||
this->set_input_required(locked_node, socket);
|
||||
}
|
||||
else {
|
||||
this->set_input_unused(locked_node, socket);
|
||||
}
|
||||
});
|
||||
this->with_locked_node(
|
||||
node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
|
||||
if (output_usage == ValueUsage::Used) {
|
||||
this->set_input_required(locked_node, socket);
|
||||
}
|
||||
else {
|
||||
this->set_input_unused(locked_node, socket);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void set_defaulted_graph_outputs()
|
||||
void set_defaulted_graph_outputs(const LocalData &local_data)
|
||||
{
|
||||
for (const int graph_output_index : self_.graph_outputs_.index_range()) {
|
||||
const InputSocket &socket = *self_.graph_outputs_[graph_output_index];
|
||||
@@ -469,7 +490,8 @@ class Executor {
|
||||
BLI_assert(default_value != nullptr);
|
||||
|
||||
if (self_.logger_ != nullptr) {
|
||||
self_.logger_->log_socket_value(socket, {type, default_value}, *context_);
|
||||
const Context context{context_->storage, context_->user_data, local_data.local_user_data};
|
||||
self_.logger_->log_socket_value(socket, {type, default_value}, context);
|
||||
}
|
||||
|
||||
void *output_ptr = params_->get_output_data_ptr(graph_output_index);
|
||||
@@ -572,19 +594,20 @@ class Executor {
|
||||
}
|
||||
|
||||
void schedule_side_effect_nodes(const Span<const FunctionNode *> side_effect_nodes,
|
||||
CurrentTask ¤t_task)
|
||||
CurrentTask ¤t_task,
|
||||
const LocalData &local_data)
|
||||
{
|
||||
for (const FunctionNode *node : side_effect_nodes) {
|
||||
NodeState &node_state = *node_states_[node->index_in_graph()];
|
||||
this->with_locked_node(*node, node_state, current_task, [&](LockedNode &locked_node) {
|
||||
this->schedule_node(locked_node, current_task, false);
|
||||
});
|
||||
this->with_locked_node(
|
||||
*node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
|
||||
this->schedule_node(locked_node, current_task, false);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void forward_newly_provided_inputs(CurrentTask ¤t_task)
|
||||
void forward_newly_provided_inputs(CurrentTask ¤t_task, const LocalData &local_data)
|
||||
{
|
||||
LinearAllocator<> &allocator = this->get_main_or_local_allocator();
|
||||
for (const int graph_input_index : self_.graph_inputs_.index_range()) {
|
||||
std::atomic<uint8_t> &was_loaded = loaded_inputs_[graph_input_index];
|
||||
if (was_loaded.load()) {
|
||||
@@ -598,23 +621,25 @@ class Executor {
|
||||
/* The value was forwarded before. */
|
||||
continue;
|
||||
}
|
||||
this->forward_newly_provided_input(current_task, allocator, graph_input_index, input_data);
|
||||
this->forward_newly_provided_input(current_task, local_data, graph_input_index, input_data);
|
||||
}
|
||||
}
|
||||
|
||||
void forward_newly_provided_input(CurrentTask ¤t_task,
|
||||
LinearAllocator<> &allocator,
|
||||
const LocalData &local_data,
|
||||
const int graph_input_index,
|
||||
void *input_data)
|
||||
{
|
||||
const OutputSocket &socket = *self_.graph_inputs_[graph_input_index];
|
||||
const CPPType &type = socket.type();
|
||||
void *buffer = allocator.allocate(type.size(), type.alignment());
|
||||
void *buffer = local_data.allocator->allocate(type.size(), type.alignment());
|
||||
type.move_construct(input_data, buffer);
|
||||
this->forward_value_to_linked_inputs(socket, {type, buffer}, current_task);
|
||||
this->forward_value_to_linked_inputs(socket, {type, buffer}, current_task, local_data);
|
||||
}
|
||||
|
||||
void notify_output_required(const OutputSocket &socket, CurrentTask ¤t_task)
|
||||
void notify_output_required(const OutputSocket &socket,
|
||||
CurrentTask ¤t_task,
|
||||
const LocalData &local_data)
|
||||
{
|
||||
const Node &node = socket.node();
|
||||
const int index_in_node = socket.index();
|
||||
@@ -637,46 +662,49 @@ class Executor {
|
||||
/* The value was forwarded already. */
|
||||
return;
|
||||
}
|
||||
this->forward_newly_provided_input(
|
||||
current_task, this->get_main_or_local_allocator(), graph_input_index, input_data);
|
||||
this->forward_newly_provided_input(current_task, local_data, graph_input_index, input_data);
|
||||
return;
|
||||
}
|
||||
|
||||
BLI_assert(node.is_function());
|
||||
this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
|
||||
if (output_state.usage == ValueUsage::Used) {
|
||||
return;
|
||||
}
|
||||
output_state.usage = ValueUsage::Used;
|
||||
this->schedule_node(locked_node, current_task, false);
|
||||
});
|
||||
this->with_locked_node(
|
||||
node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
|
||||
if (output_state.usage == ValueUsage::Used) {
|
||||
return;
|
||||
}
|
||||
output_state.usage = ValueUsage::Used;
|
||||
this->schedule_node(locked_node, current_task, false);
|
||||
});
|
||||
}
|
||||
|
||||
void notify_output_unused(const OutputSocket &socket, CurrentTask ¤t_task)
|
||||
void notify_output_unused(const OutputSocket &socket,
|
||||
CurrentTask ¤t_task,
|
||||
const LocalData &local_data)
|
||||
{
|
||||
const Node &node = socket.node();
|
||||
const int index_in_node = socket.index();
|
||||
NodeState &node_state = *node_states_[node.index_in_graph()];
|
||||
OutputState &output_state = node_state.outputs[index_in_node];
|
||||
|
||||
this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
|
||||
output_state.potential_target_sockets -= 1;
|
||||
if (output_state.potential_target_sockets == 0) {
|
||||
BLI_assert(output_state.usage != ValueUsage::Unused);
|
||||
if (output_state.usage == ValueUsage::Maybe) {
|
||||
output_state.usage = ValueUsage::Unused;
|
||||
if (node.is_dummy()) {
|
||||
const int graph_input_index = self_.graph_inputs_.index_of(&socket);
|
||||
params_->set_input_unused(graph_input_index);
|
||||
this->with_locked_node(
|
||||
node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
|
||||
output_state.potential_target_sockets -= 1;
|
||||
if (output_state.potential_target_sockets == 0) {
|
||||
BLI_assert(output_state.usage != ValueUsage::Unused);
|
||||
if (output_state.usage == ValueUsage::Maybe) {
|
||||
output_state.usage = ValueUsage::Unused;
|
||||
if (node.is_dummy()) {
|
||||
const int graph_input_index = self_.graph_inputs_.index_of(&socket);
|
||||
params_->set_input_unused(graph_input_index);
|
||||
}
|
||||
else {
|
||||
/* Schedule as priority node. This allows freeing up memory earlier which results
|
||||
* in better memory reuse and less copy-on-write copies caused by shared data. */
|
||||
this->schedule_node(locked_node, current_task, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
/* Schedule as priority node. This allows freeing up memory earlier which results in
|
||||
* better memory reuse and less copy-on-write copies caused by shared data. */
|
||||
this->schedule_node(locked_node, current_task, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void schedule_node(LockedNode &locked_node, CurrentTask ¤t_task, const bool is_priority)
|
||||
@@ -712,6 +740,7 @@ class Executor {
|
||||
void with_locked_node(const Node &node,
|
||||
NodeState &node_state,
|
||||
CurrentTask ¤t_task,
|
||||
const LocalData &local_data,
|
||||
const FunctionRef<void(LockedNode &)> f)
|
||||
{
|
||||
BLI_assert(&node_state == node_states_[node.index_in_graph()]);
|
||||
@@ -725,96 +754,104 @@ class Executor {
|
||||
f(locked_node);
|
||||
}
|
||||
|
||||
this->send_output_required_notifications(locked_node.delayed_required_outputs, current_task);
|
||||
this->send_output_unused_notifications(locked_node.delayed_unused_outputs, current_task);
|
||||
this->send_output_required_notifications(
|
||||
locked_node.delayed_required_outputs, current_task, local_data);
|
||||
this->send_output_unused_notifications(
|
||||
locked_node.delayed_unused_outputs, current_task, local_data);
|
||||
}
|
||||
|
||||
void send_output_required_notifications(const Span<const OutputSocket *> sockets,
|
||||
CurrentTask ¤t_task)
|
||||
CurrentTask ¤t_task,
|
||||
const LocalData &local_data)
|
||||
{
|
||||
for (const OutputSocket *socket : sockets) {
|
||||
this->notify_output_required(*socket, current_task);
|
||||
this->notify_output_required(*socket, current_task, local_data);
|
||||
}
|
||||
}
|
||||
|
||||
void send_output_unused_notifications(const Span<const OutputSocket *> sockets,
|
||||
CurrentTask ¤t_task)
|
||||
CurrentTask ¤t_task,
|
||||
const LocalData &local_data)
|
||||
{
|
||||
for (const OutputSocket *socket : sockets) {
|
||||
this->notify_output_unused(*socket, current_task);
|
||||
this->notify_output_unused(*socket, current_task, local_data);
|
||||
}
|
||||
}
|
||||
|
||||
void run_task(CurrentTask ¤t_task)
|
||||
void run_task(CurrentTask ¤t_task, const LocalData &local_data)
|
||||
{
|
||||
while (const FunctionNode *node = current_task.scheduled_nodes.pop_next_node()) {
|
||||
if (current_task.scheduled_nodes.is_empty()) {
|
||||
current_task.has_scheduled_nodes.store(false, std::memory_order_relaxed);
|
||||
}
|
||||
this->run_node_task(*node, current_task);
|
||||
this->run_node_task(*node, current_task, local_data);
|
||||
}
|
||||
}
|
||||
|
||||
void run_node_task(const FunctionNode &node, CurrentTask ¤t_task)
|
||||
void run_node_task(const FunctionNode &node,
|
||||
CurrentTask ¤t_task,
|
||||
const LocalData &local_data)
|
||||
{
|
||||
NodeState &node_state = *node_states_[node.index_in_graph()];
|
||||
LinearAllocator<> &allocator = this->get_main_or_local_allocator();
|
||||
LinearAllocator<> &allocator = *local_data.allocator;
|
||||
Context local_context{context_->storage, context_->user_data, local_data.local_user_data};
|
||||
const LazyFunction &fn = node.function();
|
||||
|
||||
bool node_needs_execution = false;
|
||||
this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
|
||||
BLI_assert(node_state.schedule_state == NodeScheduleState::Scheduled);
|
||||
node_state.schedule_state = NodeScheduleState::Running;
|
||||
this->with_locked_node(
|
||||
node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
|
||||
BLI_assert(node_state.schedule_state == NodeScheduleState::Scheduled);
|
||||
node_state.schedule_state = NodeScheduleState::Running;
|
||||
|
||||
if (node_state.node_has_finished) {
|
||||
return;
|
||||
}
|
||||
|
||||
bool required_uncomputed_output_exists = false;
|
||||
for (OutputState &output_state : node_state.outputs) {
|
||||
output_state.usage_for_execution = output_state.usage;
|
||||
if (output_state.usage == ValueUsage::Used && !output_state.has_been_computed) {
|
||||
required_uncomputed_output_exists = true;
|
||||
}
|
||||
}
|
||||
if (!required_uncomputed_output_exists && !node_state.has_side_effects) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!node_state.always_used_inputs_requested) {
|
||||
/* Request linked inputs that are always needed. */
|
||||
const Span<Input> fn_inputs = fn.inputs();
|
||||
for (const int input_index : fn_inputs.index_range()) {
|
||||
const Input &fn_input = fn_inputs[input_index];
|
||||
if (fn_input.usage == ValueUsage::Used) {
|
||||
const InputSocket &input_socket = node.input(input_index);
|
||||
if (input_socket.origin() != nullptr) {
|
||||
this->set_input_required(locked_node, input_socket);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
node_state.always_used_inputs_requested = true;
|
||||
}
|
||||
|
||||
for (const int input_index : node_state.inputs.index_range()) {
|
||||
InputState &input_state = node_state.inputs[input_index];
|
||||
if (input_state.was_ready_for_execution) {
|
||||
continue;
|
||||
}
|
||||
if (input_state.value != nullptr) {
|
||||
input_state.was_ready_for_execution = true;
|
||||
continue;
|
||||
}
|
||||
if (!fn.allow_missing_requested_inputs()) {
|
||||
if (input_state.usage == ValueUsage::Used) {
|
||||
if (node_state.node_has_finished) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
node_needs_execution = true;
|
||||
});
|
||||
bool required_uncomputed_output_exists = false;
|
||||
for (OutputState &output_state : node_state.outputs) {
|
||||
output_state.usage_for_execution = output_state.usage;
|
||||
if (output_state.usage == ValueUsage::Used && !output_state.has_been_computed) {
|
||||
required_uncomputed_output_exists = true;
|
||||
}
|
||||
}
|
||||
if (!required_uncomputed_output_exists && !node_state.has_side_effects) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!node_state.always_used_inputs_requested) {
|
||||
/* Request linked inputs that are always needed. */
|
||||
const Span<Input> fn_inputs = fn.inputs();
|
||||
for (const int input_index : fn_inputs.index_range()) {
|
||||
const Input &fn_input = fn_inputs[input_index];
|
||||
if (fn_input.usage == ValueUsage::Used) {
|
||||
const InputSocket &input_socket = node.input(input_index);
|
||||
if (input_socket.origin() != nullptr) {
|
||||
this->set_input_required(locked_node, input_socket);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
node_state.always_used_inputs_requested = true;
|
||||
}
|
||||
|
||||
for (const int input_index : node_state.inputs.index_range()) {
|
||||
InputState &input_state = node_state.inputs[input_index];
|
||||
if (input_state.was_ready_for_execution) {
|
||||
continue;
|
||||
}
|
||||
if (input_state.value != nullptr) {
|
||||
input_state.was_ready_for_execution = true;
|
||||
continue;
|
||||
}
|
||||
if (!fn.allow_missing_requested_inputs()) {
|
||||
if (input_state.usage == ValueUsage::Used) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
node_needs_execution = true;
|
||||
});
|
||||
|
||||
if (node_needs_execution) {
|
||||
if (!node_state.storage_and_defaults_initialized) {
|
||||
@@ -832,7 +869,7 @@ class Executor {
|
||||
const void *default_value = input_socket.default_value();
|
||||
BLI_assert(default_value != nullptr);
|
||||
if (self_.logger_ != nullptr) {
|
||||
self_.logger_->log_socket_value(input_socket, {type, default_value}, *context_);
|
||||
self_.logger_->log_socket_value(input_socket, {type, default_value}, local_context);
|
||||
}
|
||||
BLI_assert(input_state.value == nullptr);
|
||||
input_state.value = allocator.allocate(type.size(), type.alignment());
|
||||
@@ -846,26 +883,28 @@ class Executor {
|
||||
/* Importantly, the node must not be locked when it is executed. That would result in locks
|
||||
* being hold very long in some cases and results in multiple locks being hold by the same
|
||||
* thread in the same graph which can lead to deadlocks. */
|
||||
this->execute_node(node, node_state, current_task);
|
||||
this->execute_node(node, node_state, current_task, local_data);
|
||||
}
|
||||
|
||||
this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
|
||||
this->with_locked_node(
|
||||
node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
|
||||
#ifdef DEBUG
|
||||
if (node_needs_execution) {
|
||||
this->assert_expected_outputs_have_been_computed(locked_node);
|
||||
}
|
||||
if (node_needs_execution) {
|
||||
this->assert_expected_outputs_have_been_computed(locked_node, local_data);
|
||||
}
|
||||
#endif
|
||||
this->finish_node_if_possible(locked_node);
|
||||
const bool reschedule_requested = node_state.schedule_state ==
|
||||
NodeScheduleState::RunningAndRescheduled;
|
||||
node_state.schedule_state = NodeScheduleState::NotScheduled;
|
||||
if (reschedule_requested && !node_state.node_has_finished) {
|
||||
this->schedule_node(locked_node, current_task, false);
|
||||
}
|
||||
});
|
||||
this->finish_node_if_possible(locked_node);
|
||||
const bool reschedule_requested = node_state.schedule_state ==
|
||||
NodeScheduleState::RunningAndRescheduled;
|
||||
node_state.schedule_state = NodeScheduleState::NotScheduled;
|
||||
if (reschedule_requested && !node_state.node_has_finished) {
|
||||
this->schedule_node(locked_node, current_task, false);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void assert_expected_outputs_have_been_computed(LockedNode &locked_node)
|
||||
void assert_expected_outputs_have_been_computed(LockedNode &locked_node,
|
||||
const LocalData &local_data)
|
||||
{
|
||||
const FunctionNode &node = static_cast<const FunctionNode &>(locked_node.node);
|
||||
const NodeState &node_state = locked_node.node_state;
|
||||
@@ -887,7 +926,8 @@ class Executor {
|
||||
}
|
||||
if (!missing_outputs.is_empty()) {
|
||||
if (self_.logger_ != nullptr) {
|
||||
self_.logger_->dump_when_outputs_are_missing(node, missing_outputs, *context_);
|
||||
const Context context{context_->storage, context_->user_data, local_data.local_user_data};
|
||||
self_.logger_->dump_when_outputs_are_missing(node, missing_outputs, context);
|
||||
}
|
||||
BLI_assert_unreachable();
|
||||
}
|
||||
@@ -945,17 +985,22 @@ class Executor {
|
||||
}
|
||||
}
|
||||
|
||||
void execute_node(const FunctionNode &node, NodeState &node_state, CurrentTask ¤t_task);
|
||||
void execute_node(const FunctionNode &node,
|
||||
NodeState &node_state,
|
||||
CurrentTask ¤t_task,
|
||||
const LocalData &local_data);
|
||||
|
||||
void set_input_unused_during_execution(const Node &node,
|
||||
NodeState &node_state,
|
||||
const int input_index,
|
||||
CurrentTask ¤t_task)
|
||||
CurrentTask ¤t_task,
|
||||
const LocalData &local_data)
|
||||
{
|
||||
const InputSocket &input_socket = node.input(input_index);
|
||||
this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
|
||||
this->set_input_unused(locked_node, input_socket);
|
||||
});
|
||||
this->with_locked_node(
|
||||
node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
|
||||
this->set_input_unused(locked_node, input_socket);
|
||||
});
|
||||
}
|
||||
|
||||
void set_input_unused(LockedNode &locked_node, const InputSocket &input_socket)
|
||||
@@ -983,13 +1028,15 @@ class Executor {
|
||||
void *set_input_required_during_execution(const Node &node,
|
||||
NodeState &node_state,
|
||||
const int input_index,
|
||||
CurrentTask ¤t_task)
|
||||
CurrentTask ¤t_task,
|
||||
const LocalData &local_data)
|
||||
{
|
||||
const InputSocket &input_socket = node.input(input_index);
|
||||
void *result;
|
||||
this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
|
||||
result = this->set_input_required(locked_node, input_socket);
|
||||
});
|
||||
this->with_locked_node(
|
||||
node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
|
||||
result = this->set_input_required(locked_node, input_socket);
|
||||
});
|
||||
return result;
|
||||
}
|
||||
|
||||
@@ -1021,14 +1068,16 @@ class Executor {
|
||||
|
||||
void forward_value_to_linked_inputs(const OutputSocket &from_socket,
|
||||
GMutablePointer value_to_forward,
|
||||
CurrentTask ¤t_task)
|
||||
CurrentTask ¤t_task,
|
||||
const LocalData &local_data)
|
||||
{
|
||||
BLI_assert(value_to_forward.get() != nullptr);
|
||||
LinearAllocator<> &allocator = this->get_main_or_local_allocator();
|
||||
const CPPType &type = *value_to_forward.type();
|
||||
const Context local_context{
|
||||
context_->storage, context_->user_data, local_data.local_user_data};
|
||||
|
||||
if (self_.logger_ != nullptr) {
|
||||
self_.logger_->log_socket_value(from_socket, value_to_forward, *context_);
|
||||
self_.logger_->log_socket_value(from_socket, value_to_forward, local_context);
|
||||
}
|
||||
|
||||
const Span<const InputSocket *> targets = from_socket.targets();
|
||||
@@ -1041,7 +1090,7 @@ class Executor {
|
||||
#ifdef DEBUG
|
||||
if (input_state.value != nullptr) {
|
||||
if (self_.logger_ != nullptr) {
|
||||
self_.logger_->dump_when_input_is_set_twice(*target_socket, from_socket, *context_);
|
||||
self_.logger_->dump_when_input_is_set_twice(*target_socket, from_socket, local_context);
|
||||
}
|
||||
BLI_assert_unreachable();
|
||||
}
|
||||
@@ -1051,7 +1100,7 @@ class Executor {
|
||||
BLI_assert(target_socket->origin() == &from_socket);
|
||||
|
||||
if (self_.logger_ != nullptr) {
|
||||
self_.logger_->log_socket_value(*target_socket, value_to_forward, *context_);
|
||||
self_.logger_->log_socket_value(*target_socket, value_to_forward, local_context);
|
||||
}
|
||||
if (target_node.is_dummy()) {
|
||||
/* Forward the value to the outside of the graph. */
|
||||
@@ -1070,21 +1119,23 @@ class Executor {
|
||||
}
|
||||
continue;
|
||||
}
|
||||
this->with_locked_node(target_node, node_state, current_task, [&](LockedNode &locked_node) {
|
||||
if (input_state.usage == ValueUsage::Unused) {
|
||||
return;
|
||||
}
|
||||
if (is_last_target) {
|
||||
/* No need to make a copy if this is the last target. */
|
||||
this->forward_value_to_input(locked_node, input_state, value_to_forward, current_task);
|
||||
value_to_forward = {};
|
||||
}
|
||||
else {
|
||||
void *buffer = allocator.allocate(type.size(), type.alignment());
|
||||
type.copy_construct(value_to_forward.get(), buffer);
|
||||
this->forward_value_to_input(locked_node, input_state, {type, buffer}, current_task);
|
||||
}
|
||||
});
|
||||
this->with_locked_node(
|
||||
target_node, node_state, current_task, local_data, [&](LockedNode &locked_node) {
|
||||
if (input_state.usage == ValueUsage::Unused) {
|
||||
return;
|
||||
}
|
||||
if (is_last_target) {
|
||||
/* No need to make a copy if this is the last target. */
|
||||
this->forward_value_to_input(
|
||||
locked_node, input_state, value_to_forward, current_task);
|
||||
value_to_forward = {};
|
||||
}
|
||||
else {
|
||||
void *buffer = local_data.allocator->allocate(type.size(), type.alignment());
|
||||
type.copy_construct(value_to_forward.get(), buffer);
|
||||
this->forward_value_to_input(locked_node, input_state, {type, buffer}, current_task);
|
||||
}
|
||||
});
|
||||
}
|
||||
if (value_to_forward.get() != nullptr) {
|
||||
value_to_forward.destruct();
|
||||
@@ -1158,7 +1209,7 @@ class Executor {
|
||||
}
|
||||
#endif
|
||||
if (!thread_locals_) {
|
||||
thread_locals_ = std::make_unique<threading::EnumerableThreadSpecific<ThreadLocalData>>();
|
||||
thread_locals_ = std::make_unique<threading::EnumerableThreadSpecific<ThreadLocalStorage>>();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1187,19 +1238,24 @@ class Executor {
|
||||
CurrentTask new_current_task;
|
||||
new_current_task.scheduled_nodes = std::move(scheduled_nodes);
|
||||
new_current_task.has_scheduled_nodes.store(true, std::memory_order_relaxed);
|
||||
executor.run_task(new_current_task);
|
||||
const LocalData local_data = executor.get_local_data();
|
||||
executor.run_task(new_current_task, local_data);
|
||||
},
|
||||
scheduled_nodes,
|
||||
true,
|
||||
[](TaskPool * /*pool*/, void *data) { MEM_delete(static_cast<ScheduledNodes *>(data)); });
|
||||
}
|
||||
|
||||
LinearAllocator<> &get_main_or_local_allocator()
|
||||
LocalData get_local_data()
|
||||
{
|
||||
if (this->use_multi_threading()) {
|
||||
return thread_locals_->local().allocator;
|
||||
if (!this->use_multi_threading()) {
|
||||
return {&main_allocator_, context_->local_user_data};
|
||||
}
|
||||
return main_allocator_;
|
||||
ThreadLocalStorage &local_storage = thread_locals_->local();
|
||||
if (!local_storage.local_user_data.has_value()) {
|
||||
local_storage.local_user_data = context_->user_data->get_local(local_storage.allocator);
|
||||
}
|
||||
return {&local_storage.allocator, local_storage.local_user_data->get()};
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1209,22 +1265,36 @@ class GraphExecutorLFParams final : public Params {
|
||||
const Node &node_;
|
||||
NodeState &node_state_;
|
||||
CurrentTask ¤t_task_;
|
||||
/** Local data of the thread that calls the lazy-function. */
|
||||
const Executor::LocalData &caller_local_data_;
|
||||
|
||||
public:
|
||||
GraphExecutorLFParams(const LazyFunction &fn,
|
||||
Executor &executor,
|
||||
const Node &node,
|
||||
NodeState &node_state,
|
||||
CurrentTask ¤t_task)
|
||||
: Params(fn, executor.use_multi_threading()),
|
||||
CurrentTask ¤t_task,
|
||||
const Executor::LocalData &local_data)
|
||||
: Params(fn, node_state.enabled_multi_threading),
|
||||
executor_(executor),
|
||||
node_(node),
|
||||
node_state_(node_state),
|
||||
current_task_(current_task)
|
||||
current_task_(current_task),
|
||||
caller_local_data_(local_data)
|
||||
{
|
||||
}
|
||||
|
||||
private:
|
||||
Executor::LocalData get_local_data()
|
||||
{
|
||||
if (!node_state_.enabled_multi_threading) {
|
||||
/* Can use the data from the thread-local data from the calling thread. */
|
||||
return caller_local_data_;
|
||||
}
|
||||
/* Need to retrieve the thread-local data for the current thread. */
|
||||
return executor_.get_local_data();
|
||||
}
|
||||
|
||||
void *try_get_input_data_ptr_impl(const int index) const override
|
||||
{
|
||||
const InputState &input_state = node_state_.inputs[index];
|
||||
@@ -1240,7 +1310,8 @@ class GraphExecutorLFParams final : public Params {
|
||||
if (input_state.was_ready_for_execution) {
|
||||
return input_state.value;
|
||||
}
|
||||
return executor_.set_input_required_during_execution(node_, node_state_, index, current_task_);
|
||||
return executor_.set_input_required_during_execution(
|
||||
node_, node_state_, index, current_task_, this->get_local_data());
|
||||
}
|
||||
|
||||
void *get_output_data_ptr_impl(const int index) override
|
||||
@@ -1248,7 +1319,7 @@ class GraphExecutorLFParams final : public Params {
|
||||
OutputState &output_state = node_state_.outputs[index];
|
||||
BLI_assert(!output_state.has_been_computed);
|
||||
if (output_state.value == nullptr) {
|
||||
LinearAllocator<> &allocator = executor_.get_main_or_local_allocator();
|
||||
LinearAllocator<> &allocator = *this->get_local_data().allocator;
|
||||
const CPPType &type = node_.output(index).type();
|
||||
output_state.value = allocator.allocate(type.size(), type.alignment());
|
||||
}
|
||||
@@ -1261,8 +1332,10 @@ class GraphExecutorLFParams final : public Params {
|
||||
BLI_assert(!output_state.has_been_computed);
|
||||
BLI_assert(output_state.value != nullptr);
|
||||
const OutputSocket &output_socket = node_.output(index);
|
||||
executor_.forward_value_to_linked_inputs(
|
||||
output_socket, {output_socket.type(), output_state.value}, current_task_);
|
||||
executor_.forward_value_to_linked_inputs(output_socket,
|
||||
{output_socket.type(), output_state.value},
|
||||
current_task_,
|
||||
this->get_local_data());
|
||||
output_state.value = nullptr;
|
||||
output_state.has_been_computed = true;
|
||||
}
|
||||
@@ -1281,12 +1354,17 @@ class GraphExecutorLFParams final : public Params {
|
||||
|
||||
void set_input_unused_impl(const int index) override
|
||||
{
|
||||
executor_.set_input_unused_during_execution(node_, node_state_, index, current_task_);
|
||||
executor_.set_input_unused_during_execution(
|
||||
node_, node_state_, index, current_task_, this->get_local_data());
|
||||
}
|
||||
|
||||
bool try_enable_multi_threading_impl() override
|
||||
{
|
||||
return executor_.try_enable_multi_threading();
|
||||
const bool success = executor_.try_enable_multi_threading();
|
||||
if (success) {
|
||||
node_state_.enabled_multi_threading = true;
|
||||
}
|
||||
return success;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1297,13 +1375,13 @@ class GraphExecutorLFParams final : public Params {
|
||||
*/
|
||||
inline void Executor::execute_node(const FunctionNode &node,
|
||||
NodeState &node_state,
|
||||
CurrentTask ¤t_task)
|
||||
CurrentTask ¤t_task,
|
||||
const LocalData &local_data)
|
||||
{
|
||||
const LazyFunction &fn = node.function();
|
||||
GraphExecutorLFParams node_params{fn, *this, node, node_state, current_task};
|
||||
BLI_assert(context_ != nullptr);
|
||||
Context fn_context = *context_;
|
||||
fn_context.storage = node_state.storage;
|
||||
GraphExecutorLFParams node_params{fn, *this, node, node_state, current_task, local_data};
|
||||
|
||||
Context fn_context(node_state.storage, context_->user_data, local_data.local_user_data);
|
||||
|
||||
if (self_.logger_ != nullptr) {
|
||||
self_.logger_->log_before_node_execute(node, node_params, fn_context);
|
||||
|
||||
Reference in New Issue
Block a user