diff --git a/common/litebus/include/utils/os_utils.hpp b/common/litebus/include/utils/os_utils.hpp index 9fd52a257ec0d1cd1a20fcc38d081dbf67638597..0cecd5438daab849722afd63f7577cdce9024e7f 100644 --- a/common/litebus/include/utils/os_utils.hpp +++ b/common/litebus/include/utils/os_utils.hpp @@ -121,6 +121,7 @@ public: int readMaxSize = BUFFER_CONTENT_SIZE); void ReadFromPipeRealTime(int fd, std::shared_ptr> promise, AID aid, const std::function &readPipeCallback); + void ImmediatePipeReadOperation(int fd, const std::function& dataCallback); }; litebus::Future ReadPipeAsync(int fd, bool readASync = true); diff --git a/common/litebus/src/utils/os_utils.cpp b/common/litebus/src/utils/os_utils.cpp index eef4679ee4c632563ec172698ad6b08d3b6db4c7..22b6eab63bbd02d8067fc68c5f7f331929f1d0ce 100644 --- a/common/litebus/src/utils/os_utils.cpp +++ b/common/litebus/src/utils/os_utils.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include "async/async.hpp" #include "async/asyncafter.hpp" #include "async/uuid_generator.hpp" @@ -489,6 +490,12 @@ void PipeReadActor::ReadFromPipeRealTime(int fd, std::shared_ptr& dataCallback) +{ + ReadPipeRealTime(fd, dataCallback); + BUSLOG_DEBUG("Completed immediate read operation for fd:{}", fd); +} + // atomic int read actor name std::atomic g_readActorId(0); diff --git a/common/logs/include/logs/api/log_param.h b/common/logs/include/logs/api/log_param.h index 8c827ab815995d533e195fa902663677b606625c..bd5e4c8f84f46c3495c87c5b163a1468577046d1 100644 --- a/common/logs/include/logs/api/log_param.h +++ b/common/logs/include/logs/api/log_param.h @@ -46,6 +46,7 @@ struct LogParam { uint32_t maxFiles = DEFAULT_MAX_FILES; std::string stdLogLevel; bool syncFlush = false; + std::string logFileExtensions; }; struct GlobalLogParam { diff --git a/common/logs/src/sdk/log_handler.cpp b/common/logs/src/sdk/log_handler.cpp index c4635fe7f23890a1ccf51c8350316213c067c161..08d0165d40175a55e9afad26251a7c26f153c255 100644 --- 
a/common/logs/src/sdk/log_handler.cpp +++ b/common/logs/src/sdk/log_handler.cpp @@ -76,7 +76,6 @@ void DoLogFileRolling(const observability::api::logs::LogParam &logParam) DeleteFile(file.second.name); redundantNum--; } - return; } void DoLogFileCompress(const observability::api::logs::LogParam &logParam) diff --git a/common/logs/src/sdk/log_param_parser.cpp b/common/logs/src/sdk/log_param_parser.cpp index 3dbbbe1970f92e20c3f152e873a530834fd73d2b..78dff2e42605ea294b0752abb140c81c266bc709 100644 --- a/common/logs/src/sdk/log_param_parser.cpp +++ b/common/logs/src/sdk/log_param_parser.cpp @@ -49,7 +49,10 @@ std::string GetLogFile(const LogsApi::LogParam ¶m) logFile = param.logDir + "/" + param.nodeName + "-" + param.modelName; } if (param.logFileWithTime) { - logFile += "-" + FormatTimePoint() + ".log"; + logFile += "-" + FormatTimePoint(); + } + if (!param.logFileExtensions.empty()) { + logFile += "." + param.logFileExtensions; } else { logFile += ".log"; } diff --git a/common/logs/test/sdk/logger_provider_sdk_test.cpp b/common/logs/test/sdk/logger_provider_sdk_test.cpp index fb726ada0c27b1c69e4f276292000cd09b553b8c..9a4370bf88f648d5f5d06bdede9cd0fbe31a5f6d 100644 --- a/common/logs/test/sdk/logger_provider_sdk_test.cpp +++ b/common/logs/test/sdk/logger_provider_sdk_test.cpp @@ -128,7 +128,6 @@ TEST_F(LoggerProviderSDKDeathTest, UseLogMarcoWithSetProvider) SDK_WARN("warn message"); SDK_ERROR("error message"); enum TestEnum { FIRST, SECOND }; - SDK_DEBUG("enum message {}", TestEnum::FIRST); EXPECT_EXIT(SDK_FATAL("fatal message"), testing::KilledBySignal(SIGINT), ""); } diff --git a/functionsystem/src/runtime_manager/CMakeLists.txt b/functionsystem/src/runtime_manager/CMakeLists.txt index 798e16c8d2c91baf77ca5e654374d76d9fea7751..91a2bcad4463c6803e2150b588357ffd567c3ccf 100644 --- a/functionsystem/src/runtime_manager/CMakeLists.txt +++ b/functionsystem/src/runtime_manager/CMakeLists.txt @@ -36,6 +36,7 @@ add_subdirectory(driver) add_subdirectory(log) 
add_subdirectory(debug) add_subdirectory(virtual_env_manager) +add_subdirectory(std_monitor) target_link_libraries(runtime_manager_lib ${litebus_ALL_LIB} ${yaml_LIB} diff --git a/functionsystem/src/runtime_manager/config/flags.cpp b/functionsystem/src/runtime_manager/config/flags.cpp index 4d8e3109b0802a67a6291f0fb97f430af7b49bb1..021a6c8b8cb80a1dc5710f87d1e129d46ec01d4c 100644 --- a/functionsystem/src/runtime_manager/config/flags.cpp +++ b/functionsystem/src/runtime_manager/config/flags.cpp @@ -46,6 +46,11 @@ const int MAX_DISK_LIMIT = 1024 * 1024; const int DEFAULT_MAX_LOG_SIZE_MB = 40; const int DEFAULT_MAX_LOG_FILE_NUM = 20; +const int32_t DEFAULT_USER_LOG_AUTO_FLUSH_INTERVAL_MS = 10000; +const int32_t DEFAULT_USER_LOG_BUFFER_FLUSH_THRESHOLD = 1024 * 1024; +const unsigned long DEFAULT_USER_LOG_MAX_ROLLING_FILE_SIZE_MB = 100; +const unsigned long DEFAULT_USER_LOG_MAX_ROLLING_LOG_FILE_NUM = 100; + const uint32_t DEFAULT_RUNTIME_DS_CONNECT_TIMEOUT = 60; // s const int MIN_MEMORY_DETECTION_INTERVAL = 100; // ms @@ -181,5 +186,13 @@ Flags::Flags() AddFlag(&Flags::runtimeInstanceDebugEnable_, "runtime_instance_debug_enable", "runtime instance debug enable", false); AddFlag(&Flags::userLogExportMode_, "user_log_export_mode", "user log export mode: std/file", "file"); + AddFlag(&Flags::userLogAutoFlushIntervalMs_, "user_log_auto_flush_interval_ms", + "Interval in milliseconds for auto flushing user logs", DEFAULT_USER_LOG_AUTO_FLUSH_INTERVAL_MS); + AddFlag(&Flags::userLogBufferFlushThreshold_, "user_log_buffer_flush_threshold", + "Threshold for flushing user log buffer", DEFAULT_USER_LOG_BUFFER_FLUSH_THRESHOLD); + AddFlag(&Flags::userLogRollingSizeLimitMb_, "user_log_rolling_size_limit_mb", + "Maximum size limit (in MB) for a single user log file", DEFAULT_USER_LOG_MAX_ROLLING_FILE_SIZE_MB); + AddFlag(&Flags::userLogRollingFileCountLimit_, "user_log_rolling_file_count_limit", + "Maximum number of user log files to retain", DEFAULT_USER_LOG_MAX_ROLLING_LOG_FILE_NUM); } 
} // namespace functionsystem::runtime_manager diff --git a/functionsystem/src/runtime_manager/config/flags.h b/functionsystem/src/runtime_manager/config/flags.h index b89209630d3c0d47a56ea576e45985a66c2e6451..f356de1feebc2a3d65148aab80c2a98910f51653 100644 --- a/functionsystem/src/runtime_manager/config/flags.h +++ b/functionsystem/src/runtime_manager/config/flags.h @@ -296,6 +296,25 @@ public: { return runtimeDirectConnectionEnable_; } + int32_t GetUserLogAutoFlushIntervalMs() const + { + return userLogAutoFlushIntervalMs_; + } + + int32_t GetUserLogBufferFlushThreshold() const + { + return userLogBufferFlushThreshold_; + } + + unsigned long GetUserLogRollingSizeLimitMb() const + { + return userLogRollingSizeLimitMb_; + } + + unsigned long GetUserLogRollingFileCountLimit() const + { + return userLogRollingFileCountLimit_; + } bool GetCleanStreamProducerEnable() const { @@ -454,6 +473,10 @@ protected: bool runtimeInstanceDebugEnable_{ false }; std::string userLogExportMode_; std::string diskResources_; + int32_t userLogAutoFlushIntervalMs_; + int32_t userLogBufferFlushThreshold_; + unsigned long userLogRollingSizeLimitMb_; + unsigned long userLogRollingFileCountLimit_; }; } // namespace functionsystem::runtime_manager diff --git a/functionsystem/src/runtime_manager/executor/executor.cpp b/functionsystem/src/runtime_manager/executor/executor.cpp index 6e8d6d5407eafd49fef3e652e79d6a46241dc0ae..8ff6b247f0858c0301ed61405a9e38fac8624f29 100644 --- a/functionsystem/src/runtime_manager/executor/executor.cpp +++ b/functionsystem/src/runtime_manager/executor/executor.cpp @@ -140,6 +140,10 @@ void Executor::SetRuntimeConfig(const Flags &flags) config_.massifEnable = flags.GetMassifEnable(); config_.inheritEnv = flags.GetInheritEnv(); config_.separatedRedirectRuntimeStd = flags.GetSeparetedRedirectRuntimeStd(); + config_.userLogAutoFlushIntervalMs_ = flags.GetUserLogAutoFlushIntervalMs(); + config_.userLogBufferFlushThreshold_ = flags.GetUserLogBufferFlushThreshold(); + 
config_.userLogRollingSizeLimitMb_ = flags.GetUserLogRollingSizeLimitMb(); + config_.userLogRollingFileCountLimit_ = flags.GetUserLogRollingFileCountLimit(); const std::string &prestartConfig = flags.GetRuntimePrestartConfig(); if (!prestartConfig.empty() && prestartConfig != "{}") { YRLOG_DEBUG("prestart config is not empty, start to parse"); diff --git a/functionsystem/src/runtime_manager/executor/executor.h b/functionsystem/src/runtime_manager/executor/executor.h index 5529d4dc682c24c060763e99ce4765be7480dd90..6552a3cd6372f0f5df200a4966b12f7018474858 100644 --- a/functionsystem/src/runtime_manager/executor/executor.h +++ b/functionsystem/src/runtime_manager/executor/executor.h @@ -93,6 +93,10 @@ struct RuntimeConfig { std::string userLogExportMode; bool cleanStreamProducerEnable; int virtualEnvIdleTimeLimit; + int32_t userLogAutoFlushIntervalMs_; + int32_t userLogBufferFlushThreshold_; + unsigned long userLogRollingSizeLimitMb_; + unsigned long userLogRollingFileCountLimit_; }; struct PrestartProcess { diff --git a/functionsystem/src/runtime_manager/executor/runtime_executor.cpp b/functionsystem/src/runtime_manager/executor/runtime_executor.cpp index 2b8280f214a846ee1e8f60aabddd1682896bc309..5178eb960fb3a068437059f28df658e28552b26b 100644 --- a/functionsystem/src/runtime_manager/executor/runtime_executor.cpp +++ b/functionsystem/src/runtime_manager/executor/runtime_executor.cpp @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -122,6 +123,8 @@ const std::vector EXCLUDE_ENV_KEYS_PASSED_TO_RUNTIME = { UNZIPPED_WORKING_DIR }; // job working_dir file unzipped path +const int DEFAULT_STD_MONITOR_EVENT = EPOLLIN | EPOLLHUP | EPOLLERR; + std::function SetRuntimeIdentity(int userID, int groupID) { return [groupID, userID]() { @@ -191,6 +194,9 @@ RuntimeExecutor::RuntimeExecutor(const std::string &name, const litebus::AID &fu monitorCallBackActor_ = std::make_shared(mcName, functionAgentAID); cmdTool_ = std::make_shared(); 
litebus::Spawn(monitorCallBackActor_); + // start std monitor + stdMonitor_ = std::make_shared(); + stdMonitor_->Start(); } void RuntimeExecutor::Init() @@ -207,6 +213,10 @@ void RuntimeExecutor::Finalize() litebus::Async(monitorCallBackActor_->GetAID(), &MonitorCallBackActor::DeleteAllMonitorAndRemoveDir); litebus::Terminate(monitorCallBackActor_->GetAID()); litebus::Await(monitorCallBackActor_->GetAID()); + + stdMonitor_->Stop(); + stdMonitor_ = nullptr; + stdRedirectors_.clear(); runtimeInstanceInfoMap_.clear(); Executor::Finalize(); @@ -556,6 +566,9 @@ litebus::Future RuntimeExecutor::StartRuntime( if (result.IsError()) { return GenFailStartInstanceResponse(request, result.StatusCode()); } + if (config_.separatedRedirectRuntimeStd && NeedToRollUserLog()) { + StartRuntimeStdRedirection(info.runtimeid(), execPtr->GetOut(), execPtr->GetErr()); + } if (!config_.separatedRedirectRuntimeStd) { litebus::Async(GetStdRedirector(config_.nodeID)->GetAID(), &StdRedirector::StartRuntimeStdRedirection, info.runtimeid(), info.instanceid(), execPtr->GetOut(), execPtr->GetErr()); @@ -575,6 +588,36 @@ litebus::Future RuntimeExecutor::StartRuntime( return GenSuccessStartInstanceResponse(request, execPtr, port); } + +void RuntimeExecutor::StartRuntimeStdRedirection(const std::string &runtimeID, + const litebus::Option stdOut, const litebus::Option stdErr) +{ + const StdRedirectParam stdRedirectParam = { + .maxLogLength = config_.userLogBufferFlushThreshold_, + .flushDuration = config_.userLogAutoFlushIntervalMs_, + .stdRollingMaxFileSize = config_.userLogRollingSizeLimitMb_, + .stdRollingMaxFiles = config_.userLogRollingFileCountLimit_, + .exportMode = FILE_EXPORTER, + .logFileExtensions = DEFAULT_LOG_FILE_OUT_EXTENSIONS, + }; + + if (stdOut.IsSome()) { + std::function ReadDataCallback = + [aid(GetStdRedirector(runtimeID, stdRedirectParam)->GetAID())](int fd, int event) { + litebus::Async(aid, &StdRedirector::ReadStdLogRealTime, fd, event); + }; + stdMonitor_->AddFd(stdOut.Get(), 
DEFAULT_STD_MONITOR_EVENT, ReadDataCallback); + } + + if (stdErr.IsSome()) { + std::function ReadDataCallback = + [aid(GetStdRedirector(runtimeID, stdRedirectParam)->GetAID())](int fd, int event) { + litebus::Async(aid, &StdRedirector::ReadStdLogRealTime, fd, event); + }; + stdMonitor_->AddFd(stdErr.Get(), DEFAULT_STD_MONITOR_EVENT, ReadDataCallback); + } +} + Status RuntimeExecutor::WriteProtoToRuntime(const std::string &requestID, const std::string &runtimeID, const ::messages::TLSConfig &tlsConfig, const std::shared_ptr execPtr) const @@ -672,6 +715,33 @@ std::shared_ptr RuntimeExecutor::GetStdRedirector(const std::stri return redirector; } +std::shared_ptr RuntimeExecutor::GetStdRedirector(const std::string &runtimeID, + const StdRedirectParam &stdRedirectParam) +{ + if (stdRedirectors_.find(runtimeID) != stdRedirectors_.end()) { + return stdRedirectors_[runtimeID]; + } + auto path = litebus::os::Join(config_.runtimeLogPath, config_.runtimeStdLogDir); + auto logFileName = runtimeID + stdRedirectParam.logFileExtensions; + YRLOG_INFO("{} not found, create a new redirector log file: {} {}", runtimeID, path, logFileName); + auto redirector = std::make_shared(path, logFileName, stdRedirectParam); + (void)litebus::Spawn(redirector); + (void)litebus::Async(redirector->GetAID(), &StdRedirector::Start); + stdRedirectors_[runtimeID] = redirector; + return redirector; +} + +void RuntimeExecutor::StopRedirectorByRuntimeID(const std::string &runtimeID) +{ + auto iter = stdRedirectors_.find(runtimeID); + if (iter == stdRedirectors_.end()) { + return; + } + litebus::Terminate(iter->second->GetAID()); + litebus::Await(iter->second->GetAID()); + stdRedirectors_.erase(iter); +} + void RuntimeExecutor::ReportInfo(const std::string &instanceID, const std::string runtimeID, const pid_t &pid, const functionsystem::metrics::MeterTitle &title) { @@ -724,6 +794,11 @@ void RuntimeExecutor::ConfigRuntimeRedirectLog(litebus::ExecIO &stdOut, litebus: stdErr = 
litebus::ExecIO::CreateFileIO(errFile); } +bool RuntimeExecutor::NeedToRollUserLog() const +{ + return config_.userLogRollingFileCountLimit_ != 0; +} + std::shared_ptr RuntimeExecutor::StartRuntimeByRuntimeID( const std::map startRuntimeParams, const std::vector &buildArgs, const Envs &envs, const std::vector> childInitHook) const @@ -739,7 +814,8 @@ std::shared_ptr RuntimeExecutor::StartRuntimeByRuntimeID( litebus::ExecIO stdOut = litebus::ExecIO::CreatePipeIO(); auto stdErr = stdOut; if (config_.userLogExportMode == functionsystem::runtime_manager::FILE_EXPORTER - && config_.separatedRedirectRuntimeStd) { + && config_.separatedRedirectRuntimeStd + && !NeedToRollUserLog()) { ConfigRuntimeRedirectLog(stdOut, stdErr, runtimeID); } std::string cmd = execPath; @@ -927,6 +1003,25 @@ void RuntimeExecutor::InheritEnv(std::map &combineEnvs } } +void RuntimeExecutor::StopMonitoringFD(const std::string &runtimeID, const std::string &requestID) +{ + auto execPtr = GetExecByRuntimeID(runtimeID); + if (execPtr == nullptr) { + YRLOG_WARN("{}|{}|runtime has already been killed.", requestID, runtimeID); + return; + } + + auto stdOut = execPtr->GetOut(); + if (stdOut.IsSome()) { + stdMonitor_->RemoveFd(stdOut.Get()); + } + + auto stdErr = execPtr->GetErr(); + if (stdErr.IsSome()) { + stdMonitor_->RemoveFd(stdErr.Get()); + } +} + Status RuntimeExecutor::StopInstanceByRuntimeID(const std::string &runtimeID, const std::string &requestID, bool oomKilled) { @@ -956,6 +1051,8 @@ Status RuntimeExecutor::StopInstanceByRuntimeID(const std::string &runtimeID, co // clear work dir if exist litebus::Async(monitorCallBackActor_->GetAID(), &MonitorCallBackActor::DeleteFromMonitorMap, instanceID); (void)runtime2PID_.erase(runtimeID); + StopMonitoringFD(runtimeID, requestID); + StopRedirectorByRuntimeID(runtimeID); (void)runtime2Exec_.erase(runtimeID); (void)pids_.erase(pidIter->second); diff --git a/functionsystem/src/runtime_manager/executor/runtime_executor.h 
b/functionsystem/src/runtime_manager/executor/runtime_executor.h index 7d2f6c38098aee6eaf79efadc0451b467fc8f2fd..f223b20aec557eefe37cdbe20831b099c367af8e 100644 --- a/functionsystem/src/runtime_manager/executor/runtime_executor.h +++ b/functionsystem/src/runtime_manager/executor/runtime_executor.h @@ -38,6 +38,7 @@ #include "runtime_manager/utils/std_redirector.h" #include "utils/volume_mount.h" #include "virtual_env_manager/virtual_env_manager.h" +#include "std_monitor/std_monitor.h" namespace functionsystem::runtime_manager { @@ -67,6 +68,40 @@ public: litebus::Future UpdateCredForRuntime( const std::shared_ptr &request) override; + // for test + [[maybe_unused]] void ProtectedHookRuntimeCredentialByID(std::vector> &initHook, int userID, + int groupID) const + { + HookRuntimeCredentialByID(initHook, userID, groupID); + } + + // for test + [[maybe_unused]] bool ProtectedCheckPrestartRuntimeRetry(const std::string &runtimeID, const std::string &language, + const int retryTimes) + { + return CheckPrestartRuntimeRetry(runtimeID, language, retryTimes); + } + + // for test + [[maybe_unused]] int CheckPrestartRuntimePromise() + { + int cnt = 0; + for (auto iter = prestartRuntimePromiseMap_.begin(); iter != prestartRuntimePromiseMap_.end(); ++iter) { + if (iter->second->GetFuture().IsError() || iter->second->GetFuture().IsOK()) { + cnt++; + } else { + return 0; + } + } + return cnt; + } + + // for test + [[maybe_unused]] void SetStdRedirectors(const std::string logName, const std::shared_ptr redirector) + { + stdRedirectors_[logName] = redirector; + } + litebus::Future StopAllRuntimes(); void CheckRuntimesExited(const std::chrono::steady_clock::time_point &start, const litebus::Promise &promise); @@ -103,10 +138,17 @@ private: std::shared_ptr GetStdRedirector(const std::string &logName); + std::shared_ptr GetStdRedirector(const std::string &runtimeID, + const StdRedirectParam &stdRedirectParam); + + void StopRedirectorByRuntimeID(const std::string &runtimeID); + 
litebus::Future StartInstanceWithoutPrestart( const std::shared_ptr &request, const std::string &language, const std::vector &cardIDs); + bool NeedToRollUserLog() const; + std::shared_ptr StartRuntimeByRuntimeID( const std::map startRuntimeParams, const std::vector &buildArgs, const Envs &envs, const std::vector> childInitHook) const; @@ -131,6 +173,8 @@ private: bool ShouldUseProcessGroup() const; + void StopMonitoringFD(const std::string &runtimeID, const std::string &requestID); + void TerminateImmediately(pid_t pid, std::string_view processType); void SendGracefulTermination(pid_t pid, std::string_view processType); @@ -271,6 +315,9 @@ private: void ConfigRuntimeRedirectLog(litebus::ExecIO &stdOut, litebus::ExecIO &stdErr, const std::string &runtimeID) const; + void StartRuntimeStdRedirection(const std::string &runtimeID, + const litebus::Option stdOut, const litebus::Option stdErr); + std::pair GetPythonExecPath( const google::protobuf::Map &deployOptions, const messages::RuntimeInstanceInfo &info) const; @@ -296,6 +343,8 @@ private: std::shared_ptr monitorCallBackActor_{ nullptr }; std::shared_ptr cmdTool_; std::shared_ptr virtualEnvMgr_{ nullptr }; + std::shared_ptr stdMonitor_{ nullptr }; + StdRedirectParam stdRedirectParam_; }; class RuntimeExecutorProxy : public ExecutorProxy { diff --git a/functionsystem/src/runtime_manager/std_monitor/CMakeLists.txt b/functionsystem/src/runtime_manager/std_monitor/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..86fc8a2ade2362109183e36e420fe042915e757e --- /dev/null +++ b/functionsystem/src/runtime_manager/std_monitor/CMakeLists.txt @@ -0,0 +1,17 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +aux_source_directory(${CMAKE_CURRENT_LIST_DIR} STD_MONITOR_SRCS) + +target_sources(runtime_manager_lib PRIVATE ${STD_MONITOR_SRCS}) \ No newline at end of file diff --git a/functionsystem/src/runtime_manager/std_monitor/std_monitor.cpp b/functionsystem/src/runtime_manager/std_monitor/std_monitor.cpp new file mode 100644 index 0000000000000000000000000000000000000000..6aeb5b756c577a3bdf5b70113423e83bd8d78375 --- /dev/null +++ b/functionsystem/src/runtime_manager/std_monitor/std_monitor.cpp @@ -0,0 +1,55 @@ +/* +* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "std_monitor.h" + +#include + +#include "async/future.hpp" +#include "common/status/status.h" + +namespace functionsystem { + +litebus::Future StdMonitor::Start() +{ + std::string name = "StdMonitor_" + litebus::uuid_generator::UUID::GetRandomUUID().ToString(); + actor_ = std::make_shared(name); + litebus::Spawn(actor_); + return litebus::Async(actor_->GetAID(), &StdMonitorActor::Start); +} + +void StdMonitor::Stop() +{ + ASSERT_IF_NULL(actor_); + litebus::Async(actor_->GetAID(), &StdMonitorActor::Stop); + litebus::Terminate(actor_->GetAID()); + litebus::Await(actor_->GetAID()); + actor_ = nullptr; +} + +litebus::Future StdMonitor::AddFd(int fd, int events, StdEventHandler callback) +{ + ASSERT_IF_NULL(actor_); + return litebus::Async(actor_->GetAID(), &StdMonitorActor::AddFd, fd, events, callback); +} + +litebus::Future StdMonitor::RemoveFd(int fd) +{ + ASSERT_IF_NULL(actor_); + return litebus::Async(actor_->GetAID(), &StdMonitorActor::RemoveFd, fd); +} + +} // namespace functionsystem \ No newline at end of file diff --git a/functionsystem/src/runtime_manager/std_monitor/std_monitor.h b/functionsystem/src/runtime_manager/std_monitor/std_monitor.h new file mode 100644 index 0000000000000000000000000000000000000000..25dee88f2d0b80c5ae4796429eb7a5865cd044eb --- /dev/null +++ b/functionsystem/src/runtime_manager/std_monitor/std_monitor.h @@ -0,0 +1,39 @@ +/* +* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FUNCTIONSYSTEM_STD_MONITOR_H +#define FUNCTIONSYSTEM_STD_MONITOR_H + +#include +#include + +#include "std_monitor_actor.h" + +namespace functionsystem { +class StdMonitor { +public: + StdMonitor(){}; + ~StdMonitor() = default; + litebus::Future Start(); + void Stop(); + litebus::Future AddFd(int fd, int events, StdEventHandler callback); + litebus::Future RemoveFd(int fd); +private: + std::shared_ptr actor_{ nullptr }; +}; +} // namespace functionsystem + +#endif // FUNCTIONSYSTEM_STD_MONITOR_H diff --git a/functionsystem/src/runtime_manager/std_monitor/std_monitor_actor.cpp b/functionsystem/src/runtime_manager/std_monitor/std_monitor_actor.cpp new file mode 100644 index 0000000000000000000000000000000000000000..877286abf0c2338ecdffa2073349cb2f95e0b305 --- /dev/null +++ b/functionsystem/src/runtime_manager/std_monitor/std_monitor_actor.cpp @@ -0,0 +1,289 @@ +/* +* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "std_monitor_actor.h" +#include "async/async.hpp" +#include "common/logs/logging.h" +#include "common/utils/exec_utils.h" + +namespace functionsystem { +const int MAX_EVENTS_NUM = 1024; +const int DEFAULT_USER_LOG_EPOLL_ACTIVATE_INTERVAL_MS = 1; +const long DEFAULT_USER_LOG_EPOLL_PIPE_MAX_SIZE = 1024 * 1024; + +StdMonitorActor::StdMonitorActor(const std::string &name) : ActorBase(name) +{ +} + +StdMonitorActor::~StdMonitorActor() +{ + Stop(); +} + +void StdMonitorActor::Finalize() +{ + YRLOG_INFO("finalize StdMonitorActor {}", ActorBase::GetAID().Name()); +} + +litebus::Future StdMonitorActor::Start() +{ + epollFd_ = epoll_create1(EPOLL_CLOEXEC); + if (epollFd_ == EPOLL_FD_INVALID) { + YRLOG_ERROR("Failed to create epoll fd"); + return false; + } + UpdatePipeMaxSizeFromSystem(); + if (!AddStopEvent()) { + YRLOG_ERROR("Failed to add monitoring for stopEvent"); + return false; + } + isRunning_ = true; + thread_ = std::make_unique([this]() { RunLoop(); }); + YRLOG_INFO("Successfully started StdMonitorActor"); + return true; +} + +void StdMonitorActor::UpdatePipeMaxSizeFromSystem() +{ + pipeMaxSize_ = DEFAULT_USER_LOG_EPOLL_PIPE_MAX_SIZE; + std::ifstream fs("/proc/sys/fs/pipe-max-size"); + long systemPipeMaxSize; + if ((fs) && (fs >> systemPipeMaxSize)) { + pipeMaxSize_ = std::min(pipeMaxSize_, systemPipeMaxSize); + YRLOG_INFO("pipe-max-size set to: {}", pipeMaxSize_); + } else { + YRLOG_ERROR("failed to get pipe-max-size, using default"); + } +} + +void StdMonitorActor::SetPipeMaxSize(int fd) +{ + fcntl(fd, F_SETPIPE_SZ, pipeMaxSize_); + if (fcntl(fd, F_SETPIPE_SZ, pipeMaxSize_) == -1) { + YRLOG_WARN("Failed to set pipe max size for fd: {}", fd); + } +} + +bool StdMonitorActor::SetNonBlockingIO(int fd) +{ + int flags = fcntl(fd, F_GETFL, 0); + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { + YRLOG_ERROR("Failed to set non-blocking for fd: {}", fd); + return false; + } + return true; +} + +void StdMonitorActor::UpdateActiveTime() +{ + auto now = 
std::chrono::steady_clock::now(); + auto it = fdActiveTimestamps_.begin(); + while (it != fdActiveTimestamps_.end()) { + auto elapsed = std::chrono::duration_cast(now - it->second).count(); + if (elapsed >= DEFAULT_USER_LOG_EPOLL_ACTIVATE_INTERVAL_MS) { + litebus::Async(GetAID(), &StdMonitorActor::HandleEvent, it->first, EPOLLIN); + it = fdActiveTimestamps_.erase(it); + } else { + ++it; + } + } +} + +bool StdMonitorActor::IsReaderActive(int fd) +{ + return fdActiveTimestamps_.count(fd) > 0; +} + +void StdMonitorActor::MarkReaderActive(int fd) +{ + auto now = std::chrono::steady_clock::now(); + fdActiveTimestamps_[fd] = now; +} + +void StdMonitorActor::RunLoop() +{ + struct epoll_event events[MAX_EVENTS_NUM]; + while (isRunning_) { + int nfds = epoll_wait(epollFd_, events, MAX_EVENTS_NUM, -1); + UpdateActiveTime(); + if (nfds == -1) { + YRLOG_ERROR("Failed to do epoll_wait, code:{}, err:{}", errno, litebus::os::Strerror(errno)); + continue; + } + if (shouldStopRunLoop()) { + YRLOG_INFO("receive stop event, start to exit run loop"); + break; + } + for (int i = 0; i < nfds; ++i) { + int fd = events[i].data.fd; + int ev = events[i].events; + if (!IsReaderActive(fd)) { + litebus::Async(GetAID(), &StdMonitorActor::HandleEvent, fd, ev); + MarkReaderActive(fd); + } + } + } +} + +litebus::Future StdMonitorActor::AddFd(int fd, int events, StdEventHandler callback) +{ + if (epollFd_ == EPOLL_FD_INVALID) { + YRLOG_WARN("Std monitor is not initialized yet"); + return false; + } + + if (fd < 0) { + YRLOG_WARN("Invalid fd: {}", fd); + return false; + } + + if (!SetNonBlockingIO(fd)) { + return false; + } + + SetPipeMaxSize(fd); + + struct epoll_event ev; + ev.events = events; + ev.data.fd = fd; + if (epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &ev) == -1) { + YRLOG_ERROR("Failed to add monitoring for fd: {}", fd); + return false; + } + fdCallbacks_[fd] = callback; + YRLOG_DEBUG("Successful monitoring added for fd: {}", fd); + return true; +} + +bool 
StdMonitorActor::StopMonitoringFd(int fd) +{ + if (epollFd_ == EPOLL_FD_INVALID) { + YRLOG_WARN("Std monitor is not initialized yet"); + return false; + } + + if (fd < 0) { + YRLOG_WARN("Invalid fd: {}", fd); + return false; + } + + if (fdCallbacks_.find(fd) == fdCallbacks_.end()) { + YRLOG_DEBUG("The monitored fd({}) has been deleted", fd); + return true; + } + + if (epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, nullptr) != 0) { + YRLOG_ERROR("Failed to remove fd {}: code:{}, err:{}", fd, errno, litebus::os::Strerror(errno)); + return false; + } + return true; +} + +litebus::Future StdMonitorActor::RemoveFd(int fd) +{ + if (!StopMonitoringFd(fd)) { + return false; + } + fdCallbacks_.erase(fd); + YRLOG_DEBUG("Successfully removed monitoring of fd: {}", fd); + return true; +} + +void StdMonitorActor::HandleEvent(int fd, int event) +{ + auto it = fdCallbacks_.find(fd); + if (it == fdCallbacks_.end()) { + YRLOG_ERROR("Empty fdCallback, fd: {}", fd); + return; + } + const auto& callback = it->second; + if (callback) { + callback(fd, event); + } +} + +bool StdMonitorActor::AddStopEvent() +{ + stopEventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + if (stopEventfd_ < 0) { + YRLOG_ERROR("Failed to create stopEvent fd"); + return false; + } + + auto event = EPOLLIN | EPOLLHUP | EPOLLERR; + if (!AddFd(stopEventfd_, event, nullptr).Get()) { + close(stopEventfd_); + return false; + } + return true; +} + +bool StdMonitorActor::shouldStopRunLoop() +{ + uint64_t stopSignal; + if (read(stopEventfd_, &stopSignal, sizeof(stopSignal)) == static_cast(sizeof(stopSignal))) { + isRunning_ = false; + return true; + } + return false; +} + +void StdMonitorActor::StopRunLoop() +{ + // write stop event + uint64_t stopSignal = 1; + if (write(stopEventfd_, &stopSignal, sizeof(stopSignal)) != static_cast(sizeof(stopSignal))) { + YRLOG_WARN("fail to write stopEventfd, fd:{}, code:{}, errno:{}", + stopEventfd_, errno, litebus::os::Strerror(errno)); + } +} + +void StdMonitorActor::Stop() +{ + if 
(!isRunning_) { + YRLOG_INFO("std monitor already stopped"); + return; + } + isRunning_ = false; + + StopRunLoop(); + ASSERT_IF_NULL(thread_); + if (thread_->joinable()) { + YRLOG_INFO("wait for std monitor thread to exit"); + thread_->join(); + } + + // clean monitor events + for (auto fdPair : fdCallbacks_) { + int fd = fdPair.first; + StopMonitoringFd(fd); + } + fdCallbacks_.clear(); + + // close epoll + if (epollFd_ >= 0) { + close(epollFd_); + epollFd_ = EPOLL_FD_INVALID; + } + + if (stopEventfd_ >= 0) { + close(stopEventfd_); + stopEventfd_ = EPOLL_FD_INVALID; + } + YRLOG_INFO("std monitor stopped"); +} + +} diff --git a/functionsystem/src/runtime_manager/std_monitor/std_monitor_actor.h b/functionsystem/src/runtime_manager/std_monitor/std_monitor_actor.h new file mode 100644 index 0000000000000000000000000000000000000000..53d2b38d40f639bfaadf71e1da68bbc690d15a67 --- /dev/null +++ b/functionsystem/src/runtime_manager/std_monitor/std_monitor_actor.h @@ -0,0 +1,70 @@ +/* +* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#ifndef FUNCTIONSYSTEM_STD_EVENT_LOOP_H  // NOTE(review): guard is named after "std event loop", file is std_monitor_actor.h
+#define FUNCTIONSYSTEM_STD_EVENT_LOOP_H
+
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+
+#include "actor/actor.hpp"
+#include "async/future.hpp"
+
+namespace functionsystem {
+const int EPOLL_FD_INVALID = -1;  // sentinel stored in epollFd_ / stopEventfd_ while no descriptor is open
+using StdEventHandler = std::function;  // invoked by HandleEvent() as callback(fd, event)
+
+class StdMonitorActor : public litebus::ActorBase {
+public:
+    explicit StdMonitorActor(const std::string &name);
+    ~StdMonitorActor() override;
+    litebus::Future Start();
+    void Stop();  // stops the run loop, joins its thread, closes the epoll and stop-event descriptors
+    litebus::Future AddFd(int fd, int events, StdEventHandler callback);
+    litebus::Future RemoveFd(int fd);  // unregisters fd from epoll and erases its handler
+
+private:
+    void Finalize() override;
+    void RunLoop();
+    void HandleEvent(int fd, int event);  // looks up fd in fdCallbacks_ and invokes the handler if one is set
+    bool AddStopEvent();                  // creates stopEventfd_ and registers it through AddFd()
+    void StopRunLoop();                   // writes the stop signal to stopEventfd_
+    bool shouldStopRunLoop();             // true once the stop signal was read from stopEventfd_
+    bool StopMonitoringFd(int fd);        // EPOLL_CTL_DEL for fd; leaves fdCallbacks_ untouched
+    void UpdateActiveTime();
+    bool IsReaderActive(int fd);
+    void MarkReaderActive(int fd);
+    void UpdatePipeMaxSizeFromSystem();
+    void SetPipeMaxSize(int fd);
+    bool SetNonBlockingIO(int fd);
+
+    std::map fdCallbacks_;  // fd -> handler, consulted by HandleEvent()
+    std::unordered_map> fdActiveTimestamps_;
+    std::atomic isRunning_ { false };
+    std::unique_ptr thread_;  // joined by Stop()
+    int epollFd_ { EPOLL_FD_INVALID };
+    int stopEventfd_ { EPOLL_FD_INVALID };
+    long pipeMaxSize_;  // NOTE(review): no default initializer here - confirm the constructor sets it
+};
+
+}
+#endif // FUNCTIONSYSTEM_STD_EVENT_LOOP_H
diff --git a/functionsystem/src/runtime_manager/utils/std_redirector.cpp b/functionsystem/src/runtime_manager/utils/std_redirector.cpp
index 263a8c26454452526145409b9d3ae39170f2ccc3..1f5c60dd143779965658bf4a9fd184b75f25a33c 100644
--- a/functionsystem/src/runtime_manager/utils/std_redirector.cpp
+++ b/functionsystem/src/runtime_manager/utils/std_redirector.cpp
@@ -15,6 +15,7 @@
  */
 #include "std_redirector.h"
 #include
+#include
 
 #include "async/asyncafter.hpp"
 #include "async/defer.hpp"
@@ -61,8 +62,8 @@ Status StdRedirector::Start()
     }
 
     // set user func std logger
-    size_t pos = logName_.rfind(".log");
-    if (pos != std::string::npos && pos == logName_.length() - std::string(".log").size())
{
+    size_t pos = logName_.rfind(param_.logFileExtensions);  // strip the configured extension only when it is the suffix
+    if (pos != std::string::npos && pos == logName_.length() - param_.logFileExtensions.size()) {
         logNameForLogsSdk_ = logName_.substr(0, pos);  // compatible with previous configurations
     } else {
         logNameForLogsSdk_ = logName_;
@@ -84,6 +85,7 @@ Status StdRedirector::Start()
     loggerParam.maxFiles = param_.stdRollingMaxFiles;
     loggerParam.maxSize = param_.stdRollingMaxFileSize;  // MB
     loggerParam.alsoLog2Std = false;                     // not print to std output
+    loggerParam.logFileExtensions = param_.logFileExtensions;
     YRLOG_DEBUG("loggerParam.maxFiles: {}, loggerParam.maxSize: {} MB", loggerParam.maxFiles, loggerParam.maxSize);
 
     lp_ = LogsApi::Provider::GetLoggerProvider();
@@ -158,6 +160,56 @@ void StdRedirector::Finalize()
     } catch (...) {
         std::cerr << "close fileWriteStream failed." << std::endl;
     }
+    for (const auto &iter : fd2Readers_) {  // terminate every pipe reader actor spawned by this redirector
+        litebus::Terminate(iter.second);
+        litebus::Await(iter.second);
+    }
+    fd2Readers_.clear();
+}
+// Returns the reader actor cached for fd, spawning a PipeReadActor named "AID<fd>" on first use.
+litebus::AID StdRedirector::GetPipeAsyncReader(int fd)
+{
+    const auto it = fd2Readers_.find(fd);
+    if (it != fd2Readers_.end()) {
+        return it->second;
+    }
+    const litebus::AID pipeAsyncReader =
+        litebus::Spawn(std::make_shared<litebus::os::PipeReadActor>("AID" + std::to_string(fd)));
+    return fd2Readers_.emplace(fd, pipeAsyncReader).first->second;
+}
+// Terminates and forgets the reader actor cached for fd; does nothing when none is cached.
+void StdRedirector::TerminatePipeReader(int fd)
+{
+    auto it = fd2Readers_.find(fd);
+    if (it == fd2Readers_.end()) {
+        return;
+    }
+
+    litebus::Terminate(it->second);
+    litebus::Await(it->second);
+    fd2Readers_.erase(it);
+}
+// Epoll handler for a runtime std pipe: exports buffered logs on EPOLLERR/EPOLLHUP, schedules a read on EPOLLIN.
+void StdRedirector::ReadStdLogRealTime(int fd, int event)
+{
+    bool isValidEvent = (!(event & EPOLLERR) && !(event & EPOLLHUP));
+    if (!isValidEvent) {
+        if (event & EPOLLERR) {
+            YRLOG_ERROR("fd({}) I/O error occurred (EPOLLERR)", fd);
+        } else if (event & EPOLLHUP) {
+            YRLOG_WARN("fd({}) connection closed by peer (EPOLLHUP)", fd);
+        }
+        MoveLogsToReady();  // the pipe is going away: hand over whatever is buffered so far
+        litebus::Async(GetAID(), &StdRedirector::ExportLog);
+    }
+    if (event & EPOLLIN) {
+        const auto reader = GetPipeAsyncReader(fd);
+
+        auto dataCallback = [aid(GetAID())](const std::string &content) {  // AID captured by value, not `this`
+            litebus::Async(aid, &StdRedirector::SetStdLogRawContent, content);
+        };
+        litebus::Async(reader, &litebus::os::PipeReadActor::ImmediatePipeReadOperation, fd, dataCallback);
+    }
 }
 
 void StdRedirector::StartRuntimeStdRedirection(const std::string &runtimeID, const std::string &instanceID,
@@ -270,6 +322,20 @@ void StdRedirector::SetStdLogContent(const std::string &content, const std::stri
     litebus::Async(GetAID(), &StdRedirector::ExportLog);
 }
 
+void StdRedirector::SetStdLogRawContent(const std::string &content)  // buffers raw pipe output unmodified
+{
+    logs_.length += static_cast(content.length());  // NOTE(review): cast target type is missing in this patch text
+    logs_.message << content;
+
+    if (logs_.length < param_.maxLogLength) {  // below the threshold: keep buffering
+        return;
+    }
+
+    MoveLogsToReady();
+    YRLOG_DEBUG("ready to flush log to disk when log larger then {} byte.", param_.maxLogLength);
+    litebus::Async(GetAID(), &StdRedirector::ExportLog);
+}
+
 void StdRedirector::FlushLogContentRegularly()
 {
     YRLOG_DEBUG("ready to flush log regularly.");
diff --git a/functionsystem/src/runtime_manager/utils/std_redirector.h b/functionsystem/src/runtime_manager/utils/std_redirector.h
index 921a7c6a120621dbf5bb97774e5dc2e2b90f912d..fcf31ed7646ed8d9d16abfd91c165e919ce9256e 100644
--- a/functionsystem/src/runtime_manager/utils/std_redirector.h
+++ b/functionsystem/src/runtime_manager/utils/std_redirector.h
@@ -38,6 +38,8 @@ const unsigned long STD_ROLLING_MAX_FILES = 100;
 const std::string ERROR_LEVEL = "ERROR";
 const std::string INFO_LEVEL = "INFO";
 const std::string STD_POSTFIX = "-user_func_std.log";
+const std::string DEFAULT_LOG_FILE_EXTENSIONS = ".log";
+const std::string DEFAULT_LOG_FILE_OUT_EXTENSIONS = ".out";
 
 struct StdRedirectParam {
     int32_t maxLogLength{ MAX_LOG_LENGTH };
@@ -45,6 +47,7 @@ struct StdRedirectParam {
     unsigned long stdRollingMaxFileSize{ STD_ROLLING_MAX_FILE_SIZE };
     unsigned long stdRollingMaxFiles{ STD_ROLLING_MAX_FILES };
     std::string exportMode {
functionsystem::runtime_manager::FILE_EXPORTER };
+    std::string logFileExtensions{ DEFAULT_LOG_FILE_EXTENSIONS };  // suffix stripped from logName_, passed to the logger
 };
 
 struct RuntimeStandardLog {
@@ -72,6 +75,7 @@ public:
         param_.flushDuration = flushDuration;
         param_.stdRollingMaxFileSize = STD_ROLLING_MAX_FILE_SIZE;
         param_.stdRollingMaxFiles = STD_ROLLING_MAX_FILES;
+        param_.logFileExtensions = DEFAULT_LOG_FILE_EXTENSIONS;
     }
 
     StdRedirector(const std::string &path, const std::string &logName, const StdRedirectParam &param)
@@ -86,11 +90,15 @@ public:
                                     const litebus::Option stdOut, const litebus::Option stdErr);
     static std::string GetStdLog(const std::string &logFile, const std::string &runtimeID, const std::string &level,
                                  int32_t targetLineCnt = 20, int32_t readLineCnt = 1000);
+    void ReadStdLogRealTime(int fd, int event);  // epoll handler: export on EPOLLERR/EPOLLHUP, read on EPOLLIN
+    // Terminates the reader actor cached for fd, if any.
+    void TerminatePipeReader(int fd);
 
 private:
     void FlushToDisk(const RuntimeStandardLog &log);
     void SetStdLogContent(const std::string &content, const std::string &runtimeID, const std::string &instanceID,
                           const std::string &level);
+    void SetStdLogRawContent(const std::string &content);  // appends raw pipe output to logs_, exports when large
     void FlushLogContentRegularly();
     void SetTimer(const litebus::Timer &timer)
     {
@@ -119,6 +127,8 @@ private:
     void FlushToDiskDirectly();
     void FlushToStd();
 
+    litebus::AID GetPipeAsyncReader(int fd);  // cached-or-spawned PipeReadActor for fd
+
     bool logFileNotExist_{ false };
     // (origin) logger
     std::string path_;
@@ -134,6 +144,7 @@ private:
     std::shared_ptr userStdLogger_;
     std::shared_ptr lp_{ nullptr };
     std::shared_ptr logManager_{ nullptr };
+    std::unordered_map<int, litebus::AID> fd2Readers_;  // fd -> reader actor spawned by GetPipeAsyncReader()
 
     void Finalize() override;
 };
diff --git a/functionsystem/tests/unit/runtime_manager/CMakeLists.txt b/functionsystem/tests/unit/runtime_manager/CMakeLists.txt
index dcae678ec37f3ef061e95ba8e280ecfb11296950..6ea969c93465c333f1ab2fe621e09f3e8bdd41ab 100644
--- a/functionsystem/tests/unit/runtime_manager/CMakeLists.txt
+++ b/functionsystem/tests/unit/runtime_manager/CMakeLists.txt
@@ -23,3 +23,4 @@ add_subdirectory(config)
 add_subdirectory(log)
 add_subdirectory(debug)
add_subdirectory(virtual_env_manager)
+add_subdirectory(std_monitor)
diff --git a/functionsystem/tests/unit/runtime_manager/executor/runtime_executor_test.cpp b/functionsystem/tests/unit/runtime_manager/executor/runtime_executor_test.cpp
index d034c4c786cde997b26a59c2793309355c921aaa..8241abb6d2faeb45342304b8e1facd19d952e8ae 100644
--- a/functionsystem/tests/unit/runtime_manager/executor/runtime_executor_test.cpp
+++ b/functionsystem/tests/unit/runtime_manager/executor/runtime_executor_test.cpp
@@ -23,6 +23,7 @@
 
 #include "common/constants/constants.h"
 #include "common/status/status.h"
+#include "common/utils/exec_utils.h"
 #include "common/utils/files.h"
 #include "common/utils/path.h"
 #include "gtest/gtest.h"
@@ -30,6 +31,7 @@
 #include "port/port_manager.h"
 #include "runtime_manager/healthcheck/health_check.h"
 #include "runtime_manager/metrics/mock_function_agent_actor.h"
+#include "runtime_manager/utils/std_redirector.h"
 #include "utils/future_test_helper.h"
 #include "utils/os_utils.hpp"
 #include "utils/port_helper.h"
@@ -1997,6 +1999,109 @@ TEST_F(RuntimeExecutorTest, StartJobEntrypoint_InvalidWorkingDirTest)
     EXPECT_EQ(response.code(), RUNTIME_MANAGER_WORKING_DIR_FOR_APP_NOTFOUND);
 }
 
+TEST_F(RuntimeExecutorTest, TestStartRuntimeStdRedirection)  // stdout and stderr must both reach <runtime_id>.out
+{
+    (void)litebus::os::Rm("/home/snuser/instances/test_runtime_id.out");
+    runtime_manager::Flags flags;
+    const char *argv[] = { "/runtime_manager", "--node_id=testid", "--ip=127.0.0.1","--host_ip=127.0.0.1", "--port=32233",
+        "--runtime_initial_port=500", "--runtime_std_log_dir=instances",
+        "--runtime_direct_connection_enable=false",
+        "--enable_separated_redirect_runtime_std=true"};
+    flags.ParseFlags(9, argv);
+    executor_->SetRuntimeConfig(flags);
+    auto stdOut = litebus::ExecIO::CreatePipeIO();
+    auto stdOErr = stdOut;  // stderr shares the stdout pipe
+    litebus::Try<std::shared_ptr<litebus::Exec>> s = litebus::Exec::CreateExec(
+        "echo start; cp a b; echo end;", litebus::None(), litebus::ExecIO::CreateFDIO(STDIN_FILENO),
+        stdOut, stdOErr);
+    executor_->StartRuntimeStdRedirection("test_runtime_id", s.Get()->GetOut(), s.Get()->GetErr());
+    ASSERT_AWAIT_TRUE([=]() {
+        auto output = litebus::os::Read("/home/snuser/instances/test_runtime_id.out");
+        return output.IsSome() &&
+            (output.Get().find("start") != std::string::npos) &&
+            (output.Get().find("No such file or directory") != std::string::npos) &&
+            (output.Get().find("end") != std::string::npos);
+    });
+    (void)litebus::os::Rm("/home/snuser/instances/test_runtime_id.out");
+}
+
+// Note: this case run more than 30s, set `export NOT_SKIP_LONG_TESTS=1` when run it, and not run on CI by default
+TEST_F(RuntimeExecutorTest, EpollRedirectorStdLogRollingCompressTest)
+{
+    const char* skip_test = std::getenv("NOT_SKIP_LONG_TESTS");
+    if (skip_test == nullptr || std::string(skip_test) != "1") {
+        GTEST_SKIP() << "Long-running tests are skipped by default";
+    }
+
+    // 1.init StdRedirectParam
+    std::string command = "rm -rf /home/snuser/instances/test_runtime_id*";
+    (void)std::system(command.c_str());
+    executor_->config_.userLogBufferFlushThreshold_ = 1024;
+    executor_->config_.userLogAutoFlushIntervalMs_ = 10;
+    executor_->config_.userLogRollingSizeLimitMb_ = 1;
+    executor_->config_.userLogRollingFileCountLimit_ = 3;
+
+    // 2.StartRuntimeStdRedirection
+    runtime_manager::Flags flags;
+    const char *argv[] = { "/runtime_manager", "--node_id=testid", "--ip=127.0.0.1","--host_ip=127.0.0.1", "--port=32233",
+        "--runtime_initial_port=500", "--runtime_std_log_dir=instances",
+        "--runtime_direct_connection_enable=false",
+        "--enable_separated_redirect_runtime_std=true"};
+    flags.ParseFlags(9, argv);
+    executor_->SetRuntimeConfig(flags);
+    auto stdOut = litebus::ExecIO::CreatePipeIO();
+    auto stdErr = stdOut;
+    litebus::Try<std::shared_ptr<litebus::Exec>> s = litebus::Exec::CreateExec(
+        "for i in {1..15000}; do echo output1; cp a b; cp a b; done", litebus::None(),
+        litebus::ExecIO::CreateFDIO(STDIN_FILENO), stdOut, stdErr);
+    executor_->StartRuntimeStdRedirection("test_runtime_id", s.Get()->GetOut(), s.Get()->GetErr());
+
+    // 3.validate
+    sleep(30);
+    {
+        // count every file that belongs to this runtime id
+        std::string command = "ls /home/snuser/instances/test_runtime_id* | wc -l";
+        auto result = ExecuteCommand(command);
+        if (!result.error.empty()) {
+            YRLOG_ERROR("execute command {} failed, error: {}", command, result.error);
+            FAIL() << "command failed: " << command << ", error: " << result.error;
+        }
+
+        YRLOG_INFO("command {} output is {}", command, result.output);
+        std::istringstream iss(result.output);
+        std::string line;
+        std::getline(iss, line);
+        std::istringstream linestream(line);
+        int count = 0;  // stays 0 when the output cannot be parsed
+        linestream >> count;
+        EXPECT_FALSE(line.empty());
+        EXPECT_TRUE(count == 1);
+    }
+
+    {
+        // find rolling compression log files
+        std::string command = "ls /home/snuser/instances/test_runtime_id*.out.gz | wc -l";
+        auto result = ExecuteCommand(command);
+        if (!result.error.empty()) {
+            YRLOG_ERROR("execute command {} failed, error: {}", command, result.error);
+            FAIL() << "command failed: " << command << ", error: " << result.error;
+        }
+
+        YRLOG_INFO("command {} output is {}", command, result.output);
+        std::istringstream iss(result.output);
+        std::string line;
+        std::getline(iss, line);
+        std::istringstream linestream(line);
+        int count = 0;  // stays 0 when the output cannot be parsed
+        linestream >> count;
+        EXPECT_FALSE(line.empty());
+        EXPECT_TRUE(count > 0);
+    }
+
+    // std::string command = "rm -rf /home/snuser/instances/testid-user_func_std*";
+    // (void)std::system(command.c_str());
+}
+
 TEST_F(RuntimeExecutorTest, SetRuntimeEnv_runtime_direct_connection_enable_false)
 {
     const std::string tmpFilePath = "/tmp/runtime_executor_entryfile";
diff --git a/functionsystem/tests/unit/runtime_manager/std_monitor/CMakeLists.txt b/functionsystem/tests/unit/runtime_manager/std_monitor/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..edde7fba1fc2ecf6a149528b48f6b0b63f944633
--- /dev/null
+++ b/functionsystem/tests/unit/runtime_manager/std_monitor/CMakeLists.txt
@@ -0,0 +1,17 @@
+# Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+aux_source_directory(${CMAKE_CURRENT_LIST_DIR} STD_MONITOR_SRCS)
+
+target_sources(${UNIT_TEST_MODULE} PRIVATE ${STD_MONITOR_SRCS})
\ No newline at end of file
diff --git a/functionsystem/tests/unit/runtime_manager/std_monitor/std_monitor_actor_test.cpp b/functionsystem/tests/unit/runtime_manager/std_monitor/std_monitor_actor_test.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..d475ce70d4a281c9943f4e771fa6cd1ab5cd778d
--- /dev/null
+++ b/functionsystem/tests/unit/runtime_manager/std_monitor/std_monitor_actor_test.cpp
@@ -0,0 +1,170 @@
+/*
+* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+#include
+#include "gtest/gtest.h"
+#include "async/async.hpp"
+#include "runtime_manager/std_monitor/std_monitor_actor.h"
+#include "utils/future_test_helper.h"
+
+namespace functionsystem::test {
+const int DEFAULT_STD_MONITOR_EVENT = EPOLLIN | EPOLLHUP | EPOLLERR;
+
+class StdMonitorActorTest : public ::testing::Test {
+public:
+    void SetUp() override
+    {
+        std::string name = "StdMonitor_" + litebus::uuid_generator::UUID::GetRandomUUID().ToString();
+        actor_ = std::make_shared<StdMonitorActor>(name);
+        litebus::Spawn(actor_);
+        auto result = actor_->Start();
+        EXPECT_TRUE(result.Get());
+    }
+
+    void TearDown() override
+    {
+        if (actor_) {
+            actor_->Stop();
+            litebus::Terminate(actor_->GetAID());
+            litebus::Await(actor_->GetAID());
+        }
+        actor_ = nullptr;
+    }
+protected:
+    std::shared_ptr<StdMonitorActor> actor_{ nullptr };
+};
+
+// * Test case for adding valid FD
+// * 1. Create a valid pipe
+// * 2. Add read fd to monitor
+// * 3. Verify callback registration
+TEST_F(StdMonitorActorTest, AddValidFdWithCallback) {
+    int pipeFds[2];
+    ASSERT_EQ(pipe(pipeFds), 0);
+
+    std::atomic<bool> callbackTriggered{ false };  // set by the monitor, polled by the test thread
+    auto callback = [&callbackTriggered](int, int) {
+        callbackTriggered = true;
+    };
+
+    auto addResult = litebus::Async(actor_->GetAID(), &StdMonitorActor::AddFd, pipeFds[0],
+        DEFAULT_STD_MONITOR_EVENT, callback).Get();
+    EXPECT_TRUE(addResult);
+
+    // Trigger event by writing to pipe
+    char data[] = "test";
+    ASSERT_EQ(write(pipeFds[1], data, sizeof(data)), static_cast<ssize_t>(sizeof(data)));
+
+    // Wait for event processing
+    ASSERT_AWAIT_TRUE([&callbackTriggered]() -> bool { return callbackTriggered.load(); });
+
+    close(pipeFds[0]);
+    close(pipeFds[1]);
+}
+
+// * Test case for invalid fd handling
+// * 1. Test negative fd
+// * 2. Test fd that doesn't exist
+TEST_F(StdMonitorActorTest, AddInvalidFdVariants) {
+    // Test negative FD
+    auto result1 = litebus::Async(actor_->GetAID(), &StdMonitorActor::AddFd, -1,
+        DEFAULT_STD_MONITOR_EVENT, nullptr).Get();
+    EXPECT_FALSE(result1);
+
+    // Test non-existing FD
+    auto result2 = litebus::Async(actor_->GetAID(), &StdMonitorActor::AddFd, 9999,
+        DEFAULT_STD_MONITOR_EVENT, nullptr).Get();
+    EXPECT_FALSE(result2);
+}
+
+// * Test case for multiple fd monitoring
+// * 1. Add three different fds
+// * 2. Trigger each fd separately
+// * 3. Verify that every callback is eventually called
+TEST_F(StdMonitorActorTest, MultipleFdMonitoring) {
+    int fd1[2], fd2[2], fd3[2];
+    ASSERT_EQ(pipe(fd1), 0);
+    ASSERT_EQ(pipe(fd2), 0);
+    ASSERT_EQ(pipe(fd3), 0);
+
+    std::atomic<bool> callbacksTriggered[3]{};  // atomics: written by the monitor, polled by the test thread
+    auto cb1 = [&callbacksTriggered](int, int) { callbacksTriggered[0] = true; };
+    auto cb2 = [&callbacksTriggered](int, int) { callbacksTriggered[1] = true; };
+    auto cb3 = [&callbacksTriggered](int, int) { callbacksTriggered[2] = true; };
+
+    litebus::Async(actor_->GetAID(), &StdMonitorActor::AddFd, fd1[0], DEFAULT_STD_MONITOR_EVENT, cb1).Get();
+    litebus::Async(actor_->GetAID(), &StdMonitorActor::AddFd, fd2[0], DEFAULT_STD_MONITOR_EVENT, cb2).Get();
+    litebus::Async(actor_->GetAID(), &StdMonitorActor::AddFd, fd3[0], DEFAULT_STD_MONITOR_EVENT, cb3).Get();
+
+    // Trigger each FD
+    ASSERT_EQ(write(fd1[1], "1", 1), 1);
+    ASSERT_EQ(write(fd2[1], "2", 1), 1);
+    ASSERT_EQ(write(fd3[1], "3", 1), 1);
+
+    for (const auto &value : callbacksTriggered) {
+        ASSERT_AWAIT_TRUE([&value]() -> bool { return value.load(); });
+    }
+
+    close(fd1[0]); close(fd1[1]);
+    close(fd2[0]); close(fd2[1]);
+    close(fd3[0]); close(fd3[1]);
+}
+
+// * Test case for fd removal
+// * 1. Add fd and then remove it --> success
+// * 2. Remove invalid fd --> fail
+// * 3. Remove a non-existing fd --> success(may have been already removed)
+TEST_F(StdMonitorActorTest, FdRemovalValidation) {
+    // 1.remove a real fd
+    int pipeFds[2];
+    ASSERT_EQ(pipe(pipeFds), 0);
+
+    auto addResult = litebus::Async(actor_->GetAID(), &StdMonitorActor::AddFd, pipeFds[0],
+        DEFAULT_STD_MONITOR_EVENT, nullptr).Get();
+    EXPECT_TRUE(addResult);
+
+    auto removeResult = litebus::Async(actor_->GetAID(), &StdMonitorActor::RemoveFd, pipeFds[0]).Get();
+    EXPECT_TRUE(removeResult);
+    close(pipeFds[0]);
+    close(pipeFds[1]);
+
+    // 2.remove a invalid fd (dispatched through the actor, like every other call in this file)
+    EXPECT_FALSE(litebus::Async(actor_->GetAID(), &StdMonitorActor::RemoveFd, -1).Get());
+
+    // 3.remove a non-existing
+    EXPECT_TRUE(litebus::Async(actor_->GetAID(), &StdMonitorActor::RemoveFd, 9999).Get());
+}
+
+// * Test case for eventfd stop mechanism
+// * 1. Trigger stop event
+// * 2. Verify run loop exit by adding a valid fd --> fail
+TEST_F(StdMonitorActorTest, StopEventfdMechanism) {
+    actor_->Stop();
+
+    // add a valid fd
+    int pipeFds[2];
+    ASSERT_EQ(pipe(pipeFds), 0);
+    auto addResult = litebus::Async(actor_->GetAID(), &StdMonitorActor::AddFd, pipeFds[0],
+        DEFAULT_STD_MONITOR_EVENT, nullptr).Get();
+    EXPECT_FALSE(addResult);
+    close(pipeFds[0]); close(pipeFds[1]);  // the pipe was never handed to the monitor; release it here
+    litebus::Terminate(actor_->GetAID());
+    litebus::Await(actor_->GetAID());
+}
+
+} // namespace functionsystem::test
diff --git a/functionsystem/tests/unit/runtime_manager/std_monitor/std_monitor_test.cpp b/functionsystem/tests/unit/runtime_manager/std_monitor/std_monitor_test.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..adf994a9f59da047e8c051aef5cf7a48103f01f0
--- /dev/null
+++ b/functionsystem/tests/unit/runtime_manager/std_monitor/std_monitor_test.cpp
@@ -0,0 +1,83 @@
+/*
+* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+#include
+#include "gtest/gtest.h"
+#include "runtime_manager/std_monitor/std_monitor.h"
+#include "runtime_manager/utils/utils.h"
+
+using namespace functionsystem::runtime_manager;
+
+namespace functionsystem::test {
+const int DEFAULT_STD_MONITOR_EVENT = EPOLLIN | EPOLLET | EPOLLHUP | EPOLLERR;
+// Fixture: every test gets a freshly started monitor, which TearDown() stops and releases.
+class StdMonitorTest : public ::testing::Test {
+public:
+    void SetUp() override
+    {
+        stdMonitor_ = std::make_shared();
+        stdMonitor_->Start();
+    }
+
+    void TearDown() override
+    {
+        if (stdMonitor_) {
+            stdMonitor_->Stop();
+            stdMonitor_ = nullptr;
+        }
+    }
+protected:
+    std::shared_ptr stdMonitor_{ nullptr };
+};
+// Adding the read end of a freshly created pipe must succeed.
+TEST_F(StdMonitorTest, AddValidFd)
+{
+    int pipeFds[2];
+    ASSERT_EQ(pipe(pipeFds), 0);
+    EXPECT_TRUE(stdMonitor_->AddFd(pipeFds[0], DEFAULT_STD_MONITOR_EVENT, nullptr).Get());
+    close(pipeFds[0]);
+    close(pipeFds[1]);
+}
+// Adding a negative fd or an fd that is not open must fail.
+TEST_F(StdMonitorTest, AddInvalidFd)
+{
+    // 1.fd = -1
+    EXPECT_FALSE(stdMonitor_->AddFd(-1, DEFAULT_STD_MONITOR_EVENT, nullptr).Get());
+
+    // 2.fd = 9999 -- non-existing fd
+    EXPECT_FALSE(stdMonitor_->AddFd(9999, DEFAULT_STD_MONITOR_EVENT, nullptr).Get());
+}
+// Removal succeeds for a monitored fd and for an unknown fd, and fails for a negative fd.
+TEST_F(StdMonitorTest, RemoveFd)
+{
+    // 1.remove a real fd
+    int pipeFds[2];
+    ASSERT_EQ(pipe(pipeFds), 0);
+    EXPECT_TRUE(stdMonitor_->AddFd(pipeFds[0], DEFAULT_STD_MONITOR_EVENT, nullptr).Get());
+    EXPECT_TRUE(stdMonitor_->RemoveFd(pipeFds[0]).Get());
+    close(pipeFds[0]);
+    close(pipeFds[1]);
+
+    // 2.remove a invalid fd
+    EXPECT_FALSE(stdMonitor_->RemoveFd(-1).Get());
+
+    // 3.remove a non-existing fd (may have been already removed)
+    EXPECT_TRUE(stdMonitor_->RemoveFd(9999).Get());
+}
+
+} // namespace functionsystem::test
diff --git a/scripts/deploy/function_system/install.sh b/scripts/deploy/function_system/install.sh
index 0e4d7652a16f28b34690021a130e4fd3b7e9c3e6..d05e4be51a7c8974ecaf299137f6492d0123f96a 100644
--- a/scripts/deploy/function_system/install.sh
+++ b/scripts/deploy/function_system/install.sh
@@ -322,6 +322,11 @@ function install_function_agent_and_runtime_manager_in_the_same_process() {
     --runtime_max_log_file_num="${RUNTIME_LOG_ROLLING_MAX_FILES}" \
     --runtime_config_dir="${RUNTIME_HOME_DIR}/service/cpp/config/" \
     --enable_separated_redirect_runtime_std="${SEPARATED_REDIRECT_RUNTIME_STD}" \
+    --user_log_auto_flush_interval_ms="${USER_LOG_AUTO_FLUSH_INTERVAL_MS}" \
+    --user_log_buffer_flush_threshold="${USER_LOG_BUFFER_FLUSH_THRESHOLD}" \
+    --user_log_rolling_size_limit_mb="${USER_LOG_MAX_ROLLING_FILE_SIZE_MB}" \
+    --user_log_rolling_file_count_limit="${USER_LOG_MAX_ROLLING_LOG_FILE_NUM}" \
+    --npu_collection_enable="${NPU_COLLECTION_ENABLE}" \
     --user_log_export_mode="${USER_LOG_EXPORT_MODE}" \
     --npu_collection_mode="${NPU_COLLECTION_MODE}" \
     --gpu_collection_enable="${GPU_COLLECTION_ENABLE}" \