File build_result.py of Package project_build_result

#!/bin/env python3
# -*- encoding=utf8 -*-
#******************************************************************************
# Copyright (c) Huawei Technologies Co., Ltd. 2020-2020. All rights reserved.
# licensed under the Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#     http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
# PURPOSE.
# See the Mulan PSL v2 for more details.
# ******************************************************************************


import os
import csv
import datetime
import logging
import argparse
import subprocess
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor


def run(cmd, timeout=600, print_out=False):
    """Execute ``cmd`` through the shell and report its outcome.

    Args:
        cmd: command line handed to the shell.
        timeout: seconds to wait before ``subprocess.run`` raises
            ``subprocess.TimeoutExpired``.
        print_out: when true, non-empty stdout is logged at info level.

    Returns:
        Tuple ``(returncode, stdout, stderr)``; both streams are decoded as utf-8.
    """
    completed = subprocess.run(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        encoding="utf-8",
        timeout=timeout,
    )
    # The command is logged once it has finished, followed by its captured output.
    logging.info(f"cmd: {cmd}")
    out, err = completed.stdout, completed.stderr
    if print_out and out:
        logging.info(f"ret.stdout: {out}")
    if err:
        logging.warning(f"ret.stderr: {err}")
    return completed.returncode, out, err


def write_to_csv(file_path, build_result_data):
    """Write build result rows to a csv file.

    Args:
        file_path: destination csv path; an existing file is overwritten.
        build_result_data: list of dicts, one per csv row. Nothing is written
            (only a warning is logged) when the list is empty.

    The header is the union of the keys of all rows, in first-seen order, so
    rows that carry extra columns no longer make ``csv.DictWriter`` raise
    ``ValueError``; columns missing from a row are written as empty strings.
    """
    if not build_result_data:
        logging.warning(f"{file_path} has nothing to write")
        return

    # dict.fromkeys de-duplicates while keeping the first-seen column order.
    fieldnames = list(dict.fromkeys(key for row in build_result_data for key in row))

    # newline="" lets the csv module control line endings itself.
    with open(file_path, mode="w", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames, restval="")
        writer.writeheader()
        writer.writerows(build_result_data)


