From ff07e73e49520601ab3a1a8e141211e5969969bb Mon Sep 17 00:00:00 2001 From: jackchaoni Date: Sat, 26 Apr 2025 21:20:37 +0800 Subject: [PATCH 1/5] Init bisect_task for compact --- container/bisect/Dockerfile | 27 ++ container/bisect/bisect-task.py | 685 ++++++++++++++++++++++++++++++++ container/bisect/build | 26 ++ container/bisect/build-depends | 1 + container/bisect/readme.md | 85 ++++ container/bisect/start | 48 +++ container/bisect/start-depends | 1 + 7 files changed, 873 insertions(+) create mode 100644 container/bisect/Dockerfile create mode 100755 container/bisect/bisect-task.py create mode 100755 container/bisect/build create mode 100755 container/bisect/build-depends create mode 100644 container/bisect/readme.md create mode 100755 container/bisect/start create mode 100755 container/bisect/start-depends diff --git a/container/bisect/Dockerfile b/container/bisect/Dockerfile new file mode 100644 index 000000000..989cb1e7c --- /dev/null +++ b/container/bisect/Dockerfile @@ -0,0 +1,27 @@ +# SPDX-License-Identifier: MulanPSL-2.0+ +# Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved. +FROM alpine:latest + +RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories \ + && apk update + +RUN apk update --no-cache && \ + apk upgrade && \ + apk add --no-cache git coreutils util-linux ruby bash python3 python3-dev py3-pip py3-elasticsearch py3-requests py3-yaml py3-httpx py3-flask + +RUN gem install rest-client + +RUN addgroup -S bisect && adduser -S bisect -G bisect + +ENV WORK_DIR /home/bisect +ENV RUNTIME_DIR /c + +RUN mkdir $RUNTIME_DIR && \ + chown -R bisect:bisect $RUNTIME_DIR + +WORKDIR $WORK_DIR + +COPY --chown=bisect:bisect bisect-task.py $WORK_DIR +COPY --chown=bisect:bisect compass-ci $RUNTIME_DIR/compass-ci +COPY --chown=bisect:bisect lkp-tests $RUNTIME_DIR/lkp-tests + diff --git a/container/bisect/bisect-task.py b/container/bisect/bisect-task.py new file mode 100755 index 000000000..2f703debb --- /dev/null +++ b/container/bisect/bisect-task.py @@ -0,0 +1,685 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: MulanPSL-2.0+ +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. +# Input: manticore JSON +# +# functions: +# +# provide API/new-bisect-task +# add to bisect_tasks +# loop: +# consumer: +# one task from bisect_tasks +# fork process, start bisect +# producer: +# scan job db +# add failed task to bisect_tasks +# +import json +import re +import subprocess +import yaml +import hashlib +import os +import sys +import uuid +import shutil +import time +import threading +import logging +import traceback +from flask import Flask, jsonify, request +from flask.views import MethodView +from httpx import RequestError + +sys.path.append((os.environ['LKP_SRC']) + '/programs/bisect-py/') +from py_bisect import GitBisect +import mysql.connector +from mysql.connector import Error + + +app = Flask(__name__) + + + +class BisectTask: + def __init__(self): + self.bisect_task = None + + def add_bisect_task(self, task): + """ + Add a new bisect task to the Elasticsearch 'bisect_task' index. + + Args: + task (dict): A dictionary containing task information. It must include the following fields: + - bad_job_id: The associated bad job ID. + - error_id: The associated error ID. + + Returns: + bool: Whether the task was successfully added. 
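+
+        Example (hypothetical IDs; the error_id value reuses the white-list
+        entry defined later in this file):
+            >>> BisectTask().add_bisect_task({"bad_job_id": "123456",
+            ...     "error_id": "last_state.eid.test..exit_code.99"})
+            True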
+ """ + # Parameter validation + required_fields = ["bad_job_id", "error_id"] + for field in required_fields: + if field not in task: + raise ValueError(f"Missing required field: {field}") + + job_info = self.get_job_info_from_manticore(task["bad_job_id"]) + print(job_info) + error_id = job_info.get('error_id') or "none" + suite = job_info.get('suite') or "none" + repo = job_info.get('upstream_repo') or "none" + task_fingerprint = hashlib.sha256((error_id+suite+repo).encode()).hexdigest() + + if self.manticore_query(f'SELECT * FROM bisect_task WHERE id={task_fingerprint}'): + logging.info(f"Task already exists: bad_job_id={task['bad_job_id']}, error_id={task['error_id']}, id={task_fingerprint}") + return False + try: + # Set priority_level + task["priority_level"] = self.set_priority_level(job_info) + task["bisect_status"] = "wait" + task["id"] = task_fingerprint + # If the task does not exist, add it to the 'bisect_task' index + self.manticore_insert("") + logging.info(f"Added new task to bisect_index: {task} with {task_fingerprint}") + return True + except Exception as e: + logging.error(f"Failed to add task: {e}") + return False + + def get_job_info_from_manticore(self, job_id): + job_json = self.manticore_query(f'SELECT j FROM jobs WHERE id={job_id}') + if not job_json: + return {} + first_row = job_json[0] + return json.loads(first_row.get('j', {})) + + def set_priority_level(self, job_info: dict) -> int: + """ + """ + WATCH_LISTS = { + "suite": ["check_abi", "pkgbuild"], # 监控的测试套件 + "repo": ["linux"], # 监控的代码仓库 + "error_id": ["stderr.eid../include/linux/thread_info.h:#:#:error:call_to'__bad_copy_from'declared_with_attribute_error:copy_source_size_is_too_small"] # 监控的错误ID + } + # 优先级权重配置 + PRIORITY_WEIGHTS = { + "suite": 2, + "repo": 1, + "error_id": 3 + } + + priority = 0 + + for field, weight in PRIORITY_WEIGHTS.items(): + # 获取任务字段值(确保返回字符串) + job_value = job_info.get(field, "") # 默认空字符串 + + # 获取监控列表 + watch_list = WATCH_LISTS.get(field, []) + + # 检查值是否在监控列表中 + if job_value in watch_list: + priority += weight + + return priority + + def bisect_producer(self): + """ + Producer function to fetch new bisect tasks from Elasticsearch and add them to the bisect index if they don't already exist. + This function runs in an infinite loop, checking for new tasks every 5 minutes. + """ + error_count = 0 + while True: + cycle_start = time.time() + try: + logging.info("Starting producer cycle...") + # Fetch new bisect tasks from Elasticsearch + new_bisect_tasks = self.get_new_bisect_task_from_manticore() + logging.info(f"Found {len(new_bisect_tasks)} new bisect tasks") + logging.debug(f"Raw tasks data: {new_bisect_tasks}") + + # Process each task + processed_count = 0 + for task in new_bisect_tasks: + try: + self.add_bisect_task(task) + except Exception as e: + logging.error(f"Error processing task {task.get('id', 'unknown')}: {str(e)}") + continue + + except Exception as e: + # 添加详细错误日志 + logging.error(f"Error in bisect_producer: {str(e)}") + logging.error(f"Failed task data: {task if 'task' in locals() else 'No task data'}") + logging.error(traceback.format_exc()) # 打印完整堆栈跟踪 + # 指数退避重试机制 + sleep_time = min(300, 2 ** error_count) + time.sleep(sleep_time) + error_count += 1 + else: + # 正常执行后重置错误计数器 + error_count = 0 + cycle_time = time.time() - cycle_start + logging.info(f"Producer cycle completed. 
Processed {processed_count} tasks in {cycle_time:.2f} seconds") + # 固定间隔休眠,两个循环之间的间隔永远为300秒,无论每次循环的执行时间为多少 + sleep_time = 300 - cycle_time + logging.info(f"Sleeping for {sleep_time:.2f} seconds until next cycle") + time.sleep(sleep_time) + + def bisect_consumer(self): + """ + Consumer function to fetch bisect tasks from Elasticsearch and process them. + This function runs in an infinite loop, checking for tasks every 30 seconds. + Tasks are either submitted to a scheduler or run locally, depending on the environment variable 'bisect_mode'. + """ + while True: + cycle_start = time.time() + processed = 0 + try: + # Fetch bisect tasks from Elasticsearch + bisect_tasks = self.get_tasks_from_bisect_task() + if not bisect_tasks: + time.sleep(30) + continue + + if bisect_tasks: # If tasks are found + # Check the mode of operation from environment variable + if os.getenv('bisect_mode') == "submit": + # If mode is 'submit', send tasks to the scheduler + logging.debug("Submitting bisect tasks to scheduler") + self.submit_bisect_tasks(bisect_tasks) + else: + # If mode is not 'submit', run tasks locally + logging.debug("Running bisect tasks locally") + self.run_bisect_tasks(bisect_tasks) + processed = len(bisect_tasks) + + except Exception as e: + # Log any errors that occur during task processing + logging.error(f"Error in bisect_consumer: {e}") + # 异常后休眠时间加倍(简易熔断机制) + time.sleep(60) + finally: + # 记录处理指标 + cycle_time = time.time() - cycle_start + logging.info(f"Consumer cycle processed {processed} tasks in {cycle_time:.2f}s") + + # 动态休眠控制(无任务时延长休眠) + sleep_time = 30 if processed > 0 else 60 + time.sleep(max(10, sleep_time - cycle_time)) # 保证最小间隔 + + def run_bisect_tasks(self, bisect_tasks): + """ + Process a list of bisect tasks locally by running Git bisect to find the first bad commit. + Updates the task status in Elasticsearch to 'processing' before starting the bisect process, + and updates the result after the bisect process completes. + + :param bisect_tasks: List of bisect tasks to process. 
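+
+        Example element (hypothetical values): {"id": "<uuid4>",
+        "bad_job_id": "123456", "error_id": "...", "bisect_status": "wait"};
+        the status then moves wait -> processing -> completed/failed below.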
+ """ + for bisect_task in bisect_tasks: + task_id = bisect_task.get('id', 'unknown') + print(bisect_task) + try: + #检查bisect_task的状态,乐观锁 + if bisect_task.get('bisect_status') != 'wait': + logging.warning(f"Skipping task {task_id} with invalid status: {bisect_task['bisect_status']}") + continue + # Update task status to 'processing' in Elasticsearch + bisect_task["bisect_status"] = "processing" + # Convert to Manticore SQL update + update_sql = f""" + UPDATE bisect_task + SET bisect_status = 'processing' + WHERE id = '{bisect_task["id"]}' + """ + #TODO UPDATE + self.manticore_update(update_sql) + logging.debug(f"Started processing task: {bisect_task['id']}") + + # Prepare task data for Git bisect with result root + # Create unique temporary directory with cleanup + task_id = str(bisect_task['id']) + # Create unique clone path using task ID + clone_path = os.path.join( + ) + task = { + 'bad_job_id': bisect_task['bad_job_id'], + 'error_id': bisect_task['error_id'], + 'bisect_result_root': f"/tmp/bisect/{bisect_task['bad_job_id']}", + 'clone_path': clone_path + } + if os.path.exists(task['bisect_result_root']): + shutil.rmtree(task['bisect_result_root']) + os.makedirs(task['bisect_result_root'], exist_ok=True) + + # Handle bad_job_id conversion with validation + try: + gb = GitBisect() + result = gb.find_first_bad_commit(task) + except (ValueError, KeyError) as e: + raise ValueError(f"Invalid bad_job_id: {task.get('bad_job_id')}") from e + + # Update task status and result in Elasticsearch + bisect_task["bisect_status"] = "completed" + bisect_task["bisect_result"] = result + # Convert to Manticore SQL update + update_sql = f""" + UPDATE bisect_task + SET bisect_status = 'completed', + bisect_result = '{json.dumps(bisect_task["bisect_result"])}' + WHERE id = {bisect_task["id"]} + """ + # TODO update + self.manticore_update(update_sql) + logging.debug(f"Completed processing task: {bisect_task['id']}") + + except Exception as e: + # Update task status to 'failed' in case of an error + bisect_task["bisect_status"] = "failed" + bisect_task["bisect_result"] = str(e) + # Convert to Manticore SQL update + # Remove bisect_result column from update + update_sql = f""" + UPDATE bisect_task + SET bisect_status = 'failed' + WHERE id = '{bisect_task["id"]}' + """ + self.manticore_query(update_sql) + logging.error(f"Marked task {bisect_task['id']} as failed due to error: {e}") + + def submit_bisect_tasks(self, bisect_tasks): + """ + Submit a list of bisect tasks to the scheduler if they are not already in the database. + Each task is checked against the database to avoid duplicate submissions. + + :param bisect_tasks: List of bisect tasks to submit. 
+ """ + # Define the query to check if a bisect task already exists in the database + query_if_bisect_already_in_db = { + "_source": ["id"], # 只需返回ID字段验证存在性 + "query": { + "bool": { + "must": [ + {"term": {"error_id": None}}, # 占位符,实际替换具体值 + {"term": {"bad_job_id": None}}, + {"exists": {"field": "id"}}, + {"term": {"suite": "bisect-py"}} + ] + } + } + } + + # Process each bisect task + for bisect_task in bisect_tasks: + task_id = bisect_task["id"] + try: + # Check if the task already exists in the database + current_query = query_if_bisect_already_in_db.copy() + current_query["query"]["bool"]["must"][0]["term"]["error_id"] = bisect_task["error_id"] + current_query["query"]["bool"]["must"][1]["term"]["bad_job_id"] = bisect_task["bad_job_id"] + if not self.es_query("jobs8", current_query): + # If the task does not exist, submit it to the scheduler + result = self.submit_bisect_job(bisect_task["bad_job_id"], bisect_task["error_id"]) + if result: + logging.info(f"Submitted bisect task to scheduler: {bisect_task['id']}") + else: + logging.error(f"Submission failed for task {task_id}") + else: + # If the task already exists, log a message + logging.debug(f"Job already in db: {bisect_task['id']}") + except KeyError as e: + # 处理任务数据格式错误 + logging.error(f"Invalid task format {task_id}: missing {str(e)}") + except Exception as e: + # 添加重试机制 + retry_count = 0 + while retry_count < 3: + logging.error(f"Submission failed for task {task_id} {retry_count+1} times") + try: + current_query = query_if_bisect_already_in_db.copy() + current_query["query"]["bool"]["must"][0]["term"]["error_id"] = bisect_task["error_id"] + current_query["query"]["bool"]["must"][1]["term"]["bad_job_id"] = bisect_task["bad_job_id"] + if not self.es_query("jobs8", current_query): + result = self.submit_bisect_job(bisect_task["bad_job_id"], bisect_task["error_id"]) + if result: + logging.info(f"Submitted bisect task to scheduler: {bisect_task['id']}") + break + except Exception: + retry_count += 1 + time.sleep(2 ** retry_count) + + def get_tasks_from_bisect_task(self): + """ + Search for bisect tasks with status 'wait' using Manticore SQL. + Returns the list of tasks if found, otherwise returns None. + + :return: List of bisect tasks with status 'wait', or None if no tasks are found. 
+ """ + # Define SQL query + sql = """ + SELECT * + FROM bisect_task + WHERE bisect_status = 'wait' + LIMIT 100 + """ + + result = self.manticore_query(sql) + + # Return the result if tasks are found, otherwise return None + if result: + return result + else: + return None + + + def execute_sql(self, sql, db=None, write_operation=False): + """Base method for executing SQL statements""" + try: + host = os.environ.get('MANTICORE_HOST', 'localhost') + port = os.environ.get('MANTICORE_PORT', '9306') + database = db or os.environ.get('MANTICORE_DB', 'jobs') + + connection = mysql.connector.connect( + host=host, + port=port, + database=database, + connect_timeout=5 + ) + + cursor = connection.cursor(dictionary=True) + + if write_operation: + cursor.execute(sql) + connection.commit() + return cursor.rowcount + else: + cursor.execute(sql) + result = cursor.fetchall() + if cursor.with_rows: + cursor.fetchall() # Consume unread results + return result if result else None + + except mysql.connector.Error as e: + logging.error(f"Manticore operation failed: {e}\nSQL: {sql}") + return None + except Exception as e: + logging.error(f"Unexpected error: {str(e)}\nSQL: {sql}") + return None + finally: + if 'cursor' in locals(): + cursor.close() + if 'connection' in locals() and connection.is_connected(): + connection.close() + + def manticore_query(self, sql, db=None): + """Execute read query and return results""" + return self.execute_sql(sql, db, write_operation=False) + + def manticore_insert(self, table, data, db=None): + """Safe INSERT operation with parameterized query""" + if not data: + return None + + columns = ', '.join(data.keys()) + values = ', '.join([f"'{v}'" if isinstance(v, str) else str(v) for v in data.values()]) + sql = f"INSERT INTO {table} ({columns}) VALUES ({values})" + + return self.execute_sql(sql, db, write_operation=True) + + def manticore_update(self, table, updates, condition, db=None): + """Safe UPDATE operation with parameterized query""" + if not updates or not condition: + return None + + set_clause = ', '.join([ + f"{k} = '{v}'" if isinstance(v, str) else f"{k} = {v}" + for k, v in updates.items() + ]) + where_clause = ' AND '.join([ + f"{k} = '{v}'" if isinstance(v, str) else f"{k} = {v}" + for k, v in condition.items() + ]) + sql = f"UPDATE {table} SET {set_clause} WHERE {where_clause}" + + return self.execute_sql(sql, db, write_operation=True) + + def manticore_delete(self, table, condition, db=None): + """Safe DELETE operation with parameterized query""" + if not condition: + return None + + where_clause = ' AND '.join([ + f"{k} = '{v}'" if isinstance(v, str) else f"{k} = {v}" + for k, v in condition.items() + ]) + sql = f"DELETE FROM {table} WHERE {where_clause}" + + return self.execute_sql(sql, db, write_operation=True) + + def submit_bisect_job(self, bad_job_id, error_id): + """ + Submit a bisect job to the scheduler using the provided bad_job_id and error_id. + The job is submitted via a shell command, and the job ID is extracted from the response. + + :param bad_job_id: The ID of the bad job to be bisected. + :param error_id: The error ID associated with the bad job. + :return: The job ID if submission is successful, otherwise None. 
+ """ + + try: + submit_command = f"{os.environ['LKP_SRC']}/sbin/submit runtime=36000 bad_job_id={bad_job_id} error_id={error_id} bisect-py.yaml" + result = subprocess.run( + submit_command, + shell=True, + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + timeout=60) + match = re.search(r'id=(\S+)', result.stdout) + if match: + job_id = match.group(1) + logging.info(f"Job submitted successfully. Job ID: {job_id}") + return job_id + else: + logging.error(f"Unexpected submit output: {result.stdout}") + return None + except subprocess.CalledProcessError as e: + logging.error(f"Job submission failed with return code {e.returncode}.") + return None + except subprocess.TimeoutExpired: + # 处理命令执行超时 + logging.error("Submit command timed out after 30 seconds") + return None + except KeyError: + # 处理LKP_SRC环境变量缺失 + logging.error("LKP_SRC environment variable not configured") + return None + except Exception as e: + # 兜底异常处理 + logging.error(f"Unexpected error during job submission: {str(e)}") + return None + + def get_new_bisect_task_from_manticore(self): + """ + Fetch new bisect tasks using Manticore SQL for both PKGBUILD and SS suites. + Tasks are filtered based on a white list of error IDs and processed into a standardized format. + + :return: A list of processed tasks that match the white list criteria. + """ + # Define SQL for PKGBUILD tasks + # TODO: AND submit_time > NOW() - INTERVAL 7 DAY + sql_failure = """ + SELECT id, j.stats as errid + FROM jobs + WHERE j.job_health = 'abort' + AND j.stats IS NOT NULL + ORDER BY id DESC + """ + + # Define SQL for SS tasks + sql_ss = """ + SELECT id, stats, ss + FROM jobs + WHERE job_health = 'failed' + AND ss IS NOT NULL + AND stats IS NOT NULL + """ + + # Define the white list of error IDs + errid_white_list = ["last_state.eid.test..exit_code.99"] + + # Execute Manticore SQL queries + result = self.manticore_query(sql_failure) + + # Convert the list of tasks into a dictionary with task IDs as keys + # 添加详细日志记录原始数据格式 + logging.debug(f"Raw query result sample: {result[:1] if result else 'Empty result'}") + + result_dict = {} + for item in result: + try: + item_id = item['id'] + result_dict[item_id] = item + except KeyError: + logging.warning(f"Skipping invalid item missing 'id' field: {item}") + # Process the tasks to filter and transform them based on the white list + tasks = self.process_data(result, errid_white_list) + + # Return the processed tasks + return tasks + + def process_data(self, input_data, white_list): + """ + Process input data to filter and transform it based on a white list of error IDs. + Each valid entry is assigned a new UUID and added to the result list. + + :param input_data: A list of dictionaries containing the input data to process + :param white_list: A list of error IDs to filter by. + :return: A list of processed documents, each containing a new UUID and filtered error ID. 
+ """ + result = [] + + # Iterate over each item in the input list + for item in input_data: + try: + # Parse the JSON string in errid field + error_ids = json.loads(item["errid"]).keys() + + # Find matching error IDs from white list + matches = [errid for errid in error_ids if errid in white_list] + if not matches: + continue + + # Create new document with generated UUID and original bad_job_id + document = { + "id": str(uuid.uuid4()), # Generate UUID for task ID + "bad_job_id": str(item["id"]), # Keep original as string + "error_id": matches[0], + "bisect_status": "wait" + } + result.append(document) + + except (KeyError, json.JSONDecodeError) as e: + logging.warning(f"Skipping invalid item {item.get('id')}: {str(e)}") + + return result + + def check_existing_bisect_task(self, bad_job_id, error_id): + """ + Check if a bisect task with the given bad_job_id and error_id already exists using Manticore SQL. + + :param bad_job_id: The ID of the bad job to check. + :param error_id: The error ID associated with the bad job. + :return: Boolean indicating if task exists. + """ + sql = f""" + SELECT 1 AS exist + FROM bisect_task + WHERE bad_job_id = '{bad_job_id}' + AND error_id = '{error_id}' + LIMIT 1 + """ + + try: + result = self.manticore_query(sql, db="bisect_task") + if not result: # Handle None result + logging.warning(f"No results from existence check query for {bad_job_id}/{error_id}") + return False + + return result + + except KeyError as e: + logging.error(f"manticore response structure: {str(e)}") + return True + except Exception as e: + logging.error(f"Unexpected error during existence check: {str(e)}") + return True + + +class BisectAPI(MethodView): + def __init__(self): + self.bisect_api = BisectTask() + + def post(self): + task = request.json + print(task) + if not task: + return jsonify({"error": "No data provided"}), 400 + self.bisect_api.add_bisect_task(task) + return jsonify({"message": "Task added successfully"}), 200 + +def set_log(): + stream_handler = logging.StreamHandler() + console_formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(module)s:%(lineno)d - %(message)s' + ) + + stream_handler.setFormatter(console_formatter) + stream_handler.setLevel(logging.DEBUG if os.getenv('LOG_LEVEL') == 'DEBUG' else logging.INFO) + + logging.basicConfig( + level=logging.DEBUG, + handlers=[stream_handler], + format='%(asctime)s.%(msecs)03d %(levelname)s %(name)s:%(lineno)d - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + logger = logging.getLogger('bisect-task') + logger.setLevel(logging.DEBUG) + logger.addHandler(stream_handler) + logger.propagate = False + + + +def run_flask(): + app.add_url_rule('/new_bisect_task', view_func=BisectAPI.as_view('bisect_api')) + app.run(host='0.0.0.0', port=9999) + +def main(): + try: + # 先启动后台任务 + set_log() + run = BisectTask() + bisect_producer_thread = threading.Thread(target=run.bisect_producer, daemon=True) + bisect_producer_thread.start() + # 在独立线程运行Flask + flask_thread = threading.Thread(target=run_flask, daemon=True) + flask_thread.start() + + num_consumer_threads = 2 + for i in range(num_consumer_threads): + bisect_consumer_thread = threading.Thread(target=run.bisect_consumer, daemon=True) + bisect_consumer_thread.start() + + # 主线程保持活跃 + while True: + time.sleep(3600) # 防止主线程退出 + except Exception as e: + print("Error when init_bisect_commit:" + e) + sys.exit(-1) + + +if __name__ == "__main__": + main() diff --git a/container/bisect/build b/container/bisect/build new file mode 100755 index 000000000..373149215 --- 
/dev/null
+++ b/container/bisect/build
@@ -0,0 +1,26 @@
+#!/bin/bash
+# SPDX-License-Identifier: MulanPSL-2.0+
+# Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved.
+
+. ../defconfig.sh
+
+docker_skip_rebuild "bisect"
+
+load_cci_defaults
+
+if [ ! -d compass-ci ]; then
+	git clone https://gitee.com/openeuler/compass-ci.git
+fi
+if [ ! -d lkp-tests ]; then
+	git clone https://gitee.com/compass-ci/lkp-tests.git
+	if [ $? -ne 0 ]; then
+		echo "Failed to clone lkp-tests, copying SRC directory instead..."
+		cp -r "$LKP_SRC" lkp-tests
+	fi
+fi
+
+docker build -t bisect .
+
+push_image bisect:latest
+
diff --git a/container/bisect/build-depends b/container/bisect/build-depends
new file mode 100755
index 000000000..5dd403e16
--- /dev/null
+++ b/container/bisect/build-depends
@@ -0,0 +1 @@
+scheduler-dev
diff --git a/container/bisect/readme.md b/container/bisect/readme.md
new file mode 100644
index 000000000..391f5fcb5
--- /dev/null
+++ b/container/bisect/readme.md
@@ -0,0 +1,85 @@
+## bisect submit
+
+### way 1: run in a dedicated testbox
+
+ensure security:
+- account/token
+- access to services
+
+common testbox:
+  submit uploads the lkp-tests tarball, runs in the testbox
+
+secure testbox:
+  submit does NOT upload the lkp-tests tarball
+  runs only a selected list of programs
+  runs on a dedicated host machine whose firewall can reach the services
+
+### way 2: submit job1 + submit job2, reusing the same account info
+problem: consumes the submitter's credit / machine time
+
+### way 3: compass-ci container / run as a service
+refer to the delimiter service + a system bisect account
+
+  provide API/new-bisect-task
+    add to ES
+
+  loop:
+    consume one task from ES
+    fork process, start bisect
+      bisect step
+        submit job
+    change bisect-task state=finish
+
+submit-jobs
+
+### regression db
+### bisect task queue
+### bisect tasks management and dashboard
+
+  bisect task name
+  suite start_time/run_time step group_id
+
+# Bisect Task Database Design
+
+## Main table (`bisect_tasks`)
+
+| Field | Type | Nullable | Default | Description | Index suggestion |
+|-------------------|---------|----------|---------|--------------------------------------------------------------|---------------------------|
+| id | keyword | NOT NULL | - | Unique task identifier (UUIDv4) | Primary key (PK) |
+| bad_job_id | keyword | NOT NULL | - | Original job ID to bisect | Index (IDX_1) |
+| error_id | keyword | NOT NULL | - | Error identifier (needs a deduplication mechanism) | Unique index (UQ_1) |
+| bisect_metrics | keyword | NULL | - | Performance metric name (performance-test scenarios only) | Conditional index (IDX_5) |
+| priority_level | integer | NOT NULL | 1 | Priority level: 0 low / 1 medium / 2 high | Sort index (IDX_8) |
+| bisect_status | keyword | NOT NULL | pending | Status enum: pending/running/paused/success/failed/retrying | Status index (IDX_2) |
+| first_bad_commit | keyword | NULL | - | First bad commit located by bisect | Covering index (IDX_6) |
+| project | keyword | NOT NULL | - | Project name (format: org/repo) | Composite index (IDX_3) |
+| pkgbuild_repo | keyword | NULL | - | Package build repository URL | - |
+| git_url | keyword | NULL | - | Upstream code repository URL | - |
+| bisect_suite | keyword | NOT NULL | - | Test suite identifier (for cross-task analysis) | Composite index (IDX_3) |
+| first_bad_job_id | keyword | NULL | - | First failed job ID for the bad commit | Foreign key index (FK_1) |
+| first_result_root | text | NULL | - | Storage path of the first failure result (OSS path format) | - |
+| work_dir | text | NOT NULL | - | Task working directory (format: /bisect/yyyy-mm-uuid/) | Prefix index (IDX_4) |
+| start_time | date | NULL | - | Actual task start time (ISO 8601) | Range index (IDX_7) |
+| end_time | date | NULL | - | Task end time (updated on success/failure) | Range index (IDX_7) |
+| commit_history | text | NOT NULL | - | Commit range (format: commit1...commit2) | - |
+| timeout | integer | NOT NULL | 3600 | Timeout threshold (seconds) | - |
+
+## Nested table (`job_commit_mappings`)
+
+| Field | Type | Nullable | Description | Index suggestion |
+|--------------|---------|----------|----------------------------------------------------------------|------------------------------|
+| job_id | keyword | NOT NULL | Associated CI job identifier | Composite primary key (PK_2) |
+| commit_hash | keyword | NOT NULL | Git commit hash (full 40 characters) | Covering index (IDX_9) |
+| metric_value | text | NULL | Performance metric values (multi-dimensional metrics as JSON) | - |
+| result_root | keyword | NOT NULL | Result storage path (OSS path) | Prefix index (IDX_10) |
+| status | keyword | NOT NULL | Verdict: bad/good/skip | Bitmap index (BIT_1) |
+| timestamp | date | NOT NULL | Job execution timestamp (millisecond precision) | Sort index (IDX_11) |
diff --git a/container/bisect/start b/container/bisect/start
new file mode 100755
index 000000000..fc69adbd1
--- /dev/null
+++ b/container/bisect/start
@@ -0,0 +1,48 @@
+#!/usr/bin/env ruby
+# SPDX-License-Identifier: MulanPSL-2.0+
+# Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved.
+# frozen_string_literal: true
+
+require 'set'
+require_relative '../defconfig.rb'
+
+names = Set.new %w[
+  ES_HOST
+  ES_PORT
+  ES_USER
+  ES_PASSWORD
+]
+
+defaults = relevant_defaults(names)
+
+service_authentication = relevant_service_authentication(names)
+defaults.merge!(service_authentication)
+
+env = docker_env(defaults)
+
+DEFAULT_LKP = '/c/lkp-tests'
+DEFAULT_CCI = '/c/compass-ci'
+DEFAULT_CONFIG_DIR = '/etc/compass-ci/defaults'
+DEFAULT_USER_CONFIG_DIR = File.expand_path("~/.config/compass-ci/")
+DEFAULT_BISECT_CONFIG_DIR = '/root/.config/compass-ci/'
+docker_rm 'bisect'
+
+cmd = %w[
+  docker run
+  --name bisect
+  --restart=always
+  -itd
+] + env + %W[
+  -e LKP_SRC=#{DEFAULT_LKP}
+  -e CCI_SRC=#{DEFAULT_CCI}
+  -v #{DEFAULT_CONFIG_DIR}:#{DEFAULT_CONFIG_DIR}:ro
+  -v #{DEFAULT_USER_CONFIG_DIR}:#{DEFAULT_BISECT_CONFIG_DIR}:ro
+  -v /etc/localtime:/etc/localtime:ro
+  -v /etc/compass-ci/register:/etc/compass-ci/register:ro
+  -p 9999:9999
+  bisect
+]
+
+cmd += ['sh', '-c', 'umask 002 && python3 bisect-task.py']
+
+system(*cmd)
diff --git a/container/bisect/start-depends b/container/bisect/start-depends
new file mode 100755
index 000000000..d94ff356a
--- /dev/null
+++ b/container/bisect/start-depends
@@ -0,0 +1 @@
+register-accounts
-- 
Gitee

From c854e871cf37b1e10a4dd37203a5f7080919023d Mon Sep 17 00:00:00 2001
From: jackchaoni
Date: Sat, 26 Apr 2025 21:22:33 +0800
Subject: [PATCH 2/5] Add bisect task database

---
 sbin/create-manticore-tables.sh |  1 +
 sbin/manti-table-bisect.sql     | 21 +++++++++++++++++++++
 2 files changed, 22 insertions(+)
 create mode 100755 sbin/manti-table-bisect.sql

diff --git a/sbin/create-manticore-tables.sh b/sbin/create-manticore-tables.sh
index 7e62ab530..f2bf75037 100755
--- a/sbin/create-manticore-tables.sh
+++ b/sbin/create-manticore-tables.sh
@@ -17,3 +17,4 @@ echo
 $CURL "mode=raw&query=desc jobs"
 $CURL "mode=raw&query=desc hosts"
 $CURL "mode=raw&query=desc accounts"
+$CURL "mode=raw&query=desc bisect_tasks"
diff --git a/sbin/manti-table-bisect.sql b/sbin/manti-table-bisect.sql
new file mode 100755
index 000000000..a26761562
--- /dev/null
+++ b/sbin/manti-table-bisect.sql
@@ -0,0 +1,21 @@
+CREATE TABLE bisect_tasks(
+    id string,
+    bad_job_id string,
+    error_id string,
+    bisect_metrics string,
+    bisect_status string,
+    project string,
+    pkgbuild_repo string,
+    git_url string,
+    bisect_suite string,
+    bad_commit string,
+    first_bad_id string,
+    first_result_root string,
+    work_dir string,
+    start_time bigint,
+    end_time bigint,
+    priority_level integer,
+    commit_history string,
+    timeout integer,
+    job_commit_mappings json
+) charset_table='U+0021..U+007E';
-- 
Gitee

From 131d47348b05599d0fa353994ff497c0c985bd54 Mon Sep 17 00:00:00 2001
From: jacknichao Date: Tue, 6 May 2025 21:00:58 +0800 Subject: [PATCH 3/5] bisect: Introduce several improvements and refactoring This commit introduces the following changes: 1. Add requirement for bisect-task.py 2. Rename "bisect-task" database to "bisect" database 3. Migrate from Elasticsearch (ES) to Manticore 4. Implement error_id search in regression tests; bisect all error_ids if none found 5. Introduce two new APIs: - Delete failed bisect tasks - List all bisect tasks BREAKING CHANGE: - Database name changed from "bisect-task" to "bisect", requiring migration of existing data - Elasticsearch dependencies removed, ensure Manticore is configured before deployment --- container/bisect/Dockerfile | 8 +- container/bisect/bisect-task.py | 712 +++++++++++++++++--------------- container/bisect/start | 4 +- 3 files changed, 392 insertions(+), 332 deletions(-) diff --git a/container/bisect/Dockerfile b/container/bisect/Dockerfile index 989cb1e7c..9b9bb6fee 100644 --- a/container/bisect/Dockerfile +++ b/container/bisect/Dockerfile @@ -7,10 +7,13 @@ RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories RUN apk update --no-cache && \ apk upgrade && \ - apk add --no-cache git coreutils util-linux ruby bash python3 python3-dev py3-pip py3-elasticsearch py3-requests py3-yaml py3-httpx py3-flask + apk add --no-cache git coreutils util-linux ruby bash python3 python3-dev py3-pip \ + mariadb-dev build-base mariadb-connector-c \ + py3-requests py3-yaml py3-httpx py3-flask py3-numpy py3-gitpython -RUN gem install rest-client +RUN gem install rest-client +RUN pip3 install mysql-connector-python --break-system-packages RUN addgroup -S bisect && adduser -S bisect -G bisect ENV WORK_DIR /home/bisect @@ -25,3 +28,4 @@ COPY --chown=bisect:bisect bisect-task.py $WORK_DIR COPY --chown=bisect:bisect compass-ci $RUNTIME_DIR/compass-ci COPY --chown=bisect:bisect lkp-tests $RUNTIME_DIR/lkp-tests +USER bisect diff --git a/container/bisect/bisect-task.py b/container/bisect/bisect-task.py index 2f703debb..4fe210174 100755 --- a/container/bisect/bisect-task.py +++ b/container/bisect/bisect-task.py @@ -19,80 +19,114 @@ import json import re import subprocess import yaml -import hashlib import os import sys -import uuid import shutil import time import threading -import logging import traceback +import mysql.connector +from random import randint from flask import Flask, jsonify, request from flask.views import MethodView from httpx import RequestError +from functools import wraps + sys.path.append((os.environ['LKP_SRC']) + '/programs/bisect-py/') from py_bisect import GitBisect -import mysql.connector -from mysql.connector import Error +from log_config import logger +sys.path.append((os.environ['CCI_SRC']) + '/lib') +from bisect_database import BisectDB app = Flask(__name__) - class BisectTask: def __init__(self): self.bisect_task = None + # Initialize read-only jobs database connection + self.jobs_db = BisectDB( + host=os.environ.get('MANTICORE_HOST', 'localhost'), + port=os.environ.get('MANTICORE_PORT', '9306'), + database="jobs", + readonly=True, + pool_size=10 + ) + + # Initialize read-write bisect database connection + self.bisect_db = BisectDB( + host=os.environ.get('MANTICORE_HOST', 'localhost'), + port=os.environ.get('MANTICORE_PORT', '9306'), + database="bisect", + pool_size=15 + ) + + self.regression_db = BisectDB( + host=os.environ.get('MANTICORE_HOST', 'localhost'), + port=os.environ.get('MANTICORE_PORT', '9306'), + database="regression", + pool_size=5 + ) + self._start_monitor() - 
def add_bisect_task(self, task): - """ - Add a new bisect task to the Elasticsearch 'bisect_task' index. - - Args: - task (dict): A dictionary containing task information. It must include the following fields: - - bad_job_id: The associated bad job ID. - - error_id: The associated error ID. - Returns: - bool: Whether the task was successfully added. - """ - # Parameter validation + def add_bisect_task(self, task): + """Add a new bisect task (atomic operation version)""" required_fields = ["bad_job_id", "error_id"] for field in required_fields: if field not in task: raise ValueError(f"Missing required field: {field}") - job_info = self.get_job_info_from_manticore(task["bad_job_id"]) - print(job_info) - error_id = job_info.get('error_id') or "none" - suite = job_info.get('suite') or "none" - repo = job_info.get('upstream_repo') or "none" - task_fingerprint = hashlib.sha256((error_id+suite+repo).encode()).hexdigest() - - if self.manticore_query(f'SELECT * FROM bisect_task WHERE id={task_fingerprint}'): - logging.info(f"Task already exists: bad_job_id={task['bad_job_id']}, error_id={task['error_id']}, id={task_fingerprint}") - return False try: - # Set priority_level - task["priority_level"] = self.set_priority_level(job_info) - task["bisect_status"] = "wait" - task["id"] = task_fingerprint - # If the task does not exist, add it to the 'bisect_task' index - self.manticore_insert("") - logging.info(f"Added new task to bisect_index: {task} with {task_fingerprint}") - return True + # Generate unique identifier based on bad_job_id and error_id + task_fingerprint = int(time.time() * 1e6) + randint(0, 999) + time.sleep(5) + if self.bisect_db.execute_query(f"""SELECT id FROM bisect WHERE error_id='{task['error_id']}' AND bad_job_id='{task['bad_job_id']}'"""): + logger.info(f"Already have bisect job {task} in database") + return True + # Set priority (ensure integer type) + #priority = self.set_priority_level(job_info) + + # Atomic insert operation + insert_sql = f""" + INSERT INTO bisect + (id, bad_job_id, error_id, bisect_status) + VALUES ('{task_fingerprint}', '{task.get("bad_job_id", "")}', '{task.get("error_id", "")}', 'wait') + """ + # Execute write and get affected rows + affected_rows = self.bisect_db.execute_write(insert_sql) + + # Determine result based on affected rows + if affected_rows == 1: + logger.info(f"Successfully inserted new task | ID: {task_fingerprint}") + return True + elif affected_rows == 0: + logger.debug(f"Task already exists | ID: {task_fingerprint}") + return False + else: + logger.warning(f"Unexpected affected rows: {affected_rows}") + return False + + except mysql.connector.Error as e: + if e.errno == 1062: # Duplicate entry error code + logger.debug(f"Task duplicate (caught at database level) | ID: {task_fingerprint}") + return False + else: + logger.error(f"Database error | Code: {e.errno} | Message: {e.msg}") + return False except Exception as e: - logging.error(f"Failed to add task: {e}") + logger.error(f"Unknown error: {str(e)}") return False - def get_job_info_from_manticore(self, job_id): - job_json = self.manticore_query(f'SELECT j FROM jobs WHERE id={job_id}') + def get_job_info_from_jobs(self, job_id): + job_id = int(job_id) + job_json = self.jobs_db.execute_query("SELECT j FROM jobs WHERE id = %s", (job_id,)) if not job_json: return {} first_row = job_json[0] - return json.loads(first_row.get('j', {})) + return json.loads(first_row.get('j', {})) def set_priority_level(self, job_info: dict) -> int: """ @@ -125,34 +159,29 @@ class BisectTask: return priority 
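+    # Illustrative arithmetic (hypothetical job_info): {"suite": "check_abi",
+    # "repo": "linux"} matches the suite (+2) and repo (+1) watch lists
+    # defined above, so set_priority_level() returns 3.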
def bisect_producer(self): - """ - Producer function to fetch new bisect tasks from Elasticsearch and add them to the bisect index if they don't already exist. - This function runs in an infinite loop, checking for new tasks every 5 minutes. - """ + """Producer function optimized for batch processing and rate limiting""" error_count = 0 + while True: - cycle_start = time.time() + start_time = time.time() try: - logging.info("Starting producer cycle...") - # Fetch new bisect tasks from Elasticsearch - new_bisect_tasks = self.get_new_bisect_task_from_manticore() - logging.info(f"Found {len(new_bisect_tasks)} new bisect tasks") - logging.debug(f"Raw tasks data: {new_bisect_tasks}") - - # Process each task - processed_count = 0 - for task in new_bisect_tasks: - try: - self.add_bisect_task(task) - except Exception as e: - logging.error(f"Error processing task {task.get('id', 'unknown')}: {str(e)}") - continue + # Fetch new tasks with time filtering + new_bisect_tasks = self.get_new_bisect_task_from_jobs() + logger.info(f"Found {len(new_bisect_tasks)} candidate tasks") + success_count = 0 + for task in new_bisect_tasks: + if not self.bisect_db.execute_query(f"""SELECT id FROM bisect WHERE error_id='{task['error_id']}' AND bad_job_id='{task['bad_job_id']}'"""): + if self.add_bisect_task(task): + success_count += 1 + logger.info(f"Add {task['error_id']} and {task['bad_job_id']} OK") + + error_count = max(0, error_count - 1) # Reduce error count on success except Exception as e: # 添加详细错误日志 - logging.error(f"Error in bisect_producer: {str(e)}") - logging.error(f"Failed task data: {task if 'task' in locals() else 'No task data'}") - logging.error(traceback.format_exc()) # 打印完整堆栈跟踪 + logger.error(f"Error in bisect_producer: {str(e)}") + logger.error(f"Failed task data: {task if 'task' in locals() else 'No task data'}") + logger.error(traceback.format_exc()) # 打印完整堆栈跟踪 # 指数退避重试机制 sleep_time = min(300, 2 ** error_count) time.sleep(sleep_time) @@ -160,11 +189,11 @@ class BisectTask: else: # 正常执行后重置错误计数器 error_count = 0 - cycle_time = time.time() - cycle_start - logging.info(f"Producer cycle completed. 
Processed {processed_count} tasks in {cycle_time:.2f} seconds") + cycle_time = time.time() - start_time + logger.info(f"Producer cycle completed.") # 固定间隔休眠,两个循环之间的间隔永远为300秒,无论每次循环的执行时间为多少 sleep_time = 300 - cycle_time - logging.info(f"Sleeping for {sleep_time:.2f} seconds until next cycle") + logger.info(f"Sleeping for {sleep_time:.2f} seconds until next cycle") time.sleep(sleep_time) def bisect_consumer(self): @@ -187,23 +216,23 @@ class BisectTask: # Check the mode of operation from environment variable if os.getenv('bisect_mode') == "submit": # If mode is 'submit', send tasks to the scheduler - logging.debug("Submitting bisect tasks to scheduler") + logger.debug("Submitting bisect tasks to scheduler") self.submit_bisect_tasks(bisect_tasks) else: # If mode is not 'submit', run tasks locally - logging.debug("Running bisect tasks locally") + logger.debug("Running bisect tasks locally") self.run_bisect_tasks(bisect_tasks) processed = len(bisect_tasks) except Exception as e: # Log any errors that occur during task processing - logging.error(f"Error in bisect_consumer: {e}") + logger.error(f"Error in bisect_consumer: {e}") # 异常后休眠时间加倍(简易熔断机制) time.sleep(60) finally: # 记录处理指标 cycle_time = time.time() - cycle_start - logging.info(f"Consumer cycle processed {processed} tasks in {cycle_time:.2f}s") + logger.info(f"Consumer cycle processed {processed} tasks in {cycle_time:.2f}s") # 动态休眠控制(无任务时延长休眠) sleep_time = 30 if processed > 0 else 60 @@ -219,39 +248,48 @@ class BisectTask: """ for bisect_task in bisect_tasks: task_id = bisect_task.get('id', 'unknown') - print(bisect_task) + task_id = int(task_id) + bad_job_id = bisect_task.get('bad_job_id') + task_result_root = bisect_task.get('bisect_result_root', None) + if task_result_root is None: + task_result_root = os.path.abspath( + os.path.join('bisect_results', bad_job_id) + ) + + task_metric = None + task_good_commit = None try: + time.sleep(5) #检查bisect_task的状态,乐观锁 if bisect_task.get('bisect_status') != 'wait': - logging.warning(f"Skipping task {task_id} with invalid status: {bisect_task['bisect_status']}") + logger.warning(f"Skipping task {task_id} with invalid status: {bisect_task['bisect_status']}") continue # Update task status to 'processing' in Elasticsearch bisect_task["bisect_status"] = "processing" # Convert to Manticore SQL update update_sql = f""" - UPDATE bisect_task + UPDATE bisect SET bisect_status = 'processing' - WHERE id = '{bisect_task["id"]}' + WHERE id = {task_id} + AND bisect_status = 'wait' """ #TODO UPDATE - self.manticore_update(update_sql) - logging.debug(f"Started processing task: {bisect_task['id']}") + affected_rows = self.bisect_db.execute_update(update_sql) + if affected_rows == 0: + logger.warning(f"跳过已被处理的任务 ID: {task_id}") + continue + logger.debug(f"Started processing task: {task_id}") # Prepare task data for Git bisect with result root # Create unique temporary directory with cleanup - task_id = str(bisect_task['id']) # Create unique clone path using task ID - clone_path = os.path.join( - ) task = { 'bad_job_id': bisect_task['bad_job_id'], 'error_id': bisect_task['error_id'], - 'bisect_result_root': f"/tmp/bisect/{bisect_task['bad_job_id']}", - 'clone_path': clone_path + 'good_commit': task_good_commit, + 'bisect_result_root': task_result_root, + 'metric': task_metric } - if os.path.exists(task['bisect_result_root']): - shutil.rmtree(task['bisect_result_root']) - os.makedirs(task['bisect_result_root'], exist_ok=True) # Handle bad_job_id conversion with validation try: @@ -259,35 +297,110 @@ class BisectTask: 
                    result = gb.find_first_bad_commit(task)
                except (ValueError, KeyError) as e:
                    raise ValueError(f"Invalid bad_job_id: {task.get('bad_job_id')}") from e
-
+                # TODO: save error_id to regression
                # Update task status and result in Elasticsearch
-                bisect_task["bisect_status"] = "completed"
-                bisect_task["bisect_result"] = result
-                # Convert to Manticore SQL update
-                update_sql = f"""
-                    UPDATE bisect_task
+                if result:
+                    # Convert to Manticore SQL update
+                    update_sql = f"""
+                    UPDATE bisect
                    SET bisect_status = 'completed',
-                        bisect_result = '{json.dumps(bisect_task["bisect_result"])}'
-                    WHERE id = {bisect_task["id"]}
-                """
-                # TODO update
-                self.manticore_update(update_sql)
-                logging.debug(f"Completed processing task: {bisect_task['id']}")
+                        project = '{result['repo']}',
+                        git_url = '{result['git_url']}',
+                        bad_commit = '{result['first_bad_commit']}',
+                        first_bad_id = '{result['first_bad_id']}',
+                        bad_result_root = '{result['bad_result_root']}',
+                        work_dir = '{result['work_dir']}',
+                        start_time = {result['start_time']},
+                        end_time = {result['end_time']}
+                    WHERE id = {task_id}
+                    """
+                    # TODO update
+                    self.bisect_db.execute_update(update_sql)
+                    self.update_regression(task, result)
+                    logger.debug(f"Completed processing task: {bisect_task['id']}")

            except Exception as e:
                # Update task status to 'failed' in case of an error
-                bisect_task["bisect_status"] = "failed"
-                bisect_task["bisect_result"] = str(e)
                # Convert to Manticore SQL update
                # Remove bisect_result column from update
                update_sql = f"""
-                    UPDATE bisect_task
-                    SET bisect_status = 'failed'
-                    WHERE id = '{bisect_task["id"]}'
+                    UPDATE bisect
+                    SET bisect_status = 'failed'
+                    WHERE id = {task_id}
                """
-                self.manticore_query(update_sql)
-                logging.error(f"Marked task {bisect_task['id']} as failed due to error: {e}")
+                self.bisect_db.execute_update(update_sql)
+                logger.error(f"Marked task {bisect_task['id']} as failed due to error: {e}")
+
+    def update_regression(self, task, result):
+        """Update regression database with bisect results"""
+        try:
+            # Validate parameters
+            if not task.get('error_id') or not task.get('bad_job_id'):
+                logger.error("Invalid task format for regression update")
+                return
+
+            # Current timestamp (second resolution)
+            current_time = int(time.time())
+            bad_job_id = task['bad_job_id']
+            error_id = task['error_id'].replace("'", "''")  # escape single quotes
+
+            # Check whether a valid record already exists
+            existing = self.regression_db.execute_query(
+                f"SELECT id, bisect_count, related_jobs "
+                f"FROM regression "
+                f"WHERE record_type = 'errid' "
+                f"  AND errid = '{error_id}' "
+                f"  AND valid = 'true'"
+            )
+
+            if not existing:
+                # Insert a new record
+                new_id = int(f"{current_time}{randint(1000,9999)}")  # generate a unique ID
+                category = result.get('category', 'unknown').replace("'", "''")
+                related_jobs_json = json.dumps([bad_job_id])  # initialize as an array
+
+                insert_sql = f"""
+                INSERT INTO regression
+                    (id, record_type, errid, category,
+                     first_seen, last_seen, bisect_count,
+                     related_jobs, valid)
+                VALUES (
+                    {new_id},
+                    'errid',
+                    '{error_id}',
+                    '{category}',
+                    {current_time},
+                    {current_time},
+                    1,
+                    '{related_jobs_json}',
+                    'true'
+                )
+                """
+                self.regression_db.execute_write(insert_sql)
+            else:
+                # Update the existing record
+                record = existing[0]
+                new_count = record['bisect_count'] + 1
+                record_id = record['id']
+
+                update_sql = f"""
+                UPDATE regression
+                SET bisect_count = {new_count},
+                    last_seen = {current_time},
+                    related_jobs = JSON_ARRAY_APPEND(
+                        related_jobs,
+                        '$',
+                        '{bad_job_id}'
+                    )
+                WHERE id = {record_id}
+                """
+                self.regression_db.execute_update(update_sql)
+
+            logger.info(f"Regression updated | ErrorID: {error_id}")
+
+        except Exception as e:
+            logger.error(f"Failed 
to update regression: {str(e)}") + logger.error(f"Task: {task} | Result: {result}") def submit_bisect_tasks(self, bisect_tasks): """ Submit a list of bisect tasks to the scheduler if they are not already in the database. @@ -296,54 +409,29 @@ class BisectTask: :param bisect_tasks: List of bisect tasks to submit. """ # Define the query to check if a bisect task already exists in the database - query_if_bisect_already_in_db = { - "_source": ["id"], # 只需返回ID字段验证存在性 - "query": { - "bool": { - "must": [ - {"term": {"error_id": None}}, # 占位符,实际替换具体值 - {"term": {"bad_job_id": None}}, - {"exists": {"field": "id"}}, - {"term": {"suite": "bisect-py"}} - ] - } - } - } # Process each bisect task for bisect_task in bisect_tasks: task_id = bisect_task["id"] try: # Check if the task already exists in the database - current_query = query_if_bisect_already_in_db.copy() - current_query["query"]["bool"]["must"][0]["term"]["error_id"] = bisect_task["error_id"] - current_query["query"]["bool"]["must"][1]["term"]["bad_job_id"] = bisect_task["bad_job_id"] - if not self.es_query("jobs8", current_query): - # If the task does not exist, submit it to the scheduler - result = self.submit_bisect_job(bisect_task["bad_job_id"], bisect_task["error_id"]) - if result: - logging.info(f"Submitted bisect task to scheduler: {bisect_task['id']}") - else: - logging.error(f"Submission failed for task {task_id}") + result = self.submit_bisect_job(bisect_task["bad_job_id"], bisect_task["error_id"]) + if result: + logger.info(f"Submitted bisect task to scheduler: {bisect_task['id']}") else: - # If the task already exists, log a message - logging.debug(f"Job already in db: {bisect_task['id']}") + logger.error(f"Submission failed for task {task_id}") except KeyError as e: # 处理任务数据格式错误 - logging.error(f"Invalid task format {task_id}: missing {str(e)}") + logger.error(f"Invalid task format {task_id}: missing {str(e)}") except Exception as e: # 添加重试机制 retry_count = 0 while retry_count < 3: - logging.error(f"Submission failed for task {task_id} {retry_count+1} times") + logger.error(f"Submission failed for task {task_id} {retry_count+1} times") try: - current_query = query_if_bisect_already_in_db.copy() - current_query["query"]["bool"]["must"][0]["term"]["error_id"] = bisect_task["error_id"] - current_query["query"]["bool"]["must"][1]["term"]["bad_job_id"] = bisect_task["bad_job_id"] - if not self.es_query("jobs8", current_query): - result = self.submit_bisect_job(bisect_task["bad_job_id"], bisect_task["error_id"]) - if result: - logging.info(f"Submitted bisect task to scheduler: {bisect_task['id']}") + result = self.submit_bisect_job(bisect_task["bad_job_id"], bisect_task["error_id"]) + if result: + logger.info(f"Submitted bisect task to scheduler: {bisect_task['id']}") break except Exception: retry_count += 1 @@ -359,12 +447,12 @@ class BisectTask: # Define SQL query sql = """ SELECT * - FROM bisect_task + FROM bisect WHERE bisect_status = 'wait' - LIMIT 100 + LIMIT 20 """ - result = self.manticore_query(sql) + result = self.bisect_db.execute_query(sql) # Return the result if tasks are found, otherwise return None if result: @@ -372,91 +460,6 @@ class BisectTask: else: return None - - def execute_sql(self, sql, db=None, write_operation=False): - """Base method for executing SQL statements""" - try: - host = os.environ.get('MANTICORE_HOST', 'localhost') - port = os.environ.get('MANTICORE_PORT', '9306') - database = db or os.environ.get('MANTICORE_DB', 'jobs') - - connection = mysql.connector.connect( - host=host, - port=port, - 
database=database, - connect_timeout=5 - ) - - cursor = connection.cursor(dictionary=True) - - if write_operation: - cursor.execute(sql) - connection.commit() - return cursor.rowcount - else: - cursor.execute(sql) - result = cursor.fetchall() - if cursor.with_rows: - cursor.fetchall() # Consume unread results - return result if result else None - - except mysql.connector.Error as e: - logging.error(f"Manticore operation failed: {e}\nSQL: {sql}") - return None - except Exception as e: - logging.error(f"Unexpected error: {str(e)}\nSQL: {sql}") - return None - finally: - if 'cursor' in locals(): - cursor.close() - if 'connection' in locals() and connection.is_connected(): - connection.close() - - def manticore_query(self, sql, db=None): - """Execute read query and return results""" - return self.execute_sql(sql, db, write_operation=False) - - def manticore_insert(self, table, data, db=None): - """Safe INSERT operation with parameterized query""" - if not data: - return None - - columns = ', '.join(data.keys()) - values = ', '.join([f"'{v}'" if isinstance(v, str) else str(v) for v in data.values()]) - sql = f"INSERT INTO {table} ({columns}) VALUES ({values})" - - return self.execute_sql(sql, db, write_operation=True) - - def manticore_update(self, table, updates, condition, db=None): - """Safe UPDATE operation with parameterized query""" - if not updates or not condition: - return None - - set_clause = ', '.join([ - f"{k} = '{v}'" if isinstance(v, str) else f"{k} = {v}" - for k, v in updates.items() - ]) - where_clause = ' AND '.join([ - f"{k} = '{v}'" if isinstance(v, str) else f"{k} = {v}" - for k, v in condition.items() - ]) - sql = f"UPDATE {table} SET {set_clause} WHERE {where_clause}" - - return self.execute_sql(sql, db, write_operation=True) - - def manticore_delete(self, table, condition, db=None): - """Safe DELETE operation with parameterized query""" - if not condition: - return None - - where_clause = ' AND '.join([ - f"{k} = '{v}'" if isinstance(v, str) else f"{k} = {v}" - for k, v in condition.items() - ]) - sql = f"DELETE FROM {table} WHERE {where_clause}" - - return self.execute_sql(sql, db, write_operation=True) - def submit_bisect_job(self, bad_job_id, error_id): """ Submit a bisect job to the scheduler using the provided bad_job_id and error_id. @@ -480,28 +483,28 @@ class BisectTask: match = re.search(r'id=(\S+)', result.stdout) if match: job_id = match.group(1) - logging.info(f"Job submitted successfully. Job ID: {job_id}") + logger.info(f"Job submitted successfully. 
Job ID: {job_id}")
                return job_id
            else:
-                logging.error(f"Unexpected submit output: {result.stdout}")
+                logger.error(f"Unexpected submit output: {result.stdout}")
                return None
        except subprocess.CalledProcessError as e:
-            logging.error(f"Job submission failed with return code {e.returncode}.")
+            logger.error(f"Job submission failed with return code {e.returncode}.")
            return None
        except subprocess.TimeoutExpired:
            # Handle command execution timeout
-            logging.error("Submit command timed out after 30 seconds")
+            logger.error("Submit command timed out after 60 seconds")
            return None
        except KeyError:
            # Handle missing LKP_SRC environment variable
-            logging.error("LKP_SRC environment variable not configured")
+            logger.error("LKP_SRC environment variable not configured")
            return None
        except Exception as e:
            # Catch-all exception handling
-            logging.error(f"Unexpected error during job submission: {str(e)}")
+            logger.error(f"Unexpected error during job submission: {str(e)}")
            return None

-    def get_new_bisect_task_from_manticore(self):
+    def get_new_bisect_task_from_jobs(self):
        """
        Fetch new bisect tasks using Manticore SQL for both PKGBUILD and SS suites.
        Tasks are filtered based on a white list of error IDs and processed into a standardized format.

        :return: A list of processed tasks that match the white list criteria.
        """
        # Define SQL for PKGBUILD tasks
        # TODO: AND submit_time > NOW() - INTERVAL 7 DAY
+        # Perf monitor
        sql_failure = """
-        SELECT id, j.stats as errid
+        SELECT id, errid as errid
        FROM jobs
        WHERE j.job_health = 'abort'
          AND j.stats IS NOT NULL
        ORDER BY id DESC
        """

-        # Define SQL for SS tasks
-        sql_ss = """
-        SELECT id, stats, ss
-        FROM jobs
-        WHERE job_health = 'failed'
-          AND ss IS NOT NULL
-          AND stats IS NOT NULL
-        """
-
        # Define the white list of error IDs
-        errid_white_list = ["last_state.eid.test..exit_code.99"]
+        sql_error_id = """
+        SELECT errid
+        FROM regression
+        WHERE record_type = 'errid'
+          AND valid = 'true'
+        ORDER BY id DESC
+        """
+        rows = self.regression_db.execute_query(sql_error_id) or []
+        errid_white_list = [row['errid'] for row in rows]

        # Execute Manticore SQL queries
-        result = self.manticore_query(sql_failure)
-
+        result = self.jobs_db.execute_query(sql_failure)
        # Convert the list of tasks into a dictionary with task IDs as keys
        # Add detailed logging of the raw data format
        # Process the tasks to filter and transform them based on the white list
-        tasks = self.process_data(result, errid_white_list)
-
+        errid_tasks = self.process_data(result, errid_white_list)
+        # TODO: PERF_TASKS = self.
        # Return the processed tasks
-        return tasks
+        return errid_tasks

    def process_data(self, input_data, white_list):
-        """
-        Process input data to filter and transform it based on a white list of error IDs.
-        Each valid entry is assigned a new UUID and added to the result list.
-
-        :param input_data: A list of dictionaries containing the input data to process
-        :param white_list: A list of error IDs to filter by.
-        :return: A list of processed documents, each containing a new UUID and filtered error ID.
- """ result = [] - - # Iterate over each item in the input list for item in input_data: try: - # Parse the JSON string in errid field - error_ids = json.loads(item["errid"]).keys() + bad_job_id = str(item["id"]) + # 修改点:直接分割字符串代替JSON解析 + errids = item["errid"].split() # 按空格分割字符串 - # Find matching error IDs from white list - matches = [errid for errid in error_ids if errid in white_list] - if not matches: - continue - - # Create new document with generated UUID and original bad_job_id - document = { - "id": str(uuid.uuid4()), # Generate UUID for task ID - "bad_job_id": str(item["id"]), # Keep original as string - "error_id": matches[0], - "bisect_status": "wait" - } - result.append(document) - - except (KeyError, json.JSONDecodeError) as e: - logging.warning(f"Skipping invalid item {item.get('id')}: {str(e)}") - + # 核心逻辑:优先白名单,无匹配则全处理 + if white_list: # 模式一:存在白名单时 + candidates = set(errids) & set(white_list) + if not candidates: # 白名单存在但无匹配时回退 + candidates = errids + else: # 模式二:无白名单时 + candidates = errids + + # 生成任务文档 + for errid in candidates: + result.append({ + "bad_job_id": bad_job_id, + "error_id": errid, + "bisect_status": "wait" + }) + + except (KeyError, AttributeError) as e: # 修改异常类型 + logger.warning(f"处理异常条目 {item.get('id')}:{str(e)}") + continue + + logger.info(f"生成任务数:{len(result)} | 白名单模式:{bool(white_list)}") return result - def check_existing_bisect_task(self, bad_job_id, error_id): """ - Check if a bisect task with the given bad_job_id and error_id already exists using Manticore SQL. + Check if a bisect task with the given bad_job_id and error_id already exists. :param bad_job_id: The ID of the bad job to check. :param error_id: The error ID associated with the bad job. :return: Boolean indicating if task exists. """ - sql = f""" - SELECT 1 AS exist - FROM bisect_task - WHERE bad_job_id = '{bad_job_id}' - AND error_id = '{error_id}' - LIMIT 1 - """ - try: - result = self.manticore_query(sql, db="bisect_task") - if not result: # Handle None result - logging.warning(f"No results from existence check query for {bad_job_id}/{error_id}") - return False - - return result - - except KeyError as e: - logging.error(f"manticore response structure: {str(e)}") - return True + result = self.bisect_db.execute_query( + "SELECT 1 FROM bisect WHERE bad_job_id = %s AND error_id = %s LIMIT 1", + (bad_job_id, error_id) + ) + return bool(result) except Exception as e: - logging.error(f"Unexpected error during existence check: {str(e)}") + logger.error(f"Error checking existing task: {str(e)}") return True - + def _start_monitor(self): + def monitor(): + while True: + try: + logger.info("🔍 连接池简略状态:") + try: + jobs_active = self.jobs_db.pool._cnx_queue.qsize() + bisect_active = self.bisect_db.pool._cnx_queue.qsize() + logger.info(f"Jobs DB 活跃连接: {jobs_active}") + logger.info(f"Bisect DB 活跃连接: {bisect_active}") + except AttributeError as e: + logger.warning(f"连接池状态获取失败: {str(e)}") + + self.jobs_db.check_connection_leaks() + self.bisect_db.check_connection_leaks() + + except Exception as e: + logger.error(f"监控异常: {str(e)}") + finally: + time.sleep(300) + + threading.Thread(target=monitor, daemon=True).start() class BisectAPI(MethodView): def __init__(self): @@ -630,43 +624,101 @@ class BisectAPI(MethodView): self.bisect_api.add_bisect_task(task) return jsonify({"message": "Task added successfully"}), 200 -def set_log(): - stream_handler = logging.StreamHandler() - console_formatter = logging.Formatter( - '%(asctime)s - %(name)s - %(levelname)s - %(module)s:%(lineno)d - %(message)s' - ) - 
-
    def check_existing_bisect_task(self, bad_job_id, error_id):
        """
-        Check if a bisect task with the given bad_job_id and error_id already exists using Manticore SQL.
+        Check if a bisect task with the given bad_job_id and error_id already exists.
 
        :param bad_job_id: The ID of the bad job to check.
        :param error_id: The error ID associated with the bad job.
        :return: Boolean indicating if task exists.
        """
-        sql = f"""
-            SELECT 1 AS exist
-            FROM bisect_task
-            WHERE bad_job_id = '{bad_job_id}'
-            AND error_id = '{error_id}'
-            LIMIT 1
-        """
-
        try:
-            result = self.manticore_query(sql, db="bisect_task")
-            if not result:  # Handle None result
-                logging.warning(f"No results from existence check query for {bad_job_id}/{error_id}")
-                return False
-
-            return result
-
-        except KeyError as e:
-            logging.error(f"manticore response structure: {str(e)}")
-            return True
+            result = self.bisect_db.execute_query(
+                "SELECT 1 FROM bisect WHERE bad_job_id = %s AND error_id = %s LIMIT 1",
+                (bad_job_id, error_id)
+            )
+            return bool(result)
        except Exception as e:
-            logging.error(f"Unexpected error during existence check: {str(e)}")
+            logger.error(f"Error checking existing task: {str(e)}")
            return True
-
+    def _start_monitor(self):
+        def monitor():
+            while True:
+                try:
+                    logger.info("🔍 Connection pool summary:")
+                    try:
+                        jobs_active = self.jobs_db.pool._cnx_queue.qsize()
+                        bisect_active = self.bisect_db.pool._cnx_queue.qsize()
+                        logger.info(f"Jobs DB active connections: {jobs_active}")
+                        logger.info(f"Bisect DB active connections: {bisect_active}")
+                    except AttributeError as e:
+                        logger.warning(f"Failed to read pool status: {str(e)}")
+
+                    self.jobs_db.check_connection_leaks()
+                    self.bisect_db.check_connection_leaks()
+
+                except Exception as e:
+                    logger.error(f"Monitor error: {str(e)}")
+                finally:
+                    time.sleep(300)
+
+        threading.Thread(target=monitor, daemon=True).start()
 
 class BisectAPI(MethodView):
    def __init__(self):
@@ -630,43 +624,101 @@ class BisectAPI(MethodView):
        self.bisect_api.add_bisect_task(task)
        return jsonify({"message": "Task added successfully"}), 200
 
-def set_log():
-    stream_handler = logging.StreamHandler()
-    console_formatter = logging.Formatter(
-        '%(asctime)s - %(name)s - %(levelname)s - %(module)s:%(lineno)d - %(message)s'
-    )
-
-    stream_handler.setFormatter(console_formatter)
-    stream_handler.setLevel(logging.DEBUG if os.getenv('LOG_LEVEL') == 'DEBUG' else logging.INFO)
+class ListBisectTasksAPI(MethodView):
+    def __init__(self):
+        self.bisect_db = BisectDB(
+            host=os.environ.get('MANTICORE_HOST', 'localhost'),
+            port=os.environ.get('MANTICORE_PORT', '9306'),
+            database="bisect",
+            pool_size=15
+        )
+
+    def get(self):
+        try:
+            # Query all bisect tasks
+            tasks = self.bisect_db.execute_query("""
+                SELECT id, bad_job_id, error_id, bisect_status
+                FROM bisect
+                ORDER BY id DESC
+            """)
+
+            # Format the output
+            formatted_tasks = []
+            for task in tasks:
+                formatted_tasks.append({
+                    "TASK ID": task['id'],
+                    "BAD JOB ID": task['bad_job_id'],
+                    "ERROR ID": task['error_id'],
+                    "STATUS": {'wait': 'wait', 'processing': 'processing', 'completed': 'finish', 'failed': 'failed'}.get(task['bisect_status'], 'unknown'),
+                })
+
+            response_data = {
+                "total": len(formatted_tasks),
+                "tasks": formatted_tasks
+            }
+            return json.dumps(response_data, indent=2, ensure_ascii=False), 200, {'Content-Type': 'application/json; charset=utf-8'}
 
-    logging.basicConfig(
-        level=logging.DEBUG,
-        handlers=[stream_handler],
-        format='%(asctime)s.%(msecs)03d %(levelname)s %(name)s:%(lineno)d - %(message)s',
-        datefmt='%Y-%m-%d %H:%M:%S'
-    )
+        except Exception as e:
+            logger.error(f"Failed to list tasks: {str(e)}")
+            return jsonify({"error": "Internal server error"}), 500
 
-    logger = logging.getLogger('bisect-task')
-    logger.setLevel(logging.DEBUG)
-    logger.addHandler(stream_handler)
-    logger.propagate = False
+class DeleteFailedTasksAPI(MethodView):
+    def __init__(self):
+        self.bisect_db = BisectDB(
+            host=os.environ.get('MANTICORE_HOST', 'localhost'),
+            port=os.environ.get('MANTICORE_PORT', '9306'),
+            database="bisect",
+            pool_size=15
+        )
+
+    def delete(self):
+        try:
+            # Execute the delete
+            result = self.bisect_db.execute_delete("""
+                DELETE FROM bisect
+                WHERE bisect_status = 'failed'
+            """)
+
+            logger.info(f"Deleted {result} failed tasks")
+            return jsonify({
+                "status": "success",
+                "deleted_count": result
+            }), 200
+        except Exception as e:
+            logger.error(f"Error deleting failed tasks: {str(e)}")
+            return jsonify({
+                "error": "Internal server error",
+                "details": str(e)
+            }), 500
 
 def run_flask():
+    """Use a production-grade WSGI server, with a dev-server fallback"""
    app.add_url_rule('/new_bisect_task', view_func=BisectAPI.as_view('bisect_api'))
-    app.run(host='0.0.0.0', port=9999)
+    app.add_url_rule('/list_bisect_tasks', view_func=ListBisectTasksAPI.as_view('list_bisect_tasks'))
+    app.add_url_rule('/delete_failed_tasks', view_func=DeleteFailedTasksAPI.as_view('delete_failed_tasks'))
+    port = int(os.environ.get('BISECT_API_PORT', 9999))
+
+    try:
+        from waitress import serve
+        serve(app, host='0.0.0.0', port=port, threads=8)
+    except ImportError:
+        logger.warning("Waitress is not installed; falling back to the Flask development server")
+        app.run(host='0.0.0.0', port=port)
 
 def main():
    try:
        # Start the background tasks first
-        set_log()
+        #set_log()
        run = BisectTask()
        bisect_producer_thread = threading.Thread(target=run.bisect_producer, daemon=True)
        bisect_producer_thread.start()
 
        # Run Flask in a separate thread
        flask_thread = threading.Thread(target=run_flask, daemon=True)
        flask_thread.start()
+        time.sleep(5)
 
        num_consumer_threads = 2
        for i in range(num_consumer_threads):
@@ -677,9 +729,11 @@ def main():
        while True:
            time.sleep(3600)  # keep the main thread alive
    except Exception as e:
-        print("Error when init_bisect_commit:" + e)
-        sys.exit(-1)
+        logger.error(f"Error when init_bisect_commit: {str(e)}")
+        logger.error(traceback.format_exc())  # Add stack trace
+        sys.exit(1)
 
 if __name__ == "__main__":
    main()
+
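A quick client-side smoke test of the three routes registered in run_flask(), assuming the service is reachable on localhost:9999 (inside the container; the start-script diff that follows remaps it to host port 10000) and that new_bisect_task accepts a JSON body with bad_job_id and error_id, the two fields add_bisect_task() validates. The job id is illustrative:

    import requests

    BASE = "http://localhost:9999"

    # Queue a task.
    r = requests.post(f"{BASE}/new_bisect_task",
                      json={"bad_job_id": "107423", "error_id": "stderr.eid.foo"})
    print(r.status_code, r.json())

    # List all tasks: {"total": N, "tasks": [...]}.
    print(requests.get(f"{BASE}/list_bisect_tasks").json())

    # Purge tasks stuck in bisect_status = 'failed'.
    print(requests.delete(f"{BASE}/delete_failed_tasks").json())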
diff --git a/container/bisect/start b/container/bisect/start
index fc69adbd1..6d9d5bf3c 100755
--- a/container/bisect/start
+++ b/container/bisect/start
@@ -25,6 +25,7 @@ DEFAULT_CCI = '/c/compass-ci'
 DEFAULT_CONFIG_DIR = '/etc/compass-ci/defaults'
 DEFAULT_USER_CONFIG_DIR = File.expand_path("~/.config/compass-ci/")
 DEFAULT_BISECT_CONFIG_DIR = '/root/.config/compass-ci/'
+MANTICORE_HOST='172.17.0.1'
 
 docker_rm 'bisect'
 cmd = %w[
@@ -35,11 +36,12 @@ cmd = %w[
 ] + env + %W[
   -e LKP_SRC=#{DEFAULT_LKP}
   -e CCI_SRC=#{DEFAULT_CCI}
+  -e MANTICORE_HOST=#{MANTICORE_HOST}
   -v #{DEFAULT_CONFIG_DIR}:#{DEFAULT_CONFIG_DIR}:ro
   -v #{DEFAULT_USER_CONFIG_DIR}:#{DEFAULT_BISECT_CONFIG_DIR}:ro
   -v /etc/localtime:/etc/localtime:ro
   -v /etc/compass-ci/register:/etc/compass-ci/register:ro
-  -p 9999:9999
+  -p 10000:9999
   bisect
 ]
-- 
Gitee

From 02a03f6c37cd3db63b4003b64c6457eb7c929daa Mon Sep 17 00:00:00 2001
From: jacknichao
Date: Tue, 6 May 2025 21:07:35 +0800
Subject: [PATCH 4/5] bisect: Refactor database and logging modules, add
 schema scripts
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This commit introduces the following changes:
- Extract database and logging components into standalone modules:
  • lib/bisect_database.py      # core module for bisect database operations
  • lib/generic_sql_client.py   # generic SQL client wrapper
  • lib/log_config.py           # unified logging configuration
- Add schema definitions for new features:
  • Updated sbin/manti-table-bisect.sql      # reworked bisect table schema
  • Created sbin/manti-table-regression.sql  # new regression table definition
- Modify table creation script:
  • Updated sbin/create-manticore-tables.sh  # adapted deployment logic
---
 lib/bisect_database.py          | 213 ++++++++++++++++++++++++++++++++
 lib/generic_sql_client.py       | 149 ++++++++++++++++++++++
 lib/log_config.py               | 135 ++++++++++++++++++++
 sbin/create-manticore-tables.sh |   3 +-
 sbin/manti-table-bisect.sql     |  18 ++-
 sbin/manti-table-regression.sql |  17 +++
 6 files changed, 523 insertions(+), 12 deletions(-)
 create mode 100644 lib/bisect_database.py
 create mode 100644 lib/generic_sql_client.py
 create mode 100644 lib/log_config.py
 create mode 100644 sbin/manti-table-regression.sql
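The intended composition of the three new modules, as a hedged sketch (host/port values and the job id are illustrative, and lib/ is assumed to be on sys.path):

    import os
    from log_config import logger          # module-level singleton
    from bisect_database import BisectDB

    jobs_db = BisectDB(host=os.environ.get('MANTICORE_HOST', 'localhost'),
                       port=os.environ.get('MANTICORE_PORT', '9306'),
                       database="jobs", readonly=True)   # producer only reads jobs
    bisect_db = BisectDB(host=os.environ.get('MANTICORE_HOST', 'localhost'),
                         port=os.environ.get('MANTICORE_PORT', '9306'),
                         database="bisect")              # task queue is read/write

    job = jobs_db.get_job_info("107423")                 # cached for 600 s
    logger.info("job suite: %s", (job or {}).get("suite"))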
diff --git a/lib/bisect_database.py b/lib/bisect_database.py
new file mode 100644
index 000000000..6c98479d0
--- /dev/null
+++ b/lib/bisect_database.py
@@ -0,0 +1,213 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: MulanPSL-2.0+
+
+from log_config import logger
+import mysql.connector
+from generic_sql_client import GenericSQLClient
+from mysql.connector import Error
+import json
+from typing import Optional, Dict, Any, Tuple, List
+from functools import wraps
+import time
+import threading
+
+class BisectDB(GenericSQLClient):
+    """Database operations for bisect process with connection pool"""
+
+    def __init__(
+        self,
+        host: str,
+        port: str,
+        database: str,
+        readonly: bool = False,
+        pool_size: int = 5
+    ):
+        """Initialize database connection pool"""
+        self._active_connections = threading.local()  # Thread-local storage
+        super().__init__(
+            host=host,
+            port=port,
+            database=database,
+            pool_name=f"{database}_pool",  # Isolate connection pools by database
+            pool_size=pool_size,
+            readonly=readonly
+        )
+        if not hasattr(self, '_cache'):
+            self._cache = {}
+
+    def execute_query(self, sql: str, params: Optional[tuple] = None) -> Optional[List[Dict]]:
+        """Execute a SELECT query with connection pool (compatibility method)"""
+        return self.execute(sql, params, operation='read')
+
+    def execute_write(self, sql: str, params: Optional[tuple] = None) -> int:
+        """Execute a generic write operation with native parameter passing"""
+        return super().execute(sql, params, operation='write')
+
+    def execute_update(self, sql: str, params: Optional[tuple] = None) -> int:
+        """Execute an UPDATE statement with native parameter passing"""
+        return super().execute(sql, params, operation='write')
+
+    def execute_delete(self, sql: str, params: Optional[tuple] = None) -> int:
+        """Execute a DELETE statement with native parameter passing"""
+        return super().execute(sql, params, operation='write')
+
+    def cache_query(ttl: int = 300):
+        """Cache decorator for database queries"""
+        def decorator(func):
+            @wraps(func)
+            def wrapper(self, *args, **kwargs):
+                cache_key = f"{func.__module__}.{func.__name__}:" \
+                            f"{hash(str(args))}:{hash(frozenset(kwargs.items()))}"
+                if cache_key in self._cache:
+                    timestamp, result = self._cache[cache_key]
+                    if time.time() - timestamp < ttl:
+                        return result
+                result = func(self, *args, **kwargs)
+                self._cache[cache_key] = (time.time(), result)
+                return result
+            return wrapper
+        return decorator
+
+    @cache_query(ttl=600)
+    def get_job_info(self, job_id: str) -> Optional[Dict[str, Any]]:
+        """
+        Fetch job information from database
+
+        Args:
+            job_id: The job ID to fetch
+
+        Returns:
+            Dict containing job information or None if not found
+        """
+        connection = None
+        cursor = None  # pre-initialize so the finally block cannot hit an unbound name
+        try:
+            # Ensure job_id is a valid integer
+            try:
+                job_id_int = int(job_id)
+            except ValueError:
+                logger.error(f"Invalid job ID format: {job_id}")
+                return None
+
+            connection = self.get_connection()
+            cursor = connection.cursor(dictionary=True)
+
+            query = "SELECT j FROM jobs WHERE id = %s LIMIT 1"
+            cursor.execute(query, (job_id_int,))
+            result = cursor.fetchone()
+
+            if not result or 'j' not in result:
+                return None
+
+            return (json.loads(result['j'])
+                    if isinstance(result['j'], str)
+                    else result['j'])
+
+        except Error as e:
+            logger.error(f"Database error: {e}")
+            return None
+        finally:
+            if cursor:
+                cursor.close()
+            if connection:
+                connection.close()  # return the pooled connection
+
+    @cache_query(ttl=600)
+    def check_existing_job(self, job: Dict[str, Any], limit: int = 1) -> List[Tuple[str, str]]:
+        """
+        Check if jobs with same configuration exist
+
+        Args:
+            job: Job configuration to check
+            limit: Maximum number of results to return
+
+        Returns:
+            List of (job_id, result_root) tuples, empty list if none found
+        """
+        connection = None
+        cursor = None
+        try:
+            connection = self.get_connection()
+            cursor = connection.cursor(dictionary=True)
+
+            # Keep only core query conditions
+            conditions = []
+            if 'program' in job:
+                for path, value in self._flatten_dict(job['program'], 'program'):
+                    conditions.append(f"j.{path} = {self._format_sql_value(value)}")
+
+            if job.get('ss'):
+                for path, value in self._flatten_dict(job['ss'], 'ss'):
+                    conditions.append(f"j.{path} = {self._format_sql_value(value)}")
+
+            where_clause = " AND ".join(conditions) if conditions else "1=1"
+
+            # Simplified query
+            query = f"""
+                SELECT id, j.result_root as result_root
+                FROM jobs
+                WHERE {where_clause}
+                AND j.stats IS NOT NULL
+                ORDER BY submit_time DESC
+                LIMIT {limit}
+            """
+
+            cursor.execute(query)
+            results = cursor.fetchall()
+
+            return [(result['id'], result['result_root']) for result in results] if results else []
+
+        except Error as e:
+            logger.error(f"Database error: {e}")
+            return []  # match the documented contract instead of returning None
+        finally:
+            if cursor:
+                cursor.close()
+            if connection:
+                connection.close()  # return the pooled connection
+
+    def _flatten_dict(self, d: Dict, prefix: str = '') -> list:
+        """Flatten nested dictionary into (path, value) pairs"""
+        items = []
+        for k, v in d.items():
+            key_path = f"{prefix}.{k}" if prefix else k
+            if isinstance(v, dict):
+                items.extend(self._flatten_dict(v, key_path))
+            else:
+                items.append((key_path, v))
+        return items
+
+    def _format_sql_value(self, value):
+        """Safely format a SQL value (avoids backslash and quoting errors)"""
+        if isinstance(value, (int, float)):
+            return str(value)
+        elif isinstance(value, str):
+            # Escape single quotes first, then wrap in outer quotes
+            escaped = value.replace("'", "''")
+            return f"'{escaped}'"
+        else:
+            # Handle non-string types (e.g. dicts, lists)
+            json_str = json.dumps(value).replace("'", "''")
+            return f"'{json_str}'"
+
+    def close(self):
+        """Close database connection"""
+        conn = getattr(self, '_connection', None)  # guard: this attribute may never be set
+        if conn and conn.is_connected():
+            conn.close()
+
+    def check_connection_leaks(self):
+        """Enhanced leak check"""
+        try:
+            if hasattr(self._active_connections, 'count') and self._active_connections.count > 0:
+                logger.error(f"⚠️ Connection leak detected! Unreleased connections: {self._active_connections.count}")
+                self._active_connections.count = 0
+        except AttributeError:
+            pass
+
+    def get_pool_status(self) -> dict:
+        """Get connection pool status (Manticore-compatible)"""
+        try:
+            return {
+                "database": self.database,
+                "pool_size": self.pool.pool_size,
+                "in_use": [conn.is_connected() for conn in self.pool._connections],
+                "idle": self.pool._cnx_queue.qsize(),
+                "total": self.pool.pool_size,
+                "available": self.pool.pool_size - getattr(self.pool, '_used_connections', 0)
+            }
+        except Exception as e:
+            logger.error(f"Failed to get pool status: {str(e)}")
+            return {"error": str(e)}
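To see what the two query-building helpers produce, a small worked example (the job dict is invented, and constructing BisectDB assumes a reachable Manticore instance, since the pool connects eagerly):

    db = BisectDB(host="localhost", port="9306", database="jobs", readonly=True)
    job = {"program": {"gcc": {"version": "12"}}, "ss": {"linux": {"commit": "v6.6"}}}

    pairs = db._flatten_dict(job["program"], "program")
    # -> [("program.gcc.version", "12")]
    conds = [f"j.{path} = {db._format_sql_value(v)}" for path, v in pairs]
    print(" AND ".join(conds))
    # -> j.program.gcc.version = '12'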
diff --git a/lib/generic_sql_client.py b/lib/generic_sql_client.py
new file mode 100644
index 000000000..0e9f25b3f
--- /dev/null
+++ b/lib/generic_sql_client.py
@@ -0,0 +1,149 @@
+import sys
+import os
+import threading
+import mysql.connector
+sys.path.append((os.environ['LKP_SRC']) + '/programs/bisect-py/')
+from log_config import logger
+
+from typing import Optional, List, Dict, Any, Tuple
+from mysql.connector import pooling, Error
+
+class GenericSQLClient:
+    """Generic SQL client base class providing connection pool and basic operations"""
+
+    def __init__(
+        self,
+        host: str,
+        port: str,
+        database: str,
+        pool_name: str = "generic_pool",
+        pool_size: int = 5,
+        autocommit: bool = True,
+        readonly: bool = False
+    ):
+        self.host = host
+        self.port = port
+        self.database = database
+        self.readonly = readonly
+
+        connect_args = {
+            'host': host,
+            'port': int(port),
+            'database': database,
+            'autocommit': autocommit,
+            'connect_timeout': 10,
+            'user': os.getenv('MYSQL_USER', 'root'),
+            'password': os.getenv('MYSQL_PASSWORD', ''),
+            'client_flags': [mysql.connector.ClientFlag.MULTI_STATEMENTS],
+            'raise_on_warnings': True,
+            'use_pure': True,
+            'charset': 'utf8mb4',
+            'collation': 'utf8mb4_unicode_ci',
+            'pool_reset_session': False
+        }
+
+        try:
+            self.pool = pooling.MySQLConnectionPool(
+                pool_name=pool_name,
+                pool_size=pool_size,
+                **connect_args
+            )
+            logger.info(f"Initialized connection pool {pool_name} ({pool_size} connections)")
+        except RuntimeError as e:
+            if "C Extension not available" in str(e):
+                logger.warning("C extension unavailable; switching to pure-Python mode")
+                connect_args['use_pure'] = True
+                self.pool = pooling.MySQLConnectionPool(
+                    pool_name=pool_name,
+                    pool_size=pool_size,
+                    **connect_args
+                )
+                logger.info(f"Initialized connection pool {pool_name} in pure-Python mode")
+        except Error as e:
+            logger.error(f"Connection pool initialization failed: {str(e)}")
+            raise RuntimeError(f"Cannot connect to database {database}@{host}:{port}")
+
+    def get_connection(self):
+        """Get a database connection from the pool (base-class implementation)"""
+        # _active_connections is expected to be provided by the subclass (see BisectDB)
+        try:
+            conn = self.pool.get_connection()
+            if not hasattr(self._active_connections, 'count'):
+                self._active_connections.count = 0
+            self._active_connections.count += 1
+            logger.debug(f"Acquired connection #{self._active_connections.count}")
+            return conn
+        except AttributeError as e:
+            logger.error("Connection pool not initialized; check the following configuration:")
+            logger.error(f"- Host: {self.host}:{self.port}")
+            logger.error(f"- Database: {self.database}")
+            logger.error(f"- Pool name: {self.pool.pool_name}")
+            raise RuntimeError("Database connection pool was not initialized correctly") from e
+        except mysql.connector.PoolError as e:
+            logger.error("Failed to acquire a connection | current pool status:")
+            logger.error(f"- Pool size: {self.pool.pool_size}")
+            logger.error(f"- In use: {getattr(self.pool, '_used_connections', 'n/a')}")
+            logger.error(f"- Last error: {str(e)}")
+            raise
+
+    def execute(
+        self,
+        sql: str,
+        params: Optional[tuple] = None,
+        operation: str = 'read'
+    ) -> Optional[List[Dict]]:
+        if self.readonly and operation != 'read':
+            raise RuntimeError(f"Write operation forbidden on readonly database: {self.database}")
+        conn = self.pool.get_connection()
+        try:
+            with conn.cursor(dictionary=True) as cursor:
+                cursor.execute(sql, params or ())
+
+                if operation == 'read':
+                    return cursor.fetchall()
+                else:
+                    conn.commit()
+                    return cursor.rowcount
+
+        except Error as e:
+            logger.error(f"SQL operation failed: {e}")
+            logger.error(f"Failed SQL: {sql}\nParams: {params}")
+            conn.rollback()
+            return None
+        finally:
+            try:
+                if conn.is_connected():
+                    conn.cmd_reset_connection()
+            except Error:
+                pass
+            finally:
+                conn.close()
+
+    def health_check(self) -> bool:
+        """Connection pool health check"""
+        try:
+            conn = self.pool.get_connection()
+            conn.ping(reconnect=True)
+            conn.close()
+            return True
+        except Error:
+            return False
+
+    def execute_transaction(
+        self,
+        queries: List[Tuple[str, tuple]]
+    ) -> bool:
+        """Execute transactional operations"""
+        conn = self.pool.get_connection()
+        try:
+            conn.start_transaction()
+            with conn.cursor() as cursor:
+                for sql, params in queries:
+                    cursor.execute(sql, params)
+            conn.commit()
+            return True
+        except Error as e:
+            conn.rollback()
+            logger.error(f"Transaction failed: {e}")
+            return False
+        finally:
+            conn.close()
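Basic usage of the client above — one read, one parameterized write, one transaction, and the readonly guard. Connection details and row values are assumptions, Manticore serves the MySQL wire protocol on 9306, and its transaction support is far more limited than InnoDB's, so this illustrates the API contract rather than promising full atomicity:

    client = GenericSQLClient(host="localhost", port="9306",
                              database="bisect", pool_name="demo_pool")

    rows = client.execute("SELECT id, bisect_status FROM bisect LIMIT 5")
    n = client.execute("UPDATE bisect SET bisect_status = %s WHERE id = %s",
                       ("processing", 1), operation='write')

    ok = client.execute_transaction([
        ("INSERT INTO bisect (id, bad_job_id, error_id, bisect_status) "
         "VALUES (%s, %s, %s, %s)", (42, "107423", "stderr.eid.foo", "wait")),
    ])
    print(rows, n, "committed" if ok else "rolled back")

    ro = GenericSQLClient(host="localhost", port="9306", database="jobs", readonly=True)
    # ro.execute("DELETE FROM jobs", operation='write')  # raises RuntimeError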
diff --git a/lib/log_config.py b/lib/log_config.py
new file mode 100644
index 000000000..0f59ace65
--- /dev/null
+++ b/lib/log_config.py
@@ -0,0 +1,135 @@
+# SPDX-License-Identifier: MulanPSL-2.0+
+
+import logging
+import os
+import uuid
+import time
+from logging.handlers import RotatingFileHandler
+from typing import Dict, Any
+import json
+import traceback
+from datetime import datetime
+
+class StructuredLogger:
+    """Unified structured logger (process-wide singleton)"""
+
+    _instance = None
+
+    def __new__(cls, *args, **kwargs):
+        if not cls._instance:
+            cls._instance = super().__new__(cls)
+        return cls._instance
+
+    def __init__(self, log_dir: str = None, name: str = 'bisect-py'):
+        if hasattr(self, '_initialized'):
+            return
+
+        self.logger = logging.getLogger(name)
+        self.logger.propagate = False  # Prevent log propagation
+        self.logger.handlers = []  # Clear existing handlers
+
+        self.session_id = str(uuid.uuid4())[:8]
+        self.start_time = time.time()
+        self.context: Dict[str, Any] = {
+            'session_id': self.session_id,
+            'start_time': datetime.fromtimestamp(self.start_time).isoformat()
+        }
+
+        if log_dir:
+            self._setup_file_handlers(log_dir)
+
+        self._setup_console_handler()
+        self._initialized = True
+
+    def _setup_file_handlers(self, log_dir: str):
+        """Configure file handlers"""
+        os.makedirs(log_dir, exist_ok=True)
+
+        # Consolidate into a single log file
+        combined_handler = RotatingFileHandler(
+            os.path.join(log_dir, 'bisect_process.log'),
+            maxBytes=10*1024*1024,  # 10MB
+            backupCount=5,
+            encoding='utf-8'
+        )
+
+        # Unified log format
+        formatter = logging.Formatter(
+            '%(asctime)s.%(msecs)03d [%(levelname)s] %(pathname)s:%(lineno)d - %(message)s',
+            '%Y-%m-%d %H:%M:%S'
+        )
+        combined_handler.setFormatter(formatter)
+        combined_handler.setLevel(logging.DEBUG)
+
+        # Drop stale file handlers before adding the new one
+        self.logger.handlers = [
+            h for h in self.logger.handlers
+            if not isinstance(h, RotatingFileHandler)
+        ]
+        self.logger.addHandler(combined_handler)
+
+    def _setup_console_handler(self):
+        """Configure the console handler"""
+        console = logging.StreamHandler()
+        console.setLevel(logging.INFO)
+        console.setFormatter(logging.Formatter(
+            '%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s',
+            '%Y-%m-%d %H:%M:%S'
+        ))
+        if not any(isinstance(h, logging.StreamHandler) for h in self.logger.handlers):
+            self.logger.addHandler(console)
+        self.logger.setLevel(logging.DEBUG)
+
+    def configure(self, log_dir: str = None, level: int = logging.INFO):
+        """Reconfigure logging at runtime"""
+        if log_dir:
+            self._setup_file_handlers(log_dir)
+        self.logger.setLevel(level)
+
+    def log(self, level: str, message: str, *args, **kwargs):
+        """Unified logging method supporting format strings and structured entries"""
+        log_entry = {
+            'timestamp': datetime.now().isoformat(),
+            'level': level.upper(),
+            'message': message % args if args else message,
+            **self.context,
+            **kwargs
+        }
+
+        # Write the JSON entry; this only fires when a file handler whose
+        # filename contains 'structured' has been configured (none is by default)
+        json_handler = next(
+            (h for h in self.logger.handlers
+             if isinstance(h, RotatingFileHandler) and 'structured' in h.baseFilename),
+            None
+        )
+        if json_handler:
+            try:
+                json_handler.stream.write(json.dumps(log_entry) + '\n')
+            except Exception:
+                pass
+
+        # Delegate to the standard logger; stacklevel=3 makes %(pathname)s and
+        # %(lineno)d point at the real caller instead of this wrapper
+        log_method = getattr(self.logger, level)
+        if args:
+            log_method(message, *args, extra=kwargs, stacklevel=3)
+        else:
+            log_method(message, extra=kwargs, stacklevel=3)
+
+    def debug(self, message: str, *args, **kwargs):
+        self.log('debug', message, *args, **kwargs)
+
+    def info(self, message: str, *args, **kwargs):
+        self.log('info', message, *args, **kwargs)
+
+    def warning(self, message: str, *args, **kwargs):
+        self.log('warning', message, *args, **kwargs)
+
+    def error(self, message: str, *args, **kwargs):
+        self.log('error', message, *args, **kwargs)
+
+    def exception(self, message: str, **kwargs):
+        kwargs['exception'] = traceback.format_exc()
+        self.log('error', message, **kwargs)
+
+# Global logger instance
+logger = StructuredLogger()
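Because the logger is a module-level singleton, importing it is enough; a file handler can be attached later via configure(). A usage sketch (the log directory is an assumption):

    from log_config import logger

    logger.configure(log_dir="/tmp/bisect-logs")   # attach the rotating file handler
    logger.info("bisect %s started", "task-42", job_id="hypothetical-42")
    try:
        1 / 0
    except ZeroDivisionError:
        logger.exception("step failed")            # records the traceback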
diff --git a/sbin/create-manticore-tables.sh b/sbin/create-manticore-tables.sh
index f2bf75037..22ee24d72 100755
--- a/sbin/create-manticore-tables.sh
+++ b/sbin/create-manticore-tables.sh
@@ -17,4 +17,5 @@ echo
 $CURL "mode=raw&query=desc jobs"
 $CURL "mode=raw&query=desc hosts"
 $CURL "mode=raw&query=desc accounts"
-$CURL "mode=raw&query=desc bisect_tasks"
+$CURL "mode=raw&query=desc bisect"
+$CURL "mode=raw&query=desc regression"
diff --git a/sbin/manti-table-bisect.sql b/sbin/manti-table-bisect.sql
index a26761562..12a5f68da 100755
--- a/sbin/manti-table-bisect.sql
+++ b/sbin/manti-table-bisect.sql
@@ -1,21 +1,17 @@
-CREATE TABLE bisect_tasks(
-    id string,
+CREATE TABLE bisect(
+    id bigint,
     bad_job_id string,
     error_id string,
-    bisect_metrics string,
     bisect_status string,
+    bisect_metrics string,
     project string,
-    pkgbuild_repo string,
     git_url string,
-    bisect_suite string,
     bad_commit string,
     first_bad_id string,
     first_result_root string,
     work_dir string,
-    start_time bigint,
-    end_time bigint,
-    priority_level integer,
-    commit_history string,
-    timeout integer,
-    job_commit_mappings json
+    start_time BIGINT,
+    end_time BIGINT,
+    priority_level INT,
+    timeout INT
 ) charset_table='U+0021..U+007E';
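With the reworked schema in place, the queue can be inspected directly over Manticore's MySQL protocol; a hedged sketch (host, port, and credentials are deployment assumptions — the start script exports MANTICORE_HOST=172.17.0.1 by default):

    import mysql.connector

    conn = mysql.connector.connect(host="172.17.0.1", port=9306,
                                   user="root", password="")
    cur = conn.cursor(dictionary=True)
    cur.execute("SELECT id, bad_job_id, error_id, bisect_status FROM bisect "
                "WHERE bisect_status = 'wait' ORDER BY priority_level DESC LIMIT 10")
    for row in cur.fetchall():
        print(row)
    cur.close()
    conn.close()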
diff --git a/sbin/manti-table-regression.sql b/sbin/manti-table-regression.sql
new file mode 100644
index 000000000..689787360
--- /dev/null
+++ b/sbin/manti-table-regression.sql
@@ -0,0 +1,17 @@
+CREATE TABLE regression(
+    id bigint,
+
+    record_type string,
+    errid string,
+    category string,
+    first_seen bigint,
+    last_seen bigint,
+
+    metric_name string,
+    value float,
+
+    bisect_count bigint,
+    valid string,
+    related_jobs json
+) engine='columnar' charset_table='U+0021..U+007E';
+
-- 
Gitee

From a22fa71ed1971812f5a9a932375bf835876cd86e Mon Sep 17 00:00:00 2001
From: jacknichao
Date: Tue, 6 May 2025 21:13:36 +0800
Subject: [PATCH 5/5] remove trailing whitespace

---
 lib/generic_sql_client.py       |  6 +++---
 sbin/manti-table-regression.sql | 21 ++++++++++-----------
 2 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/lib/generic_sql_client.py b/lib/generic_sql_client.py
index 0e9f25b3f..5e18154a3 100644
--- a/lib/generic_sql_client.py
+++ b/lib/generic_sql_client.py
@@ -10,7 +10,7 @@ from mysql.connector import pooling, Error
 
 class GenericSQLClient:
     """Generic SQL client base class providing connection pool and basic operations"""
-
+
     def __init__(
         self,
         host: str,
@@ -97,13 +97,13 @@ class GenericSQLClient:
         try:
             with conn.cursor(dictionary=True) as cursor:
                 cursor.execute(sql, params or ())
-
+
                 if operation == 'read':
                     return cursor.fetchall()
                 else:
                     conn.commit()
                     return cursor.rowcount
-
+
         except Error as e:
             logger.error(f"SQL operation failed: {e}")
             logger.error(f"Failed SQL: {sql}\nParams: {params}")
diff --git a/sbin/manti-table-regression.sql b/sbin/manti-table-regression.sql
index 689787360..ef51b7026 100644
--- a/sbin/manti-table-regression.sql
+++ b/sbin/manti-table-regression.sql
@@ -1,17 +1,16 @@
 CREATE TABLE regression(
-    id bigint,
+    id bigint,
 
-    record_type string,
+    record_type string,
     errid string,
-    category string,
-    first_seen bigint,
-    last_seen bigint,
+    category string,
+    first_seen bigint,
+    last_seen bigint,
 
-    metric_name string,
-    value float,
+    metric_name string,
+    value float,
 
-    bisect_count bigint,
+    bisect_count bigint,
     valid string,
-    related_jobs json
-) engine='columnar' charset_table='U+0021..U+007E';
-
+    related_jobs json
+) engine='columnar' charset_table='U+0021..U+007E';
-- 
Gitee