97 lines
3.6 KiB
Python
97 lines
3.6 KiB
Python
import os
|
|
import sys
|
|
import argparse
|
|
import requests
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from dotenv import load_dotenv
|
|
|
|
# Load environment variables
load_dotenv()

# Connection settings, both read from the process environment after
# load_dotenv() has run.
GITEA_SERVER = os.getenv("GITEA_SERVER")
API_TOKEN = os.getenv("API_TOKEN")

# Both settings are mandatory: stop with exit status 1 as soon as either one
# is missing or empty, before any command can run.
if not (GITEA_SERVER and API_TOKEN):
    print("Error: GITEA_SERVER and API_TOKEN must be set in a .env file.")
    sys.exit(1)
|
|
|
|
# Helper functions
|
|
def list_releases(owner, repo, timeout=30):
    """Print the releases of a repository on the configured Gitea server.

    Sends a GET request to ``/api/v1/repos/{owner}/{repo}/releases`` on
    ``GITEA_SERVER``, authenticated with ``API_TOKEN``, and prints one line
    per release (ID, tag and name), or a notice when the list is empty.

    Args:
        owner: Repository owner, inserted verbatim into the URL path.
        repo: Repository name, inserted verbatim into the URL path.
        timeout: Value forwarded to ``requests.get`` as ``timeout`` so a
            stalled server cannot block the script forever.

    Returns:
        None. Results and failures are reported with ``print``.
    """
    # Tolerate a trailing slash in GITEA_SERVER so the path never contains "//".
    url = f"{GITEA_SERVER.rstrip('/')}/api/v1/repos/{owner}/{repo}/releases"
    headers = {"Authorization": f"token {API_TOKEN}"}

    try:
        response = requests.get(url, headers=headers, timeout=timeout)
    except requests.RequestException as exc:
        # Connection failures and timeouts are reported like HTTP failures
        # instead of ending the script with a traceback.
        print(f"Failed to list releases: {exc}")
        return

    if response.status_code != 200:
        print(f"Failed to list releases: {response.status_code} {response.text}")
        return

    try:
        releases = response.json()
    except ValueError as exc:
        # A 200 answer whose body is not JSON (e.g. a proxy error page).
        print(f"Failed to list releases: {exc}")
        return

    # NOTE(review): only the releases contained in this single response are
    # shown -- confirm whether the server paginates this endpoint.
    if not releases:
        print("No releases found.")
        return

    print("Releases:")
    for release in releases:
        print(f"ID: {release['id']}, Tag: {release['tag_name']}, Name: {release['name']}")
|
|
|
|
def upload_file(owner, repo, release_id, file_path, timeout=(10, 300)):
    """Upload a single file as an asset of an existing release.

    Posts the file to
    ``/api/v1/repos/{owner}/{repo}/releases/{release_id}/assets`` on
    ``GITEA_SERVER`` in the ``attachment`` form field, using the file's base
    name as the asset name. A 201 response is treated as success; any other
    status, an unreadable file or a network error is reported with ``print``.

    Args:
        owner: Repository owner, inserted verbatim into the URL path.
        repo: Repository name, inserted verbatim into the URL path.
        release_id: ID of the release that receives the asset.
        file_path: Path of the local file to upload.
        timeout: Value forwarded to ``requests.post`` as ``timeout``; the
            default is a ``(connect, read)`` pair in seconds.

    Returns:
        None. The outcome is reported with ``print``.
    """
    # Tolerate a trailing slash in GITEA_SERVER so the path never contains "//".
    url = f"{GITEA_SERVER.rstrip('/')}/api/v1/repos/{owner}/{repo}/releases/{release_id}/assets"
    headers = {"Authorization": f"token {API_TOKEN}"}
    file_name = os.path.basename(file_path)

    try:
        # The request is sent while the file is still open so its content can
        # be read from the handle during the upload.
        with open(file_path, "rb") as file:
            response = requests.post(
                url,
                headers=headers,
                files={"attachment": (file_name, file)},
                params={"name": file_name},
                timeout=timeout,
            )
    except (OSError, requests.RequestException) as exc:
        # Missing/unreadable file, connection failure or timeout: report it in
        # the same style as an HTTP failure rather than raising, so a parallel
        # batch of uploads is not interrupted by one bad file.
        print(f"Failed to upload {file_name}: {exc}")
        return

    if response.status_code == 201:
        print(f"Successfully uploaded {file_name}.")
    else:
        print(f"Failed to upload {file_name}: {response.status_code} {response.text}")
|
|
|
|
def upload_folder(owner, repo, release_id, folder_path="upload"):
    """Upload every regular file found directly inside a folder to a release.

    Sub-directories are not descended into. Files are handed to
    ``upload_file`` through a thread pool so the uploads run in parallel.

    Args:
        owner: Repository owner, forwarded to ``upload_file``.
        repo: Repository name, forwarded to ``upload_file``.
        release_id: ID of the target release, forwarded to ``upload_file``.
        folder_path: Folder whose files are uploaded; defaults to ``"upload"``.

    Returns:
        None. Progress and problems are reported with ``print``.
    """
    if not os.path.isdir(folder_path):
        print(f"Folder '{folder_path}' does not exist.")
        return

    # Build each candidate path once; names are sorted so uploads are
    # submitted in a reproducible order.
    candidates = (
        os.path.join(folder_path, file_name)
        for file_name in sorted(os.listdir(folder_path))
    )
    files_to_upload = [path for path in candidates if os.path.isfile(path)]

    if not files_to_upload:
        print(f"No files found in folder '{folder_path}'.")
        return

    print(f"Uploading {len(files_to_upload)} files from '{folder_path}'...")

    # Use ThreadPoolExecutor for parallel uploads
    with ThreadPoolExecutor() as executor:
        futures = {
            executor.submit(upload_file, owner, repo, release_id, file_path): file_path
            for file_path in files_to_upload
        }
        for future, file_path in futures.items():
            try:
                future.result()  # Wait for each upload to complete
            except Exception as exc:
                # Worker boundary: one failing upload must not stop the
                # remaining results from being collected and reported.
                print(f"Failed to upload {os.path.basename(file_path)}: {exc}")
|
|
|
|
def main():
    """Parse the command line and run the requested sub-command.

    Two sub-commands exist, and one of them is required:

    * ``list owner repo`` -- print the releases of a repository.
    * ``upload owner repo id [file]`` -- upload ``file`` to release ``id``,
      or the contents of the default upload folder when ``file`` is omitted.
    """
    cli = argparse.ArgumentParser(description="Gitea Release Management Script")
    commands = cli.add_subparsers(dest="command", required=True)

    # List releases
    list_cmd = commands.add_parser("list", help="List releases")
    # Upload file(s)
    upload_cmd = commands.add_parser("upload", help="Upload file(s) to a release")

    # Both sub-commands start with the same two positionals.
    for sub_command in (list_cmd, upload_cmd):
        sub_command.add_argument("owner", help="Repository owner")
        sub_command.add_argument("repo", help="Repository name")

    upload_cmd.add_argument("id", help="Release ID")
    upload_cmd.add_argument(
        "file",
        nargs="?",
        default=None,
        help="File path to upload (if not specified, upload from 'upload' folder)",
    )

    options = cli.parse_args()

    if options.command == "list":
        list_releases(options.owner, options.repo)
        return

    if options.command != "upload":
        return

    # An explicit file wins; otherwise fall back to the folder upload.
    if options.file:
        upload_file(options.owner, options.repo, options.id, options.file)
    else:
        upload_folder(options.owner, options.repo, options.id)
|
|
|
|
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|