fix(backend): Fix conn_retry decorator possible incorrect behaviour on failed async function (#8836)
This fix was triggered by an error observed on a DB connection failure on Supabase:

```
2024-11-28 07:45:24,724 INFO [DatabaseManager] Starting...
2024-11-28 07:45:24,726 INFO [PID-18|DatabaseManager|Prisma-7f32369c-6432-4edb-8e71-ef820332b9e4] Acquiring connection started...
2024-11-28 07:45:24,726 INFO [PID-18|DatabaseManager|Prisma-7f32369c-6432-4edb-8e71-ef820332b9e4] Acquiring connection completed successfully.
{"is_panic":false,"message":"Can't reach database server at `...pooler.supabase.com:5432`\n\nPlease make sure your database server is running at `....pooler.supabase.com:5432`.","meta":{"database_host":"...pooler.supabase.com","database_port":5432},"error_code":"P1001"}
2024-11-28 07:45:35,153 INFO [PID-18|DatabaseManager|Prisma-7f32369c-6432-4edb-8e71-ef820332b9e4] Acquiring connection failed: Could not connect to the query engine. Retrying now...
2024-11-28 07:45:36,155 INFO [PID-18|DatabaseManager|Redis-e14a33de-2d81-4536-b48b-a8aa4b1f4766] Acquiring connection started...
2024-11-28 07:45:36,181 INFO [PID-18|DatabaseManager|Redis-e14a33de-2d81-4536-b48b-a8aa4b1f4766] Acquiring connection completed successfully.
2024-11-28 07:45:36,183 INFO [PID-18|DatabaseManager|Pyro-2722cd29-4dbd-4cf9-882f-73842658599d] Starting Pyro Service started...
2024-11-28 07:45:36,189 INFO [DatabaseManager] Connected to Pyro; URI = PYRO:DatabaseManager@0.0.0.0:8005
2024-11-28 07:46:28,241 ERROR Error in get_user_integrations: All connection attempts failed
```

Even though the retry log line

```
2024-11-28 07:45:35,153 INFO [PID-18|DatabaseManager|Prisma-7f32369c-6432-4edb-8e71-ef820332b9e4] Acquiring connection failed: Could not connect to the query engine. Retrying now...
```

is present, the Redis connection still proceeds without waiting for the Prisma retry to complete. This was likely caused by Tenacity not fully awaiting the DB connection acquisition command (see the minimal sketch after the checklist below).

### Changes 🏗️

* Add special handling for async functions so the decorator explicitly awaits the function's execution result on each retry.
* Explicitly raise an exception in `db.connect()` if the DB is still not connected after the `prisma.connect()` call.

### Checklist 📋

#### For code changes:
- [ ] I have clearly listed my changes in the PR description
- [ ] I have made a test plan
- [ ] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [ ] ...

<details>
  <summary>Example test plan</summary>

  - [ ] Create from scratch and execute an agent with at least 3 blocks
  - [ ] Import an agent from file upload, and confirm it executes correctly
  - [ ] Upload agent to marketplace
  - [ ] Import an agent from marketplace and confirm it executes correctly
  - [ ] Edit an agent from monitor, and confirm it executes correctly
</details>

#### For configuration changes:
- [ ] `.env.example` is updated or already compatible with my changes
- [ ] `docker-compose.yml` is updated or already compatible with my changes
- [ ] I have included a list of my configuration changes in the PR description (under **Changes**)

<details>
  <summary>Examples of configuration changes</summary>

  - Changing ports
  - Adding new services that need to communicate with each other
  - Secrets or environment variable changes
  - New or infrastructure changes such as databases
</details>
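A minimal, self-contained sketch of the suspected failure mode (illustrative names only, not project code): calling a coroutine function without awaiting it returns a coroutine object immediately, so a synchronous wrapper's `try`/`except` and "completed successfully" log run before the connection attempt has actually happened; only a wrapper that awaits the call sees the failure and can retry it.

```python
# Minimal sketch of the suspected failure mode (illustrative names, not project code).
# Calling a coroutine function without awaiting it returns a coroutine object right
# away, so a sync-style wrapper logs success before anything has actually run.
import asyncio


async def acquire_connection():
    # Stand-in for the real connection attempt; always fails in this sketch.
    raise ConnectionError("Can't reach database server")


def sync_style_wrapper():
    try:
        result = acquire_connection()  # coroutine object; no exception raised here
        print("Acquiring connection completed successfully.")  # premature success log
        return result
    except Exception as e:  # never triggered, so no retry would ever happen
        print(f"Acquiring connection failed: {e}")
        raise


async def async_aware_wrapper():
    try:
        return await acquire_connection()  # actually runs the attempt
    except Exception as e:  # the failure surfaces here, where it can be retried
        print(f"Acquiring connection failed: {e}")
        raise


async def main():
    coro = sync_style_wrapper()  # prints the premature success message
    coro.close()                 # discard the never-awaited coroutine
    try:
        await async_aware_wrapper()  # prints the failure message, then re-raises
    except ConnectionError:
        pass


if __name__ == "__main__":
    asyncio.run(main())
```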
parent 29f177e70d
commit 63af42dafb
```diff
@@ -23,15 +23,23 @@ logger = logging.getLogger(__name__)
 async def connect():
     if prisma.is_connected():
         return
+
     await prisma.connect()
+
+    if not prisma.is_connected():
+        raise ConnectionError("Failed to connect to Prisma.")
 
 
 @conn_retry("Prisma", "Releasing connection")
 async def disconnect():
     if not prisma.is_connected():
         return
+
     await prisma.disconnect()
+
+    if prisma.is_connected():
+        raise ConnectionError("Failed to disconnect from Prisma.")
 
 
 @asynccontextmanager
 async def transaction():
```
```diff
@@ -1,3 +1,4 @@
+import asyncio
 import logging
 import os
 import threading
@@ -20,7 +21,14 @@ def _log_prefix(resource_name: str, conn_id: str):
     return f"[PID-{os.getpid()}|THREAD-{threading.get_native_id()}|{get_service_name()}|{resource_name}-{conn_id}]"
 
 
-def conn_retry(resource_name: str, action_name: str, max_retry: int = 5):
+def conn_retry(
+    resource_name: str,
+    action_name: str,
+    max_retry: int = 5,
+    multiplier: int = 1,
+    min_wait: float = 1,
+    max_wait: float = 30,
+):
     conn_id = str(uuid4())
 
     def on_retry(retry_state):
@@ -29,27 +37,39 @@ def conn_retry(resource_name: str, action_name: str, max_retry: int = 5):
         logger.error(f"{prefix} {action_name} failed: {exception}. Retrying now...")
 
     def decorator(func):
+        is_coroutine = asyncio.iscoroutinefunction(func)
+        retry_decorator = retry(
+            stop=stop_after_attempt(max_retry + 1),
+            wait=wait_exponential(multiplier=multiplier, min=min_wait, max=max_wait),
+            before_sleep=on_retry,
+            reraise=True,
+        )
+        wrapped_func = retry_decorator(func)
+
         @wraps(func)
-        def wrapper(*args, **kwargs):
+        def sync_wrapper(*args, **kwargs):
             prefix = _log_prefix(resource_name, conn_id)
             logger.info(f"{prefix} {action_name} started...")
-
-            # Define the retrying strategy
-            retrying_func = retry(
-                stop=stop_after_attempt(max_retry + 1),
-                wait=wait_exponential(multiplier=1, min=1, max=30),
-                before_sleep=on_retry,
-                reraise=True,
-            )(func)
-
             try:
-                result = retrying_func(*args, **kwargs)
+                result = wrapped_func(*args, **kwargs)
                 logger.info(f"{prefix} {action_name} completed successfully.")
                 return result
             except Exception as e:
                 logger.error(f"{prefix} {action_name} failed after retries: {e}")
                 raise
 
-        return wrapper
+        @wraps(func)
+        async def async_wrapper(*args, **kwargs):
+            prefix = _log_prefix(resource_name, conn_id)
+            logger.info(f"{prefix} {action_name} started...")
+            try:
+                result = await wrapped_func(*args, **kwargs)
+                logger.info(f"{prefix} {action_name} completed successfully.")
+                return result
+            except Exception as e:
+                logger.error(f"{prefix} {action_name} failed after retries: {e}")
+                raise
+
+        return async_wrapper if is_coroutine else sync_wrapper
 
     return decorator
```
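A hypothetical usage sketch of the extended decorator (the `backend.util.retry` module path is taken from the new test's import; the resource names and function bodies below are made up): a coroutine function now goes through `async_wrapper`, so each attempt is awaited before success is logged, while plain functions keep using `sync_wrapper`.

```python
# Hypothetical usage sketch (illustrative names; assumes the backend package is
# importable, as in the new test which imports backend.util.retry).
import asyncio

from backend.util.retry import conn_retry


@conn_retry("Redis", "Acquiring connection", max_retry=3, min_wait=0.5, max_wait=5)
async def connect_redis() -> str:
    # Placeholder for a real connection attempt; an exception raised here is
    # retried with exponential backoff because async_wrapper awaits each attempt.
    return "connected"


@conn_retry("Config", "Loading file", max_retry=2)
def load_config() -> dict:
    # Plain (sync) functions go through sync_wrapper, preserving the old behaviour.
    return {}


async def main() -> None:
    print(await connect_redis())  # awaited per attempt, so failures surface here
    print(load_config())


if __name__ == "__main__":
    asyncio.run(main())
```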
```diff
@@ -0,0 +1,49 @@
+import asyncio
+
+import pytest
+
+from backend.util.retry import conn_retry
+
+
+def test_conn_retry_sync_function():
+    retry_count = 0
+
+    @conn_retry("Test", "Test function", max_retry=2, max_wait=0.1, min_wait=0.1)
+    def test_function():
+        nonlocal retry_count
+        retry_count -= 1
+        if retry_count > 0:
+            raise ValueError("Test error")
+        return "Success"
+
+    retry_count = 2
+    res = test_function()
+    assert res == "Success"
+
+    retry_count = 100
+    with pytest.raises(ValueError) as e:
+        test_function()
+    assert str(e.value) == "Test error"
+
+
+@pytest.mark.asyncio
+async def test_conn_retry_async_function():
+    retry_count = 0
+
+    @conn_retry("Test", "Test function", max_retry=2, max_wait=0.1, min_wait=0.1)
+    async def test_function():
+        nonlocal retry_count
+        await asyncio.sleep(1)
+        retry_count -= 1
+        if retry_count > 0:
+            raise ValueError("Test error")
+        return "Success"
+
+    retry_count = 2
+    res = await test_function()
+    assert res == "Success"
+
+    retry_count = 100
+    with pytest.raises(ValueError) as e:
+        await test_function()
+    assert str(e.value) == "Test error"
```