Skip to content

Commit 399cf61

Browse files
JaySon-Huangti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#10275
Signed-off-by: ti-chi-bot <[email protected]>
1 parent a26813d commit 399cf61

37 files changed

+5559
-1969
lines changed

dbms/src/Common/CurrentMetrics.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,13 @@
6464
M(DT_SnapshotOfPlaceIndex) \
6565
M(DT_SnapshotOfBitmapFilter) \
6666
M(DT_SnapshotOfDisaggReadNodeRead) \
67+
M(NumKeyspace) \
68+
M(NumIStorage) \
69+
M(DT_NumStorageDeltaMerge) \
70+
M(DT_NumSegment) \
71+
M(DT_NumMemTable) \
72+
M(DT_BytesMemTable) \
73+
M(DT_BytesMemTableAllocated) \
6774
M(IOLimiterPendingBgWriteReq) \
6875
M(IOLimiterPendingFgWriteReq) \
6976
M(IOLimiterPendingBgReadReq) \

dbms/src/Interpreters/AsynchronousMetrics.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,11 @@ void AsynchronousMetrics::update()
312312
{
313313
GET_METRIC(tiflash_storage_s3_gc_status, type_owner).Set(1.0);
314314
}
315+
else
316+
{
317+
// If the current node is not the owner, we reset the metric to 0
318+
GET_METRIC(tiflash_storage_s3_gc_status, type_owner).Set(0.0);
319+
}
315320
}
316321

317322
#if USE_MIMALLOC

dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -74,19 +74,6 @@ class ColumnFile
7474
INMEMORY_FILE = 4,
7575
};
7676

77-
struct Cache
78-
{
79-
explicit Cache(const Block & header)
80-
: block(header.cloneWithColumns(header.cloneEmptyColumns()))
81-
{}
82-
explicit Cache(Block && block)
83-
: block(std::move(block))
84-
{}
85-
86-
std::mutex mutex;
87-
Block block;
88-
};
89-
using CachePtr = std::shared_ptr<Cache>;
9077
using ColIdToOffset = std::unordered_map<ColId, size_t>;
9178

9279
public:
@@ -95,6 +82,7 @@ class ColumnFile
9582

9683
virtual size_t getRows() const { return 0; }
9784
virtual size_t getBytes() const { return 0; }
85+
virtual size_t getAllocateBytes() const { return 0; }
9886
virtual size_t getDeletes() const { return 0; }
9987

