diff --git a/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h b/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h index 8ecc02c01fe08029bac06846ce825a044d418f3a..f8c42d85755f5cd7af85db53e678e9a174e4a1a2 100644 --- a/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h +++ b/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h @@ -119,7 +119,7 @@ private: const ParseResult &inResult, const DataUserInfoProc &userInfoProc); // Function with suffix NoMutex should be called with mutex in the caller - int TryDeliverAppLayerFrameToCommunicatorNoMutex(const DataUserInfoProc &userInfoProc, + int TryDeliverAppLayerFrameToCommunicatorNoMutex( const std::string &srcTarget, SerialBuffer *&inFrameBuffer, const LabelType &toLabel, const UserInfo &userInfo); // Auxiliary function for cutting short primary function @@ -168,8 +168,7 @@ private: const std::string &device, UserInfo &userInfo); int ReTryDeliverAppLayerFrameOnCommunicatorNotFound(const ReceiveBytesInfo &receiveBytesInfo, - SerialBuffer *&inFrameBuffer, const ParseResult &inResult, const DataUserInfoProc &userInfoProc, - const UserInfo &userInfo); + SerialBuffer *&inFrameBuffer, const ParseResult &inResult, const UserInfo &userInfo); DECLARE_OBJECT_TAG(CommunicatorAggregator); diff --git a/frameworks/libs/distributeddb/communicator/include/iadapter.h b/frameworks/libs/distributeddb/communicator/include/iadapter.h index af6573006772a7014e2c9a2eb56ca6b7d3ff10b9..1862aea3e7c4e655707fe71489d025d6019c4924 100644 --- a/frameworks/libs/distributeddb/communicator/include/iadapter.h +++ b/frameworks/libs/distributeddb/communicator/include/iadapter.h @@ -87,6 +87,11 @@ public: virtual std::shared_ptr GetExtendHeaderHandle(const ExtendInfo ¶mInfo) = 0; + virtual DBStatus PreprocessMessage(const AccessInfos &accessInfos) + { + return OK; + } + virtual ~IAdapter() {}; }; } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/communicator/include/network_adapter.h b/frameworks/libs/distributeddb/communicator/include/network_adapter.h index fc1c390ab95b69d5b0527f3b407f11cd420a302b..03ebe8ae592359c3e3a8bbe85112113a03b07f9c 100644 --- a/frameworks/libs/distributeddb/communicator/include/network_adapter.h +++ b/frameworks/libs/distributeddb/communicator/include/network_adapter.h @@ -55,6 +55,7 @@ public: bool IsDeviceOnline(const std::string &device) override; std::shared_ptr GetExtendHeaderHandle(const ExtendInfo ¶mInfo) override; + DBStatus PreprocessMessage(const AccessInfos &accessInfos) override; private: void OnDataReceiveHandler(const DeviceInfos &srcDevInfo, const uint8_t *data, uint32_t length); diff --git a/frameworks/libs/distributeddb/communicator/src/communicator.cpp b/frameworks/libs/distributeddb/communicator/src/communicator.cpp index e9f263706907c5965322c5607373f9fd23154519..84129a9c59099776e165ccfb4e16e9df8dfc1bcf 100644 --- a/frameworks/libs/distributeddb/communicator/src/communicator.cpp +++ b/frameworks/libs/distributeddb/communicator/src/communicator.cpp @@ -152,9 +152,18 @@ int Communicator::SendMessage(const std::string &dstTarget, const Message *inMsg return errCode; } -void Communicator::OnBufferReceive(const std::string &srcTarget, const SerialBuffer *inBuf, - const std::string &sendUser) +int Communicator::OnBufferReceive(const std::string &srcTarget, const SerialBuffer *inBuf, + const std::string &sendUser, IAdapter *iAdapter) { + if (iAdapter != nullptr) { + AccessInfos accessInfos; + OnGetAccessInfos(accessInfos); + DBStatus ret = iAdapter->PreprocessMessage(accessInfos); + if (ret != OK) { + LOGW("[Comm][PreprocessMessage] ret:%d", ret); + return -E_FEEDBACK_DB_CLOSING; + } + } std::lock_guard messageHandleLockGuard(messageHandleMutex_); if (!srcTarget.empty() && inBuf != nullptr && onMessageHandle_) { int error = E_OK; @@ -173,7 +182,7 @@ void Communicator::OnBufferReceive(const std::string &srcTarget, const SerialBuf } else if (error == -E_NOT_REGISTER) { TriggerUnknownMessageFeedback(srcTarget, message); } - return; + return E_OK; } LOGI("[Comm][Receive] label=%.3s, srcTarget=%s{private}.", VEC_TO_STR(commLabel_), srcTarget.c_str()); onMessageHandle_(srcTarget, message); @@ -185,6 +194,7 @@ void Communicator::OnBufferReceive(const std::string &srcTarget, const SerialBuf inBuf = nullptr; } } + return E_OK; } void Communicator::OnConnectChange(const std::string &target, bool isConnect) diff --git a/frameworks/libs/distributeddb/communicator/src/communicator.h b/frameworks/libs/distributeddb/communicator/src/communicator.h index 23577dbf9a5c1f6a9dc46d2e045d8dd022cde740..27210f4348d59e7ea1ba69c33aaf017f52267b8b 100644 --- a/frameworks/libs/distributeddb/communicator/src/communicator.h +++ b/frameworks/libs/distributeddb/communicator/src/communicator.h @@ -57,7 +57,8 @@ public: const OnSendEnd &onEnd) override; // Call by CommunicatorAggregator directly - void OnBufferReceive(const std::string &srcTarget, const SerialBuffer *inBuf, const std::string &sendUser); + int OnBufferReceive(const std::string &srcTarget, const SerialBuffer *inBuf, const std::string &sendUser, + IAdapter *iAdapter); // Call by CommunicatorAggregator directly void OnConnectChange(const std::string &target, bool isConnect); diff --git a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp index 951bbece72d4cf15161b2b7ce7366499d4d466f2..73f6f81bd11dd5c5076991f0cdac26ea2f6e3aa8 100644 --- a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp +++ b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp @@ -276,7 +276,8 @@ void CommunicatorAggregator::ActivateCommunicator(const LabelType &commLabel, co // Do Redeliver, the communicator is responsible to deal with the frame std::list framesToRedeliver = retainer_.FetchFramesForSpecificCommunicator(commLabel); for (auto &entry : framesToRedeliver) { - commMap_[userId].at(commLabel).first->OnBufferReceive(entry.srcTarget, entry.buffer, entry.sendUser); + commMap_[userId].at(commLabel).first->OnBufferReceive(entry.srcTarget, entry.buffer, entry.sendUser, + nullptr); } } @@ -691,7 +692,7 @@ int CommunicatorAggregator::OnAppLayerFrameReceive(const ReceiveBytesInfo &recei } { std::lock_guard commMapLockGuard(commMapMutex_); - int errCode = TryDeliverAppLayerFrameToCommunicatorNoMutex(userInfoProc, receiveBytesInfo.srcTarget, + int errCode = TryDeliverAppLayerFrameToCommunicatorNoMutex(receiveBytesInfo.srcTarget, inFrameBuffer, toLabel, userInfo); if (errCode == E_OK) { // Attention: Here is equal to E_OK return E_OK; @@ -704,8 +705,7 @@ int CommunicatorAggregator::OnAppLayerFrameReceive(const ReceiveBytesInfo &recei } } LOGI("[CommAggr][AppReceive] Communicator of %.3s not found or nonactivated.", VEC_TO_STR(toLabel)); - return ReTryDeliverAppLayerFrameOnCommunicatorNotFound(receiveBytesInfo, inFrameBuffer, inResult, userInfoProc, - userInfo); + return ReTryDeliverAppLayerFrameOnCommunicatorNotFound(receiveBytesInfo, inFrameBuffer, inResult, userInfo); } int CommunicatorAggregator::GetDataUserId(const ParseResult &inResult, const LabelType &toLabel, @@ -732,17 +732,20 @@ int CommunicatorAggregator::GetDataUserId(const ParseResult &inResult, const Lab return E_OK; } -int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const DataUserInfoProc &userInfoProc, +int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex( const std::string &srcTarget, SerialBuffer *&inFrameBuffer, const LabelType &toLabel, const UserInfo &userInfo) { // Ignore nonactivated communicator, which is regarded as inexistent const std::string &sendUser = userInfo.sendUser; const std::string &receiveUser = userInfo.receiveUser; if (commMap_[receiveUser].count(toLabel) != 0 && commMap_[receiveUser].at(toLabel).second) { - commMap_[receiveUser].at(toLabel).first->OnBufferReceive(srcTarget, inFrameBuffer, sendUser); + int ret = commMap_[receiveUser].at(toLabel).first->OnBufferReceive(srcTarget, inFrameBuffer, sendUser, + adapterHandle_); // Frame handed over to communicator who is responsible to delete it. The frame is deleted here after return. - inFrameBuffer = nullptr; - return E_OK; + if (ret == E_OK) { + inFrameBuffer = nullptr; + } + return ret; } Communicator *communicator = nullptr; bool isEmpty = false; @@ -760,19 +763,12 @@ int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const D break; } } - if (communicator != nullptr && userInfoProc.processCommunicator != nullptr) { - AccessInfos accessInfos; - communicator->OnGetAccessInfos(accessInfos); - DBStatus ret = userInfoProc.processCommunicator->PreprocessMessage(accessInfos); - if (ret != OK) { - LOGW("[CommAggr][TryDeliver] extend info notify ret:%d", ret); - return -E_FEEDBACK_DB_CLOSING; - } - } if (communicator != nullptr && (receiveUser.empty() || isEmpty)) { - communicator->OnBufferReceive(srcTarget, inFrameBuffer, sendUser); - inFrameBuffer = nullptr; - return E_OK; + int ret = communicator->OnBufferReceive(srcTarget, inFrameBuffer, sendUser, adapterHandle_); + if (ret == E_OK) { + inFrameBuffer = nullptr; + } + return ret; } LOGE("[CommAggr][TryDeliver] Communicator not found"); return -E_NOT_FOUND; @@ -1170,8 +1166,7 @@ void CommunicatorAggregator::ClearOnlineLabel() } int CommunicatorAggregator::ReTryDeliverAppLayerFrameOnCommunicatorNotFound(const ReceiveBytesInfo &receiveBytesInfo, - SerialBuffer *&inFrameBuffer, const ParseResult &inResult, const DataUserInfoProc &userInfoProc, - const UserInfo &userInfo) + SerialBuffer *&inFrameBuffer, const ParseResult &inResult, const UserInfo &userInfo) { LabelType toLabel = inResult.GetCommLabel(); int errCode = -E_NOT_FOUND; @@ -1186,7 +1181,7 @@ int CommunicatorAggregator::ReTryDeliverAppLayerFrameOnCommunicatorNotFound(cons } // Here we have to lock commMapMutex_ and search communicator again. std::lock_guard commMapLockGuard(commMapMutex_); - int errCodeAgain = TryDeliverAppLayerFrameToCommunicatorNoMutex(userInfoProc, receiveBytesInfo.srcTarget, + int errCodeAgain = TryDeliverAppLayerFrameToCommunicatorNoMutex(receiveBytesInfo.srcTarget, inFrameBuffer, toLabel, userInfo); if (errCodeAgain == E_OK) { // Attention: Here is equal to E_OK. LOGI("[CommAggr][AppReceive] Communicator of %.3s found after try again(rare case).", VEC_TO_STR(toLabel)); diff --git a/frameworks/libs/distributeddb/communicator/src/network_adapter.cpp b/frameworks/libs/distributeddb/communicator/src/network_adapter.cpp index e594819246d31aa2d2d1307d9598def1fbb43aa1..2839cfaa0cb1c6e1365400e89ee28d5589ae9b7b 100644 --- a/frameworks/libs/distributeddb/communicator/src/network_adapter.cpp +++ b/frameworks/libs/distributeddb/communicator/src/network_adapter.cpp @@ -444,4 +444,9 @@ std::shared_ptr NetworkAdapter::GetExtendHeaderHandle(const { return processCommunicator_->GetExtendHeaderHandle(paramInfo); } + +DBStatus NetworkAdapter::PreprocessMessage(const AccessInfos &accessInfos) +{ + return processCommunicator_->PreprocessMessage(accessInfos); +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/test/unittest/common/communicator/adapter_stub.cpp b/frameworks/libs/distributeddb/test/unittest/common/communicator/adapter_stub.cpp index 5c477deafdb2ac32ebf13373d7c7e0bbb2976182..206d8eb62687655d344258fcc9bf00f28a3be3b5 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/communicator/adapter_stub.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/communicator/adapter_stub.cpp @@ -471,4 +471,17 @@ void AdapterStub::SetUserInfo(const std::vector &userInfo) std::vector AdapterStub::GetUserInfo() { return userInfo_; +} + +void AdapterStub::RegForkPreprocessMessage(const std::function &func) +{ + forkPreprocessMessage_ = func; +} + +DBStatus AdapterStub::PreprocessMessage(const AccessInfos &accessInfos) +{ + if (forkPreprocessMessage_ != nullptr) { + return forkPreprocessMessage_(accessInfos); + } + return OK; } \ No newline at end of file diff --git a/frameworks/libs/distributeddb/test/unittest/common/communicator/adapter_stub.h b/frameworks/libs/distributeddb/test/unittest/common/communicator/adapter_stub.h index 4d719aada1da4ebcccb7be15c3c46d6705bb9e9e..347a0a52ee46531f0aa44d90215ebbe037080c64 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/communicator/adapter_stub.h +++ b/frameworks/libs/distributeddb/test/unittest/common/communicator/adapter_stub.h @@ -53,6 +53,9 @@ public: std::shared_ptr GetExtendHeaderHandle(const ExtendInfo ¶mInfo) override; + DBStatus PreprocessMessage(const AccessInfos &accessInfos) override; + void RegForkPreprocessMessage(const std::function &func); + void GetDataHeadInfo(const uint8_t *data, uint32_t &headLength); void GetDataUserInfo(const uint8_t *data, std::vector &userInfos); @@ -144,6 +147,7 @@ private: OnSendBytes onSendBytes_; std::vector userInfo_; + std::function forkPreprocessMessage_; }; } diff --git a/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_deep_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_deep_test.cpp index 39e449f42a52b7c9cd770724385dcabe0eb6373f..6d7d2d283500ec8b96f0d28fee10628e473395b5 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_deep_test.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_deep_test.cpp @@ -570,6 +570,54 @@ HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment004, TestSize.Level2) recvMsgForBB = nullptr; } +/** + * @tc.name: Fragment005 + * @tc.desc: Test PreprocessMessage return error code + * @tc.type: FUNC + * @tc.require: + * @tc.author: bty + */ +HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment005, TestSize.Level2) +{ + // fork callback + Message *recvMsgForBB = nullptr; + g_commBB->RegOnMessageCallback([&recvMsgForBB](const std::string &srcTarget, Message *inMsg) { + recvMsgForBB = inMsg; + }, nullptr); + g_commBB->RegOnAccessInfosCallback([](AccessInfos &accessInfos) { + accessInfos.appId = "XP"; + }, nullptr); + bool isCall = false; + g_envDeviceB.adapterHandle->RegForkPreprocessMessage([&isCall](const AccessInfos &AccessInfos) { + isCall = true; + EXPECT_EQ(AccessInfos.appId, "XP"); + return BUSY; + }); + + /** + * @tc.steps: step1. connect device A with device B + */ + AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle); + + /** + * @tc.steps: step2. device A send message(registered and giant) to device B using communicator AB + * @tc.expected: step2. communicator BB ignore the message + */ + const uint32_t dataLength = 13 * 1024; // 13 K, 1024 is scale + Message *sendMsgForAB = BuildRegedGiantMessage(dataLength); + ASSERT_NE(sendMsgForAB, nullptr); + SendConfig conf = {false, false, 0}; + int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsgForAB, conf); + EXPECT_EQ(errCode, E_OK); + std::this_thread::sleep_for(std::chrono::milliseconds(2600)); // Wait 2600 ms to make sure send done + ASSERT_EQ(recvMsgForBB, nullptr); + EXPECT_TRUE(isCall); + g_commBB->RegOnMessageCallback(nullptr, nullptr); + g_commBB->RegOnAccessInfosCallback(nullptr, nullptr); + g_envDeviceB.adapterHandle->RegForkPreprocessMessage(nullptr); + AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle); +} + namespace { void ClearPreviousTestCaseInfluence() {