feat(blocks): Add WebSearch & WebScraper block for searching the web with Jina Reader (#7445)

Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
pull/7457/head
Toran Bruce Richards 2024-07-16 09:32:23 +01:00 committed by GitHub
parent cb4b96a70c
commit e874318832
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 151 additions and 50 deletions

View File

@ -166,4 +166,4 @@ To add a new agent block, you need to create a new class that inherits from `Blo
* `run` method: the main logic of the block.
* `test_input` & `test_output`: the sample input and output data for the block, which will be used to auto-test the block.
* You can mock the functions declared in the block using the `test_mock` field for your unit tests.
* Once you finish creating the block, you can test it by running `pytest test/block/test_block.py`.
* Once you finish creating the block, you can test it by running `pytest -s test/block/test_block.py`.

View File

@ -26,6 +26,11 @@ class RedditPost(BaseModel):
body: str
class RedditComment(BaseModel):
post_id: str
comment: str
def get_praw(creds: RedditCredentials) -> praw.Reddit:
client = praw.Reddit(
client_id=creds.client_id.get(),
@ -127,10 +132,11 @@ class RedditGetPostsBlock(Block):
class RedditPostCommentBlock(Block):
class Input(BlockSchema):
creds: RedditCredentials = Field(description="Reddit credentials")
data: Any = Field(description="Reddit post")
# post_id: str = Field(description="Reddit post ID")
# comment: str = Field(description="Comment text")
creds: RedditCredentials = Field(
description="Reddit credentials",
default=RedditCredentials()
)
data: RedditComment = Field(description="Reddit comment")
class Output(BlockSchema):
comment_id: str
@ -140,10 +146,17 @@ class RedditPostCommentBlock(Block):
id="4a92261b-701e-4ffb-8970-675fd28e261f",
input_schema=RedditPostCommentBlock.Input,
output_schema=RedditPostCommentBlock.Output,
test_input={"data": {"post_id": "id", "comment": "comment"}},
test_output=[("comment_id", "dummy_comment_id")],
test_mock={"reply_post": lambda creds, comment: "dummy_comment_id"}
)
@staticmethod
def reply_post(creds: RedditCredentials, comment: RedditComment) -> str:
client = get_praw(creds)
submission = client.submission(id=comment.post_id)
comment = submission.reply(comment.comment)
return comment.id
def run(self, input_data: Input) -> BlockOutput:
client = get_praw(input_data.creds)
submission = client.submission(id=input_data.data["post_id"])
comment = submission.reply(input_data.data["comment"])
yield "comment_id", comment.id
yield "comment_id", self.reply_post(input_data.creds, input_data.data)

View File

@ -0,0 +1,124 @@
from typing import Any
from urllib.parse import quote
import requests
from autogpt_server.data.block import Block, BlockSchema, BlockOutput
class GetRequest:
    """Mixin providing a simple HTTP GET helper shared by the web blocks below."""

    @classmethod
    def get_request(cls, url: str, json: bool = False, timeout: float = 30.0) -> Any:
        """Fetch *url* and return the response body.

        Args:
            url: Fully-formed URL to request.
            json: When True, return the parsed JSON body; otherwise raw text.
            timeout: Seconds to wait before giving up on the server.

        Raises:
            requests.exceptions.HTTPError: for non-2xx responses.
            requests.RequestException: for connection failures/timeouts.
        """
        # Without a timeout, a stalled server would hang the block forever.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        return response.json() if json else response.text
class WikipediaSummaryBlock(Block, GetRequest):
    """Fetch a Wikipedia article's summary via the REST v1 summary endpoint."""

    class Input(BlockSchema):
        topic: str  # Article title to summarize

    class Output(BlockSchema):
        summary: str  # The article's extract text
        error: str  # Error message when the lookup fails

    def __init__(self):
        super().__init__(
            id="h5e7f8g9-1b2c-3d4e-5f6g-7h8i9j0k1l2m",
            input_schema=WikipediaSummaryBlock.Input,
            output_schema=WikipediaSummaryBlock.Output,
            test_input={"topic": "Artificial Intelligence"},
            test_output=("summary", "summary content"),
            test_mock={"get_request": lambda url, json: {"extract": "summary content"}},
        )

    def run(self, input_data: Input) -> BlockOutput:
        try:
            # URL-encode the topic so titles containing spaces or special
            # characters still form a valid request path.
            topic = quote(input_data.topic)
            url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{topic}"
            response = self.get_request(url, json=True)
            yield "summary", response["extract"]
        except requests.exceptions.HTTPError as http_err:
            yield "error", f"HTTP error occurred: {http_err}"
        except requests.RequestException as e:
            yield "error", f"Request to Wikipedia failed: {e}"
        except KeyError as e:
            # The endpoint returned JSON without an "extract" field.
            yield "error", f"Error parsing Wikipedia response: {e}"
class WebSearchBlock(Block, GetRequest):
    """Search the web through the Jina Search endpoint and yield the results."""

    class Input(BlockSchema):
        query: str  # The search query

    class Output(BlockSchema):
        results: str  # The search results including content from top 5 URLs
        error: str  # Error message if the search fails

    def __init__(self):
        super().__init__(
            id="b2c3d4e5-6f7g-8h9i-0j1k-l2m3n4o5p6q7",
            input_schema=WebSearchBlock.Input,
            output_schema=WebSearchBlock.Output,
            test_input={"query": "Artificial Intelligence"},
            test_output=("results", "search content"),
            test_mock={"get_request": lambda url, json: "search content"},
        )

    def run(self, input_data: Input) -> BlockOutput:
        try:
            # Build the Jina Search URL from the percent-encoded query.
            search_url = f"https://s.jina.ai/{quote(input_data.query)}"
            # Fetch the results as plain text and pass them straight through.
            yield "results", self.get_request(search_url, json=False)
        except requests.exceptions.HTTPError as http_err:
            yield "error", f"HTTP error occurred: {http_err}"
        except requests.RequestException as e:
            yield "error", f"Request to Jina Search failed: {e}"
class WebScraperBlock(Block, GetRequest):
    """Scrape a web page's content through the Jina-ai Reader proxy."""

    class Input(BlockSchema):
        url: str  # The URL to scrape

    class Output(BlockSchema):
        content: str  # The scraped content from the URL
        error: str  # Error message when scraping fails

    def __init__(self):
        super().__init__(
            id="a1b2c3d4-5e6f-7g8h-9i0j-k1l2m3n4o5p6",  # Unique ID for the block
            input_schema=WebScraperBlock.Input,
            output_schema=WebScraperBlock.Output,
            test_input={"url": "https://en.wikipedia.org/wiki/Artificial_intelligence"},
            test_output=("content", "scraped content"),
            test_mock={"get_request": lambda url, json: "scraped content"},
        )

    def run(self, input_data: Input) -> BlockOutput:
        try:
            # Jina-ai Reader takes the target URL appended to its own host.
            reader_url = f"https://r.jina.ai/{input_data.url}"
            # Fetch the readable page content as plain text.
            yield "content", self.get_request(reader_url, json=False)
        except requests.exceptions.HTTPError as http_err:
            yield "error", f"HTTP error occurred: {http_err}"
        except requests.RequestException as e:
            yield "error", f"Request to Jina-ai Reader failed: {e}"

View File

@ -1,36 +0,0 @@
import requests
from autogpt_server.data.block import Block, BlockSchema, BlockOutput
class WikipediaSummaryBlock(Block):
    """Fetch a Wikipedia article summary for a given topic.

    Yields the article's ``extract`` field; any HTTP, network, or parsing
    failure is re-raised as ``ValueError``.
    """

    class Input(BlockSchema):
        topic: str  # Article title to look up

    class Output(BlockSchema):
        summary: str  # The article's extract text

    def __init__(self):
        super().__init__(
            id="h5e7f8g9-1b2c-3d4e-5f6g-7h8i9j0k1l2m",
            input_schema=WikipediaSummaryBlock.Input,
            output_schema=WikipediaSummaryBlock.Output,
            test_input={"topic": "Artificial Intelligence"},
            test_output=("summary", str),
        )

    def run(self, input_data: Input) -> BlockOutput:
        try:
            # Query the Wikipedia REST v1 summary endpoint for the topic.
            response = requests.get(f"https://en.wikipedia.org/api/rest_v1/page/summary/{input_data.topic}")
            response.raise_for_status()
            summary_data = response.json()
            yield "summary", summary_data['extract']
        except requests.exceptions.HTTPError as http_err:
            raise ValueError(f"HTTP error occurred: {http_err}")
        except requests.RequestException as e:
            raise ValueError(f"Request to Wikipedia API failed: {e}")
        except KeyError as e:
            # The response JSON lacked the expected "extract" field.
            raise ValueError(f"Error processing Wikipedia data: {e}")

View File

@ -97,13 +97,13 @@ Make sure to only comment on a relevant post.
source_id=text_matcher_node.id,
sink_id=reddit_comment_node.id,
source_name="positive_#_post_id",
sink_name="post_id",
sink_name="data_#_post_id",
),
Link(
source_id=text_matcher_node.id,
sink_id=reddit_comment_node.id,
source_name="positive_#_marketing_text",
sink_name="comment",
sink_name="data_#_comment",
),
]

View File

@ -1,7 +1,7 @@
import time
from autogpt_server.data.block import Block
from autogpt_server.data import block, db
from autogpt_server.data import db
from autogpt_server.data.block import Block, initialize_blocks
from autogpt_server.data.execution import ExecutionStatus
from autogpt_server.executor import ExecutionManager, ExecutionScheduler
from autogpt_server.server import AgentServer
@ -24,7 +24,7 @@ class SpinTestServer:
self.scheduler.__enter__()
await db.connect()
await block.initialize_blocks()
await initialize_blocks()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):