Pipeline Release: Use Add-On Bundle to Update Add-Ons #269

Merged
2 changed files with 215 additions and 592 deletions
Showing only changes of commit e032c7a093 - Show all commits

View File

@ -1,58 +0,0 @@
#!/usr/bin/env python3
import os
from pathlib import Path
import shutil
import hashlib
import subprocess
import tempfile
import sys
def main():
name = "blender_studio_pipeline_release"
zipped_release, checksum_file = get_pipeline_release(name)
# TODO Upload zipped release to blender studio pipeline website
def get_pipeline_release(name: str):
temp_dir = Path(tempfile.mkdtemp(prefix=name + "_"))
output_dir = Path(temp_dir).joinpath(name)
output_dir.mkdir()
cmd_list = ("./package_local.py", str(output_dir))
process = subprocess.Popen(cmd_list, shell=False)
if process.wait() != 0:
print("Add-On Package Locally Failed!")
sys.exit(0)
zipped_release = shutil.make_archive(
temp_dir.joinpath(name),
'zip',
temp_dir,
name,
)
checksum = generate_checksum(zipped_release)
chechsum_name = name + ".zip.sha256"
checksum_path = temp_dir / chechsum_name
checksum_file = write_file(
checksum_path,
f"{checksum} {name}.zip",
)
return zipped_release, checksum_file
def write_file(file_path, content):
file = open(file_path, 'w')
file.writelines(content)
file.close()
def generate_checksum(archive_path):
with open(archive_path, 'rb') as file:
digest = hashlib.file_digest(file, "sha256")
return digest.hexdigest()
if __name__ == "__main__":
main()

741
scripts/pipeline-release/pipeline_release.py Normal file → Executable file
View File

