Skip to content

Commit dbd628d

Browse files
authored
Merge pull request #15101 from rockwotj/async-memset
wasm: asynchronously zero deallocated heaps
2 parents 6896699 + 9341ce7 commit dbd628d

File tree

6 files changed

+333
-51
lines changed

6 files changed

+333
-51
lines changed

src/v/wasm/allocator.cc

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,20 @@
1010
*/
1111
#include "wasm/allocator.h"
1212

13+
#include "ssx/future-util.h"
1314
#include "vassert.h"
1415
#include "vlog.h"
1516
#include "wasm/logger.h"
1617

1718
#include <seastar/core/align.hh>
1819
#include <seastar/core/aligned_buffer.hh>
20+
#include <seastar/core/future.hh>
21+
#include <seastar/core/loop.hh>
22+
#include <seastar/core/lowres_clock.hh>
1923
#include <seastar/core/print.hh>
24+
#include <seastar/core/when_all.hh>
25+
#include <seastar/coroutine/maybe_yield.hh>
26+
#include <seastar/util/later.hh>
2027

2128
#include <sys/mman.h>
2229

@@ -26,36 +33,56 @@
2633

2734
namespace wasm {
2835

29-
heap_allocator::heap_allocator(config c) {
36+
heap_allocator::heap_allocator(config c)
37+
: _memset_chunk_size(c.memset_chunk_size) {
3038
size_t page_size = ::getpagesize();
31-
_max_size = ss::align_up(c.heap_memory_size, page_size);
39+
_size = ss::align_up(c.heap_memory_size, page_size);
3240
for (size_t i = 0; i < c.num_heaps; ++i) {
33-
auto buffer = ss::allocate_aligned_buffer<uint8_t>(
34-
_max_size, page_size);
35-
std::memset(buffer.get(), 0, _max_size);
36-
_memory_pool.emplace_back(std::move(buffer), _max_size);
41+
auto buffer = ss::allocate_aligned_buffer<uint8_t>(_size, page_size);
42+
_memory_pool.push_back(
43+
async_zero_memory({std::move(buffer), _size}, _size));
3744
}
3845
}
3946

40-
std::optional<heap_memory> heap_allocator::allocate(request req) {
47+
ss::future<> heap_allocator::stop() {
48+
auto pool = std::exchange(_memory_pool, {});
49+
co_await ss::when_all_succeed(pool.begin(), pool.end());
50+
}
51+
52+
ss::future<std::optional<heap_memory>> heap_allocator::allocate(request req) {
4153
if (_memory_pool.empty()) {
42-
return std::nullopt;
54+
co_return std::nullopt;
4355
}
44-
size_t size = _memory_pool.front().size;
45-
if (size < req.minimum || size > req.maximum) {
46-
return std::nullopt;
56+
if (_size < req.minimum || _size > req.maximum) {
57+
co_return std::nullopt;
4758
}
48-
heap_memory front = std::move(_memory_pool.front());
59+
ss::future<heap_memory> front = std::move(_memory_pool.front());
4960
_memory_pool.pop_front();
50-
return front;
61+
co_return co_await std::move(front);
5162
}
5263

5364
void heap_allocator::deallocate(heap_memory m, size_t used_amount) {
54-
std::memset(m.data.get(), 0, used_amount);
55-
_memory_pool.push_back(std::move(m));
65+
_memory_pool.push_back(async_zero_memory(std::move(m), used_amount));
66+
}
67+
68+
ss::future<heap_memory>
69+
heap_allocator::async_zero_memory(heap_memory m, size_t used_amount) {
70+
uint8_t* data = m.data.get();
71+
size_t remaining = used_amount;
72+
while (true) {
73+
if (remaining <= _memset_chunk_size) {
74+
std::memset(data, 0, remaining);
75+
co_return m;
76+
}
77+
std::memset(data, 0, _memset_chunk_size);
78+
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
79+
data += _memset_chunk_size;
80+
remaining -= _memset_chunk_size;
81+
co_await ss::coroutine::maybe_yield();
82+
}
5683
}
5784

58-
size_t heap_allocator::max_size() const { return _max_size; }
85+
size_t heap_allocator::max_size() const { return _size; }
5986

6087
stack_memory::stack_memory(stack_bounds bounds, allocated_memory data)
6188
: _bounds(bounds)

src/v/wasm/allocator.h

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414

1515
#include <seastar/core/aligned_buffer.hh>
1616
#include <seastar/core/chunked_fifo.hh>
17-
#include <seastar/util/optimized_optional.hh>
17+
#include <seastar/core/condition-variable.hh>
18+
#include <seastar/core/future.hh>
19+
#include <seastar/core/gate.hh>
1820

1921
#include <absl/container/btree_set.h>
2022

@@ -58,10 +60,18 @@ class heap_allocator {
5860
size_t heap_memory_size;
5961
// The total number of heaps allocated per core.
6062
size_t num_heaps;
63+
// The amount of memory we zero out at once.
64+
size_t memset_chunk_size;
6165
};
6266

6367
explicit heap_allocator(config);
6468

69+
/**
70+
* Stop this allocator, waiting for any asynchronous zero'ing of bytes to
71+
* finish.
72+
*/
73+
ss::future<> stop();
74+
6575
/**
6676
* A request of heap memory based on the following bounds.
6777
*/
@@ -71,18 +81,22 @@ class heap_allocator {
7181
};
7282

7383
/**
74-
* Allocate heap memory by taking a memory instance from the pool.
84+
* Allocate heap memory by taking a memory instance from the pool, returns
85+
* std::nullopt if the memory request cannot be fulfilled either because the
86+
* request does not fix within our bounds or because all memory is currently
87+
* allocated.
7588
*
7689
* Memory returned from this method will be zero-filled.
7790
*/
78-
std::optional<heap_memory> allocate(request);
91+
ss::future<std::optional<heap_memory>> allocate(request);
7992

8093
/**
8194
* Deallocate heap memory by returing a memory instance to the pool.
8295
*
8396
* used_amount is to zero out the used memory from the heap, this is
8497
* required so that if only a portion of a large memory space was used we
85-
* don't have to touch all the bytes.
98+
* don't have to touch all the bytes. If the used_amount is over some
99+
* threshold then we make zero'ing out the bytes an asynchronous task.
86100
*/
87101
void deallocate(heap_memory, size_t used_amount);
88102

@@ -92,10 +106,13 @@ class heap_allocator {
92106
size_t max_size() const;
93107

94108
private:
95-
size_t _max_size;
109+
ss::future<heap_memory> async_zero_memory(heap_memory, size_t used_amount);
110+
111+
size_t _memset_chunk_size;
112+
size_t _size;
96113
// We expect this list to be small, so override the chunk to be smaller too.
97114
static constexpr size_t items_per_chunk = 16;
98-
ss::chunked_fifo<heap_memory, items_per_chunk> _memory_pool;
115+
ss::chunked_fifo<ss::future<heap_memory>, items_per_chunk> _memory_pool;
99116
};
100117

101118
/**

src/v/wasm/tests/CMakeLists.txt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,6 @@ rp_test(
112112
LABELS wasm
113113
)
114114

115-
# TODO(rockwood): Enable on CI when we determine why the module
116-
# OOMs in this environment
117115
rp_test(
118116
BENCHMARK_TEST
119117
BINARY_NAME wasm_transform
@@ -129,5 +127,4 @@ rp_test(
129127
"${TESTDATA_DIR}/identity.wasm"
130128
LABELS
131129
wasm
132-
disable_on_ci
133130
)

src/v/wasm/tests/wasm_allocator_test.cc

Lines changed: 99 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,12 @@
1010
*/
1111

1212
#include "gmock/gmock.h"
13+
#include "units.h"
1314
#include "wasm/allocator.h"
1415

16+
#include <seastar/core/reactor.hh>
17+
#include <seastar/core/when_all.hh>
18+
1519
#include <gmock/gmock.h>
1620
#include <gtest/gtest.h>
1721

@@ -22,16 +26,22 @@
2226

2327
namespace wasm {
2428

29+
constexpr static auto default_memset_chunk_size = 10_MiB;
30+
2531
using ::testing::Optional;
2632

2733
TEST(HeapAllocatorParamsTest, SizeIsAligned) {
2834
size_t page_size = ::getpagesize();
2935
heap_allocator allocator(heap_allocator::config{
3036
.heap_memory_size = page_size + 3,
3137
.num_heaps = 1,
38+
.memset_chunk_size = default_memset_chunk_size,
3239
});
33-
auto mem = allocator.allocate(
34-
{.minimum = 0, .maximum = std::numeric_limits<size_t>::max()});
40+
auto mem = allocator
41+
.allocate(
42+
{.minimum = 0,
43+
.maximum = std::numeric_limits<size_t>::max()})
44+
.get();
3545
ASSERT_TRUE(mem.has_value());
3646
EXPECT_EQ(mem->size, page_size * 2);
3747
}
@@ -41,8 +51,10 @@ TEST(HeapAllocatorTest, CanAllocateOne) {
4151
heap_allocator allocator(heap_allocator::config{
4252
.heap_memory_size = page_size,
4353
.num_heaps = 1,
54+
.memset_chunk_size = default_memset_chunk_size,
4455
});
45-
auto mem = allocator.allocate({.minimum = page_size, .maximum = page_size});
56+
auto mem
57+
= allocator.allocate({.minimum = page_size, .maximum = page_size}).get();
4658
ASSERT_TRUE(mem.has_value());
4759
EXPECT_EQ(mem->size, page_size);
4860
}
@@ -52,14 +64,17 @@ TEST(HeapAllocatorTest, MustAllocateWithinBounds) {
5264
heap_allocator allocator(heap_allocator::config{
5365
.heap_memory_size = page_size,
5466
.num_heaps = 1,
67+
.memset_chunk_size = default_memset_chunk_size,
5568
});
5669
// minimum too large
57-
auto mem = allocator.allocate(
58-
{.minimum = page_size * 2, .maximum = page_size * 3});
70+
auto mem = allocator
71+
.allocate({.minimum = page_size * 2, .maximum = page_size * 3})
72+
.get();
5973
EXPECT_FALSE(mem.has_value());
6074
// maximum too small
61-
mem = allocator.allocate(
62-
{.minimum = page_size / 2, .maximum = page_size - 1});
75+
mem = allocator
76+
.allocate({.minimum = page_size / 2, .maximum = page_size - 1})
77+
.get();
6378
EXPECT_FALSE(mem.has_value());
6479
}
6580

@@ -68,10 +83,13 @@ TEST(HeapAllocatorTest, Exhaustion) {
6883
heap_allocator allocator(heap_allocator::config{
6984
.heap_memory_size = page_size,
7085
.num_heaps = 1,
86+
.memset_chunk_size = default_memset_chunk_size,
7187
});
72-
auto mem = allocator.allocate({.minimum = page_size, .maximum = page_size});
88+
auto mem
89+
= allocator.allocate({.minimum = page_size, .maximum = page_size}).get();
7390
EXPECT_TRUE(mem.has_value());
74-
mem = allocator.allocate({.minimum = page_size, .maximum = page_size});
91+
mem
92+
= allocator.allocate({.minimum = page_size, .maximum = page_size}).get();
7593
EXPECT_FALSE(mem.has_value());
7694
}
7795

@@ -80,25 +98,90 @@ TEST(HeapAllocatorTest, CanReturnMemoryToThePool) {
8098
heap_allocator allocator(heap_allocator::config{
8199
.heap_memory_size = page_size,
82100
.num_heaps = 3,
101+
.memset_chunk_size = default_memset_chunk_size,
83102
});
84103
heap_allocator::request req{.minimum = page_size, .maximum = page_size};
85104
std::vector<heap_memory> allocated;
86105
for (int i = 0; i < 3; ++i) {
87-
auto mem = allocator.allocate(req);
106+
auto mem = allocator.allocate(req).get();
88107
ASSERT_TRUE(mem.has_value());
89108
allocated.push_back(std::move(*mem));
90109
}
91-
auto mem = allocator.allocate(req);
110+
auto mem = allocator.allocate(req).get();
92111
EXPECT_FALSE(mem.has_value());
93112
mem = std::move(allocated.back());
94113
allocated.pop_back();
95114
allocator.deallocate(std::move(*mem), /*used_amount=*/0);
96-
mem = allocator.allocate(req);
115+
mem = allocator.allocate(req).get();
97116
EXPECT_TRUE(mem.has_value());
98-
mem = allocator.allocate(req);
117+
mem = allocator.allocate(req).get();
99118
EXPECT_FALSE(mem.has_value());
100119
}
101120

121+
// We want to test a specific scenario where the deallocation happens
122+
// asynchronously, however, release mode continuations can be "inlined" into the
123+
// current executing task, so we can't enforce the scenario we want to test, so
124+
// this test only runs in debug mode, which forces as many scheduling points as
125+
// possible and the zeroing task always happens asynchronously.
126+
#ifndef NDEBUG
127+
128+
using ::testing::_;
129+
130+
TEST(HeapAllocatorTest, AsyncDeallocationOnlyOneAwakened) {
131+
size_t page_size = ::getpagesize();
132+
// force deallocations to be asynchronous.
133+
size_t test_chunk_size = page_size / 4;
134+
heap_allocator allocator(heap_allocator::config{
135+
.heap_memory_size = page_size,
136+
.num_heaps = 2,
137+
.memset_chunk_size = test_chunk_size,
138+
});
139+
heap_allocator::request req{.minimum = page_size, .maximum = page_size};
140+
// Start on deallocation in the background.
141+
allocator.deallocate(allocator.allocate(req).get().value(), page_size);
142+
// Start another deallocation in the background so there is no memory left.
143+
allocator.deallocate(allocator.allocate(req).get().value(), page_size);
144+
auto waiter1 = allocator.allocate(req);
145+
auto waiter2 = allocator.allocate(req);
146+
// There should not be memory available, so both our requests for
147+
// memory should be waiting.
148+
EXPECT_FALSE(waiter1.available());
149+
EXPECT_FALSE(waiter2.available());
150+
// waiter1 should be notified first there is memory available, but waiter2
151+
// should not be notified yet as there is a deallocation in flight.
152+
// waiter2 may or may not be ready depending on task execution order (which
153+
// is randomized in debug mode), so we cannot assert that it's not
154+
// completed.
155+
EXPECT_THAT(waiter1.get(), Optional(_));
156+
// The second waiter completes and we can allocate memory
157+
EXPECT_THAT(waiter2.get(), Optional(_));
158+
}
159+
160+
TEST(HeapAllocatorTest, AsyncDeallocationNotEnoughMemory) {
161+
size_t page_size = ::getpagesize();
162+
// force deallocations to be asynchronous.
163+
size_t test_chunk_size = page_size / 4;
164+
heap_allocator allocator(heap_allocator::config{
165+
.heap_memory_size = page_size,
166+
.num_heaps = 1,
167+
.memset_chunk_size = test_chunk_size,
168+
});
169+
heap_allocator::request req{.minimum = page_size, .maximum = page_size};
170+
allocator.deallocate(allocator.allocate(req).get().value(), page_size);
171+
172+
// The first request succeeded and is waiting for the deallocation to finish
173+
auto waiter1 = allocator.allocate(req);
174+
EXPECT_FALSE(waiter1.available());
175+
176+
// Future requests fail immediately, no memory is available
177+
auto waiter2 = allocator.allocate(req);
178+
EXPECT_TRUE(waiter2.available());
179+
180+
EXPECT_THAT(waiter1.get(), Optional(_));
181+
EXPECT_EQ(waiter2.get(), std::nullopt);
182+
}
183+
#endif
184+
102185
MATCHER(HeapIsZeroed, "is zeroed") {
103186
std::span<uint8_t> d = {arg.data.get(), arg.size};
104187
for (auto e : d) {
@@ -114,15 +197,16 @@ TEST(HeapAllocatorTest, MemoryIsZeroFilled) {
114197
heap_allocator allocator(heap_allocator::config{
115198
.heap_memory_size = page_size,
116199
.num_heaps = 1,
200+
.memset_chunk_size = default_memset_chunk_size,
117201
});
118202
heap_allocator::request req{.minimum = page_size, .maximum = page_size};
119-
auto allocated = allocator.allocate(req);
203+
auto allocated = allocator.allocate(req).get();
120204
ASSERT_TRUE(allocated.has_value());
121205
EXPECT_THAT(allocated, Optional(HeapIsZeroed()));
122206
std::fill_n(allocated->data.get(), 4, 1);
123207
allocator.deallocate(*std::move(allocated), 4);
124208

125-
allocated = allocator.allocate(req);
209+
allocated = allocator.allocate(req).get();
126210
ASSERT_TRUE(allocated.has_value());
127211
EXPECT_THAT(allocated, Optional(HeapIsZeroed()));
128212
}

0 commit comments

Comments
 (0)