class ProjectBuildResult:
    """Collect and query the build results of one OBS project.

    The results are read from a saved ``osc results --xml`` file when one is
    given and exists, otherwise they are fetched by running
    ``osc results <project> --xml`` through the module-level ``run`` helper.
    """

    def __init__(self, project, **kwargs):
        """Store the project name and the optional settings.

        Args:
            project: OBS project name.
            **kwargs: optional settings read by this class:
                result_xml: path of a saved build result xml ("" to query osc).
                affected_package: truthy to add an ``affected_package`` entry
                    to every failed/unresolvable row.
                failed_log_ip_port: ``ip:port`` used to build failed-log links.
        """
        self.kwargs = kwargs
        self.project = project
        self.result_xml = self.kwargs.get("result_xml", "")
        self.affected_package = self.kwargs.get("affected_package", 0)
        self.failed_log_ip_port = self.kwargs.get("failed_log_ip_port", "")
        self.published_arch_list = []   # arch of every published repository   eg: [aarch64, x86_64]
        self.build_result_list = []     # one dict per package result found in the xml

    def _load_result_root(self):
        """Return the root element of the build result xml (file or live osc query)."""
        if self.result_xml and os.path.exists(self.result_xml):
            return ET.parse(self.result_xml).getroot()

        # No usable xml file: query osc ONCE, keep a timestamped copy of the raw
        # output in the working directory and parse that very same output, so
        # the saved file and the parsed data can never disagree.
        result_time = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
        project_name = self.project.replace(":", "_")
        xml_name = f"{project_name}_{result_time}.xml"
        _, out, _ = run(f"osc results {self.project} --xml")
        with open(xml_name, "w", encoding="utf-8") as xml_file:
            xml_file.write(out)
        return ET.fromstring(out)

    def parse_build_result(self):
        """Parse the build result and fill the published/package result lists.

        Returns:
            True when the xml could be loaded, False otherwise (the error is
            logged). ``published_arch_list`` and ``build_result_list`` are
            rebuilt from scratch on every successful load.
        """
        try:
            root = self._load_result_root()
        except Exception as e:
            logging.error(f"{self.project} parse Exception: {e}")
            return False

        # Rebuild the lists so that parsing twice on the same instance (e.g.
        # parse_build_result followed by check_project_is_published) does not
        # duplicate every entry.
        self.published_arch_list = []
        self.build_result_list = []

        for repo_result in root:
            arch = repo_result.attrib["arch"]
            build_status = repo_result.attrib["code"]     # "published"

            # remember the arch of every published repository
            if build_status == "published":
                self.published_arch_list.append(arch)

            # record one row per package result of this repository
            for pkg_result in repo_result:
                build_result = pkg_result.attrib["code"]
                details = ""
                if build_result == "failed":
                    details = "please analyse the log"
                if build_result == "unresolvable":
                    # the text of the last child element is kept as the reason
                    for ele in pkg_result:
                        details = ele.text

                self.build_result_list.append({
                    "project": self.project,
                    "package": pkg_result.attrib["package"],
                    "arch": arch,
                    "build_result": build_result,
                    "details": details,
                })
        return True

    def check_project_is_published(self):
        """Return True when at least one repository of the project is published.

        Returns False when the build result cannot be parsed.
        """
        try:
            if not self.parse_build_result():
                return False
        except Exception as e:
            logging.error(f"parse {self.project} build result Exception: {e}")
            return False
        # published_arch_list is the attribute parse_build_result fills.
        return bool(self.published_arch_list)

    def get_affected_package(self, package):
        """Return the packages affected by ``package``.

        Placeholder: the lookup is not implemented and "" is always returned.
        """
        # TODO: query the dependency service for `package` (e.g. via pkgship).
        return ""

    def get_failed_unresolvable_list(self):
        """Return the rows of ``build_result_list`` that failed or are unresolvable.

        The returned dicts are the same objects stored in
        ``build_result_list``; depending on the instance settings they are
        extended in place with ``affected_package`` and ``failed_log_link``.
        """
        failed_unresolvable_list = []
        for pkg_build_result in self.build_result_list:
            if pkg_build_result["build_result"] not in ("failed", "unresolvable"):
                continue

            if self.affected_package:
                pkg_build_result["affected_package"] = self.get_affected_package(pkg_build_result["package"])

            if self.failed_log_ip_port:
                # only failed builds have a log to link to
                failed_log_link = "NA"
                if pkg_build_result["build_result"] == "failed":
                    failed_log_link = f"http://{self.failed_log_ip_port}/obs_build_log/{self.project}" \
                                      f"/{pkg_build_result['arch']}/{pkg_build_result['package']}/"
                pkg_build_result["failed_log_link"] = failed_log_link

            failed_unresolvable_list.append(pkg_build_result)

        return failed_unresolvable_list

    def check_use_for_build(self):
        """Check the "use for build" flag of succeeded and failed packages.

        Returns False when the build result cannot be parsed or when flags
        were recorded as incorrect, True otherwise.

        NOTE(review): this method reads ``self.succeeded_pkg_dict``,
        ``self.failed_pkg_dict`` and ``self.check_flag_not_correct_list`` and
        calls ``self.pkg_check_use_for_build``; none of them is defined in
        this class as visible here, so a subclass (or missing code) has to
        provide them - confirm before relying on this method.
        """
        # 1. parse build result
        try:
            if not self.parse_build_result():
                return False
        except Exception as e:
            logging.error(f"parse {self.project} build result Exception: {e}")
            return False

        with ThreadPoolExecutor(50) as executor:
            # 2. check use for build flag for succeeded package: open
            for repo, succeeded_pkg_list in self.succeeded_pkg_dict.items():
                for pkg in succeeded_pkg_list:
                    executor.submit(lambda p: self.pkg_check_use_for_build(*p), (repo, pkg, "open"))

            # 3. check use for build flag for failed package: close
            for repo, failed_pkg_list in self.failed_pkg_dict.items():
                for pkg in failed_pkg_list:
                    executor.submit(lambda p: self.pkg_check_use_for_build(*p), (repo, pkg, "close"))

        if self.check_flag_not_correct_list:
            logging.error(f"{self.project} use for build flag not correct list: {self.check_flag_not_correct_list}")
            return False
        else:
            logging.info(f"{self.project} use for build flag check no problem!")
        logging.info(f"{self.project} use for build flag check finish!")
        return True


