feat(platform): Add multimedia file support & add basic Video blocks (#9320)

Currently, the platform has no support for passing files between blocks; every
generated file has to be hosted somewhere.
This PR adds support for passing files temporarily during an execution,
opening the way for more blocks that perform multimedia operations.

<img width="583" alt="image"
src="https://github.com/user-attachments/assets/c285de5a-c2a9-41a0-9be1-305a316879d6"
/>

<img width="1291" alt="image"
src="https://github.com/user-attachments/assets/d7bcaf38-80fa-4b51-91da-b4eed80a02c1"
/>


### Changes 🏗️

* Add media support for passing files (local files, base64, URL) and
`FileStoreBlock` (file version of `StoreValueBlock`)
* Add initial multimedia blocks: `LoopVideoBlock` &
`AddAudioToVideoBlock`.

### Checklist 📋

#### For code changes:
- [ ] I have clearly listed my changes in the PR description
- [ ] I have made a test plan
- [ ] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [ ] ...

<details>
  <summary>Example test plan</summary>
  
  - [ ] Create from scratch and execute an agent with at least 3 blocks
- [ ] Import an agent from file upload, and confirm it executes
correctly
  - [ ] Upload agent to marketplace
- [ ] Import an agent from marketplace and confirm it executes correctly
  - [ ] Edit an agent from monitor, and confirm it executes correctly
</details>

#### For configuration changes:
- [ ] `.env.example` is updated or already compatible with my changes
- [ ] `docker-compose.yml` is updated or already compatible with my
changes
- [ ] I have included a list of my configuration changes in the PR
description (under **Changes**)

<details>
  <summary>Examples of configuration changes</summary>

  - Changing ports
  - Adding new services that need to communicate with each other
  - Secrets or environment variable changes
  - New or infrastructure changes such as databases
</details>

---------

Co-authored-by: Nicholas Tindle <nicholas.tindle@agpt.co>
pull/9113/head
Zamil Majdy 2025-01-26 17:08:05 +01:00 committed by GitHub
parent 996efaf4fc
commit bb3be444de
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 1697 additions and 914 deletions

View File

@ -3,6 +3,7 @@ from typing import Any, List
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema, BlockType
from backend.data.model import SchemaField
from backend.util.file import MediaFile, store_media_file
from backend.util.mock import MockObject
from backend.util.text import TextFormatter
from backend.util.type import convert
@ -10,6 +11,42 @@ from backend.util.type import convert
formatter = TextFormatter()
class FileStoreBlock(Block):
    """
    Block that places a media input into the folder of the current graph
    execution and outputs the stored file's relative path.
    """

    class Input(BlockSchema):
        # Any MediaFile form is accepted; `store_media_file` decides how to
        # materialise it.
        file_in: MediaFile = SchemaField(
            description="The file to store in the temporary directory, it can be a URL, data URI, or local path."
        )

    class Output(BlockSchema):
        # Path is relative to the execution folder, not absolute.
        file_out: MediaFile = SchemaField(
            description="The relative path to the stored file in the temporary directory."
        )

    def __init__(self):
        super().__init__(
            id="cbb50872-625b-42f0-8203-a2ae78242d8a",
            description="Stores the input file in the temporary directory.",
            categories={BlockCategory.BASIC, BlockCategory.MULTIMEDIA},
            input_schema=FileStoreBlock.Input,
            output_schema=FileStoreBlock.Output,
            static_output=True,
        )

    def run(
        self,
        input_data: Input,
        *,
        graph_exec_id: str,
        **kwargs,
    ) -> BlockOutput:
        """
        Store `input_data.file_in` for this graph execution.

        :param input_data: Block input carrying the media file reference.
        :param graph_exec_id: ID of the running graph execution; selects the folder.
        :yield: ("file_out", relative path of the stored file).
        """
        # return_content=False -> we want the relative path, not a data URI.
        stored_path = store_media_file(
            file=input_data.file_in,
            graph_exec_id=graph_exec_id,
            return_content=False,
        )
        yield "file_out", stored_path
class StoreValueBlock(Block):
"""
This block allows you to provide a constant value as a block, in a stateless manner.

View File

@ -151,7 +151,7 @@ class IdeogramModelBlock(Block):
super().__init__(
id="6ab085e2-20b3-4055-bc3e-08036e01eca6",
description="This block runs Ideogram models with both simple and advanced settings.",
categories={BlockCategory.AI},
categories={BlockCategory.AI, BlockCategory.MULTIMEDIA},
input_schema=IdeogramModelBlock.Input,
output_schema=IdeogramModelBlock.Output,
test_input={

View File

@ -0,0 +1,245 @@
import os
import tempfile
from typing import Literal, Optional
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.fx.Loop import Loop
from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField
from backend.util.file import MediaFile, get_exec_file_path, store_media_file
class MediaDurationBlock(Block):
    """
    Block that reports the duration, in seconds, of a video or audio file.
    """

    class Input(BlockSchema):
        media_in: MediaFile = SchemaField(
            description="Media input (URL, data URI, or local path)."
        )
        # Selects which moviepy reader is used to open the file.
        is_video: bool = SchemaField(
            description="Whether the media is a video (True) or audio (False).",
            default=True,
        )

    class Output(BlockSchema):
        duration: float = SchemaField(
            description="Duration of the media file (in seconds)."
        )
        error: str = SchemaField(
            description="Error message if something fails.", default=""
        )

    def __init__(self):
        super().__init__(
            id="d8b91fd4-da26-42d4-8ecb-8b196c6d84b6",
            description="Block to get the duration of a media file.",
            categories={BlockCategory.MULTIMEDIA},
            input_schema=MediaDurationBlock.Input,
            output_schema=MediaDurationBlock.Output,
        )

    def run(
        self,
        input_data: Input,
        *,
        graph_exec_id: str,
        **kwargs,
    ) -> BlockOutput:
        """
        Open the media file and yield its duration.

        :param input_data: Block input (media reference and video/audio flag).
        :param graph_exec_id: ID of the running graph execution; selects the folder.
        :yield: ("duration", duration in seconds as reported by moviepy).
        """
        # 1) Store the input media locally
        local_media_path = store_media_file(
            graph_exec_id=graph_exec_id,
            file=input_data.media_in,
            return_content=False,
        )
        media_abspath = get_exec_file_path(graph_exec_id, local_media_path)

        # 2) Load the clip
        if input_data.is_video:
            clip = VideoFileClip(media_abspath)
        else:
            clip = AudioFileClip(media_abspath)

        # 3) Read the duration, then always release the underlying reader so
        #    the file handle / ffmpeg process does not outlive this block.
        try:
            duration = clip.duration
        finally:
            clip.close()

        yield "duration", duration
class LoopVideoBlock(Block):
    """
    Block for looping (repeating) a video clip until a given duration or number of loops.
    """

    class Input(BlockSchema):
        video_in: MediaFile = SchemaField(
            description="The input video (can be a URL, data URI, or local path)."
        )
        # Provide `duration` or `n_loops`; when both are set, `duration` wins
        # (see the branch order in `run`).
        # NOTE(review): the two descriptions below promise a default when the
        # field is omitted, but `run` raises if BOTH are omitted — confirm
        # which behavior is intended.
        duration: Optional[float] = SchemaField(
            description="Target duration (in seconds) to loop the video to. If omitted, defaults to no looping.",
            default=None,
            ge=0.0,
        )
        n_loops: Optional[int] = SchemaField(
            description="Number of times to repeat the video. If omitted, defaults to 1 (no repeat).",
            default=None,
            ge=1,
        )
        output_return_type: Literal["file_path", "data_uri"] = SchemaField(
            description="How to return the output video. Either a relative path or base64 data URI.",
            default="file_path",
        )

    class Output(BlockSchema):
        video_out: str = SchemaField(
            description="Looped video returned either as a relative path or a data URI."
        )
        error: str = SchemaField(
            description="Error message if something fails.", default=""
        )

    def __init__(self):
        super().__init__(
            id="8bf9eef6-5451-4213-b265-25306446e94b",
            description="Block to loop a video to a given duration or number of repeats.",
            categories={BlockCategory.MULTIMEDIA},
            input_schema=LoopVideoBlock.Input,
            output_schema=LoopVideoBlock.Output,
        )

    def run(
        self,
        input_data: Input,
        *,
        node_exec_id: str,
        graph_exec_id: str,
        **kwargs,
    ) -> BlockOutput:
        """
        Loop the input video and yield the result.

        :param input_data: Block input (video, loop settings, return type).
        :param node_exec_id: ID of this node execution; prefixes the output file name.
        :param graph_exec_id: ID of the running graph execution; selects the folder.
        :yield: ("video_out", relative path or data URI of the looped video).
        :raises ValueError: If neither `duration` nor `n_loops` is set (or both are falsy).
        """
        # 1) Store the input video locally
        local_video_path = store_media_file(
            graph_exec_id=graph_exec_id,
            file=input_data.video_in,
            return_content=False,
        )
        input_abspath = get_exec_file_path(graph_exec_id, local_video_path)

        # 2) Load the clip
        clip = VideoFileClip(input_abspath)
        try:
            # 3) Apply the loop effect
            looped_clip = clip
            if input_data.duration:
                # Loop until we reach the specified duration
                looped_clip = looped_clip.with_effects(
                    [Loop(duration=input_data.duration)]
                )
            elif input_data.n_loops:
                looped_clip = looped_clip.with_effects([Loop(n=input_data.n_loops)])
            else:
                raise ValueError("Either 'duration' or 'n_loops' must be provided.")

            # Type narrowing only: `with_effects` is typed as returning a
            # generic clip.
            assert isinstance(looped_clip, VideoFileClip)

            # 4) Save the looped output
            output_filename = MediaFile(
                f"{node_exec_id}_looped_{os.path.basename(local_video_path)}"
            )
            output_abspath = get_exec_file_path(graph_exec_id, output_filename)
            looped_clip = looped_clip.with_audio(clip.audio)
            looped_clip.write_videofile(
                output_abspath, codec="libx264", audio_codec="aac"
            )
        finally:
            # Release the source reader even if looping or encoding failed.
            clip.close()

        # 5) Return either the relative path or a data URI, as requested
        video_out = store_media_file(
            graph_exec_id=graph_exec_id,
            file=output_filename,
            return_content=input_data.output_return_type == "data_uri",
        )
        yield "video_out", video_out
class AddAudioToVideoBlock(Block):
    """
    Block that adds (attaches) an audio track to an existing video.
    Optionally scale the volume of the new track.
    """

    class Input(BlockSchema):
        video_in: MediaFile = SchemaField(
            description="Video input (URL, data URI, or local path)."
        )
        audio_in: MediaFile = SchemaField(
            description="Audio input (URL, data URI, or local path)."
        )
        volume: float = SchemaField(
            description="Volume scale for the newly attached audio track (1.0 = original).",
            default=1.0,
        )
        output_return_type: Literal["file_path", "data_uri"] = SchemaField(
            description="Return the final output as a relative path or base64 data URI.",
            default="file_path",
        )

    class Output(BlockSchema):
        video_out: MediaFile = SchemaField(
            description="Final video (with attached audio), as a path or data URI."
        )
        error: str = SchemaField(
            description="Error message if something fails.", default=""
        )

    def __init__(self):
        super().__init__(
            id="3503748d-62b6-4425-91d6-725b064af509",
            description="Block to attach an audio file to a video file using moviepy.",
            categories={BlockCategory.MULTIMEDIA},
            input_schema=AddAudioToVideoBlock.Input,
            output_schema=AddAudioToVideoBlock.Output,
        )

    def run(
        self,
        input_data: Input,
        *,
        node_exec_id: str,
        graph_exec_id: str,
        **kwargs,
    ) -> BlockOutput:
        """
        Attach the audio input to the video input and yield the result.

        :param input_data: Block input (video, audio, volume, return type).
        :param node_exec_id: ID of this node execution; prefixes the output file name.
        :param graph_exec_id: ID of the running graph execution; selects the folder.
        :yield: ("video_out", relative path or data URI of the final video).
        """
        # 1) Store the inputs locally
        local_video_path = store_media_file(
            graph_exec_id=graph_exec_id,
            file=input_data.video_in,
            return_content=False,
        )
        local_audio_path = store_media_file(
            graph_exec_id=graph_exec_id,
            file=input_data.audio_in,
            return_content=False,
        )

        # Resolve paths through the shared helper so this block agrees with
        # the other media blocks on where execution files live.
        video_abspath = get_exec_file_path(graph_exec_id, local_video_path)
        audio_abspath = get_exec_file_path(graph_exec_id, local_audio_path)

        # 2) Load video + audio with moviepy
        video_clip = VideoFileClip(video_abspath)
        try:
            source_audio = AudioFileClip(audio_abspath)
            try:
                # Optionally scale volume
                audio_clip = source_audio
                if input_data.volume != 1.0:
                    audio_clip = audio_clip.with_volume_scaled(input_data.volume)

                # 3) Attach the new audio track
                final_clip = video_clip.with_audio(audio_clip)

                # 4) Write to output file
                output_filename = MediaFile(
                    f"{node_exec_id}_audio_attached_{os.path.basename(local_video_path)}"
                )
                output_abspath = get_exec_file_path(graph_exec_id, output_filename)
                final_clip.write_videofile(
                    output_abspath, codec="libx264", audio_codec="aac"
                )
            finally:
                # Release the audio reader even if encoding failed.
                source_audio.close()
        finally:
            # Release the video reader even if loading the audio failed.
            video_clip.close()

        # 5) Return either path or data URI
        video_out = store_media_file(
            graph_exec_id=graph_exec_id,
            file=output_filename,
            return_content=input_data.output_return_type == "data_uri",
        )
        yield "video_out", video_out

View File

@ -131,7 +131,7 @@ class ReplicateFluxAdvancedModelBlock(Block):
super().__init__(
id="90f8c45e-e983-4644-aa0b-b4ebe2f531bc",
description="This block runs Flux models on Replicate with advanced settings.",
categories={BlockCategory.AI},
categories={BlockCategory.AI, BlockCategory.MULTIMEDIA},
input_schema=ReplicateFluxAdvancedModelBlock.Input,
output_schema=ReplicateFluxAdvancedModelBlock.Output,
test_input={

View File

@ -78,7 +78,7 @@ class CreateTalkingAvatarVideoBlock(Block):
super().__init__(
id="98c6f503-8c47-4b1c-a96d-351fc7c87dab",
description="This block integrates with D-ID to create video clips and retrieve their URLs.",
categories={BlockCategory.AI},
categories={BlockCategory.AI, BlockCategory.MULTIMEDIA},
input_schema=CreateTalkingAvatarVideoBlock.Input,
output_schema=CreateTalkingAvatarVideoBlock.Output,
test_input={

View File

@ -53,7 +53,7 @@ class UnrealTextToSpeechBlock(Block):
super().__init__(
id="4ff1ff6d-cc40-4caa-ae69-011daa20c378",
description="Converts text to speech using the Unreal Speech API",
categories={BlockCategory.AI, BlockCategory.TEXT},
categories={BlockCategory.AI, BlockCategory.TEXT, BlockCategory.MULTIMEDIA},
input_schema=UnrealTextToSpeechBlock.Input,
output_schema=UnrealTextToSpeechBlock.Output,
test_input={

View File

@ -66,6 +66,7 @@ class BlockCategory(Enum):
)
PRODUCTIVITY = "Block that helps with productivity"
ISSUE_TRACKING = "Block that helps with issue tracking"
MULTIMEDIA = "Block that interacts with multimedia content"
def dict(self) -> dict[str, str]:
    """Return this category as a mapping with its name and description."""
    payload = {"category": self.name, "description": self.value}
    return payload

View File

@ -40,6 +40,7 @@ from backend.data.graph import GraphModel, Link, Node
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.util import json
from backend.util.decorator import error_logged, time_measured
from backend.util.file import clean_exec_files
from backend.util.logging import configure_logging
from backend.util.process import set_service_name
from backend.util.service import (
@ -169,7 +170,12 @@ def execute_node(
log_metadata.info("Executed node with input", input=input_data_str)
update_execution(ExecutionStatus.RUNNING)
extra_exec_kwargs = {}
extra_exec_kwargs: dict = {
"graph_id": graph_id,
"node_id": node_id,
"graph_exec_id": graph_exec_id,
"node_exec_id": node_exec_id,
}
# Last-minute fetch credentials + acquire a system-wide read-write lock to prevent
# changes during execution. ⚠️ This means a set of credentials can only be used by
# one (running) block at a time; simultaneous execution of blocks using same
@ -729,6 +735,7 @@ class Executor:
finished = True
cancel.set()
cancel_thread.join()
clean_exec_files(graph_exec.graph_exec_id)
return (
exec_stats,

View File

@ -0,0 +1,143 @@
import base64
import mimetypes
import re
import shutil
import tempfile
import uuid
from pathlib import Path
from urllib.parse import urlparse
# This "requests" presumably has additional checks against internal networks for SSRF.
from backend.util.request import requests
# Resolved once at import time so later containment checks compare real paths.
TEMP_DIR = Path(tempfile.gettempdir()).resolve()


def get_exec_file_path(graph_exec_id: str, path: str) -> str:
    """
    Return the absolute location of `path` inside {temp}/exec_file/{exec_id}/.

    :param graph_exec_id: ID of the graph execution that owns the folder.
    :param path: Path below that folder; an empty string yields the folder itself.
    :return: The joined path as a string. Nothing is created or validated here.
    """
    return str(TEMP_DIR.joinpath("exec_file", graph_exec_id, path))
def clean_exec_files(graph_exec_id: str, file: str = "") -> None:
    """
    Delete the {temp}/exec_file/{exec_id} folder (or a sub-folder of it) recursively.

    :param graph_exec_id: ID of the graph execution whose files are removed.
    :param file: Optional path below the execution folder; only directories
        are removed, anything else is left untouched.
    """
    target = Path(get_exec_file_path(graph_exec_id, file))
    # is_dir() is False for missing paths too, so this covers "nothing to clean".
    if not target.is_dir():
        return
    shutil.rmtree(target)
# MediaFile is a string that represents a file. It can be one of the following:
#   - Data URI: base64-encoded media file.
#     See https://developer.mozilla.org/en-US/docs/Web/URI/Schemes/data/
#   - URL: media file hosted on the internet; starts with http:// or https://.
#   - Local path (anything else): a temporary file path that lives only for
#     the duration of the graph execution.
# Note: replace this type alias with a proper class when more information is needed.
MediaFile = str
def store_media_file(
    graph_exec_id: str, file: MediaFile, return_content: bool = False
) -> MediaFile:
    """
    Materialise `file` inside {tempdir}/exec_file/{exec_id}/ and hand back a
    reference to it.

    How each MediaFile form is handled:
        - Data URI: decoded and written to a new randomly named file.
        - URL: downloaded and written under the name taken from the URL path
          (a random name when the path has none).
        - Local path: taken as relative to the execution folder and only
          checked for existence; nothing is copied.

    Every target is resolved and must stay inside the execution folder, so
    neither symlinks nor '..' can escape it.

    :param graph_exec_id: The unique ID of the graph execution.
    :param file: Data URI, URL, or local (relative) path.
    :param return_content: If True, return the file content as a data URI
        (data:<mime>;base64,<content>); if False, return the path *relative*
        to the execution folder.
    :return: Data URI or relative path of the media, per `return_content`.
    :raises ValueError: On a malformed data URI, a path outside the execution
        folder, or a local path that is not an existing file.
    """
    exec_dir = Path(get_exec_file_path(graph_exec_id, ""))
    exec_dir.mkdir(parents=True, exist_ok=True)

    def _confined(candidate: Path) -> Path:
        """Resolve `candidate` and refuse anything outside the execution folder."""
        resolved = candidate.resolve()
        if resolved.is_relative_to(exec_dir.resolve()):
            return resolved
        raise ValueError(
            "Local file path is outside the temp_base directory. Access denied."
        )

    if file.startswith("data:"):
        # Data URI: data:<mime>;base64,<payload>
        parsed = re.match(r"^data:([^;]+);base64,(.*)$", file, re.DOTALL)
        if parsed is None:
            raise ValueError(
                "Invalid data URI format. Expected data:<mime>;base64,<data>"
            )
        mime = parsed.group(1).strip().lower()
        payload = parsed.group(2).strip()

        # Unknown MIME types fall back to a generic binary suffix.
        suffix = mimetypes.guess_extension(mime, strict=False) or ".bin"
        stored = _confined(exec_dir / f"{uuid.uuid4()}{suffix}")
        stored.write_bytes(base64.b64decode(payload))

    elif file.startswith(("http://", "https://")):
        # URL: keep the remote file name when the path provides one.
        url_name = Path(urlparse(file).path).name
        if not url_name:
            url_name = f"{uuid.uuid4()}"
        stored = _confined(exec_dir / url_name)

        response = requests.get(file)
        response.raise_for_status()
        stored.write_bytes(response.content)

    else:
        # Local path: must already exist inside the execution folder.
        stored = _confined(exec_dir / file)
        if not stored.is_file():
            raise ValueError(f"Local file does not exist: {stored}")

    if not return_content:
        return MediaFile(str(stored.relative_to(exec_dir)))

    # Encode the stored file as a data URI, guessing the MIME type from its name.
    guessed_mime, _ = mimetypes.guess_type(stored)
    guessed_mime = guessed_mime or "application/octet-stream"
    encoded = base64.b64encode(stored.read_bytes()).decode("utf-8")
    return MediaFile(f"data:{guessed_mime};base64,{encoded}")

File diff suppressed because it is too large Load Diff

View File

@ -55,6 +55,7 @@ sqlalchemy = "^2.0.36"
psycopg2-binary = "^2.9.10"
google-cloud-storage = "^2.18.2"
launchdarkly-server-sdk = "^9.8.0"
moviepy = "^2.1.2"
[tool.poetry.group.dev.dependencies]
poethepoet = "^0.32.1"

View File

@ -11,9 +11,17 @@ const getYouTubeVideoId = (url: string) => {
};
/**
 * Decide whether `url` should be rendered as a video.
 *
 * Accepts inline `data:video…` URIs, http(s) URLs whose path ends in a known
 * video extension (query string ignored), and YouTube links.
 */
const isValidVideoUrl = (url: string): boolean => {
  // Inline base64 videos carry no file extension; accept them by MIME prefix.
  if (url.startsWith("data:video")) {
    return true;
  }
  const validUrl = /^(https?:\/\/)(www\.)?/i;
  const videoExtensions = /\.(mp4|webm|ogg)$/i;
  const youtubeRegex = /^(https?:\/\/)?(www\.)?(youtube\.com|youtu\.?be)\/.+$/;
  // Drop the query string so "clip.mp4?token=…" still ends with its extension.
  const cleanedUrl = url.split("?")[0];
  return (
    (validUrl.test(cleanedUrl) && videoExtensions.test(cleanedUrl)) ||
    youtubeRegex.test(cleanedUrl)
  );
};
const isValidImageUrl = (url: string): boolean => {
@ -26,9 +34,13 @@ const isValidImageUrl = (url: string): boolean => {
};
/**
 * Decide whether `url` should be rendered as audio.
 *
 * Accepts inline `data:audio…` URIs and http(s) URLs whose path ends in a
 * known audio extension (query string ignored).
 */
const isValidAudioUrl = (url: string): boolean => {
  // Inline base64 audio carries no file extension; accept it by MIME prefix.
  if (url.startsWith("data:audio")) {
    return true;
  }
  const validUrl = /^(https?:\/\/)(www\.)?/i;
  const audioExtensions = /\.(mp3|wav)$/i;
  // Drop the query string so "track.mp3?sig=…" still ends with its extension.
  const cleanedUrl = url.split("?")[0];
  return validUrl.test(cleanedUrl) && audioExtensions.test(cleanedUrl);
};
const VideoRenderer: React.FC<{ videoUrl: string }> = ({ videoUrl }) => {

View File

@ -142,49 +142,52 @@ export default function useAgentGraph(
setAgentDescription(graph.description);
setNodes(() => {
const newNodes = graph.nodes.map((node) => {
const block = availableNodes.find(
(block) => block.id === node.block_id,
)!;
const flow =
block.uiType == BlockUIType.AGENT
? availableFlows.find(
(flow) => flow.id === node.input_default.graph_id,
)
: null;
const newNode: CustomNode = {
id: node.id,
type: "custom",
position: {
x: node?.metadata?.position?.x || 0,
y: node?.metadata?.position?.y || 0,
},
data: {
block_id: block.id,
blockType: flow?.name || block.name,
blockCosts: block.costs,
categories: block.categories,
description: block.description,
title: `${block.name} ${node.id}`,
inputSchema: block.inputSchema,
outputSchema: block.outputSchema,
hardcodedValues: node.input_default,
webhook: node.webhook,
uiType: block.uiType,
connections: graph.links
.filter((l) => [l.source_id, l.sink_id].includes(node.id))
.map((link) => ({
edge_id: formatEdgeID(link),
source: link.source_id,
sourceHandle: link.source_name,
target: link.sink_id,
targetHandle: link.sink_name,
})),
isOutputOpen: false,
},
};
return newNode;
});
const newNodes = graph.nodes
.map((node) => {
const block = availableNodes.find(
(block) => block.id === node.block_id,
)!;
if (!block) return null;
const flow =
block.uiType == BlockUIType.AGENT
? availableFlows.find(
(flow) => flow.id === node.input_default.graph_id,
)
: null;
const newNode: CustomNode = {
id: node.id,
type: "custom",
position: {
x: node?.metadata?.position?.x || 0,
y: node?.metadata?.position?.y || 0,
},
data: {
block_id: block.id,
blockType: flow?.name || block.name,
blockCosts: block.costs,
categories: block.categories,
description: block.description,
title: `${block.name} ${node.id}`,
inputSchema: block.inputSchema,
outputSchema: block.outputSchema,
hardcodedValues: node.input_default,
webhook: node.webhook,
uiType: block.uiType,
connections: graph.links
.filter((l) => [l.source_id, l.sink_id].includes(node.id))
.map((link) => ({
edge_id: formatEdgeID(link),
source: link.source_id,
sourceHandle: link.source_name,
target: link.sink_id,
targetHandle: link.sink_name,
})),
isOutputOpen: false,
},
};
return newNode;
})
.filter((node) => node !== null);
setEdges((_) =>
graph.links.map((link) => ({
id: formatEdgeID(link),