Writing A Custom Toolkit

Toolkits are defined in a single Python file, with a top-level docstring containing metadata and a Tools class.

Example Top-Level Docstring

"""
title: String Inverse
author: Your Name
author_url: https://website.com
git_url: https://github.com/username/string-reverse.git
description: This tool reverses the input string
required_open_webui_version: 0.4.0
requirements: langchain-openai, langgraph, ollama, langchain_ollama
version: 0.4.0
license: MIT
"""

Tools Class

Tools have to be defined as methods within a class called Tools, with optional subclasses called Valves and UserValves, for example:

from pydantic import BaseModel, Field


class Tools:
    def __init__(self):
        """Initialize the Tool."""
        self.valves = self.Valves()

    class Valves(BaseModel):
        api_key: str = Field("", description="Your API key here")

    def reverse_string(self, string: str) -> str:
        """
        Reverses the input string.
        :param string: The string to reverse
        """
        # example usage of valves
        if self.valves.api_key != "42":
            return "Wrong API key"
        return string[::-1]

Type Hints

Each tool must have type hints for its arguments. The types may also be nested, such as queries_and_docs: list[tuple[str, int]]. Those type hints are used to generate the JSON schema that is sent to the model. Tools without type hints will work far less consistently.
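
For instance, a minimal sketch of a fully type-hinted method, including a nested hint (the method name and logic here are illustrative, not part of any API):

class Tools:
    def count_pairs(self, queries_and_docs: list[tuple[str, int]]) -> str:
        """
        Counts the (query, document id) pairs it receives.
        :param queries_and_docs: Pairs of query text and document id
        """
        # These hints drive the JSON schema Open WebUI sends to the model.
        return f"Received {len(queries_and_docs)} query/document pairs"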

Valves and UserValves - (optional, but HIGHLY encouraged)

Valves and UserValves are used for specifying customizable settings of the Tool. You can read more on the dedicated Valves & UserValves page.
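
As a minimal sketch (the field names are illustrative): UserValves is declared the same way as Valves, and each user's instance is read back from __user__["valves"]:

from pydantic import BaseModel, Field


class Tools:
    class Valves(BaseModel):
        api_key: str = Field("", description="Admin-configured API key")

    class UserValves(BaseModel):
        max_results: int = Field(5, description="Per-user result limit")

    def __init__(self):
        self.valves = self.Valves()

    def search(self, query: str, __user__: dict) -> str:
        """
        Searches for a query, honoring the calling user's result limit.
        :param query: The search query
        """
        # __user__["valves"] holds this user's UserValves instance
        user_valves = __user__.get("valves")
        limit = user_valves.max_results if user_valves else 5
        return f"Searching for '{query}' (limit {limit})"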

Optional Arguments

Below is a list of optional arguments your tools can depend on:

  • __event_emitter__: Emit events (see following section)
  • __event_call__: Same as event emitter but can be used for user interactions
  • __user__: A dictionary with user information. It also contains the UserValves object in __user__["valves"].
  • __metadata__: Dictionary with chat metadata
  • __messages__: List of previous messages
  • __files__: Attached files
  • __model__: A dictionary with model information
  • __oauth_token__: A dictionary containing the user's valid, automatically refreshed OAuth token payload. This is the new, recommended, and secure way to access user tokens for making authenticated API calls. The dictionary typically contains access_token, id_token, and other provider-specific data.

For more information about __oauth_token__ and how to configure this token to be sent to tools, check out the OAuth section in the environment variable docs page and the SSO documentation.

Just add them as arguments to any method of your Tools class; Open WebUI injects them automatically when the tool is called, as in the sketch below.
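
For instance, a sketch of a method that accepts several injected arguments (the body just reports what was injected; the exact contents of these dictionaries vary by deployment):

async def inspect_context(
    self,
    question: str,
    __user__: dict = {},
    __metadata__: dict = {},
    __messages__: list = [],
    __event_emitter__=None,
) -> str:
    """
    Reports which pieces of context Open WebUI injected.
    :param question: The user's question
    """
    return (
        f"user keys: {list(__user__.keys())}, "  # typically includes 'valves'
        f"metadata keys: {list(__metadata__.keys())}, "
        f"history length: {len(__messages__)}, "
        f"emitter available: {__event_emitter__ is not None}"
    )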

Using the OAuth Token in a Tool

When building tools that need to interact with external APIs on the user's behalf, you can now directly access their OAuth token. This removes the need for fragile cookie scraping and ensures the token is always valid.

Example: A tool that calls an external API using the user's access token.

import httpx
from typing import Optional

class Tools:
    # ... other class setup ...

    async def get_user_profile_from_external_api(self, __oauth_token__: Optional[dict] = None) -> str:
        """
        Fetches user profile data from a secure external API using their OAuth access token.

        :param __oauth_token__: Injected by Open WebUI, contains the user's token data.
        """
        if not __oauth_token__ or "access_token" not in __oauth_token__:
            return "Error: User is not authenticated via OAuth or token is unavailable."

        access_token = __oauth_token__["access_token"]

        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json"
        }

        try:
            async with httpx.AsyncClient() as client:
                response = await client.get("https://api.my-service.com/v1/profile", headers=headers)
                response.raise_for_status()  # Raise an exception for bad status codes
                return f"API Response: {response.json()}"
        except httpx.HTTPStatusError as e:
            return f"Error: Failed to fetch data from API. Status: {e.response.status_code}"
        except Exception as e:
            return f"An unexpected error occurred: {e}"

Event Emitters

Event Emitters are used to add additional information to the chat interface. Like Filter Outlets, Event Emitters can append content to the chat; unlike Filter Outlets, they cannot strip information. Additionally, emitters can be activated at any stage during the Tool's execution.
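
Concretely, __event_emitter__ is an async callable that takes a single event dict with a type and a data field. A type alias (our own shorthand, not an Open WebUI export) makes the expected shape explicit:

from typing import Any, Awaitable, Callable, Optional

# Shorthand for the injected emitter: an async function that accepts
# one {"type": ..., "data": {...}} dict.
EventEmitter = Callable[[dict[str, Any]], Awaitable[None]]

async def my_tool_step(__event_emitter__: Optional[EventEmitter] = None) -> None:
    if __event_emitter__:
        await __event_emitter__({
            "type": "status",
            "data": {"description": "Working...", "done": False},
        })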

⚠️ CRITICAL: Function Calling Mode Compatibility

Event Emitter behavior differs significantly depending on your function calling mode, which is controlled by the function_calling parameter:

  • Default Mode: Uses traditional function calling approach with wider model compatibility
  • Native Mode: Leverages model's built-in tool-calling capabilities for reduced latency

Before using event emitters, you must understand these critical limitations:

  • Default Mode (function_calling = "default"): Full event emitter support with all event types working as expected
  • Native Mode (function_calling = "native"): Limited event emitter support - many event types don't work properly due to native function calling bypassing Open WebUI's custom tool processing pipeline

When to Use Each Mode:

  • Use Default Mode when you need full event emitter functionality, complex tool interactions, or real-time UI updates
  • Use Native Mode when you need reduced latency and basic tool calling without complex UI interactions

Function Calling Mode Configuration

You can configure the function calling mode in two places:

  1. Model Settings: Go to Model page → Advanced Params → Function Calling (set to "Default" or "Native")
  2. Per-request basis: Set params.function_calling = "native" or "default" in your request

If the model seems to be unable to call the tool, make sure it is enabled (either via the Model page or via the + sign next to the chat input field).
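
As a sketch, a per-request override against Open WebUI's OpenAI-compatible chat endpoint might look like this (the URL, model id, and API key are placeholders to adapt to your deployment; the params field follows the per-request option described above):

import httpx

payload = {
    "model": "llama3.1",  # placeholder model id
    "messages": [{"role": "user", "content": "What's the weather in Berlin?"}],
    "params": {"function_calling": "native"},  # or "default"
}

response = httpx.post(
    "http://localhost:3000/api/chat/completions",  # adjust to your instance
    headers={"Authorization": "Bearer YOUR_API_KEY"},
    json=payload,
)
print(response.json())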

Complete Event Type Compatibility Matrix

Here's the comprehensive breakdown of how each event type behaves across function calling modes:

| Event Type | Default Mode Functionality | Native Mode Functionality | Status |
|---|---|---|---|
| status | ✅ Full support: updates status history during tool execution | Identical: tracks function execution status | COMPATIBLE |
| message | ✅ Full support: appends incremental content during streaming | ❌ BROKEN: gets overwritten by native completion snapshots | INCOMPATIBLE |
| chat:completion | ✅ Full support: handles streaming responses and completion data | ⚠️ LIMITED: carries function results but may overwrite tool updates | PARTIALLY COMPATIBLE |
| chat:message:delta | ✅ Full support: streams delta content during execution | ❌ BROKEN: content gets replaced by native function snapshots | INCOMPATIBLE |
| chat:message | ✅ Full support: replaces entire message content cleanly | ❌ BROKEN: gets overwritten by subsequent native completions | INCOMPATIBLE |
| replace | ✅ Full support: replaces content with precise control | ❌ BROKEN: replaced content gets overwritten immediately | INCOMPATIBLE |
| chat:message:files / files | ✅ Full support: handles file attachments in messages | Identical: processes files from function outputs | COMPATIBLE |
| chat:message:error | ✅ Full support: displays error notifications | Identical: shows function call errors | COMPATIBLE |
| chat:message:follow_ups | ✅ Full support: shows follow-up suggestions | Identical: displays function-generated follow-ups | COMPATIBLE |
| chat:title | ✅ Full support: updates chat title dynamically | Identical: updates title based on function interactions | COMPATIBLE |
| chat:tags | ✅ Full support: modifies chat tags | Identical: manages tags from function outputs | COMPATIBLE |
| chat:tasks:cancel | ✅ Full support: cancels ongoing tasks | Identical: cancels native function executions | COMPATIBLE |
| citation / source | ✅ Full support: handles citations with full metadata | Identical: processes function-generated citations | COMPATIBLE |
| notification | ✅ Full support: shows toast notifications | Identical: displays function execution notifications | COMPATIBLE |
| confirmation | ✅ Full support: requests user confirmations | Identical: confirms function executions | COMPATIBLE |
| execute | ✅ Full support: executes code dynamically | Identical: runs function-generated code | COMPATIBLE |
| input | ✅ Full support: requests user input with full UI | Identical: collects input for functions | COMPATIBLE |

Why Native Mode Breaks Certain Event Types

In Native Mode, the server constructs content blocks from streaming model output and repeatedly emits "chat:completion" events with full serialized content snapshots. The client treats these snapshots as authoritative and completely replaces message content, effectively overwriting any prior tool-emitted updates like message, chat:message, or replace events.

Technical Details:

  • middleware.py adds tools directly to form data for native model handling
  • Streaming handler emits repeated content snapshots via chat:completion events
  • Client's chatCompletionEventHandler treats snapshots as complete replacements: message.content = content
  • This causes tool-emitted content updates to flicker and disappear
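
Conceptually, the overwrite looks like this (a simplified Python sketch; the real logic lives in the client, so treat this as an illustration only):

# Simplified model of the client-side overwrite (illustrative,
# not actual Open WebUI client code).
message_content = ""

# A tool emits a "message" event; the client appends the content:
message_content += "Tool progress: step 1 done\n"

# A native-mode "chat:completion" snapshot then arrives; the client
# treats it as authoritative and replaces the whole message:
snapshot = "Final model answer."
message_content = snapshot  # the tool-emitted text is lost

print(message_content)  # -> Final model answer.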

Best Practices and Recommendations

For Tools Requiring Real-time UI Updates:

class Tools:
    def __init__(self):
        # Add a note about function calling mode requirements
        self.description = "This tool requires Default function calling mode for full functionality"

    async def interactive_tool(self, prompt: str, __event_emitter__=None) -> str:
        """
        ⚠️ This tool requires function_calling = "default" for proper event emission
        """
        if not __event_emitter__:
            return "Event emitter not available - ensure Default function calling mode is enabled"

        # Safe to use message events in Default mode
        await __event_emitter__({
            "type": "message",
            "data": {"content": "Processing step 1..."}
        })
        # ... rest of tool logic

For Tools That Must Work in Both Modes:

async def universal_tool(self, prompt: str, __event_emitter__=None, __metadata__=None) -> str:
    """
    Tool designed to work in both Default and Native function calling modes
    """
    # Check if we're in native mode (this is a rough heuristic)
    is_native_mode = __metadata__ and __metadata__.get("params", {}).get("function_calling") == "native"

    if __event_emitter__:
        if is_native_mode:
            # Use only compatible event types in native mode
            await __event_emitter__({
                "type": "status",
                "data": {"description": "Processing in native mode...", "done": False}
            })
        else:
            # Full event functionality in default mode
            await __event_emitter__({
                "type": "message",
                "data": {"content": "Processing with full event support..."}
            })

    # ... tool logic here

    if __event_emitter__:
        await __event_emitter__({
            "type": "status",
            "data": {"description": "Completed successfully", "done": True}
        })

    return "Tool execution completed"

Troubleshooting Event Emitter Issues

Symptoms of Native Mode Conflicts:

  • Tool-emitted messages appear briefly then disappear
  • Content flickers during tool execution
  • message or replace events seem to be ignored
  • Status updates work but content updates don't persist

Solutions:

  1. Switch to Default Mode: Change function_calling from "native" to "default" in model settings
  2. Use Compatible Event Types: Stick to status, citation, notification, and other compatible event types in native mode
  3. Implement Mode Detection: Add logic to detect function calling mode and adjust event usage accordingly
  4. Consider Hybrid Approaches: Use compatible events for core functionality and degrade gracefully

Debugging Your Event Emitters:

async def debug_events_tool(self, __event_emitter__=None, __metadata__=None) -> str:
    """Debug tool to test event emitter functionality"""

    if not __event_emitter__:
        return "No event emitter available"

    # Test various event types
    test_events = [
        {"type": "status", "data": {"description": "Testing status events", "done": False}},
        {"type": "message", "data": {"content": "Testing message events (may not work in native mode)"}},
        {"type": "notification", "data": {"content": "Testing notification events"}},
    ]

    mode_info = "Unknown"
    if __metadata__:
        mode_info = __metadata__.get("params", {}).get("function_calling", "default")

    await __event_emitter__({
        "type": "status",
        "data": {"description": f"Function calling mode: {mode_info}", "done": False}
    })

    for i, event in enumerate(test_events):
        await asyncio.sleep(1)  # Space out events
        await __event_emitter__(event)
        await __event_emitter__({
            "type": "status",
            "data": {"description": f"Sent event {i+1}/{len(test_events)}", "done": False}
        })

    await __event_emitter__({
        "type": "status",
        "data": {"description": "Event testing complete", "done": True}
    })

    return f"Event testing completed in {mode_info} mode. Check for missing or flickering content."

There are several specific event types with different behaviors:

Status Events ✅ FULLY COMPATIBLE

Status events work identically in both Default and Native function calling modes. This is the most reliable event type for providing real-time feedback during tool execution.

Status events add live status updates to a message while it's performing steps. These can be emitted at any stage during tool execution. Status messages appear right above the message content and are essential for tools that delay the LLM response or process large amounts of information.

Basic Status Event Structure:

await __event_emitter__({
    "type": "status",
    "data": {
        "description": "Message that shows up in the chat",
        "done": False,   # False = still processing, True = completed
        "hidden": False  # False = visible, True = auto-hide when done
    }
})

Status Event Parameters:

  • description: The status message text shown to users
  • done: Boolean indicating if this status represents completion
  • hidden: Boolean that auto-hides the status once done is set to True

Basic Status Example

async def data_processing_tool(
    self, data_file: str, __user__: dict, __event_emitter__=None
) -> str:
    """
    Processes a large data file with status updates
    ✅ Works in both Default and Native function calling modes
    """

    if not __event_emitter__:
        return "Processing completed (no status updates available)"

    # Step 1: Loading
    await __event_emitter__({
        "type": "status",
        "data": {"description": "Loading data file...", "done": False}
    })

    # Simulate loading time
    await asyncio.sleep(2)

    # Step 2: Processing
    await __event_emitter__({
        "type": "status",
        "data": {"description": "Analyzing 10,000 records...", "done": False}
    })

    # Simulate processing time
    await asyncio.sleep(3)

    # Step 3: Completion
    await __event_emitter__({
        "type": "status",
        "data": {"description": "Analysis complete!", "done": True, "hidden": False}
    })

    return "Data analysis completed successfully. Found 23 anomalies."

Advanced Status with Error Handling

async def api_integration_tool(
    self, endpoint: str, __event_emitter__=None
) -> str:
    """
    Integrates with external API with comprehensive status tracking
    ✅ Compatible with both function calling modes
    """

    if not __event_emitter__:
        return "API integration completed (no status available)"

    try:
        await __event_emitter__({
            "type": "status",
            "data": {"description": "Connecting to API...", "done": False}
        })

        # Simulate API connection
        await asyncio.sleep(1.5)

        await __event_emitter__({
            "type": "status",
            "data": {"description": "Authenticating...", "done": False}
        })

        # Simulate authentication
        await asyncio.sleep(1)

        await __event_emitter__({
            "type": "status",
            "data": {"description": "Fetching data...", "done": False}
        })

        # Simulate data fetching
        await asyncio.sleep(2)

        # Success status
        await __event_emitter__({
            "type": "status",
            "data": {"description": "API integration successful", "done": True}
        })

        return "Successfully retrieved 150 records from the API"

    except Exception as e:
        # Error status - always visible for debugging
        await __event_emitter__({
            "type": "status",
            "data": {"description": f"Error: {str(e)}", "done": True, "hidden": False}
        })

        return f"API integration failed: {str(e)}"

Multi-Step Progress Status

async def batch_processor_tool(
    self, items: list, __event_emitter__=None
) -> str:
    """
    Processes items in batches with detailed progress tracking
    ✅ Works perfectly in both function calling modes
    """

    if not __event_emitter__ or not items:
        return "Batch processing completed"

    total_items = len(items)
    batch_size = 10
    total_batches = (total_items + batch_size - 1) // batch_size
    completed = 0

    for i in range(0, total_items, batch_size):
        batch = items[i:i + batch_size]
        batch_num = (i // batch_size) + 1

        # Update status for current batch
        await __event_emitter__({
            "type": "status",
            "data": {
                "description": f"Processing batch {batch_num}/{total_batches} ({len(batch)} items)...",
                "done": False
            }
        })

        # Simulate batch processing
        await asyncio.sleep(1)

        completed += len(batch)

        # Progress update
        progress_pct = int((completed / total_items) * 100)
        await __event_emitter__({
            "type": "status",
            "data": {
                "description": f"Progress: {completed}/{total_items} items ({progress_pct}%)",
                "done": False
            }
        })

    # Final completion status
    await __event_emitter__({
        "type": "status",
        "data": {
            "description": f"Batch processing complete! Processed {total_items} items",
            "done": True
        }
    })

    return f"Successfully processed {total_items} items in {total_batches} batches"

Message Events ⚠️ DEFAULT MODE ONLY

🚨 CRITICAL WARNING: Message events are INCOMPATIBLE with Native function calling mode!

Message events (message, chat:message, chat:message:delta, replace) allow you to append or modify message content at any stage during tool execution. This enables embedding images, rendering web pages, streaming content updates, and creating rich interactive experiences.

However, these event types have major compatibility issues:

  • Default Mode: Full functionality - content persists and displays properly
  • Native Mode: BROKEN - content gets overwritten by completion snapshots and disappears

Why Message Events Break in Native Mode: Native function calling emits repeated chat:completion events with full content snapshots that completely replace message content, causing any tool-emitted message updates to flicker and disappear.

Safe Message Event Structure (Default Mode Only):

await __event_emitter__({
    "type": "message",  # Also: "chat:message", "chat:message:delta", "replace"
    "data": {"content": "This content will be appended/replaced in the chat"},
    # Note: message types do NOT require a "done" condition
})

Message Event Types:

  • message / chat:message:delta: Appends content to existing message
  • chat:message / replace: Replaces entire message content
  • Both types will be overwritten in Native mode

Safe Message Streaming (Default Mode)

async def streaming_content_tool(
    self, query: str, __event_emitter__=None, __metadata__=None
) -> str:
    """
    Streams content updates during processing
    ⚠️ REQUIRES function_calling = "default" - Will not work in Native mode!
    """

    # Check function calling mode (rough detection)
    mode = "unknown"
    if __metadata__:
        mode = __metadata__.get("params", {}).get("function_calling", "default")

    if mode == "native":
        return "❌ This tool requires Default function calling mode. Message streaming is not supported in Native mode due to content overwriting issues."

    if not __event_emitter__:
        return "Event emitter not available"

    # Stream progressive content updates
    content_chunks = [
        "🔍 **Phase 1: Research**\nGathering information about your query...\n\n",
        "📊 **Phase 2: Analysis**\nAnalyzing gathered data patterns...\n\n",
        "✨ **Phase 3: Synthesis**\nGenerating insights and recommendations...\n\n",
        "📝 **Phase 4: Final Report**\nCompiling comprehensive results...\n\n"
    ]

    for i, chunk in enumerate(content_chunks):
        # Append this chunk to the message
        await __event_emitter__({
            "type": "message",
            "data": {"content": chunk}
        })

        # Show progress status
        await __event_emitter__({
            "type": "status",
            "data": {
                "description": f"Processing phase {i+1}/{len(content_chunks)}...",
                "done": False
            }
        })

        # Simulate processing time
        await asyncio.sleep(2)

    # Final completion
    await __event_emitter__({
        "type": "status",
        "data": {"description": "Content streaming complete!", "done": True}
    })

    return "Content streaming completed successfully. All phases processed."

Dynamic Content Replacement (Default Mode)

async def live_dashboard_tool(
    self, __event_emitter__=None, __metadata__=None
) -> str:
    """
    Creates a live-updating dashboard using content replacement
    ⚠️ ONLY WORKS in Default function calling mode
    """

    # Verify we're not in Native mode
    mode = __metadata__.get("params", {}).get("function_calling", "default") if __metadata__ else "default"

    if mode == "native":
        return """
❌ **Native Mode Incompatibility**

This dashboard tool cannot function in Native mode because:
- Content replacement events get overwritten by completion snapshots
- Live updates will flicker and disappear
- Real-time data will not persist in the interface

**Solution:** Switch to Default function calling mode in Model Settings → Advanced Params → Function Calling = "Default"
"""

    if not __event_emitter__:
        return "Dashboard created (static mode - no live updates)"

    # Create initial dashboard
    initial_dashboard = """
# 📊 Live System Dashboard

## System Status: 🟡 Initializing...

### Current Metrics:
- **CPU Usage**: Loading...
- **Memory**: Loading...
- **Active Users**: Loading...
- **Response Time**: Loading...

---
*Last Updated: Initializing...*
"""

    await __event_emitter__({
        "type": "replace",
        "data": {"content": initial_dashboard}
    })

    # Simulate live data updates
    updates = [
        {
            "status": "🟢 Online",
            "cpu": "23%",
            "memory": "64%",
            "users": "1,247",
            "response": "145ms"
        },
        {
            "status": "🟢 Optimal",
            "cpu": "18%",
            "memory": "61%",
            "users": "1,352",
            "response": "132ms"
        },
        {
            "status": "🟡 Busy",
            "cpu": "67%",
            "memory": "78%",
            "users": "1,891",
            "response": "234ms"
        }
    ]

    for i, data in enumerate(updates):
        await asyncio.sleep(3)  # Simulate data collection delay

        updated_dashboard = f"""
# 📊 Live System Dashboard

## System Status: {data['status']}

### Current Metrics:
- **CPU Usage**: {data['cpu']}
- **Memory**: {data['memory']}
- **Active Users**: {data['users']}
- **Response Time**: {data['response']}

---
*Last Updated: {datetime.now().strftime('%H:%M:%S')}*
*Update {i+1}/{len(updates)}*
"""

        # Replace entire dashboard content
        await __event_emitter__({
            "type": "replace",
            "data": {"content": updated_dashboard}
        })

        # Status update
        await __event_emitter__({
            "type": "status",
            "data": {"description": f"Dashboard updated ({i+1}/{len(updates)})", "done": False}
        })

    await __event_emitter__({
        "type": "status",
        "data": {"description": "Live dashboard monitoring complete", "done": True}
    })

    return "Dashboard monitoring session completed."

Mode-Safe Message Tool

async def adaptive_content_tool(
    self, content_type: str, __event_emitter__=None, __metadata__=None
) -> str:
    """
    Adapts behavior based on function calling mode
    ✅ Provides best possible experience in both modes
    """

    # Detect function calling mode
    mode = "default"  # Default assumption
    if __metadata__:
        mode = __metadata__.get("params", {}).get("function_calling", "default")

    if not __event_emitter__:
        return f"Generated {content_type} content (no real-time updates available)"

    # Mode-specific behavior
    if mode == "native":
        # Use only compatible events in Native mode
        await __event_emitter__({
            "type": "status",
            "data": {"description": f"Generating {content_type} content in Native mode...", "done": False}
        })

        await asyncio.sleep(2)

        await __event_emitter__({
            "type": "status",
            "data": {"description": "Content generation complete", "done": True}
        })

        # Return content normally - no message events
        return f"""
# {content_type.title()} Content

**Mode**: Native Function Calling (Limited Event Support)

Generated content here... This content is returned as the tool result rather than being streamed via message events.

*Note: Live content updates are not available in Native mode due to event compatibility limitations.*
"""

    else:  # Default mode
        # Full message event functionality available
        await __event_emitter__({
            "type": "status",
            "data": {"description": "Generating content with full streaming support...", "done": False}
        })

        # Stream content progressively
        progressive_content = [
            f"# {content_type.title()} Content\n\n**Mode**: Default Function Calling ✅\n\n",
            "## Section 1: Introduction\nStreaming content in real-time...\n\n",
            "## Section 2: Details\nAdding detailed information...\n\n",
            "## Section 3: Conclusion\nFinalizing content delivery...\n\n",
            "*✅ Content streaming completed successfully!*"
        ]

        for i, chunk in enumerate(progressive_content):
            await __event_emitter__({
                "type": "message",
                "data": {"content": chunk}
            })

            await __event_emitter__({
                "type": "status",
                "data": {"description": f"Streaming section {i+1}/{len(progressive_content)}...", "done": False}
            })

            await asyncio.sleep(1.5)

        await __event_emitter__({
            "type": "status",
            "data": {"description": "Content streaming complete!", "done": True}
        })

        return "Content has been streamed above with full Default mode capabilities."

Citations ✅ FULLY COMPATIBLE

Citation events work identically in both Default and Native function calling modes. This event type provides source references and citations in the chat interface, allowing users to click and view source materials.

Citations are essential for tools that retrieve information from external sources, databases, or documents. They provide transparency and allow users to verify information sources.

Citation Event Structure:

await __event_emitter__({
    "type": "citation",
    "data": {
        "document": [content],  # Array of content strings
        "metadata": [           # Array of metadata objects
            {
                "date_accessed": datetime.now().isoformat(),
                "source": title,
                "author": "Author Name",           # Optional
                "publication_date": "2024-01-01",  # Optional
                "url": "https://source-url.com"    # Optional
            }
        ],
        "source": {"name": title, "url": url}  # Primary source info
    }
})

Important Citation Setup: When implementing custom citations, you must disable automatic citations in your Tools class:

def __init__(self):
    self.citation = False  # REQUIRED - prevents automatic citations from overriding custom ones

⚠️ Critical Citation Warning: If you set self.citation = True (or don't set it to False), automatic citations will replace any custom citations you send. Always disable automatic citations when using custom citation events.

Basic Citation Example

class Tools:
    def __init__(self):
        self.citation = False  # Disable automatic citations

    async def research_tool(
        self, topic: str, __event_emitter__=None
    ) -> str:
        """
        Researches a topic and provides proper citations
        ✅ Works identically in both Default and Native modes
        """

        if not __event_emitter__:
            return "Research completed (citations not available)"

        # Simulate research findings
        sources = [
            {
                "title": "Advanced AI Systems",
                "url": "https://example.com/ai-systems",
                "content": "Artificial intelligence systems have evolved significantly...",
                "author": "Dr. Jane Smith",
                "date": "2024-03-15"
            },
            {
                "title": "Machine Learning Fundamentals",
                "url": "https://example.com/ml-fundamentals",
                "content": "The core principles of machine learning include...",
                "author": "Prof. John Doe",
                "date": "2024-02-20"
            }
        ]

        # Emit citations for each source
        for source in sources:
            await __event_emitter__({
                "type": "citation",
                "data": {
                    "document": [source["content"]],
                    "metadata": [
                        {
                            "date_accessed": datetime.now().isoformat(),
                            "source": source["title"],
                            "author": source["author"],
                            "publication_date": source["date"],
                            "url": source["url"]
                        }
                    ],
                    "source": {
                        "name": source["title"],
                        "url": source["url"]
                    }
                }
            })

        return f"Research on '{topic}' completed. Found {len(sources)} relevant sources with detailed citations."

Advanced Multi-Source Citations

async def comprehensive_analysis_tool(
    self, query: str, __event_emitter__=None
) -> str:
    """
    Performs comprehensive analysis with multiple source types
    ✅ Full compatibility across all function calling modes
    """

    if not __event_emitter__:
        return "Analysis completed"

    # Multiple source types with rich metadata
    research_sources = {
        "academic": [
            {
                "title": "Neural Network Architecture in Modern AI",
                "authors": ["Dr. Sarah Chen", "Prof. Michael Rodriguez"],
                "journal": "Journal of AI Research",
                "volume": "Vol. 45, Issue 2",
                "pages": "123-145",
                "doi": "10.1000/182",
                "date": "2024-01-15",
                "content": "This comprehensive study examines the evolution of neural network architectures..."
            }
        ],
        "web_sources": [
            {
                "title": "Industry AI Implementation Trends",
                "url": "https://tech-insights.com/ai-trends-2024",
                "site_name": "TechInsights",
                "published": "2024-03-01",
                "content": "Recent industry surveys show that 78% of companies are implementing AI solutions..."
            }
        ],
        "reports": [
            {
                "title": "Global AI Market Report 2024",
                "organization": "International Tech Research Institute",
                "report_number": "ITRI-2024-AI-001",
                "date": "2024-02-28",
                "content": "The global artificial intelligence market is projected to reach $1.8 trillion by 2030..."
            }
        ]
    }

    citation_count = 0

    # Process academic sources
    for source in research_sources["academic"]:
        citation_count += 1
        await __event_emitter__({
            "type": "citation",
            "data": {
                "document": [source["content"]],
                "metadata": [
                    {
                        "date_accessed": datetime.now().isoformat(),
                        "source": source["title"],
                        "authors": source["authors"],
                        "journal": source["journal"],
                        "volume": source["volume"],
                        "pages": source["pages"],
                        "doi": source["doi"],
                        "publication_date": source["date"],
                        "type": "academic_journal"
                    }
                ],
                "source": {
                    "name": f"{source['title']} - {source['journal']}",
                    "url": f"https://doi.org/{source['doi']}"
                }
            }
        })

    # Process web sources
    for source in research_sources["web_sources"]:
        citation_count += 1
        await __event_emitter__({
            "type": "citation",
            "data": {
                "document": [source["content"]],
                "metadata": [
                    {
                        "date_accessed": datetime.now().isoformat(),
                        "source": source["title"],
                        "site_name": source["site_name"],
                        "publication_date": source["published"],
                        "url": source["url"],
                        "type": "web_article"
                    }
                ],
                "source": {
                    "name": source["title"],
                    "url": source["url"]
                }
            }
        })

    # Process reports
    for source in research_sources["reports"]:
        citation_count += 1
        await __event_emitter__({
            "type": "citation",
            "data": {
                "document": [source["content"]],
                "metadata": [
                    {
                        "date_accessed": datetime.now().isoformat(),
                        "source": source["title"],
                        "organization": source["organization"],
                        "report_number": source["report_number"],
                        "publication_date": source["date"],
                        "type": "research_report"
                    }
                ],
                "source": {
                    "name": f"{source['title']} - {source['organization']}",
                    "url": f"https://reports.example.com/{source['report_number']}"
                }
            }
        })

    return f"""
# Analysis Complete

Comprehensive analysis of '{query}' has been completed using {citation_count} authoritative sources:

- **{len(research_sources['academic'])}** Academic journal articles
- **{len(research_sources['web_sources'])}** Industry web sources
- **{len(research_sources['reports'])}** Research reports

All sources have been properly cited and are available for review by clicking the citation links above.
"""

Database Citation Tool

async def database_query_tool(
    self, sql_query: str, __event_emitter__=None
) -> str:
    """
    Queries database and provides data citations
    ✅ Works perfectly in both function calling modes
    """

    if not __event_emitter__:
        return "Database query executed"

    # Simulate database results with citation metadata
    query_results = [
        {
            "record_id": "USR_001247",
            "data": "John Smith, Software Engineer, joined 2023-01-15",
            "table": "employees",
            "last_updated": "2024-03-10T14:30:00Z",
            "updated_by": "admin_user"
        },
        {
            "record_id": "USR_001248",
            "data": "Jane Wilson, Product Manager, joined 2023-02-20",
            "table": "employees",
            "last_updated": "2024-03-08T09:15:00Z",
            "updated_by": "hr_system"
        }
    ]

    # Create citations for each database record
    for record in query_results:
        await __event_emitter__({
            "type": "citation",
            "data": {
                "document": [f"Database Record: {record['data']}"],
                "metadata": [
                    {
                        "date_accessed": datetime.now().isoformat(),
                        "source": f"Database Table: {record['table']}",
                        "record_id": record['record_id'],
                        "last_updated": record['last_updated'],
                        "updated_by": record['updated_by'],
                        "query": sql_query,
                        "type": "database_record"
                    }
                ],
                "source": {
                    "name": f"Record {record['record_id']} - {record['table']}",
                    "url": f"database://internal/tables/{record['table']}/{record['record_id']}"
                }
            }
        })

    return f"""
# Database Query Results

Executed query: `{sql_query}`

Retrieved **{len(query_results)}** records with complete citation metadata. Each record includes:
- Record ID and source table
- Last modification timestamp
- Update attribution
- Full audit trail

All data sources have been properly cited for transparency and verification.
"""

Additional Compatible Event Types ✅

The following event types work identically in both Default and Native function calling modes:

Notification Events

await __event_emitter__({
    "type": "notification",
    "data": {"content": "Toast notification message"}
})

File Events

await __event_emitter__({
    "type": "files",  # or "chat:message:files"
    "data": {"files": [{"name": "report.pdf", "url": "/files/report.pdf"}]}
})

Follow-up Events

await __event_emitter__({
    "type": "chat:message:follow_ups",
    "data": {"follow_ups": ["What about X?", "Tell me more about Y"]}
})

Title Update Events

await __event_emitter__({
    "type": "chat:title",
    "data": {"title": "New Chat Title"}
})

Tag Events

await __event_emitter__({
    "type": "chat:tags",
    "data": {"tags": ["research", "analysis", "completed"]}
})

Error Events

await __event_emitter__({
    "type": "chat:message:error",
    "data": {"content": "Error message to display"}
})

Confirmation Events

await __event_emitter__({
    "type": "confirmation",
    "data": {"message": "Are you sure you want to continue?"}
})

Input Request Events

await __event_emitter__({
    "type": "input",
    "data": {"prompt": "Please enter additional information:"}
})

Code Execution Events

await __event_emitter__({
    "type": "execute",
    "data": {"code": "print('Hello from tool-generated code!')"}
})

Comprehensive Function Calling Mode Guide

Choosing the right function calling mode is crucial for your tool's functionality. This guide helps you make an informed decision based on your specific requirements.

Mode Comparison Overview:

| Aspect | Default Mode | Native Mode |
|---|---|---|
| Latency | Higher: processes through Open WebUI pipeline | Lower: direct model handling |
| Event Support | ✅ Full: all event types work perfectly | ⚠️ Limited: many event types broken |
| Complexity | Handles complex tool interactions well | Best for simple tool calls |
| Compatibility | Works with all models | Requires models with native tool calling |
| Streaming | Perfect for real-time updates | Poor: content gets overwritten |
| Citations | ✅ Full support | ✅ Full support |
| Status Updates | ✅ Full support | ✅ Full support |
| Message Events | ✅ Full support | ❌ Broken: content disappears |

Decision Framework:

  1. Do you need real-time content streaming, live updates, or dynamic message modification?

    • Yes → Use Default Mode (Native mode will break these features)
    • No → Either mode works
  2. Is your tool primarily for simple data retrieval or computation?

    • Yes → Native Mode is fine (lower latency)
    • No → Consider Default Mode for complex interactions
  3. Do you need maximum performance and minimal latency?

    • Yes → Native Mode (if compatible with your features)
    • No → Default Mode provides more features
  4. Are you building interactive experiences, dashboards, or multi-step workflows?

    • Yes → Default Mode required
    • No → Either mode works

Recommended Usage Patterns:

🏆 Best Practices for Mode Selection

Choose Default Mode For:

  • Tools with progressive content updates
  • Interactive dashboards or live data displays
  • Multi-step workflows with visual feedback
  • Complex tool chains with intermediate results
  • Educational tools that show step-by-step processes
  • Any tool that needs message, replace, or chat:message events

Choose Native Mode For:

  • Simple API calls or database queries
  • Basic calculations or data transformations
  • Tools that only need status updates and citations
  • Performance-critical applications where latency matters
  • Simple retrieval tools without complex UI requirements

Universal Compatibility Pattern:

async def mode_adaptive_tool(
    self, query: str, __event_emitter__=None, __metadata__=None
) -> str:
    """
    Tool that adapts its behavior based on function calling mode
    ✅ Provides optimal experience in both modes
    """

    # Detect current mode
    mode = "default"
    if __metadata__:
        mode = __metadata__.get("params", {}).get("function_calling", "default")

    is_native_mode = (mode == "native")

    if not __event_emitter__:
        return "Tool executed successfully (no event support)"

    # Always safe: status updates work in both modes
    await __event_emitter__({
        "type": "status",
        "data": {"description": f"Running in {mode} mode...", "done": False}
    })

    # Mode-specific logic
    if is_native_mode:
        # Native mode: use compatible events only
        await __event_emitter__({
            "type": "status",
            "data": {"description": "Processing with native efficiency...", "done": False}
        })

        # Simulate processing
        await asyncio.sleep(1)

        # Return results directly - no message streaming
        result = f"Query '{query}' processed successfully in Native mode."

    else:
        # Default mode: full event capabilities
        await __event_emitter__({
            "type": "message",
            "data": {"content": f"🔍 **Processing Query**: {query}\n\n"}
        })

        await __event_emitter__({
            "type": "status",
            "data": {"description": "Analyzing with full streaming...", "done": False}
        })

        await asyncio.sleep(1)

        await __event_emitter__({
            "type": "message",
            "data": {"content": "📊 **Results**: Analysis complete with detailed findings.\n\n"}
        })

        result = "Query processed with full Default mode capabilities."

    # Final status (works in both modes)
    await __event_emitter__({
        "type": "status",
        "data": {"description": "Processing complete!", "done": True}
    })

    return result

🔧 Debugging Event Emitter Issues

Common Issues and Solutions:

Issue: Content appears then disappears

  • Cause: Using message events in Native mode
  • Solution: Switch to Default mode or use status events instead

Issue: Tool seems unresponsive

  • Cause: Function calling not enabled for model
  • Solution: Enable tools in Model settings or via + button

Issue: Events not firing at all

  • Cause: __event_emitter__ parameter missing or None
  • Solution: Ensure parameter is included in tool method signature

Issue: Citations being overwritten

  • Cause: self.citation = True (or not set to False)
  • Solution: Set self.citation = False in __init__ method

Diagnostic Tool:

async def event_diagnostics_tool(
    self, __event_emitter__=None, __metadata__=None, __user__=None
) -> str:
    """
    Comprehensive diagnostic tool for event emitter debugging
    """

    report = ["# 🔍 Event Emitter Diagnostic Report\n"]

    # Check event emitter availability
    if __event_emitter__:
        report.append("✅ Event emitter is available\n")
    else:
        report.append("❌ Event emitter is NOT available\n")
        return "".join(report)

    # Check metadata availability
    if __metadata__:
        mode = __metadata__.get("params", {}).get("function_calling", "default")
        report.append(f"✅ Function calling mode: **{mode}**\n")
    else:
        report.append("⚠️ Metadata not available (mode unknown)\n")
        mode = "unknown"

    # Check user context
    if __user__:
        report.append("✅ User context available\n")
    else:
        report.append("⚠️ User context not available\n")

    # Test compatible events (work in both modes)
    report.append("\n## Testing Compatible Events:\n")

    try:
        await __event_emitter__({
            "type": "status",
            "data": {"description": "Testing status events...", "done": False}
        })
        report.append("✅ Status events: WORKING\n")
    except Exception as e:
        report.append(f"❌ Status events: FAILED - {str(e)}\n")

    try:
        await __event_emitter__({
            "type": "notification",
            "data": {"content": "Test notification"}
        })
        report.append("✅ Notification events: WORKING\n")
    except Exception as e:
        report.append(f"❌ Notification events: FAILED - {str(e)}\n")

    # Test problematic events (broken in Native mode)
    report.append("\n## Testing Mode-Dependent Events:\n")

    try:
        await __event_emitter__({
            "type": "message",
            "data": {"content": "**Test message event** - This should appear in Default mode only\n"}
        })
        report.append("✅ Message events: SENT (may disappear in Native mode)\n")
    except Exception as e:
        report.append(f"❌ Message events: FAILED - {str(e)}\n")

    # Final status
    await __event_emitter__({
        "type": "status",
        "data": {"description": "Diagnostic complete", "done": True}
    })

    # Mode-specific recommendations
    report.append("\n## Recommendations:\n")

    if mode == "native":
        report.append("""
⚠️ **Native Mode Detected**: Limited event support
- ✅ Use: status, citation, notification, files events
- ❌ Avoid: message, replace, chat:message events
- 💡 Switch to Default mode for full functionality
""")
    elif mode == "default":
        report.append("""
✅ **Default Mode Detected**: Full event support available
- All event types should work perfectly
- Optimal for interactive and streaming tools
""")
    else:
        report.append("""
❓ **Unknown Mode**: Check your model configuration
- Ensure function calling is enabled
- Verify model supports tool calling
""")

    return "".join(report)

📚 Event Emitter Quick Reference

Always Compatible (Both Modes):

# Status updates - perfect for progress tracking
await __event_emitter__({
"type": "status",
"data": {"description": "Processing...", "done": False}
})

# Citations - essential for source attribution
await __event_emitter__({
"type": "citation",
"data": {
"document": ["Content"],
"source": {"name": "Source", "url": "https://example.com"}
}
})

# Notifications - user alerts
await __event_emitter__({
"type": "notification",
"data": {"content": "Task completed!"}
})

Default Mode Only (Broken in Native):

# ⚠️ These will flicker/disappear in Native mode

# Progressive content streaming
await __event_emitter__({
    "type": "message",
    "data": {"content": "Streaming content..."}
})

# Content replacement
await __event_emitter__({
    "type": "replace",
    "data": {"content": "New complete content"}
})

# Delta updates
await __event_emitter__({
    "type": "chat:message:delta",
    "data": {"content": "Additional content"}
})

Mode Detection Pattern:

def get_function_calling_mode(__metadata__):
    """Utility to detect current function calling mode"""
    if not __metadata__:
        return "unknown"
    return __metadata__.get("params", {}).get("function_calling", "default")

# Usage in tools:
mode = get_function_calling_mode(__metadata__)
is_native = (mode == "native")
can_stream_messages = not is_native

Essential Imports:

import asyncio
from datetime import datetime
from typing import Optional, Callable, Awaitable

Rich UI Element Embedding

Both External and Built-In Tools now support rich UI element embedding, allowing tools to return HTML content and interactive iframes that display directly within chat conversations. This feature enables tools to provide sophisticated visual interfaces, interactive widgets, charts, dashboards, and other rich web content.

When a tool returns an HTMLResponse with the appropriate headers, the content will be embedded as an interactive iframe in the chat interface rather than displayed as plain text.

Basic Usage

To embed HTML content, your tool should return an HTMLResponse with the Content-Disposition: inline header:

from fastapi.responses import HTMLResponse

def create_visualization_tool(self, data: str) -> HTMLResponse:
    """
    Creates an interactive data visualization that embeds in the chat.

    :param data: The data to visualize
    """
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>Data Visualization</title>
        <script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
    </head>
    <body>
        <div id="chart" style="width:100%;height:400px;"></div>
        <script>
            // Your interactive chart code here
            Plotly.newPlot('chart', [{
                y: [1, 2, 3, 4],
                type: 'scatter'
            }]);
        </script>
    </body>
    </html>
    """

    headers = {"Content-Disposition": "inline"}
    return HTMLResponse(content=html_content, headers=headers)

Advanced Features

The embedded iframes support auto-resizing and include configurable security settings. The system automatically handles:

  • Auto-resizing: Embedded content automatically adjusts height based on its content
  • Cross-origin communication: Safe message passing between the iframe and parent window
  • Security sandbox: Configurable security restrictions for embedded content

Security Considerations

When embedding external content, several security options can be configured through the UI settings:

  • iframeSandboxAllowForms: Allow form submissions within embedded content
  • iframeSandboxAllowSameOrigin: Allow same-origin requests (use with caution)
  • iframeSandboxAllowPopups: Allow popup windows from embedded content

Use Cases

Rich UI embedding is perfect for:

  • Interactive dashboards: Real-time data visualization and controls
  • Form interfaces: Complex input forms with validation and dynamic behavior
  • Charts and graphs: Interactive plotting with libraries like Plotly, D3.js, or Chart.js
  • Media players: Video, audio, or interactive media content
  • Custom widgets: Specialized UI components for specific tool functionality
  • External integrations: Embedding content from external services or APIs

External Tool Example

For external tools served via HTTP endpoints:

from fastapi import FastAPI
from fastapi.responses import HTMLResponse

app = FastAPI()

@app.post("/tools/dashboard")
async def create_dashboard():
    html = """
    <div style="padding: 20px;">
        <h2>System Dashboard</h2>
        <canvas id="myChart" width="400" height="200"></canvas>
        <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
        <script>
            const ctx = document.getElementById('myChart').getContext('2d');
            new Chart(ctx, {
                type: 'line',
                data: { /* your chart data */ }
            });
        </script>
    </div>
    """

    return HTMLResponse(
        content=html,
        headers={"Content-Disposition": "inline"}
    )

The embedded content automatically inherits responsive design and integrates seamlessly with the chat interface, providing a native-feeling experience for users interacting with your tools.

External packages

In the Tools definition metadata you can specify custom packages. When you click Save, the requirements line is parsed and pip install is run on all requirements at once.

Keep in mind that because pip runs in the same process as Open WebUI, the UI will be completely unresponsive during installation.

No measures are taken to handle package conflicts with Open WebUI's requirements. That means that specifying requirements can break Open WebUI if you're not careful. You might be able to work around this by specifying open-webui itself as a requirement.

Example

"""
title: myToolName
author: myName
funding_url: [any link here will be shown behind a `Heart` button for users to show their support to you]
version: 1.0.0
# the version is displayed in the UI to help users keep track of updates.
license: GPLv3
description: [recommended]
requirements: package1>=2.7.0,package2,package3
"""