diff --git a/functionsystem/src/function_master/instance_manager/group_manager_actor.cpp b/functionsystem/src/function_master/instance_manager/group_manager_actor.cpp index cea9cf7e09862537e5700ce2bdb5bec4860c53c5..adeb98d0a4a81f453c286392f307e4a7c71aad19 100644 --- a/functionsystem/src/function_master/instance_manager/group_manager_actor.cpp +++ b/functionsystem/src/function_master/instance_manager/group_manager_actor.cpp @@ -468,9 +468,19 @@ void GroupManagerActor::MasterBusiness::SuspendGroup(const litebus::AID &from, auto &groupID = killGroupReq->groupid(); auto &requestID = killGroupReq->grouprequestid(); auto srcInstanceID = killGroupReq->srcinstanceid(); - if (auto group = member_->groupCaches->GetGroupInfo(groupID); - group.second && group.first.second->status() != static_cast(GroupState::RUNNING)) { - auto reason = fmt::format("group({}) status is {} which not allow to be suspend", groupID, + auto group = member_->groupCaches->GetGroupInfo(groupID); + if (!group.second) { + auto reason = fmt::format("group({}) is not found, unable to be suspend", groupID); + YRLOG_ERROR("{}, request from {}", reason, from.HashString()); + return actor->InnerKillInstanceOnComplete(from, groupID, requestID, + Status(StatusCode::ERR_PARAM_INVALID, reason)); + } + if (group.first.second->status() == static_cast(GroupState::SUSPEND)) { + return actor->InnerKillInstanceOnComplete(from, groupID, requestID, Status::OK()); + } + if (group.first.second->status() != static_cast(GroupState::RUNNING)) { + auto reason = fmt::format("status of group(id:{} name:{}) is {} which not allow to be suspend", groupID, + group.first.second->groupopts().groupname(), ToString(static_cast(group.first.second->status()))); YRLOG_ERROR("{}, request from {}", reason, from.HashString()); return actor->InnerKillInstanceOnComplete(from, groupID, requestID, @@ -497,9 +507,19 @@ void GroupManagerActor::MasterBusiness::ResumeGroup(const litebus::AID &from, auto groupID = killGroupReq->groupid(); auto requestID = killGroupReq->grouprequestid(); auto srcInstanceID = killGroupReq->srcinstanceid(); - if (auto group = member_->groupCaches->GetGroupInfo(groupID); - group.second && group.first.second->status() != static_cast(GroupState::SUSPEND)) { - auto reason = fmt::format("group({}) status is {} which not allow to be resumed", groupID, + auto group = member_->groupCaches->GetGroupInfo(groupID); + if (!group.second) { + auto reason = fmt::format("group({}) is not found, unable to be resume", groupID); + YRLOG_ERROR("{}, request from {}", reason, from.HashString()); + return actor->InnerKillInstanceOnComplete(from, groupID, requestID, + Status(StatusCode::ERR_PARAM_INVALID, reason)); + } + if (group.first.second->status() == static_cast(GroupState::RUNNING)) { + return actor->InnerKillInstanceOnComplete(from, groupID, requestID, Status::OK()); + } + if (group.first.second->status() != static_cast(GroupState::SUSPEND)) { + auto reason = fmt::format("status of group(id:{} name:{}) is {} which not allow to be resumed", groupID, + group.first.second->groupopts().groupname(), ToString(static_cast(group.first.second->status()))); YRLOG_ERROR("{}, request from {}", reason, from.HashString()); return actor->InnerKillInstanceOnComplete(from, groupID, requestID, @@ -707,7 +727,7 @@ void GroupManagerActor::InnerKillInstanceOnComplete(const litebus::AID &from, co auto msg = ::messages::KillGroupResponse{}; msg.set_groupid(groupID); msg.set_code(static_cast(status.StatusCode())); - msg.set_message(status.GetMessage()); + msg.set_message(status.RawMessage()); msg.set_grouprequestid(requestID); YRLOG_INFO("send OnKillGroup of ({}) to {}, msg {}", groupID, from.HashString(), msg.message()); Send(from, "OnKillGroup", msg.SerializeAsString()); diff --git a/functionsystem/src/function_proxy/local_scheduler/instance_control/instance_ctrl_actor.cpp b/functionsystem/src/function_proxy/local_scheduler/instance_control/instance_ctrl_actor.cpp index 00a9872cf4685589c00227a2bbe1a80f242a2efa..1223a2cb10902ddafc475fc971858c62ca3cf81c 100644 --- a/functionsystem/src/function_proxy/local_scheduler/instance_control/instance_ctrl_actor.cpp +++ b/functionsystem/src/function_proxy/local_scheduler/instance_control/instance_ctrl_actor.cpp @@ -3622,7 +3622,7 @@ litebus::Future InstanceCtrlActor::Checkpoint(const std::string &instanc if (saveRsp.code() != common::ErrorCode::ERR_NONE) { YRLOG_ERROR("failed to save checkpoint state, error code: {}, msg: {}", fmt::underlying(saveRsp.code()), saveRsp.message()); - promise->SetValue(Status(static_cast(saveRsp.code()))); + promise->SetValue(Status(static_cast(saveRsp.code()), saveRsp.message())); return; } promise->SetValue(Status::OK()); diff --git a/functionsystem/src/function_proxy/local_scheduler/local_group_ctrl/local_group_ctrl_actor.cpp b/functionsystem/src/function_proxy/local_scheduler/local_group_ctrl/local_group_ctrl_actor.cpp index 5c41929adfe6f17b441de952b1d7c09ace4017a5..21483281f2ece2bb853422e9a70c2b09e1d080df 100644 --- a/functionsystem/src/function_proxy/local_scheduler/local_group_ctrl/local_group_ctrl_actor.cpp +++ b/functionsystem/src/function_proxy/local_scheduler/local_group_ctrl/local_group_ctrl_actor.cpp @@ -400,8 +400,8 @@ litebus::Future> LocalGroupCtrlActor::GroupSche return resp; } auto groupCtx = NewGroupCtx(groupInfo); - YRLOG_INFO("{}|{}|received group schedule request, id ({}) instance num {}", req->traceid(), req->requestid(), - groupInfo->groupid(), req->requests_size()); + YRLOG_INFO("{}|{}|received group schedule request, id ({}) instance num {} policy {}", req->traceid(), req->requestid(), + groupInfo->groupid(), req->requests_size(), groupInfo->groupopts().grouppolicy()); ASSERT_IF_NULL(groupOperator_); ASSERT_IF_NULL(scheduler_); auto future = ToGroupInstanceScheduling(groupCtx).Then( diff --git a/functionsystem/tests/unit/function_master/instance_manager/group_manager_test.cpp b/functionsystem/tests/unit/function_master/instance_manager/group_manager_test.cpp index a171944b6a4ae5b799217431f4a5a53c931066ba..e40ec25c1e947a2fd2f9186d9e9d37f50e85386c 100644 --- a/functionsystem/tests/unit/function_master/instance_manager/group_manager_test.cpp +++ b/functionsystem/tests/unit/function_master/instance_manager/group_manager_test.cpp @@ -1402,6 +1402,73 @@ TEST_F(GroupManagerTest, SuspendGroupInvalidState) litebus::Await(groupMgrActor->GetAID()); } +TEST_F(GroupManagerTest, SuspendGroupNoExist) +{ + auto mockMetaClient = std::make_shared(metaStoreServerHost_); + EXPECT_CALL(*mockMetaClient, Get).WillRepeatedly(testing::Return(litebus::Future>())); + auto mockGlobalScheduler = std::make_shared(); + auto groupMgrActor = std::make_shared(mockMetaClient, mockGlobalScheduler); + litebus::Spawn(groupMgrActor); + litebus::Async(groupMgrActor->GetAID(), &GroupManagerActor::UpdateLeaderInfo, + GetLeaderInfo(groupMgrActor->GetAID())); + { + auto outerKillerActor = std::make_shared(); + ASSERT_TRUE(litebus::Spawn(outerKillerActor).OK()); + auto respPromise = std::make_shared>(); + EXPECT_CALL(*outerKillerActor, OnKillGroupCallback) + .WillOnce(testing::Invoke( + [&respPromise](const messages::KillGroupResponse &rsp) { respPromise->SetValue(rsp); })); + // let killer send SuspendGroup + auto killGroupReq = std::make_shared(); + killGroupReq->set_groupid(GROUP_ID_1); + killGroupReq->set_signal(GROUP_SUSPEND_SIGNAL); + litebus::Async(outerKillerActor->GetAID(), &OuterKillerActor::SendKillGroup, groupMgrActor->GetAID(), + killGroupReq); + // will send kill group response back to outer killer + ASSERT_AWAIT_READY(respPromise->GetFuture()); + auto kgRsp = respPromise->GetFuture().Get(); + EXPECT_EQ(kgRsp.code(), static_cast(ERR_PARAM_INVALID)); + YRLOG_INFO("SUSPEND group response: {}", kgRsp.DebugString()); + litebus::Terminate(outerKillerActor->GetAID()); + litebus::Await(outerKillerActor->GetAID()); + } + litebus::Terminate(groupMgrActor->GetAID()); + litebus::Await(groupMgrActor->GetAID()); +} + +TEST_F(GroupManagerTest, SuspendGroupAlreadySuspend) +{ + auto mockMetaClient = std::make_shared(metaStoreServerHost_); + EXPECT_CALL(*mockMetaClient, Get).WillRepeatedly(testing::Return(litebus::Future>())); + auto mockGlobalScheduler = std::make_shared(); + auto groupMgrActor = std::make_shared(mockMetaClient, mockGlobalScheduler); + litebus::Spawn(groupMgrActor); + PrepareGroup(groupMgrActor, GroupState::SUSPEND, InstanceState::RUNNING); + { + auto outerKillerActor = std::make_shared(); + ASSERT_TRUE(litebus::Spawn(outerKillerActor).OK()); + auto respPromise = std::make_shared>(); + EXPECT_CALL(*outerKillerActor, OnKillGroupCallback) + .WillOnce(testing::Invoke( + [&respPromise](const messages::KillGroupResponse &rsp) { respPromise->SetValue(rsp); })); + // let killer send SuspendGroup + auto killGroupReq = std::make_shared(); + killGroupReq->set_groupid(GROUP_ID_1); + killGroupReq->set_signal(GROUP_SUSPEND_SIGNAL); + litebus::Async(outerKillerActor->GetAID(), &OuterKillerActor::SendKillGroup, groupMgrActor->GetAID(), + killGroupReq); + // will send kill group response back to outer killer + ASSERT_AWAIT_READY(respPromise->GetFuture()); + auto kgRsp = respPromise->GetFuture().Get(); + EXPECT_EQ(kgRsp.code(), 0); + YRLOG_INFO("SUSPEND group response: {}", kgRsp.DebugString()); + litebus::Terminate(outerKillerActor->GetAID()); + litebus::Await(outerKillerActor->GetAID()); + } + litebus::Terminate(groupMgrActor->GetAID()); + litebus::Await(groupMgrActor->GetAID()); +} + TEST_F(GroupManagerTest, ResumeGroup) { auto mockMetaClient = std::make_shared(metaStoreServerHost_); @@ -1478,7 +1545,7 @@ TEST_F(GroupManagerTest, ResumeGroupInvalidState) auto mockGlobalScheduler = std::make_shared(); auto groupMgrActor = std::make_shared(mockMetaClient, mockGlobalScheduler); litebus::Spawn(groupMgrActor); - PrepareGroup(groupMgrActor, GroupState::RUNNING, InstanceState::RUNNING); + PrepareGroup(groupMgrActor, GroupState::SCHEDULING, InstanceState::RUNNING); auto [group, exist] = groupMgrActor->member_->groupCaches->GetGroupInfo(GROUP_ID_1); EXPECT_EQ(exist, true); @@ -1502,6 +1569,68 @@ TEST_F(GroupManagerTest, ResumeGroupInvalidState) litebus::Await(groupMgrActor->GetAID()); } +TEST_F(GroupManagerTest, ResumeGroupNotExistState) +{ + auto mockMetaClient = std::make_shared(metaStoreServerHost_); + EXPECT_CALL(*mockMetaClient, Get).WillRepeatedly(testing::Return(litebus::Future>())); + auto mockGlobalScheduler = std::make_shared(); + auto groupMgrActor = std::make_shared(mockMetaClient, mockGlobalScheduler); + litebus::Spawn(groupMgrActor); + litebus::Async(groupMgrActor->GetAID(), &GroupManagerActor::UpdateLeaderInfo, + GetLeaderInfo(groupMgrActor->GetAID())); + auto [group, exist] = groupMgrActor->member_->groupCaches->GetGroupInfo(GROUP_ID_1); + EXPECT_EQ(exist, false); + auto outerKillerActor = std::make_shared(); + ASSERT_TRUE(litebus::Spawn(outerKillerActor).OK()); + auto respPromise = std::make_shared>(); + EXPECT_CALL(*outerKillerActor, OnKillGroupCallback) + .WillOnce( + testing::Invoke([&respPromise](const messages::KillGroupResponse &rsp) { respPromise->SetValue(rsp); })); + auto killGroupReq = std::make_shared(); + killGroupReq->set_groupid(GROUP_ID_1); + killGroupReq->set_signal(GROUP_RESUME_SIGNAL); + litebus::Async(outerKillerActor->GetAID(), &OuterKillerActor::SendKillGroup, groupMgrActor->GetAID(), killGroupReq); + ASSERT_AWAIT_READY(respPromise->GetFuture()); + auto kgRsp = respPromise->GetFuture().Get(); + EXPECT_EQ(kgRsp.code(), static_cast(ERR_PARAM_INVALID)); + YRLOG_INFO("RESUME group response: {}", kgRsp.DebugString()); + litebus::Terminate(outerKillerActor->GetAID()); + litebus::Await(outerKillerActor->GetAID()); + litebus::Terminate(groupMgrActor->GetAID()); + litebus::Await(groupMgrActor->GetAID()); +} + +TEST_F(GroupManagerTest, ResumeGroupAlreadyRunning) +{ + auto mockMetaClient = std::make_shared(metaStoreServerHost_); + EXPECT_CALL(*mockMetaClient, Get).WillRepeatedly(testing::Return(litebus::Future>())); + auto mockGlobalScheduler = std::make_shared(); + auto groupMgrActor = std::make_shared(mockMetaClient, mockGlobalScheduler); + litebus::Spawn(groupMgrActor); + PrepareGroup(groupMgrActor, GroupState::RUNNING, InstanceState::RUNNING); + + auto [group, exist] = groupMgrActor->member_->groupCaches->GetGroupInfo(GROUP_ID_1); + EXPECT_EQ(exist, true); + auto outerKillerActor = std::make_shared(); + ASSERT_TRUE(litebus::Spawn(outerKillerActor).OK()); + auto respPromise = std::make_shared>(); + EXPECT_CALL(*outerKillerActor, OnKillGroupCallback) + .WillOnce( + testing::Invoke([&respPromise](const messages::KillGroupResponse &rsp) { respPromise->SetValue(rsp); })); + auto killGroupReq = std::make_shared(); + killGroupReq->set_groupid(GROUP_ID_1); + killGroupReq->set_signal(GROUP_RESUME_SIGNAL); + litebus::Async(outerKillerActor->GetAID(), &OuterKillerActor::SendKillGroup, groupMgrActor->GetAID(), killGroupReq); + ASSERT_AWAIT_READY(respPromise->GetFuture()); + auto kgRsp = respPromise->GetFuture().Get(); + EXPECT_EQ(kgRsp.code(), 0); + YRLOG_INFO("RESUME group response: {}", kgRsp.DebugString()); + litebus::Terminate(outerKillerActor->GetAID()); + litebus::Await(outerKillerActor->GetAID()); + litebus::Terminate(groupMgrActor->GetAID()); + litebus::Await(groupMgrActor->GetAID()); +} + TEST_F(GroupManagerTest, OnInstancePutTest) { auto mockGlobalScheduler = std::make_shared();