WIP: Project Tools: Remove Requests Module from studio scripts #279
@ -4,15 +4,15 @@ import os
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
import hashlib
|
||||
import subprocess
|
||||
import tempfile
|
||||
import sys
|
||||
import requests
|
||||
from urllib.request import Request, urlopen
|
||||
from urllib.error import HTTPError
|
||||
from http.client import HTTPResponse
|
||||
import json
|
||||
from requests import Response
|
||||
|
||||
BASE_PATH = "https://projects.blender.org/api/v1"
|
||||
REPO_PATH = '/studio/blender-studio-pipeline'
|
||||
REPO_PATH = '/TinyNick/blender-studio-pipeline'
|
||||
RELEASE_PATH = BASE_PATH + f'/repos{REPO_PATH}/releases'
|
||||
TAG_PATH = BASE_PATH + f'/repos{REPO_PATH}/tags'
|
||||
API_TOKEN = None
|
||||
@ -46,7 +46,7 @@ def remove_existing_release_assets(release_id: int) -> None:
|
||||
None
|
||||
"""
|
||||
|
||||
all_assets = send_get_request(RELEASE_PATH + f"/{release_id}/assets").json()
|
||||
all_assets = send_get_request(RELEASE_PATH + f"/{release_id}/assets")
|
||||
for asset in all_assets:
|
||||
if asset["name"] == ZIP_NAME + ".zip" or asset["name"] == ZIP_NAME + ".zip.sha256":
|
||||
send_delete_request(RELEASE_PATH + f"/{release_id}/assets/{asset['id']}")
|
||||
@ -64,23 +64,29 @@ def upload_asset_to_release(release_id: int, file: str) -> None:
|
||||
None
|
||||
"""
|
||||
|
||||
file_name = Path(file.name).name
|
||||
payload = open(file, 'rb')
|
||||
file_content = [
|
||||
('attachment', (file_name, payload, 'application/zip')),
|
||||
]
|
||||
file_name = Path(file).name
|
||||
zip_file = open(file, 'rb')
|
||||
zip_data = zip_file.read()
|
||||
|
||||
url = f"{RELEASE_PATH}/{release_id}/assets?name={file_name}&token={API_TOKEN}"
|
||||
req = Request(url, data=zip_data)
|
||||
|
||||
print(f"Uploading '{file_name}'......", end="")
|
||||
response = requests.post(
|
||||
url=f"{RELEASE_PATH}/{release_id}/assets?name={file_name}&token={API_TOKEN}",
|
||||
files=file_content,
|
||||
)
|
||||
# TODO DEBUG WHY THIS RETURNS 500 ERROR
|
||||
with urlopen(req) as response:
|
||||
response_data = json.loads(response.read().decode("utf-8"))
|
||||
print(response, response_data)
|
||||
|
||||
response.raise_for_status()
|
||||
# response = requests.post(
|
||||
# url=f"{RELEASE_PATH}/{release_id}/assets?name={file_name}&token={API_TOKEN}",
|
||||
# files=file_content,
|
||||
# )
|
||||
# response.raise_for_status()
|
||||
|
||||
if not response.status_code == 201:
|
||||
print(f"Failed to upload.")
|
||||
else:
|
||||
print(f"Completed")
|
||||
# if not response.status_code == 201:
|
||||
# print(f"Failed to upload.")
|
||||
# else:
|
||||
# print(f"Completed")
|
||||
|
||||
|
||||
def get_release() -> dict:
|
||||
@ -94,7 +100,7 @@ def get_release() -> dict:
|
||||
"""
|
||||
|
||||
# Remove Previous Release so Release is always based on Current Commit
|
||||
for release in send_get_request(RELEASE_PATH).json():
|
||||
for release in send_get_request(RELEASE_PATH):
|
||||
if release["name"] == RELEASE_TITLE and release["tag_name"] == RELEASE_VERSION:
|
||||
send_delete_request(RELEASE_PATH + f"/{release['id']}")
|
||||
send_delete_request(TAG_PATH + f"/{release['tag_name']}")
|
||||
@ -114,7 +120,7 @@ def create_new_release() -> dict:
|
||||
"""
|
||||
# Create New Tag
|
||||
existing_tag = send_get_request(TAG_PATH + f'/{RELEASE_VERSION}')
|
||||
if existing_tag.status_code == 404:
|
||||
if not existing_tag:
|
||||
tag_content = {
|
||||
"message": RELEASE_DESCRIPTION,
|
||||
"tag_name": RELEASE_VERSION,
|
||||
@ -133,7 +139,7 @@ def create_new_release() -> dict:
|
||||
"target_commitish": "string", # will default to latest
|
||||
}
|
||||
|
||||
return send_post_request(RELEASE_PATH, release_content).json()
|
||||
return send_post_request(RELEASE_PATH, release_content)
|
||||
|
||||
|
||||
def get_api_token() -> None:
|
||||
@ -150,10 +156,10 @@ def get_api_token() -> None:
|
||||
sys.exit(1)
|
||||
API_TOKEN = open(api_token_file, 'r').read()
|
||||
# Don't use send_get_request() so we can print custom error message to user
|
||||
response = requests.get(url=f"{BASE_PATH}/settings/api?token={API_TOKEN}")
|
||||
if response.status_code != 200:
|
||||
response_data = urlopen(url=f"{BASE_PATH}/settings/api?token={API_TOKEN}")
|
||||
if response_data.status != 200:
|
||||
print("API Token is invalid")
|
||||
print(f"Error: {response.status_code}: '{response.reason}'")
|
||||
print(f"Error: {response_data.status}: '{response_data.msg}'")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
@ -217,36 +223,32 @@ def generate_checksum(archive_path: Path) -> str:
|
||||
return digest.hexdigest()
|
||||
|
||||
|
||||
def send_delete_request(url) -> Response:
|
||||
response = requests.delete(url=f"{url}?token={API_TOKEN}")
|
||||
if response.status_code != 204:
|
||||
print(f"Error: {response.status_code}: '{response.reason}'")
|
||||
def send_delete_request(url) -> HTTPResponse:
|
||||
response = urlopen(Request(url=f"{url}?token={API_TOKEN}", method="DELETE"))
|
||||
if response.status != 204:
|
||||
print(f"Error: {response.status}: '{response.msg}'")
|
||||
sys.exit(1)
|
||||
return response
|
||||
|
||||
|
||||
def send_get_request(url: str) -> Response:
|
||||
response = requests.get(url=f"{url}?token={API_TOKEN}")
|
||||
if not (response.status_code == 200 or response.status_code == 404):
|
||||
print(f"Error: {response.status_code}: '{response.reason}'")
|
||||
def send_get_request(url: str):
|
||||
try:
|
||||
response = urlopen(f"{url}?token={API_TOKEN}")
|
||||
except HTTPError as e:
|
||||
if e.code == 404:
|
||||
return
|
||||
print(f"Error: {e.code}: '{e.reason}'")
|
||||
sys.exit(1)
|
||||
return response
|
||||
return json.loads(response.read().decode("utf-8"))
|
||||
|
||||
|
||||
def send_post_request(url: str, data: dict) -> Response:
|
||||
header_cont = {
|
||||
'Content-type': 'application/json',
|
||||
}
|
||||
response = requests.post(
|
||||
url=f"{url}?token={API_TOKEN}",
|
||||
headers=header_cont,
|
||||
data=json.dumps(data),
|
||||
)
|
||||
response_json = response.json()
|
||||
if response.status_code != 201:
|
||||
print(response_json["message"])
|
||||
sys.exit(1)
|
||||
return response
|
||||
def send_post_request(url: str, data: dict):
|
||||
data = json.dumps(data)
|
||||
req = Request(url=f"{url}?token={API_TOKEN}", data=bytes(data.encode("utf-8")), method="POST")
|
||||
req.add_header("Content-type", "application/json; charset=UTF-8")
|
||||
resp = urlopen(req)
|
||||
response_data = json.loads(resp.read().decode("utf-8"))
|
||||
return response_data
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
Loading…
Reference in New Issue
Block a user