WIP: Functions: new local allocator for better memory reuse and performance #104630

Draft
Jacques Lucke wants to merge 44 commits from JacquesLucke/blender:local-allocator into main

When changing the target branch, be careful to rebase the branch in your fork to match. See documentation.
6 changed files with 93 additions and 53 deletions
Showing only changes of commit 12e76d1f83 - Show all commits

View File

@ -59,11 +59,23 @@ class GraphExecutor : public LazyFunction {
using Logger = GraphExecutorLogger;
using SideEffectProvider = GraphExecutorSideEffectProvider;
struct NodeBufferOffsets {
int node;
int inputs;
int outputs;
};
struct PreprocessData {
Array<NodeBufferOffsets> offsets;
int node_state_buffer_size;
};
private:
/**
* The graph that is evaluated.
*/
const Graph &graph_;
const PreprocessData &preprocess_data_;
/**
* Input and output sockets of the entire graph.
*/
@ -85,12 +97,15 @@ class GraphExecutor : public LazyFunction {
GraphExecutor(const Graph &graph,
Span<const OutputSocket *> graph_inputs,
Span<const InputSocket *> graph_outputs,
const PreprocessData &preprocess_data,
const Logger *logger,
const SideEffectProvider *side_effect_provider);
void *init_storage(LocalAllocator &allocator) const override;
void destruct_storage(void *storage, LocalAllocator &allocator) const override;
static void preprocess(const Graph &graph, PreprocessData &r_preprocess_data);
private:
void execute_impl(Params &params, const Context &context) const override;
};

View File

@ -75,7 +75,7 @@ enum class NodeScheduleState {
RunningAndRescheduled,
};
struct InputState {
struct alignas(8) InputState {
/**
* Value of this input socket. By default, the value is empty. When other nodes are done
* computing their outputs, the computed values will be forwarded to linked input sockets. The
@ -97,7 +97,7 @@ struct InputState {
bool was_ready_for_execution = false;
};
struct OutputState {
struct alignas(8) OutputState {
/**
* Keeps track of how the output value is used. If a connected input becomes used, this output
* has to become used as well. The output becomes unused when it is used by no input socket
@ -127,7 +127,7 @@ struct OutputState {
void *value = nullptr;
};
struct NodeState {
struct alignas(8) NodeState {
/**
* Needs to be locked when any data in this state is accessed that is not explicitly marked as
* not needing the lock.
@ -233,7 +233,7 @@ class Executor {
/**
* State of every node, indexed by #Node::index_in_graph.
*/
MutableSpan<NodeState> node_states_;
MutableSpan<NodeState *> node_states_;
/**
* Parameters provided by the caller. This is always non-null, while a node is running.
*/
@ -271,12 +271,15 @@ class Executor {
LocalAllocator &local_allocator = allocator.local();
for (const int node_index : range) {
const Node &node = *self_.graph_.nodes()[node_index];
NodeState &node_state = node_states_[node_index];
NodeState &node_state = *node_states_[node_index];
if (!node_state.node_has_finished) {
this->destruct_node_state(node, node_state, local_allocator);
this->destruct_node_data(node, node_state, local_allocator);
}
std::destroy_at(&node_state);
}
});
allocator.deallocate(
node_states_[0], self_.preprocess_data_.node_state_buffer_size, alignof(NodeState));
allocator.destruct_free_array(node_states_);
}
@ -317,7 +320,7 @@ class Executor {
side_effect_nodes = self_.side_effect_provider_->get_nodes_with_side_effects(*context_);
for (const FunctionNode *node : side_effect_nodes) {
const int node_index = node->index_in_graph();
NodeState &node_state = node_states_[node_index];
NodeState &node_state = *node_states_[node_index];
node_state.has_side_effects = true;
}
}
@ -340,30 +343,29 @@ class Executor {
void initialize_node_states()
{
Span<const Node *> nodes = self_.graph_.nodes();
node_states_ = context_->allocator->allocate_new_array<NodeState>(nodes.size());
node_states_ = context_->allocator->allocate_array<NodeState *>(nodes.size());
/* Construct all node states in parallel. */
threading::parallel_for(nodes.index_range(), 256, [&](const IndexRange range) {
LocalAllocator &local_allocator = context_->allocator->local();
for (const int i : range) {
const Node &node = *nodes[i];
this->construct_initial_node_state(local_allocator, node, node_states_[i]);
}
});
void *node_states_buffer = context_->allocator->allocate(
self_.preprocess_data_.node_state_buffer_size, alignof(NodeState));
for (const int i : nodes.index_range()) {
const Node &node = *nodes[i];
const GraphExecutor::NodeBufferOffsets &node_offsets = self_.preprocess_data_.offsets[i];
void *state_buffer = POINTER_OFFSET(node_states_buffer, node_offsets.node);
NodeState *node_state = new (state_buffer) NodeState();
node_state->inputs = {
static_cast<InputState *>(POINTER_OFFSET(node_states_buffer, node_offsets.inputs)),
node.inputs().size()};
node_state->outputs = {
static_cast<OutputState *>(POINTER_OFFSET(node_states_buffer, node_offsets.outputs)),
node.outputs().size()};
default_construct_n(node_state->inputs.data(), node_state->inputs.size());
default_construct_n(node_state->outputs.data(), node_state->outputs.size());
node_states_[i] = node_state;
}
}
void construct_initial_node_state(LocalAllocator &allocator,
const Node &node,
NodeState &node_state)
{
const Span<const InputSocket *> node_inputs = node.inputs();
const Span<const OutputSocket *> node_outputs = node.outputs();
node_state.inputs = allocator.allocate_new_array<InputState>(node_inputs.size());
node_state.outputs = allocator.allocate_new_array<OutputState>(node_outputs.size());
}
void destruct_node_state(const Node &node, NodeState &node_state, LocalAllocator &allocator)
void destruct_node_data(const Node &node, NodeState &node_state, LocalAllocator &allocator)
{
if (node.is_function()) {
const LazyFunction &fn = static_cast<const FunctionNode &>(node).function();
@ -376,8 +378,6 @@ class Executor {
const InputSocket &input_socket = node.input(i);
this->destruct_input_value_if_exists(input_state, input_socket.type(), allocator);
}
allocator.destruct_free_array(node_state.inputs);
allocator.destruct_free_array(node_state.outputs);
}
void schedule_newly_requested_outputs(CurrentTask &current_task)
@ -391,7 +391,7 @@ class Executor {
}
const InputSocket &socket = *self_.graph_outputs_[graph_output_index];
const Node &node = socket.node();
NodeState &node_state = node_states_[node.index_in_graph()];
NodeState &node_state = *node_states_[node.index_in_graph()];
this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
this->set_input_required(locked_node, socket);
});
@ -424,7 +424,7 @@ class Executor {
for (const int i : self_.graph_inputs_.index_range()) {
const OutputSocket &socket = *self_.graph_inputs_[i];
const Node &node = socket.node();
const NodeState &node_state = node_states_[node.index_in_graph()];
const NodeState &node_state = *node_states_[node.index_in_graph()];
const OutputState &output_state = node_state.outputs[socket.index()];
if (output_state.usage == ValueUsage::Unused) {
params_->set_input_unused(i);
@ -483,7 +483,7 @@ class Executor {
for (const int node_index : reachable_node_flags.index_range()) {
const Node &node = *all_nodes[node_index];
NodeState &node_state = node_states_[node_index];
NodeState &node_state = *node_states_[node_index];
const bool node_is_reachable = reachable_node_flags[node_index];
if (node_is_reachable) {
for (const int output_index : node.outputs().index_range()) {
@ -517,7 +517,7 @@ class Executor {
CurrentTask &current_task)
{
for (const FunctionNode *node : side_effect_nodes) {
NodeState &node_state = node_states_[node->index_in_graph()];
NodeState &node_state = *node_states_[node->index_in_graph()];
this->with_locked_node(*node, node_state, current_task, [&](LockedNode &locked_node) {
this->schedule_node(locked_node, current_task);
});
@ -560,7 +560,7 @@ class Executor {
{
const Node &node = socket.node();
const int index_in_node = socket.index();
NodeState &node_state = node_states_[node.index_in_graph()];
NodeState &node_state = *node_states_[node.index_in_graph()];
/* The notified output socket might be an input of the entire graph. In this case, notify the
* caller that the input is required. */
@ -585,9 +585,6 @@ class Executor {
BLI_assert(node.is_function());
this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
if (node_state.node_has_finished) {
return;
}
OutputState &output_state = node_state.outputs[index_in_node];
if (output_state.usage == ValueUsage::Used) {
return;
@ -601,12 +598,9 @@ class Executor {
{
const Node &node = socket.node();
const int index_in_node = socket.index();
NodeState &node_state = node_states_[node.index_in_graph()];
NodeState &node_state = *node_states_[node.index_in_graph()];
this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
if (node_state.node_has_finished) {
return;
}
OutputState &output_state = node_state.outputs[index_in_node];
output_state.potential_target_sockets -= 1;
if (output_state.potential_target_sockets == 0) {
@ -660,7 +654,7 @@ class Executor {
CurrentTask &current_task,
const FunctionRef<void(LockedNode &)> f)
{
BLI_assert(&node_state == &node_states_[node.index_in_graph()]);
BLI_assert(&node_state == node_states_[node.index_in_graph()]);
LockedNode locked_node{node, node_state};
if (this->use_multi_threading()) {
@ -704,7 +698,7 @@ class Executor {
void run_node_task(const FunctionNode &node, CurrentTask &current_task)
{
NodeState &node_state = node_states_[node.index_in_graph()];
NodeState &node_state = *node_states_[node.index_in_graph()];
LocalAllocator &allocator = this->get_local_allocator();
const LazyFunction &fn = node.function();
@ -877,7 +871,7 @@ class Executor {
}
}
this->destruct_node_state(node, node_state, allocator);
this->destruct_node_data(node, node_state, allocator);
}
void destruct_input_value_if_exists(InputState &input_state,
@ -986,7 +980,7 @@ class Executor {
const Span<const InputSocket *> targets = from_socket.targets();
for (const InputSocket *target_socket : targets) {
const Node &target_node = target_socket->node();
NodeState &node_state = node_states_[target_node.index_in_graph()];
NodeState &node_state = *node_states_[target_node.index_in_graph()];
const int input_index = target_socket->index();
const bool is_last_target = target_socket == targets.last();
BLI_assert(target_socket->type() == type);
@ -1012,9 +1006,6 @@ class Executor {
continue;
}
this->with_locked_node(target_node, node_state, current_task, [&](LockedNode &locked_node) {
if (node_state.node_has_finished) {
return;
}
InputState &input_state = node_state.inputs[input_index];
#ifdef DEBUG
@ -1278,12 +1269,32 @@ inline void Executor::execute_node(const FunctionNode &node,
}
}
void GraphExecutor::preprocess(const Graph &graph, PreprocessData &r_preprocess_data)
{
const Span<const Node *> nodes = graph.nodes();
r_preprocess_data.offsets.reinitialize(nodes.size());
int offset = 0;
for (const int i : nodes.index_range()) {
const Node &node = *nodes[i];
NodeBufferOffsets &node_offsets = r_preprocess_data.offsets[i];
node_offsets.node = offset;
offset += sizeof(NodeState);
node_offsets.inputs = offset;
offset += sizeof(InputState) * node.inputs().size();
node_offsets.outputs = offset;
offset += sizeof(OutputState) * node.outputs().size();
}
r_preprocess_data.node_state_buffer_size = offset;
}
GraphExecutor::GraphExecutor(const Graph &graph,
const Span<const OutputSocket *> graph_inputs,
const Span<const InputSocket *> graph_outputs,
const PreprocessData &preprocess_data,
const Logger *logger,
const SideEffectProvider *side_effect_provider)
: graph_(graph),
preprocess_data_(preprocess_data),
graph_inputs_(graph_inputs),
graph_outputs_(graph_outputs),
logger_(logger),

View File

@ -105,7 +105,11 @@ TEST(lazy_function, SideEffects)
SimpleSideEffectProvider side_effect_provider{{&store_node}};
GraphExecutor executor_fn{graph, {&input_node.output(0)}, {}, nullptr, &side_effect_provider};
GraphExecutor::PreprocessData preprocess_data;
GraphExecutor::preprocess(graph, preprocess_data);
GraphExecutor executor_fn{
graph, {&input_node.output(0)}, {}, preprocess_data, nullptr, &side_effect_provider};
execute_lazy_function_eagerly(executor_fn, nullptr, std::make_tuple(5), std::make_tuple());
EXPECT_EQ(dst1, 15);
@ -167,8 +171,11 @@ TEST(lazy_function, GraphWithCycle)
graph.update_node_indices();
GraphExecutor::PreprocessData preprocess_data;
GraphExecutor::preprocess(graph, preprocess_data);
GraphExecutor executor_fn{
graph, {&input_node.output(0)}, {&output_node.input(0)}, nullptr, nullptr};
graph, {&input_node.output(0)}, {&output_node.input(0)}, preprocess_data, nullptr, nullptr};
int result = 0;
execute_lazy_function_eagerly(
executor_fn, nullptr, std::make_tuple(10), std::make_tuple(&result));

View File

@ -1139,8 +1139,12 @@ static GeometrySet compute_geometry(
blender::nodes::GeometryNodesLazyFunctionLogger lf_logger(lf_graph_info);
blender::nodes::GeometryNodesLazyFunctionSideEffectProvider lf_side_effect_provider;
lf::GraphExecutor graph_executor{
lf_graph_info.graph, graph_inputs, graph_outputs, &lf_logger, &lf_side_effect_provider};
lf::GraphExecutor graph_executor{lf_graph_info.graph,
graph_inputs,
graph_outputs,
lf_graph_info.graph_preprocess_data,
&lf_logger,
&lf_side_effect_provider};
blender::nodes::GeoNodesModifierData geo_nodes_modifier_data;
geo_nodes_modifier_data.depsgraph = ctx->depsgraph;

View File

@ -187,6 +187,7 @@ struct GeometryNodesLazyFunctionGraphInfo {
* Mappings between the lazy-function graph and the #bNodeTree.
*/
GeometryNodeLazyFunctionGraphMapping mapping;
lf::GraphExecutor::PreprocessData graph_preprocess_data;
/**
* Approximate number of nodes in the graph if all sub-graphs were inlined.
* This can be used as a simple heuristic for the complexity of the node group.

View File

@ -781,6 +781,7 @@ class LazyFunctionForGroupNode : public LazyFunction {
graph_executor_.emplace(lf_graph_info.graph,
std::move(graph_inputs),
std::move(graph_outputs),
lf_graph_info.graph_preprocess_data,
&*lf_logger_,
&*lf_side_effect_provider_);
}
@ -1228,6 +1229,7 @@ struct GeometryNodesLazyFunctionGraphBuilder {
lf_graph_->update_node_indices();
lf_graph_info_->num_inline_nodes_approximate += lf_graph_->nodes().size();
lf::GraphExecutor::preprocess(*lf_graph_, lf_graph_info_->graph_preprocess_data);
}
private: