fix(backend): Make spend credit failure as part of block execution failure (#9340)
<img width="1427" alt="image" src="https://github.com/user-attachments/assets/de48ceb7-2a90-44e6-a687-d6759a1aa434" /> ### Changes 🏗️ Block execution that cost credit can fail, the scope of this fix is making this error as part of the node execution instead of a system error. ### Checklist 📋 #### For code changes: - [ ] I have clearly listed my changes in the PR description - [ ] I have made a test plan - [ ] I have tested my changes according to the test plan: <!-- Put your test plan here: --> - [ ] ... <details> <summary>Example test plan</summary> - [ ] Create from scratch and execute an agent with at least 3 blocks - [ ] Import an agent from file upload, and confirm it executes correctly - [ ] Upload agent to marketplace - [ ] Import an agent from marketplace and confirm it executes correctly - [ ] Edit an agent from monitor, and confirm it executes correctly </details> #### For configuration changes: - [ ] `.env.example` is updated or already compatible with my changes - [ ] `docker-compose.yml` is updated or already compatible with my changes - [ ] I have included a list of my configuration changes in the PR description (under **Changes**) <details> <summary>Examples of configuration changes</summary> - Changing ports - Adding new services that need to communicate with each other - Secrets or environment variable changes - New or infrastructure changes such as databases </details>pull/9285/head
parent
0811e8a990
commit
e0d153b284
|
@ -188,8 +188,6 @@ def execute_node(
|
|||
extra_exec_kwargs[field_name] = credentials
|
||||
|
||||
output_size = 0
|
||||
end_status = ExecutionStatus.COMPLETED
|
||||
|
||||
try:
|
||||
for output_name, output_data in node_block.execute(
|
||||
input_data, **extra_exec_kwargs
|
||||
|
@ -209,11 +207,21 @@ def execute_node(
|
|||
):
|
||||
yield execution
|
||||
|
||||
# Update execution status and spend credits
|
||||
res = update_execution(ExecutionStatus.COMPLETED)
|
||||
s = input_size + output_size
|
||||
t = (
|
||||
(res.end_time - res.start_time).total_seconds()
|
||||
if res.end_time and res.start_time
|
||||
else 0
|
||||
)
|
||||
data.data = input_data
|
||||
db_client.spend_credits(data, s, t)
|
||||
|
||||
except Exception as e:
|
||||
end_status = ExecutionStatus.FAILED
|
||||
error_msg = str(e)
|
||||
log_metadata.exception(f"Node execution failed with error {error_msg}")
|
||||
db_client.upsert_execution_output(node_exec_id, "error", error_msg)
|
||||
update_execution(ExecutionStatus.FAILED)
|
||||
|
||||
for execution in _enqueue_next_nodes(
|
||||
db_client=db_client,
|
||||
|
@ -235,18 +243,6 @@ def execute_node(
|
|||
except Exception as e:
|
||||
log_metadata.error(f"Failed to release credentials lock: {e}")
|
||||
|
||||
# Update execution status and spend credits
|
||||
res = update_execution(end_status)
|
||||
if end_status == ExecutionStatus.COMPLETED:
|
||||
s = input_size + output_size
|
||||
t = (
|
||||
(res.end_time - res.start_time).total_seconds()
|
||||
if res.end_time and res.start_time
|
||||
else 0
|
||||
)
|
||||
data.data = input_data
|
||||
db_client.spend_credits(data, s, t)
|
||||
|
||||
# Update execution stats
|
||||
if execution_stats is not None:
|
||||
execution_stats.update(node_block.execution_stats)
|
||||
|
@ -563,9 +559,15 @@ class Executor:
|
|||
q.add(execution)
|
||||
log_metadata.info(f"Finished node execution {node_exec.node_exec_id}")
|
||||
except Exception as e:
|
||||
log_metadata.exception(
|
||||
f"Failed node execution {node_exec.node_exec_id}: {e}"
|
||||
)
|
||||
# Avoid user error being marked as an actual error.
|
||||
if isinstance(e, ValueError):
|
||||
log_metadata.info(
|
||||
f"Failed node execution {node_exec.node_exec_id}: {e}"
|
||||
)
|
||||
else:
|
||||
log_metadata.exception(
|
||||
f"Failed node execution {node_exec.node_exec_id}: {e}"
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def on_graph_executor_start(cls):
|
||||
|
|
|
@ -62,7 +62,10 @@ def expose(func: C) -> C:
|
|||
return func(*args, **kwargs)
|
||||
except Exception as e:
|
||||
msg = f"Error in {func.__name__}: {e.__str__()}"
|
||||
logger.exception(msg)
|
||||
if isinstance(e, ValueError):
|
||||
logger.warning(msg)
|
||||
else:
|
||||
logger.exception(msg)
|
||||
raise
|
||||
|
||||
register_pydantic_serializers(func)
|
||||
|
|
Loading…
Reference in New Issue