diff --git a/container/bisect/bisect-task.py b/container/bisect/bisect-task.py index d290d38cfc2fdf9a42494111b48012cc16602c0e..36e07445b58559952fff0edcf3b33834318e66fe 100755 --- a/container/bisect/bisect-task.py +++ b/container/bisect/bisect-task.py @@ -11,15 +11,16 @@ # fork process, start bisect import json +import logging +import os import re import subprocess -import yaml -import os import sys -import uuid -import time import threading -import logging +import time +import uuid + +import yaml from flask import Flask, jsonify, request from flask.views import MethodView from httpx import RequestError @@ -161,11 +162,11 @@ class BisectTask: try: #检查bisect_task的状态,乐观锁 if bisect_task.get('bisect_status') != 'wait': - logging.warning(f"Skipping task {task_id} with invalid status: {bisect_task['bisect_status']}") + logging.warning(f"Skipping task {task_id} with invalid status: {bisect_task.get('bisect_status')}") continue # Update task status to 'processing' in Elasticsearch bisect_task["bisect_status"] = "processing" - es_client.update_by_id("bisect_index", bisect_task["id"], bisect_task) + es_client.update_by_id("bisect_task", bisect_task["id"], bisect_task) logging.debug(f"Started processing task: {bisect_task['id']}") # Prepare task data for Git bisect @@ -181,14 +182,14 @@ class BisectTask: # Update task status and result in Elasticsearch bisect_task["bisect_status"] = "completed" bisect_task["bisect_result"] = result - es_client.update_by_id("bisect_index", bisect_task["id"], bisect_task) + es_client.update_by_id("bisect_task", bisect_task["id"], bisect_task) logging.debug(f"Completed processing task: {bisect_task['id']}") except Exception as e: # Update task status to 'failed' in case of an error bisect_task["bisect_status"] = "failed" bisect_task["bisect_result"] = str(e) - es_client.update_by_id("bisect_index", bisect_task["id"], bisect_task) + es_client.update_by_id("bisect_task", bisect_task["id"], bisect_task) logging.error(f"Marked task {bisect_task['id']} as failed due to error: {e}") def submit_bisect_tasks(self, bisect_tasks): @@ -238,7 +239,7 @@ class BisectTask: # 添加重试机制 retry_count = 0 while retry_count < 3: - logging.error(f"Submission failed for task {task_id} {retry_count+1} times") + logging.error(f"Submission failed for task {task_id} {retry_count+1} times: {str(e)}") try: current_query = query_if_bisect_already_in_db.copy() current_query["query"]["bool"]["must"][0]["term"]["error_id"] = bisect_task["error_id"] @@ -265,7 +266,7 @@ class BisectTask: "bool": { "must": [ {"exists": {"field": "id"}}, # Task must have an id - {"term": {"bisect-status": "wait"}} # Task status must be 'wait' + {"term": {"bisect_status": "wait"}} # Task status must be 'wait' ] } } @@ -472,10 +473,20 @@ class BisectTask: response = self.es_query(index="bisect_task", query=query) if response is None: logging.warning(f"ES query failed during existence check. bad_job_id={bad_job_id}") - return True # 失败时保守返回False,防止重复提交 + return True # 失败时保守返回True,防止重复提交 logging.debug(f"Query response for bad_job_id={bad_job_id}, error_id={error_id}: {response}") - total_hits = response['hits']['total']['value'] if response else 1 - return total_hits > 0 + + # 检查响应结构并获取命中数量 + if isinstance(response, dict) and 'hits' in response: + total_hits = response['hits'].get('total', {}) + if isinstance(total_hits, dict): + return total_hits.get('value', 0) > 0 + else: + return total_hits > 0 # 兼容旧版ES格式 + elif isinstance(response, list): + return len(response) > 0 + else: + return False except KeyError as e: # 处理ES响应结构异常 @@ -502,23 +513,29 @@ class BisectAPI(MethodView): def main(): try: + # 添加API路由 app.add_url_rule('/new_bisect_task', view_func=BisectAPI.as_view('bisect_api')) - app.run(host='0.0.0.0', port=9999) + + # 创建BisectTask实例 run = BisectTask() - bisect_producer_thread = threading.Thread(target=run.bisect_producer) + + # 启动生产者线程 + bisect_producer_thread = threading.Thread(target=run.bisect_producer, daemon=True) bisect_producer_thread.start() + # 启动消费者线程 num_consumer_threads = 2 + consumer_threads = [] for i in range(num_consumer_threads): - bisect_consumer_thread = threading.Thread(target=run.bisect_consumer) + bisect_consumer_thread = threading.Thread(target=run.bisect_consumer, daemon=True) bisect_consumer_thread.start() + consumer_threads.append(bisect_consumer_thread) - bisect_producer_thread.join() - for bisect_consumer_thread in threading.enumerate(): - if bisect_consumer_thread != threading.current_thread(): - bisect_consumer_thread.join() + # 启动Flask应用(这会阻塞主线程) + app.run(host='0.0.0.0', port=9999) + except Exception as e: - print("Error when init_bisect_commit:" + e) + print("Error when init_bisect_commit:" + str(e)) sys.exit(-1) diff --git a/sbin/auto_submit b/sbin/auto_submit index b7a7a3eba9f5e5c666e8e841829f1c46156b92fa..297d41a947afe2ea4d8cc0fc3232045b29e01119 100755 --- a/sbin/auto_submit +++ b/sbin/auto_submit @@ -166,6 +166,8 @@ class AutoSubmit end def excute_ccb_command(cmd) + return nil if cmd.nil? || cmd.empty? + out = %x(#{cmd}) begin out_hash = JSON.parse(out[/{[\s\S]*}/]) @@ -184,6 +186,20 @@ class AutoSubmit return out_hash end + def make_http_request(url, data, headers = {}) + begin + RestClient.post(url, data.to_json, { content_type: :json, accept: :json }.merge(headers)) + rescue RestClient::ExceptionWithResponse => e + return "{\"status_code\": #{e.response.code}, \"url\": \"#{url}\"}" + end + end + + def execute_git_cmd(cmd) + out = %x(#{cmd} 2>&1) + return nil unless $?.success? + out + end + def excute_pr_build(os_project, pr_repo, prid) request_info = { "build_type" => "static", @@ -196,21 +212,13 @@ class AutoSubmit end config = load_my_config url = "http://#{config['GATEWAY_IP']}:#{config['GATEWAY_PORT']}/api/os/#{os_project}/build_dag" - begin - RestClient.post(url, request_info.to_json, { content_type: :json, accept: :json, 'Authorization' => jwt }) - rescue RestClient::ExceptionWithResponse => e - return "{\"status_code\": #{e.response.code}, \"url\": \"#{url}\"}" - end + make_http_request(url, request_info, { 'Authorization' => jwt }) end def gitee_comment(pr_url, comment) request_info = {"pr_url" => pr_url, "token" => ENV['GITEE_ACCESS_TOKEN'], "comment" => comment} url = "http://#{ENV['CODE_HOSTING_CLIENT_HOST']}:#{ENV['CODE_HOSTING_CLIENT_PORT']}/api/code_hosting_client/gitee/pr/comment" - begin - RestClient.post(url, request_info.to_json, { content_type: :json, accept: :json}) - rescue RestClient::ExceptionWithResponse => e - return "{\"status_code\": #{e.response.code}, \"url\": \"#{url}\"}" - end + make_http_request(url, request_info) end def load_cbs_meta_relm_config_update(url, prid, file_filter=/#.*$/) @@ -218,9 +226,14 @@ class AutoSubmit if File.directory? "#{clone_dir}" out = %x(rm -rf #{clone_dir}) end - out = %x(git clone #{url} #{clone_dir}) - out = %x(cd #{clone_dir}; git fetch origin pull/#{prid}/head; git checkout FETCH_HEAD) - out = %x(cd #{clone_dir}; git diff HEAD~1 --name-only;) + + return nil unless execute_git_cmd("git clone #{url} #{clone_dir}") + return nil unless execute_git_cmd("cd #{clone_dir} && git fetch origin pull/#{prid}/head") + return nil unless execute_git_cmd("cd #{clone_dir} && git checkout FETCH_HEAD") + + out = execute_git_cmd("cd #{clone_dir} && git diff HEAD~1 --name-only") + return nil unless out + filelist = out.split(/\n/) @log.info("new/update filelist: #{filelist}") config_hash = {} @@ -615,9 +628,13 @@ class AutoSubmit def cbs_pr_listen @cbs_pr_message_queue.subscribe(block: true) do |_delivery, _properties, message| Thread.new do - @log.info(message) - message_info = JSON.parse(message) - deal_local_submit(message_info) + begin + @log.info(message) + message_info = JSON.parse(message) + deal_local_submit(message_info) + rescue JSON::ParseError => e + @log.error("Failed to parse CBS PR message: #{e.message}") + end end sleep(0.1) end @@ -626,8 +643,12 @@ class AutoSubmit def listen @queue.subscribe(block: true) do |_delivery, _properties, message| Thread.new do - message_info = JSON.parse(message) - submit_job(message_info) + begin + message_info = JSON.parse(message) + submit_job(message_info) + rescue JSON::ParseError => e + @log.error("Failed to parse message: #{e.message}") + end end sleep(0.1) end