Skip to content

Commit 73354ee

Browse files
committed
Update current metrics correctly
Signed-off-by: JaySon-Huang <[email protected]>
1 parent c0f8ea4 commit 73354ee

File tree

3 files changed

+137
-53
lines changed

3 files changed

+137
-53
lines changed

dbms/src/Common/CurrentMetrics.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@
9191
M(MemoryTrackingSharedColumnData) \
9292
M(DT_NumSegments) \
9393
M(DT_NumMemTable) \
94-
M(DT_BytesMemTable)
94+
M(DT_BytesMemTable) \
95+
M(DT_BytesMemTableAllocated)
9596

9697
namespace CurrentMetrics
9798
{

dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp

Lines changed: 85 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -26,26 +26,72 @@ namespace CurrentMetrics
2626
{
2727
extern const Metric DT_NumMemTable;
2828
extern const Metric DT_BytesMemTable;
29+
extern const Metric DT_BytesMemTableAllocated;
2930
} // namespace CurrentMetrics
3031

3132
namespace DB::DM
3233
{
3334

35+
/// Member functions of MemTableSet::Statistic ///
36+
37+
MemTableSet::Statistic::Statistic()
38+
: holder_bytes(CurrentMetrics::DT_BytesMemTable, 0)
39+
, holder_allocated_bytes(CurrentMetrics::DT_BytesMemTableAllocated, 0)
40+
{}
41+
42+
void MemTableSet::Statistic::append(
43+
size_t rows_added,
44+
size_t bytes_added,
45+
size_t allocated_bytes_added,
46+
size_t deletes_added,
47+
size_t files_added)
48+
{
49+
column_files_count += files_added;
50+
rows += rows_added;
51+
bytes += bytes_added;
52+
allocated_bytes += allocated_bytes_added;
53+
deletes += deletes_added;
54+
// update the current metrics
55+
holder_bytes.changeTo(bytes.load());
56+
holder_allocated_bytes.changeTo(allocated_bytes.load());
57+
}
58+
59+
void MemTableSet::Statistic::resetTo(
60+
size_t new_column_files_count,
61+
size_t new_rows,
62+
size_t new_bytes,
63+
size_t new_allocated_bytes,
64+
size_t new_deletes)
65+
{
66+
column_files_count = new_column_files_count;
67+
rows = new_rows;
68+
bytes = new_bytes;
69+
allocated_bytes = new_allocated_bytes;
70+
deletes = new_deletes;
71+
// update the current metrics
72+
holder_bytes.changeTo(bytes.load());
73+
holder_allocated_bytes.changeTo(allocated_bytes.load());
74+
}
75+
76+
/// Member functions of MemTableSet ///
77+
3478
MemTableSet::MemTableSet(const ColumnFiles & in_memory_files)
3579
: holder_counter(CurrentMetrics::DT_NumMemTable, 1)
36-
, holder_allocated_bytes(CurrentMetrics::DT_BytesMemTable, 0)
3780
, column_files(in_memory_files)
3881
, log(Logger::get())
3982
{
40-
column_files_count = column_files.size();
83+
size_t new_rows = 0;
84+
size_t new_bytes = 0;
85+
size_t new_alloc_bytes = 0;
86+
size_t new_deletes = 0;
4187
for (const auto & file : column_files)
4288
{
43-
rows += file->getRows();
44-
bytes += file->getBytes();
45-
allocated_bytes += file->getAllocateBytes();
46-
deletes += file->getDeletes();
89+
new_rows += file->getRows();
90+
new_bytes += file->getBytes();
91+
new_alloc_bytes += file->getAllocateBytes();
92+
new_deletes += file->getDeletes();
4793
}
48-
holder_allocated_bytes.changeTo(allocated_bytes.load());
94+
stat.resetTo(column_files.size(), new_rows, new_bytes, new_alloc_bytes, new_deletes);
4995
}
5096

5197
void MemTableSet::appendColumnFileInner(const ColumnFilePtr & column_file)
@@ -60,12 +106,12 @@ void MemTableSet::appendColumnFileInner(const ColumnFilePtr & column_file)
60106
}
61107

62108
column_files.push_back(column_file);
63-
column_files_count = column_files.size();
64-
65-
rows += column_file->getRows();
66-
bytes += column_file->getBytes();
67-
allocated_bytes += column_file->getAllocateBytes();
68-
deletes += column_file->getDeletes();
109+
stat.append(
110+
column_file->getRows(),
111+
column_file->getBytes(),
112+
column_file->getAllocateBytes(),
113+
column_file->getDeletes(),
114+
/*files_added=*/1);
69115
}
70116

