diff --git a/functionsystem/src/function_agent/agent_service_actor.cpp b/functionsystem/src/function_agent/agent_service_actor.cpp index a7ebb6ff4cd7c2693d738fa9c5b08bb7dbaa4ad6..490c86507f911872d623a0572e9759d054b6f1fb 100644 --- a/functionsystem/src/function_agent/agent_service_actor.cpp +++ b/functionsystem/src/function_agent/agent_service_actor.cpp @@ -125,8 +125,13 @@ void AgentServiceActor::DownloadCodeAndStartRuntime( auto deployObject = deployObjects->front(); deployObjects->pop(); - // every time before download code, code refer should increase - AddCodeRefer(deployObject.destination, deployObject.request->instanceid(), deployObject.deployer); + + // working dir don't need to increase code refer + if (!IsDelegateWorkingDirPath(deployObject)) { + // every time before download code, code refer should increase + AddCodeRefer(deployObject.destination, deployObject.request->instanceid(), deployObject.deployer); + } + bool isMonopoly = req->scheduleoption().schedpolicyname() == MONOPOLY_SCHEDULE; if (auto iter = deployingObjects_.find(deployObject.destination); iter != deployingObjects_.end()) { // code package is downloading @@ -325,7 +330,12 @@ std::shared_ptr> AgentServiceActor::BuildDeployer destination = config->deploymentconfig().deploydir(); } if (info.Get().storageType == WORKING_DIR_STORAGE_TYPE) { - // pass unziped woring dir to runtime_manager + if (destination == info.Get().codePath) { + // delegate working dir + req->mutable_funcdeployspec()->set_deploydir(destination); + req->mutable_funcdeployspec()->set_storagetype(WORKING_DIR_STORAGE_TYPE); + } + // pass unzipped working dir to runtime_manager (void)req->mutable_createoptions()->insert({ UNZIPPED_WORKING_DIR, destination }); // pass origin config (src working dir zip file) (void)req->mutable_createoptions()->insert({ YR_WORKING_DIR, info.Get().codePath }); @@ -1193,4 +1203,11 @@ void AgentServiceActor::SetNetworkIsolationRequest(const litebus::AID &, std::st resp.set_code(static_cast(StatusCode::FAILED)); (void)Send(localSchedFuncAgentMgrAID_, "SetNetworkIsolationResponse", resp.SerializeAsString()); } + +bool AgentServiceActor::IsDelegateWorkingDirPath(const DeployerParameters &deployObject) +{ + // if th bucket id (working dir) is the code destination path, the deployObject is for a delegate working dir code + return deployObject.request->deploymentconfig().bucketid() == deployObject.destination; +} + } // namespace functionsystem::function_agent \ No newline at end of file diff --git a/functionsystem/src/function_agent/agent_service_actor.h b/functionsystem/src/function_agent/agent_service_actor.h index b35d19383d1d828884807f313ad5370cfbf73503..4ac7b1616451a2cc350b3302687f7c14e5375a6e 100644 --- a/functionsystem/src/function_agent/agent_service_actor.h +++ b/functionsystem/src/function_agent/agent_service_actor.h @@ -435,6 +435,8 @@ private: litebus::Future AsyncDownloadCode(const std::shared_ptr &request, const std::shared_ptr &deployer); + bool IsDelegateWorkingDirPath(const DeployerParameters &deployObject); + private: std::unordered_map> deployers_; diff --git a/functionsystem/src/function_agent/code_deployer/working_dir_deployer.cpp b/functionsystem/src/function_agent/code_deployer/working_dir_deployer.cpp index 1e120d58bfc42d73fd82715710b6c08bf7ee341b..ef3a91cabb62433feac4340c5dab9ca281c6f37d 100644 --- a/functionsystem/src/function_agent/code_deployer/working_dir_deployer.cpp +++ b/functionsystem/src/function_agent/code_deployer/working_dir_deployer.cpp @@ -85,6 +85,12 @@ std::string WorkingDirDeployer::GetDestination( if (appID.empty() && uriFile.empty()) { return ""; } + + if (IsDir(uriFile)) { + YRLOG_DEBUG("{}|delegate working dir is a path, use it as destination: {}", appID, uriFile); + return uriFile; + } + std::string workingDir; if (!deployDir.empty()) { std::string appDir = litebus::os::Join(deployDir, APP_FOLDER_PREFIX); @@ -126,7 +132,7 @@ bool WorkingDirDeployer::IsDeployed(const std::string &destination, [[maybe_unus DeployResult WorkingDirDeployer::Deploy(const std::shared_ptr &request) { // 'working_dir' storage type objectid (src appID = instanceID) - // bucketid (src codePath, working dir zip file) + // bucketid (src codePath, working dir zip file or delegated working dir) auto &config = request->deploymentconfig(); DeployResult result; result.destination = GetDestination(config.deploydir(), config.bucketid(), config.objectid()); @@ -135,6 +141,11 @@ DeployResult WorkingDirDeployer::Deploy(const std::shared_ptr accessor = ResourceAccessorFactory::CreateAccessor(config.bucketid()); // like: "file:///home/xxx/xxy.zip" diff --git a/functionsystem/src/runtime_manager/executor/runtime_executor.cpp b/functionsystem/src/runtime_manager/executor/runtime_executor.cpp index de0e433523f5c69de931e71c22db33ed2c8b0e41..9276b20504a0c5b0dd229f12bece5a2c2ee16533 100644 --- a/functionsystem/src/runtime_manager/executor/runtime_executor.cpp +++ b/functionsystem/src/runtime_manager/executor/runtime_executor.cpp @@ -116,6 +116,8 @@ const static std::string ASCEND_RT_VISIBLE_DEVICES = "ASCEND_RT_VISIBLE_DEVICES" const std::string CONDA_PROGRAM_NAME = "conda"; const std::string CONDA_ENV_FILE = "env.yaml"; +const std::string CHDIR_PATH_CONFIG = "CHDIR_PATH"; + // Exclude environment variables passed to the runtime const std::vector EXCLUDE_ENV_KEYS_PASSED_TO_RUNTIME = { UNZIPPED_WORKING_DIR @@ -139,6 +141,15 @@ std::function SetRuntimeIdentity(int userID, int groupID) }; } +std::function ChdirHook(std::string dir) +{ + return [dir]() { + if (chdir(dir.c_str()) != 0) { + std::cerr << "failed to chdir: " << dir << ", get errno: " << errno << std::endl; + } + }; +} + std::function SetSubProcessPgid() { return []() { @@ -1020,6 +1031,11 @@ std::pair> RuntimeExecutor::GetCppBuildArgs( std::string address = config_.ip + ":" + port; auto confPath = litebus::os::Join(config_.runtimeConfigPath, "runtime.json"); + auto resultPair = HandleWorkingDirectory(request, request->runtimeinstanceinfo()); + if (resultPair.first.IsError()) { + return { resultPair.first, {} }; + } + return { Status::OK(), { CPP_PROGRAM_NAME, RUNTIME_ID_ARG_PREFIX + request->runtimeinstanceinfo().runtimeid(), LOG_LEVEL_PREFIX + config_.runtimeLogLevel, @@ -1121,12 +1137,9 @@ std::pair RuntimeExecutor::HandleWorkingDirectory( "" }; } - if (chdir(workingDirIter->second.c_str()) != 0) { - YRLOG_ERROR("{}|{}|enter working dir failed, path: {}", info.traceid(), info.requestid(), - workingDirIter->second); - return { Status(StatusCode::RUNTIME_MANAGER_WORKING_DIR_FOR_APP_NOTFOUND, "job working dir is invalid"), "" }; - } - YRLOG_DEBUG("change python working dir to {}", workingDirIter->second); + (*request->mutable_runtimeinstanceinfo()->mutable_deploymentconfig()->mutable_deployoptions())[CHDIR_PATH_CONFIG] = + workingDirIter->second; + YRLOG_DEBUG("change working dir to {}", workingDirIter->second); return { Status::OK(), workingDirIter->second }; } @@ -1470,12 +1483,16 @@ std::pair> RuntimeExecutor::GetPosixCustomBuild "params working dir or unzipped dir is empty"), {} }; } - if (chdir(iter->second.c_str()) != 0) { + if (!litebus::os::ExistPath(iter->second)) { YRLOG_ERROR("{}|{}|enter working dir failed, path: {}", request->runtimeinstanceinfo().traceid(), request->runtimeinstanceinfo().requestid(), iter->second); return { Status(StatusCode::RUNTIME_MANAGER_WORKING_DIR_FOR_APP_NOTFOUND, "job working dir is invalid"), {} }; } + + (*request->mutable_runtimeinstanceinfo() + ->mutable_deploymentconfig() + ->mutable_deployoptions())[CHDIR_PATH_CONFIG] = iter->second; YRLOG_DEBUG("change job entrypoint execute dir to {}", iter->second); return { Status::OK(), {} }; } @@ -1487,11 +1504,15 @@ std::pair> RuntimeExecutor::GetPosixCustomBuild request->runtimeinstanceinfo().requestid()); return { Status(StatusCode::RUNTIME_MANAGER_EXECUTABLE_PATH_INVALID, "entryFile is empty"), {} }; } - if (chdir(entryFile.c_str()) != 0) { + + if (!litebus::os::ExistPath(entryFile)) { YRLOG_ERROR("{}|{}|enter entryfile path failed, path: {}", request->runtimeinstanceinfo().traceid(), request->runtimeinstanceinfo().requestid(), entryFile); return { Status(StatusCode::RUNTIME_MANAGER_EXECUTABLE_PATH_INVALID, "chdir entryfile path failed"), {} }; } + + (*request->mutable_runtimeinstanceinfo()->mutable_deploymentconfig()->mutable_deployoptions())[CHDIR_PATH_CONFIG] = + entryFile; YRLOG_DEBUG("entrypoint: {}", entryFile + "/bootstrap"); return { Status::OK(), { entryFile + "/bootstrap" } }; } @@ -1514,6 +1535,12 @@ std::vector> RuntimeExecutor::BuildInitHook( (void)initHook.emplace_back(CondaActivate(it->second, it2->second)); } } + + if (auto iter = deployOptions.find(CHDIR_PATH_CONFIG); iter != deployOptions.end() && !iter->second.empty()) { + YRLOG_DEBUG("{}|{} process add chdir({}) hook", request->runtimeinstanceinfo().requestid(), + request->runtimeinstanceinfo().instanceid(), iter->second); + (void)initHook.emplace_back(ChdirHook(iter->second)); + } return initHook; } diff --git a/functionsystem/tests/unit/function_agent/agent_service/agent_service_actor_test.cpp b/functionsystem/tests/unit/function_agent/agent_service/agent_service_actor_test.cpp index f5757f2c19b8c52278a3853db1e2e2774e137e73..677cf5e4435b1f313611458c33c125ec1d4ca2de 100644 --- a/functionsystem/tests/unit/function_agent/agent_service/agent_service_actor_test.cpp +++ b/functionsystem/tests/unit/function_agent/agent_service/agent_service_actor_test.cpp @@ -2762,4 +2762,48 @@ TEST_F(AgentServiceActorTest, ConfigCodeAgingTimeTest) EXPECT_TRUE(dstActor_->codeReferInfos_->find("/tmp/test2") != dstActor_->codeReferInfos_->end()); EXPECT_TRUE(dstActor_->codeReferInfos_->find("/tmp/test3") == dstActor_->codeReferInfos_->end()); } + +TEST_F(AgentServiceActorTest, DeployInstanceWithWorkingDirCpp) +{ + PrepareWorkingDir("/tmp/working_dir-tmp"); + auto deployInstanceReq = std::make_unique(); + deployInstanceReq->set_requestid(TEST_REQUEST_ID); // as appID + deployInstanceReq->set_instanceid(TEST_INSTANCE_ID); + deployInstanceReq->set_language("cpp11"); + + auto spec = deployInstanceReq->mutable_funcdeployspec(); + spec->set_storagetype(function_agent::WORKING_DIR_STORAGE_TYPE); + auto deployDir = "/home/sn/function/package/xxxz"; + std::string destination = "/tmp/working_dir-tmp/file.zip"; + (void)litebus::os::Rmdir(deployDir); + EXPECT_TRUE(litebus::os::ExistPath(destination)); + spec->set_deploydir(deployDir); + + deployInstanceReq->mutable_createoptions()->insert( + { "DELEGATE_DOWNLOAD", + R"({"appId":"userWorkingDirCode001", "storage_type":"working_dir", "code_path":"/tmp/working_dir-tmp/"})" }); + messages::StartInstanceResponse startInstanceResponse; + startInstanceResponse.set_code(StatusCode::SUCCESS); + startInstanceResponse.set_requestid(TEST_REQUEST_ID); + startInstanceResponse.mutable_startruntimeinstanceresponse()->set_runtimeid(TEST_RUNTIME_ID); + EXPECT_CALL(*testRuntimeManager_, MockStartInstanceResponse) + .WillOnce(Return(startInstanceResponse.SerializeAsString())); + + testFuncAgentMgrActor_->ResetDeployInstanceResponse(); + testFuncAgentMgrActor_->SendRequestToAgentServiceActor(dstActor_->GetAID(), "DeployInstance", + std::move(deployInstanceReq->SerializeAsString())); + ASSERT_AWAIT_TRUE( + [&]() -> bool { return testFuncAgentMgrActor_->GetDeployInstanceResponse()->requestid() == TEST_REQUEST_ID; }); + EXPECT_TRUE(litebus::os::ExistPath(destination)); // app deployed + + auto startInstanceRequest = std::make_shared(); + startInstanceRequest->ParseFromString(testRuntimeManager_->promiseOfStartInstanceRequest.GetFuture().Get()); + YRLOG_DEBUG(startInstanceRequest->ShortDebugString()); + EXPECT_EQ( + startInstanceRequest->runtimeinstanceinfo().runtimeconfig().posixenvs().find(UNZIPPED_WORKING_DIR)->second, + destination); // startInstance param posixenvs should contain UNZIPPED_WORKING_DIR + EXPECT_EQ(startInstanceRequest->runtimeinstanceinfo().runtimeconfig().posixenvs().find(YR_WORKING_DIR)->second, + destination); // startInstance param posixenvs should contain YR_WORKING_DIR + DestroyWorkingDir("/tmp/working_dir-tmp"); +} } // namespace functionsystem::test