# zgg-dataflow **Repository Path**: dapisces/zgg-dataflow ## Basic Information - **Project Name**: zgg-dataflow - **Description**: 数据流处理系统 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-02-04 - **Last Updated**: 2026-02-04 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 数据流处理系统使用说明 ## 项目概述 **数据流处理系统** 是一个基于 Java + Spring Boot 的数据处理平台,用于从 MongoDB 中读取业务数据,根据费用计算规则进行数据处理,并将结果汇总到费用统计表中。系统支持实时数据同步和批量数据处理。 ## 技术架构 ### 技术栈 - **后端**: Java 8 + Spring Boot - **数据库**: MySQL 8.0 (元数据存储) + MongoDB 4.x (业务数据存储) - **构建工具**: Maven 3.8.5 - **连接池**: Druid - **JSON 处理**: Jackson - **日志**: SLF4J + Logback ### 项目结构 ``` zgg-dataflow/ ├── src/ # 源代码目录 │ └── main/ │ ├── java/com/ygg/ │ │ ├── config/ # 配置类 │ │ ├── model/ # 数据模型类 │ │ ├── service/ # 服务类 │ │ ├── pipeline/ # 管道操作类 │ │ └── schedule/ # 调度类 │ └── resources/ # 配置文件 ├── docs/ # 文档目录 ├── claude/ # 工具脚本目录 ├── pom.xml # Maven 配置文件 └── README.md # 项目说明文档 ``` ## 安装和配置 ### 环境要求 - JDK 8 - Maven 3.8.5+ - MySQL 8.0+ - MongoDB 4.x+ ### 1. 克隆项目 ```bash git clone <项目地址> cd zgg-dataflow ``` ### 2. 配置数据库连接 #### MySQL 连接配置 编辑 `src/main/resources/application.properties` 文件,配置 MySQL 连接信息: ```properties # MySQL 数据库配置 spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.druid.url=jdbc:mysql://47.97.107.26:13306/ygg_all_dev?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai spring.datasource.druid.username=root spring.datasource.druid.password=gUBNBwb72OisAECvqf8gLB9WeiKexX9u7ENFr6Ike0eLoeizQwN2w4zdMvtXPGCsgv4s1hEZuQdFUmQiWWxb+A== spring.datasource.druid.connectionProperties=config.decrypt=true;config.decrypt.key=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAIm7G1G2gYLI3UoN1eQmq2JSkRD1BXmsMaomt1vV1OVYwOoE03zNEhtqX/uP+PY4ki9nY0dQlAKkP2b/XvuTLkMCAwEAAQ== ``` #### MongoDB 连接配置 ```properties # MongoDB 数据库配置 spring.data.mongodb.uri=mongodb://47.97.107.26:37017/admin?replicaSet=rs0 spring.data.mongodb.authentication-database=admin spring.data.mongodb.username=admin spring.data.mongodb.password=zggMongo1248ffbfed78 spring.data.mongodb.database=ygg_dev spring.data.mongodb.auto-index-creation=true ``` ### 3. 编译项目 ```bash mvn clean compile ``` ## 运行项目 ### 1. 直接运行 ```bash mvn exec:java -Dexec.mainClass="com.ygg.Main" ``` ### 2. 打包运行 ```bash # 打包成 JAR 文件 mvn clean package # 运行 JAR 文件 java -jar target/dataflow-1.0-SNAPSHOT.jar ``` ### 3. 开发环境运行 使用 IDE (如 IntelliJ IDEA) 直接运行 `Main.java` 类。 ## 核心功能使用说明 ### 1. 数据提取 系统支持从以下数据源提取数据: #### 从 MySQL 提取元数据 ```java DataExtractionService extractionService = new DataExtractionService(); // 提取所有表结构元数据 List tables = extractionService.extractTableMetadata(); // 提取字段结构元数据 List fields = extractionService.extractFieldMetadata(); ``` #### 从 MongoDB 提取业务数据 ```java // 提取指定集合的所有数据 List data = extractionService.extractMongoData("collectionName"); // 提取增量数据 (基于时间戳) List incrementalData = extractionService.extractMongoIncrementalData("collectionName", startTime, endTime); ``` ### 2. 数据处理 #### 横向连接操作 ```java DataProcessingService processingService = new DataProcessingService(); List leftData = ...; List rightData = ...; // 执行内连接 List joinedData = processingService.performJoin(leftData, rightData, "joinField", JoinType.INNER); ``` #### 分组汇总操作 ```java // 分组汇总 List aggregatedData = processingService.performGroupBy(data, "groupField", "sum(field1), avg(field2)"); ``` #### 公式计算操作 ```java // 公式计算 List calculatedData = processingService.performFormulaCalculation(data, "result = field1 * 2 + field2"); ``` #### 完整数据流处理 ```java // 执行完整的数据处理流程 List result = processingService.processDataFlow(dataFlowConfig); ``` ### 3. 数据存储 #### 写入数据到 MongoDB ```java DataStorageService storageService = new DataStorageService(); // 全量同步 storageService.writeDataToMongo(data, "targetCollection"); // 增量同步 storageService.performIncrementalSync(sourceData, "targetCollection"); ``` #### 查询数据 ```java // 获取集合记录数 long count = storageService.getCollectionCount("collectionName"); // 检查集合是否存在 boolean exists = storageService.collectionExists("collectionName"); ``` ## 数据处理流程 ### 1. 初始化与配置 1. 系统启动时加载数据库配置 2. 初始化数据提取、处理和存储服务 3. 连接到 MySQL 和 MongoDB 数据库 ### 2. 数据提取阶段 1. 从 MySQL 提取表结构和字段元数据 2. 从 MongoDB 提取输入数据源表的数据 3. 支持增量数据提取 ### 3. 数据处理阶段 1. 执行横向连接操作 2. 执行分组汇总操作 3. 执行公式计算操作 4. 执行追加合并操作 5. 根据费用计算规则进行数据处理 ### 4. 数据存储阶段 1. 将处理结果写入到输出表 2. 支持增量同步 3. 确保数据一致性 ## 调度与监控 ### 定时任务 ```java // 配置定时任务 @Scheduled(cron = "0 0 2 * * ?") // 每天凌晨 2 点执行 public void dailyDataSync() { // 执行数据同步逻辑 } ``` ### 任务监控 - 系统会记录任务执行日志到 MongoDB 的 `dev_dataflow_log` 集合 - 支持查询任务执行状态和错误信息 - 实现了简单的健康检查接口 ## 故障排除 ### 1. 数据库连接问题 - 检查 MySQL 和 MongoDB 连接信息是否正确 - 确保数据库服务器正在运行 - 检查防火墙和网络连接 ### 2. 性能问题 - 优化查询条件 - 考虑添加索引 - 调整连接池大小 - 实现分页查询 ### 3. 数据一致性问题 - 检查数据处理逻辑 - 验证计算公式 - 实现数据校验和验证机制 - 使用幂等性操作 ### 4. 常见错误 #### 连接超时 ```java // 增加连接超时时间 spring.datasource.druid.connection-timeout=60000 spring.data.mongodb.socket-timeout=60000 ``` #### 内存不足 ```java // 调整 JVM 内存参数 java -Xmx2g -Xms1g -jar target/dataflow-1.0-SNAPSHOT.jar ``` ## 开发指南 ### 1. 添加新的数据处理操作 #### 实现新的管道操作类 ```java public class CustomOperation implements PipelineOperation { @Override public List execute(List data, Map params) { // 实现自定义操作逻辑 return processedData; } } ``` #### 注册操作到 DataProcessingService ```java public class DataProcessingService { public List performCustomOperation(List data, Map params) { CustomOperation operation = new CustomOperation(); return operation.execute(data, params); } } ``` ### 2. 扩展费用计算规则 #### 修改公式处理操作 ```java public class FormulaOperation { public Object evaluateFormula(String formula, Document data) { // 添加新的公式函数支持 if (formula.contains("CUSTOM_FUNCTION")) { // 自定义函数逻辑 } return result; } } ``` ### 3. 单元测试 ```java @Test public void testJoinOperation() { List leftData = Arrays.asList(new Document("id", 1), new Document("id", 2)); List rightData = Arrays.asList(new Document("id", 1), new Document("id", 3)); List result = new JoinOperation().execute(leftData, rightData, "id", JoinType.INNER); assertEquals(1, result.size()); assertEquals(1, result.get(0).getInteger("id")); } ``` ## 工具脚本使用 ### 1. 解析表结构工具 ```bash cd claude/utils python parse_table_structure.py ``` 功能:解析 MySQL 数据库中的表结构元数据,生成 JSON 文件 ### 2. 修复空 JSON 文件工具 ```bash cd claude/utils python fix_empty_json.py ``` 功能:修复空的或格式不正确的 JSON 文件 ## 文档参考 - **详细设计文档**: `technical_implementation_plan.md` - **项目架构说明**: `CLAUDE.md` - **数据结构分析**: `data_analysis_report.md` - **表结构文档**: `docs/table_structures/` ## 联系方式 如有问题或建议,请联系开发团队。