diff --git a/container/fluentd-base/Dockerfile b/container/fluentd-base/Dockerfile index 64f71ef9b7e85767e9dcb748bb613e042a706022..c4bdf6f0453a20706130ad1d548677ace89f9ea5 100644 --- a/container/fluentd-base/Dockerfile +++ b/container/fluentd-base/Dockerfile @@ -14,6 +14,7 @@ RUN apk add --no-cache 'ruby-dev' \ RUN umask 002 && \ gem sources -r https://rubygems.org/ -a https://gems.ruby-china.com/ && \ + gem install console:1.15.3 && \ gem install elasticsearch:7.11.1 && \ gem install fluentd && \ gem install fluent-plugin-rabbitmq && \ diff --git a/container/scheduled_task/Dockerfile b/container/scheduled_task/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..70ad31d3b9114e53e4b2b33dd46ca05794305651 --- /dev/null +++ b/container/scheduled_task/Dockerfile @@ -0,0 +1,41 @@ +# SPDX-License-Identifier: MulanPSL-2.0+ +# Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved. + +FROM python:3.7-buster + +ENV PIP_SOURCE https://pypi.tuna.tsinghua.edu.cn/simple + +ARG ES_HOST +ARG ES_PORT +ARG ES_USER +ARG ES_PASSWORD +ARG SCHED_HOST +ARG SCHED_PORT + +ENV ES_HOST ${ES_HOST} +ENV ES_PORT ${ES_PORT} +ENV ES_USER ${ES_USER} +ENV ES_PASSWORD ${ES_PASSWORD} +ENV SCHED_HOST ${SCHED_HOST} +ENV SCHED_PORT ${SCHED_PORT} +ENV LKP_SRC /c/lkp-tests + +ENV SERVICE_PORT 20040 + +COPY conf/sources.list* /etc/apt/ +COPY requirements.txt . + +RUN apt-get -y update && \ + apt-get -y install git ruby ruby-dev make libssl-dev gcc g++ sudo uuid-runtime cpio +RUN gem sources -r https://rubygems.org/ -a https://gems.ruby-china.com/ + +RUN gem install faye-websocket activesupport:6.1.4.4 rest-client public_suffix:4.0.7 git + +RUN umask 002 && \ + /usr/local/bin/python3 -m pip install --upgrade pip && \ + pip3 install -r ./requirements.txt -i ${PIP_SOURCE} + +WORKDIR /opt/compass-ci/src/scheduled_task + +EXPOSE ${SERVICE_PORT} +ENTRYPOINT gunicorn --config ./conf/gunicorn_conf.py run:app diff --git a/container/scheduled_task/build b/container/scheduled_task/build new file mode 100755 index 0000000000000000000000000000000000000000..dcd7fa7be938d75b83fe317c54fba074680dbe8b --- /dev/null +++ b/container/scheduled_task/build @@ -0,0 +1,20 @@ +#!/bin/sh +# SPDX-License-Identifier: MulanPSL-2.0+ +# Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved. + +. ../defconfig.sh +load_cci_defaults +load_service_authentication + +service_name=scheduled_task + +docker_skip_rebuild "$service_name:latest" + +docker build -t $service_name:latest \ + --build-arg ES_HOST=${ES_HOST} \ + --build-arg ES_PORT=${ES_PORT} \ + --build-arg ES_USER=${ES_USER} \ + --build-arg ES_PASSWORD=${ES_PASSWORD} \ + --build-arg SCHED_HOST=${SCHED_HOST} \ + --build-arg SCHED_PORT=${SCHED_PORT} \ + . 
diff --git a/container/scheduled_task/conf/sources.list b/container/scheduled_task/conf/sources.list new file mode 100644 index 0000000000000000000000000000000000000000..8179e01cadd258c86414c028f9eb913fa44e58d1 --- /dev/null +++ b/container/scheduled_task/conf/sources.list @@ -0,0 +1,9 @@ +deb [trusted=yes] http://mirrors.163.com/debian/ buster main non-free contrib +deb [trusted=yes] http://mirrors.163.com/debian/ buster-updates main non-free contrib +deb [trusted=yes] http://mirrors.163.com/debian/ buster-backports main non-free contrib +deb [trusted=yes] http://mirrors.163.com/debian-security/ buster/updates main non-free contrib + +deb-src [trusted=yes] http://mirrors.163.com/debian/ buster main non-free contrib +deb-src [trusted=yes] http://mirrors.163.com/debian/ buster-updates main non-free contrib +deb-src [trusted=yes] http://mirrors.163.com/debian/ buster-backports main non-free contrib +deb-src [trusted=yes] http://mirrors.163.com/debian-security/ buster/updates main non-free contrib diff --git a/container/scheduled_task/requirements.txt b/container/scheduled_task/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..784898fe724bf688ea858c80f8acf37257b71f44 --- /dev/null +++ b/container/scheduled_task/requirements.txt @@ -0,0 +1,12 @@ +flasgger==0.9.5 +Flask==1.1.2 +Flask-RESTful==0.3.9 +gevent==21.8.0 +gunicorn==20.1.0 +requests==2.28.0 +schema==0.7.5 +Jinja2==2.11.3 +MarkupSafe==2.0.1 +itsdangerous==2.0.1 +Werkzeug==2.0.2 +elasticsearch==7.8.0 diff --git a/container/scheduled_task/start b/container/scheduled_task/start new file mode 100755 index 0000000000000000000000000000000000000000..c6c1bfa1950f743e746ef250277aee192a60063f --- /dev/null +++ b/container/scheduled_task/start @@ -0,0 +1,30 @@ +#!/bin/bash +# SPDX-License-Identifier: MulanPSL-2.0+ + +[[ $CCI_SRC ]] || CCI_SRC=/c/cbs + +. $CCI_SRC/container/defconfig.sh + +load_cci_defaults + +docker_name="scheduled_task" +docker_rm $docker_name + +cmd=( + + docker run + --name $docker_name + --restart=always + -d + -p 20040:20040 + -v $CCI_SRC/src:/opt/compass-ci/src + -v $LKP_SRC:/c/lkp-tests:ro + -v /etc/localtime:/etc/localtime:ro + -v /etc/compass-ci:/etc/compass-ci:ro + -v /srv/log:/srv/log:rw + -v /root/.config:/root/.config:ro + --log-driver json-file + $docker_name +) + +"${cmd[@]}" diff --git a/container/webhook/root/src_oepkgs_pr_hook.rb b/container/webhook/root/src_oepkgs_pr_hook.rb new file mode 100755 index 0000000000000000000000000000000000000000..29f444db539df715b1276c821f5cf13fefd20d86 --- /dev/null +++ b/container/webhook/root/src_oepkgs_pr_hook.rb @@ -0,0 +1,15 @@ +#!/usr/bin/env ruby +# SPDX-License-Identifier: MulanPSL-2.0+ +# Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved. 
+# frozen_string_literal: true + +require 'bunny' + +connection = Bunny.new('amqp://172.17.0.1:5672') +connection.start +channel = connection.create_channel + +queue = channel.queue('src_oepkgs_pr_hook') +message = ARGV[0] +queue.publish(message) +connection.close diff --git a/container/webhook/root/webhook.js b/container/webhook/root/webhook.js index 1756533d39b670bc04274171814614709fa2a38e..c429048734476c9b960dccdd3016bcd073933464 100755 --- a/container/webhook/root/webhook.js +++ b/container/webhook/root/webhook.js @@ -9,7 +9,7 @@ handler.on('error', function(err){ handler.on('push', function(event){ console.log(event.payload.repository.url) - if(event.payload.repository.url.startsWith("https://gitee.com/src-oepkgs")){ + if(event.payload.repository.url.startsWith("https://gitee.com/src-oepkgs/")){ var msg = { "commit_id" : event.payload.after, "url" : event.payload.repository.url, @@ -23,7 +23,10 @@ handler.on('push', function(event){ }) handler.on('Merge Request Hook', function(event){ - if(event.payload.action != "open"){ + if(event.payload.action != "open" && event.payload.action != "update"){ + return + } + if(event.payload.action == "update" && event.payload.action_desc != "source_branch_changed"){ return } var msg = { @@ -32,13 +35,19 @@ handler.on('Merge Request Hook', function(event){ "master" : event.payload.pull_request.head.sha } }, + "branch" : event.payload.target_branch, "url" : event.payload.pull_request.base.repo.url, "submit_command" : { "pr_merge_reference_name" : event.payload.pull_request.merge_reference_name } } console.log(msg) - spawn('ruby', ['/js/pr_hook.rb', JSON.stringify(msg)]) + + if(event.payload.pull_request.base.repo.url.startsWith("https://gitee.com/src-oepkgs/")){ + spawn('ruby', ['/js/src_oepkgs_pr_hook.rb', JSON.stringify(msg)]) + } else { + spawn('ruby', ['/js/pr_hook.rb', JSON.stringify(msg)]) + } }) http.createServer(function(req, res){ diff --git a/lib/mq_client.rb b/lib/mq_client.rb index 1b92570e1111e43c59534adce9baa9416c61fe2c..af29e0d366a0a72157c93a1c2a715a75ae54bfa1 100755 --- a/lib/mq_client.rb +++ b/lib/mq_client.rb @@ -13,6 +13,7 @@ class MQClient @conn = Bunny.new(opts) @conn.start @channel = @conn.create_channel + @channel.prefetch(opts[:prefetch_count]) if opts[:prefetch_count] end def fanout_queue(exchange_name, queue_name) diff --git a/lib/parse_install_rpm.rb b/lib/parse_install_rpm.rb index 5f4ac213f25a75dd691da3795a4af36ddd8e0af7..d54a6d6952f76e895e68b899873cbec467019c4c 100644 --- a/lib/parse_install_rpm.rb +++ b/lib/parse_install_rpm.rb @@ -16,8 +16,10 @@ def get_install_rpm_result_by_group_id(group_id) next unless job['_source']['stats'] tmp_hash = {} - tmp_hash['rpm_name'] = job['_source']['rpm_name'] - key = job['_source']['rpm_name'] + + repo = "#{job['_source']['mount_repo_addr'].split('/')[6]}" + tmp_hash['rpm_name'] = "#{job['_source']['rpm_name']},#{repo}" + key = "#{job['_source']['rpm_name']},#{repo}" tmp_hash[key] = {} srv_http_result_host = job['SRV_HTTP_RESULT_HOST'] || 'api.compass-ci.openeuler.org' srv_http_protocol = job['SRV_HTTP_PROTOCOL'] || 'https' @@ -26,6 +28,8 @@ def get_install_rpm_result_by_group_id(group_id) tmp_hash[key]['arch'] = job['_source']['arch'] tmp_hash[key]['property'] = job['_source']['property'] || 'Open Source' tmp_hash[key]['os'] = "#{job['_source']['os']} #{job['_source']['os_version']}" + tmp_hash[key]['repo'] = repo + tmp_hash[key]['rpm_name'] = key job['_source']['stats'].merge!(tmp_hash) end @@ -34,14 +38,21 @@ end def parse_rpm_name(tmp_hash, result) rpm_name = result['rpm_name'] + 
repo = result[rpm_name]['repo'] rpm_name_list = rpm_name.gsub(',', ' ').split(' ') + rpm_name_list.each do |rpm_name| - tmp_hash[rpm_name] = {} unless tmp_hash.key?(rpm_name) - tmp_hash[rpm_name].merge!(result[result['rpm_name']]) if rpm_name =~ /(.*)(-[^-]+){2}/ - key = "#{rpm_name}.#{tmp_hash[rpm_name]['os']}.#{tmp_hash[rpm_name]['arch']}" - tmp_hash[key] = tmp_hash[rpm_name] - tmp_hash.delete(rpm_name) + tmp_hash["#{rpm_name}.#{repo}"] = {} unless tmp_hash.key?("#{rpm_name}.#{repo}") + tmp_hash["#{rpm_name}.#{repo}"].merge!(result[result['rpm_name']]) + key = "#{rpm_name}.#{tmp_hash["#{rpm_name}.#{repo}"]['os']}.#{tmp_hash["#{rpm_name}.#{repo}"]['arch']}.#{repo}" + if tmp_hash.key?(rpm_name) + tmp_hash["#{rpm_name}.#{repo}"].merge!(tmp_hash[rpm_name]) + tmp_hash.delete(rpm_name) + end + + tmp_hash[key] = tmp_hash["#{rpm_name}.#{repo}"] + tmp_hash.delete("#{rpm_name}.#{repo}") end end tmp_hash @@ -96,6 +107,7 @@ def parse_install_rpm_result_to_json(result_list) when /^install-rpm\.(.*)_location\.element/ tmp_hash[$1] = {} unless tmp_hash.key?($1) tmp_hash[$1].merge!({ 'location' => v }) + tmp_hash[$1].merge!({ 'repo' => v[0].split('/')[6] }) when /install-rpm\.(.*)_evr.element/ tmp_hash[$1] = {} unless tmp_hash.key?($1) tmp_hash[$1].merge!({ 'evr' => v }) diff --git a/sbin/build-compat-list.rb b/sbin/build-compat-list.rb index 368802905ef0b88cd908c5990bb69670d3c138f2..946d6987a42d7eeb8352797f1ecdb6d70b914e69 100755 --- a/sbin/build-compat-list.rb +++ b/sbin/build-compat-list.rb @@ -67,12 +67,17 @@ end def update_compat_software?(index, query, info) my_data = MyData.new + repo = info['repo'].split('/')[0] data = my_data.es_query(index, query) - _id = "#{info['softwareName']}--#{info['version']}--#{info['arch']}--#{info['os']}" + _id = "#{info['softwareName']}.#{info['version']}.#{info['arch']}.#{info['os']}.#{info['repo']}" add = my_data.es_add(index, _id, info.to_json) if data['took'] == 0 data['hits']['hits'].each do |source| my_data.es_delete(index, source['_id']) unless source['_source']['install'] == 'pass' my_data.es_delete(index, source['_id']) if source['_source']['delete'] + if 4 <= source['_id'].split('--').length <= 5 + my_data.es_delete(index, source['_id']) + sleep 1 + end if source['_id'] == _id id = source['_id'] @@ -81,7 +86,6 @@ def update_compat_software?(index, query, info) my_data.es_add(index, _id, info.to_json) end end - sleep 2 end def read_csv_file(filename) @@ -189,7 +193,7 @@ def refine_json(data) pkg_info['evr'].each do |version| tmp_hash = {} - tmp_hash.merge!({ 'os' => pkg_info['os'], 'arch' => pkg_info['arch'] }) + tmp_hash.merge!({ 'os' => pkg_info['os'], 'arch' => pkg_info['arch'], 'repo' => pkg_info['repo'] }) tmp_hash.merge!({ 'property' => pkg_info['property'], 'result_url' => pkg_info['result_url'] }) tmp_hash.merge!(pkg_info).delete('evr') tmp_hash.delete('location') diff --git a/sbin/rpm-repo.rb b/sbin/rpm-repo.rb index ee27686d953026851a1846f3f422c78bfd1fbb90..0eabadb47bcb5d3196f8cd1f5fa5187cb22c2990 100755 --- a/sbin/rpm-repo.rb +++ b/sbin/rpm-repo.rb @@ -18,6 +18,7 @@ require_relative '../lib/constants.rb' MQ_HOST = ENV['MQ_HOST'] || ENV['LKP_SERVER'] || '172.17.0.1' MQ_PORT = ENV['MQ_PORT'] || 5672 +PREFETCH_COUNT = 1 # --------------------------------------------------------------------------------------------------------------------------------------- # end_user can use cmd: @@ -29,48 +30,42 @@ MQ_PORT = ENV['MQ_PORT'] || 5672 # # HandleRepo used to handle the mq queue "update_repo" # -# Example items in @update_repo_mq.queue "update_repo": { 
"upload_rpms" => ["/srv/rpm/upload/**/Packages/*.rpm", "/srv/rpm/upload/**/source/*.rpm"]} +# Example items in @update_repo_mq.queue "update_repo": { "upload_rpms" => ["/srv/rpm/upload/**/Packages/*.rpm", "/srv/rpm/upload/**/source/*.rpm"], "job_id": "xxx"} # handle_new_rpm # move /srv/rpm/upload/**/*.rpm to /srv/rpm/testing/**/*.rpm -# update /srv/rpm/testing/**/repodate +# update /srv/rpm/testing/**/repodata # update_pub_dir # copy /srv/rpm/testing/**/*.rpm to /srv/rpm/pub/**/*.rpm -# change /srv/rpm/pub/**/repodate +# change /srv/rpm/pub/**/repodata # @update_repo_mq.ack(info) class HandleRepo @@upload_dir_prefix = '/srv/rpm/upload/' def initialize @es = ESQuery.new(ES_HOSTS) - @update_repo_mq = MQClient.new(hostname: MQ_HOST, port: MQ_PORT) - @create_repodata_mq = MQClient.new(hostname: MQ_HOST, port: MQ_PORT) - @createrepodata_complete_mq = MQClient.new(hostname: MQ_HOST, port: MQ_PORT) + @update_repo_mq = MQClient.new(hostname: MQ_HOST, port: MQ_PORT, prefetch_count: PREFETCH_COUNT) + @create_repodata_mq = MQClient.new(hostname: MQ_HOST, port: MQ_PORT, prefetch_count: PREFETCH_COUNT) + @createrepodata_complete_mq = MQClient.new(hostname: MQ_HOST, port: MQ_PORT, prefetch_count: PREFETCH_COUNT) @log = JSONLogger.new end @@upload_flag = true @@create_repo_path = Set.new + @@create_repodata = false def handle_new_rpm update_repo_queue = @update_repo_mq.queue('update_repo') createrepodata_queue = @create_repodata_mq.queue('createrepodata') - Thread.new do - update_repo_queue.subscribe({ manual_ack: true }) do |info, _pro, msg| + Thread.new 10 do + update_repo_queue.subscribe({ block: true, manual_ack: true }) do |info, _pro, msg| loop do next unless @@upload_flag begin rpm_info = JSON.parse(msg) check_upload_rpms(rpm_info) - createrepodata_queue.publish(msg) - - rpm_info['upload_rpms'].each do |rpm| - rpm_path = File.dirname(rpm).sub('upload', 'testing') - FileUtils.mkdir_p(rpm_path) unless File.directory?(rpm_path) - @@create_repo_path << rpm_path - - dest = File.join(rpm_path.to_s, File.basename(rpm)) - FileUtils.mv(rpm, dest) - end + handle_upload_rpms(rpm_info) + @@create_repodata = false + createrepodata_queue.publish(msg) @update_repo_mq.ack(info) break rescue StandardError => e @@ -85,6 +80,55 @@ class HandleRepo end end + def handle_upload_rpms(rpm_info) + rpm_info['upload_rpms'].each do |rpm| + rpm_path = File.dirname(rpm).sub('upload', 'testing') + dest = File.join(rpm_path.to_s, File.basename(rpm)) + extras_rpm_path = create_extras_path(rpm) + + unless check_if_extras(rpm) + FileUtils.mkdir_p(rpm_path) unless File.directory?(rpm_path) + @@create_repo_path << rpm_path + FileUtils.mv(rpm, dest) + end + + FileUtils.mkdir_p(extras_rpm_path) unless File.directory?(extras_rpm_path) + @@create_repo_path << File.dirname(extras_rpm_path) + + extras_dest = File.join(extras_rpm_path.to_s, File.basename(rpm)) + unless check_if_extras(rpm) + next if File.exist?(extras_dest) + File.link(dest, extras_dest) + else + FileUtils.mv(rpm, extras_dest) + end + end + end + + def check_if_extras(rpm) + rpm_list = rpm.split('/') + if rpm.split('/')[5] == "refreshing" && rpm.split('/')[6] == "extras" + return true + elsif rpm.split('/')[5] == "extras" + return true + else + return false + end + end + + def create_extras_path(rpm) + rpm_path = File.dirname(rpm).sub('upload', 'testing') + extras_rpm_path = "" + if rpm.split('/')[5] == "refreshing" + extras_rpm_path_prefix = rpm_path.split('/')[0..4].join('/') + "/refreshing/extras/" + else + extras_rpm_path_prefix = rpm_path.split('/')[0..4].join('/') + 
"/extras/" + end + + extras_rpm_path_suffix = rpm_path.split('/')[-2..-1].join('/') + "/" + "#{File.basename(rpm)[0].downcase}" + extras_rpm_path = extras_rpm_path_prefix + extras_rpm_path_suffix + end + def get_results_by_group_id(group_id) query = { 'group_id' => group_id } tmp_stats_hash = get_install_rpm_result_by_group_id(query) @@ -103,25 +147,40 @@ class HandleRepo result_hash end + def update_compat_software_list(pkg_info) + query = { + 'query' => { + 'query_string' => { + 'query' => "softwareName:#{pkg_info['softwareName']}" + } + } + }.to_json + + if pkg_info['repo_name'].start_with?("refreshing") + pkg_info['repo_name'] = pkg_info['repo_name'].delete_prefix('refreshing/') + pkg_info['repo_name'] = "extras" if pkg_info['repo'] == "extras" + update_compat_software?('srpm-info-tmp', query, pkg_info) + else + pkg_info['repo_name'] = "extras" if pkg_info['repo'] == "extras" + update_compat_software?('srpm-info', query, pkg_info) + end + end + def deal_pub_dir(group_id) result_list = get_results_by_group_id(group_id) update = [] result_list.each do |pkg_info| - query = { - 'query' => { - 'query_string' => { - 'query' => "softwareName:#{pkg_info['softwareName']}" - } - } - }.to_json - next unless pkg_info['install'] == 'pass' next unless pkg_info['downloadLink'] next unless pkg_info['src_location'] + next unless pkg_info['result_root'] job_id = pkg_info['result_root'].split('/')[-1] + h = get_srpm_addr(job_id) + next if h.empty?() pkg_info.merge!(get_srpm_addr(job_id)) - update_compat_software?('srpm-info', query, pkg_info) + + update_compat_software_list(pkg_info) rpm_path = pkg_info['downloadLink'].delete_prefix!('https://api.compass-ci.openeuler.org:20018') srpm_path = pkg_info['src_location'].delete_prefix!('https://api.compass-ci.openeuler.org:20018') @@ -140,6 +199,7 @@ class HandleRepo raise JSON.dump({ 'errcode' => '200', 'errmsg' => 'upload_rpms params type error' }) if data['upload_rpms'].class != Array data['upload_rpms'].each do |rpm| + raise JSON.dump({ 'errcode' => '200', 'errmsg' => "no custom_repo_name specified", 'job_id' => "#{data['job_id']}" }) if rpm.split('/')[5] == "" raise JSON.dump({ 'errcode' => '200', 'errmsg' => "#{rpm} not exist", 'job_id' => "#{data['job_id']}" }) unless File.exist?(rpm) raise JSON.dump({ 'errcode' => '200', 'errmsg' => 'the upload directory is incorrect', 'job_id' => "#{data['job_id']}" }) unless File.dirname(rpm).start_with?(@@upload_dir_prefix) end @@ -148,113 +208,241 @@ class HandleRepo def update_pub_dir(update) pub_path_list = Set.new update.each do |rpm| - pub_path = File.dirname(rpm).sub('testing', 'pub') + if rpm.split('/')[5] == "refreshing" + pub_path = File.dirname(rpm).sub('testing', 'tmp_pub') + else + pub_path = File.dirname(rpm).sub('testing', 'pub') + end FileUtils.mkdir_p(pub_path) unless File.directory?(pub_path) dest = File.join(pub_path, File.basename(rpm)) - FileUtils.cp(rpm, dest) - pub_path_list << pub_path + next unless File.exist?(rpm) + next if File.exist?(dest) + File.link(rpm, dest) + if File.basename(pub_path) != "Packages" + pub_path_list << File.dirname(pub_path) + else + pub_path_list << pub_path + end end pub_path_list.each do |pub_path| + if File.basename(pub_path) != "Packages" + pub_path = File.dirname(pub_path) + end system("createrepo --update $(dirname #{pub_path})") end end - def create_repo + def ack_create_repo_done createrepodata_queue = @create_repodata_mq.queue('createrepodata') createrepodata_complete_queue = @createrepodata_complete_mq.queue('createrepodata_complete') Thread.new do - loop do 
- sleep 80 - next if @@create_repo_path.empty? - - @@upload_flag = false - # Avoid mv in handle_new_rpm() is not over. - sleep 1 - @@create_repo_path.each do |path| - system("createrepo --update $(dirname #{path})") - end - - createrepodata_queue.subscribe({ manual_ack: true }) do |info, _pro, msg| + createrepodata_queue.subscribe({ manual_ack: true }) do |info, _pro, msg| + loop do begin - createrepodata_complete_queue.publish(msg) + next unless @@create_repodata @create_repodata_mq.ack(info) + createrepodata_complete_queue.publish(msg) + break rescue StandardError => e @log.warn({ - "create_repo error message": e.message + "create_repodata error message": e.message }.to_json) @create_repodata_mq.ack(info) + break end end + end + end + end + + + # @@create_repo_path: + # Example items in @@create_repo_path: + # "/srv/rpm/testing/**/Packages" + # "/srv/rpm/testing/$os_version/extras/$arch/Packages/a-z" + # if @@create_repo_path is not empty + # update the repodata for the repo_path in the @@create_repo_path + # + # @@upload_flag: + # The initial value of @@upload_flag is true. + # when the repo is being update repodata, Packages are not allowed be moved to the repo. + # So, we use this @@upload_flag as a lock to control the movement of the packages. + # + # @@create_repodata: + # The initial value of @@create_repodata is false. + # we have to ensure that the packages have moved to "/srv/rpm/testing" from "/srv/rpm/upload", + # and the "/srv/rpm/testing/xxx" repodata already updated, then, the data in the createrepodata_queue + # can be published to createrepodata_complete_queue, and the install-rpm task can be submitted. + # Therefore, we will set the value of @@create_repodata to control the sync execution of the entire process. + def create_repo + Thread.new do + loop do + begin + sleep 180 + next if @@create_repo_path.empty? + + @@upload_flag = false + # Avoid mv in handle_new_rpm() is not over. + sleep 1 + + threads = {} + @@create_repo_path.each do |path| + thr = Thread.new do + if File.basename(path) != "Packages" + path = File.dirname(path) + end + system("createrepo --update $(dirname #{path})") + end + threads[path] = thr + end + + threads.each do |_, thr| + thr.join + end - sleep 1 + @@create_repodata = true + sleep 5 + + rescue StandardError => e + @log.warn({ + "create_repodata error message": e.message + }.to_json) + end @@create_repo_path.clear @@upload_flag = true end end end - def parse_arg(rpm_path, job_id) + def extras_parse_arg(rpm) + extras_submit_argv = ["#{ENV['LKP_SRC']}/sbin/submit", "--no-pack"] + extras_rpm_path = create_extras_path(rpm) + extras_rpm_path = File.dirname(File.dirname(extras_rpm_path)) + extras_rpm_path.sub!('/srv', '') + mount_repo_name = "extras" + extras_rpm_path = "https://api.compass-ci.openeuler.org:20018#{extras_rpm_path}" + + extras_submit_argv.push("mount_repo_addr=#{extras_rpm_path}") + extras_submit_argv.push("mount_repo_name=#{mount_repo_name}") + extras_submit_argv + end + + def rpmbuild_arg(job_id) + rpmbuild_argv = [] query_result = @es.query_by_id(job_id) - submit_argv = ["#{ENV['LKP_SRC']}/sbin/submit"] - rpm_path = File.dirname(rpm_path).sub('upload', 'testing') + + upstream_repo = query_result['upstream_repo'] + upstream_commit = query_result['upstream_commit'] + if ! upstream_repo.nil? && upstream_repo.start_with?('https://gitee.com/src-oepkgs/') && ! upstream_commit.nil? 
+ rpmbuild_argv.push("upstream_repo=#{upstream_repo}") + rpmbuild_argv.push("upstream_commit=#{upstream_commit}") + rpmbuild_argv.push("-i /c/lkp-tests/jobs/secrets_info.yaml") + end + rpmbuild_argv.push("os=#{query_result['os']}") + rpmbuild_argv.push("os_version=#{query_result['os_version']}") + rpmbuild_argv.push("testbox=#{query_result['tbox_group']}") + rpmbuild_argv.push("queue=#{query_result['queue']}") + rpmbuild_argv.push("rpmbuild_job_id=#{job_id}") + rpmbuild_argv.push("docker_image=#{query_result['docker_image']}") if query_result.key?('docker_image') + rpmbuild_argv + end + + + # input: + # rpm: /srv/rpm/upload/**/Packages/*.rpm + # job_id: the job id of rpmbuild task + # return: + # extras_submit_argv: ["#{ENV['LKP_SRC']}/sbin/submit", "--no-pack", "install-rpm.yaml", "arch=aarch64", "xxx"] + # submit_argv: ["#{ENV['LKP_SRC']}/sbin/submit", "--no-pack", "install-rpm.yaml", "arch=aarch64", "xxx"] + def parse_arg(rpm, job_id) + extras_submit_argv = extras_parse_arg(rpm) + rpmbuild_argv = rpmbuild_arg(job_id) + extras_submit_argv = extras_submit_argv + rpmbuild_argv + + submit_argv = ["#{ENV['LKP_SRC']}/sbin/submit", "--no-pack"] + rpm_path = File.dirname(rpm).sub('upload', 'testing') rpm_path.sub!('/srv', '') - mount_repo_name = rpm_path.split('/')[4..-3].join('/') rpm_path = "https://api.compass-ci.openeuler.org:20018#{rpm_path}" rpm_path = rpm_path.delete_suffix('/Packages') submit_arch = rpm_path.split('/')[-1] - submit_argv.push("mount_repo_addr=#{rpm_path}") - submit_argv.push("arch=#{submit_arch}") - submit_argv.push("mount_repo_name=#{mount_repo_name}") - submit_argv.push("os=#{query_result['os']}") - submit_argv.push("os_version=#{query_result['os_version']}") - submit_argv.push("tbox_group=#{query_result['tbox_group']}") - submit_argv.push("rpmbuild_job_id=#{job_id}") - submit_argv.push("docker_image=#{query_result['docker_image']}") if query_result.key?('docker_image') fixed_arg = YAML.load_file('/etc/submit_arg.yaml') - submit_argv.push((fixed_arg['yaml']).to_s) + unless check_if_extras(rpm) + mount_repo_name = rpm_path.split('/')[4..-3].join('/') + submit_argv.push("arch=#{submit_arch}") + submit_argv.push("mount_repo_name=#{mount_repo_name}") + submit_argv.push("mount_repo_addr=#{rpm_path}") + submit_argv = submit_argv + rpmbuild_argv + submit_argv.push((fixed_arg['yaml']).to_s) + end + extras_submit_argv.push("arch=#{submit_arch}") + + extras_submit_argv.push((fixed_arg['yaml']).to_s) + + return submit_argv, extras_submit_argv, submit_arch + end + + + # input: + # rpm_info: { "upload_rpms" => ["/srv/rpm/upload/**/Packages/*.rpm", "/srv/rpm/upload/**/source/*.rpm"], "job_id": "xxxx"} + # return: + # real_argvs: ["#{ENV['LKP_SRC']}/sbin/submit", "--no-pack", "install-rpm.yaml", "arch=aarch64", "xxx"] + # extras_real_argvs: ["#{ENV['LKP_SRC']}/sbin/submit", "--no-pack", "install-rpm.yaml", "arch=aarch64", "xxx"] + def parse_real_argvs(rpm_info) + group_id = Time.new.strftime('%Y-%m-%d') + '-auto-install-rpm' + job_id = rpm_info['job_id'] + rpm_names = [] + real_argvs = [] + extras_real_argvs = [] + rpm_info['upload_rpms'].each do |rpm| + submit_argv, extras_submit_argv, submit_arch = parse_arg(rpm, job_id) + next if submit_arch == 'source' + + # zziplib-0.13.62-12.aarch64.rpm => zziplib-0.13.62-12.aarch64 + # zziplib-help.rpm + # zziplib-doc.rpm + rpm_name = File.basename(rpm).delete_suffix('.rpm') + rpm_names << rpm_name + real_argvs = Array.new(submit_argv) + extras_real_argvs = Array.new(extras_submit_argv) + end + rpm_names = rpm_names.join(',') + 
real_argvs.push("rpm_name=#{rpm_names} group_id=#{group_id}") + extras_real_argvs.push("rpm_name=#{rpm_names} group_id=#{group_id}") - return submit_argv, submit_arch + return real_argvs, extras_real_argvs, rpm_names end + + # Example items in @createrepodata_complete_mq.queue "createrepodata_complete": + # { "upload_rpms" => ["/srv/rpm/upload/**/Packages/*.rpm", "/srv/rpm/upload/**/source/*.rpm"], "job_id": "xxxx"} + # submit install-rpm task to the upload_rpms def submit_install_rpm createrepodata_complete_queue = @createrepodata_complete_mq.queue('createrepodata_complete') Thread.new do - loop do - q = createrepodata_complete_queue.subscribe({ manual_ack: true }) do |info, _pro, msg| - begin - group_id = Time.new.strftime('%Y-%m-%d') + '-auto-install-rpm' - rpm_info = JSON.parse(msg) - job_id = rpm_info['job_id'] - - rpm_names = [] - real_argvs = [] - rpm_info['upload_rpms'].each do |rpm| - submit_argv, submit_arch = parse_arg(rpm, job_id) - next if submit_arch == 'source' - - # zziplib-0.13.62-12.aarch64.rpm => zziplib-0.13.62-12.aarch64 - # zziplib-help.rpm - # zziplib-doc.rpm - rpm_name = File.basename(rpm).delete_suffix('.rpm') - rpm_names << rpm_name - real_argvs = Array.new(submit_argv) + q = createrepodata_complete_queue.subscribe({ manual_ack: true }) do |info, _pro, msg| + begin + rpm_info = JSON.parse(msg) + + real_argvs, extras_real_argvs, rpm_names = parse_real_argvs(rpm_info) + + unless real_argvs.length == 3 + Process.fork do + system(real_argvs.join(' ')) end - rpm_names = rpm_names.join(',') - real_argvs.push("rpm_name=#{rpm_names}") - real_argvs.push("group_id=#{group_id}") - system(real_argvs.join(' ')) - @createrepodata_complete_mq.ack(info) - rescue StandardError => e - @log.warn({ - "submit_install_rpm error message": e.message - }.to_json) - @createrepodata_complete_mq.ack(info) end + Process.fork do + system(extras_real_argvs.join(' ')) + end + @createrepodata_complete_mq.ack(info) + rescue StandardError => e + @log.warn({ + "submit_install_rpm error message": e.backtrace + }.to_json) + @createrepodata_complete_mq.ack(info) end - q.cancel end end end @@ -267,6 +455,7 @@ config_yaml('auto-submit') hr = HandleRepo.new hr.create_repo +hr.ack_create_repo_done hr.handle_new_rpm hr.submit_install_rpm diff --git a/sbin/src-oepkgs-management.rb b/sbin/src-oepkgs-management.rb index 82c69d4c773af8a6056cb0fc54f223fc25bd3bd4..26d520d49a31214fbd3c4d91719b4a2543decb40 100755 --- a/sbin/src-oepkgs-management.rb +++ b/sbin/src-oepkgs-management.rb @@ -38,32 +38,63 @@ MQ_PORT = ENV['MQ_PORT'] || 5672 # @mq.ack(info) class SrcOepkgs def initialize - @mq = MQClient.new(hostname: MQ_HOST, port: MQ_PORT) + @pr_mq = MQClient.new(hostname: MQ_HOST, port: MQ_PORT) + @push_mq = MQClient.new(hostname: MQ_HOST, port: MQ_PORT) @log = JSONLogger.new end + def handle_src_oepkgs_pr_hook + pr_queue = @pr_mq.queue('src_oepkgs_pr_hook') + Thread.new do + pr_queue.subscribe({ block: true, manual_ack: true }) do |info, _pro, msg| + loop do + begin + src_oepkgs_pr_hook_info = JSON.parse(msg) + check_pr_hook_info(src_oepkgs_pr_hook_info) + submit_pr_rpmbuild_job(src_oepkgs_pr_hook_info) + + @pr_mq.ack(info) + break + rescue StandardError => e + @log.warn({ + "handle_src_oepkgs_pr_hook error message": e.message + }.to_json) + @pr_mq.ack(info) + break + end + end + end + end + end + def handle_src_oepkgs_web_hook - queue = @mq.queue('src_oepkgs_web_hook') - queue.subscribe({ block: true, manual_ack: true }) do |info, _pro, msg| + push_queue = @push_mq.queue('src_oepkgs_web_hook') + 
push_queue.subscribe({ block: true, manual_ack: true }) do |info, _pro, msg| loop do begin src_oepkgs_webhook_info = JSON.parse(msg) check_webhook_info(src_oepkgs_webhook_info) submit_rpmbuild_job(src_oepkgs_webhook_info) - @mq.ack(info) + @push_mq.ack(info) break rescue StandardError => e @log.warn({ "handle_src_oepkgs_web_hook error message": e.message }.to_json) - @mq.ack(info) + @push_mq.ack(info) break end end end end + def check_pr_hook_info(data) + raise JSON.dump({ 'errcode' => '200', 'errmsg' => 'no commit_id params' }) unless data.key?('new_refs') + raise JSON.dump({ 'errcode' => '200', 'errmsg' => 'no upstream repo url params' }) unless data.key?('url') + raise JSON.dump({ 'errcode' => '200', 'errmsg' => 'no upstream branch params' }) unless data.key?('submit_command') + end + def check_webhook_info(data) raise JSON.dump({ 'errcode' => '200', 'errmsg' => 'no commit_id params' }) unless data.key?('commit_id') raise JSON.dump({ 'errcode' => '200', 'errmsg' => 'no upstream repo url params' }) unless data.key?('url') @@ -93,23 +124,54 @@ class SrcOepkgs return sig_name end - def parse_arg(src_oepkgs_webhook_info) + def parse_push_arg(src_oepkgs_webhook_info) upstream_commit = src_oepkgs_webhook_info['commit_id'] upstream_repo = src_oepkgs_webhook_info['url'] upstream_branch = src_oepkgs_webhook_info['branch'] - submit_argv = ["#{ENV['LKP_SRC']}/sbin/submit"] + submit_argv = ["#{ENV['LKP_SRC']}/sbin/submit", "-i", "/c/lkp-tests/jobs/secrets_info.yaml"] - fixed_arg = YAML.load_file('/etc/submit_arg.yaml') - - submit_argv.push((fixed_arg['yaml']).to_s) submit_argv.push("upstream_repo=#{upstream_repo}") submit_argv.push("upstream_commit=#{upstream_commit}") return submit_argv, upstream_repo, upstream_branch end + def parse_pr_arg(src_oepkgs_pr_hook_info) + pr_num = src_oepkgs_pr_hook_info['submit_command']['pr_merge_reference_name'].split('/')[2] + upstream_repo = src_oepkgs_pr_hook_info['url'] + upstream_branch = src_oepkgs_pr_hook_info['branch'] + upstream_commit = src_oepkgs_pr_hook_info['new_refs']['heads']['master'] + submit_argv = ["#{ENV['LKP_SRC']}/sbin/submit", "-i", "/c/lkp-tests/jobs/secrets_info.yaml"] + + src_oepkgs_pr_hook_info['submit_command'].each do |k, v| + submit_argv.push("#{k}=#{v}") + end + submit_argv.push("pr_num=#{pr_num}") + submit_argv.push("upstream_repo=#{upstream_repo}") + submit_argv.push("upstream_commit=#{upstream_commit}") + + return submit_argv, upstream_branch + end + + def submit_pr_rpmbuild_job(src_oepkgs_pr_hook_info) + submit_argv, upstream_branch = parse_pr_arg(src_oepkgs_pr_hook_info) + real_argvs = Array.new(submit_argv) + + os_version = parse_os_version(upstream_branch) + raise JSON.dump({ 'errcode' => '200', 'errmsg' => 'no os_version' }) unless os_version + + if os_version == "22.03-LTS" + real_argvs.push("rpmbuild-vm.yaml os=openeuler os_version=#{os_version} testbox=vm-2p8g") + else + real_argvs.push("rpmbuild.yaml docker_image=openeuler:#{os_version} testbox=dc-16g") + end + Process.fork do + system(real_argvs.join(' ')) + end + end + def submit_rpmbuild_job(src_oepkgs_webhook_info) - submit_argv, upstream_repo, upstream_branch = parse_arg(src_oepkgs_webhook_info) + submit_argv, upstream_repo, upstream_branch = parse_push_arg(src_oepkgs_webhook_info) real_argvs = Array.new(submit_argv) @@ -121,8 +183,15 @@ class SrcOepkgs raise JSON.dump({ 'errcode' => '200', 'errmsg' => 'no os_version' }) unless os_version real_argvs.push("custom_repo_name=contrib/#{sig_name}") - real_argvs.push("docker_image=openeuler:#{os_version} testbox=dc-16g") - 
system(real_argvs.join(' ')) + if os_version == "22.03-LTS" + real_argvs.push("rpmbuild-vm.yaml os=openeuler os_version=#{os_version} testbox=vm-2p8g") + else + real_argvs.push("rpmbuild.yaml docker_image=openeuler:#{os_version} testbox=dc-16g") + end + + Process.fork do + system(real_argvs.join(' ')) + end end end @@ -130,4 +199,5 @@ do_local_pack so = SrcOepkgs.new config_yaml('auto-submit') +so.handle_src_oepkgs_pr_hook so.handle_src_oepkgs_web_hook diff --git a/src/lib/job.cr b/src/lib/job.cr index 07c4c161a0bffea1eaf2ecdcbdc5faadb1c79b19..c40671d5b0f323dff84b400da1d09746bf71d83c 100644 --- a/src/lib/job.cr +++ b/src/lib/job.cr @@ -188,7 +188,7 @@ class Job checkout_max_run() end - private def set_defaults + def set_defaults extract_user_pkg() set_os_mount() append_init_field() @@ -963,4 +963,25 @@ class Job end @hash["pkg_data"] = JSON::Any.new(new_pkg_data) end + + def delete_kernel_params + @hash.delete("kernel_version") + @hash.delete("kernel_uri") + @hash.delete("modules_uri") + end + + def delete_host_info + @hash.delete("memory") + @hash.delete("nr_hdd_partitions") + @hash.delete("hdd_partitions") + @hash.delete("ssd_partitions") + @hash.delete("rootfs_disk") + @hash.delete("mac_addr") + @hash.delete("arch") + @hash.delete("nr_node") + @hash.delete("nr_cpu") + @hash.delete("model_name") + @hash.delete("ipmi_ip") + @hash.delete("serial_number") + end end diff --git a/src/libpy/es_client.py b/src/libpy/es_client.py new file mode 100644 index 0000000000000000000000000000000000000000..72cb9a402d6cc8ec101ce8717803362230bb27d5 --- /dev/null +++ b/src/libpy/es_client.py @@ -0,0 +1,18 @@ +import os + +from elasticsearch import Elasticsearch + + +class EsClient: + def __init__(self): + hosts = ["http://{0}:{1}".format(os.getenv("ES_HOST", '172.17.0.1'), os.getenv("ES_PORT", 9200))] + self.es_handler = Elasticsearch(hosts=hosts, + http_auth=(os.getenv("ES_USER"), os.getenv("ES_PASSWORD")), timeout=3600) + + def search_by_id(self, index: str, doc_id: str) -> dict: + """ + :param index: + :param doc_id: + :return: {} + """ + return self.es_handler.get(index=index, id=doc_id, ignore=404) diff --git a/src/scheduled_task/app/lib/scheduled_task.py b/src/scheduled_task/app/lib/scheduled_task.py new file mode 100755 index 0000000000000000000000000000000000000000..a2fa4c8e5ba178c570644566fcff9d5b3df6b73e --- /dev/null +++ b/src/scheduled_task/app/lib/scheduled_task.py @@ -0,0 +1,53 @@ +import subprocess +import os +import re +import json +from libpy.es_client import EsClient + +class SubmitJob(): + def __init__(self, request_body): + self.request_body = request_body + + def submit_job(self): + job_ids = [] + job_id_pattern = re.compile(r"got job id=([^ ]*)") + + pre_submit_command = "/c/lkp-tests/sbin/submit" + pre_submit_command += " os=" + self.request_body["os"].strip() + pre_submit_command += " os_arch=" + self.request_body["os_arch"].strip() + pre_submit_command += " os_version=" + self.request_body["os_version"].strip() + pre_submit_command += " " + self.request_body["framework"].strip() + ".yaml" + + for case in self.request_body["case_list"]: + submit_command = pre_submit_command + submit_command += " name=" + case["name"] + + if case["machine_type"] == "vm": + testbox = "vm-" + case["cpu"] + case["memory"] + elif case["machine_type"] == "vt": + testbox = "vt-" + case["cpu"] + case["memory"] + else: + testbox = "dc-" + case["memory"] + + submit_command += " testbox=" + testbox + + submit_output = subprocess.getoutput(submit_command) + job_id = job_id_pattern.findall(submit_output)[0] + 
job_ids.append(job_id) + + return {'job_ids': job_ids} + +class CheckJob(): + def __init__(self): + self.es = EsClient() + + def check_job_info(self, job_id): + job_info = self.es.search_by_id("jobs", job_id)['_source'] + extract_keys = ['os', 'os_arch', 'os_variant', 'os_version', 'os_project', 'testbox', 'job_stage', 'job_health', 'submit_time', 'start_time', 'end_time'] + extract_dict = {} + + for key in extract_keys: + if key in job_info: + extract_dict[key] = job_info[key] + + return extract_dict diff --git a/src/scheduled_task/app/route.py b/src/scheduled_task/app/route.py new file mode 100755 index 0000000000000000000000000000000000000000..415acba63d7e5eaf1f467b67b28a8cd1541b1d8f --- /dev/null +++ b/src/scheduled_task/app/route.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 + +from flask import request +from jsonschema.validators import validate +from flask_restful import Resource +from scheduled_task.app.lib.scheduled_task import SubmitJob, CheckJob +import time +import json + +class RadiaTest(Resource): + @staticmethod + def post(): + request_body = request.json + + my_schema = { + "type": "object", + "properties": { + "os": { + "type": "string" + }, + "os_arch": { + "type": "string" + }, + "os_version": { + "type": "string" + }, + "framework": { + "type": "string" + }, + "case_list": { + "type": "array", + "items": { + "type": "object", + "required": [ + "name", + "cpu", + "memory", + "machine_type" + ] + }, + "properties": { + "name": { + "type": "string" + }, + "cpu": { + "type": "string" + }, + "memory": { + "type": "string" + }, + "machine_type": { + "type": "string" + } + } + } + }, + "required": [ + "os" + ] + } + + validate(instance=request_body, schema=my_schema) + submit_job = SubmitJob(request_body) + submit_result = submit_job.submit_job() + + return submit_result + +class CheckJobResult(Resource): + @staticmethod + def get(job_id): + check_job = CheckJob() + check_result = check_job.check_job_info(job_id) + + return check_result diff --git a/src/scheduled_task/conf/gunicorn_conf.py b/src/scheduled_task/conf/gunicorn_conf.py new file mode 100644 index 0000000000000000000000000000000000000000..160083ba5c8cdff47f7faac96e2fb6a999d465eb --- /dev/null +++ b/src/scheduled_task/conf/gunicorn_conf.py @@ -0,0 +1,33 @@ +# Copyright (c) [2022] Huawei Technologies Co.,Ltd.ALL rights reserved. +# This program is licensed under Mulan PSL v2. +# You can use it according to the terms and conditions of the Mulan PSL v2. +# http://license.coscl.org.cn/MulanPSL2 +# THIS PROGRAM IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. 
+#################################### +# @Author : +# @email : +# @Date : +# @License : Mulan PSL v2 +##################################### + +import os + +bind = "0.0.0.0:" + os.environ["SERVICE_PORT"].strip() +timeout = 30 +daemon = 'false' +worker_class = 'gevent' + +# workers = multiprocessing.cpu_count() * 2 + 1 +workers = 8 + +threads = 2 + +pidfile = '/var/log/scheduled_task.pid' +loglevel = 'info' +access_log_format = '%(t)s %(p)s %(h)s "%(r)s" %(s)s %(L)s %(b)s %(f)s" "%(a)s"' + +# accesslog = "/srv/log/scheduled_task_access.log" +# errorlog = "/srv/log/scheduled_task_error.log" diff --git a/src/scheduled_task/run.py b/src/scheduled_task/run.py new file mode 100644 index 0000000000000000000000000000000000000000..081f5d50c86b71cc49d2222b8b9b2bfbea557b13 --- /dev/null +++ b/src/scheduled_task/run.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python +# coding=utf-8 + +import os +import sys +from flask import Flask +from flask_restful import Api +from flask import request +from flasgger import Swagger + +sys.path.append(os.path.abspath("../")) + +from scheduled_task.app.route import RadiaTest, CheckJobResult + +app = Flask(__name__) + +api = Api(app) +api.add_resource(RadiaTest, '/api/submit/radia-test') +api.add_resource(CheckJobResult, '/api/data-api/get-job-info/') + +Swagger(app, Swagger.DEFAULT_CONFIG) diff --git a/src/scheduler/plugins/cluster.cr b/src/scheduler/plugins/cluster.cr index 0c55e1bbc0c6aabb6cac08dc1738d89b2c9f3df5..55ebd9db0012d39c07cf029b289b96c09dce4cde 100644 --- a/src/scheduler/plugins/cluster.cr +++ b/src/scheduler/plugins/cluster.cr @@ -52,13 +52,16 @@ class Cluster < PluginsCommon # continue if role in cluster spec matches role in job next if (spec["roles"].as_a.map(&.to_s) & roles).empty? + job_id = @redis.get_job_id(lab) + single_job = Job.new(JSON.parse(job.dump_to_json), job_id) + single_job.delete_host_info + single_job.delete_kernel_params + host_info = Utils.get_host_info(host.to_s) - job.update(host_info) + single_job.update(host_info) queue = host.to_s queue = queue = $1 if queue =~ /(\S+)--[0-9]+$/ - job_id = @redis.get_job_id(lab) - # return when job_id is '0' # 2 Questions: # - how to deal with the jobs added to DB prior to this loop @@ -66,10 +69,11 @@ class Cluster < PluginsCommon job_ids << job_id # add to job content when multi-test - job["testbox"] = queue - job["queue"] = queue - job.update_tbox_group(queue) - job["node_roles"] = spec["roles"].as_a.join(" ") + single_job["testbox"] = queue + single_job["queue"] = queue + single_job.update_tbox_group(queue) + single_job["os_arch"] = host_info["arch"] + single_job["node_roles"] = spec["roles"].as_a.join(" ") if spec["macs"]? direct_macs = spec["macs"].as_a direct_ips = [] of String @@ -78,16 +82,17 @@ class Cluster < PluginsCommon direct_ips << "#{net_id}.#{ip0}" ip0 += 1 end - job["direct_macs"] = direct_macs.join(" ") - job["direct_ips"] = direct_ips.join(" ") + single_job["direct_macs"] = direct_macs.join(" ") + single_job["direct_ips"] = direct_ips.join(" ") end # multi-machine test requires two network cards - job["nr_nic"] = "2" + single_job["nr_nic"] = "2" - job.update_id(job_id) + single_job.update_id(job_id) + single_job.set_defaults - jobs << Job.new(JSON.parse(job.dump_to_json), job_id) + jobs << single_job end cluster_id = job_ids[0]
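The webhook change above (container/webhook/root/webhook.js plus the new src_oepkgs_pr_hook.rb) only forwards merge-request events whose action is "open", or "update" with action_desc "source_branch_changed", and routes src-oepkgs repositories to the new RabbitMQ queue. Below is a minimal sketch of the message shape that the downstream consumer (check_pr_hook_info / parse_pr_arg in sbin/src-oepkgs-management.rb) relies on; all field values here are hypothetical.

```ruby
require 'json'

# Hypothetical payload, shaped like the msg object webhook.js builds for a
# src-oepkgs merge request and hands to src_oepkgs_pr_hook.rb for publishing.
msg = {
  'new_refs' => { 'heads' => { 'master' => '3f2c0d9' } },        # pull_request.head.sha
  'branch' => 'openEuler-22.03-LTS',                             # target_branch
  'url' => 'https://gitee.com/src-oepkgs/zziplib',               # base repo url
  'submit_command' => { 'pr_merge_reference_name' => 'refs/pull/66/MERGE' }
}

# check_pr_hook_info() requires 'new_refs', 'url' and 'submit_command';
# parse_pr_arg() takes the PR number from the merge reference name:
pr_num = msg['submit_command']['pr_merge_reference_name'].split('/')[2]   # => "66"
puts JSON.dump(msg)
```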
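In lib/parse_install_rpm.rb the result keys now carry a repo component taken from the seventh element of mount_repo_addr.split('/'), and sbin/build-compat-list.rb moves to a dot-joined ES _id while purging documents whose _id still uses the old "--" separator. A small sketch of the index arithmetic with a made-up URL; note that Ruby has no chained comparison, so a guard written as `4 <= n <= 5` raises NoMethodError at runtime, and an equivalent range check is shown here as an assumption about the intent.

```ruby
# Hypothetical mount_repo_addr, in the form parse_arg() builds for a testing repo.
mount_repo_addr = 'https://api.compass-ci.openeuler.org:20018/rpm/testing/openeuler-22.03-LTS/contrib/sig-example/aarch64'
# split('/') => ["https:", "", "api...:20018", "rpm", "testing", "openeuler-22.03-LTS", "contrib", ...]
repo = mount_repo_addr.split('/')[6]          # => "contrib" (first component of the custom repo name)

rpm_name = 'zziplib-0.13.62-12.aarch64'
os       = 'openeuler 22.03'
arch     = 'aarch64'
stats_key = "#{rpm_name},#{repo}"             # per-job key in get_install_rpm_result_by_group_id
list_key  = "#{rpm_name}.#{os}.#{arch}.#{repo}"   # key produced by parse_rpm_name

# Ruby evaluates `4 <= n <= 5` as `(4 <= n) <= 5` and raises NoMethodError on the boolean;
# an equivalent check for the legacy '--'-separated _id format would be:
n = 'zziplib--0.13.62-12--aarch64--openeuler 22.03'.split('--').length   # => 4
legacy_id = (4..5).cover?(n)                                             # => true
```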
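lib/mq_client.rb now applies a per-channel prefetch when opts[:prefetch_count] is given, and sbin/rpm-repo.rb opens all three of its queues with PREFETCH_COUNT = 1, so each consumer holds at most one unacknowledged message at a time. A minimal consumer sketch of that pattern, assuming the MQClient wrapper shown above; the host, port and relative require path are assumptions.

```ruby
require 'json'
require_relative '../lib/mq_client'   # path assumed; the Bunny wrapper patched above

# prefetch_count: 1 => the broker delivers one message per consumer until it is acked,
# so a long mv/createrepo run cannot pile up deliveries on this channel.
mq = MQClient.new(hostname: '172.17.0.1', port: 5672, prefetch_count: 1)

mq.queue('update_repo').subscribe({ block: true, manual_ack: true }) do |info, _props, msg|
  rpm_info = JSON.parse(msg)   # {"upload_rpms" => [...], "job_id" => "..."}
  # ... move the rpms, remember the repodata paths ...
  mq.ack(info)                 # only now does the broker hand over the next message
end
```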
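handle_upload_rpms, check_if_extras and create_extras_path in sbin/rpm-repo.rb key everything off fixed path-segment positions under /srv/rpm. A worked example, with a hypothetical upload path, of where one rpm ends up: the move into the testing tree, the per-letter extras hard link, and the later pub link made by update_pub_dir (tmp_pub when the repo is a refreshing one).

```ruby
# Hypothetical upload path: /srv/rpm/upload/<os_version>/<custom_repo_name>/<arch>/Packages/<rpm>
rpm = '/srv/rpm/upload/openeuler-22.03-LTS/contrib/sig-example/aarch64/Packages/zziplib-0.13.62-12.aarch64.rpm'
# rpm.split('/')[5] => "contrib", so check_if_extras(rpm) is false
# ("refreshing"/"extras" in that position is what marks an extras upload).

# 1. regular copy moved into the testing tree:
testing_dir = File.dirname(rpm).sub('upload', 'testing')
# => "/srv/rpm/testing/openeuler-22.03-LTS/contrib/sig-example/aarch64/Packages"
dest = File.join(testing_dir, File.basename(rpm))

# 2. extras hard link target from create_extras_path(): first five path segments + "/extras/",
#    then "<arch>/Packages/<first letter of the rpm>":
prefix = testing_dir.split('/')[0..4].join('/') + '/extras/'
suffix = testing_dir.split('/')[-2..-1].join('/') + '/' + File.basename(rpm)[0].downcase
extras_dir = prefix + suffix
# => "/srv/rpm/testing/openeuler-22.03-LTS/extras/aarch64/Packages/z"
# handle_upload_rpms() then does File.link(dest, File.join(extras_dir, File.basename(rpm)))

# 3. update_pub_dir() later hard-links the testing copy under the same path with
#    'testing' replaced by 'pub' (or 'tmp_pub' when segment [5] is "refreshing").
```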
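create_repo() in sbin/rpm-repo.rb now runs one createrepo thread per recorded path and joins them all before clearing the set and re-enabling uploads via @@upload_flag. Recorded paths end either in .../Packages or in a per-letter extras directory, and both are normalized before the repo root above Packages is indexed; a short sketch of that normalization, reusing the hypothetical paths from the previous example:

```ruby
# The recorded paths end either in ".../Packages" or in a per-letter dir like ".../Packages/z".
['/srv/rpm/testing/openeuler-22.03-LTS/contrib/sig-example/aarch64/Packages',
 '/srv/rpm/testing/openeuler-22.03-LTS/extras/aarch64/Packages/z'].each do |path|
  path = File.dirname(path) unless File.basename(path) == 'Packages'
  repo_root = File.dirname(path)            # same result as the $(dirname ...) in the patch
  puts "createrepo --update #{repo_root}"
  # => createrepo --update /srv/rpm/testing/openeuler-22.03-LTS/contrib/sig-example/aarch64
  # => createrepo --update /srv/rpm/testing/openeuler-22.03-LTS/extras/aarch64
end
```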
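parse_arg, rpmbuild_arg and parse_real_argvs assemble the install-rpm submit command, with a parallel extras variant that mounts the extras repo under mount_repo_name=extras. When every uploaded rpm is an extras package, the regular argv never grows past the submit binary, --no-pack and the final rpm_name push, which appears to be what the `real_argvs.length == 3` guard in submit_install_rpm checks before forking the regular job. A sketch of the joined command with hypothetical values throughout (the trailing yaml name comes from /etc/submit_arg.yaml and is assumed here):

```ruby
# Hypothetical values; mirrors the argv built by parse_arg() + rpmbuild_arg() + parse_real_argvs().
real_argvs = [
  "#{ENV['LKP_SRC']}/sbin/submit", '--no-pack',
  'arch=aarch64',
  'mount_repo_name=contrib/sig-example',
  'mount_repo_addr=https://api.compass-ci.openeuler.org:20018/rpm/testing/openeuler-22.03-LTS/contrib/sig-example/aarch64',
  'os=openeuler', 'os_version=22.03-LTS', 'testbox=dc-16g', 'queue=dc-16g', 'rpmbuild_job_id=123456',
  'install-rpm.yaml',
  'rpm_name=zziplib-0.13.62-12.aarch64,zziplib-help-0.13.62-12.noarch group_id=2024-06-01-auto-install-rpm'
]
Process.fork { system(real_argvs.join(' ')) } unless real_argvs.length == 3
```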
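submit_pr_rpmbuild_job and the reworked submit_rpmbuild_job in sbin/src-oepkgs-management.rb pick a VM rpmbuild job for openEuler 22.03-LTS and a docker-based one otherwise, and fork the submit so a slow call does not hold up the subscriber loop. A sketch with hypothetical PR values; the os_version is assumed to be what parse_os_version returns for the target branch.

```ruby
os_version = '22.03-LTS'   # assumed result of parse_os_version(upstream_branch)
argv = ["#{ENV['LKP_SRC']}/sbin/submit", '-i', '/c/lkp-tests/jobs/secrets_info.yaml',
        'pr_merge_reference_name=refs/pull/66/MERGE', 'pr_num=66',
        'upstream_repo=https://gitee.com/src-oepkgs/zziplib', 'upstream_commit=3f2c0d9']

argv.push(if os_version == '22.03-LTS'
            "rpmbuild-vm.yaml os=openeuler os_version=#{os_version} testbox=vm-2p8g"
          else
            "rpmbuild.yaml docker_image=openeuler:#{os_version} testbox=dc-16g"
          end)

Process.fork { system(argv.join(' ')) }   # forked so the MQ subscriber keeps draining its queue
```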
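The new scheduled_task service validates POST bodies against the jsonschema block in route.py and shells out to lkp-tests submit per case, naming the testbox from machine_type + cpu + memory. Two things worth noting about the code as written: the per-case "properties" map sits beside "items" rather than inside it, so jsonschema only enforces the required keys, not the field types, and CheckJobResult.get expects a job_id, which implies a <job_id> segment in the registered URL rule. A client sketch in Ruby, matching the rest of the repo; the host and port come from the start script and every payload value is hypothetical.

```ruby
require 'json'
require 'net/http'

# machine_type "vm" + cpu "2p" + memory "8g" => testbox "vm-2p8g" inside SubmitJob.submit_job.
body = {
  'os' => 'openeuler',
  'os_arch' => 'aarch64',
  'os_version' => '22.03-LTS',
  'framework' => 'mugen',                     # assumed framework yaml name
  'case_list' => [
    { 'name' => 'os-basic', 'machine_type' => 'vm', 'cpu' => '2p', 'memory' => '8g' }
  ]
}

uri = URI('http://172.17.0.1:20040/api/submit/radia-test')
res = Net::HTTP.post(uri, body.to_json, 'Content-Type' => 'application/json')
puts res.body   # => {"job_ids": ["..."]} once each submit prints "got job id=..."
```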
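src/scheduler/plugins/cluster.cr now builds a separate Job per cluster node, strips host-specific and kernel fields with the new delete_host_info/delete_kernel_params helpers in src/lib/job.cr, applies that node's host_info, and re-runs the now-public set_defaults. A plain-Ruby, self-contained illustration of why the scrub matters; hashes stand in for the Job object, which is Crystal in the source.

```ruby
# Without the scrub, fields from the template job survive the merge whenever the
# new host_info does not report them, so one node inherits another node's hardware.
template = { 'suite' => 'iperf', 'arch' => 'aarch64', 'memory' => '128G', 'mac_addr' => 'ec:2a:72:00:00:01' }
new_host = { 'arch' => 'x86_64', 'nr_cpu' => 64 }            # this testbox reports no memory/mac_addr

leaky    = template.merge(new_host)
scrubbed = template.reject { |k, _| %w[arch memory mac_addr].include?(k) }.merge(new_host)

p leaky['mac_addr']      # => "ec:2a:72:00:00:01"  (previous machine's NIC leaks through)
p scrubbed['mac_addr']   # => nil                  (filled from the real host_info / defaults later)
```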