diff --git a/src/elasticsearch_client.cr b/src/elasticsearch_client.cr
index 8094bff95fdb095ed1570504caffd8c96f292289..0616e9acd61b50638422d7fa48b69985111ef47f 100644
--- a/src/elasticsearch_client.cr
+++ b/src/elasticsearch_client.cr
@@ -382,25 +382,28 @@ class Elasticsearch::Client
     if Sched.options.should_read_manticore
       match = build_query_string(matches, " ", false)
       fields = Manticore.filter_sql_fields(fields)
-      match = Manticore.filter_sql_fields(match)
       others = Manticore.filter_sql_fields(others)
       sql_cmd = URI.encode_www_form("SELECT #{fields} FROM #{index} WHERE MATCH('#{match}') #{others}")
       host_port = "#{@settings[:manticore_host]}:#{@settings[:manticore_port]}"
-      response = perform_one_request(host_port, "sql", nil, "POST", "query=" + sql_cmd)
+      response = perform_one_request(host_port, "sql", nil, "POST", "mode=raw&query=" + sql_cmd)
       body = response.body
 
       # Filter the SQL result if fields are not '*'
       body = Manticore.filter_sql_result(body) if fields != '*'
 
-      json_hash = JSON.parse(body).as_h
-      if json_hash.has_key? "error"
+      parsed_json = JSON.parse(body)
+      json_hash = if parsed_json.as_a?
+                    parsed_json[0]?.try(&.as_h) || {} of String => JSON::Any
+                  else
+                    parsed_json.as_h
+                  end
+      if json_hash.has_key?("error") && !json_hash["error"].to_s.empty?
         error_message = json_hash["error"]
         raise "Manticore SQL Error: #{error_message} sql_cmd is #{sql_cmd}"
       end
 
       # Parse the JSON response and extract the results
-      results = json_hash["hits"]["hits"].as_a
-      results = Manticore.jobs_from_manticore(results)
+      results = json_hash["data"].as_a
 
       return results
     end
diff --git a/src/job.cr b/src/job.cr
index 3e4c04f6f1270da23f6744584eda6800b9470d72..54d3e5f41854ac50dbd856dd1e61daf38c4a790e 100644
--- a/src/job.cr
+++ b/src/job.cr
@@ -361,6 +361,7 @@ class JobHash
     docker_image
 
     pp_params_md5
+    ss_params_md5
     all_params_md5
 
     nr_run
@@ -1098,7 +1099,15 @@ class Job < JobHash
       self.pp_params_md5 = get_md5(flat_pp_hash)
     end
 
+    flat_ss_hash = Hash(String, String).new
+    if @hash_hhh["ss"]?
+      flat_ss_hash = flat_hh(@hash_hhh["ss"])
+      self.ss_params_md5 = get_md5(flat_ss_hash)
+    end
+
     all_params = flat_pp_hash
+    all_params.merge!(flat_ss_hash)
+
     COMMON_PARAMS.each do |param|
       all_params[param] = @hash_plain[param]
     end
diff --git a/src/pkgbuild.cr b/src/pkgbuild.cr
index dfd5de1cd4cb2151598d73fcc9234dbc2d87ba87..d82f98ae72eb19e9d4417575f73e99a9e23f6277 100644
--- a/src/pkgbuild.cr
+++ b/src/pkgbuild.cr
@@ -20,9 +20,44 @@ class PkgBuild < PluginsCommon
     wait_jobs = {} of String => Nil
     ss.each do |pkg_name, pkg_params|
       build_job = create_pkgbuild_job(job, pkg_name, pkg_params)
+
       if build_job
-        submit_pkgbuild_job(build_job)
-        wait_jobs[build_job.id] = nil
+        build_job.init_submit
+        limit = 3
+        skip_submit = false
+
+        submitted_jobs = find_existing_job(build_job, limit)
+
+        submitted_jobs.each do |select_job|
+          select_job_id = select_job["id"].to_s
+          select_job_stage = select_job["job_stage"].to_s
+          select_job_health = select_job["job_health"].to_s
+
+          # build job is still running or submitted: add it to wait_jobs, then handle the next ss build job
+          if select_job_stage != "finish"
+            @log.info { "Found existing unfinished job #{select_job_id} for #{pkg_name}, adding to wait_jobs" }
+            wait_jobs[select_job_id] = nil
+            skip_submit = true
+            break
+          # build job already succeeded: no need to wait, handle the next ss build job
+          elsif select_job_health == "success"
+            @log.info { "Found successful job #{select_job_id} for #{pkg_name}, handling next ss build job" }
+            skip_submit = true
+            break
+          end
+        end
+
+        if skip_submit
+          next
+        end
+
+        if submitted_jobs.size < limit
+          Sched.instance.on_job_submit(build_job)
+          @log.info { "Submitted job #{build_job.id} for #{pkg_name}, adding to wait_jobs" }
+          wait_jobs[build_job.id] = nil
+        else
+          raise "build job with all_params_md5 #{build_job.all_params_md5} already failed #{submitted_jobs.size} times, not resubmitting"
+        end
       end
     end
 
@@ -38,9 +73,13 @@ class PkgBuild < PluginsCommon
     raise ex.to_s
   end
 
-  def submit_pkgbuild_job(build_job)
-    build_job.init_submit
-    Sched.instance.on_job_submit(build_job)
+  private def find_existing_job(build_job : Job, limit : Int)
+    query_submitted = {
+      "all_params_md5" => "#{build_job.all_params_md5}"
+    }
+    custom_condition = "LIMIT #{limit}"
+
+    Sched.instance.es.select("jobs", query_submitted, "id, job_stage, job_health", custom_condition)
   end
 
   # ss:
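Note (not part of the patch): a minimal, self-contained sketch of the response handling the new Manticore path assumes. The JSON body below is hypothetical; only the parsing mirrors the patched logic in src/elasticsearch_client.cr, where a mode=raw SQL response arrives as a JSON array of result sets and the selected rows live under "data".

require "json"

# Hypothetical mode=raw response body: one result set per statement.
body = %([{"total": 1, "error": "", "warning": "", "data": [{"id": "crystal.1", "job_stage": "finish", "job_health": "success"}]}])

parsed_json = JSON.parse(body)

# Raw Manticore responses are arrays; Elasticsearch-style responses are single objects.
json_hash = if parsed_json.as_a?
              parsed_json[0]?.try(&.as_h) || {} of String => JSON::Any
            else
              parsed_json.as_h
            end

# A non-empty "error" value signals a failed query; an empty string means success.
if json_hash.has_key?("error") && !json_hash["error"].to_s.empty?
  raise "Manticore SQL Error: #{json_hash["error"]}"
end

json_hash["data"].as_a.each { |row| puts row["job_health"] } # prints: success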