Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@
M(DT_SnapshotOfReplayVersionChain) \
M(DT_SnapshotOfBitmapFilter) \
M(DT_SnapshotOfDisaggReadNodeRead) \
M(NumKeyspace) \
M(NumIStorage) \
M(DT_NumStorageDeltaMerge) \
M(DT_NumSegment) \
M(DT_NumMemTable) \
M(DT_BytesMemTable) \
M(DT_BytesMemTableAllocated) \
M(IOLimiterPendingBgWriteReq) \
M(IOLimiterPendingFgWriteReq) \
M(IOLimiterPendingBgReadReq) \
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Interpreters/AsynchronousMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,11 @@ void AsynchronousMetrics::update()
{
GET_METRIC(tiflash_storage_s3_gc_status, type_owner).Set(1.0);
}
else
{
// If the current node is not the owner, we reset the metric to 0
GET_METRIC(tiflash_storage_s3_gc_status, type_owner).Set(0.0);
}
}

#if USE_MIMALLOC
Expand Down
22 changes: 8 additions & 14 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,6 @@ class ColumnFile
INMEMORY_FILE = 4,
};

struct Cache
{
explicit Cache(const Block & header)
: block(header.cloneWithColumns(header.cloneEmptyColumns()))
{}
explicit Cache(Block && block)
: block(std::move(block))
{}

std::mutex mutex;
Block block;
};
using CachePtr = std::shared_ptr<Cache>;
using ColIdToOffset = std::unordered_map<ColId, size_t>;

public:
Expand All @@ -94,6 +81,7 @@ class ColumnFile

virtual size_t getRows() const { return 0; }
virtual size_t getBytes() const { return 0; }
virtual size_t getAllocateBytes() const { return 0; }
virtual size_t getDeletes() const { return 0; }

virtual Type getType() const = 0;
Expand Down Expand Up @@ -137,7 +125,13 @@ class ColumnFile
/// been persisted in the disk and their data will be immutable.
virtual bool isAppendable() const { return false; }
virtual void disableAppend() {}
virtual bool append(

struct AppendResult
{
bool success = false; // whether the append is successful
size_t new_alloc_bytes = 0; // the new allocated bytes after append
};
virtual AppendResult append(
const DMContext & /*dm_context*/,
const Block & /*data*/,
size_t /*offset*/,
Expand Down
20 changes: 15 additions & 5 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ void ColumnFileInMemory::fillColumns(const ColumnDefines & col_defs, size_t col_
// Copy data from cache
const auto & type = getDataType(cd.id);
auto col_data = type->createColumn();
col_data->reserve(rows);
col_data->insertRangeFrom(*(cache->block.getByPosition(col_offset).column), 0, rows);
// Cast if need
auto col_converted = convertColumnByColumnDefineIfNeed(type, std::move(col_data), cd);
Expand All @@ -64,36 +65,45 @@ ColumnFileReaderPtr ColumnFileInMemory::getReader(
return std::make_shared<ColumnFileInMemoryReader>(*this, col_defs);
}

bool ColumnFileInMemory::append(
void ColumnFileInMemory::disableAppend()
{
disable_append = true;
// TODO: Call shrinkToFit() to release the extra memory of the cache block.
}

ColumnFile::AppendResult ColumnFileInMemory::append(
const DMContext & context,
const Block & data,
size_t offset,
size_t limit,
size_t data_bytes)
{
if (disable_append)
return false;
return AppendResult{false, 0};

std::scoped_lock lock(cache->mutex);
if (!isSameSchema(cache->block, data))
return false;
return AppendResult{false, 0};

// check whether this instance overflows
if (cache->block.rows() >= context.delta_cache_limit_rows
|| cache->block.bytes() >= context.delta_cache_limit_bytes)
return false;
return AppendResult{false, 0};

size_t new_alloc_block_bytes = 0;
for (size_t i = 0; i < cache->block.columns(); ++i)
{
const auto & col = data.getByPosition(i).column;
const auto & cache_col = *cache->block.getByPosition(i).column;
auto * mutable_cache_col = const_cast<IColumn *>(&cache_col);
size_t alloc_bytes = mutable_cache_col->allocatedBytes();
mutable_cache_col->insertRangeFrom(*col, offset, limit);
new_alloc_block_bytes += mutable_cache_col->allocatedBytes() - alloc_bytes;
}

rows += limit;
bytes += data_bytes;
return true;
return AppendResult{true, new_alloc_block_bytes};
}

Block ColumnFileInMemory::readDataForFlush() const
Expand Down
25 changes: 22 additions & 3 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,20 @@ class ColumnFileInMemory : public ColumnFile
friend class ColumnFileInMemoryReader;
friend struct Remote::Serializer;

struct Cache
{
explicit Cache(const Block & header)
: block(header.cloneWithColumns(header.cloneEmptyColumns()))
{}
explicit Cache(Block && block)
: block(std::move(block))
{}

std::mutex mutex;
Block block;
};
using CachePtr = std::shared_ptr<Cache>;

private:
ColumnFileSchemaPtr schema;

Expand Down Expand Up @@ -67,6 +81,7 @@ class ColumnFileInMemory : public ColumnFile

size_t getRows() const override { return rows; }
size_t getBytes() const override { return bytes; }
size_t getAllocateBytes() const override { return cache->block.allocatedBytes(); }

CachePtr getCache() { return cache; }

Expand All @@ -82,9 +97,13 @@ class ColumnFileInMemory : public ColumnFile
ReadTag) const override;

bool isAppendable() const override { return !disable_append; }
void disableAppend() override { disable_append = true; }
bool append(const DMContext & dm_context, const Block & data, size_t offset, size_t limit, size_t data_bytes)
override;
void disableAppend() override;
AppendResult append(
const DMContext & dm_context,
const Block & data,
size_t offset,
size_t limit,
size_t data_bytes) override;

Block readDataForFlush() const;

Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,15 @@ size_t DeltaValueSpace::getTotalCacheBytes() const
return mem_table_set->getBytes();
}

size_t DeltaValueSpace::getTotalAllocatedBytes() const
{
std::scoped_lock lock(mutex);
return mem_table_set->getAllocatedBytes();
}

size_t DeltaValueSpace::getValidCacheRows() const
{
// FIXME: Seems that this function is the same as getTotalCacheRows().
std::scoped_lock lock(mutex);
return mem_table_set->getRows();
}
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class DeltaValueSpace

/// Note that it's safe to do multiple flush concurrently but only one of them can succeed,
/// and other thread's work is just a waste of resource.
/// So we only allow one flush task running at any time to aviod waste resource.
/// So we only allow one flush task running at any time to avoid waste resource.
std::atomic_bool is_flushing = false;

std::atomic<size_t> last_try_flush_rows = 0;
Expand Down Expand Up @@ -220,6 +220,7 @@ class DeltaValueSpace

size_t getTotalCacheRows() const;
size_t getTotalCacheBytes() const;
size_t getTotalAllocatedBytes() const;
size_t getValidCacheRows() const;

bool isFlushing() const { return is_flushing; }
Expand Down
Loading