
Commit 1d41140

wasm/alloc: async dealloc large chunks
We need to memset(0) memory for wasmtime; however, if the user has configured large chunks of memory, zeroing it all at once can push us over the task budget. To prevent that, we deallocate asynchronously. Since allocation is now asynchronous as well (an allocation may have to wait on a pending zero operation), we need a way to fit that into wasmtime's custom memory allocator API. We do that by using a thread-local variable as a side channel to pass the allocated buffer into the VM, since wasmtime's APIs don't let us pass any data into the allocation request.

Signed-off-by: Tyler Rockwood <[email protected]>
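The thread-local side channel described above lives in the engine integration (one of the four changed files, not shown below). The following is a rough sketch of the pattern only, with hypothetical names: heap_memory here is a stand-in, and memory_creation_hook and the instantiation flow are made up. The point is that the VM's allocation hook has a fixed signature and cannot carry user data, so the already-allocated buffer is parked in a thread-local for the hook to pick up.

```cpp
// Sketch only: the "side channel" pattern, with hypothetical names. The real
// code plugs into wasmtime's custom memory-creation hook, which has a fixed
// signature and cannot carry user data.
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>

struct heap_memory {
    std::unique_ptr<uint8_t[]> data;
    size_t size = 0;
};

// The side channel: a buffer that was already allocated (and zeroed)
// asynchronously, parked here for the next synchronous allocation callback.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
thread_local std::optional<heap_memory> prereserved_memory;

// Stand-in for the VM's allocation hook: it can't take extra parameters, so
// it consumes whatever the side channel currently holds.
heap_memory memory_creation_hook() {
    heap_memory m = std::move(prereserved_memory.value());
    prereserved_memory.reset();
    return m;
}

// Caller side (inside a coroutine, simplified):
//   auto mem = co_await allocator.allocate({.minimum = ..., .maximum = ...});
//   prereserved_memory = std::move(*mem);  // stash through the side channel
//   instantiate_module();                  // hook calls memory_creation_hook()
```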

4 files changed (+306, -48 lines)

src/v/wasm/allocator.cc (+43, -16)
@@ -10,13 +10,20 @@
  */
 #include "wasm/allocator.h"
 
+#include "ssx/future-util.h"
 #include "vassert.h"
 #include "vlog.h"
 #include "wasm/logger.h"
 
 #include <seastar/core/align.hh>
 #include <seastar/core/aligned_buffer.hh>
+#include <seastar/core/future.hh>
+#include <seastar/core/loop.hh>
+#include <seastar/core/lowres_clock.hh>
 #include <seastar/core/print.hh>
+#include <seastar/core/when_all.hh>
+#include <seastar/coroutine/maybe_yield.hh>
+#include <seastar/util/later.hh>
 
 #include <sys/mman.h>
 
@@ -26,36 +33,56 @@
 
 namespace wasm {
 
-heap_allocator::heap_allocator(config c) {
+heap_allocator::heap_allocator(config c)
+  : _memset_chunk_size(c.memset_chunk_size) {
     size_t page_size = ::getpagesize();
-    _max_size = ss::align_up(c.heap_memory_size, page_size);
+    _size = ss::align_up(c.heap_memory_size, page_size);
     for (size_t i = 0; i < c.num_heaps; ++i) {
-        auto buffer = ss::allocate_aligned_buffer<uint8_t>(
-          _max_size, page_size);
-        std::memset(buffer.get(), 0, _max_size);
-        _memory_pool.emplace_back(std::move(buffer), _max_size);
+        auto buffer = ss::allocate_aligned_buffer<uint8_t>(_size, page_size);
+        _memory_pool.push_back(
+          async_zero_memory({std::move(buffer), _size}, _size));
     }
 }
 
-std::optional<heap_memory> heap_allocator::allocate(request req) {
+ss::future<> heap_allocator::stop() {
+    auto pool = std::exchange(_memory_pool, {});
+    co_await ss::when_all_succeed(pool.begin(), pool.end());
+}
+
+ss::future<std::optional<heap_memory>> heap_allocator::allocate(request req) {
     if (_memory_pool.empty()) {
-        return std::nullopt;
+        co_return std::nullopt;
     }
-    size_t size = _memory_pool.front().size;
-    if (size < req.minimum || size > req.maximum) {
-        return std::nullopt;
+    if (_size < req.minimum || _size > req.maximum) {
+        co_return std::nullopt;
    }
-    heap_memory front = std::move(_memory_pool.front());
+    ss::future<heap_memory> front = std::move(_memory_pool.front());
     _memory_pool.pop_front();
-    return front;
+    co_return co_await std::move(front);
 }
 
 void heap_allocator::deallocate(heap_memory m, size_t used_amount) {
-    std::memset(m.data.get(), 0, used_amount);
-    _memory_pool.push_back(std::move(m));
+    _memory_pool.push_back(async_zero_memory(std::move(m), used_amount));
+}
+
+ss::future<heap_memory>
+heap_allocator::async_zero_memory(heap_memory m, size_t used_amount) {
+    uint8_t* data = m.data.get();
+    size_t remaining = used_amount;
+    while (true) {
+        if (remaining <= _memset_chunk_size) {
+            std::memset(data, 0, remaining);
+            co_return m;
+        }
+        std::memset(data, 0, _memset_chunk_size);
+        // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+        data += _memset_chunk_size;
+        remaining -= _memset_chunk_size;
+        co_await ss::coroutine::maybe_yield();
+    }
 }
 
-size_t heap_allocator::max_size() const { return _max_size; }
+size_t heap_allocator::max_size() const { return _size; }
 
 stack_memory::stack_memory(stack_bounds bounds, allocated_memory data)
   : _bounds(bounds)
src/v/wasm/allocator.h (+23, -6)
@@ -14,7 +14,9 @@
 
 #include <seastar/core/aligned_buffer.hh>
 #include <seastar/core/chunked_fifo.hh>
-#include <seastar/util/optimized_optional.hh>
+#include <seastar/core/condition-variable.hh>
+#include <seastar/core/future.hh>
+#include <seastar/core/gate.hh>
 
 #include <absl/container/btree_set.h>
 
@@ -58,10 +60,18 @@ class heap_allocator {
         size_t heap_memory_size;
         // The total number of heaps allocated per core.
         size_t num_heaps;
+        // The amount of memory we zero out at once.
+        size_t memset_chunk_size;
     };
 
     explicit heap_allocator(config);
 
+    /**
+     * Stop this allocator, waiting for any asynchronous zero'ing of bytes to
+     * finish.
+     */
+    ss::future<> stop();
+
     /**
      * A request of heap memory based on the following bounds.
     */
@@ -71,18 +81,22 @@ class heap_allocator {
     };
 
     /**
-     * Allocate heap memory by taking a memory instance from the pool.
+     * Allocate heap memory by taking a memory instance from the pool. Returns
+     * std::nullopt if the memory request cannot be fulfilled, either because
+     * the request does not fit within our bounds or because all memory is
+     * currently allocated.
      *
      * Memory returned from this method will be zero-filled.
     */
-    std::optional<heap_memory> allocate(request);
+    ss::future<std::optional<heap_memory>> allocate(request);
 
     /**
      * Deallocate heap memory by returning a memory instance to the pool.
     *
     * used_amount is to zero out the used memory from the heap, this is
     * required so that if only a portion of a large memory space was used we
-     * don't have to touch all the bytes.
+     * don't have to touch all the bytes. If the used_amount is over some
+     * threshold then we make zero'ing out the bytes an asynchronous task.
     */
     void deallocate(heap_memory, size_t used_amount);
 
@@ -92,10 +106,13 @@ class heap_allocator {
     size_t max_size() const;
 
 private:
-    size_t _max_size;
+    ss::future<heap_memory> async_zero_memory(heap_memory, size_t used_amount);
+
+    size_t _memset_chunk_size;
+    size_t _size;
     // We expect this list to be small, so override the chunk to be smaller too.
     static constexpr size_t items_per_chunk = 16;
-    ss::chunked_fifo<heap_memory, items_per_chunk> _memory_pool;
+    ss::chunked_fifo<ss::future<heap_memory>, items_per_chunk> _memory_pool;
 };
 
 /**
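With these signatures, allocate() must now be co_awaited, and the owner should stop() the allocator before destroying it so background zeroing can finish. A hedged usage sketch follows; the surrounding function, its arguments, and the shutdown comment are illustrative, and only the heap_allocator calls come from this diff:

```cpp
#include "wasm/allocator.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>

// Hypothetical call site: run one guest's worth of work with a pooled heap.
seastar::future<>
use_one_heap(wasm::heap_allocator& allocator, size_t heap_size) {
    auto mem = co_await allocator.allocate(
      {.minimum = heap_size, .maximum = heap_size});
    if (!mem) {
        // Pool exhausted or the request doesn't fit the configured heap size.
        co_return;
    }
    // ... hand *mem to the guest and run it ...
    // Returning the memory may kick off asynchronous zeroing in the background.
    allocator.deallocate(std::move(*mem), /*used_amount=*/heap_size);
}

// At shutdown, the owner of the allocator waits for in-flight zeroing:
//     co_await allocator.stop();
```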

src/v/wasm/tests/wasm_allocator_test.cc (+99, -15)
@@ -10,8 +10,12 @@
  */
 
 #include "gmock/gmock.h"
+#include "units.h"
 #include "wasm/allocator.h"
 
+#include <seastar/core/reactor.hh>
+#include <seastar/core/when_all.hh>
+
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
@@ -22,16 +26,22 @@
 
 namespace wasm {
 
+constexpr static auto default_memset_chunk_size = 10_MiB;
+
 using ::testing::Optional;
 
 TEST(HeapAllocatorParamsTest, SizeIsAligned) {
     size_t page_size = ::getpagesize();
     heap_allocator allocator(heap_allocator::config{
       .heap_memory_size = page_size + 3,
       .num_heaps = 1,
+      .memset_chunk_size = default_memset_chunk_size,
     });
-    auto mem = allocator.allocate(
-      {.minimum = 0, .maximum = std::numeric_limits<size_t>::max()});
+    auto mem = allocator
+                 .allocate(
+                   {.minimum = 0,
+                    .maximum = std::numeric_limits<size_t>::max()})
+                 .get();
     ASSERT_TRUE(mem.has_value());
     EXPECT_EQ(mem->size, page_size * 2);
 }
@@ -41,8 +51,10 @@ TEST(HeapAllocatorTest, CanAllocateOne) {
     heap_allocator allocator(heap_allocator::config{
       .heap_memory_size = page_size,
       .num_heaps = 1,
+      .memset_chunk_size = default_memset_chunk_size,
     });
-    auto mem = allocator.allocate({.minimum = page_size, .maximum = page_size});
+    auto mem
+      = allocator.allocate({.minimum = page_size, .maximum = page_size}).get();
     ASSERT_TRUE(mem.has_value());
     EXPECT_EQ(mem->size, page_size);
 }
@@ -52,14 +64,17 @@ TEST(HeapAllocatorTest, MustAllocateWithinBounds) {
     heap_allocator allocator(heap_allocator::config{
       .heap_memory_size = page_size,
       .num_heaps = 1,
+      .memset_chunk_size = default_memset_chunk_size,
     });
     // minimum too large
-    auto mem = allocator.allocate(
-      {.minimum = page_size * 2, .maximum = page_size * 3});
+    auto mem = allocator
+                 .allocate({.minimum = page_size * 2, .maximum = page_size * 3})
+                 .get();
     EXPECT_FALSE(mem.has_value());
     // maximum too small
-    mem = allocator.allocate(
-      {.minimum = page_size / 2, .maximum = page_size - 1});
+    mem = allocator
+            .allocate({.minimum = page_size / 2, .maximum = page_size - 1})
+            .get();
     EXPECT_FALSE(mem.has_value());
 }
 
@@ -68,10 +83,13 @@ TEST(HeapAllocatorTest, Exhaustion) {
     heap_allocator allocator(heap_allocator::config{
       .heap_memory_size = page_size,
       .num_heaps = 1,
+      .memset_chunk_size = default_memset_chunk_size,
     });
-    auto mem = allocator.allocate({.minimum = page_size, .maximum = page_size});
+    auto mem
+      = allocator.allocate({.minimum = page_size, .maximum = page_size}).get();
     EXPECT_TRUE(mem.has_value());
-    mem = allocator.allocate({.minimum = page_size, .maximum = page_size});
+    mem
+      = allocator.allocate({.minimum = page_size, .maximum = page_size}).get();
     EXPECT_FALSE(mem.has_value());
 }
 
@@ -80,25 +98,90 @@ TEST(HeapAllocatorTest, CanReturnMemoryToThePool) {
     heap_allocator allocator(heap_allocator::config{
       .heap_memory_size = page_size,
       .num_heaps = 3,
+      .memset_chunk_size = default_memset_chunk_size,
     });
     heap_allocator::request req{.minimum = page_size, .maximum = page_size};
     std::vector<heap_memory> allocated;
     for (int i = 0; i < 3; ++i) {
-        auto mem = allocator.allocate(req);
+        auto mem = allocator.allocate(req).get();
         ASSERT_TRUE(mem.has_value());
         allocated.push_back(std::move(*mem));
     }
-    auto mem = allocator.allocate(req);
+    auto mem = allocator.allocate(req).get();
     EXPECT_FALSE(mem.has_value());
     mem = std::move(allocated.back());
     allocated.pop_back();
     allocator.deallocate(std::move(*mem), /*used_amount=*/0);
-    mem = allocator.allocate(req);
+    mem = allocator.allocate(req).get();
     EXPECT_TRUE(mem.has_value());
-    mem = allocator.allocate(req);
+    mem = allocator.allocate(req).get();
     EXPECT_FALSE(mem.has_value());
 }
 
+// We want to test a scenario where the deallocation happens asynchronously.
+// However, in release mode continuations can be "inlined" into the currently
+// executing task, so we can't force the scenario we want to test. This test
+// therefore only runs in debug mode, which forces as many scheduling points
+// as possible, so the zeroing task always happens asynchronously.
+#ifndef NDEBUG
+
+using ::testing::_;
+
+TEST(HeapAllocatorTest, AsyncDeallocationOnlyOneAwakened) {
+    size_t page_size = ::getpagesize();
+    // force deallocations to be asynchronous.
+    size_t test_chunk_size = page_size / 4;
+    heap_allocator allocator(heap_allocator::config{
+      .heap_memory_size = page_size,
+      .num_heaps = 2,
+      .memset_chunk_size = test_chunk_size,
+    });
+    heap_allocator::request req{.minimum = page_size, .maximum = page_size};
+    // Start one deallocation in the background.
+    allocator.deallocate(allocator.allocate(req).get().value(), page_size);
+    // Start another deallocation in the background so there is no memory left.
+    allocator.deallocate(allocator.allocate(req).get().value(), page_size);
+    auto waiter1 = allocator.allocate(req);
+    auto waiter2 = allocator.allocate(req);
+    // There should not be memory available, so both our requests for
+    // memory should be waiting.
+    EXPECT_FALSE(waiter1.available());
+    EXPECT_FALSE(waiter2.available());
+    // waiter1 should be notified first that there is memory available.
+    // waiter2 may or may not be ready yet: another deallocation is still in
+    // flight, but task execution order (which is randomized in debug mode)
+    // determines when its continuation runs, so we cannot assert that it is
+    // not completed.
+    EXPECT_THAT(waiter1.get(), Optional(_));
+    // The second waiter completes and we can allocate memory.
+    EXPECT_THAT(waiter2.get(), Optional(_));
+}
+
+TEST(HeapAllocatorTest, AsyncDeallocationNotEnoughMemory) {
+    size_t page_size = ::getpagesize();
+    // force deallocations to be asynchronous.
+    size_t test_chunk_size = page_size / 4;
+    heap_allocator allocator(heap_allocator::config{
+      .heap_memory_size = page_size,
+      .num_heaps = 1,
+      .memset_chunk_size = test_chunk_size,
+    });
+    heap_allocator::request req{.minimum = page_size, .maximum = page_size};
+    allocator.deallocate(allocator.allocate(req).get().value(), page_size);
+
+    // The first request succeeds and waits for the deallocation to finish.
+    auto waiter1 = allocator.allocate(req);
+    EXPECT_FALSE(waiter1.available());
+
+    // Future requests fail immediately; no memory is available.
+    auto waiter2 = allocator.allocate(req);
+    EXPECT_TRUE(waiter2.available());
+
+    EXPECT_THAT(waiter1.get(), Optional(_));
+    EXPECT_EQ(waiter2.get(), std::nullopt);
+}
+#endif
+
 MATCHER(HeapIsZeroed, "is zeroed") {
     std::span<uint8_t> d = {arg.data.get(), arg.size};
     for (auto e : d) {
@@ -114,15 +197,16 @@ TEST(HeapAllocatorTest, MemoryIsZeroFilled) {
     heap_allocator allocator(heap_allocator::config{
       .heap_memory_size = page_size,
       .num_heaps = 1,
+      .memset_chunk_size = default_memset_chunk_size,
     });
     heap_allocator::request req{.minimum = page_size, .maximum = page_size};
-    auto allocated = allocator.allocate(req);
+    auto allocated = allocator.allocate(req).get();
     ASSERT_TRUE(allocated.has_value());
     EXPECT_THAT(allocated, Optional(HeapIsZeroed()));
     std::fill_n(allocated->data.get(), 4, 1);
     allocator.deallocate(*std::move(allocated), 4);
 
-    allocated = allocator.allocate(req);
+    allocated = allocator.allocate(req).get();
     ASSERT_TRUE(allocated.has_value());
     EXPECT_THAT(allocated, Optional(HeapIsZeroed()));
 }