10088
virtual Type getType() const = 0;
@@ -138,7 +126,13 @@ class ColumnFile
138126
/// been persisted in the disk and their data will be immutable.
139127
virtual bool isAppendable() const { return false; }
140128
virtual void disableAppend() {}
141-
virtual bool append(
129+
130+
struct AppendResult
131+
{
132+
bool success = false; // whether the append is successful
133+
size_t new_alloc_bytes = 0; // the new allocated bytes after append
134+
};
135+
virtual AppendResult append(
142136
const DMContext & /*dm_context*/,
143137
const Block & /*data*/,
144138
size_t /*offset*/,

dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ void ColumnFileInMemory::fillColumns(const ColumnDefines & col_defs, size_t col_
4242
// Copy data from cache
4343
const auto & type = getDataType(cd.id);
4444
auto col_data = type->createColumn();
45+
col_data->reserve(rows);
4546
col_data->insertRangeFrom(*(cache->block.getByPosition(col_offset).column), 0, rows);
4647
// Cast if need
4748
auto col_converted = convertColumnByColumnDefineIfNeed(type, std::move(col_data), cd);
@@ -65,36 +66,45 @@ ColumnFileReaderPtr ColumnFileInMemory::getReader(
6566
return std::make_shared<ColumnFileInMemoryReader>(*this, col_defs);
6667
}
6768

68-
bool ColumnFileInMemory::append(
69+
void ColumnFileInMemory::disableAppend()
70+
{
71+
disable_append = true;
72+
// TODO: Call shrinkToFit() to release the extra memory of the cache block.
73+
}
74+
75+
ColumnFile::AppendResult ColumnFileInMemory::append(
6976
const DMContext & context,
7077
const Block & data,
7178
size_t offset,
7279
size_t limit,
7380
size_t data_bytes)
7481
{
7582
if (disable_append)
76-
return false;
83+
return AppendResult{false, 0};
7784

7885
std::scoped_lock lock(cache->mutex);
7986
if (!isSameSchema(cache->block, data))
80-
return false;
87+
return AppendResult{false, 0};
8188

8289
// check whether this instance overflows
8390
if (cache->block.rows() >= context.delta_cache_limit_rows
8491
|| cache->block.bytes() >= context.delta_cache_limit_bytes)
85-
return false;
92+
return AppendResult{false, 0};
8693

94+
size_t new_alloc_block_bytes = 0;
8795
for (size_t i = 0; i < cache->block.columns(); ++i)
8896
{
8997
const auto & col = data.getByPosition(i).column;
9098
const auto & cache_col = *cache->block.getByPosition(i).column;
9199
auto * mutable_cache_col = const_cast<IColumn *>(&cache_col);
100+
size_t alloc_bytes = mutable_cache_col->allocatedBytes();
92101
mutable_cache_col->insertRangeFrom(*col, offset, limit);
102+
new_alloc_block_bytes += mutable_cache_col->allocatedBytes() - alloc_bytes;
93103
}
94104

95105
rows += limit;
96106
bytes += data_bytes;
97-
return true;
107+
return AppendResult{true, new_alloc_block_bytes};
98108
}
99109

100110
Block ColumnFileInMemory::readDataForFlush() const

dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,20 @@ class ColumnFileInMemory : public ColumnFile
3131
friend class ColumnFileInMemoryReader;
3232
friend struct Remote::Serializer;
3333

34+
struct Cache
35+
{
36+
explicit Cache(const Block & header)
37+
: block(header.cloneWithColumns(header.cloneEmptyColumns()))
38+
{}
39+
explicit Cache(Block && block)
40+
: block(std::move(block))
41+
{}
42+
43+
std::mutex mutex;
44+
Block block;
45+
};
46+
using CachePtr = std::shared_ptr<Cache>;
47+
3448
private:
3549
ColumnFileSchemaPtr schema;
3650

@@ -66,6 +80,7 @@ class ColumnFileInMemory : public ColumnFile
6680

6781
size_t getRows() const override { return rows; }
6882
size_t getBytes() const override { return bytes; }
83+
size_t getAllocateBytes() const override { return cache->block.allocatedBytes(); }
6984

7085
CachePtr getCache() { return cache; }
7186

@@ -81,9 +96,13 @@ class ColumnFileInMemory : public ColumnFile
8196
ReadTag) const override;
8297

8398
bool isAppendable() const override { return !disable_append; }
84-
void disableAppend() override { disable_append = true; }
85-
bool append(const DMContext & dm_context, const Block & data, size_t offset, size_t limit, size_t data_bytes)
86-
override;
99+
void disableAppend() override;
100+
AppendResult append(
101+
const DMContext & dm_context,
102+
const Block & data,
103+
size_t offset,
104+
size_t limit,
105+
size_t data_bytes) override;
87106

88107
Block readDataForFlush() const;
89108

dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,8 +275,15 @@ size_t DeltaValueSpace::getTotalCacheBytes() const
275275
return mem_table_set->getBytes() + persisted_file_set->getTotalCacheBytes();
276276
}
277277

278+
size_t DeltaValueSpace::getTotalAllocatedBytes() const
279+
{
280+
std::scoped_lock lock(mutex);
281+
return mem_table_set->getAllocatedBytes();
282+
}
283+
278284
size_t DeltaValueSpace::getValidCacheRows() const
279285
{
286+
// FIXME: Seems that this function is the same as getTotalCacheRows().
280287
std::scoped_lock lock(mutex);
281288
return mem_table_set->getRows() + persisted_file_set->getValidCacheRows();
282289
}

dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class DeltaValueSpace
8484

8585
/// Note that it's safe to do multiple flush concurrently but only one of them can succeed,
8686
/// and other thread's work is just a waste of resource.
87-
/// So we only allow one flush task running at any time to aviod waste resource.
87+
/// So we only allow one flush task running at any time to avoid waste resource.
8888
std::atomic_bool is_flushing = false;
8989

9090
std::atomic<size_t> last_try_flush_rows = 0;
@@ -219,6 +219,7 @@ class DeltaValueSpace
219219

220220
size_t getTotalCacheRows() const;
221221
size_t getTotalCacheBytes() const;
222+
size_t getTotalAllocatedBytes() const;
222223
size_t getValidCacheRows() const;
223224

224225
bool isFlushing() const { return is_flushing; }

0 commit comments

Comments
 (0)