File build_result.py of Package project_build_result
#/bin/env python3
# -*- encoding=utf8 -*-
#******************************************************************************
# Copyright (c) Huawei Technologies Co., Ltd. 2020-2020. All rights reserved.
# licensed under the Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
# PURPOSE.
# See the Mulan PSL v2 for more details.
# ******************************************************************************
import os
import csv
import datetime
import logging
import argparse
import subprocess
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor
def run(cmd, timeout=600, print_out=False):
"""run shell cmd"""
ret = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8",
timeout=timeout)
logging.info("cmd: {}".format(cmd))
if ret.stdout and print_out:
logging.info("ret.stdout: {}".format(ret.stdout))
if ret.stderr:
logging.warning("ret.stderr: {}".format(ret.stderr))
return ret.returncode, ret.stdout, ret.stderr
def write_to_csv(file_path, build_result_data):
"""write build result data to csv"""
if not build_result_data:
logging.warning(f"{file_path} has nothing to write")
return
with open(file_path, mode='w', newline="\n") as csv_file:
fieldnames = list(build_result_data[0].keys())
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
writer.writeheader()
for build_result in build_result_data:
writer.writerow(build_result)
class ProjectBuildResult:
"""project build result"""
def __init__(self, project, **kwargs):
"""param init"""
self.kwargs = kwargs
self.project = project
self.result_xml = self.kwargs.get("result_xml", "")
self.affected_package = self.kwargs.get("affected_package", 0)
self.failed_log_ip_port = self.kwargs.get("failed_log_ip_port", "")
self.published_arch_list = [] # published repository list eg: [aarch64, x86_64]
self.build_result_list = []
def parse_build_result(self):
"""parse build result, update project status and packages status"""
try:
if self.result_xml and os.path.exists(self.result_xml):
xml_tree = ET.parse(self.result_xml)
root = xml_tree.getroot()
else:
result_time = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
project_name = self.project.replace(":", "_")
xml_name = f"{project_name}_{result_time}.xml"
cmd = f"osc results {self.project} --xml > {xml_name}"
run(cmd)
cmd = f"osc results {self.project} --xml"
_, out, _ = run(cmd)
root = ET.fromstring(out)
except Exception as e:
logging.error(f"{self.project} parse Exception: {e}")
return False
for repo_result in root:
arch = repo_result.attrib["arch"]
build_status = repo_result.attrib["code"] # "published"
# if published, append self.published_repo_list
if build_status == "published":
self.published_arch_list.append(arch)
# update package build result
for pkg_result in repo_result:
pkg_result_dict = {}
package = pkg_result.attrib["package"]
build_result = pkg_result.attrib["code"]
details = ""
if build_result == "failed":
details = "please analyse the log"
if build_result == "unresolvable":
for ele in pkg_result:
details = ele.text
pkg_result_dict["project"] = self.project
pkg_result_dict["package"] = package
pkg_result_dict["arch"] = arch
pkg_result_dict["build_result"] = build_result
pkg_result_dict["details"] = details
self.build_result_list.append(pkg_result_dict)
return True
def check_project_is_published(self):
"""check project repository is published or not"""
try:
if not self.parse_build_result():
return False
except Exception as e:
logging.error(f"parse {self.project} build result Exception: {e}")
return False
if self.published_repo_list:
return True
return False
def get_affected_package(self, package):
"""get package affected package"""
# cmd = f"pkgship {package}"
# _, code, err = run(cmd)
return ""
def get_failed_unresolvable_list(self):
"""get failed and unresolvable package list"""
failed_unresolvable_list = []
for pkg_build_result in self.build_result_list:
if (pkg_build_result["build_result"] == "failed") or (pkg_build_result["build_result"] == "unresolvable"):
if self.affected_package:
affected_package = self.get_affected_package(pkg_build_result["package"])
pkg_build_result["affected_package"] = affected_package
if self.failed_log_ip_port:
failed_log_link = "NA"
if pkg_build_result["build_result"] == "failed":
failed_log_link = f"http://{self.failed_log_ip_port}/obs_build_log/{self.project}" \
f"/{pkg_build_result['arch']}/{pkg_build_result['package']}/"
pkg_build_result["failed_log_link"] = failed_log_link
failed_unresolvable_list.append(pkg_build_result)
return failed_unresolvable_list
def check_use_for_build(self):
"""check project use for build flag everyday"""
# 1. parse build result
try:
self.parse_build_result()
except Exception as e:
logging.error(f"parse {self.project} build result Exception: {e}")
return False
with ThreadPoolExecutor(50) as executor:
# 1. check use for build flag for succeeded package: open
for repo, succeeded_pkg_list in self.succeeded_pkg_dict.items():
for pkg in succeeded_pkg_list:
executor.submit(lambda p: self.pkg_check_use_for_build(*p), (repo, pkg, "open"))
# 1. check use for build flag for failed package: close
for repo, failed_pkg_list in self.failed_pkg_dict.items():
for pkg in failed_pkg_list:
executor.submit(lambda p: self.pkg_check_use_for_build(*p), (repo, pkg, "close"))
if self.check_flag_not_correct_list:
logging.error(f"{self.project} use for build flag not correct list: {self.check_flag_not_correct_list}")
return False
else:
logging.info(f"{self.project} use for build flag check no problem!")
logging.info(f"{self.project} use for build flag check finish!")
return True
if __name__ == "__main__":
par = argparse.ArgumentParser()
par.add_argument("-p", "--project", default="", help="obs project", required=False)
par.add_argument("-pf", "--project_list_file", default="", help="obs project list file", required=False)
par.add_argument("-r", "--result_xml", default="", help="obs project build result xml", required=False)
par.add_argument("-w", "--write_to_csv", default=1, help="obs project build result xml", required=False)
par.add_argument("-c", "--csv_path", default="", help="obs project build result write to csv path", required=False)
par.add_argument("-a", "--affected_package", default=0,
help="check the faile or unresolvable package affected package ", required=False)
par.add_argument("-l", "--failed_log_ip_port", default="",
help="failed log link ip:port, eg: xx.xx.xx.xx:82", required=False)
par.add_argument("-lf", "--log_to_file", default=0, help="print log to file", required=False)
args = par.parse_args()
kw = {
"result_xml": args.result_xml,
"affected_package": args.affected_package,
"failed_log_ip_port": args.failed_log_ip_port,
}
LOG_FORMAT = "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s"
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
if args.log_to_file:
log_file = os.path.realpath(__file__).replace(".py", ".log")
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=DATE_FORMAT, filename=log_file,
filemode="a")
else:
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=DATE_FORMAT)
project_list = [args.project]
project_name = args.project.replace(":", "_")
if os.path.exists(args.project_list_file):
with open(args.project_list_file, "r") as f:
project_list = f.read().splitlines()
project_name = ""
exit_code = 0
# project_list = ["openEuler:Mainline"]
# kw["result_xml"] = "openeuler_mainline_result_20220706.xml"
# project_list = ["openEuler:Epol"]
# kw["result_xml"] = "openeuler_epol_result_20220706.xml"
all_failed_unresolvable_pkgs = []
for obs_project in project_list:
project_build = ProjectBuildResult(obs_project, **kw)
if not project_build.parse_build_result():
logging.error(f"project {obs_project} build result parse failed")
exit_code = 1
else:
failed_unresolvable_pkgs = project_build.get_failed_unresolvable_list()
if failed_unresolvable_pkgs:
logging.warning(f"{obs_project} failed unresolvable packages num: {len(failed_unresolvable_pkgs)}")
logging.warning(f"{obs_project} failed unresolvable packages list: {failed_unresolvable_pkgs}")
all_failed_unresolvable_pkgs.extend(failed_unresolvable_pkgs)
if all_failed_unresolvable_pkgs:
logging.warning(f"final failed unresolvable packages num: {len(all_failed_unresolvable_pkgs)}")
logging.warning(f"final failed unresolvable packages list: {all_failed_unresolvable_pkgs}")
if args.write_to_csv:
if args.csv_path:
csv_path = args.csv_path
else:
time = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
csv_path = f"{project_name}_{time}.csv"
write_to_csv(csv_path, all_failed_unresolvable_pkgs)
logging.info(f"write to csv file: {csv_path} finish.")
exit(exit_code)