complete refactor to use Gitea class api wrapper

This commit is contained in:
2025-07-13 23:43:32 -05:00
parent 50a01da1b1
commit 2717cca262
9 changed files with 325 additions and 259 deletions

View File

@@ -1,5 +1,7 @@
MIT License MIT License
Copyright (c) 2017 Madhurendra Sachan
Copyright (c) 2025 alexlebens Copyright (c) 2025 alexlebens
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

View File

@@ -1,3 +1,5 @@
# workflow-scripts # workflow-scripts
Scripts to run in Gitea Actions workflows. Scripts to run in Gitea Actions workflows.
## Gitea API wrapper derived from https://github.com/Langenfeld/py-gitea

0
__init__.py Normal file
View File

6
lib/exceptions.py Normal file
View File

@@ -0,0 +1,6 @@
class AlreadyExistsException(Exception):
pass
class NotFoundException(Exception):
pass

160
lib/gitea.py Normal file
View File

@@ -0,0 +1,160 @@
import logging
import json
import requests
from immutabledict import immutabledict
from .exceptions import NotFoundException, ConflictException, AlreadyExistsException
class Gitea:
"""Object to establish a session with Gitea"""
def __init__(
self,
instance_url: str,
token: str,
log_level: str,
):
"""Initializing Gitea-instance
Args:
instance_url (str): The Gitea instance URL.
token (str): The access token.
log_level (str): The log level.
"""
self.logger = logging.getLogger(__name__)
self.logger.setLevel(log_level)
self.headers = {
"Authorization": f"token {token}",
"Accept": "application/json",
"Content-Type": "application/json",
}
self.url = instance_url
self.requests = requests.Session()
def __get_url(self, endpoint):
url = self.url + "/api/v1" + endpoint
self.logger.debug(">> Gitea URL: %s" % url)
return url
def _requests_get(self, endpoint: str, params=immutabledict()) -> requests.Response:
request = self.requests.get(
self.__get_url(endpoint), headers=self.headers, params=params
)
if request.status_code not in [200, 201]:
message = f">> Received status code: {request.status_code} ({request.url})"
if request.status_code in [404]:
raise NotFoundException(message)
if request.status_code in [403]:
raise Exception(
f">> Unauthorized: {request.url} - Check your permissions and try again! ({message})"
)
if request.status_code in [409]:
raise ConflictException(message)
raise Exception(message)
return request
def requests_get(self, endpoint: str, params=immutabledict()) -> dict:
request = self._requests_get(endpoint, params)
return self.parse_result(request)
def requests_put(self, endpoint: str, data: dict = None):
if not data:
data = {}
request = self.requests.put(
self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
)
if request.status_code not in [200, 204]:
message = f">> Received status code: {request.status_code} ({request.url}) {request.text}"
self.logger.error(message)
raise Exception(message)
def requests_post(self, endpoint: str, data: dict):
request = self.requests.post(
self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
)
if request.status_code not in [200, 201, 202]:
if (
"already exists" in request.text
):
self.logger.warning(request.text)
raise AlreadyExistsException()
self.logger.error(
f">> Received status code: {request.status_code} ({request.url})"
)
self.logger.error(f">> With info: {data} ({self.headers})")
self.logger.error(f">> Answer: {request.text}")
raise Exception(
f">> Received status code: {request.status_code} ({request.url}), {request.text}"
)
return self.parse_result(request)
def get_version(self) -> str:
path = "/version"
result = self.requests_get(path)
self.logger.debug(f">> Gitea Version: {result["version"]}")
return result["version"]
def get_issues(
self,
owner: str,
repository: str,
):
path = f"/repos/{owner}/{repository}/issues"
params = {"state": "open"}
self.logger.debug(f">> Path used to get issues: {path}")
result = self.requests_get(path, params)
self.logger.debug(">> Gitea response: %s", result)
return result
def get_pull_requests(
self,
owner: str,
repository: str,
):
path = f"/repos/{owner}/{repository}/pulls"
params = {"state": "open"}
self.logger.debug(f">> Path used to get pull requests: {path}")
result = self.requests_get(path, params)
self.logger.debug(">> Gitea response: %s", result)
return result
def update_issue_labels(
self,
issue_number: int,
labels: list,
owner: str,
repository: str,
):
path = f"/repos/{owner}/{repository}/issues/{issue_number}/labels"
data = {"labels": labels}
self.logger.debug(f">> Path used to update issue label: {path}")
result = self.requests_post(path, data)
if "id" in result:
self.logger.info(">> Successfully added label")
self.logger.debug(">> Gitea response: %s", result)
else:
self.logger.error(result["message"])
# raise Exception("User not created... (gitea: %s)" % result["message"])
def update_pull_request_labels(
self,
pull_request_number: int,
labels: list,
owner: str,
repository: str,
):
path = f"/repos/{owner}/{repository}/pulls/{pull_request_number}/labels"
data = {"labels": labels}
self.logger.debug(f">> Path used to update pull request labels: {path}")
result = self.requests_post(path, data)
if "id" in result:
self.logger.info(">> Successfully added label")
self.logger.debug(">> Gitea response: %s", result)
else:
self.logger.error(result["message"])
# raise Exception("User not created... (gitea: %s)" % result["message"])

