# arkflow
**Repository Path**: arkflow-rs/arkflow
## Basic Information
- **Project Name**: arkflow
- **Description**: 高性能 Rust 流处理引擎无缝集成了人工智能功能,可提供强大的实时数据处理和智能分析功能。
- **Primary Language**: Rust
- **License**: Apache-2.0
- **Default Branch**: main
- **Homepage**: https://arkflow-rs.com/
- **GVP Project**: No
## Statistics
- **Stars**: 1
- **Forks**: 0
- **Created**: 2025-11-02
- **Last Updated**: 2025-11-02
## Categories & Tags
**Categories**: Uncategorized
**Tags**: AI, Rust, SQL, Kafka
## README
# ArkFlow
[English](README.md) | 中文
[](https://github.com/arkflow-rs/arkflow/actions/workflows/rust.yml)
[](LICENSE)
[最新版文档](https://arkflow-rs.com/docs/intro) | [开发版文档](https://arkflow-rs.com/docs/next/intro)
高性能Rust流处理引擎,无缝集成AI能力,提供强大的实时数据处理与智能分析。
它不仅支持多种输入/输出源和处理器,更能轻松加载和执行机器学习模型,实现流式数据和推理、异常检测和复杂事件处理。
## CNCF 云原生技术全景图
ArkFlow 已收录在 [CNCF Cloud Native 云原生技术全景图](https://landscape.cncf.io/?item=app-definition-and-development--streaming-messaging--arkflow)中。
## 特性
- **高性能**:基于Rust和Tokio异步运行时构建,提供卓越的性能和低延迟
- **智能分析**:无缝集成AI模型,提供强大的智能分析功能
- **多种数据源**:支持Kafka、MQTT、HTTP、文件等多种输入输出源
- **强大的处理能力**:内置SQL查询、Python脚本、JSON处理、Protobuf编解码、批处理等多种处理器
- **可扩展**:模块化设计,易于扩展新的输入、缓冲区、输出和处理器组件
## 安装
### 从源码构建
```bash
# 克隆仓库
git clone https://github.com/arkflow-rs/arkflow.git
cd arkflow
# 构建项目
cargo build --release
# 运行测试
cargo test
```
## 快速开始
1. 创建配置文件 `config.yaml`:
```yaml
logging:
level: info
streams:
- input:
type: "generate"
context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
interval: 1s
batch_size: 10
pipeline:
thread_num: 4
processors:
- type: "json_to_arrow"
- type: "sql"
query: "SELECT * FROM flow WHERE value >= 10"
output:
type: "stdout"
error_output:
type: "stdout"
```
2. 运行ArkFlow:
```bash
./target/release/arkflow --config config.yaml
```
## 配置说明
ArkFlow使用YAML格式的配置文件,支持以下主要配置项:
### 顶级配置
```yaml
logging:
level: info # 日志级别:debug, info, warn, error
streams: # 流定义列表
- input: # 输入配置
# ...
pipeline: # 处理管道配置
# ...
output: # 输出配置
# ...
error_output: # 错误输出配置
# ...
buffer: # 缓冲配置
# ...
```
### 输入组件
ArkFlow支持多种输入源:
- **Kafka**:从Kafka主题读取数据
- **MQTT**:从MQTT主题订阅消息
- **HTTP**:通过HTTP接收数据
- **文件**:使用SQL从文件(Csv、Json、Parquet、Avro、Arrow)读取数据
- **生成器**:生成测试数据
- **数据库**:从数据库(MySQL、PostgreSQL、SQLite、Duckdb)查询数据
- **Nats**: 订阅来自 Nats 主题的消息
- **Redis**: 订阅来自 Redis 频道或列表的消息
- **Websocket**: 订阅来自 WebSocket 连接的消息
- **Modbus**: 从 Modbus 设备读取数据
示例:
```yaml
input:
type: kafka
brokers:
- localhost:9092
topics:
- test-topic
consumer_group: test-group
client_id: arkflow
start_from_latest: true
```
### 处理器
ArkFlow提供多种数据处理器:
- **JSON**:JSON数据处理和转换
- **SQL**:使用SQL查询处理数据
- **Protobuf**:Protobuf编解码
- **批处理**:将消息批量处理
- **Vrl**: 使用[VRL](https://vector.dev/docs/reference/vrl/)进行处理数据
示例:
```yaml
pipeline:
thread_num: 4
processors:
- type: json_to_arrow
- type: sql
query: "SELECT * FROM flow WHERE value >= 10"
```
### 输出组件
ArkFlow支持多种输出目标:
- **Kafka**:将数据写入Kafka主题
- **MQTT**:将消息发布到MQTT主题
- **HTTP**:通过HTTP发送数据
- **标准输出**:将数据输出到控制台
- **Drop**: 丢弃数据
- **Nats**: 将消息发布到 Nats 主题
示例:
```yaml
output:
type: kafka
brokers:
- localhost:9092
topic:
type: value
value: output-topic
client_id: arkflow-producer
```
### 错误输出组件
- **Kafka**:将错误数据写入 Kafka 主题
- **MQTT**:将错误消息发布到 MQTT 主题
- **HTTP**:通过 HTTP 发送错误数据
- **标准输出**:将错误数据输出到控制台
- **丢弃**:丢弃错误数据
- **Nats**: 将消息发布到 Nats 主题
示例:
```yaml
error_output:
type: kafka
brokers:
- localhost:9092
topic:
type: value
value: error-topic
client_id: error-arkflow-producer
```
### 缓冲组件
ArkFlow 提供缓冲能力,以处理消息的背压和临时存储:
- **内存缓冲**: 内存缓冲区,用于高吞吐量场景和窗口聚合。
- **会话窗口 (Session Window)**:会话窗口缓冲组件提供了一种基于会话的消息分组机制,其中消息根据活动间隙进行分组。它实现了一个会话窗口,在可配置的非活动期后关闭。
- **滑动窗口 (Sliding Window)**:滑动窗口缓冲组件提供了一种基于时间的分批处理消息的窗口机制。它实现了一种滑动窗口算法,具有可配置的窗口大小、滑动间隔和滑动大小。
- **滚动窗口 (Tumbling Window)**:滚动窗口缓冲组件提供了一种固定大小、不重叠的批处理消息的窗口机制。它实现了一种滚动窗口算法,具有可配置的间隔设置。
示例:
```yaml
buffer:
type: memory
capacity: 10000 # Maximum number of messages to buffer
timeout: 10s # Maximum time to buffer messages
```
## 示例
### Kafka到Kafka的数据处理
```yaml
streams:
- input:
type: kafka
brokers:
- localhost:9092
topics:
- test-topic
consumer_group: test-group
pipeline:
thread_num: 4
processors:
- type: json_to_arrow
- type: sql
query: "SELECT * FROM flow WHERE value > 100"
output:
type: kafka
brokers:
- localhost:9092
topic:
type: value
value: processed-topic
```
### 生成测试数据并处理
```yaml
streams:
- input:
type: "generate"
context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
interval: 1ms
batch_size: 10000
pipeline:
thread_num: 4
processors:
- type: "json_to_arrow"
- type: "sql"
query: "SELECT count(*) FROM flow WHERE value >= 10 group by sensor"
output:
type: "stdout"
```
## 用户
- Conalog(国家: 韩国)
## ArkFlow 插件
[ArkFlow 插件示例](https://github.com/arkflow-rs/arkflow-plugin-examples)
## 许可证
ArkFlow 使用 [Apache License 2.0](LICENSE) 许可证。
## 社区
Discord: https://discord.gg/CwKhzb8pux
微信社区群:
您可以在群内提出任何需要改进的地方,我们会考虑合理性并尽快修改。
如果您发现 bug 请及时提 [issue](https://github.com/arkflow-rs/arkflow/issues/new?template=bug_report.md),我们会尽快确认并修改。
如果你喜欢或正在使用这个项目来学习或开始你的解决方案,请给它一个star⭐。谢谢!