71117
std::pair</* New */ ColumnFiles, /* Flushed */ ColumnFiles> MemTableSet::diffColumnFiles(
@@ -229,9 +275,13 @@ void MemTableSet::appendToCache(DMContext & context, const Block & block, size_t
229275
if (unlikely(!append_res.success))
230276
throw Exception("Write to MemTableSet failed", ErrorCodes::LOGICAL_ERROR);
231277
}
232-
rows += limit;
233-
bytes += append_bytes;
234-
allocated_bytes += append_res.new_alloc_bytes;
278+
279+
stat.append( //
280+
limit,
281+
append_bytes,
282+
append_res.new_alloc_bytes,
283+
/*deletes_added*/ 0,
284+
/*files_added*/ 0);
235285
}
236286

237287
void MemTableSet::appendDeleteRange(const RowKeyRange & delete_range)
@@ -294,13 +344,18 @@ ColumnFileSetSnapshotPtr MemTableSet::createSnapshot(
294344
// This may indicate that you forget to acquire a lock -- there are modifications
295345
// while this function is still running...
296346
RUNTIME_CHECK(
297-
total_rows == rows && total_deletes == deletes,
347+
total_rows == stat.rows && total_deletes == stat.deletes,
298348
total_rows,
299-
rows.load(),
349+
stat.rows.load(),
300350
total_deletes,
301-
deletes.load());
302-
303-
return std::make_shared<ColumnFileSetSnapshot>(data_provider, std::move(column_files_snap), rows, bytes, deletes);
351+
stat.deletes.load());
352+
353+
return std::make_shared<ColumnFileSetSnapshot>(
354+
data_provider,
355+
std::move(column_files_snap),
356+
stat.rows,
357+
stat.bytes,
358+
stat.deletes);
304359
}
305360

306361
ColumnFileFlushTaskPtr MemTableSet::buildFlushTask(
@@ -333,16 +388,16 @@ ColumnFileFlushTaskPtr MemTableSet::buildFlushTask(
333388
cur_rows_offset += column_file->getRows();
334389
cur_deletes_offset += column_file->getDeletes();
335390
}
336-
if (unlikely(flush_task->getFlushRows() != rows || flush_task->getFlushDeletes() != deletes))
391+
if (unlikely(flush_task->getFlushRows() != stat.rows || flush_task->getFlushDeletes() != stat.deletes))
337392
{
338393
LOG_ERROR(
339394
log,
340395
"Rows and deletes check failed. Actual: rows[{}], deletes[{}]. Expected: rows[{}], deletes[{}]. Column "
341396
"Files: {}",
342397
flush_task->getFlushRows(),
343398
flush_task->getFlushDeletes(),
344-
rows.load(),
345-
deletes.load(),
399+
stat.rows.load(),
400+
stat.deletes.load(),
346401
ColumnFile::filesToString(column_files));
347402
throw Exception("Rows and deletes check failed.", ErrorCodes::LOGICAL_ERROR);
348403
}
@@ -378,11 +433,12 @@ void MemTableSet::removeColumnFilesInFlushTask(const ColumnFileFlushTask & flush
378433
new_deletes += column_file->getDeletes();
379434
}
380435
column_files.swap(new_column_files);
381-
column_files_count = column_files.size();
382-
rows = new_rows;
383-
bytes = new_bytes;
384-
allocated_bytes = new_alloc_bytes;
385-
deletes = new_deletes;
436+
stat.resetTo( //
437+
new_column_files.size(),
438+
new_rows,
439+
new_bytes,
440+
new_alloc_bytes,
441+
new_deletes);
386442
}
387443