0
scripts/__init__.py Normal file
View File

View File

@@ -1,128 +0,0 @@
import os
import sys
import requests
from datetime import datetime, timedelta, timezone
def main():
"""
Main function to fetch issues from a Gitea repository and applies a tag to them.
"""
# --- Get configuration from environment variables ---
try:
instance_url = os.environ['INSTANCE_URL'].rstrip('/')
repository = os.environ['REPOSITORY']
token = os.environ['TOKEN']
stale_days = os.environ.get('STALE_DAYS')
stale_tag = os.environ.get('STALE_TAG')
exclude_tag = os.environ.get('EXCLUDE_TAG')
required_tag = os.environ.get('REQUIRED_TAG')
except KeyError as e:
print(f"Error: Missing required environment variable: {e}", file=sys.stderr)
sys.exit(1)
# --- Switch off checks if env is empty ---
if required_tag == None:
enable_required_tag = False
print(f">> No optional required tag set.")
else:
required_tag = int(required_tag)
enable_required_tag = True
if exclude_tag == None:
enable_exclude_tag= False
print(f">> No optional exclusive tag set.")
else:
exclude_tag = int(exclude_tag)
enable_exclude_tag = True
if stale_days == None:
enable_stale_tag= False
print(f">> No value for stale days.")
else:
stale_days = int(stale_days)
older_than_date = datetime.now(timezone.utc) - timedelta(days=stale_days)
enable_stale_tag = True
stale_tag = int(stale_tag)
# --- Check if any actions are enabled ---
if not any([enable_stale_tag]) == True:
print(f">> No actions enabled, exiting.")
sys.exit(1)
# --- Set up API headers and base URL ---
headers = {
'Authorization': f'token {token}',
'Accept': 'application/json',
'Content-Type': 'application/json'
}
base_api_url = f"{instance_url}/api/v1/repos/{repository}/issues"
params = {'state': 'open'}
# --- 3. Fetch open issues ---
print(">> Fetching issues ...")
try:
response = requests.get(base_api_url, headers=headers, params=params, timeout=30)
response.raise_for_status()
issues = response.json()
except requests.exceptions.RequestException as e:
print(f"Error fetching issues: {e}", file=sys.stderr)
sys.exit(1)
if not issues:
print(">> No open issues found, exiting.")
sys.exit(0)
print(f">> Processing {len(issues)} open issues ...")
# --- Process Pull Requests ---
for issue in issues:
issue_number = issue['number']
issue_created_at = datetime.fromisoformat(issue['created_at'].replace('Z', '+00:00'))
# -- Process labels ---
issue_current_labels = {label['name'] for label in issue.get('labels', [])}
update_labels = issue_current_labels
if enable_required_tag:
if not required_tag in issue_current_labels:
print(f">> Skipping issue #{issue_number} because it does not have the '{required_tag}' tag.")
continue
if enable_exclude_tag:
if exclude_tag in issue_current_labels:
print(f">> Skipping issue #{issue_number} because it has the '{exclude_tag}' tag.")
continue
if enable_stale_tag:
if issue_created_at < older_than_date:
if stale_tag in issue_current_labels:
print(f">> Skipping issue #{issue_number} because it already has the '{stale_tag}' tag.")
continue
print(f">> Will tag issue #{issue_number} with '{stale_tag}'")
update_labels.add(stale_tag)
# --- Make update with new labels ---
update_payload = {'labels': list(update_labels)}
update_url = f"{base_api_url}/{issue_number}/labels"
print(f">> Updating Issue #{issue_number} ...")
try:
update_response = requests.put(update_url, headers=headers, json=update_payload, timeout=30)
update_response.raise_for_status()
print(f">> Successfully updated labels for issue #{issue_number}")
except requests.exceptions.RequestException as e:
print(f"Error updating labels for issue #{issue_number}: {e}", file=sys.stderr)
print(">> Finished processing issues.")
if __name__ == "__main__":
main()

View File

