Projects
home:zhouxiaxiang
project_build_result
build_result.py
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File build_result.py of Package project_build_result
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#******************************************************************************
# Copyright (c) Huawei Technologies Co., Ltd. 2020-2020. All rights reserved.
# licensed under the Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#     http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
# PURPOSE.
# See the Mulan PSL v2 for more details.
# ******************************************************************************
"""Collect failed/unresolvable packages of OBS projects from `osc results --xml` output."""
import os
import sys
import csv
import datetime
import logging
import argparse
import subprocess
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor

# Command-line spellings that switch a flag option off (compared case-insensitively).
_FALSE_FLAG_VALUES = frozenset(("", "0", "false", "no", "off", "n", "f"))


def run(cmd, timeout=600, print_out=False):
    """Run a command and return ``(returncode, stdout, stderr)``.

    :param cmd: a shell command string (executed through the shell, as before)
        or an argument list (executed directly, without a shell).
    :param timeout: seconds before ``subprocess.TimeoutExpired`` is raised.
    :param print_out: when true, log the captured stdout.
    """
    # Log first so the command is visible even if it times out.
    logging.info("cmd: {}".format(cmd))
    ret = subprocess.run(cmd,
                         shell=isinstance(cmd, str),
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE,
                         encoding="utf-8",
                         timeout=timeout)
    if ret.stdout and print_out:
        logging.info("ret.stdout: {}".format(ret.stdout))
    if ret.stderr:
        logging.warning("ret.stderr: {}".format(ret.stderr))
    return ret.returncode, ret.stdout, ret.stderr


def write_to_csv(file_path, build_result_data):
    """Write a list of build-result dicts to ``file_path`` as CSV.

    Nothing is written (only a warning is logged) when the list is empty.
    The header is the ordered union of the keys of all rows, so a row that
    carries an extra key no longer makes ``csv.DictWriter`` raise.
    """
    if not build_result_data:
        logging.warning(f"{file_path} has nothing to write")
        return
    fieldnames = list(dict.fromkeys(key for row in build_result_data for key in row))
    # newline="" is what the csv module documents for files handed to a writer.
    with open(file_path, mode="w", newline="", encoding="utf-8") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(build_result_data)