388444

dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h

Lines changed: 50 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -32,25 +32,60 @@ class MemTableSet
3232
: public std::enable_shared_from_this<MemTableSet>
3333
, private boost::noncopyable
3434
{
35+
private:
36+
struct Statistic
37+
{
38+
// TODO: check the proper memory_order when use this atomic variable
39+
std::atomic<size_t> column_files_count = 0;
40+
std::atomic<size_t> rows = 0;
41+
std::atomic<size_t> bytes = 0;
42+
std::atomic<size_t> allocated_bytes = 0;
43+
std::atomic<size_t> deletes = 0;
44+
45+
CurrentMetrics::Increment holder_bytes;
46+
CurrentMetrics::Increment holder_allocated_bytes;
47+
48+
Statistic();
49+
50+
String info() const
51+
{
52+
return fmt::format(
53+
"MemTableSet: {} column files, {} rows, {} bytes, {} deletes",
54+
column_files_count.load(),
55+
rows.load(),
56+
bytes.load(),
57+
deletes.load());
58+
}
59+
60+
void append(
61+
size_t rows_added,
62+
size_t bytes_added,
63+
size_t allocated_bytes_added,
64+
size_t deletes_added,
65+
size_t files_added);
66+
67+
void resetTo(
68+
size_t new_column_files_count,
69+
size_t new_rows,
70+
size_t new_bytes,
71+
size_t new_allocated_bytes,
72+
size_t new_deletes);
73+
};
74+
3575
#ifndef DBMS_PUBLIC_GTEST
3676
private:
3777
#else
3878
public:
3979
#endif
4080
// Keep track of the number of mem-table in memory.
4181
CurrentMetrics::Increment holder_counter;
42-
CurrentMetrics::Increment holder_allocated_bytes;
4382

4483
// Note that we must update `column_files_count` for outer thread-safe after `column_files` changed
4584
ColumnFiles column_files;
4685

47-
// In order to avoid data-race, we use atomic variables to track the state of this MemTableSet.
48-
// TODO: check the proper memory_order when use this atomic variable
49-
std::atomic<size_t> column_files_count;
50-
std::atomic<size_t> rows = 0;
51-
std::atomic<size_t> bytes = 0;
52-
std::atomic<size_t> allocated_bytes = 0;
53-
std::atomic<size_t> deletes = 0;
86+
// In order to avoid data-race and make it lightweight for accessing the statistic
87+
// of mem-table, we use atomic variables to track the state of this MemTableSet.
88+
Statistic stat;
5489

5590
LoggerPtr log;
5691

@@ -68,21 +103,13 @@ class MemTableSet
68103
void resetLogger(const LoggerPtr & segment_log) { log = segment_log; }
69104

70105
/// Thread safe part start
71-
String info() const
72-
{
73-
return fmt::format(
74-
"MemTableSet: {} column files, {} rows, {} bytes, {} deletes",
75-
column_files_count.load(),
76-
rows.load(),
77-
bytes.load(),
78-
deletes.load());
79-
}
80-
81-
size_t getColumnFileCount() const { return column_files_count.load(); }
82-
size_t getRows() const { return rows.load(); }
83-
size_t getBytes() const { return bytes.load(); }
84-
size_t getAllocatedBytes() const { return allocated_bytes.load(); }
85-
size_t getDeletes() const { return deletes.load(); }
106+
String info() const { return stat.info(); }
107+
108+
size_t getColumnFileCount() const { return stat.column_files_count.load(); }
109+
size_t getRows() const { return stat.rows.load(); }
110+
size_t getBytes() const { return stat.bytes.load(); }
111+
size_t getAllocatedBytes() const { return stat.allocated_bytes.load(); }
112+
size_t getDeletes() const { return stat.deletes.load(); }
86113
/// Thread safe part end
87114

88115
/**

0 commit comments

Comments
 (0)