fix(agent/security): Mitigate shell injection vulnerabilities (#6903)
* Mitigate shell injection in `MacOSTTS._speech` implementation * Mitigate shell command control bypassing in `execute_shell` and `execute_shell_popen` commands - Improve implementation and docstring of `validate_command` function --------- Co-authored-by: Reinier van der Leer <pwuts@agpt.co>pull/6584/merge
parent
1f1e8c9f7d
commit
5090f55eba
|
@ -2,6 +2,7 @@
|
|||
|
||||
import logging
|
||||
import os
|
||||
import shlex
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
|
@ -224,25 +225,31 @@ def execute_python_file(
|
|||
raise CommandExecutionError(f"Could not run the script in a container: {e}")
|
||||
|
||||
|
||||
def validate_command(command: str, config: Config) -> bool:
|
||||
"""Validate a command to ensure it is allowed
|
||||
def validate_command(command_line: str, config: Config) -> tuple[bool, bool]:
|
||||
"""Check whether a command is allowed and whether it may be executed in a shell.
|
||||
|
||||
If shell command control is enabled, we disallow executing in a shell, because
|
||||
otherwise the model could easily circumvent the command filter using shell features.
|
||||
|
||||
Args:
|
||||
command (str): The command to validate
|
||||
config (Config): The config to use to validate the command
|
||||
command_line (str): The command line to validate
|
||||
config (Config): The application config including shell command control settings
|
||||
|
||||
Returns:
|
||||
bool: True if the command is allowed, False otherwise
|
||||
bool: True if the command may be executed in a shell, False otherwise
|
||||
"""
|
||||
if not command:
|
||||
return False
|
||||
if not command_line:
|
||||
return False, False
|
||||
|
||||
command_name = command.split()[0]
|
||||
command_name = shlex.split(command_line)[0]
|
||||
|
||||
if config.shell_command_control == ALLOWLIST_CONTROL:
|
||||
return command_name in config.shell_allowlist
|
||||
return command_name in config.shell_allowlist, False
|
||||
elif config.shell_command_control == DENYLIST_CONTROL:
|
||||
return command_name not in config.shell_denylist, False
|
||||
else:
|
||||
return command_name not in config.shell_denylist
|
||||
return True, True
|
||||
|
||||
|
||||
@command(
|
||||
|
@ -269,7 +276,8 @@ def execute_shell(command_line: str, agent: Agent) -> str:
|
|||
Returns:
|
||||
str: The output of the command
|
||||
"""
|
||||
if not validate_command(command_line, agent.legacy_config):
|
||||
allow_execute, allow_shell = validate_command(command_line, agent.legacy_config)
|
||||
if not allow_execute:
|
||||
logger.info(f"Command '{command_line}' not allowed")
|
||||
raise OperationNotAllowedError("This shell command is not allowed.")
|
||||
|
||||
|
@ -282,7 +290,11 @@ def execute_shell(command_line: str, agent: Agent) -> str:
|
|||
f"Executing command '{command_line}' in working directory '{os.getcwd()}'"
|
||||
)
|
||||
|
||||
result = subprocess.run(command_line, capture_output=True, shell=True)
|
||||
result = subprocess.run(
|
||||
command_line if allow_shell else shlex.split(command_line),
|
||||
capture_output=True,
|
||||
shell=allow_shell,
|
||||
)
|
||||
output = f"STDOUT:\n{result.stdout.decode()}\nSTDERR:\n{result.stderr.decode()}"
|
||||
|
||||
# Change back to whatever the prior working dir was
|
||||
|
@ -316,7 +328,8 @@ def execute_shell_popen(command_line: str, agent: Agent) -> str:
|
|||
Returns:
|
||||
str: Description of the fact that the process started and its id
|
||||
"""
|
||||
if not validate_command(command_line, agent.legacy_config):
|
||||
allow_execute, allow_shell = validate_command(command_line, agent.legacy_config)
|
||||
if not allow_execute:
|
||||
logger.info(f"Command '{command_line}' not allowed")
|
||||
raise OperationNotAllowedError("This shell command is not allowed.")
|
||||
|
||||
|
@ -331,7 +344,10 @@ def execute_shell_popen(command_line: str, agent: Agent) -> str:
|
|||
|
||||
do_not_show_output = subprocess.DEVNULL
|
||||
process = subprocess.Popen(
|
||||
command_line, shell=True, stdout=do_not_show_output, stderr=do_not_show_output
|
||||
command_line if allow_shell else shlex.split(command_line),
|
||||
shell=allow_shell,
|
||||
stdout=do_not_show_output,
|
||||
stderr=do_not_show_output,
|
||||
)
|
||||
|
||||
# Change back to whatever the prior working dir was
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
""" MacOS TTS Voice. """
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
from autogpt.speech.base import VoiceBase
|
||||
|
||||
|
@ -15,9 +15,9 @@ class MacOSTTS(VoiceBase):
|
|||
def _speech(self, text: str, voice_index: int = 0) -> bool:
|
||||
"""Play the given text."""
|
||||
if voice_index == 0:
|
||||
os.system(f'say "{text}"')
|
||||
subprocess.run(["say", text], shell=False)
|
||||
elif voice_index == 1:
|
||||
os.system(f'say -v "Ava (Premium)" "{text}"')
|
||||
subprocess.run(["say", "-v", "Ava (Premium)", text], shell=False)
|
||||
else:
|
||||
os.system(f'say -v Samantha "{text}"')
|
||||
subprocess.run(["say", "-v", "Samantha", text], shell=False)
|
||||
return True
|
||||
|
|
Loading…
Reference in New Issue