add parse method

This commit is contained in:
2025-07-15 22:09:58 -05:00
parent 996d171e11
commit 3f82a92b02
2 changed files with 64 additions and 19 deletions

View File

@@ -5,6 +5,7 @@ from immutabledict import immutabledict
from .exceptions import NotFoundException, ConflictException, AlreadyExistsException from .exceptions import NotFoundException, ConflictException, AlreadyExistsException
class Gitea: class Gitea:
"""Object to establish a session with Gitea""" """Object to establish a session with Gitea"""
@@ -37,6 +38,13 @@ class Gitea:
self.logger.debug(">> Gitea URL: %s" % url) self.logger.debug(">> Gitea URL: %s" % url)
return url return url
@staticmethod
def parse_result(result) -> dict:
"""Parses the result-JSON to a dict."""
if result.text and len(result.text) > 3:
return json.loads(result.text)
return {}
def _requests_get(self, endpoint: str, params=immutabledict()) -> requests.Response: def _requests_get(self, endpoint: str, params=immutabledict()) -> requests.Response:
request = self.requests.get( request = self.requests.get(
self.__get_url(endpoint), headers=self.headers, params=params self.__get_url(endpoint), headers=self.headers, params=params
@@ -74,9 +82,7 @@ class Gitea:
self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
) )
if request.status_code not in [200, 201, 202]: if request.status_code not in [200, 201, 202]:
if ( if "already exists" in request.text:
"already exists" in request.text
):
self.logger.warning(request.text) self.logger.warning(request.text)
raise AlreadyExistsException() raise AlreadyExistsException()
self.logger.error( self.logger.error(

View File

@@ -6,6 +6,7 @@ from datetime import datetime, timedelta, timezone
from lib.gitea import Gitea from lib.gitea import Gitea
def main(): def main():
""" """
Main function to fetch issues from a Gitea repository and process them. Main function to fetch issues from a Gitea repository and process them.
@@ -56,28 +57,38 @@ def main():
for issue in issues: for issue in issues:
# -- Process labels --- # -- Process labels ---
issue_created_at = datetime.fromisoformat(issue["created_at"].replace("Z", "+00:00")) issue_created_at = datetime.fromisoformat(
issue_older_than_date = datetime.now(timezone.utc) - timedelta(days=issue_stale_days) issue["created_at"].replace("Z", "+00:00")
)
issue_older_than_date = datetime.now(timezone.utc) - timedelta(
days=issue_stale_days
)
issue_current_labels = {label["name"] for label in issue.get("labels", [])} issue_current_labels = {label["name"] for label in issue.get("labels", [])}
issue_update_labels = list(issue_current_labels) issue_update_labels = list(issue_current_labels)
logger.debug(f">> Issue has the following labels: {issue_current_labels}") logger.debug(f">> Issue has the following labels: {issue_current_labels}")
# -- Check required -- # -- Check required --
if (not issue_required_tag == None) and (not int(issue_required_tag) in issue_current_labels): if (not issue_required_tag == None) and (
not int(issue_required_tag) in issue_current_labels
):
logger.debug( logger.debug(
f">> Skipping issue #{issue["number"]} because it does not have the required ('{issue_required_tag}') tag" f">> Skipping issue #{issue["number"]} because it does not have the required ('{issue_required_tag}') tag"
) )
continue continue
# -- Check exclude -- # -- Check exclude --
if (not issue_exclude_tag == None) and (int(issue_exclude_tag) in issue_current_labels): if (not issue_exclude_tag == None) and (
int(issue_exclude_tag) in issue_current_labels
):
logger.debug( logger.debug(
f">> Skipping issue #{issue["number"]} because it has the exclude ('{issue_exclude_tag}') tag" f">> Skipping issue #{issue["number"]} because it has the exclude ('{issue_exclude_tag}') tag"
) )
continue continue
# -- Apply stale tag -- # -- Apply stale tag --
if (not issue_stale_tag == None) and (issue_created_at < issue_older_than_date): if (not issue_stale_tag == None) and (
issue_created_at < issue_older_than_date
):
logger.debug( logger.debug(
f">> Issue was created at {issue_created_at}, which is older then {issue_older_than_date}" f">> Issue was created at {issue_created_at}, which is older then {issue_older_than_date}"
) )
@@ -88,10 +99,17 @@ def main():
) )
continue continue
logger.info(f">> Will tag issue #{issue["number"]} with '{issue_stale_tag}'") logger.info(
f">> Will tag issue #{issue["number"]} with '{issue_stale_tag}'"
)
issue_update_labels.add(issue_stale_tag) issue_update_labels.add(issue_stale_tag)
gitea.update_issue_labels(issue_number=issue["number"], labels=issue_update_labels, owner=owner, repository=repository) gitea.update_issue_labels(
issue_number=issue["number"],
labels=issue_update_labels,
owner=owner,
repository=repository,
)
logger.info(f">> Finished issue #{issue["number"]}") logger.info(f">> Finished issue #{issue["number"]}")
@@ -108,28 +126,42 @@ def main():
for pull_request in pull_requests: for pull_request in pull_requests:
# -- Process labels --- # -- Process labels ---
pull_request_created_at = datetime.fromisoformat(pull_request["created_at"].replace("Z", "+00:00")) pull_request_created_at = datetime.fromisoformat(
pull_request_older_than_date = datetime.now(timezone.utc) - timedelta(days=pull_request_stale_days) pull_request["created_at"].replace("Z", "+00:00")
pull_request_current_labels = {label["name"] for label in pull_request.get("labels", [])} )
pull_request_older_than_date = datetime.now(timezone.utc) - timedelta(
days=pull_request_stale_days
)
pull_request_current_labels = {
label["name"] for label in pull_request.get("labels", [])
}
pull_request_update_labels = list(pull_request_current_labels) pull_request_update_labels = list(pull_request_current_labels)
logger.debug(f">> Pull request has the following labels: {issue_current_labels}") logger.debug(
f">> Pull request has the following labels: {issue_current_labels}"
)
# -- Check required -- # -- Check required --
if (not pull_request_required_tag == None) and (not int(pull_request_required_tag) in pull_request_current_labels): if (not pull_request_required_tag == None) and (
not int(pull_request_required_tag) in pull_request_current_labels
):
logger.debug( logger.debug(
f">> Skipping pull request #{pull_request["number"]} because it does not have the required ('{pull_request_required_tag}') tag" f">> Skipping pull request #{pull_request["number"]} because it does not have the required ('{pull_request_required_tag}') tag"
) )
continue continue
# -- Check exclude -- # -- Check exclude --
if (not pull_request_exclude_tag == None) and (int(pull_request_exclude_tag) in pull_request_current_labels): if (not pull_request_exclude_tag == None) and (
int(pull_request_exclude_tag) in pull_request_current_labels
):
logger.debug( logger.debug(
f">> Skipping pull request #{pull_request["number"]} because it has the exclude ('{pull_request_exclude_tag}') tag" f">> Skipping pull request #{pull_request["number"]} because it has the exclude ('{pull_request_exclude_tag}') tag"
) )
continue continue
# -- Apply stale tag -- # -- Apply stale tag --
if (not pull_request_stale_tag == None) and (pull_request_created_at < pull_request_older_than_date): if (not pull_request_stale_tag == None) and (
pull_request_created_at < pull_request_older_than_date
):
logger.debug( logger.debug(
f">> Pull request was created at {pull_request_created_at}, which is older then {pull_request_older_than_date}" f">> Pull request was created at {pull_request_created_at}, which is older then {pull_request_older_than_date}"
) )
@@ -140,10 +172,17 @@ def main():
) )
continue continue
logger.info(f">> Will tag pull request #{pull_request["number"]} with '{pull_request_stale_tag}'") logger.info(
f">> Will tag pull request #{pull_request["number"]} with '{pull_request_stale_tag}'"
)
pull_request_update_labels.add(pull_request_stale_tag) pull_request_update_labels.add(pull_request_stale_tag)
gitea.update_pull_requests(pull_request_number=pull_request["number"], labels=pull_request_update_labels, owner=owner, repository=repository) gitea.update_pull_requests(
pull_request_number=pull_request["number"],
labels=pull_request_update_labels,
owner=owner,
repository=repository,
)
logger.info(f">> Finished pull request #{issue["number"]}") logger.info(f">> Finished pull request #{issue["number"]}")