fix(backend): Added locking status check before releasing to avoid releasing timing out lock (#9135)
Exception: ``` nid:ce829f66-14b0-4bd3-b748-791e46666cb6|-] Failed node execution ce829f66-14b0-4bd3-b748-791e46666cb6: Cannot release an unlocked lock {}\u001b[0m", Traceback (most recent call last):\n File \"/app/autogpt_platform/backend/backend/integrations/creds_manager.py\", line 145, in _locked\n yield\n File \"/app/autogpt_platform/backend/backend/integrations/creds_manager.py\", line 115, in acquire\n lock = self._acquire_lock(user_id, credentials_id)", ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^", File \"/app/autogpt_platform/backend/backend/integrations/creds_manager.py\", line 139, in _acquire_lock", return self._locks.acquire(key)", ^^^^^^^^^^^^^^^^^^^^^^^^", File \"/app/autogpt_platform/autogpt_libs/autogpt_libs/utils/synchronize.py\", line 44, in acquire", lock.acquire()", File \"/usr/local/lib/python3.11/site-packages/redis/lock.py\", line 218, in acquire", mod_time.sleep(sleep)", File \"/app/autogpt_platform/backend/backend/executor/manager.py\", line 471, in <lambda>", signal.SIGTERM, lambda _, __: cls.on_node_executor_sigterm()", ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^", File \"/app/autogpt_platform/backend/backend/executor/manager.py\", line 498, in on_node_executor_sigterm", sys.exit(0)", SystemExit: 0", During handling of the above exception, another exception occurred:", Traceback (most recent call last):\n File \"/app/autogpt_platform/backend/backend/executor/manager.py\", line 539, in _on_node_execution\n for execution in execute_node(\n File \"/app/autogpt_platform/backend/backend/executor/manager.py\", line 175, in execute_node\n credentials, creds_lock = creds_manager.acquire(user_id, credentials_meta.id)", ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^", File \"/app/autogpt_platform/backend/backend/integrations/creds_manager.py\", line 114, in acquire", with self._locked(user_id, credentials_id, \"!time_sensitive\"):", File \"/usr/local/lib/python3.11/contextlib.py\", line 158, in __exit__", self.gen.throw(typ, value, traceback)", File \"/app/autogpt_platform/backend/backend/integrations/creds_manager.py\", line 147, in _locked", lock.release()", File \"/usr/local/lib/python3.11/site-packages/redis/lock.py\", line 254, in release", raise LockError(\"Cannot release an unlocked lock\", lock_name=self.name)", redis.exceptions.LockError: Cannot release an unlocked lock", ``` ### Changes 🏗️ ``` try: lock.acquire() ... finally: lock.release() ``` pattern can cause an error where the lock is already released due to timeout. The scope of the change is to manually check the lock status before releasing. ### Checklist 📋 #### For code changes: - [ ] I have clearly listed my changes in the PR description - [ ] I have made a test plan - [ ] I have tested my changes according to the test plan: <!-- Put your test plan here: --> - [ ] ... <details> <summary>Example test plan</summary> - [ ] Create from scratch and execute an agent with at least 3 blocks - [ ] Import an agent from file upload, and confirm it executes correctly - [ ] Upload agent to marketplace - [ ] Import an agent from marketplace and confirm it executes correctly - [ ] Edit an agent from monitor, and confirm it executes correctly </details> #### For configuration changes: - [ ] `.env.example` is updated or already compatible with my changes - [ ] `docker-compose.yml` is updated or already compatible with my changes - [ ] I have included a list of my configuration changes in the PR description (under **Changes**) <details> <summary>Examples of configuration changes</summary> - Changing ports - Adding new services that need to communicate with each other - Secrets or environment variable changes - New or infrastructure changes such as databases </details>pull/9159/head
parent
15af2f410b
commit
a646e60d2f
|
@ -31,7 +31,8 @@ class RedisKeyedMutex:
|
|||
try:
|
||||
yield
|
||||
finally:
|
||||
lock.release()
|
||||
if lock.locked():
|
||||
lock.release()
|
||||
|
||||
def acquire(self, key: Any) -> "RedisLock":
|
||||
"""Acquires and returns a lock with the given key"""
|
||||
|
@ -45,7 +46,7 @@ class RedisKeyedMutex:
|
|||
return lock
|
||||
|
||||
def release(self, key: Any):
|
||||
if lock := self.locks.get(key):
|
||||
if (lock := self.locks.get(key)) and lock.locked() and lock.owned():
|
||||
lock.release()
|
||||
|
||||
def release_all_locks(self):
|
||||
|
|
|
@ -947,7 +947,8 @@ def synchronized(key: str, timeout: int = 60):
|
|||
lock.acquire()
|
||||
yield
|
||||
finally:
|
||||
lock.release()
|
||||
if lock.locked():
|
||||
lock.release()
|
||||
|
||||
|
||||
def llprint(message: str):
|
||||
|
|
|
@ -92,7 +92,7 @@ class IntegrationCredentialsManager:
|
|||
|
||||
fresh_credentials = oauth_handler.refresh_tokens(credentials)
|
||||
self.store.update_creds(user_id, fresh_credentials)
|
||||
if _lock:
|
||||
if _lock and _lock.locked():
|
||||
_lock.release()
|
||||
|
||||
credentials = fresh_credentials
|
||||
|
@ -144,7 +144,8 @@ class IntegrationCredentialsManager:
|
|||
try:
|
||||
yield
|
||||
finally:
|
||||
lock.release()
|
||||
if lock.locked():
|
||||
lock.release()
|
||||
|
||||
def release_all_locks(self):
|
||||
"""Call this on process termination to ensure all locks are released"""
|
||||
|
|
Loading…
Reference in New Issue