From 00b5bdd1cfbec65247066d131ca62d56f9cf614f Mon Sep 17 00:00:00 2001 From: summer <1364868892@qq.com> Date: Wed, 10 Dec 2025 17:28:53 +0800 Subject: [PATCH 1/2] fix pq flush, occasional date loss in LSM --- src/core/blob_store/blob_cleaner.cpp | 10 +- src/core/kv_table/pq_table.h | 102 +++++++++++--------- src/core/lsm_store/file/file_store_impl.cpp | 4 +- 3 files changed, 65 insertions(+), 51 deletions(-) diff --git a/src/core/blob_store/blob_cleaner.cpp b/src/core/blob_store/blob_cleaner.cpp index 70a232a..e98fac4 100644 --- a/src/core/blob_store/blob_cleaner.cpp +++ b/src/core/blob_store/blob_cleaner.cpp @@ -33,8 +33,10 @@ BResult BlobCleaner::Restore(const FileInputViewRef &fileInputView, void BlobCleaner::TriggerSnapshot(uint64_t snapshotId, uint64_t blobStoreVersion, uint64_t seqId, BlobStoreSnapshotOperatorRef &blobStoreSnapshotOperator) { - // 快照互斥,如果当前有墓碑文件正在compaction,需要等待compaction完成并关闭,待快照完成重新开启,ReleaseTombstoneSnapshot入口 - StopTombstoneCompaction(); + if (mEnableTombstone) { + // 快照互斥,如果当前有墓碑文件正在compaction,需要等待compaction完成并关闭,待快照完成重新开启,ReleaseTombstoneSnapshot入口 + StopTombstoneCompaction(); + } mTombstoneFileManager->TriggerSnapshot(snapshotId, blobStoreVersion, seqId, blobStoreSnapshotOperator); } @@ -308,7 +310,9 @@ TombstoneServiceRef BlobCleaner::RegisterTombstoneService(const std::string &nam void BlobCleaner::ReleaseTombstoneSnapshot(uint64_t snapshotId) { mTombstoneFileManager->ReleaseSnapshot(snapshotId); - StartScheduleCompaction(); + if (mEnableTombstone) { + StartScheduleCompaction(); + } } } diff --git a/src/core/kv_table/pq_table.h b/src/core/kv_table/pq_table.h index 3981b24..80761f6 100644 --- a/src/core/kv_table/pq_table.h +++ b/src/core/kv_table/pq_table.h @@ -22,11 +22,43 @@ namespace ock { namespace bss { -class SkiplistProcessor; +using SkipListCompletedNotify = std::function; +class SkiplistProcessor : public Runnable { +public: + SkiplistProcessor(const PQTableIteratorRef iterator, const LsmStoreRef &lsmStore, const PQSkipList &skipList, + SkipListCompletedNotify notify) : mIterator(iterator), mLsmStore(lsmStore), + mSkipList(skipList), mNotify(notify) {} + + ~SkiplistProcessor() override = default; + + void Run() override + { + bool expect = false; + BResult ret = BSS_ERR; + do { + if (!mResourceCleanupOwnershipTaken.compare_exchange_strong(expect, true)) { + break; + } + if (UNLIKELY(mLsmStore == nullptr)) { + break; + } + // write kv pair to lsmStore. + ret = mLsmStore->Put(mIterator); + } while (false); + if (ret == BSS_OK) { + mNotify(mSkipList); + } + } + +private: + PQTableIteratorRef mIterator; + LsmStoreRef mLsmStore = nullptr; + PQSkipList mSkipList; + SkipListCompletedNotify mNotify; + std::atomic mResourceCleanupOwnershipTaken{ false }; +}; using SkiplistProcessorRef = std::shared_ptr; -using SkipListCompletedNotify = - std::function; class PQTable : public AutoCloseable { public: PQTable(const MemManagerRef &memManager, const ExecutorServicePtr &service, const LsmStoreRef &lsmStore, @@ -105,16 +137,23 @@ public: [this](const PQSkipList &item) { PollFlushingSegment(item); }); - if (force) { - std::static_pointer_cast(processor)->Run(); - } else { - auto ret = mService->Execute(std::static_pointer_cast(processor)); - if (UNLIKELY(!ret)) { - LOG_ERROR("Submit task failed" << mService->QueueSize()); - return BSS_ERR; + auto ret = mService->Execute(std::static_pointer_cast(processor)); + if (UNLIKELY(!ret)) { + LOG_ERROR("Submit task failed" << mService->QueueSize()); + return BSS_ERR; + } + // Checkpoint流程首先确保处于待淘汰队列的跳表Flush完成. + uint32_t times = NO_1; + auto start = std::chrono::high_resolution_clock::now(); + while (force && !IsSnapshotQueueEmpty()) { + if ((times++) % NO_100 == 0) { + auto end = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(end - start); + double elapsed = duration.count() / 1e3; // 转换为ms + LOG_WARN("PQ table check snapshot queue cost time:" << elapsed << "ms."); } + usleep(NO_100000); // 100ms } - mSkipList = InitNewSkipList(); return BSS_OK; } @@ -143,6 +182,12 @@ private: mSnapshotQueue.PushBack(mSkipList); } + inline bool IsSnapshotQueueEmpty() + { + ReadLocker lock(&mRwLock); + return mSnapshotQueue.Empty(); + } + inline bool PollFlushingSegment(const PQSkipList &item) { WriteLocker lock(&mRwLock); @@ -170,41 +215,6 @@ private: uint16_t mStateId = 0; }; using PQTableRef = std::shared_ptr; - -class SkiplistProcessor : public Runnable { -public: - SkiplistProcessor(const PQTableIteratorRef iterator, const LsmStoreRef &lsmStore, const PQSkipList &skipList, - SkipListCompletedNotify notify) : mIterator(iterator), mLsmStore(lsmStore), - mSkipList(skipList), mNotify(notify) {} - - ~SkiplistProcessor() override = default; - - void Run() override - { - bool expect = false; - BResult ret = BSS_ERR; - do { - if (!mResourceCleanupOwnershipTaken.compare_exchange_strong(expect, true)) { - break; - } - if (UNLIKELY(mLsmStore == nullptr)) { - break; - } - // write kv pair to lsmStore. - ret = mLsmStore->Put(mIterator); - } while (false); - if (ret == BSS_OK) { - mNotify(mSkipList); - } - } - -private: - PQTableIteratorRef mIterator; - LsmStoreRef mLsmStore = nullptr; - PQSkipList mSkipList; - SkipListCompletedNotify mNotify; - std::atomic mResourceCleanupOwnershipTaken{ false }; -}; } } #endif // BOOST_SS_PQ_TABLE_H \ No newline at end of file diff --git a/src/core/lsm_store/file/file_store_impl.cpp b/src/core/lsm_store/file/file_store_impl.cpp index 3f93d27..b9062e9 100644 --- a/src/core/lsm_store/file/file_store_impl.cpp +++ b/src/core/lsm_store/file/file_store_impl.cpp @@ -166,12 +166,12 @@ void LsmStore::FinalizeVersionEdit(const CompactionProcessorRef &processor, std: editBuilder->DeleteFile(lvlInput->GetGroupRange(), processor->mCompaction->GetInputLevelId(), lvlInput->GetIdentifier()); } - processor->mOutputSize.clear(); + for (auto &outputLvlInput : processor->mCompaction->GetOutputLevelInputs()) { editBuilder->DeleteFile(outputLvlInput->GetGroupRange(), processor->mCompaction->GetOutputLevelId(), outputLvlInput->GetIdentifier()); } - + processor->mOutputSize.clear(); for (auto &output : processor->mOutputs) { if (!output->GetGroupRange()->Equals(mGroupRange)) { LOG_ERROR("New File should be current group range."); -- Gitee From 286c8e99d8fcf9e08dcba755b8512475fefea8db Mon Sep 17 00:00:00 2001 From: summer <1364868892@qq.com> Date: Fri, 12 Dec 2025 09:29:07 +0800 Subject: [PATCH 2/2] fix resources usage --- .../com/huawei/ock/bss/queue/OckDBPriorityQueueSetFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/plugin/state_store_plugin/src/com/huawei/ock/bss/queue/OckDBPriorityQueueSetFactory.java b/src/plugin/state_store_plugin/src/com/huawei/ock/bss/queue/OckDBPriorityQueueSetFactory.java index 508b708..465a23d 100644 --- a/src/plugin/state_store_plugin/src/com/huawei/ock/bss/queue/OckDBPriorityQueueSetFactory.java +++ b/src/plugin/state_store_plugin/src/com/huawei/ock/bss/queue/OckDBPriorityQueueSetFactory.java @@ -46,7 +46,7 @@ import javax.annotation.Nonnull; */ public class OckDBPriorityQueueSetFactory implements PriorityQueueSetFactory { @VisibleForTesting - static final int DEFAULT_CACHES_SIZE = 2040; + static final int DEFAULT_CACHES_SIZE = 20480; @Nonnull private final DataOutputSerializer sharedElementOutView; -- Gitee