@@ -1,130 +0,0 @@
import os
import sys
import requests
from datetime import datetime, timedelta, timezone
def main():
"""
Main function to fetch pull requests from a Gitea repository and process them.
"""
# --- Get configuration from environment variables ---
try:
instance_url = os.environ['INSTANCE_URL'].rstrip('/')
repository = os.environ['REPOSITORY']
token = os.environ['TOKEN']
stale_days = os.environ.get('STALE_DAYS')
stale_tag = os.environ.get('STALE_TAG')
exclude_tag = os.environ.get('EXCLUDE_TAG')
required_tag = os.environ.get('REQUIRED_TAG')
except KeyError as e:
print(f"Error: Missing required environment variable: {e}", file=sys.stderr)
sys.exit(1)
print(f">> Processing pull requests ...")
# --- Switch off checks if env is empty ---
if required_tag == None:
enable_required_tag = False
print(f">> No optional required tag set.")
else:
required_tag = int(required_tag)
enable_required_tag = True
if exclude_tag == None:
enable_exclude_tag= False
print(f">> No optional exclusive tag set.")
else:
exclude_tag = int(exclude_tag)
enable_exclude_tag = True
if stale_days == None:
enable_stale_tag= False
print(f">> No value for stale days.")
else:
stale_days = int(stale_days)
older_than_date = datetime.now(timezone.utc) - timedelta(days=stale_days)
enable_stale_tag = True
stale_tag = int(stale_tag)
# --- Check if any actions are enabled ---
if not any([enable_stale_tag]) == True:
print(f">> No actions enabled, exiting.")
sys.exit(1)
# --- Set up API headers and base URL ---
headers = {
'Authorization': f'token {token}',
'Accept': 'application/json',
'Content-Type': 'application/json'
}
base_api_url = f"{instance_url}/api/v1/repos/{repository}/pulls"
params = {'state': 'open'}
# --- Fetch pull requests ---
print(">> Fetching pull requests ...")
try:
response = requests.get(base_api_url, headers=headers, params=params, timeout=30)
response.raise_for_status()
pull_requests = response.json()
except requests.exceptions.RequestException as e:
print(f"Error fetching pull requests: {e}", file=sys.stderr)
sys.exit(1)
if not pull_requests:
print(">> No open pull requests found, exiting.")
sys.exit(0)
print(f">> Processing {len(pull_requests)} open pull requests ...")
# --- Process Pull Requests ---
for pr in pull_requests:
pr_number = pr['number']
pr_created_at = datetime.fromisoformat(pr['created_at'].replace('Z', '+00:00'))
# -- Process labels ---
pr_current_labels = {label['name'] for label in pr.get('labels', [])}
update_labels = pr_current_labels
if enable_required_tag:
if not required_tag in pr_current_labels:
print(f">> Skipping PR #{pr_number} because it does not have the '{required_tag}' tag.")
continue
if enable_exclude_tag:
if exclude_tag in pr_current_labels:
print(f">> Skipping PR #{pr_number} because it has the '{exclude_tag}' tag.")
continue
if enable_stale_tag:
if pr_created_at < older_than_date:
if stale_tag in pr_current_labels:
print(f">> Skipping issue #{pr_number} because it already has the '{stale_tag}' tag.")
continue
print(f">> Will tag PR #{pr_number} with '{stale_tag}'")
update_labels.add(stale_tag)
# --- Make update with new labels ---
update_payload = {'labels': list(update_labels)}
update_url = f"{base_api_url}/{pr_number}/labels"
print(f">> Updating PR #{pr_number} ...")
try:
update_response = requests.put(update_url, headers=headers, json=update_payload, timeout=30)
update_response.raise_for_status()
print(f">> Successfully updated labels for PR #{pr_number}")
except requests.exceptions.RequestException as e:
print(f"Error updating labels for PR #{pr_number}: {e}", file=sys.stderr)
print(">> Finished processing pull requests.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,154 @@
import os
import sys
import logging
from datetime import datetime, timedelta, timezone
from ..lib.gitea import Gitea
def main():
"""
Main function to fetch issues from a Gitea repository and process them.
"""
# --- Get configuration from environment variables ---
try:
instance_url = os.environ["INSTANCE_URL"].rstrip("/")
owner = os.environ["OWNER"]
repository = os.environ["REPOSITORY"]
token = os.environ["TOKEN"]
log_level = os.environ.get("LOG_LEVEL", "INFO").upper()
issue_stale_days = os.environ.get("ISSUE_STALE_DAYS")
issue_stale_tag = os.environ.get("ISSUE_STALE_TAG")
issue_exclude_tag = os.environ.get("ISSUE_EXCLUDE_TAG")
issue_required_tag = os.environ.get("ISSUE_REQUIRED_TAG")
pull_request_stale_days = os.environ.get("PULL_REQUEST_STALE_DAYS")
pull_request_stale_tag = os.environ.get("PULL_REQUEST_STALE_TAG")
pull_request_exclude_tag = os.environ.get("PULL_REQUEST_EXCLUDE_TAG")
pull_request_required_tag = os.environ.get("PULL_REQUEST_REQUIRED_TAG")
except KeyError as e:
print(f">> Error: Missing required environment variable: {e}", file=sys.stderr)
sys.exit(1)
# Enable logging
LOG_LEVELS = {
"DEBUG": logging.DEBUG,
"INFO": logging.INFO,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
}
logging.basicConfig(
level=LOG_LEVELS.get(log_level, logging.INFO),
format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
# --- Setup Gitea API ---
gitea = Gitea(instance_url, token, log_level)
# --- Fetch issues ---
issues = gitea.get_issues(owner=owner, repository=repository)
# --- Process issues ---
if len(issues) < 1:
logger.info(">> No open issues found")
else:
logger.info(f">> Processing {len(issues)} open issues ...")
for issue in issues:
# -- Process labels ---
issue_created_at = datetime.fromisoformat(issue["created_at"].replace("Z", "+00:00"))
issue_older_than_date = datetime.now(timezone.utc) - timedelta(days=issue_stale_days)
issue_current_labels = {label["name"] for label in issue.get("labels", [])}
issue_update_labels = list(issue_current_labels)
logger.debug(f">> Issue has the following labels: {issue_current_labels}")
# -- Check required --
if (not issue_required_tag == None) and (not int(issue_required_tag) in issue_current_labels):
logger.debug(
f">> Skipping issue #{issue["number"]} because it does not have the required ('{issue_required_tag}') tag"
)
continue
# -- Check exclude --
if (not issue_exclude_tag == None) and (int(issue_exclude_tag) in issue_current_labels):
logger.debug(
f">> Skipping issue #{issue["number"]} because it has the exclude ('{issue_exclude_tag}') tag"
)
continue
# -- Apply stale tag --
if (not issue_stale_tag == None) and (issue_created_at < issue_older_than_date):
logger.debug(
f">> Issue was created at {issue_created_at}, which is older then {issue_older_than_date}"
)
if issue_stale_tag in issue_current_labels:
logger.debug(
f">> Skipping issue #{issue["number"]} because it already has the stale ('{issue_stale_tag}') tag"
)
continue
logger.info(f">> Will tag issue #{issue["number"]} with '{issue_stale_tag}'")
issue_update_labels.add(issue_stale_tag)
gitea.update_issue_labels(issue_number=issue["number"], labels=issue_update_labels, owner=owner, repository=repository)
logger.info(f">> Finished issue #{issue["number"]}")
logger.info(">> Finished processing issues")
# --- Fetch pull requests ---
pull_requests = gitea.get_pull_requests(owner=owner, repository=repository)
# --- Process pull requests ---
if len(pull_requests) < 1:
logger.info(">> No open pull requests found")
else:
logger.info(f">> Processing {len(pull_requests)} open pull requests ...")
for pull_request in pull_requests:
# -- Process labels ---
pull_request_created_at = datetime.fromisoformat(pull_request["created_at"].replace("Z", "+00:00"))
pull_request_older_than_date = datetime.now(timezone.utc) - timedelta(days=pull_request_stale_days)
pull_request_current_labels = {label["name"] for label in pull_request.get("labels", [])}
pull_request_update_labels = list(pull_request_current_labels)
logger.debug(f">> Pull request has the following labels: {issue_current_labels}")
# -- Check required --
if (not pull_request_required_tag == None) and (not int(pull_request_required_tag) in pull_request_current_labels):
logger.debug(
f">> Skipping pull request #{pull_request["number"]} because it does not have the required ('{pull_request_required_tag}') tag"
)
continue
# -- Check exclude --
if (not pull_request_exclude_tag == None) and (int(pull_request_exclude_tag) in pull_request_current_labels):
logger.debug(
f">> Skipping pull request #{pull_request["number"]} because it has the exclude ('{pull_request_exclude_tag}') tag"
)
continue
# -- Apply stale tag --
if (not pull_request_stale_tag == None) and (pull_request_created_at < pull_request_older_than_date):
logger.debug(
f">> Pull request was created at {pull_request_created_at}, which is older then {pull_request_older_than_date}"
)
if pull_request_stale_tag in pull_request_current_labels:
logger.debug(
f">> Skipping pull request #{pull_request["number"]} because it already has the stale ('{pull_request_stale_tag}') tag"
)
continue
logger.info(f">> Will tag pull request #{pull_request["number"]} with '{pull_request_stale_tag}'")
pull_request_update_labels.add(pull_request_stale_tag)
gitea.update_pull_requests(pull_request_number=pull_request["number"], labels=pull_request_update_labels, owner=owner, repository=repository)
logger.info(f">> Finished pull request #{issue["number"]}")
logger.info(">> Finished processing pull requests")
if __name__ == "__main__":
main()