if __name__ == "__main__":
    # Command line entry point: parse the build result of one project (or of
    # every project listed in a file), log the failed/unresolvable packages
    # and optionally write them to a csv file. Exit code is 1 when the build
    # result of any project could not be parsed.

    def _flag(value):
        """Interpret a switch value: "", "0", "false", "no", "off" disable it."""
        return str(value).strip().lower() not in ("", "0", "false", "no", "off")

    par = argparse.ArgumentParser()
    par.add_argument("-p", "--project", default="", help="obs project", required=False)
    par.add_argument("-pf", "--project_list_file", default="", help="obs project list file", required=False)
    par.add_argument("-r", "--result_xml", default="", help="obs project build result xml", required=False)
    # Without a type converter "-w 0" would yield the truthy string "0" and
    # the switches below could never be turned off from the command line.
    par.add_argument("-w", "--write_to_csv", default=True, type=_flag,
                     help="write failed or unresolvable packages to csv (0 to disable)", required=False)
    par.add_argument("-c", "--csv_path", default="", help="obs project build result write to csv path", required=False)
    par.add_argument("-a", "--affected_package", default=False, type=_flag,
                     help="check the failed or unresolvable package affected package", required=False)
    par.add_argument("-l", "--failed_log_ip_port", default="",
                     help="failed log link ip:port, eg: xx.xx.xx.xx:82", required=False)
    par.add_argument("-lf", "--log_to_file", default=False, type=_flag, help="print log to file", required=False)
    args = par.parse_args()

    kw = {
        "result_xml": args.result_xml,
        "affected_package": args.affected_package,
        "failed_log_ip_port": args.failed_log_ip_port,
    }

    LOG_FORMAT = "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s"
    DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
    if args.log_to_file:
        # Swap only the file extension; str.replace would also rewrite a
        # ".py" occurring anywhere else in the path.
        log_file = os.path.splitext(os.path.realpath(__file__))[0] + ".log"
        logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=DATE_FORMAT, filename=log_file,
                            filemode="a")
    else:
        logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=DATE_FORMAT)

    project_list = [args.project]
    project_name = args.project.replace(":", "_")
    if os.path.isfile(args.project_list_file):
        with open(args.project_list_file, "r") as f:
            # one project per line; blank lines and surrounding spaces are ignored
            project_list = [line.strip() for line in f.read().splitlines() if line.strip()]
        project_name = ""
    elif args.project_list_file:
        logging.warning(f"project list file {args.project_list_file} not found, use project: {args.project}")

    exit_code = 0
    all_failed_unresolvable_pkgs = []
    for obs_project in project_list:
        project_build = ProjectBuildResult(obs_project, **kw)
        if not project_build.parse_build_result():
            logging.error(f"project {obs_project} build result parse failed")
            exit_code = 1
        else:
            failed_unresolvable_pkgs = project_build.get_failed_unresolvable_list()
            if failed_unresolvable_pkgs:
                logging.warning(f"{obs_project} failed unresolvable packages num: {len(failed_unresolvable_pkgs)}")
                logging.warning(f"{obs_project} failed unresolvable packages list: {failed_unresolvable_pkgs}")
                all_failed_unresolvable_pkgs.extend(failed_unresolvable_pkgs)

    if all_failed_unresolvable_pkgs:
        logging.warning(f"final failed unresolvable packages num: {len(all_failed_unresolvable_pkgs)}")
        logging.warning(f"final failed unresolvable packages list: {all_failed_unresolvable_pkgs}")
        if args.write_to_csv:
            if args.csv_path:
                csv_path = args.csv_path
            else:
                timestamp = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
                csv_path = f"{project_name}_{timestamp}.csv"
            write_to_csv(csv_path, all_failed_unresolvable_pkgs)
            logging.info(f"write to csv file: {csv_path} finish.")

    # SystemExit works without the site-provided exit() helper being present.
    raise SystemExit(exit_code)