Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 80 additions & 27 deletions packages/dbgpt-core/src/dbgpt/agent/core/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
import asyncio
import json
import logging
import re
import time
from concurrent.futures import Executor, ThreadPoolExecutor
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, final

from jinja2 import Template
from jinja2.sandbox import SandboxedEnvironment

from dbgpt._private.pydantic import ConfigDict, Field
from dbgpt.core import LLMClient, ModelMessageRoleType, PromptTemplate
Expand Down Expand Up @@ -39,6 +40,19 @@ class ConversableAgent(Role, Agent):

model_config = ConfigDict(arbitrary_types_allowed=True)

# Dangerous template patterns that could lead to code execution
_DANGEROUS_TEMPLATE_PATTERNS = [
r"\{\{.*__.*\}\}", # Double underscore methods
r"\{\{.*import.*\}\}", # Import statements
r"\{\{.*exec.*\}\}", # Exec calls
r"\{\{.*eval.*\}\}", # Eval calls
r"\{\{.*open.*\}\}", # File operations
r"\{\{.*subprocess.*\}\}", # Subprocess calls
r"\{\{.*os\..*\}\}", # OS module access
r"\{\{.*globals.*\}\}", # Globals access
r"\{\{.*\[.*\].*\}\}", # Bracket notation access
]

agent_context: Optional[AgentContext] = Field(None, description="Agent context")
actions: List[Action] = Field(default_factory=list)
resource: Optional[Resource] = Field(None, description="Resource")
Expand Down Expand Up @@ -883,6 +897,35 @@ async def generate_resource_variables(
"now_time": now_time,
}

def sanitize_template_params(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Sanitize template parameters to prevent injection attacks."""
if not params:
return params

return self._sanitize_dict(params)

def _sanitize_dict(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Sanitize dictionary values recursively."""
return {key: self._sanitize_value(value) for key, value in data.items()}

def _sanitize_value(self, value: Any) -> Any:
"""Sanitize a single value based on its type."""
if isinstance(value, str):
return self._sanitize_string(value)
elif isinstance(value, dict):
return self._sanitize_dict(value)
elif isinstance(value, list):
return [self._sanitize_value(item) for item in value]
return value

def _sanitize_string(self, text: str) -> str:
"""Check string for dangerous template injection patterns."""
for pattern in self._DANGEROUS_TEMPLATE_PATTERNS:
if re.search(pattern, text, re.IGNORECASE):
logger.warning(f"Blocked potential template injection: {text}")
return "[BLOCKED: Potential injection attempt]"
return text

def _excluded_models(
self,
all_models: List[str],
Expand Down Expand Up @@ -1053,33 +1096,43 @@ async def build_system_prompt(
context: Optional[Dict[str, Any]] = None,
is_retry_chat: bool = False,
):
"""Build system prompt."""
system_prompt = None
"""Build system prompt with security controls."""
if self.bind_prompt:
prompt_param = {}
if resource_vars:
prompt_param.update(resource_vars)
if context:
prompt_param.update(context)
if self.bind_prompt.template_format == "f-string":
system_prompt = self.bind_prompt.template.format(
**prompt_param,
)
elif self.bind_prompt.template_format == "jinja2":
system_prompt = Template(self.bind_prompt.template).render(prompt_param)
else:
logger.warning("Bind prompt template not exsit or format not support!")
if not system_prompt:
param: Dict = context if context else {}
system_prompt = await self.build_prompt(
question=question,
is_system=True,
most_recent_memories=most_recent_memories,
resource_vars=resource_vars,
is_retry_chat=is_retry_chat,
**param,
)
return system_prompt
return self._render_bind_prompt(resource_vars, context)

# Fallback to build_prompt with sanitized context
sanitized_context = self.sanitize_template_params(context or {})
return await self.build_prompt(
question=question,
is_system=True,
most_recent_memories=most_recent_memories,
resource_vars=resource_vars,
is_retry_chat=is_retry_chat,
**sanitized_context,
)

def _render_bind_prompt(
self,
resource_vars: Optional[Dict] = None,
context: Optional[Dict[str, Any]] = None,
) -> str:
"""Render bind prompt template with sanitized parameters."""
prompt_param = {}
if resource_vars:
prompt_param.update(resource_vars)
if context:
sanitized_context = self.sanitize_template_params(context)
prompt_param.update(sanitized_context)

if self.bind_prompt.template_format == "f-string":
return self.bind_prompt.template.format(**prompt_param)
elif self.bind_prompt.template_format == "jinja2":
env = SandboxedEnvironment()
template = env.from_string(self.bind_prompt.template)
return template.render(prompt_param)
else:
logger.warning("Bind prompt template format not supported!")
return ""

async def _load_thinking_messages(
self,
Expand Down
7 changes: 4 additions & 3 deletions packages/dbgpt-core/src/dbgpt/core/interface/prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@


def _jinja2_formatter(template: str, **kwargs: Any) -> str:
"""Format a template using jinja2."""
"""Format a template using jinja2 with sandboxed environment for security."""
try:
from jinja2 import Template
from jinja2.sandbox import SandboxedEnvironment
except ImportError:
raise ImportError(
"jinja2 not installed, which is needed to use the jinja2_formatter. "
"Please install it with `pip install jinja2`."
)

return Template(template).render(**kwargs)
env = SandboxedEnvironment()
return env.from_string(template).render(**kwargs)


_DEFAULT_FORMATTER_MAPPING: Dict[str, Callable] = {
Expand Down