AutoGPT/autogpt_platform/backend/test/test_data_updater.py

324 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Test Data Updater for Store Materialized Views
This script updates existing test data to trigger changes in the materialized views:
- mv_agent_run_counts: Updated by creating new AgentGraphExecution records
- mv_review_stats: Updated by creating new StoreListingReview records
Run this after test_data_creator.py to test that materialized views update correctly.
"""
import asyncio
import random
from datetime import datetime, timedelta
import prisma.enums
from faker import Faker
from prisma import Json, Prisma
faker = Faker()
async def main():
db = Prisma()
await db.connect()
print("Starting test data updates for materialized views...")
print("=" * 60)
# Get existing data
users = await db.user.find_many(take=50)
agent_graphs = await db.agentgraph.find_many(where={"isActive": True}, take=50)
store_listings = await db.storelisting.find_many(
where={"hasApprovedVersion": True}, include={"Versions": True}, take=30
)
agent_nodes = await db.agentnode.find_many(take=100)
if not all([users, agent_graphs, store_listings]):
print(
"ERROR: Not enough test data found. Please run test_data_creator.py first."
)
await db.disconnect()
return
print(
f"Found {len(users)} users, {len(agent_graphs)} graphs, {len(store_listings)} store listings"
)
print()
# 1. Add new AgentGraphExecutions to update mv_agent_run_counts
print("1. Adding new agent graph executions...")
print("-" * 40)
new_executions_count = 0
execution_data = []
for graph in random.sample(agent_graphs, min(20, len(agent_graphs))):
# Add 5-15 new executions per selected graph
num_new_executions = random.randint(5, 15)
for _ in range(num_new_executions):
user = random.choice(users)
execution_data.append(
{
"agentGraphId": graph.id,
"agentGraphVersion": graph.version,
"userId": user.id,
"executionStatus": random.choice(
[
prisma.enums.AgentExecutionStatus.COMPLETED,
prisma.enums.AgentExecutionStatus.FAILED,
prisma.enums.AgentExecutionStatus.RUNNING,
]
),
"startedAt": faker.date_time_between(
start_date="-7d", end_date="now"
),
"stats": Json(
{
"duration": random.randint(100, 5000),
"blocks_executed": random.randint(1, 10),
}
),
}
)
new_executions_count += 1
# Batch create executions
await db.agentgraphexecution.create_many(data=execution_data)
print(f"✓ Created {new_executions_count} new executions")
# Get the created executions for node executions
recent_executions = await db.agentgraphexecution.find_many(
take=new_executions_count, order={"createdAt": "desc"}
)
# 2. Add corresponding AgentNodeExecutions
print("\n2. Adding agent node executions...")
print("-" * 40)
node_execution_data = []
for execution in recent_executions:
# Get nodes for this graph
graph_nodes = [
n for n in agent_nodes if n.agentGraphId == execution.agentGraphId
]
if graph_nodes:
for node in random.sample(graph_nodes, min(3, len(graph_nodes))):
node_execution_data.append(
{
"agentGraphExecutionId": execution.id,
"agentNodeId": node.id,
"executionStatus": execution.executionStatus,
"addedTime": datetime.now(),
"startedTime": datetime.now()
- timedelta(minutes=random.randint(1, 10)),
"endedTime": (
datetime.now()
if execution.executionStatus
== prisma.enums.AgentExecutionStatus.COMPLETED
else None
),
}
)
await db.agentnodeexecution.create_many(data=node_execution_data)
print(f"✓ Created {len(node_execution_data)} node executions")
# 3. Add new StoreListingReviews to update mv_review_stats
print("\n3. Adding new store listing reviews...")
print("-" * 40)
new_reviews_count = 0
for listing in store_listings:
if not listing.Versions:
continue
# Get approved versions
approved_versions = [
v
for v in listing.Versions
if v.submissionStatus == prisma.enums.SubmissionStatus.APPROVED
]
if not approved_versions:
continue
# Pick a version to add reviews to
version = random.choice(approved_versions)
# Get existing reviews for this version to avoid duplicates
existing_reviews = await db.storelistingreview.find_many(
where={"storeListingVersionId": version.id}
)
existing_reviewer_ids = {r.reviewByUserId for r in existing_reviews}
# Find users who haven't reviewed this version yet
available_reviewers = [u for u in users if u.id not in existing_reviewer_ids]
if available_reviewers:
# Add 2-5 new reviews
num_new_reviews = min(random.randint(2, 5), len(available_reviewers))
selected_reviewers = random.sample(available_reviewers, num_new_reviews)
for reviewer in selected_reviewers:
# Bias towards positive reviews (4-5 stars)
score = random.choices([1, 2, 3, 4, 5], weights=[5, 10, 20, 40, 25])[0]
await db.storelistingreview.create(
data={
"storeListingVersionId": version.id,
"reviewByUserId": reviewer.id,
"score": score,
"comments": (
faker.text(max_nb_chars=200)
if random.random() < 0.7
else None
),
}
)
new_reviews_count += 1
print(f"✓ Created {new_reviews_count} new reviews")
# 4. Update some store listing versions (change categories, featured status)
print("\n4. Updating store listing versions...")
print("-" * 40)
updates_count = 0
for listing in random.sample(store_listings, min(10, len(store_listings))):
if listing.Versions:
version = random.choice(listing.Versions)
if version.submissionStatus == prisma.enums.SubmissionStatus.APPROVED:
# Toggle featured status or update categories
new_categories = random.sample(
[
"productivity",
"ai",
"automation",
"data",
"social",
"marketing",
"development",
"analytics",
],
k=random.randint(2, 4),
)
await db.storelistingversion.update(
where={"id": version.id},
data={
"isFeatured": (
not version.isFeatured
if random.random() < 0.3
else version.isFeatured
),
"categories": new_categories,
"updatedAt": datetime.now(),
},
)
updates_count += 1
print(f"✓ Updated {updates_count} store listing versions")
# 5. Create some new credit transactions
print("\n5. Adding credit transactions...")
print("-" * 40)
transaction_count = 0
for user in random.sample(users, min(30, len(users))):
# Add 1-3 transactions per user
for _ in range(random.randint(1, 3)):
transaction_type = random.choice(
[
prisma.enums.CreditTransactionType.USAGE,
prisma.enums.CreditTransactionType.TOP_UP,
prisma.enums.CreditTransactionType.GRANT,
]
)
amount = (
random.randint(10, 500)
if transaction_type == prisma.enums.CreditTransactionType.TOP_UP
else -random.randint(1, 50)
)
await db.credittransaction.create(
data={
"userId": user.id,
"amount": amount,
"type": transaction_type,
"metadata": Json(
{
"source": "test_updater",
"timestamp": datetime.now().isoformat(),
}
),
}
)
transaction_count += 1
print(f"✓ Created {transaction_count} credit transactions")
# 6. Refresh materialized views
print("\n6. Refreshing materialized views...")
print("-" * 40)
try:
await db.execute_raw("SELECT refresh_store_materialized_views();")
print("✓ Materialized views refreshed successfully")
except Exception as e:
print(f"⚠ Warning: Could not refresh materialized views: {e}")
print(
" You may need to refresh them manually with: SELECT refresh_store_materialized_views();"
)
# 7. Verify the updates
print("\n7. Verifying updates...")
print("-" * 40)
# Check agent run counts
run_counts = await db.query_raw(
"SELECT COUNT(*) as view_count FROM mv_agent_run_counts"
)
print(f"✓ mv_agent_run_counts has {run_counts[0]['view_count']} entries")
# Check review stats
review_stats = await db.query_raw(
"SELECT COUNT(*) as view_count FROM mv_review_stats"
)
print(f"✓ mv_review_stats has {review_stats[0]['view_count']} entries")
# Sample some data from the views
print("\nSample data from materialized views:")
sample_runs = await db.query_raw(
"SELECT * FROM mv_agent_run_counts ORDER BY run_count DESC LIMIT 5"
)
print("\nTop 5 agents by run count:")
for row in sample_runs:
print(f" - Agent {row['agentGraphId'][:8]}...: {row['run_count']} runs")
sample_reviews = await db.query_raw(
"SELECT * FROM mv_review_stats ORDER BY avg_rating DESC NULLS LAST LIMIT 5"
)
print("\nTop 5 store listings by rating:")
for row in sample_reviews:
avg_rating = row["avg_rating"] if row["avg_rating"] is not None else 0.0
print(
f" - Listing {row['storeListingId'][:8]}...: {avg_rating:.2f} ⭐ ({row['review_count']} reviews)"
)
await db.disconnect()
print("\n" + "=" * 60)
print("Test data update completed successfully!")
print("The materialized views should now reflect the updated data.")
print(
"\nTo manually refresh views, run: SELECT refresh_store_materialized_views();"
)
if __name__ == "__main__":
asyncio.run(main())