class ProjectBuildResult:
    """Build result of one OBS project, parsed from ``osc results --xml`` data."""

    def __init__(self, project, **kwargs):
        """Store the project name and the optional settings.

        Recognised keyword arguments: ``result_xml`` (path of an existing
        result XML to parse instead of calling ``osc``), ``affected_package``
        (truthy to add an ``affected_package`` column) and
        ``failed_log_ip_port`` (``ip:port`` used to build failed-log links).
        """
        self.kwargs = kwargs
        self.project = project
        self.result_xml = kwargs.get("result_xml", "")
        self.affected_package = kwargs.get("affected_package", 0)
        self.failed_log_ip_port = kwargs.get("failed_log_ip_port", "")
        self.published_arch_list = []  # arches whose repository is published, eg: [aarch64, x86_64]
        self.build_result_list = []  # one dict per (arch, package) status entry

    def _load_result_root(self):
        """Return the XML root, from ``self.result_xml`` if it exists, else from ``osc``."""
        if self.result_xml and os.path.exists(self.result_xml):
            return ET.parse(self.result_xml).getroot()

        result_time = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
        project_name = self.project.replace(":", "_")
        xml_name = f"{project_name}_{result_time}.xml"
        # Argument list instead of a shell string: the project name is never
        # interpreted by a shell. An empty project is omitted, which matches
        # what the former shell command line expanded to.
        cmd = ["osc", "results"]
        if self.project:
            cmd.append(self.project)
        cmd.append("--xml")
        _, out, _ = run(cmd)
        # Query once and save exactly what gets parsed, so the snapshot file
        # and the in-memory result cannot disagree.
        with open(xml_name, "w", encoding="utf-8") as xml_file:
            xml_file.write(out)
        return ET.fromstring(out)

    def _collect_results(self, root):
        """Return ``(published_arch_list, build_result_list)`` extracted from ``root``."""
        published_arch_list = []
        build_result_list = []
        for repo_result in root:
            arch = repo_result.attrib["arch"]
            if repo_result.attrib["code"] == "published":
                published_arch_list.append(arch)
            for pkg_result in repo_result:
                build_result = pkg_result.attrib["code"]
                details = ""
                if build_result == "failed":
                    details = "please analyse the log"
                elif build_result == "unresolvable":
                    # The text of the last child element is used as details.
                    for ele in pkg_result:
                        details = ele.text or ""
                build_result_list.append({
                    "project": self.project,
                    "package": pkg_result.attrib["package"],
                    "arch": arch,
                    "build_result": build_result,
                    "details": details,
                })
        return published_arch_list, build_result_list

    def parse_build_result(self):
        """Parse the build result and refresh the published/package lists.

        :return: True on success; False (after logging) when the XML cannot
            be obtained or parsed, or an expected attribute is missing.

        The lists are replaced, not extended, so calling this method more
        than once does not duplicate entries. On failure they are untouched.
        """
        try:
            root = self._load_result_root()
            published_arch_list, build_result_list = self._collect_results(root)
        except Exception as e:
            logging.error(f"{self.project} parse Exception: {e}")
            return False
        self.published_arch_list = published_arch_list
        self.build_result_list = build_result_list
        return True

    def check_project_is_published(self):
        """Return True when parsing succeeds and at least one repository is published."""
        if not self.parse_build_result():
            return False
        # The original read the undefined attribute `published_repo_list`.
        return bool(self.published_arch_list)

    def get_affected_package(self, package):
        """Return the packages affected by ``package``.

        TODO: not implemented yet (a `pkgship` query was planned); always returns "".
        """
        return ""

    def get_failed_unresolvable_list(self):
        """Return the entries of ``build_result_list`` that are failed or unresolvable.

        The returned dicts are the same objects as in ``build_result_list``;
        depending on the instance settings they are extended in place with
        ``affected_package`` and/or ``failed_log_link`` ("NA" unless failed).
        """
        failed_unresolvable_list = []
        for pkg_build_result in self.build_result_list:
            status = pkg_build_result["build_result"]
            if status not in ("failed", "unresolvable"):
                continue
            if self.affected_package:
                pkg_build_result["affected_package"] = self.get_affected_package(pkg_build_result["package"])
            if self.failed_log_ip_port:
                failed_log_link = "NA"
                if status == "failed":
                    failed_log_link = f"http://{self.failed_log_ip_port}/obs_build_log/{self.project}" \
                                      f"/{pkg_build_result['arch']}/{pkg_build_result['package']}/"
                pkg_build_result["failed_log_link"] = failed_log_link
            failed_unresolvable_list.append(pkg_build_result)
        return failed_unresolvable_list

    def check_use_for_build(self):
        """Check the "use for build" flag of succeeded (open) and failed (close) packages.

        NOTE(review): this method relies on `succeeded_pkg_dict`,
        `failed_pkg_dict`, `pkg_check_use_for_build` and
        `check_flag_not_correct_list`, none of which is defined in this
        class. Presumably they come from a subclass or were lost in a
        refactoring; confirm before calling it.
        """
        if not self.parse_build_result():
            return False

        with ThreadPoolExecutor(50) as executor:
            # succeeded packages are expected to have the flag open
            for repo, succeeded_pkg_list in self.succeeded_pkg_dict.items():
                for pkg in succeeded_pkg_list:
                    executor.submit(self.pkg_check_use_for_build, repo, pkg, "open")
            # failed packages are expected to have the flag closed
            for repo, failed_pkg_list in self.failed_pkg_dict.items():
                for pkg in failed_pkg_list:
                    executor.submit(self.pkg_check_use_for_build, repo, pkg, "close")

        if self.check_flag_not_correct_list:
            logging.error(f"{self.project} use for build flag not correct list: {self.check_flag_not_correct_list}")
            return False
        logging.info(f"{self.project} use for build flag check no problem!")
        logging.info(f"{self.project} use for build flag check finish!")
        return True


def _parse_flag(value):
    """Convert a command-line flag value to bool ("0", "false", "no", ... are False)."""
    return str(value).strip().lower() not in _FALSE_FLAG_VALUES