@ -1,582 +1,263 @@
# ***** BEGIN GPL LICENSE BLOCK *****
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# ***** END GPL LICENCE BLOCK *****
#
# (c) 2021, Blender Foundation
#!/usr/bin/env python3
import zipfile
import hashlib
import sys
import os
import subprocess
from pathlib import Path
from typing import List
import shutil
import argparse
import re
from typing import Pattern
import datetime
import hashlib
import subprocess
import tempfile
import sys
import requests
import json
from requests import Response
REPO_ROOT_DIR = Path(__file__).parent.parent.parent
# BORROWED FROM https://github.com/pawamoy/git-changelog/blob/master/src/git_changelog/commit.py
TYPES: dict[str, str] = {
"add": "Added",
"fix": "Fixed",
"change": "Changed",
"remove": "Removed",
"merge": "Merged",
"doc": "Documented",
"breaking": "Breaking",
}
BASE_PATH = "https://projects.blender.org/api/v1"
REPO_PATH = '/studio/blender-studio-pipeline'
RELEASE_PATH = BASE_PATH + f'/repos{REPO_PATH}/releases'
TAG_PATH = BASE_PATH + f'/repos{REPO_PATH}/tags'
API_TOKEN = None
# GITEA LOGIN SETTINGS
api_token_file = Path(__file__).parent.joinpath("api_token.env")
if not api_token_file.exists():
print("API Token File not Found")
api_token = open(api_token_file, 'r').read()
base_url = 'https://projects.blender.org'
api_path = f"{base_url}/api/v1"
repo_path = '/studio/blender-studio-pipeline'
release_path = f'/repos{repo_path}/releases'
tag_path = f'/repos{repo_path}/tags'
RELEASE_TITLE = "Blender Studio Add-Ons Latest"
RELEASE_VERSION = "latest"
RELEASE_DESCRIPTION = "Latest Release of Blender Studio Pipeline Add-Ons"
ZIP_NAME = "blender_studio_add-ons_latest"
def parse_commit(commit_message: str) -> dict[str, str]:
"""
Parse the type of the commit given its subject.
Arguments:
commit_subject: The commit message subject.
def main():
get_api_token()
latest_release = get_latest_release()
release_files = create_latest_addons_zip(ZIP_NAME)
remove_existing_release_assets(latest_release["id"])
for file in release_files:
upload_asset_to_release(latest_release["id"], file)
print("Pipeline Release Successfully Updated")
def remove_existing_release_assets(release_id: int) -> None:
"""Removes all existing release assets for the given release ID.
Args:
release_id (int): The ID of the release to remove assets from.
Returns:
Dict containing commit message and type
None
"""
type = ""
# Split at first colon to remove prefix from commit
if ": " in commit_message:
message_body = commit_message.split(': ')[1]
all_assets = send_get_request(RELEASE_PATH + f"/{release_id}/assets").json()
for asset in all_assets:
if asset["name"] == ZIP_NAME + ".zip" or asset["name"] == ZIP_NAME + ".zip.sha256":
response = requests.delete(
RELEASE_PATH + f"/{release_id}/assets/{asset['id']}?token={API_TOKEN}"
)
if response.status_code != 204:
print(f"Failed to delete {asset['name']}")
else:
message_body = commit_message
type_regex: Pattern = re.compile(r"^(?P<type>(%s))" % "|".join(TYPES.keys()), re.I)
breaking_regex: Pattern = re.compile(
r"^break(s|ing changes?)?[ :].+$",
re.I | re.MULTILINE,
print(f"Deleted {asset['name']} created on: {asset['created_at']}")
def upload_asset_to_release(release_id: int, file: str) -> None:
"""Uploads an asset to the specified release.
Args:
release_id (int): The id of the release to upload to.
file (str): The path to the file to upload.
Returns:
None
"""
file_name = Path(file.name).name
payload = open(file, 'rb')
file_content = [
('attachment', (file_name, payload, 'application/zip')),
]
print(f"Uploading '{file_name}'......", end="")
response = requests.post(
url=f"{RELEASE_PATH}/{release_id}/assets?name={file_name}&token={API_TOKEN}",
files=file_content,
)
type_match = type_regex.match(message_body)
if type_match:
type = TYPES.get(type_match.groupdict()["type"].lower(), "")
if bool(breaking_regex.search(message_body)):
type = "Breaking"
return {
"message": message_body,
"type": type,
response.raise_for_status()
if not response.status_code == 201:
print(f"Failed to upload.")
else:
print(f"Completed")
def get_latest_release() -> dict:
"""Get the latest release matching the release title and version.
Checks if the latest release matches the expected title and version. If not,
loops through all releases to find the latest matching one. If none found,
creates a new release.
Returns:
dict: The release object for the latest matching release.
"""
latest_release = send_get_request(RELEASE_PATH + "/latest").json()
if latest_release["name"] == RELEASE_TITLE and latest_release["tag_name"] == RELEASE_VERSION:
return latest_release
all_releases = send_get_request(RELEASE_PATH)
for release in all_releases.json(): # List is sorted by latest first
if release["name"] == RELEASE_TITLE and release["tag_name"] == RELEASE_VERSION:
return release
return create_new_release()
def create_new_release() -> dict:
"""Create a new release on Gitea with the given title, version and description.
Makes a POST request to the Gitea API to create a new release with the specified
parameters. Checks if a tag with the same version already exists first. If not,
creates the tag before creating the release.
Returns:
dict: The release object for the latest matching release.
"""
# Create New Tag
existing_tag = send_get_request(TAG_PATH + f'/{RELEASE_VERSION}')
if existing_tag.status_code == 404:
tag_content = {
"message": RELEASE_DESCRIPTION,
"tag_name": RELEASE_VERSION,
"target": f"main",
}
send_post_request(TAG_PATH, tag_content)
parser = argparse.ArgumentParser()
parser.add_argument(
"-c",
"--commit",
help="Find commit with this message and use it as the last version.",
type=str,
)
parser.add_argument(
"-n",
"--name",
help="Only update the addon corrisponding to this name(s).",
type=str,
)
# Create New Release
release_content = {
"body": RELEASE_DESCRIPTION,
"draft": False,
"name": RELEASE_TITLE,
"prerelease": False,
"tag_name": RELEASE_VERSION,
"target_commitish": "string", # will default to latest
}
parser.add_argument(
"-o",
"--output",
help="Provide a string for the output path of generated zips",
type=str,
)
parser.add_argument(
"-m",
"--major",
help="Bump the major version number, otherwise bump minor version number",
action="store_true",
)
parser.add_argument(
"-t",
"--test",
help="Test release system by only running locally and skip committing/uploading to release",
action="store_true",
)
parser.add_argument(
"-r",
"--reuse_lastest_release",
help="Add new packages to the lastest avaliable release",
action="store_true",
)
parser.add_argument(
"-f",
"--force",
help="Bump version even if no commits are found",
action="store_true",
)
return send_post_request(RELEASE_PATH, release_content).json()
def cli_command(command: str) -> subprocess:
"""Run command in CLI and capture it's output
Arguments:
command: String of command to run in CLI.
def get_api_token() -> None:
"""Get API token from environment file.
Reads the API token from the api_token.env file and assigns it to the global
API_TOKEN variable. Exits with error if file not found. Exists if API token is invalid.
"""
output = subprocess.run(command.split(' '), capture_output=True, encoding="utf-8")
return output
global API_TOKEN
api_token_file = Path(__file__).parent.joinpath("api_token.env")
if not api_token_file.exists():
print("API Token File not Found")
sys.exit(1)
API_TOKEN = open(api_token_file, 'r').read()
# Don't use send_get_request() so we can print custom error message to user
response = requests.get(url=f"{BASE_PATH}/settings/api?token={API_TOKEN}")
if response.status_code != 200:
print("API Token is invalid")
print(f"Error: {response.status_code}: '{response.reason}'")
sys.exit(1)
def exit_program(message: str):
print(message)
def create_latest_addons_zip(name: str):
"""Generate a pipeline release.
Args:
name (str): The name of the release.
Returns:
list: A list containing the path to the zipped release and checksum file.
"""
temp_dir = Path(tempfile.mkdtemp(prefix=name + "_"))
output_dir = Path(temp_dir).joinpath(name)
output_dir.mkdir()
pipeline_release_dir = Path(__file__).parent
# Incase script has been invoked from different directory
if os.getcwd() != str(pipeline_release_dir):
os.chdir(pipeline_release_dir)
cmd_list = ("./package_local.py", str(output_dir))
process = subprocess.Popen(cmd_list, shell=False)
if process.wait() != 0:
print("Add-On Package Locally Failed")
sys.exit(0)
zipped_release = shutil.make_archive(
temp_dir.joinpath(name),
'zip',
temp_dir,
name,
)
checksum = generate_checksum(zipped_release)
chechsum_name = name + ".zip.sha256"
checksum_path = temp_dir / chechsum_name
write_file(
checksum_path,
f"{checksum} {name}.zip",
)
return [Path(zipped_release), Path(checksum_path)]
def write_file(file_path: Path, content: str) -> None:
"""Write content to file at given file path.
Args:
file_path (Path): Path to file to write to.
content (str): Content to write to file.
Returns:
None
"""
def write_file(file_path: Path, content):
file = open(file_path, 'w')
file.writelines(content)
file.close()
def replace_line(file_path: Path, new_line: str, line_number: int):
file = open(
file_path,
)
lines = file.readlines()
lines[line_number] = new_line
out = open(file_path, 'w')
out.writelines(lines)
out.close()
def generate_checksum(archive_path: Path) -> str:
"""Generate checksum for archive file.
Args:
archive_path (Path): Path to archive file to generate checksum for.
def get_directory(repo_root: Path, folder_name: str) -> Path:
"""Returns directory PATH, creates one if none exists"""
path = repo_root.joinpath(folder_name)
if not os.path.exists(path):
os.makedirs(path)
return path
def clean_str(string: str) -> str:
"""Returns string with qoutes and line breaks removed"""
return string.replace('\n', '').replace("'", "").replace('"', '')
def generate_checksum(archive_path: str) -> str:
"""
Generate a checksum for a zip file
Arguments:
archive_path: String of the archive's file path
Returns:
sha256 checksum for the provided archive as string
str: Hex digest string of checksum.
"""
sha256 = hashlib.sha256()
with open(archive_path, 'rb') as file:
# Read the file in chunks to handle large files efficiently
chunk = file.read(4096)
while len(chunk) > 0:
sha256.update(chunk)
chunk = file.read(4096)
return sha256.hexdigest()
digest = hashlib.file_digest(file, "sha256")
return digest.hexdigest()
def changelog_category_get(changelog_messages: dict[str, str], title: str, key: str):
"""
Generate changelog messages for a specific category.
Types are defined in global variable 'TYPES'
Arguments:
changelog_messages: dict contaning commit message & type
title: Title of the changelog category
key: Key for category/type as defined in global variable TYPES
Returns:
changelog entry for the given category/type as a string
"""
entry = ''
if not any(commit for commit in changelog_messages if commit["type"] == key):
return entry
entry += f"### {title} \n"
for commit in changelog_messages:
if commit["type"] == key:
entry += f'- {commit["message"]}'
entry += "\n"
return entry
def send_get_request(url: str) -> Response:
response = requests.get(url=f"{url}?token={API_TOKEN}")
if not (response.status_code == 200 or response.status_code == 404):
print(f"Error: {response.status_code}: '{response.reason}'")
sys.exit(1)
return response
def changelog_generate(commit_hashes: list[str], version: str) -> str:
"""
Generate Changelog Entries from a list of commits hashes
Arguments:
commit_hashes: A list of commit hashes to include in Changelog
version: Latest addon version number
Returns:
complete changelog for latest version as string
"""
log_entry = f'## {version} - {datetime.date.today()} \n \n'
changelog_messages = []
if commit_hashes is not None:
for commit in commit_hashes:
message = (
f"{cli_command(f'git log --pretty=format:%s -n 1 {commit}').stdout}\n"
)
changelog_messages.append(parse_commit(message))
for type in TYPES:
log_entry += changelog_category_get(
changelog_messages, TYPES.get(type).upper(), TYPES.get(type)
)
log_entry += "### UN-CATEGORIZED \n"
for commit in changelog_messages:
if commit["message"] not in log_entry:
log_entry += f"- {commit['message']}"
log_entry += "\n"
return log_entry
def changelog_commits_get(directory: Path, commit_message: str) -> list[str]:
"""
Get list of commit hashes, that affect a given directory
Arguments:
directory: Name of directory/folder to filter commits
commit_message: Prefix of commit to use as base for latest release
Returns:
list of commit hashes
"""
last_version_commit = None
commits_in_folder = cli_command(
f'git log --format=format:"%H" {directory}/*'
).stdout.split('\n')
# Find Last Version
for commit in commits_in_folder:
commit = clean_str(commit)
commit_msg = cli_command(f'git log --format=%B -n 1 {commit}')
if commit_message in commit_msg.stdout:
last_version_commit = commit
if last_version_commit is None:
return
commits_since_release = cli_command(
f'git rev-list {clean_str(last_version_commit)[0:9]}..HEAD'
).stdout.split('\n')
commit_hashes = []
for commit in commits_in_folder:
if any(clean_str(commit) in x for x in commits_since_release):
commit_hashes.append(clean_str(commit))
return commit_hashes
def changelog_file_write(file_path: Path, content: str):
"""
Append changelog to existing changelog file or create a new
changelog file if none exists
Arguments:
file_path: PATH to changelog
content: changelog for latest version as string
"""
if file_path.exists():
dummy_file = str(file_path._str) + '.bak'
with open(file_path, 'r') as read_obj, open(dummy_file, 'w') as write_obj:
write_obj.write(content)
for line in read_obj:
write_obj.write(line)
os.remove(file_path)
os.rename(dummy_file, file_path)
else:
write_file(file_path, content)
return file_path
def update_release_table(addon_dir: Path, version: str, release_version: str):
directory = Path(__file__).parent
template_file = directory.joinpath("overview.md.template")
table_file = directory.joinpath("overview.md")
with open(template_file, 'r') as readme_template:
for num, line in enumerate(readme_template):
if addon_dir.name in line:
line_to_replace = num
break # Use first line found
line = line.replace("<VERSION>", f"{version}")
line = line.replace(
"<ZIP_URL>",
f"{base_url}{repo_path}/releases/download/{release_version}/{addon_dir.name}-{version}.zip",
)
new_line = line.replace(
"<CHECKSUM_URL>",
f"{base_url}{repo_path}/releases/download/{release_version}/{addon_dir.name}-{version}.sha256",
)
replace_line(table_file, new_line, line_to_replace)
return table_file
def addon_package(
directory: Path,
commit_prefix: str,
is_major=False,
force=False,
test=False,
output_path=None,
to_upload=[],
release_version="",
):
"""
For a give directory, if new commits are found after the commit matching 'commit_prefix',
bump addon version, generate a changelog, commit changes and package addon into an archive.
Print statements indicate if addon was version bumped, or if new version was found.
Arguments:
directory: Name of directory/folder to filter commits
commit_prefix: Prefix of commit to use as base for latest release
is_major: if major 2nd digit in version is updated, else 3rd digit
"""
commit_msg = 'Version Bump:' if commit_prefix is None else commit_prefix
commits_in_folder = changelog_commits_get(directory, commit_msg)
dist_dir = get_directory(REPO_ROOT_DIR, "dist")
if commits_in_folder or force:
init_file, version = addon_version_bump(directory, is_major)
change_log = changelog_generate(commits_in_folder, version)
table_file = update_release_table(directory, version, release_version)
change_log_file = changelog_file_write(
directory.joinpath("CHANGELOG.md"), change_log
)
if not test:
cli_command(f'git reset')
cli_command(f'git stage {change_log_file}')
cli_command(f'git stage {init_file}')
cli_command(f'git stage {table_file}')
subprocess.run(
['git', 'commit', '-m', f"Version Bump: {directory.name} {version}"],
capture_output=True,
encoding="utf-8",
)
print(f"Version Bump: {directory.name} {version}")
name = directory.name
if output_path is None:
addon_output_dir = get_directory(dist_dir, directory.name)
else:
addon_output_dir = get_directory(Path(output_path), directory.name)
zipped_addon = shutil.make_archive(
addon_output_dir.joinpath(f"{name}-{version}"),
'zip',
directory.parent,
directory.name,
)
checksum = generate_checksum(zipped_addon)
checksum_path = addon_output_dir.joinpath(f"{name}-{version}.sha256")
checksum_file = write_file(
checksum_path,
f"{checksum} {name}-{version}.zip",
)
to_upload.append(zipped_addon)
to_upload.append(checksum_path._str)
else:
print(f"No New Version: {directory.name}")
def addon_version_set(version_line: str, is_major: bool) -> str:
"""
Read bl_info within addon's __init__.py file to get new version number
Arguments:
version_line: Line of bl_info containing version number
is_major: if major 2nd digit in version is updated, else 3rd digit
Returns
Latest addon version number
"""
version = version_line.split('(')[1].split(')')[0]
# Bump either last digit for minor versions and second last digit for major
if is_major:
new_version = version[:-4] + str(int(version[3]) + 1) + version[-3:]
else:
new_version = version[:-1] + str(int(version[-1]) + 1)
return new_version
def addon_version_bump(directory: Path, is_major: bool):
"""
Update bl_info within addon's __init__.py file to indicate
version bump. Expects line to read as '"version": (n, n, n),\n'
Arguments:
directory: Name of directory/folder containing addon
is_major: if major 2nd digit in version is updated, else 3rd digit
Returns:
init_file: PATH to init file that has been updated with new version
version: Latest addon version number
"""
version_line = None
str_find = "version"
init_file = directory.joinpath("__init__.py")
with open(init_file, 'r') as myFile:
for num, line in enumerate(myFile):
if str_find in line and "(" in line and line[0] != "#":
version_line = num
break # Use first line found
file = open(
init_file,
)
lines = file.readlines()
version = addon_version_set(lines[version_line], is_major)
repl_str = f' "version": ({version}),\n'
replace_line(init_file, repl_str, version_line)
return init_file, version.replace(', ', '.').replace(',', '.')
### GITEA UPLOAD RELEASE
import requests # TODO ADD PRINT STATEMENT IF UNABLE TO IMPORT
import json
"""
API token must be created under user>settings>application
- Use browser to 'INSPECT' the Generate Token button
- Find the property 'GTHidden Display' and remove the element of 'None' to nothing
- Then Set the correct scope for the key using the new dropdown menu before creating tag
"""
def upload_file_to_release(url, api_token, release_id, file):
file_name = Path(file.name).name
file_content = [
('attachment', (file_name, file, 'application/zip')),
]
response = requests.post(
url=f"{url}/{release_id}/assets?name={file_name}&token={api_token}",
files=file_content,
)
if not response.status_code == 201:
print(f"{file_name} failed to upload")
else:
print(f"Uploaded {file_name}")
def send_post_request(url, api_token, data):
def send_post_request(url: str, data: dict) -> Response:
header_cont = {
'Content-type': 'application/json',
}
response = requests.post(
url=f"{url}?token={api_token}",
url=f"{url}?token={API_TOKEN}",
headers=header_cont,
data=json.dumps(data),
)
response_json = response.json()
if response.status_code != 201:
print(response_json["message"])
return response_json
def create_new_release(tag_url, base_release_url, release_version, api_token):
release_description = "Latest Release of Blender Studio Pipeline"
# Create New Tag
tag_content = {
"message": f"{release_description}",
"tag_name": f"{release_version}",
"target": f"main",
}
send_post_request(tag_url, api_token, tag_content)
# Create New Release
release_content = {
"body": f"{release_description}",
"draft": False,
"name": f"Pipeline Release {release_version}",
"prerelease": False,
"tag_name": f"{release_version}",
"target_commitish": "string", # will default to latest
}
return send_post_request(base_release_url, api_token, release_content)
def main() -> int:
args = parser.parse_args()
commit = args.commit
major = args.major
test = args.test
user_names = args.name
output_path = args.output
force = args.force
reuse_latest_relase = args.reuse_lastest_release
addon_folder = REPO_ROOT_DIR.joinpath(REPO_ROOT_DIR, "scripts-blender/addons")
addon_to_upload = []
base_release_url = f"{api_path}{release_path}"
base_tag_url = f"{api_path}{tag_path}"
latest_release = requests.get(url=f"{base_release_url}/latest?token={api_token}")
# Exception for intial release
if latest_release.status_code == 404:
release_version = '0.0.1'
else:
latest_tag = latest_release.json()["tag_name"]
release_version = latest_tag.replace(
latest_tag[-1], str(int(latest_tag[-1]) + 1)
)
addon_dirs = [
name
for name in os.listdir(addon_folder)
if os.path.isdir(addon_folder.joinpath(name))
]
if user_names:
addon_dirs = [
name
for name in os.listdir(addon_folder)
if os.path.isdir(addon_folder.joinpath(name)) and name in user_names
]
for dir in addon_dirs:
addon_to_package = addon_folder.joinpath(addon_folder, dir)
addon_package(
addon_to_package,
commit,
major,
force,
test,
output_path,
addon_to_upload,
release_version,
)
if not test:
# Release Script
if reuse_latest_relase:
release_id = latest_release.json()["id"]
else:
response = create_new_release(
base_tag_url, base_release_url, release_version, api_token
)
release_id = response["id"]
for file in addon_to_upload:
payload = open(file, 'rb')
upload_file_to_release(
base_release_url,
api_token,
release_id,
payload,
)
return 0
sys.exit(1)
return response
if __name__ == "__main__":