diff --git a/README.md b/README.md index c0e863eb1d0505273dc67d05e09c8026f52e4c7a..6a80e22ca962c103b620b93f462fd3271861e262 100644 --- a/README.md +++ b/README.md @@ -1,94 +1,261 @@ -[查看中文](./README_CN.md) +![](./docs/source_zh_cn/getting-started/image/logo-large.png) +openYuanrong 是一个 Serverless 分布式计算引擎,致力于以一套统一 Serverless 架构支持 AI、大数据、微服务等各类分布式应用。它提供多语言函数编程接口,以单机编程体验简化分布式应用开发;提供分布式动态调度和数据共享等能力,实现分布式应用的高性能运行和集群的高效资源利用。 - +## 简介 -- [What Is yr-datasystem](#what-is-yr-datasystem) -- [Installation](#installation) - - [Pip mode method installation](#pip-mode-method-installation) - - [Source code compilation installation](#source-code-compilation-installation) -- [Deployment](#deployment) - - [Process deployment](#process-deployment) - - [Kubernetes deployment](#kubernetes-deployment) -- [Quickstart](#quickstart) -- [Docs](#docs) -- [License](#license) +![](./docs/source_zh_cn/getting-started/image/introduction.png) - +openYuanrong 由多语言函数运行时、函数系统和数据系统组成,支持按需灵活单独或组合使用。 -## What Is yr-datasystem +- **多语言函数运行时**:提供函数分布式编程,支持 Python、Java、C++ 语言,实现类单机编程高性能分布式运行。 +- **函数系统**:提供大规模分布式动态调度,支持函数实例极速弹性扩缩和跨节点迁移,实现集群资源高效利用。 +- **数据系统**:提供异构分布式多级缓存,支持 Object、Stream 语义,实现函数实例间高性能数据共享及传递。 -yuanrong-datasystem is a distributed data caching system that leverages the HBM/DRAM/SSD resources of a compute cluster to build a multi-level cache close to computation, improving data access performance in scenarios such as model training and inference, big data, and microservices. 
+openYuanrong 分为三个代码仓库:[yuanrong-runtime](https://gitee.com/openeuler/yuanrong-runtime) 对应多语言函数运行时;[yuanrong-functionsystem](https://gitee.com/openeuler/yuanrong-functionsystem) 对应函数系统;[yuanrong-datasystem](https://gitee.com/openeuler/yuanrong-datasystem) 对应数据系统,即当前代码仓。 -![](./docs/source_en/getting-started/image/logical_architecture.png) +**数据系统**是 openYuanrong 的核心概念抽象,是一款专为 AI 训推场景设计的分布式异构缓存系统。支持 HBM/DDR/SSD 异构介质池化缓存及 NPU 间异步并发高效数据传输,用于分布式 KVCache 缓存、模型参数缓存、高性能 replaybuffer 等场景。 -yuanrong-datasystem consists of three components: +yuanrong-datasystem 的主要特性包括: -- **Multi-language SDK**: It provides Python/C++ language interfaces, encapsulating various semantics such as heterogeneous objects, key-value (KV), and objects, supporting fast data read and write for business implementation. - - **heterogeneous object**: Based on the NPU card's HBM memory abstraction heterogeneous object interface, it enables high-speed direct data transmission between Ascend NPU cards. It also provides H2D (Host to Device) and D2H (Device to Host) high-speed migration interfaces, facilitating fast data transfer between DRAM and HBM. - - **KV**: Implements zero-copy Key-Value data read/write via shared memory for high-performance caching, with support for data reliability semantics through external component integration. - - **object**: Abstracts data objects using host-side shared memory, implements reference-counted lifecycle management, and encapsulates shared memory as buffers with direct pointer-based read/write access. 
+- **高性能分布式多级缓存**:基于 DRAM/SSD 构建分布式多级缓存,应用实例通过共享内存免拷贝读写 DRAM 数据,并提供高性能 H2D(host to device)/D2H(device to host) 接口,实现 HBM 与 DRAM 之间快速 swap。 +- **NPU 间高效数据传输**:将 NPU 的 HBM 抽象为异构对象,自动协调 NPU 间 HCCL 收发顺序,实现简单易用的卡间数据异步并发传输。并支持P2P传输负载均衡策略,充分利用卡间链路带宽。 +- **灵活的生命周期管理**:支持设置 TTL、LRU 缓存淘汰以及 delete 接口等多种生命周期管理策略,数据生命周期既可由数据系统管理,也可交由上层应用管理,提供更高的灵活性。 +- **热点数据多副本**:数据跨节点读取时自动在本地保存副本,支撑热点数据高效访问。本地副本使用 LRU 策略自动淘汰。 +- **多种数据可靠性策略**:支持 write_through、write_back 及 none 多种持久化策略,满足不同场景的数据可靠性需求。 +- **数据一致性**:支持 Causal 及 PRAM 两种数据一致性模型,用户可按需选择,实现性能和数据一致性的平衡。 +- **数据发布订阅**:支持数据订阅发布,解耦数据的生产者(发布者)和消费者(订阅者),实现数据的异步传输与共享。 +- **高可靠高可用**:支持分布式元数据管理,实现系统水平线性扩展。支持元数据可靠性,支持动态资源伸缩自动迁移数据,实现系统高可用。 -- **worker**: The core component of yuanrong-datasystem, responsible for managing DRAM/SSD and metadata while providing near-computing caching capabilities. - - **Zero-Copy Shared Memory**: Provides data read/write interfaces through shared memory, eliminating data copying between the SDK and worker to enhance performance. - - **NPU Concurrent Communication**: The heterogeneous object abstraction enables direct inter-NPU communication with automatic HCCL send/receive sequencing, achieving concurrent data transfers across both local NPUs and remote nodes via H2H access. - - **P2P Data Distribution**: Implements a load-balancing strategy for P2P transfers during large-scale data replication, enabling new data receivers to serve as providers and fully utilizing inter-card link bandwidth. - - **Metadata Management**: Implements distributed metadata management to achieve linear horizontal scaling and eliminate single-point bottlenecks. Supports metadata reliability for enhanced cluster availability, while providing data publish-subscribe capabilities. - - **Multi-Level Cache Eviction**: Implements LRU-based cache replacement across tiers, maintaining hot data in memory, warm data on disk, and cold data in secondary cache. 
- - **Data Management**: Provides multiple data lifecycle management capabilities including reference counting and TTL. Controls read-write consistency with support for multiple consistency levels. Manages data replicas across nodes, employing multi-copy caching for hot data to improve read efficiency. +### yuanrong-datasystem 适用场景 -- **Cluster Management**: Leverages ETCD to enable node discovery/health monitoring, supporting fault recovery and online scaling (both expansion and contraction). +- **LLM 长序列推理 KVCache**:基于异构对象提供分布式多级缓存 (HBM/DRAM/SSD) 和高吞吐 D2D/H2D/D2H 访问能力,构建分布式 KV Cache,实现 Prefill 阶段的 KVCache 缓存以及 Prefill/Decode 实例间 KV Cache 快速传递,提升推理吞吐。 +- **模型推理实例 M->N 快速弹性**:利用异构对象的卡间直通及 P2P 数据分发能力实现模型参数快速复制。 +- **强化学习模型参数重排**:利用异构对象的卡间直通传输能力,快速将模型参数从训练侧同步到推理侧。 +- **训练场景 CheckPoint 快速保存及加载**:基于 KV 接口快速写 Checkpoint,并支持将数据持久化到二级缓存保证数据可靠性。Checkpoint恢复时各节点将 Checkpoint 分片快速加载到异构对象中,利用异构对象的卡间直通传输及 P2P 数据分发能力,快速将 Checkpoint 传递到各节点 HBM。 -![Deployment View of yuanrong-datasystem](./docs/source_en/getting-started/image/deployment.png) +### yuanrong-datasystem 架构 -Deployment Architecture of yuanrong-datasystem (as illustrated in the diagram above): +![](./docs/source_zh_cn/getting-started/image/logical_architecture.png) -- The cluster requires **ETCD** deployment for cluster management. -- Each node must deploy a **worker** process registered with ETCD. -- The **SDK** is integrated into user processes and communicates with the local worker. +yuanrong-datasystem 由三个部分组成: -Data Transmission: -- **SDK ↔ worker**: Shared memory for data read/write. -- **worker ↔ worker**: TCP/RDMA (current version supports TCP only; RDMA support planned). -- **HBM Heterogeneous Objects**: Direct cross-device transmission via HCCS/RoCE. 
+- **多语言SDK**:提供 Python/C++ 语言接口,封装 heterogeneous object 及 KV 接口,支撑业务实现数据快速读写。提供两种类型接口: + - **heterogeneous object**:基于 NPU 卡的 HBM 内存抽象异构对象接口,实现昇腾 NPU 卡间数据高速直通传输。同时提供 H2D/D2H 高速迁移接口,实现数据快速在 DRAM/HBM 之间传输。 + - **KV**:基于共享内存实现免拷贝的 KV 接口,实现高性能数据缓存,支持通过对接外部组件提供数据可靠性语义。 -## Installation +- **worker**:yuanrong-datasystem 的核心组件,用于分配管理 DRAM/SSD 资源以及元数据,提供分布式多级缓存能力。 -### Pip mode method installation +- **集群管理**:依赖 ETCD,实现节点发现/健康检测,支持故障恢复及在线扩缩容。 -Install the version available on PyPI: +![](./docs/source_zh_cn/getting-started/image/deployment.png) -```bash -pip install yr-datasystem -``` +yuanrong-datasystem 的部署视图如上图所示: -To install a custom version, refer to the documentation [yr-datasystem custom version installation](./docs/source_zh_cn/getting-started/install.md#安装自定义版本) +- 需部署 ETCD 用于集群管理。 +- 每个节点需部署 worker 进程并注册到 ETCD。 +- SDK 集成到用户进程中并与同节点的 worker 通信。 -### Source code compilation installation +各组件间的数据传输协议如下: -To install yr-datasystem using the source code compilation method, refer to the documentation: [Source Code Compilation Installation of yr-datasystem](./docs/source_zh_cn/getting-started/install.md#源码编译方式安装yr-datasystem版本). +- SDK 与 worker 之间通过共享内存读写数据。 +- worker 和 worker 之间通过 TCP/RDMA 传输数据(当前版本仅支持 TCP,后续版本支持 RDMA )。 +- 异构对象 HBM 之间通过 HCCS/RoCE 卡间直通传输数据。 -## Deployment +## 入门 -### Process deployment +### 安装 yuanrong-datasystem -yr-datasystem cluster can be quickly deployed using the ds-cli tool. For more information, see [yr-datasystem Process Deployment](./docs/source_zh_cn/getting-started/deploy.md#yr-datasystem进程部署). +#### pip 方式安装 -### Kubernetes deployment +- 安装 yuanrong-datasystem 完整发行版(包含Python SDK、C++ SDK以及命令行工具): + ```bash + pip install yuanrong-datasystem + ``` -yr-datasystem also supports containerized deployment on Kubernetes. For more information, see [yr-datasystem Kubernetes Deployment](./docs/source_zh_cn/getting-started/deploy.md#yr-datasystem-kubernetes部署). 
+- 仅安装 yuanrong-datasystem Python SDK(不包含C++ SDK以及命令行工具): + ```bash + pip install yuanrong-datasystem-sdk + ``` -## Quickstart +#### 源码编译方式安装 -For a quick introduction to heterogeneous objects, key-value (KV), and object semantics, refer to the following documentation. -- [Heterogeneous object quickstart](./docs/source_zh_cn/getting-started/overview.md#异构对象) -- [KV quickstart](./docs/source_zh_cn/getting-started/overview.md#kv) -- [Object quickstart](./docs/source_zh_cn/getting-started/overview.md#object) +使用源码编译方式安装 yuanrong-datasystem 可以参考文档:[源码编译安装 yuanrong-datasystem](./docs/source_zh_cn/getting-started/install.md#源码编译方式安装yuanrong-datasystem版本) -## Docs +### 部署 yuanrong-datasystem -More details about installation guide, tutorials and APIs, please see the [User Documentation](docs). +#### 进程部署 -## License +- 准备ETCD + + yuanrong-datasystem 的集群管理依赖 ETCD,请先在后台启动单节点 ETCD(示例端口 2379): + ```bash + etcd --listen-client-urls http://0.0.0.0:2379 \ + --advertise-client-urls http://localhost:2379 + ``` +- 一键启动集群 + + 安装 yuanrong-datasystem 完整发行版后,即可通过随包自带的 dscli 命令行工具一键完成集群部署。启动一个监听端口号为 31501 的单机集群: + ```bash + dscli start -w --worker_address "127.0.0.1:31501" --etcd_address "127.0.0.1:2379" + ``` + +- 集群卸载 + ```bash + dscli stop --worker_address "127.0.0.1:31501" + ``` + +更多进程部署参数与部署方式请参考文档:[yuanrong-datasystem 进程部署](./docs/source_zh_cn/getting-started/deploy.md#yuanrong-datasystem进程部署) + +#### Kubernetes 部署 + +yuanrong-datasystem 还提供了基于 Kubernetes 容器化部署方式,部署前请确保部署环境集群已就绪 Kubernetes、Helm 及可访问的 ETCD 集群。 + +- 获取 yuanrong-datasystem helm chart 包 + + 安装 yuanrong-datasystem 完整发行版后,即可通过随包自带的 dscli 命令行工具在当前路径下快速获取 helm chart 包: + ``` + dscli generate_helm_chart -o ./ + ``` + +- 编辑集群部署配置 + + yuanrong-datasystem 通过 ./datasystem/values.yaml 文件进行集群相关配置,其中必配项如下: + + ```yaml + global: + # 其他配置项... 
+ + # 镜像仓地址 + imageRegistry: "swr.cn-south-1.myhuaweicloud.com/openeuler/" + # 镜像名字和镜像tag + images: + datasystem: "yr-datasystem:0.5.0-alpha" + + etcd: + # ETCD集群地址 + etcdAddress: "127.0.0.1:2379" + ``` + +- 集群部署 + + Helm 会提交 DaemonSet,按节点依次拉起 yuanrong-datasystem 实例: + + ```bash + helm install yr-datasystem ./datasystem + ``` + +- 集群卸载 + + ```bash + helm uninstall yr-datasystem + ``` + +更多 yuanrong-datasystem Kubernetes 高级参数配置请参考文档:[yuanrong-datasystem Kubernetes 部署](./docs/source_zh_cn/getting-started/deploy.md#yuanrong-datasystem-kubernetes部署) + +### 代码样例 + +- 异构对象 + + 通过异构对象接口实现 HBM 数据零拷贝发布/订阅 + + ```python + import acl + import random + from datasystem.ds_client import DsClient + + def random_str(slen=10): + seed = "1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!@#%^*()_+=-" + sa = [] + for _ in range(slen): + sa.append(random.choice(seed)) + return ''.join(sa) + + # hetero_dev_publish and hetero_dev_subscribe must be executed in different processes + # because they need to be bound to different NPUs. 
+ def hetero_dev_publish(): + client = DsClient("127.0.0.1", 31501) + client.init() + + acl.init() + device_idx = 1 + acl.rt.set_device(device_idx) + + key_list = [ 'key1', 'key2', 'key3' ] + data_size = 1024 * 1024 + test_value = random_str(data_size) + + in_data_blob_list = [] + for _ in key_list: + tmp_batch_list = [] + for _ in range(4): + dev_ptr, _ = acl.rt.malloc(data_size, 0) + acl.rt.memcpy(dev_ptr, data_size, acl.util.bytes_to_ptr(test_value.encode()), data_size, 1) + blob = Blob(dev_ptr, data_size) + tmp_batch_list.append(blob) + blob_list = DeviceBlobList(device_idx, tmp_batch_list) + in_data_blob_list.append(blob_list) + pub_futures = client.hetero().dev_publish(key_list, in_data_blob_list) + for future in pub_futures: + future.get() + + def hetero_dev_subscribe(): + client = DsClient("127.0.0.1", 31501) + client.init() + + acl.init() + device_idx = 2 + acl.rt.set_device(device_idx) + + key_list = [ 'key1', 'key2', 'key3' ] + data_size = 1024 * 1024 + out_data_blob_list = [] + for _ in key_list: + tmp_batch_list = [] + for _ in range(4): + dev_ptr, _ = acl.rt.malloc(data_size, 0) + blob = Blob(dev_ptr, data_size) + tmp_batch_list.append(blob) + blob_list = DeviceBlobList(device_idx, tmp_batch_list) + out_data_blob_list.append(blob_list) + sub_futures = client.hetero().dev_subscribe(key_list, out_data_blob_list) + for future in sub_futures: + future.get() + ``` + +- KV + + 通过 KV 接口,将任意二进制数据以键值对形式写入全局分布式缓存: + + ```python + from datasystem.ds_client import DsClient + + client = DsClient("127.0.0.1", 31501) + client.init() + + key = "key" + expected_val = b"value" + client.kv().set(key, expected_val) + + val = client.kv().get([key]) + assert val[0] == expected_val + + client.kv().delete([key]) + ``` + +## 文档 + +有关 yuanrong-datasystem 安装指南、教程和 API 的更多详细信息,请参阅 [用户文档](docs) + +查看 [openYuanrong 文档](https://pages.openeuler.openatom.cn/openyuanrong/docs/zh-cn/latest/index.html) 了解如何使用 openYuanrong 开发分布式应用。 + +- 安装:`pip install 
openyuanrong`,[更多安装信息](https://pages.openeuler.openatom.cn/openyuanrong/docs/zh-cn/latest/deploy/installation.html)。 +- [快速入门](https://pages.openeuler.openatom.cn/openyuanrong/docs/zh-cn/latest/getting_started.html) + +## 贡献 + +我们欢迎您对 openYuanrong 做各种形式的贡献,请参阅我们的[贡献者指南](https://pages.openeuler.openatom.cn/openyuanrong/docs/zh-cn/latest/contributor_guide/index.html)。 + +## 许可证 [Apache License 2.0](LICENSE) \ No newline at end of file diff --git a/README_CN.md b/README_CN.md index e824068d7d3dba7bb2bd311f4e013e3e7d4c291f..6a80e22ca962c103b620b93f462fd3271861e262 100644 --- a/README_CN.md +++ b/README_CN.md @@ -1,25 +1,20 @@ -[View English](./README.md) +![](./docs/source_zh_cn/getting-started/image/logo-large.png) - +openYuanrong 是一个 Serverless 分布式计算引擎,致力于以一套统一 Serverless 架构支持 AI、大数据、微服务等各类分布式应用。它提供多语言函数编程接口,以单机编程体验简化分布式应用开发;提供分布式动态调度和数据共享等能力,实现分布式应用的高性能运行和集群的高效资源利用。 -- [yr-datasystem 介绍](#yr-datasystem-介绍) -- [适用场景](#适用场景) -- [架构](#架构) -- [安装](#安装) - - [pip 方式安装](#pip-方式安装) - - [源码编译方式安装](#源码编译方式安装) -- [部署](#部署) - - [进程部署](#进程部署) - - [Kubernetes 部署](#kubernetes-部署) -- [快速入门](#快速入门) -- [文档](#文档) -- [许可证](#许可证) +## 简介 - +![](./docs/source_zh_cn/getting-started/image/introduction.png) -## yr-datasystem 介绍 +openYuanrong 由多语言函数运行时、函数系统和数据系统组成,支持按需灵活单独或组合使用。 -yuanrong-datasystem 是一款专为 AI 训推场景设计的分布式异构缓存系统。支持 HBM/DDR/SSD 异构介质池化缓存及 NPU 间异步并发高效数据传输,用于分布式 KVCache 缓存、模型参数缓存、高性能 replaybuffer 等场景。 +- **多语言函数运行时**:提供函数分布式编程,支持 Python、Java、C++ 语言,实现类单机编程高性能分布式运行。 +- **函数系统**:提供大规模分布式动态调度,支持函数实例极速弹性扩缩和跨节点迁移,实现集群资源高效利用。 +- **数据系统**:提供异构分布式多级缓存,支持 Object、Stream 语义,实现函数实例间高性能数据共享及传递。 + +openYuanrong 分为三个代码仓库:[yuanrong-runtime](https://gitee.com/openeuler/yuanrong-runtime) 对应多语言函数运行时;[yuanrong-functionsystem](https://gitee.com/openeuler/yuanrong-functionsystem) 对应函数系统;[yuanrong-datasystem](https://gitee.com/openeuler/yuanrong-datasystem) 对应数据系统,即当前代码仓。 + +**数据系统**是 openYuanrong 的核心概念抽象,是一款专为 AI 训推场景设计的分布式异构缓存系统。支持 HBM/DDR/SSD 异构介质池化缓存及 NPU 间异步并发高效数据传输,用于分布式 KVCache 缓存、模型参数缓存、高性能 
replaybuffer 等场景。 yuanrong-datasystem 的主要特性包括: @@ -32,14 +27,14 @@ yuanrong-datasystem 的主要特性包括: - **数据发布订阅**:支持数据订阅发布,解耦数据的生产者(发布者)和消费者(订阅者),实现数据的异步传输与共享。 - **高可靠高可用**:支持分布式元数据管理,实现系统水平线性扩展。支持元数据可靠性,支持动态资源伸缩自动迁移数据,实现系统高可用。 -## 适用场景 +### yuanrong-datasystem 适用场景 - **LLM 长序列推理 KVCache**:基于异构对象提供分布式多级缓存 (HBM/DRAM/SSD) 和高吞吐 D2D/H2D/D2H 访问能力,构建分布式 KV Cache,实现 Prefill 阶段的 KVCache 缓存以及 Prefill/Decode 实例间 KV Cache 快速传递,提升推理吞吐。 - **模型推理实例 M->N 快速弹性**:利用异构对象的卡间直通及 P2P 数据分发能力实现模型参数快速复制。 - **强化学习模型参数重排**:利用异构对象的卡间直通传输能力,快速将模型参数从训练侧同步到推理侧。 - **训练场景 CheckPoint 快速保存及加载**:基于 KV 接口快速写 Checkpoint,并支持将数据持久化到二级缓存保证数据可靠性。Checkpoint恢复时各节点将 Checkpoint 分片快速加载到异构对象中,利用异构对象的卡间直通传输及 P2P 数据分发能力,快速将 Checkpoint 传递到各节点 HBM。 -## 架构 +### yuanrong-datasystem 架构 ![](./docs/source_zh_cn/getting-started/image/logical_architecture.png) @@ -67,39 +62,199 @@ yuanrong-datasystem 的部署视图如上图所示: - worker 和 worker 之间通过 TCP/RDMA 传输数据(当前版本仅支持 TCP,后续版本支持 RDMA )。 - 异构对象 HBM 之间通过 HCCS/RoCE 卡间直通传输数据。 -## 安装 +## 入门 + +### 安装 yuanrong-datasystem + +#### pip 方式安装 + +- 安装 yuanrong-datasystem 完整发行版(包含Python SDK、C++ SDK以及命令行工具): + ```bash + pip install yuanrong-datasystem + ``` + +- 仅安装 yuanrong-datasystem Python SDK(不包含C++ SDK以及命令行工具): + ```bash + pip install yuanrong-datasystem-sdk + ``` + +#### 源码编译方式安装 + +使用源码编译方式安装 yuanrong-datasystem 可以参考文档:[源码编译安装 yuanrong-datasystem](./docs/source_zh_cn/getting-started/install.md#源码编译方式安装yuanrong-datasystem版本) + +### 部署 yuanrong-datasystem + +#### 进程部署 + +- 准备ETCD + + yuanrong-datasystem 的集群管理依赖 ETCD,请先在后台启动单节点 ETCD(示例端口 2379): + ```bash + etcd --listen-client-urls http://0.0.0.0:2379 \ + --advertise-client-urls http://localhost:2379 + ``` +- 一键启动集群 + + 安装 yuanrong-datasystem 完整发行版后,即可通过随包自带的 dscli 命令行工具一键完成集群部署。启动一个监听端口号为 31501 的单机集群: + ```bash + dscli start -w --worker_address "127.0.0.1:31501" --etcd_address "127.0.0.1:2379" + ``` + +- 集群卸载 + ```bash + dscli stop --worker_address "127.0.0.1:31501" + ``` + +更多进程部署参数与部署方式请参考文档:[yuanrong-datasystem 
进程部署](./docs/source_zh_cn/getting-started/deploy.md#yuanrong-datasystem进程部署) + +#### Kubernetes 部署 + +yuanrong-datasystem 还提供了基于 Kubernetes 容器化部署方式,部署前请确保部署环境集群已就绪 Kubernetes、Helm 及可访问的 ETCD 集群。 -### pip 方式安装 +- 获取 yuanrong-datasystem helm chart 包 -安装 PyPI 上的版本: -```bash -pip install yr-datasystem -``` + 安装 yuanrong-datasystem 完整发行版后,即可通过随包自带的 dscli 命令行工具在当前路径下快速获取 helm chart 包: + ``` + dscli generate_helm_chart -o ./ + ``` -安装自定义版本,可以参考文档:[安装 yr-datasystem 自定义版本](./docs/source_zh_cn/getting-started/install.md#安装自定义版本) +- 编辑集群部署配置 -### 源码编译方式安装 + yuanrong-datasystem 通过 ./datasystem/values.yaml 文件进行集群相关配置,其中必配项如下: + + ```yaml + global: + # 其他配置项... -使用源码编译方式安装 yr-datasystem 可以参考文档:[源码编译安装 yr-datasystem](./docs/source_zh_cn/getting-started/install.md#源码编译方式安装yr-datasystem版本) + # 镜像仓地址 + imageRegistry: "swr.cn-south-1.myhuaweicloud.com/openeuler/" + # 镜像名字和镜像tag + images: + datasystem: "yr-datasystem:0.5.0-alpha" + + etcd: + # ETCD集群地址 + etcdAddress: "127.0.0.1:2379" + ``` -## 部署 +- 集群部署 -### 进程部署 + Helm 会提交 DaemonSet,按节点依次拉起 yuanrong-datasystem 实例: -yr-datasystem 可基于 dscli 工具快速部署集群,参考文档:[yr-datasystem 进程部署](./docs/source_zh_cn/getting-started/deploy.md#yr-datasystem进程部署) + ```bash + helm install yr-datasystem ./datasystem + ``` -### Kubernetes 部署 +- 集群卸载 -yr-datasystem 还提供了基于 Kubernetes 容器化部署方式,参考文档:[yr-datasystem Kubernetes 部署](./docs/source_zh_cn/getting-started/deploy.md#yr-datasystem-kubernetes部署) + ```bash + helm uninstall yr-datasystem + ``` -## 快速入门 +更多 yuanrong-datasystem Kubernetes 高级参数配置请参考文档:[yuanrong-datasystem Kubernetes 部署](./docs/source_zh_cn/getting-started/deploy.md#yuanrong-datasystem-kubernetes部署) -heterogeneous object、KV 和 object 语义的快速入门,可参考以下文档。 -- [heterogeneous object 快速入门](./docs/source_zh_cn/getting-started/overview.md#异构对象) -- [KV 快速入门](./docs/source_zh_cn/getting-started/overview.md#kv) +### 代码样例 + +- 异构对象 + + 通过异构对象接口实现 HBM 数据零拷贝发布/订阅 + + ```python + import acl + import random + from datasystem.ds_client import DsClient + + def 
random_str(slen=10): + seed = "1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!@#%^*()_+=-" + sa = [] + for _ in range(slen): + sa.append(random.choice(seed)) + return ''.join(sa) + + # hetero_dev_publish and hetero_dev_subscribe must be executed in different processes + # because they need to be bound to different NPUs. + def hetero_dev_publish(): + client = DsClient("127.0.0.1", 31501) + client.init() + + acl.init() + device_idx = 1 + acl.rt.set_device(device_idx) + + key_list = [ 'key1', 'key2', 'key3' ] + data_size = 1024 * 1024 + test_value = random_str(data_size) + + in_data_blob_list = [] + for _ in key_list: + tmp_batch_list = [] + for _ in range(4): + dev_ptr, _ = acl.rt.malloc(data_size, 0) + acl.rt.memcpy(dev_ptr, data_size, acl.util.bytes_to_ptr(test_value.encode()), data_size, 1) + blob = Blob(dev_ptr, data_size) + tmp_batch_list.append(blob) + blob_list = DeviceBlobList(device_idx, tmp_batch_list) + in_data_blob_list.append(blob_list) + pub_futures = client.hetero().dev_publish(key_list, in_data_blob_list) + for future in pub_futures: + future.get() + + def hetero_dev_subscribe(): + client = DsClient("127.0.0.1", 31501) + client.init() + + acl.init() + device_idx = 2 + acl.rt.set_device(device_idx) + + key_list = [ 'key1', 'key2', 'key3' ] + data_size = 1024 * 1024 + out_data_blob_list = [] + for _ in key_list: + tmp_batch_list = [] + for _ in range(4): + dev_ptr, _ = acl.rt.malloc(data_size, 0) + blob = Blob(dev_ptr, data_size) + tmp_batch_list.append(blob) + blob_list = DeviceBlobList(device_idx, tmp_batch_list) + out_data_blob_list.append(blob_list) + sub_futures = client.hetero().dev_subscribe(key_list, out_data_blob_list) + for future in sub_futures: + future.get() + ``` + +- KV + + 通过 KV 接口,将任意二进制数据以键值对形式写入全局分布式缓存: + + ```python + from datasystem.ds_client import DsClient + + client = DsClient("127.0.0.1", 31501) + client.init() + + key = "key" + expected_val = b"value" + client.kv().set(key, expected_val) + + val = 
client.kv().get([key]) + assert val[0] == expected_val + + client.kv().delete([key]) + ``` ## 文档 -有关安装指南、教程和 API 的更多详细信息,请参阅[用户文档](docs) + +有关 yuanrong-datasystem 安装指南、教程和 API 的更多详细信息,请参阅 [用户文档](docs) + +查看 [openYuanrong 文档](https://pages.openeuler.openatom.cn/openyuanrong/docs/zh-cn/latest/index.html) 了解如何使用 openYuanrong 开发分布式应用。 + +- 安装:`pip install openyuanrong`,[更多安装信息](https://pages.openeuler.openatom.cn/openyuanrong/docs/zh-cn/latest/deploy/installation.html)。 +- [快速入门](https://pages.openeuler.openatom.cn/openyuanrong/docs/zh-cn/latest/getting_started.html) + +## 贡献 + +我们欢迎您对 openYuanrong 做各种形式的贡献,请参阅我们的[贡献者指南](https://pages.openeuler.openatom.cn/openyuanrong/docs/zh-cn/latest/contributor_guide/index.html)。 ## 许可证 diff --git a/build.sh b/build.sh index b58767f7dd16f733331ae28a133d1abd5ba19749..adbe4bb904dd03895aa6d50bef49cdc3052b21c5 100755 --- a/build.sh +++ b/build.sh @@ -59,7 +59,7 @@ Options: For debug code: -p Generate perf point logs, choose from: on/off, default: off. -s Use strip tool to export the symbol table as sym and erase symbols based on it, choose from: on/off, - default: off. + default: on. -S Use Google Sanitizers tools to detect bugs. 
Choose from off/address/thread/undefined, if set the value to 'address' enable AddressSanitizer, if set the value to 'thread' enable ThreadSanitizer, @@ -133,7 +133,7 @@ function init_default_opts() { export LLT_TIMEOUT_S=80 # For debug - export ENABLE_STRIP="off" + export ENABLE_STRIP="on" export ENABLE_PERF="off" export USE_SANITIZER="off" diff --git a/cmake/util.cmake b/cmake/util.cmake index cf0175a1e4afc71fc8e5da60facfe1af3c12db42..5875159f040c3a229805627ec17d5f9ef8d16a70 100644 --- a/cmake/util.cmake +++ b/cmake/util.cmake @@ -840,7 +840,7 @@ function(PACKAGE_PYTHON PACKAGE_NAME) set(PY_TAGS "${PY_TAGS}m") endif () string(TOLOWER ${CMAKE_HOST_SYSTEM_NAME}_${CMAKE_HOST_SYSTEM_PROCESSOR} PLATFORM_TAG) - set(TARGET_WHEEL "${PACKAGE_NAME}-${DATASYSTEM_VERSION}-${PY_TAGS}-${PLATFORM_TAG}.whl") + set(TARGET_WHEEL "yr_datasystem_sdk-${DATASYSTEM_VERSION}-${PY_TAGS}-${PLATFORM_TAG}.whl") set(CONFIG_PACKAGE_SCRIPT ${CMAKE_BINARY_DIR}/PackagePythonSDK.cmake) # Generate PackagePythonSDK.cmake to run setup.py configure_file(${CMAKE_SOURCE_DIR}/cmake/scripts/PackagePython.cmake.in diff --git a/docs/source_zh_cn/api/api_python/object_client/ObjectClient/datasystem.object_client.ObjectClient.generate_object_key.rst b/docs/source_zh_cn/api/api_python/object_client/ObjectClient/datasystem.object_client.ObjectClient.generate_object_key.rst index 1a1ba6295ad935c77e768de2d613686ea0da54be..8cd625eb7d48d5ff5db4fac06fb40f31d9e83aaa 100644 --- a/docs/source_zh_cn/api/api_python/object_client/ObjectClient/datasystem.object_client.ObjectClient.generate_object_key.rst +++ b/docs/source_zh_cn/api/api_python/object_client/ObjectClient/datasystem.object_client.ObjectClient.generate_object_key.rst @@ -1,7 +1,7 @@ -datasystem.object_client.ObjectClient.generate_object_id +datasystem.object_client.ObjectClient.generate_object_key ========================================================= -.. py:method:: datasystem.object_client.ObjectClient.generate_object_id(prefix='') +.. 
py:method:: datasystem.object_client.ObjectClient.generate_object_key(prefix='') 生成一个带数据系统Worker UUID的对象 key。 diff --git a/docs/source_zh_cn/api/api_python/object_client/datasystem.object_client.ObjectClient.rst b/docs/source_zh_cn/api/api_python/object_client/datasystem.object_client.ObjectClient.rst index fe0963182d019f1b0b708f34ca6d51ab8e48fc4a..8f6be4f8a698e9e36707c17d84241f88d313e846 100644 --- a/docs/source_zh_cn/api/api_python/object_client/datasystem.object_client.ObjectClient.rst +++ b/docs/source_zh_cn/api/api_python/object_client/datasystem.object_client.ObjectClient.rst @@ -31,4 +31,4 @@ datasystem.object_client.ObjectClient datasystem.object_client.ObjectClient.g_increase_ref datasystem.object_client.ObjectClient.g_decrease_ref datasystem.object_client.ObjectClient.query_global_ref_num - datasystem.object_client.ObjectClient.generate_object_id \ No newline at end of file + datasystem.object_client.ObjectClient.generate_object_key \ No newline at end of file diff --git a/docs/source_zh_cn/getting-started/image/introduction.png b/docs/source_zh_cn/getting-started/image/introduction.png new file mode 100644 index 0000000000000000000000000000000000000000..f20528db3c434ba1a145bc33824b814b3653bc00 Binary files /dev/null and b/docs/source_zh_cn/getting-started/image/introduction.png differ diff --git a/docs/source_zh_cn/getting-started/image/logical_architecture.png b/docs/source_zh_cn/getting-started/image/logical_architecture.png index 81f2b7d970363993e7a4f2b771b14eb593da8fea..4eb288140bf275f35e87ddbf1579e498ea2594ce 100644 Binary files a/docs/source_zh_cn/getting-started/image/logical_architecture.png and b/docs/source_zh_cn/getting-started/image/logical_architecture.png differ diff --git a/docs/source_zh_cn/getting-started/image/logo-large.png b/docs/source_zh_cn/getting-started/image/logo-large.png new file mode 100644 index 0000000000000000000000000000000000000000..a0338c5022ed3654abe8ee22d37131b2b7ed059d Binary files /dev/null and 
b/docs/source_zh_cn/getting-started/image/logo-large.png differ diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt index 7ca9b247f353c922ef9a28efea90788c7f62c8df..fdf95c99ffcbe658cd92002035f92661149dfdeb 100644 --- a/example/CMakeLists.txt +++ b/example/CMakeLists.txt @@ -32,13 +32,5 @@ target_link_libraries(ds_example datasystem) add_executable(object_example src/object_cache/object_example.cpp) target_link_libraries(object_example datasystem) -add_executable(state_example src/state_cache/state_example.cpp) -target_link_libraries(state_example datasystem) - add_executable(kv_example src/kv_cache/kv_example.cpp) target_link_libraries(kv_example datasystem) - -if (ENABLE_PERF) - add_executable(perf_example src/perf/perf_demo.cpp) - target_link_libraries(perf_example datasystem) -endif() diff --git a/example/run-example.sh b/example/run-example.sh index f95ada7ccfd3da8f75a9298b9b90c4af996fd422..f32bdde071e65ac9c179223cb913ea273c83be50 100755 --- a/example/run-example.sh +++ b/example/run-example.sh @@ -26,38 +26,6 @@ export PATH=$PATH:/usr/sbin run_hetero="$1" run_perf="$2" -function run_go_example() -{ - echo "Start to run go example" - BUILD_GO_DIR="$(realpath "${EXAMPLE_DIR}/../build/package-go")" - export WORKER_ADDR=$1 - export WORKER_PORT=$2 - export WORKER_ETCD_ADDRESS=$3 - - # Run the golang tests - # We do this here during the example run because we lack an infrastructure for starting - # and stopping the cluster. The example has this built in already. - # In the future we should move this "go test" logic into the unit test paths - export LD_LIBRARY_PATH=${BUILD_GO_DIR}/lib:${LD_LIBRARY_PATH} - echo "Set LD_LIBRARY_PATH=${LD_LIBRARY_PATH} before go example test." - echo "Running golang tests. 
Using worker ${WORKER_ADDR} on port ${WORKER_PORT}, etcd address: ${WORKER_ETCD_ADDRESS}" - cd ${BUILD_GO_DIR}/common - go test - - cd ${BUILD_GO_DIR}/objectcache - go test - - cd ${BUILD_GO_DIR}/statecache - go test - - echo "Running golang state/object cache demo. Using worker ${WORKER_ADDR} on port ${WORKER_PORT}" - ${BUILD_GO_DIR}/state_cache_client_demo - ${BUILD_GO_DIR}/object_cache_client_demo - - export LD_LIBRARY_PATH=${old_ld_path} - echo "Restore LD_LIBRARY_PATH=${LD_LIBRARY_PATH} after go example test." -} - start_all "${EXAMPLE_DIR}/build" "${ds_output_dir}" echo -e "---- Running the example..." diff --git a/example/src/perf/perf_demo.cpp b/example/src/perf/perf_demo.cpp deleted file mode 100644 index 04bd85b748b6f21a0fba82943c468d50ee633a4d..0000000000000000000000000000000000000000 --- a/example/src/perf/perf_demo.cpp +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Description: The device object cache example. 
- */ - -#include - -#include "datasystem/perf_client.h" - -using datasystem::ConnectOptions; -using datasystem::Status; - -static std::shared_ptr client; - -static std::string DEFAULT_IP = "127.0.0.1"; -static constexpr int DEFAULT_PORT = 9088; -static constexpr int PARAMETERS_NUM = 3; - -bool Start() -{ - std::cerr << "Start to get perf log" << std::endl; - std::unordered_map> clientPerfLog; - Status status = client->GetPerfLog("client", clientPerfLog); - if (status.IsError()) { - std::cerr << "Run device exampled failed, detail: " << status.ToString() << std::endl; - } - - return status.IsOk(); -} - -int main(int argc, char *argv[]) -{ - std::string ip; - int port = 0; - int index = 0; - std::string clientPublicKey, clientPrivateKey, serverPublicKey; - - if (argc == 1) { - ip = DEFAULT_IP; - port = DEFAULT_PORT; - } else if (argc == PARAMETERS_NUM) { - ip = argv[++index]; - port = atoi(argv[++index]); - } else { - std::cerr << "Invalid input parameters."; - return -1; - } - - std::cout << "Start to run perf example" << std::endl; - - // Get the client - std::string workerAddr = ip + ':' + std::to_string(port); - std::cout << "The client workerAddr:" << workerAddr << std::endl; - ConnectOptions connectOpts{ .host = ip, .port = port, .connectTimeoutMs = 5 * 1000 }; - client = std::make_shared(connectOpts); - Status status = client->Init(); - if (status.IsError()) { - std::cerr << "[device]Failed to init object client : " << status.ToString() << std::endl; - return -1; - } - if (!Start()) { - return -1; - } - return 0; -} diff --git a/example/src/state_cache/state_example.cpp b/example/src/state_cache/state_example.cpp deleted file mode 100644 index f79ea41e05774f404e40b2ba9e0674d4539e8611..0000000000000000000000000000000000000000 --- a/example/src/state_cache/state_example.cpp +++ /dev/null @@ -1,174 +0,0 @@ -/** - * Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Description: The state cache example. - */ - -#include <iostream> - -#include "datasystem/state_cache.h" - -using datasystem::ConnectOptions; -using datasystem::Context; -using datasystem::Optional; -using datasystem::ReadOnlyBuffer; -using datasystem::StateClient; -using datasystem::Status; - -static std::shared_ptr<StateClient> client_; - -static std::string DEFAULT_IP = "127.0.0.1"; -static constexpr int DEFAULT_PORT = 9088; -static constexpr int PARAMETERS_NUM = 3; -static constexpr int SUCCESS = 0; -static constexpr int FAILED = -1; - -static int Write() -{ - (void)Context::SetTraceId("write"); - std::string objectKey = "key1"; - std::string val = "test1"; - datasystem::SetParam opt; - opt.writeMode = datasystem::WriteMode::NONE_L2_CACHE; - Status status = client_->Set(objectKey, val, opt); - if (status.IsError()) { - std::cerr << "Set Fail: " << status.ToString() << std::endl; - return FAILED; - } - std::cout << "State client set succeeds." << std::endl; - return SUCCESS; -} - -static int Read() -{ - (void)Context::SetTraceId("read"); - std::string objectKey = "key1"; - std::string correctVal = "test1"; - std::string val; - Status status = client_->Get(objectKey, val); - if (status.IsError()) { - std::cerr << "Get string value failed, detail: " << status.ToString() << std::endl; - return FAILED; - } - if (correctVal == val) { - std::cout << "State client get string value succeeds." 
<< std::endl; - } else { - std::cerr << "Get string value failed, expect value: " << correctVal << ", but get val: " << val << std::endl; - return FAILED; - } - - Optional buffer; - status = client_->Get(objectKey, buffer); - auto str = std::string(reinterpret_cast(buffer->ImmutableData()), buffer->GetSize()); - if (status.IsError()) { - std::cerr << "Get buffer value failed, detail: " << status.ToString() << std::endl; - return FAILED; - } - if (correctVal != str) { - std::cerr << "Get string value failed, expect value: " << correctVal << ", but get val: " << str << std::endl; - return FAILED; - } - std::cout << "State client get succeeds." << std::endl; - return SUCCESS; -} - -static int Delete() -{ - (void)Context::SetTraceId("delete"); - std::string objectKey = "key1"; - Status status = client_->Del(objectKey); - if (status.IsError()) { - std::cerr << "Delete failed, detail: " << status.ToString() << std::endl; - return FAILED; - } - std::cout << "State client delete succeeds." << std::endl; - return SUCCESS; -} - -static int TestSetValue() -{ - std::string val = "test1"; - datasystem::SetParam opt; - uint32_t ttl = 5; - opt.writeMode = datasystem::WriteMode::NONE_L2_CACHE; - opt.ttlSecond = ttl; - std::string key = client_->Set(val, opt); - if (key.empty()) { - std::cerr << "The key from set value api is empty." << std::endl; - return FAILED; - } - std::cout << "State client set value api succeeds." 
<< std::endl; - return SUCCESS; -} - -int Start() -{ - int ret1 = Write(); - int ret2 = Read(); - int ret3 = Delete(); - int ret4 = TestSetValue(); - return ret1 | ret2 | ret3 | ret4; -} - -int main(int argc, char *argv[]) -{ - const int authParametersNum = 6; - std::string ip; - int port = 0; - int index = 0; - std::string clientPublicKey, clientPrivateKey, serverPublicKey; - - if (argc == 1) { - ip = DEFAULT_IP; - port = DEFAULT_PORT; - } else if (argc == PARAMETERS_NUM) { - ip = argv[++index]; - port = atoi(argv[++index]); - } else if (argc == authParametersNum) { - // example call: - // ./state_example 127.0.0.1 18482 - ip = argv[++index]; - port = atoi(argv[++index]); - clientPublicKey = argv[++index]; - clientPrivateKey = argv[++index]; - serverPublicKey = argv[++index]; - } else { - std::cerr << "Invalid input parameters."; - return FAILED; - } - - ConnectOptions connectOpts{ .host = ip, - .port = port, - .connectTimeoutMs = 3 * 1000, - .clientPublicKey = clientPublicKey, - .clientPrivateKey = clientPrivateKey, - .serverPublicKey = serverPublicKey }; - client_ = std::make_shared(connectOpts); - (void)Context::SetTraceId("init"); - Status status = client_->Init(); - if (status.IsError()) { - std::cerr << "Failed to init state client, detail: " << status.ToString() << std::endl; - return FAILED; - } - - if (Start() == FAILED) { - std::cerr << "The state client example run failed." << std::endl; - return FAILED; - } - - return SUCCESS; -} diff --git a/include/datasystem/hetero_cache/hetero_client.h b/include/datasystem/hetero_cache/hetero_client.h index 6ec5adaa5cab7142bf2ca41f1ea15e6cb5ea8ca0..ec626fe4a15b836fb40a85c8fdd661b0c4413d00 100644 --- a/include/datasystem/hetero_cache/hetero_client.h +++ b/include/datasystem/hetero_cache/hetero_client.h @@ -63,15 +63,6 @@ public: /// \return Status of the call. Status Init(); - /// \brief (Deprecated) For device object, to get multiple objects. 
- /// \param[in] objectKeys multiple keys support - /// \param[out] devBlobList vector of blobs, only modify the data pointed to by the pointer. - /// \param[in] timeoutMs max waiting time of getting data - /// \param[out] failList failed ojbejcts to be get - /// \return K_OK on any object success; the error code otherwise. - Status MGet(const std::vector &objectKeys, const std::vector &devBlobList, - uint64_t timeoutMs, std::vector &failList); - /// \brief Obtain data from the host and write the data to the device. /// MGetH2D and MSetD2H must be used together. /// If multiple memory addresses are combined and written to the host during MSetD2H, the host data @@ -85,20 +76,6 @@ public: Status MGetH2D(const std::vector &keys, const std::vector &devBlobList, std::vector &failKeys, int32_t subTimeoutMs); - /// \brief (Deprecated) For device object, to async get multiple objects. - /// \param[in] objectKeys multiple keys support - /// \param[out] devBlobList vector of blobs, only modify the data pointed to by the pointer. - /// \param[in] timeoutMs max waiting time of getting data - /// \return future of AsyncResult, describe get status and failed list. - std::shared_future MGetAsync(const std::vector &objectKeys, - const std::vector &devBlobList, uint64_t timeoutMs); - - /// \brief (Deprecated) For device object, to invoke worker client to create and publish multiple objects. - /// \param[in] objKeys multiple keys support - /// \param[in] devBlobList vector of blobs - /// \return K_OK on any object success; the error code otherwise. - Status MSet(const std::vector &objectKeys, const std::vector &devBlobList); - /// \brief Write the data of the device to the host. If the BLOB of the device contains multiple memory addresses, /// the device automatically combines data and writes the data to the host. 
/// \param[in] keys Keys in the host @@ -110,13 +87,6 @@ public: Status MSetD2H(const std::vector &keys, const std::vector &devBlobList, const SetParam &setParam = {}); - /// \brief (Deprecated) For device object Async set multiple objects, and return before publish rpc called. - /// \param[in] objKeys multiple keys support - /// \param[in] devBlobList vector of blobs - /// \return future of AsyncResult, describe set status and failed list. - std::shared_future MSetAsync(const std::vector &objectKeys, - const std::vector &devBlobList); - /// \brief Delete the key from the host. /// \param[in] keys Keys in the host /// \param[out] failedKeys The failed delete keys. @@ -174,13 +144,6 @@ public: Status DevSubscribe(const std::vector &keys, const std::vector &devBlobList, std::vector &futureVec); - /// \brief (Deprecate)Retrieves Device data through prefetching, completing the operation and returning immediately, - /// requiring combination with DevMGet. - /// \param[in] keys Keys corresponding to blob2dList - /// \param[out] devBlobList List describing the structure of Device memory - /// \return K_OK on when return sucesssfully; the error code otherwise. - Status DevPreFetch(const std::vector &keys, std::vector &blob2dList); - /// \brief Obtains data from the device and writes the data to blob2dList. Data is transmitted directly through /// the device-to-device channel. /// DevMSet and DevMGet must be used together. Heterogeneous objects are not automatically deleted after @@ -220,13 +183,6 @@ public: /// \ K_INVALID: the vector of keys is empty or include empty key. Status DevDelete(const std::vector &keys, std::vector &failedKeys); - /// \brief (Deprecated) LocalDelete interface. After calling this interface, the data replica stored in the - /// datasystem by the current client connection will be deleted. - /// \param[in] keys The objectKeys of the data expected to be deleted. - /// \param[out] failedKeys Partial failures will be returned through this parameter. 
- /// \return K_OK on when return sucesss; the error code otherwise. - Status LocalDelete(const std::vector &keys, std::vector &failedKeys); - /// \brief LocalDelete interface. After calling this interface, the data replica stored in the data system by the /// current client connection will be deleted. /// \param[in] keys The objectKeys of the data expected to be deleted. diff --git a/include/datasystem/kv_cache/read_only_buffer.h b/include/datasystem/kv_cache/read_only_buffer.h index a0e881b2fe9cbdb925e5ef8ba1776d97d040ecd5..4979f451108ddcdda224ea6215fd012701bbed4e 100644 --- a/include/datasystem/kv_cache/read_only_buffer.h +++ b/include/datasystem/kv_cache/read_only_buffer.h @@ -29,7 +29,6 @@ namespace datasystem { class KVClient; -class StateClient; class __attribute ((visibility ("default"))) ReadOnlyBuffer { public: ReadOnlyBuffer() = default; @@ -64,7 +63,6 @@ public: private: friend KVClient; - friend StateClient; explicit ReadOnlyBuffer(std::shared_ptr &buffer) { diff --git a/include/datasystem/object_cache/object_client.h b/include/datasystem/object_cache/object_client.h index 00f4bebbea0be930158b29b9be3b3f61d6d01671..ee186a107da4a8ba3b39fe0b156d1a4d8de3e77c 100644 --- a/include/datasystem/object_cache/object_client.h +++ b/include/datasystem/object_cache/object_client.h @@ -48,11 +48,6 @@ struct CreateParam { CacheType cacheType = CacheType::MEMORY; }; -struct CreateDeviceParam { - LifetimeType lifetime = LifetimeType::REFERENCE; - bool cacheLocation = true; -}; - struct ObjMetaInfo { uint64_t objSize{ 0 }; // the size of object data, 0 if object not found. 
std::vector locations; // the workerIds of the locations diff --git a/include/datasystem/object_cache/object_enum.h b/include/datasystem/object_cache/object_enum.h index fb6faf5e04a54851886a662220c168db8287e8b8..f7214d26fc85ee417d94bf9f4b43a89023224d13 100644 --- a/include/datasystem/object_cache/object_enum.h +++ b/include/datasystem/object_cache/object_enum.h @@ -31,7 +31,7 @@ enum class WriteMode : int { NONE_L2_CACHE = 0, WRITE_THROUGH_L2_CACHE = 1, // sync write WRITE_BACK_L2_CACHE = 2, // async write - NONE_L2_CACHE_EVICT = 3, // evictable write + NONE_L2_CACHE_EVICT = 3, // evictable write }; enum class ConsistencyType : int { @@ -39,27 +39,6 @@ enum class ConsistencyType : int { CAUSAL = 1, }; -enum class DataType : uint8_t { - DATA_TYPE_INT8 = 0, /**< int8 */ - DATA_TYPE_INT16 = 1, /**< int16 */ - DATA_TYPE_INT32 = 2, /**< int32 */ - DATA_TYPE_FP16 = 3, /**< fp16 */ - DATA_TYPE_FP32 = 4, /**< fp32 */ - DATA_TYPE_INT64 = 5, /**< int64 */ - DATA_TYPE_UINT64 = 6, /**< uint64 */ - DATA_TYPE_UINT8 = 7, /**< uint8 */ - DATA_TYPE_UINT16 = 8, /**< uint16 */ - DATA_TYPE_UINT32 = 9, /**< uint32 */ - DATA_TYPE_FP64 = 10, /**< fp64 */ - DATA_TYPE_BFP16 = 11, /**< bfp16 */ - DATA_TYPE_RESERVED /**< reserved */ -}; - -enum class LifetimeType : uint8_t { - REFERENCE = 0, - MOVE = 1, -}; - enum class CacheType : int { MEMORY = 0, DISK = 1, diff --git a/include/datasystem/state_cache.h b/include/datasystem/state_cache.h deleted file mode 100644 index 98fbd65ebba505e23acd735a567237519b853f90..0000000000000000000000000000000000000000 --- a/include/datasystem/state_cache.h +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef DATASYSTEM_STATE_CACHE_H -#define DATASYSTEM_STATE_CACHE_H - -#include "datasystem/context/context.h" -#include "datasystem/kv_cache/read_only_buffer.h" -#include "datasystem/state_cache/state_client.h" -#include "datasystem/utils/status.h" - -#endif // DATASYSTEM_STATE_CACHE_H \ No newline at end of file diff --git a/include/datasystem/state_cache/state_client.h b/include/datasystem/state_cache/state_client.h deleted file mode 100644 index a0bfce075129e31aebf5d2968fa60988286fddf5..0000000000000000000000000000000000000000 --- a/include/datasystem/state_cache/state_client.h +++ /dev/null @@ -1,252 +0,0 @@ -/** - * Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Description: Data system state cache client management. 
- */ -#ifndef DATASYSTEM_STATE_CACHE_STATE_CLIENT_H -#define DATASYSTEM_STATE_CACHE_STATE_CLIENT_H - -#include -#include -#include - -#include "datasystem/object_cache/buffer.h" -#include "datasystem/object_cache/object_enum.h" -#include "datasystem/kv_cache/kv_client.h" -#include "datasystem/kv_cache/read_only_buffer.h" -#include "datasystem/utils/connection.h" -#include "datasystem/utils/optional.h" -#include "datasystem/utils/status.h" -#include "datasystem/utils/string_view.h" - -namespace datasystem { -namespace object_cache { -class ObjectClientImpl; -} // namespace object_cache -} // namespace datasystem - -namespace datasystem { -class __attribute((visibility("default"))) StateClient { -public: - /** - * @brief Construct StateClient. - * @param[in] connectOptions The connection options. - */ - explicit StateClient(const ConnectOptions &connectOptions = {}); - - ~StateClient(); - - /** - * @brief Shutdown the state client. - * @return K_OK on success; the error code otherwise. - */ - Status ShutDown(); - - /** - * @brief Init StateClient object. - * @return Status of the call. - */ - Status Init(); - - /** - * @brief Invoke worker client to set the value of a key. - * @param[in] key The key. - * @param[in] val The value for the key. - * @param[in] param The get parameters. - * @return K_OK on success; the error code otherwise. - * K_INVALID: the key or val is empty. - */ - Status Set(const std::string &key, const StringView &val, const SetParam ¶m = {}); - - /** - * @brief Invoke worker client to set the value of a key. - * @param[in] val The value for the key. - * @param[in] param The get parameters. - * @return Key of object, return empty string if set error. - */ - std::string Set(const StringView &val, const SetParam ¶m = {}); - - /** - * @brief Transactional multi-key set interface, it guarantees all the keys are either successfully created or - * none of them is created. The number of keys should be in the range of 1 to 8. 
- * @param[in] keys The keys to be set. - * @param[in] vals The values for the keys. - * @param[in] param The set parameters. - * @return K_OK on success; the error code otherwise. - */ - Status MSetTx(const std::vector &keys, const std::vector &vals, - const MSetParam ¶m = {}); - - /** - * @brief Multi-key set interface, it can batch set keys and return failed keys. The max keys size < 2000 - * and the max value for key to set < 500 * 1024 - * @param[in] keys The keys of object - * @param[in] vals The vals to set - * @param[out] outFailedKeys The failed ids for set - * @param[in] param The set parameters. - * @return K_OK on any key success; the error code otherwise. - */ - Status MSet(const std::vector &keys, const std::vector &vals, - std::vector &outFailedKeys, const MSetParam ¶m = {}); - - /** - * @brief Invoke worker client to get the value of a key. - * @param[in] key The key. - * @param[out] val The value for the key. - * @param[in] timeoutMs timeoutMs of waiting for the result return if object not ready. A positive integer number - * required. 0 means no waiting time allowed. - * @return K_OK on success; the error code otherwise. - * K_INVALID: the key is empty. - * K_NOT_FOUND: the key not found. - * K_RUNTIME_ERROR: Cannot get value from worker. - */ - Status Get(const std::string &key, std::string &val, int32_t timeoutMs = 0); - - /** - * @brief Invoke worker client to get the value of a key. - * @param[in] key The key. - * @param[in] timeoutMs timeoutMs of waiting for the result return if object not ready. A positive integer number - * required. 0 means no waiting time allowed. - * @param[out] readOnlyBuffer The value for the key. - * @return K_OK on success; the error code otherwise. - * K_INVALID: the key is empty. - * K_NOT_FOUND: the key not found. - * K_RUNTIME_ERROR: Cannot get value from worker. 
- */ - Status Get(const std::string &key, Optional &readOnlyBuffer, int32_t timeoutMs = 0); - - /** - * @brief Invoke worker client to get the values of all the given keys. - * @param[in] keys The vector of the keys. - * @param[in] timeoutMs timeoutMs of waiting for the result return if object not ready. A positive integer number - * required. 0 means no waiting time allowed. - * @param[out] vals The vector of the values. - * @return K_OK on any key success; the error code otherwise. - * K_INVALID: the vector of keys is empty or include empty key. - * K_NOT_FOUND: the key not found. - * K_RUNTIME_ERROR: Cannot get values from worker. - * @verbatim - * If some keys are not found, The Status OK will return, - * and the existing keys will set the vals with the same index of keys. - * @endverbatim - */ - Status Get(const std::vector &keys, std::vector &vals, int32_t timeoutMs = 0); - - /** - * @brief Invoke worker client to get the values of all the given keys. - * @param[in] keys The vector of the keys. - * @param[in] timeoutMs timeoutMs of waiting for the result return if object not ready. A positive integer number - * required. 0 means no waiting time allowed. - * @param[out] readOnlyBuffers The vector of the values. - * @return K_OK on any key success; the error code otherwise. - * K_INVALID: the vector of keys is empty or include empty key. - * K_NOT_FOUND: the key not found. - * K_RUNTIME_ERROR: Cannot get values from worker. - * @verbatim - * If some keys are not found, The Status OK will return, - * and the existing keys will set the vals with the same index of keys. - * @endverbatim - */ - Status Get(const std::vector &keys, std::vector> &readOnlyBuffers, - int32_t timeoutMs = 0); - - /** - * @brief Some data in an object can be read based on the specified key and parameters. - * In some scenarios, read amplification can be avoided. - * @param[in] readParams The vector of the keys and offset. - * @param[out] readOnlyBuffers The vector of the values. 
- * @return K_OK on any key success; the error code otherwise. - * K_INVALID: the vector of keys is empty or include empty key. - * K_NOT_FOUND: the key not found. - * K_RUNTIME_ERROR: Cannot get values from worker. - * @verbatim - * If some keys are not found, The Status OK will return, - * and the existing keys will set the vals with the same index of keys. - * @endverbatim - */ - Status Read(const std::vector &readParams, std::vector> &readOnlyBuffers); - - /** - * @brief Invoke worker client to delete a key. - * @param[in] key The key. - * @return K_OK on success; the error code otherwise. - * K_INVALID: The key is empty. - */ - Status Del(const std::string &key); - - /** - * @brief Invoke worker client to delete all the given keys. - * @param[in] keys The vector of the keys. - * @param[out] failedKeys The failed delete keys. - * @return K_OK on any key success; the error code otherwise. - * K_INVALID: the vector of keys is empty or include empty key. - */ - Status Del(const std::vector &keys, std::vector &failedKeys); - - /** - * @brief Generate a key with workerId. - * @param[in] prefixKey The user specified key prefix. - * @return The key with workerId, if the key fails to be generated, an empty string is returned. - */ - std::string GenerateKey(const std::string &prefixKey = ""); - - /** - * @brief Invoke worker client to query the size of objectKeys (include the objectKeys of other AZ). - * @param[in] objectKeys The objectKeys need to query size. - * @param[out] outSizes The size for the objectKeys in bytes. - * @return K_OK on success; the error code otherwise. - * K_INVALID: The objectKeys are empty or invalid. - * K_NOT_FOUND: All objectKeys not found. - * K_RPC_UNAVAILABLE: Network error. - * K_NOT_READY: Worker not ready. - * K_RUNTIME_ERROR: Can not get objectKey size from worker. - */ - Status QuerySize(const std::vector &objectKeys, std::vector &outSizes); - - /** - * @brief Worker health check. 
- * @return K_OK on any object success; the error code otherwise. - */ - Status HealthCheck(); - - /// \brief Check whether the keys exist in the data system. - /// - /// \param[in] keys The keys to be checked. Constraint: The number of keys cannot exceed 10000. - /// \param[in] exists The existence of the corresponding key. - /// - /// \return K_OK if at least one key is successfully processed, the error code otherwise. - Status Exist(const std::vector &keys, std::vector &exists); - - /// \brief Sets expiration time for key list (in seconds) - /// - /// \param[in] key The keys to set expiration for. - /// \param[in] ttlSeconds TTL in seconds. - /// \param[out] failedKeys The failed expire keys. - /// - /// \return K_OK on success; the error code otherwise. - /// K_INVALID: The key is empty or contains invalid characters. - /// K_NOT_FOUND: The key is not exist. - /// K_NOT_READY: Worker is not ready. - /// K_RPC_UNAVAILABLE: Network error. - /// K_RUNTIME_ERROR: Inner error happen. - Status Expire(const std::vector &keys, uint32_t ttlSeconds, std::vector &failedKeys); - -private: - std::shared_ptr impl_; -}; -} // namespace datasystem -#endif // DATASYSTEM_STATE_CACHE_STATE_CLIENT_H diff --git a/include/datasystem/utils/status.h b/include/datasystem/utils/status.h index 574f308488bb657363f48cccad3d95c6415c9136..ee10d16f5b1c343f31b62f0d4c420eb246c881d9 100644 --- a/include/datasystem/utils/status.h +++ b/include/datasystem/utils/status.h @@ -59,6 +59,8 @@ enum StatusCode : uint32_t { K_RETRY_IF_LEAVING = 30, K_SCALE_DOWN = 31, K_SCALING = 32, + K_LRU_HARD_LIMIT = 33, + K_LRU_SOFT_LIMIT = 34, // rpc error code, range: [1000, 2000) K_RPC_CANCELLED = 1000, @@ -75,24 +77,6 @@ enum StatusCode : uint32_t { K_OC_KEY_ALREADY_EXIST = 2004, K_WORKER_PULL_OBJECT_NOT_FOUND = 2005, - // file error code, range: [4000, 5000) - // Delete file error code when open source - K_FC_BUSY = 4000, - K_FC_FRAGMENT_ERROR = 4001, - K_FC_NOT_FLUSHED = 4002, - K_FC_SEVERE_ERROR = 4003, - 
K_FC_HARD_LIMIT = 4004, - K_FC_SOFT_LIMIT = 4005, - K_FC_UPDATE_NEEDED = 4006, - K_FC_FILE_CLOSED = 4007, - K_FC_DIRECTORY_NOT_EMPTY = 4008, - K_FC_FILE_ALREADY_WRITING = 4009, - K_FC_FAIL_QUORUM_WRITE = 4010, - K_FC_FAIL_QUORUM_READ = 4011, - K_FC_RETRY_LAST_COMMIT = 4012, - K_FC_CATCHUP_PENDING = 4013, - K_FC_FLUSH_PENDING = 4014, - // Heterogeneous error code, range: [5000, 6000] K_ACL_ERROR = 5000, K_HCCL_ERROR = 5001, diff --git a/k8s/helm_chart/datasystem/values.yaml b/k8s/helm_chart/datasystem/values.yaml index b34f716482524b97a5e8e4f1658c4e7d912e360b..786ef2db13bd0c7c156e3e46c7586da860a17c41 100644 --- a/k8s/helm_chart/datasystem/values.yaml +++ b/k8s/helm_chart/datasystem/values.yaml @@ -52,7 +52,7 @@ global: # Minimum required resources. requests: cpu: "3" - memory: "4Gi" + memory: "3Gi" # Upper limit of the shared memory, the default unit for shared memory is MB. # To prevent being rendered as scientific notation by helm, numbers with more than 5 digits should be configured as strings. 
diff --git a/python/__init__.py b/python/__init__.py index 2a8a67a1a63e86585a09f7abd675153b8b4c1fea..0a800a03769552e7bf907192b6a9055430028641 100644 --- a/python/__init__.py +++ b/python/__init__.py @@ -21,7 +21,6 @@ __all__ = [ "DsClient", "HeteroClient", "ObjectClient", - "StateClient", "KVClient", "Status", "SubconfigType", diff --git a/python/setup.py b/python/setup.py index c605516e69652378d2d03b4320df16549321a325..f4bd0c0d853f679a708f64afdd23d716ab407e45 100644 --- a/python/setup.py +++ b/python/setup.py @@ -28,7 +28,7 @@ version_path = os.path.join(pwd, 'datasystem', 'VERSION') with open(version_path, 'r') as v: version = v.read() -package_name = 'datasystem' +package_name = 'yr-datasystem-sdk' commit_id = os.getenv('COMMIT_ID', 'None').replace("\n", "") package_datas = { @@ -78,7 +78,7 @@ class EggInfo(egg_info): """Egg info.""" def run(self): - egg_info_dir = os.path.join(os.path.dirname(__file__), 'datasystem.egg-info') + egg_info_dir = os.path.join(os.path.dirname(__file__), 'yr_datasystem_sdk.egg-info') super().run() update_permissions(egg_info_dir) diff --git a/src/datasystem/client/CMakeLists.txt b/src/datasystem/client/CMakeLists.txt index 6bc0a3c26c7d51029f4901ec1fb0b569e6e1e329..e15543ea0be25a537fab03927aaaf15114dd82b1 100644 --- a/src/datasystem/client/CMakeLists.txt +++ b/src/datasystem/client/CMakeLists.txt @@ -16,7 +16,6 @@ list(APPEND CLIENT_SRCS object_cache/device/device_memory_unit.cpp object_cache/device/hccl_comm_magr.cpp object_cache/device/page_attn_utils.cpp - state_cache/state_client.cpp kv_cache/read_only_buffer.cpp kv_cache/kv_client.cpp hetero_cache/hetero_client.cpp) diff --git a/src/datasystem/client/hetero_cache/device_util.h b/src/datasystem/client/hetero_cache/device_util.h index 6db2c15a07584a261916e0da59437b022b63aa8a..a0b9b8b6925aaf3f5e1600882378aa19589fb499 100644 --- a/src/datasystem/client/hetero_cache/device_util.h +++ b/src/datasystem/client/hetero_cache/device_util.h @@ -23,6 +23,7 @@ #include #include +#include 
"datasystem/common/constants.h" #include "datasystem/object_cache/object_enum.h" namespace datasystem { diff --git a/src/datasystem/client/hetero_cache/hetero_client.cpp b/src/datasystem/client/hetero_cache/hetero_client.cpp index 1aa83c811549a1685738ca7a179938b212b37347..8fa5f0b9dcba5a553afdb111f0ffe376656fcafb 100644 --- a/src/datasystem/client/hetero_cache/hetero_client.cpp +++ b/src/datasystem/client/hetero_cache/hetero_client.cpp @@ -61,18 +61,6 @@ Status HeteroClient::Init() return rc; } -Status HeteroClient::MGet(const std::vector &objectKeys, const std::vector &devBlobList, - uint64_t timeoutMs, std::vector &failKeys) -{ - RETURN_IF_NOT_OK(HeteroClient::IsCompileWithHetero()); - PerfPoint point(PerfKey::CLIENT_MGET_H2D_ALL); - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - std::shared_future future = impl_->MGetH2D(objectKeys, devBlobList, timeoutMs); - auto result = future.get(); - failKeys = std::move(result.failedList); - return result.status; -} - Status HeteroClient::MGetH2D(const std::vector &keys, const std::vector &devBlobList, std::vector &failedKeys, int32_t subTimeoutMs) { @@ -85,34 +73,6 @@ Status HeteroClient::MGetH2D(const std::vector &keys, const std::ve return result.status; } -std::shared_future HeteroClient::MGetAsync(const std::vector &objectKeys, - const std::vector &devBlobList, - uint64_t timeoutMs) -{ -#ifndef BUILD_HETERO -#ifndef WITH_TESTS - std::promise promise; - promise.set_value( - AsyncResult{ .status = Status(K_RUNTIME_ERROR, "Hetero client is not supported. 
compile with -X on please!"), - .failedList = {} }); - return promise.get_future().share(); -#endif -#endif - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - std::shared_future future = impl_->MGetH2D(objectKeys, devBlobList, timeoutMs); - return future; -} - -Status HeteroClient::MSet(const std::vector &objectKeys, const std::vector &devBlobList) -{ - RETURN_IF_NOT_OK(HeteroClient::IsCompileWithHetero()); - PerfPoint point(PerfKey::CLIENT_MSET_D2H_ALL); - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - SetParam setParam{ .writeMode = WriteMode::NONE_L2_CACHE_EVICT }; - std::shared_future future = impl_->MSet(objectKeys, devBlobList, setParam); - return future.get().status; -} - Status HeteroClient::MSetD2H(const std::vector &keys, const std::vector &devBlobList, const SetParam &setParam) { @@ -123,23 +83,6 @@ Status HeteroClient::MSetD2H(const std::vector &keys, const std::ve return future.get().status; } -std::shared_future HeteroClient::MSetAsync(const std::vector &objectKeys, - const std::vector &devBlobList) -{ -#ifndef BUILD_HETERO -#ifndef WITH_TESTS - std::promise promise; - promise.set_value( - AsyncResult{ .status = Status(K_RUNTIME_ERROR, "Hetero client is not supported. 
compile with -X on please!"), - .failedList = {} }); - return promise.get_future().share(); -#endif -#endif - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - - return impl_->MSet(objectKeys, devBlobList, { .writeMode = WriteMode::NONE_L2_CACHE_EVICT }); -} - Status HeteroClient::Delete(const std::vector &keys, std::vector &failedKeys) { RETURN_IF_NOT_OK(HeteroClient::IsCompileWithHetero()); @@ -200,13 +143,6 @@ Status HeteroClient::DevSubscribe(const std::vector &keys, const st return impl_->DevSubscribe(keys, devBlobList, futureVec); } -Status HeteroClient::LocalDelete(const std::vector &objectKeys, std::vector &failedKeys) -{ - RETURN_IF_NOT_OK(HeteroClient::IsCompileWithHetero()); - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - return impl_->DevLocalDelete(objectKeys, failedKeys); -} - Status HeteroClient::DevDelete(const std::vector &keys, std::vector &failedKeys) { RETURN_IF_NOT_OK(HeteroClient::IsCompileWithHetero()); @@ -243,14 +179,6 @@ Status HeteroClient::DevMSet(const std::vector &keys, const std::ve return impl_->DevMSet(keys, devBlobList, failedKeys); } -Status HeteroClient::DevPreFetch(const std::vector &keys, std::vector &blob2dList) -{ - RETURN_IF_NOT_OK(HeteroClient::IsCompileWithHetero()); - (void)keys; - (void)blob2dList; - return Status::OK(); -} - Status HeteroClient::DevMGet(const std::vector &keys, std::vector &devBlobList, std::vector &failedKeys, int32_t subTimeoutMs) { diff --git a/src/datasystem/client/object_cache/device/client_device_object_manager.h b/src/datasystem/client/object_cache/device/client_device_object_manager.h index 0521da7116fb7497ff81fa04b7ad41859d85e49f..8b0b62333ef12033c6a51612ae8e0be4d917426c 100644 --- a/src/datasystem/client/object_cache/device/client_device_object_manager.h +++ b/src/datasystem/client/object_cache/device/client_device_object_manager.h @@ -26,6 +26,7 @@ #include +#include "datasystem/common/constants.h" #include "datasystem/common/device/ascend/acl_resource_manager.h" 
#include "datasystem/client/object_cache/device/device_memory_unit.h" #include "datasystem/client/object_cache/device/p2p_subscribe.h" diff --git a/src/datasystem/client/state_cache/state_client.cpp b/src/datasystem/client/state_cache/state_client.cpp deleted file mode 100644 index b76cc4668bf8a0794043c76dc8fdf96dd4dafdc1..0000000000000000000000000000000000000000 --- a/src/datasystem/client/state_cache/state_client.cpp +++ /dev/null @@ -1,349 +0,0 @@ -/** - * Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Description: Data system State Cache Client management. 
- */ -#include "datasystem/state_cache/state_client.h" - -#include -#include -#include -#include -#include -#include - -#include "datasystem/client/object_cache/object_client_impl.h" -#include "datasystem/common/log/access_recorder.h" -#include "datasystem/common/log/trace.h" -#include "datasystem/common/perf/perf_manager.h" -#include "datasystem/common/util/format.h" -#include "datasystem/common/util/status_helper.h" -#include "datasystem/common/util/strings_util.h" -#include "datasystem/common/util/uuid_generator.h" -#include "datasystem/kv_cache/read_only_buffer.h" -#include "datasystem/utils/status.h" - -namespace datasystem { -StateClient::StateClient(const ConnectOptions &connectOptions) -{ - impl_ = std::make_unique(connectOptions); -} - -StateClient::~StateClient() -{ - if (impl_) { - impl_.reset(); - } -} - -Status StateClient::ShutDown() -{ - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - if (impl_) { - bool needRollbackState; - auto rc = impl_->ShutDown(needRollbackState); - impl_->CompleteHandler(rc.IsError(), needRollbackState); - return rc; - } - return Status::OK(); -} - -Status StateClient::Init() -{ - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - bool needRollbackState; - auto rc = impl_->Init(needRollbackState, true); - impl_->CompleteHandler(rc.IsError(), needRollbackState); - return rc; -} - -Status StateClient::Set(const std::string &key, const StringView &val, const SetParam &setParam) -{ - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - PerfPoint point(PerfKey::KV_CLIENT_SET_OBJECT); - AccessRecorder accessPoint(AccessRecorderKey::DS_KV_CLIENT_SET); - Status rc = impl_->Set(key, val, setParam); - RequestParam reqParam; - reqParam.objectKey = key.substr(0, LOG_OBJECT_KEY_SIZE_LIMIT); - reqParam.writeMode = std::to_string(static_cast(setParam.writeMode)); - reqParam.ttlSecond = std::to_string(setParam.ttlSecond); - reqParam.existence = std::to_string(static_cast(setParam.existence)); - reqParam.cacheType = 
std::to_string(static_cast(setParam.cacheType)); - accessPoint.Record(rc.GetCode(), std::to_string(val.size()), reqParam, rc.GetMsg()); - return rc; -} - -std::string StateClient::Set(const StringView &val, const SetParam &setParam) -{ - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - PerfPoint point(PerfKey::KV_CLIENT_SET_OBJECT); - AccessRecorder accessPoint(AccessRecorderKey::DS_KV_CLIENT_SET); - std::string key; - auto rc = impl_->Set(val, setParam, key); - RequestParam reqParam; - reqParam.objectKey = key.substr(0, LOG_OBJECT_KEY_SIZE_LIMIT); - reqParam.writeMode = std::to_string(static_cast(setParam.writeMode)); - reqParam.ttlSecond = std::to_string(setParam.ttlSecond); - reqParam.cacheType = std::to_string(static_cast(setParam.cacheType)); - accessPoint.Record(rc.GetCode(), std::to_string(val.size()), reqParam); - return key; -} - -Status StateClient::MSetTx(const std::vector &keys, const std::vector &vals, - const MSetParam &param) -{ - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - PerfPoint point(PerfKey::KV_CLIENT_MSET_OBJECT); - AccessRecorder accessPoint(AccessRecorderKey::DS_KV_CLIENT_MSETNX); - Status rc = impl_->MSet(keys, vals, param); - RequestParam reqParam; - std::string key = (keys.empty() ? 
"" : keys[0]); - reqParam.objectKey = key.substr(0, LOG_OBJECT_KEY_SIZE_LIMIT); - reqParam.writeMode = std::to_string(static_cast(param.writeMode)); - reqParam.ttlSecond = std::to_string(param.ttlSecond); - reqParam.existence = std::to_string(static_cast(param.existence)); - reqParam.cacheType = std::to_string(static_cast(param.cacheType)); - accessPoint.Record(rc.GetCode(), std::to_string(vals.size()), reqParam, rc.GetMsg()); - return rc; -} - -Status StateClient::Get(const std::string &key, std::string &val, int32_t timeoutMs) -{ - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - PerfPoint point(PerfKey::KV_CLIENT_GET_OBJECT); - std::vector> buffers; - std::vector vals; - size_t dataSize = 0; - AccessRecorder accessPoint(AccessRecorderKey::DS_KV_CLIENT_GET); - Status rc = impl_->GetWithLatch({ key }, vals, timeoutMs, buffers, dataSize); - RequestParam reqParam; - reqParam.objectKey = key.substr(0, LOG_OBJECT_KEY_SIZE_LIMIT); - reqParam.timeout = std::to_string(timeoutMs); - StatusCode code = rc.GetCode() == K_NOT_FOUND ? K_OK : rc.GetCode(); - accessPoint.Record(code, std::to_string(dataSize), reqParam, rc.GetMsg()); - if (rc.IsOk()) { - val = std::move(vals[0]); - } - return rc; -} - -Status StateClient::Get(const std::vector &keys, std::vector &vals, int32_t timeoutMs) -{ - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - PerfPoint point(PerfKey::KV_CLIENT_GET_MUL_OBJECTS); - std::vector> buffers; - size_t dataSize = 0; - AccessRecorder accessPoint(AccessRecorderKey::DS_KV_CLIENT_GET); - Status rc = impl_->GetWithLatch(keys, vals, timeoutMs, buffers, dataSize); - RequestParam reqParam; - reqParam.objectKey = objectKeysToString(keys); - reqParam.timeout = std::to_string(timeoutMs); - StatusCode code = rc.GetCode() == K_NOT_FOUND ? 
K_OK : rc.GetCode(); - accessPoint.Record(code, std::to_string(dataSize), reqParam, rc.GetMsg()); - return rc; -} - -Status StateClient::Get(const std::string &key, Optional &readOnlyBuffer, int32_t timeoutMs) -{ - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - PerfPoint point(PerfKey::KV_CLIENT_GET_BUFFER); - std::vector> buffers; - AccessRecorder accessPoint(AccessRecorderKey::DS_KV_CLIENT_GET); - Status rc = impl_->Get({ key }, timeoutMs, buffers); - size_t dataSize = rc.IsOk() ? buffers[0]->GetSize() : 0; - RequestParam reqParam; - reqParam.objectKey = key.substr(0, LOG_OBJECT_KEY_SIZE_LIMIT); - reqParam.timeout = std::to_string(timeoutMs); - StatusCode code = rc.GetCode() == K_NOT_FOUND ? K_OK : rc.GetCode(); - accessPoint.Record(code, std::to_string(dataSize), reqParam, rc.GetMsg()); - RETURN_IF_NOT_OK(rc); - auto bufferSharedPtr = std::make_shared(std::move(buffers[0].value())); - readOnlyBuffer = Optional(ReadOnlyBuffer(bufferSharedPtr)); - return rc; -} - -Status StateClient::MSet(const std::vector &keys, const std::vector &vals, - std::vector &outFailedKeys, const MSetParam &param) -{ - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - PerfPoint point(PerfKey::KV_CLIENT_MSET_OBJECT); - AccessRecorder accessPoint(AccessRecorderKey::DS_KV_CLIENT_MSETNX); - Status rc = impl_->MSet(keys, vals, param, outFailedKeys); - RequestParam reqParam; - std::string key = (keys.empty() ? 
"" : keys[0]); - reqParam.objectKey = key.substr(0, LOG_OBJECT_KEY_SIZE_LIMIT); - reqParam.writeMode = std::to_string(static_cast(param.writeMode)); - reqParam.ttlSecond = std::to_string(param.ttlSecond); - reqParam.existence = std::to_string(static_cast(param.existence)); - reqParam.cacheType = std::to_string(static_cast(param.cacheType)); - accessPoint.Record(rc.GetCode(), std::to_string(vals.size()), reqParam, rc.GetMsg()); - return rc; -} - -Status StateClient::Get(const std::vector &keys, std::vector> &readOnlyBuffers, - int32_t timeoutMs) -{ - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - PerfPoint point(PerfKey::KV_CLIENT_GET_MUL_BUFFERS); - std::vector> buffers; - AccessRecorder accessPoint(AccessRecorderKey::DS_KV_CLIENT_GET); - Status rc = impl_->Get(keys, timeoutMs, buffers); - int64_t dataSize = 0; - if (rc.IsOk()) { - readOnlyBuffers.clear(); - for (auto &buffer : buffers) { - if (buffer) { - dataSize += buffer->GetSize(); - auto bufferSharedPtr = std::make_shared(std::move(buffer.value())); - readOnlyBuffers.emplace_back(ReadOnlyBuffer(bufferSharedPtr)); - } else { - readOnlyBuffers.emplace_back(); - } - } - } - RequestParam reqParam; - reqParam.objectKey = objectKeysToString(keys); - reqParam.timeout = std::to_string(timeoutMs); - StatusCode code = rc.GetCode() == K_NOT_FOUND ? 
K_OK : rc.GetCode(); - accessPoint.Record(code, std::to_string(dataSize), reqParam, rc.GetMsg()); - return rc; -} - -static std::string ReadParamToString(const std::vector &params) -{ - std::string ret = "["; - uint32_t len = 0; - for (const auto &param : params) { - std::string msg = FormatString("[%s, off:%ld, size:%ld],", param.key.substr(0, LOG_OBJECT_KEY_SIZE_LIMIT), - param.offset, param.size); - uint64_t size = msg.size(); - if (len > LOG_TOTAL_KEYS_SIZE_LIMIT - size) { - ret.append("total:").append(std::to_string(params.size())).append("]"); - return ret; - } - len += size; - ret.append(msg); - } - if (ret.length() > 1) { - ret.pop_back(); - } - ret.append("]"); - return ret; -} - -Status StateClient::Read(const std::vector &readParams, - std::vector> &readOnlyBuffers) -{ - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - PerfPoint point(PerfKey::KV_CLIENT_GET_MUL_BUFFERS); - std::vector> buffers; - AccessRecorder accessPoint(AccessRecorderKey::DS_KV_CLIENT_GET); - std::unordered_set keys; - RequestParam reqParam; - reqParam.objectKey = ReadParamToString(readParams); - reqParam.timeout = std::to_string(0); - int64_t dataSize = 0; - for (auto &param : readParams) { - if (keys.find(param.key) != keys.end()) { - auto status = Status(K_INVALID, FormatString("The input parameter contains duplicate key %s. Keys: %s", - param.key, VectorToString(keys))); - accessPoint.Record(K_INVALID, std::to_string(dataSize), reqParam, status.GetMsg()); - return status; - } - keys.insert(param.key); - } - Status rc = impl_->Read(readParams, buffers); - if (rc.IsOk()) { - readOnlyBuffers.clear(); - for (auto &buffer : buffers) { - if (buffer) { - dataSize += buffer->GetSize(); - auto bufferSharedPtr = std::make_shared(std::move(buffer.value())); - readOnlyBuffers.emplace_back(ReadOnlyBuffer(bufferSharedPtr)); - } else { - readOnlyBuffers.emplace_back(); - } - } - } - StatusCode code = rc.GetCode() == K_NOT_FOUND ? 
K_OK : rc.GetCode(); - accessPoint.Record(code, std::to_string(dataSize), reqParam, rc.GetMsg()); - return rc; -} - -Status StateClient::Del(const std::string &key) -{ - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - PerfPoint point(PerfKey::KV_CLIENT_DEL_OBJECT); - std::vector failedKeys; - AccessRecorder accessPoint(AccessRecorderKey::DS_KV_CLIENT_DELETE); - Status rc = impl_->Delete({ key }, failedKeys); - RequestParam reqParam; - reqParam.objectKey = key.substr(0, LOG_OBJECT_KEY_SIZE_LIMIT); - accessPoint.Record(rc.GetCode(), "0", reqParam, rc.GetMsg()); - return rc; -} - -Status StateClient::Del(const std::vector &keys, std::vector &failedKeys) -{ - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - PerfPoint point(PerfKey::KV_CLIENT_DEL_MUL_OBJECTS); - AccessRecorder accessPoint(AccessRecorderKey::DS_KV_CLIENT_DELETE); - Status rc = impl_->Delete(keys, failedKeys); - RequestParam reqParam; - reqParam.objectKey = objectKeysToString(keys); - accessPoint.Record(rc.GetCode(), "0", reqParam, rc.GetMsg()); - return rc; -} - -std::string StateClient::GenerateKey(const std::string &prefixKey) -{ - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - std::string key; - (void)impl_->GenerateKey(key, prefixKey); - return key; -} - -Status StateClient::QuerySize(const std::vector &objectKeys, std::vector &outSizes) -{ - return impl_->QuerySize(objectKeys, outSizes); -} - -Status StateClient::HealthCheck() -{ - ServerState state; - return impl_->HealthCheck(state); -} - -Status StateClient::Exist(const std::vector &keys, std::vector &exists) -{ - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - return impl_->Exist(keys, exists, true, false); -} - -Status StateClient::Expire(const std::vector &keys, uint32_t ttlSeconds, - std::vector &failedKeys) -{ - TraceGuard traceGuard = Trace::Instance().SetTraceUUID(); - PerfPoint point(PerfKey::KV_CLIENT_EXPIRE_OBJECT); - AccessRecorder accessPoint(AccessRecorderKey::DS_KV_CLIENT_EXPIRE); - 
auto rc = impl_->Expire(keys, ttlSeconds, failedKeys); - RequestParam reqParam; - reqParam.objectKey = objectKeysToString(keys); - accessPoint.Record(rc.GetCode(), "0", reqParam, rc.GetMsg()); - return rc; -} -} // namespace datasystem diff --git a/src/datasystem/common/constants.h b/src/datasystem/common/constants.h index cfbfad4fdbf941234712cd6df4691252a257a487..7d9bcbaa26f66c238e3ea59cee02962e70a04c7e 100644 --- a/src/datasystem/common/constants.h +++ b/src/datasystem/common/constants.h @@ -61,6 +61,32 @@ static const uint64_t KB = 1024; // worker lock id static const uint32_t WORKER_LOCK_ID = 0; + +enum class DataType : uint8_t { + DATA_TYPE_INT8 = 0, /**< int8 */ + DATA_TYPE_INT16 = 1, /**< int16 */ + DATA_TYPE_INT32 = 2, /**< int32 */ + DATA_TYPE_FP16 = 3, /**< fp16 */ + DATA_TYPE_FP32 = 4, /**< fp32 */ + DATA_TYPE_INT64 = 5, /**< int64 */ + DATA_TYPE_UINT64 = 6, /**< uint64 */ + DATA_TYPE_UINT8 = 7, /**< uint8 */ + DATA_TYPE_UINT16 = 8, /**< uint16 */ + DATA_TYPE_UINT32 = 9, /**< uint32 */ + DATA_TYPE_FP64 = 10, /**< fp64 */ + DATA_TYPE_BFP16 = 11, /**< bfp16 */ + DATA_TYPE_RESERVED /**< reserved */ +}; + +enum class LifetimeType : uint8_t { + REFERENCE = 0, + MOVE = 1, +}; + +struct CreateDeviceParam { + LifetimeType lifetime = LifetimeType::REFERENCE; + bool cacheLocation = true; +}; } // namespace datasystem #endif // DATASYSTEM_COMMON_CONSTANTS_H diff --git a/src/datasystem/common/lru/lru_cache.h b/src/datasystem/common/lru/lru_cache.h index e8c3458e05d7ea7054d66a4b331cad0934e1ddae..85eed7219adf65a0e0e844d044ea2877a80fa7c6 100644 --- a/src/datasystem/common/lru/lru_cache.h +++ b/src/datasystem/common/lru/lru_cache.h @@ -838,9 +838,9 @@ Status LruCache::MakeRoomInCache(LruObjPtr // on the eviction type. 
rc = evictPolicy_->RoomInCache(lruObjPtr, cacheMiss); while (rc.IsError()) { - if (rc.GetCode() == StatusCode::K_FC_HARD_LIMIT) { + if (rc.GetCode() == StatusCode::K_LRU_HARD_LIMIT) { RETURN_IF_NOT_OK(HardEviction(lruObjPtr, stats)); - } else if (rc.GetCode() == StatusCode::K_FC_SOFT_LIMIT) { + } else if (rc.GetCode() == StatusCode::K_LRU_SOFT_LIMIT) { RETURN_IF_NOT_OK(SoftEviction(lruObjPtr, stats)); } else { LOG(WARNING) << "Failed when trying to make room in cache."; diff --git a/src/datasystem/common/lru/lru_policy.h b/src/datasystem/common/lru/lru_policy.h index c0509e13bf2a3f9736a17997a9bd073e91127689..3e586fe8a3057685740e77a4c2493f9cd5f26974 100644 --- a/src/datasystem/common/lru/lru_policy.h +++ b/src/datasystem/common/lru/lru_policy.h @@ -185,8 +185,7 @@ public: * @tparam ObjPtr The object type in the cache. * @returns Status from the call. Status returns: * K_OK : there was room in the cache. - * K_FC_HARD_LIMIT : there was no room in the cache and a hard eviction will be needed. - * K_FC_SOFT_LIMIT : there was room in the cache, but the soft limit was hit and soft eviction can be done. + * K_LRU_HARD_LIMIT : there was no room in the cache and a hard eviction will be needed. */ template Status RoomInCache(ObjPtr obj, bool cacheMiss) const @@ -566,14 +565,13 @@ public: * @tparam ObjPtr the object type in the cache. * @returns Status from the call. Status returns: * K_OK : there was room in the cache. - * K_FC_HARD_LIMIT : there was no room in the cache and a hard eviction will be needed. - * K_FC_SOFT_LIMIT : there was room in the cache, but the soft limit was hit and soft eviction can be done. + * K_LRU_HARD_LIMIT : there was no room in the cache and a hard eviction will be needed. 
*/ template Status RoomInCacheImpl(ObjPtr obj, bool cacheMiss) const { if (cacheMiss && (currentSize_ + obj->Size()) > maxSize_) { - RETURN_STATUS(StatusCode::K_FC_HARD_LIMIT, "Lru limit"); + RETURN_STATUS(StatusCode::K_LRU_HARD_LIMIT, "Lru limit"); } return Status::OK(); }; @@ -770,15 +768,14 @@ public: * @tparam ObjPtr the object type in the cache. * @returns Status from the call. Status returns: * K_OK : there was room in the cache. - * K_FC_HARD_LIMIT : there was no room in the cache and a hard eviction will be needed. - * K_FC_SOFT_LIMIT : there was room in the cache, but the soft limit was hit and soft eviction can be done. + * K_LRU_HARD_LIMIT : there was no room in the cache and a hard eviction will be needed. */ template Status RoomInCacheImpl(ObjPtr obj, bool cacheMiss) const { (void)obj; if (cacheMiss && (currentCount_ + 1) > maxCount_) { - RETURN_STATUS(StatusCode::K_FC_HARD_LIMIT, "Lru limit"); + RETURN_STATUS(StatusCode::K_LRU_HARD_LIMIT, "Lru limit"); } return Status::OK(); }; diff --git a/src/datasystem/common/object_cache/object_base.h b/src/datasystem/common/object_cache/object_base.h index 3f32245fd900b1414b4971adba5dd24cb749ea3d..087f00bf8467b37c2d17fe1c87e1d3dfc815f093 100644 --- a/src/datasystem/common/object_cache/object_base.h +++ b/src/datasystem/common/object_cache/object_base.h @@ -26,6 +26,7 @@ #include #include "datasystem/client/mmap_table_entry.h" +#include "datasystem/common/constants.h" #include "datasystem/common/object_cache/object_bitmap.h" #include "datasystem/common/rpc/rpc_message.h" #include "datasystem/common/shared_memory/shm_unit.h" diff --git a/src/datasystem/common/util/status_code.def b/src/datasystem/common/util/status_code.def index 4312290e44632b44c85be8478fd6e418b3fc8db8..a31f15291a0a8320f7be62c528d7d139fe242a6d 100644 --- a/src/datasystem/common/util/status_code.def +++ b/src/datasystem/common/util/status_code.def @@ -32,6 +32,8 @@ STATUS_CODE_DEF(K_SERVER_FD_CLOSED, "The server fd has been closed") 
STATUS_CODE_DEF(K_RETRY_IF_LEAVING, "Try again when worker is leaving") STATUS_CODE_DEF(K_SCALE_DOWN, "The worker is exiting") STATUS_CODE_DEF(K_SCALING, "The cluster is scaling") +STATUS_CODE_DEF(K_LRU_HARD_LIMIT, "Lru hard limit") +STATUS_CODE_DEF(K_LRU_SOFT_LIMIT, "Lru soft limit") // rpc STATUS_CODE_DEF(K_RPC_CANCELLED, "RPC cancelled") @@ -50,23 +52,6 @@ STATUS_CODE_DEF(K_FUTURE_TIMEOUT, "The future is timeout") STATUS_CODE_DEF(K_ACL_ERROR, "Acl api error") STATUS_CODE_DEF(K_HCCL_ERROR, "Hccl api error") -// file -STATUS_CODE_DEF(K_FC_BUSY, "System busy") -STATUS_CODE_DEF(K_FC_FRAGMENT_ERROR, "Fragment file error") -STATUS_CODE_DEF(K_FC_NOT_FLUSHED, "Not flushed") -STATUS_CODE_DEF(K_FC_SEVERE_ERROR, "Severe error") -STATUS_CODE_DEF(K_FC_HARD_LIMIT, "Hard limit error") -STATUS_CODE_DEF(K_FC_SOFT_LIMIT, "Soft limit error") -STATUS_CODE_DEF(K_FC_UPDATE_NEEDED, "Update needed") -STATUS_CODE_DEF(K_FC_FILE_CLOSED, "File closed") -STATUS_CODE_DEF(K_FC_DIRECTORY_NOT_EMPTY, "Directory not empty") -STATUS_CODE_DEF(K_FC_FILE_ALREADY_WRITING, "File already writing") -STATUS_CODE_DEF(K_FC_FAIL_QUORUM_WRITE, "Insufficient write quorum") -STATUS_CODE_DEF(K_FC_FAIL_QUORUM_READ, "Insufficient read quorum") -STATUS_CODE_DEF(K_FC_RETRY_LAST_COMMIT, "Retry last commit write") -STATUS_CODE_DEF(K_FC_CATCHUP_PENDING, "GCS Catchup is in progress") -STATUS_CODE_DEF(K_FC_FLUSH_PENDING, "Please resubmit Flush command to resolve recovery") - // rdma STATUS_CODE_DEF(K_OC_REMOTE_GET_NOT_ENOUGH, "Size on the remote node has changed") STATUS_CODE_DEF(K_URMA_ERROR, "Urma operation failed") diff --git a/src/datasystem/pybind_api/pybind_register_object.cpp b/src/datasystem/pybind_api/pybind_register_object.cpp index 6b94455e77de9bac4cf15c255bb7b4a674c7f9ae..085f971f1db75dce8b9d241ea74a9cb0dbcbdc8a 100644 --- a/src/datasystem/pybind_api/pybind_register_object.cpp +++ b/src/datasystem/pybind_api/pybind_register_object.cpp @@ -27,6 +27,7 @@ #include +#include "datasystem/common/constants.h" 
#include "datasystem/common/log/log.h" #include "datasystem/common/log/trace.h" #include "datasystem/common/util/format.h" diff --git a/tests/kvconnector/patch/0001-implement-yr-datasystem-connector-and-support-multimoda.patch b/tests/kvconnector/patch/0001-implement-yr-datasystem-connector-and-support-multimoda.patch index 019c039b13177df48896dda43e254909afd766eb..9ad5b82376846a2a7f31458c3b2e68be088588b7 100644 --- a/tests/kvconnector/patch/0001-implement-yr-datasystem-connector-and-support-multimoda.patch +++ b/tests/kvconnector/patch/0001-implement-yr-datasystem-connector-and-support-multimoda.patch @@ -983,7 +983,7 @@ index 0000000..9b7407a +from datasystem import DsTensorClient, Future + +ENABLE_PREFIX_CACHING = int(os.environ.get("USING_PREFIX_CONNECTOR", 1)) -+FUTURE_TIMEOUT = int(os.getenv("FUTURE_TIMEOUT", 0)) ++FUTURE_TIMEOUT = int(os.getenv("FUTURE_TIMEOUT", 10000)) +SYNC_FUTURE_TIMEOUT = int(os.getenv("SYNC_FUTURE_TIMEOUT", 1)) +SLEEP_TIMEOUT = 0.005 + diff --git a/tests/st/client_c_api/object_cache/state_cache_test.cpp b/tests/st/client_c_api/object_cache/state_cache_test.cpp deleted file mode 100644 index 048094952af24fc866fb5991779a26fb63cc6f36..0000000000000000000000000000000000000000 --- a/tests/st/client_c_api/object_cache/state_cache_test.cpp +++ /dev/null @@ -1,516 +0,0 @@ -/** - * Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -/** - * Description: test cases for c client api. - */ -#include -#include -#include -#include -#include -#include - -#include - -#include "client/object_cache/oc_client_common.h" -#include "common.h" -#include "datasystem/common/log/log.h" -#include "datasystem/state_cache/state_client.h" - -namespace datasystem { -namespace st { -class SCCWrapperTest : public OCClientCommon { -public: - void SetClusterSetupOptions(ExternalClusterOptions &opts) override - { - opts.numWorkers = 3; - opts.numMasters = 3; - opts.numOBS = 1; - opts.numEtcd = 1; - opts.workerGflagParams = "-shared_memory_size_mb=10000"; - } - - StateClient_p CreateStateCacheClient(const std::string &workerHost, const int workerPort, const int timeOut, - const std::string &clientPublicKey, const std::string &clientPrivateKey, - const std::string &serverPublicKey, const std::string &accessKey, - const std::string &secretKey, const std::string &tenantId, - const std::string &enableCrossNodeConnection) - { - return SCCreateClient(workerHost.c_str(), workerPort, timeOut, clientPublicKey.c_str(), - clientPublicKey.length(), clientPrivateKey.c_str(), clientPrivateKey.length(), - serverPublicKey.c_str(), serverPublicKey.length(), accessKey.c_str(), accessKey.length(), - secretKey.c_str(), secretKey.length(), tenantId.c_str(), tenantId.length(), - enableCrossNodeConnection.c_str()); - } - -protected: - static const size_t GB = 1024 * 1024 * 1024; - std::string ak_ = "QTWAOYTTINDUT2QVKYUC"; - std::string sk_ = "MFyfvK41ba2giqM7**********KGpownRZlmVmHc"; -}; - -TEST_F(SCCWrapperTest, ConnectSuccess) -{ - HostPort srcWorkerAddress; - DS_ASSERT_OK(cluster_->GetWorkerAddr(0, srcWorkerAddress)); - auto stateClient = CreateStateCacheClient( - srcWorkerAddress.Host(), srcWorkerAddress.Port(), 60000, "", "", "", - ak_, sk_, "", "true"); - ASSERT_EQ(SCConnectWorker(stateClient).code, datasystem::K_OK); - SCFreeClient(stateClient); -} - -TEST_F(SCCWrapperTest, LEVEL1_ConnectFail) -{ - HostPort srcWorkerAddress; - 
DS_ASSERT_OK(cluster_->GetWorkerAddr(0, srcWorkerAddress)); - auto stateClient = CreateStateCacheClient( - srcWorkerAddress.Host(), 989, 60000, "", "", "", - ak_.c_str(), sk_.c_str(), "", "true"); - ASSERT_EQ(SCConnectWorker(stateClient).code, datasystem::K_RPC_UNAVAILABLE); - SCFreeClient(stateClient); -} - -TEST_F(SCCWrapperTest, SetSuccess) -{ - HostPort srcWorkerAddress; - DS_ASSERT_OK(cluster_->GetWorkerAddr(0, srcWorkerAddress)); - auto stateClient = CreateStateCacheClient( - srcWorkerAddress.Host(), srcWorkerAddress.Port(), 60000, "", "", "", - ak_, sk_, "", "true"); - ASSERT_EQ(SCConnectWorker(stateClient).code, datasystem::K_OK); - - const char *cKey = "key5485"; - const char *cVal = "data1"; - const size_t keyLen = 7; - const size_t valLen = 5; - ASSERT_EQ(SCSet(stateClient, cKey, keyLen, cVal, valLen, "NONE_L2_CACHE", 0, "NONE").code, datasystem::K_OK); - SCFreeClient(stateClient); -} - -TEST_F(SCCWrapperTest, GetSuccess) -{ - HostPort srcWorkerAddress; - DS_ASSERT_OK(cluster_->GetWorkerAddr(0, srcWorkerAddress)); - auto stateClient = CreateStateCacheClient( - srcWorkerAddress.Host(), srcWorkerAddress.Port(), 60000, "", "", "", - ak_, sk_, "", "true"); - ASSERT_EQ(SCConnectWorker(stateClient).code, datasystem::K_OK); - - const char *cKey = "key1223"; - const char *cValS = "data1"; - const size_t valLen = 5; - const size_t keyLen = 7; - ASSERT_EQ(SCSet(stateClient, cKey, keyLen, cValS, valLen, "NONE_L2_CACHE", 0, "NX").code, datasystem::K_OK); - - char *cValG = nullptr; - size_t valLenToGet; - ASSERT_EQ(SCGet(stateClient, cKey, keyLen, 0, &cValG, &valLenToGet).code, datasystem::K_OK); - ASSERT_EQ(std::string(cValS, 5), std::string(cValG, 5)); - free(cValG); - SCFreeClient(stateClient); -} - -TEST_F(SCCWrapperTest, GetBigObjectSuccess) -{ - HostPort srcWorkerAddress; - DS_ASSERT_OK(cluster_->GetWorkerAddr(0, srcWorkerAddress)); - auto stateClient = CreateStateCacheClient( - srcWorkerAddress.Host(), srcWorkerAddress.Port(), 60000, "", "", "", - ak_, sk_, 
"", "true"); - ASSERT_EQ(SCConnectWorker(stateClient).code, datasystem::K_OK); - std::string data = GenPartRandomString(3 * GB); - const char *cKey = "key1223"; - const char *cValS = data.c_str(); - const size_t keyLen = 7; - const size_t valLen = 3 * GB; - ASSERT_EQ(SCSet(stateClient, cKey, keyLen, cValS, valLen, "NONE_L2_CACHE", 0, "NONE").code, datasystem::K_OK); - - char *cValG = nullptr; - size_t valLenToGet; - - ASSERT_EQ(SCGet(stateClient, cKey, keyLen, 0, &cValG, &valLenToGet).code, datasystem::K_OK); - ASSERT_EQ(std::string(cValS, 3 * GB), std::string(cValG, valLenToGet)); - free(cValG); - SCFreeClient(stateClient); -} - -TEST_F(SCCWrapperTest, DelSuccess) -{ - HostPort srcWorkerAddress; - DS_ASSERT_OK(cluster_->GetWorkerAddr(0, srcWorkerAddress)); - auto stateClient = CreateStateCacheClient( - srcWorkerAddress.Host(), srcWorkerAddress.Port(), 60000, "", "", "", - ak_, sk_, "", "true"); - ASSERT_EQ(SCConnectWorker(stateClient).code, datasystem::K_OK); - - const char *cKey = "key1523"; - const char *cValS = "data1"; - const size_t valLen = 5; - const size_t keyLen = 7; - ASSERT_EQ(SCSet(stateClient, cKey, keyLen, cValS, valLen, "NONE_L2_CACHE", 0, "NONE").code, datasystem::K_OK); - - ASSERT_EQ(SCDel(stateClient, cKey, strlen(cKey)).code, datasystem::K_OK); - SCFreeClient(stateClient); -} - -TEST_F(SCCWrapperTest, GetArraySuccess) -{ - HostPort srcWorkerAddress; - DS_ASSERT_OK(cluster_->GetWorkerAddr(0, srcWorkerAddress)); - auto stateClient = CreateStateCacheClient( - srcWorkerAddress.Host(), srcWorkerAddress.Port(), 60000, "", "", "", ak_, sk_, - "", "true"); - ASSERT_EQ(SCConnectWorker(stateClient).code, datasystem::K_OK); - - const char *cKey1 = "key1763"; - const char *cValS1 = "data1"; - const size_t valLen1 = 5; - const size_t keyLen1 = 7; - ASSERT_EQ(SCSet(stateClient, cKey1, keyLen1, cValS1, valLen1, "NONE_L2_CACHE", 0, "NONE").code, datasystem::K_OK); - - const char *cKey2 = "key1233"; - const char *cValS2 = "data2"; - const size_t valLen2 = 5; - 
const size_t keyLen2 = 7; - ASSERT_EQ(SCSet(stateClient, cKey2, keyLen2, cValS2, valLen2, "NONE_L2_CACHE", 0, "NONE").code, datasystem::K_OK); - - const char *cKeys[2]; - cKeys[0] = cKey1; - cKeys[1] = cKey2; - const size_t keysLen[2] = {7, 7}; - char **cVals = MakeCharsArray(2); - size_t *valsLen = MakeNumArray(2); - ASSERT_EQ(SCGetArray(stateClient, cKeys, keysLen, 2, 0, cVals, valsLen).code, datasystem::K_OK); - ASSERT_EQ(std::string(cValS1, 5), std::string(cVals[0], valsLen[0])); - ASSERT_EQ(std::string(cValS2, 5), std::string(cVals[1], valsLen[1])); - FreeCharsArray(cVals, 2); - SCFreeClient(stateClient); -} - -TEST_F(SCCWrapperTest, GetArrayKeyMissing) -{ - HostPort srcWorkerAddress; - DS_ASSERT_OK(cluster_->GetWorkerAddr(0, srcWorkerAddress)); - auto stateClient = CreateStateCacheClient( - srcWorkerAddress.Host(), srcWorkerAddress.Port(), 60000, "", "", "", ak_, sk_, - "", "true"); - ASSERT_EQ(SCConnectWorker(stateClient).code, datasystem::K_OK); - - const char *cKey1 = "key1453"; - const char *cKey2 = "key1298"; - const char *cValS2 = "data1"; - const size_t valLen = 5; - const size_t keyLen2 = 7; - ASSERT_EQ(SCSet(stateClient, cKey2, keyLen2, cValS2, valLen, "NONE_L2_CACHE", 0, "NONE").code, datasystem::K_OK); - - const char *cKeys[2]; - cKeys[0] = cKey1; - cKeys[1] = cKey2; - const size_t keysLen[2] = {7, 7}; - char **cVals = MakeCharsArray(2); - size_t *valsLen = MakeNumArray(2); - ASSERT_EQ(SCGetArray(stateClient, cKeys, keysLen, 2, 0, cVals, valsLen).code, datasystem::K_OK); - ASSERT_EQ(cVals[0], nullptr); - ASSERT_EQ(std::string(cValS2, 5), std::string(cVals[1], valsLen[1])); - FreeCharsArray(cVals, 2); - SCFreeClient(stateClient); -} - -TEST_F(SCCWrapperTest, DelArraySuccess) -{ - HostPort srcWorkerAddress; - DS_ASSERT_OK(cluster_->GetWorkerAddr(0, srcWorkerAddress)); - auto stateClient = CreateStateCacheClient( - srcWorkerAddress.Host(), srcWorkerAddress.Port(), 60000, "", "", "", ak_, sk_, - "", "true"); - 
ASSERT_EQ(SCConnectWorker(stateClient).code, datasystem::K_OK); - - const char *cKey1 = "key1863"; - const char *cValS1 = "data1"; - const size_t valLen = 5; - const size_t keyLen = 7; - ASSERT_EQ(SCSet(stateClient, cKey1, keyLen, cValS1, valLen, "NONE_L2_CACHE", 0, "NONE").code, datasystem::K_OK); - - const char *cKey2 = "key1253"; - const char *cValS2 = "data2"; - const size_t valLen2 = 5; - const size_t keyLen2 = 7; - ASSERT_EQ(SCSet(stateClient, cKey2, keyLen2, cValS2, valLen2, "NONE_L2_CACHE", 0, "NONE").code, datasystem::K_OK); - - const char *cKeys[2]; - cKeys[0] = cKey1; - cKeys[1] = cKey2; - uint64_t failedCount; - uint64_t numKeys = 2; - char **cVals = MakeCharsArray(numKeys); - ASSERT_EQ(SCDelArray(stateClient, cKeys, numKeys, cVals, &failedCount).code, datasystem::K_OK); - ASSERT_EQ(failedCount, 0ul); - FreeCharsArray(cVals, numKeys); - SCFreeClient(stateClient); -} - -TEST_F(SCCWrapperTest, DelArrayKeyMissing) -{ - HostPort srcWorkerAddress; - DS_ASSERT_OK(cluster_->GetWorkerAddr(0, srcWorkerAddress)); - auto stateClient = CreateStateCacheClient( - srcWorkerAddress.Host(), srcWorkerAddress.Port(), 60000, "", "", "", ak_, sk_, - "", "true"); - ASSERT_EQ(SCConnectWorker(stateClient).code, datasystem::K_OK); - - const char *cKey1 = "key1823"; - const char *cKey2 = "key1890"; - const char *cValS2 = "data2"; - const size_t valLen = 5; - const size_t keyLen = 7; - ASSERT_EQ(SCSet(stateClient, cKey2, keyLen, cValS2, valLen, "NONE_L2_CACHE", 0, "NONE").code, datasystem::K_OK); - - const char *cKeys[2]; - cKeys[0] = cKey1; - cKeys[1] = cKey2; - uint64_t failedCount; - char **cVals = MakeCharsArray(2); - ASSERT_EQ(SCDelArray(stateClient, cKeys, 2, cVals, &failedCount).code, datasystem::K_OK); - ASSERT_EQ(failedCount, 0ul); - FreeCharsArray(cVals, 2); - SCFreeClient(stateClient); -} - -TEST_F(SCCWrapperTest, TestWorkerAddressError) -{ - std::string st(""); - auto stateClient = CreateStateCacheClient(st, -20, 60000, "", "", "", ak_, sk_, "", "true"); - 
ASSERT_EQ(stateClient, nullptr);
-    SCFreeClient(stateClient);
-}
-
-TEST_F(SCCWrapperTest, ClientCreateArgumentTest)
-{
-    HostPort srcWorkerAddress;
-    DS_ASSERT_OK(cluster_->GetWorkerAddr(0, srcWorkerAddress));
-    const char *clPubKey = "This is testing client public key";
-    const char *clPriKey = "This is testing client private key";
-    const char *srPubKey = "This is testing server public key";
-    const char *accessKey = "This is testing access key";
-    const char *secretKey = "This is testing secret key";
-    auto stateClient = CreateStateCacheClient(srcWorkerAddress.Host(),
-                                              srcWorkerAddress.Port(),
-                                              60000,
-                                              clPubKey,
-                                              clPriKey,
-                                              srPubKey,
-                                              accessKey,
-                                              secretKey,
-                                              "",
-                                              "true");
-    ASSERT_NE(stateClient, nullptr);
-    SCFreeClient(stateClient);
-}
-
-TEST_F(SCCWrapperTest, TestGenerateKey)
-{
-    HostPort srcWorkerAddress;
-    DS_ASSERT_OK(cluster_->GetWorkerAddr(0, srcWorkerAddress));
-    auto stateClient = CreateStateCacheClient(
-        srcWorkerAddress.Host(), srcWorkerAddress.Port(), 60000, "", "", "", ak_, sk_,
-        "", "true");
-    ASSERT_EQ(SCConnectWorker(stateClient).code, datasystem::K_OK);
-
-    char *key = nullptr;
-    size_t keyLen = SCGenerateKey(stateClient, &key);
-    const size_t generateKeyLen = 73;  // uuid;workeruuid, 36+1+36
-    ASSERT_EQ(keyLen, generateKeyLen);
-    std::string strKey = key;
-    ASSERT_TRUE(strKey.find(";") != std::string::npos);
-    delete[] key;
-
-    SCFreeClient(stateClient);
-}
-
-class KVClientGetSubscribeTest : public OCClientCommon {
-public:
-    void SetClusterSetupOptions(ExternalClusterOptions &opts) override
-    {
-        int numWorkers = 3;
-        opts.isObjectCache = true;
-        opts.masterIdx = 0;
-        opts.numWorkers = numWorkers;
-        opts.workerGflagParams = " -shared_memory_size_mb=12 -v=2";
-        opts.masterGflagParams = " -v=1";
-        opts.numEtcd = 1;
-    }
-
-    void TearDown() override
-    {
-        ExternalClusterTest::TearDown();
-    }
-};
-
-TEST_F(KVClientGetSubscribeTest, TestGetSubscribeAndAddLoaction)
-{
-    LOG(INFO) << "Test get subscribe and add location";
-    std::shared_ptr client0;
-    std::shared_ptr client1;
-    InitTestKVClient(0, client0);
-    InitTestKVClient(1, client1);
-
-    std::string key = "Welcome_to_join_the_conference";
-    std::string value = randomData_.GetRandomString(1 * 1024ul * 1024ul);
-    SetParam param;
-    param.existence = ExistenceOpt::NX;
-    std::thread t1([&client0, key, value]() {
-        std::string getVal;
-        DS_ASSERT_OK(client0->Get(key, getVal, 5'000));
-        ASSERT_EQ(getVal, value);
-    });
-
-    std::thread t2([&client1, key, &value, param]() {
-        sleep(2);
-        DS_ASSERT_OK(client1->Set(key, value, param));
-    });
-
-    t1.join();
-    t2.join();
-
-    DS_ASSERT_OK(client1->Del(key));
-    std::string getVal;
-    DS_ASSERT_NOT_OK(client0->Get(key, getVal));
-    DS_ASSERT_OK(client0->Set(key, value, param));
-}
-
-TEST_F(KVClientGetSubscribeTest, TestSetGetConcurrency)
-{
-    LOG(INFO) << "Test set get concurrency";
-    std::shared_ptr client0;
-    std::shared_ptr client1;
-    InitTestKVClient(0, client0);
-    InitTestKVClient(1, client1);
-
-    std::string key = "Welcome_to_join_the_conference";
-    std::string value = randomData_.GetRandomString(1024ul);
-    SetParam param;
-    param.existence = ExistenceOpt::NX;
-    for (int k = 0; k < 5; ++k) {
-        std::thread t1([&client0, key, value]() {
-            std::string getVal;
-            for (size_t i = 0; i < 100; ++i) {
-                DS_ASSERT_OK(client0->Get(key, getVal, 5'000));
-                ASSERT_EQ(getVal, value);
-            }
-        });
-
-        std::thread t2([&client1, key, &value, param]() {
-            for (size_t i = 0; i < 100; ++i) {
-                if (i == 0) {
-                    sleep(1);
-                    DS_ASSERT_OK(client1->Set(key, value, param));
-                } else {
-                    DS_ASSERT_NOT_OK(client1->Set(key, value, param));
-                }
-            }
-        });
-
-        t1.join();
-        t2.join();
-
-        DS_ASSERT_OK(client1->Del(key));
-    }
-}
-
-class KVClientGetTest : public OCClientCommon {
-public:
-    void SetClusterSetupOptions(ExternalClusterOptions &opts) override
-    {
-        int numWorkers = 3;
-        opts.isObjectCache = true;
-        opts.masterIdx = 0;
-        opts.numWorkers = numWorkers;
-        opts.workerGflagParams = " -shared_memory_size_mb=8192 -v=2";
-        opts.masterGflagParams = " -v=1";
-        opts.numEtcd = 1;
-    }
-
-    void TearDown() override
-    {
-        ExternalClusterTest::TearDown();
-    }
-};
-
-TEST_F(KVClientGetTest, EXCLUSIVE_LEVEL1_TestGetAllDataFromQueryResultDirectly)
-{
-    std::vector objKeys = {"Kevin", "Bob", "Stuart", "Gru"};
-    std::vector sizes = {
-        1024, 1024 * 1024ul, 2 * 1024ul * 1024ul * 1024ul + 1, 2 * 1024ul * 1024ul * 1024ul + 100};
-    std::shared_ptr client0;
-    std::shared_ptr client1;
-    InitTestKVClient(0, client0);
-    InitTestKVClient(1, client1);
-
-    for (size_t i = 0; i < objKeys.size(); ++i) {
-        std::string val(sizes[i], 'a' + i);
-        DS_ASSERT_OK(client0->Set(objKeys[i], val));
-    }
-
-    std::vector vals;
-    DS_ASSERT_OK(client1->Get(objKeys, vals));
-    ASSERT_EQ(vals.size(), objKeys.size());
-    for (size_t i = 0; i < vals.size(); ++i) {
-        ASSERT_EQ(vals[i].size(), sizes[i]);
-        std::string expectedVal(sizes[i], 'a' + i);
-        ASSERT_EQ(expectedVal, vals[i]);
-    }
-}
-
-TEST_F(KVClientGetTest, EXCLUSIVE_TestGetAllDataFromQueryResultAndRemote)
-{
-    std::vector localObjs = {"Kevin", "Bob", "Stuart"};
-    std::vector localSizes = {1024, 1024 * 1024ul, 2 * 1024ul * 1024ul * 1024ul + 100};
-    std::vector remoteObjs = {"LeBron", "James", "Nomadic", "Dynasty"};
-    std::vector remoteSizes = {2 * 1024, 1024 * 1024ul, 2 * 1024ul * 1024ul, 6024};
-    std::shared_ptr client0;
-    std::shared_ptr client1;
-    std::shared_ptr client2;
-    InitTestKVClient(0, client0);
-    InitTestKVClient(1, client1);
-    InitTestKVClient(1, client2);
-
-    for (size_t i = 0; i < localObjs.size(); ++i) {
-        std::string val(localSizes[i], 'l');
-        DS_ASSERT_OK(client0->Set(localObjs[i], val));
-    }
-
-    for (size_t i = 0; i < remoteObjs.size(); ++i) {
-        std::string val(remoteSizes[i], 'r');
-        DS_ASSERT_OK(client1->Set(remoteObjs[i], val));
-    }
-
-    std::vector getObjs(localObjs);
-    getObjs.insert(getObjs.end(), remoteObjs.begin(), remoteObjs.end());
-    std::vector sizes(localSizes);
-    sizes.insert(sizes.end(), remoteSizes.begin(), remoteSizes.end());
-    std::vector vals;
-    DS_ASSERT_OK(client2->Get(getObjs, vals));
-    ASSERT_EQ(vals.size(), getObjs.size());
-    for (size_t i = 0; i < vals.size(); ++i) {
-        ASSERT_EQ(vals[i].size(), sizes[i]);
-        std::string expectedVal = i < localObjs.size() ? std::string(sizes[i], 'l') : std::string(sizes[i], 'r');
-        ASSERT_EQ(expectedVal, vals[i]);
-    }
-}
-}  // namespace st
-}  // namespace datasystem