def _build_arg_parser():
    """Return the argument parser of the script."""
    par = argparse.ArgumentParser()
    par.add_argument("-p", "--project", default="",
                     help="obs project", required=False)
    par.add_argument("-pf", "--project_list_file", default="",
                     help="obs project list file", required=False)
    par.add_argument("-r", "--result_xml", default="",
                     help="obs project build result xml", required=False)
    # Without a type, "-w 0" arrived as the truthy string "0".
    par.add_argument("-w", "--write_to_csv", default=1, type=_parse_flag,
                     help="write obs project build result to csv (0 to disable)", required=False)
    par.add_argument("-c", "--csv_path", default="",
                     help="obs project build result write to csv path", required=False)
    par.add_argument("-a", "--affected_package", default=0, type=_parse_flag,
                     help="check the failed or unresolvable package affected package", required=False)
    par.add_argument("-l", "--failed_log_ip_port", default="",
                     help="failed log link ip:port, eg: xx.xx.xx.xx:82", required=False)
    par.add_argument("-lf", "--log_to_file", default=0, type=_parse_flag,
                     help="print log to file", required=False)
    return par


def _configure_logging(log_to_file):
    """Configure root logging, to a ``.log`` file next to this script when requested."""
    log_format = "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s"
    date_format = "%Y-%m-%d %H:%M:%S"
    if log_to_file:
        # splitext only touches the extension; str.replace(".py", ...) would
        # also rewrite a ".py" occurring elsewhere in the path.
        log_file = os.path.splitext(os.path.realpath(__file__))[0] + ".log"
        logging.basicConfig(level=logging.DEBUG, format=log_format, datefmt=date_format,
                            filename=log_file, filemode="a")
    else:
        logging.basicConfig(level=logging.DEBUG, format=log_format, datefmt=date_format)


def main(argv=None):
    """Run the script and return its exit code (1 when any project failed to parse).

    :param argv: argument list; ``None`` means ``sys.argv[1:]``.
    """
    args = _build_arg_parser().parse_args(argv)
    kw = {
        "result_xml": args.result_xml,
        "affected_package": args.affected_package,
        "failed_log_ip_port": args.failed_log_ip_port,
    }
    _configure_logging(args.log_to_file)

    project_list = [args.project]
    project_name = args.project.replace(":", "_")
    if args.project_list_file:
        if os.path.isfile(args.project_list_file):
            with open(args.project_list_file, "r", encoding="utf-8") as f:
                # Skip blank lines so they do not turn into empty project names.
                project_list = [line.strip() for line in f if line.strip()]
            project_name = ""
        else:
            logging.warning(f"project list file {args.project_list_file} not found, use project {args.project}")

    exit_code = 0
    all_failed_unresolvable_pkgs = []
    for obs_project in project_list:
        project_build = ProjectBuildResult(obs_project, **kw)
        if not project_build.parse_build_result():
            logging.error(f"project {obs_project} build result parse failed")
            exit_code = 1
            continue
        failed_unresolvable_pkgs = project_build.get_failed_unresolvable_list()
        if failed_unresolvable_pkgs:
            logging.warning(f"{obs_project} failed unresolvable packages num: {len(failed_unresolvable_pkgs)}")
            logging.warning(f"{obs_project} failed unresolvable packages list: {failed_unresolvable_pkgs}")
            all_failed_unresolvable_pkgs.extend(failed_unresolvable_pkgs)

    if all_failed_unresolvable_pkgs:
        logging.warning(f"final failed unresolvable packages num: {len(all_failed_unresolvable_pkgs)}")
        logging.warning(f"final failed unresolvable packages list: {all_failed_unresolvable_pkgs}")

    if args.write_to_csv:
        if args.csv_path:
            csv_path = args.csv_path
        else:
            timestamp = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
            csv_path = f"{project_name}_{timestamp}.csv"
        write_to_csv(csv_path, all_failed_unresolvable_pkgs)
        logging.info(f"write to csv file: {csv_path} finish.")
    return exit_code


if __name__ == "__main__":
    sys.exit(main())
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.