Bently 2025-01-03 09:48:51 +00:00 committed by GitHub
parent fa98827fd1
commit fe8393a82f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 90 additions and 0 deletions

View File

@ -840,3 +840,93 @@ class GithubCreateRepositoryBlock(Block):
yield "clone_url", clone_url
except Exception as e:
yield "error", str(e)
class GithubListStargazersBlock(Block):
class Input(BlockSchema):
credentials: GithubCredentialsInput = GithubCredentialsField("repo")
repo_url: str = SchemaField(
description="URL of the GitHub repository",
placeholder="https://github.com/owner/repo",
)
class Output(BlockSchema):
class StargazerItem(TypedDict):
username: str
url: str
stargazer: StargazerItem = SchemaField(
title="Stargazer",
description="Stargazers with their username and profile URL",
)
error: str = SchemaField(
description="Error message if listing stargazers failed"
)
def __init__(self):
super().__init__(
id="a4b9c2d1-e5f6-4g7h-8i9j-0k1l2m3n4o5p", # Generated unique UUID
description="This block lists all users who have starred a specified GitHub repository.",
categories={BlockCategory.DEVELOPER_TOOLS},
input_schema=GithubListStargazersBlock.Input,
output_schema=GithubListStargazersBlock.Output,
test_input={
"repo_url": "https://github.com/owner/repo",
"credentials": TEST_CREDENTIALS_INPUT,
},
test_credentials=TEST_CREDENTIALS,
test_output=[
(
"stargazer",
{
"username": "octocat",
"url": "https://github.com/octocat",
},
)
],
test_mock={
"list_stargazers": lambda *args, **kwargs: [
{
"username": "octocat",
"url": "https://github.com/octocat",
}
]
},
)
@staticmethod
def list_stargazers(
credentials: GithubCredentials, repo_url: str
) -> list[Output.StargazerItem]:
api = get_api(credentials)
# Add /stargazers to the repo URL to get stargazers endpoint
stargazers_url = f"{repo_url}/stargazers"
# Set accept header to get starred_at timestamp
headers = {"Accept": "application/vnd.github.star+json"}
response = api.get(stargazers_url, headers=headers)
data = response.json()
stargazers: list[GithubListStargazersBlock.Output.StargazerItem] = [
{
"username": stargazer["login"],
"url": stargazer["html_url"],
}
for stargazer in data
]
return stargazers
def run(
self,
input_data: Input,
*,
credentials: GithubCredentials,
**kwargs,
) -> BlockOutput:
try:
stargazers = self.list_stargazers(
credentials,
input_data.repo_url,
)
yield from (("stargazer", stargazer) for stargazer in stargazers)
except Exception as e:
yield "error", str(e)