AI Function Calling in 2025: Build Agents That Actually Work
Introduction
Function calling transforms LLMs from chat interfaces into autonomous agents. This guide shows you how to implement it correctly across different providers.
TL;DR
AI function calling lets models execute code and use tools. Key patterns: define schemas, parse responses, handle errors, implement retry logic. Works with OpenAI, Anthropic, and local models.
The Current Landscape
Provider Comparison
- OpenAI: Most mature, parallel function calls, JSON mode
- Anthropic (Claude): Best reasoning, native tool use with JSON schemas (see the sketch after this list)
- Google (Gemini): Growing support, good for multi-modal
- Local (Llama 3.3): Function calling via Ollama/llama.cpp
- LangChain: Unified interface for all providers, built-in tools
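To make the comparison concrete, here is a minimal sketch of the weather tool from this guide defined against Anthropic's Messages API. It assumes pip install anthropic and an ANTHROPIC_API_KEY environment variable, and the model name is illustrative; note how the schema maps almost field-for-field onto the OpenAI format used later in this guide.
import anthropic

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment

response = client.messages.create(
    model="claude-3-opus-20240229",  # illustrative model choice
    max_tokens=1024,
    tools=[{
        "name": "get_current_weather",
        "description": "Get the current weather in a given location",
        "input_schema": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "The city and state, e.g. San Francisco, CA"
                }
            },
            "required": ["location"]
        }
    }],
    messages=[{"role": "user", "content": "What's the weather in San Francisco?"}]
)

# Tool requests arrive as content blocks with type "tool_use"
tool_use_blocks = [block for block in response.content if block.type == "tool_use"]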
Core Implementation
First, install the required packages:
pip install openai langchain langchain-openai langchain-anthropic langchain-google-genai tenacity pydantic
Step 1: Define Your Functions
from typing import Dict, Any, List, Optional
import json
from datetime import datetime
import os
# Define actual functions
def get_current_weather(location: str, unit: str = "celsius") -> Dict[str, Any]:
"""Get current weather for a location"""
# In production, call actual weather API
return {
"location": location,
"temperature": 22,
"unit": unit,
"conditions": "sunny",
"humidity": 65,
"wind_speed": 10
}
def search_database(query: str, filters: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
"""Search internal database"""
# In production, query actual database
results = [
{"id": 1, "title": f"Result for: {query}", "relevance": 0.95},
{"id": 2, "title": f"Related to: {query}", "relevance": 0.87}
]
if filters:
# Apply filters in production
results[0]["filters_applied"] = filters
return results
# Define schemas for the AI
tools = [
{
"type": "function",
"function": {
"name": "get_current_weather",
"description": "Get the current weather in a given location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA"
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"]
}
},
"required": ["location"]
}
}
},
{
"type": "function",
"function": {
"name": "search_database",
"description": "Search internal database for information",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query"
},
"filters": {
"type": "object",
"description": "Optional filters"
}
},
"required": ["query"]
}
}
}
]
Step 2: Implement Function Calling Logic
from openai import OpenAI
from typing import Optional
class AIAgent:
def __init__(self, api_key: str):
self.client = OpenAI(api_key=api_key)
self.available_functions = {
"get_current_weather": get_current_weather,
"search_database": search_database
}
def process_message(self, user_message: str) -> str:
messages = [{"role": "user", "content": user_message}]
# First API call
response = self.client.chat.completions.create(
model="gpt-4-turbo-preview",
messages=messages,
tools=tools,
tool_choice="auto" # Let the model decide
)
response_message = response.choices[0].message
tool_calls = response_message.tool_calls
# Check if the model wants to call functions
if tool_calls:
messages.append(response_message.model_dump())
# Execute function calls
for tool_call in tool_calls:
function_name = tool_call.function.name
function_args = json.loads(tool_call.function.arguments)
function_response = self.available_functions[function_name](
**function_args
)
messages.append({
"tool_call_id": tool_call.id,
"role": "tool",
"name": function_name,
"content": json.dumps(function_response)
})
# Second API call with function results
second_response = self.client.chat.completions.create(
model="gpt-4-turbo-preview",
messages=messages
)
return second_response.choices[0].message.content
return response_message.content
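A quick usage sketch of the agent above (assumes OPENAI_API_KEY is set; the question is illustrative):
agent = AIAgent(api_key=os.getenv("OPENAI_API_KEY"))
print(agent.process_message("What's the weather in Paris, France in celsius?"))
# Expected flow: the model requests get_current_weather, the result is fed back,
# and the second completion returns a natural-language summary.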
Step 3: Production-Ready Error Handling
import asyncio
from typing import Any
from tenacity import retry, stop_after_attempt, wait_exponential
class ProductionAIAgent(AIAgent):
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True
    )
    async def _call_with_timeout(self, func, args: dict):
        # Exceptions must propagate from here so tenacity actually retries;
        # the timeout caps long-running functions
        return await asyncio.wait_for(
            asyncio.to_thread(func, **args),
            timeout=30.0
        )

    async def execute_function_with_retry(self, func_name: str, args: dict):
        func = self.available_functions.get(func_name)
        if not func:
            return {"error": f"Function {func_name} not found"}
        try:
            return await self._call_with_timeout(func, args)
        except asyncio.TimeoutError:
            return {"error": "Function execution timed out"}
        except Exception as e:
            return {"error": f"Function failed: {str(e)}"}
def process_with_streaming(self, user_message: str):
messages = [{"role": "user", "content": user_message}]
stream = self.client.chat.completions.create(
model="gpt-4-turbo-preview",
messages=messages,
tools=tools,
stream=True
)
collected_chunks = []
for chunk in stream:
collected_chunks.append(chunk)
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
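One caveat with the streaming path above: when the model decides to call a tool, the fragments arrive on delta.tool_calls rather than delta.content, so nothing is yielded. A rough sketch of reassembling those fragments from the collected chunks (based on the OpenAI streaming format; the helper name is ours):
def assemble_tool_calls(chunks):
    """Reassemble streamed tool-call fragments into {index: {"name", "arguments"}}."""
    calls = {}
    for chunk in chunks:
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta
        for fragment in delta.tool_calls or []:
            entry = calls.setdefault(fragment.index, {"name": "", "arguments": ""})
            if fragment.function.name:
                entry["name"] = fragment.function.name
            if fragment.function.arguments:
                entry["arguments"] += fragment.function.arguments
    return calls  # json.loads(entry["arguments"]) once the stream is complete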
Step 4: Multi-Provider Support
class UniversalFunctionCaller:
def __init__(self, provider: str = "openai"):
self.provider = provider
def format_for_provider(self, tools):
if self.provider == "anthropic":
# Convert to Anthropic format
return {
"tools": [{
"name": t["function"]["name"],
"description": t["function"]["description"],
"input_schema": t["function"]["parameters"]
} for t in tools]
}
elif self.provider == "openai":
return {"tools": tools}
elif self.provider == "gemini":
# Convert to Gemini format
return {
"function_declarations": [{
"name": t["function"]["name"],
"description": t["function"]["description"],
"parameters": t["function"]["parameters"]
} for t in tools]
}
LangChain Implementation
LangChain provides a unified interface for function calling across all providers:
Basic LangChain Setup
from langchain_openai import ChatOpenAI
from langchain_core.tools import Tool, StructuredTool
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from pydantic import BaseModel, Field
from typing import Literal
# Define tool with Pydantic for better validation
class WeatherInput(BaseModel):
location: str = Field(description="City and state, e.g. San Francisco, CA")
    unit: Literal["celsius", "fahrenheit"] = Field(default="celsius")
class SearchInput(BaseModel):
query: str = Field(description="Search query string")
filters: Dict[str, Any] = Field(default_factory=dict, description="Optional filters")
# Create structured tools
weather_tool = StructuredTool.from_function(
func=get_current_weather,
name="get_current_weather",
description="Get current weather for a location",
args_schema=WeatherInput
)
search_tool = StructuredTool.from_function(
func=search_database,
name="search_database",
description="Search internal database for information",
args_schema=SearchInput
)
# Initialize LLM with tools
llm = ChatOpenAI(
model="gpt-4-turbo-preview",
temperature=0
)
# Bind tools to LLM
llm_with_tools = llm.bind_tools([weather_tool, search_tool])
# Simple usage
response = llm_with_tools.invoke("What's the weather in San Francisco?")
if response.tool_calls:
print(f"Tool calls: {response.tool_calls}")
Advanced LangChain Agent
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain.memory import ConversationBufferMemory
class LangChainAgent:
def __init__(self, model_name="gpt-4-turbo-preview"):
self.llm = ChatOpenAI(model=model_name, temperature=0)
self.tools = [weather_tool, search_tool]
# Create prompt template
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant. Use tools when needed."),
MessagesPlaceholder("chat_history", optional=True),
("human", "{input}"),
MessagesPlaceholder("agent_scratchpad"),
])
# Create the agent
agent = create_tool_calling_agent(self.llm, self.tools, prompt)
# Create executor with memory
self.executor = AgentExecutor(
agent=agent,
tools=self.tools,
verbose=True,
handle_parsing_errors=True,
max_iterations=3 # Prevent infinite loops
)
self.memory = ConversationBufferMemory(
return_messages=True,
memory_key="chat_history"
)
def process(self, user_input: str) -> str:
try:
# Include memory in the execution
response = self.executor.invoke({
"input": user_input,
"chat_history": self.memory.chat_memory.messages
})
# Update memory
self.memory.chat_memory.add_user_message(user_input)
self.memory.chat_memory.add_ai_message(response["output"])
return response["output"]
except Exception as e:
return f"Error: {str(e)}"
# Usage with conversation memory
agent = LangChainAgent()
print(agent.process("What's the weather in NYC?"))
print(agent.process("Search for restaurants there")) # Remembers NYC
LangChain Multi-Provider Support
from langchain_anthropic import ChatAnthropic
from langchain_google_genai import ChatGoogleGenerativeAI
class MultiProviderAgent:
def __init__(self, provider="openai", api_key=None):
tools = [weather_tool, search_tool]
if provider == "openai":
self.llm = ChatOpenAI(
model="gpt-4-turbo-preview",
api_key=api_key
)
elif provider == "anthropic":
self.llm = ChatAnthropic(
model="claude-3-opus-20240229",
api_key=api_key
)
elif provider == "google":
self.llm = ChatGoogleGenerativeAI(
model="gemini-pro",
google_api_key=api_key
)
# Same tools work across all providers
self.llm_with_tools = self.llm.bind_tools(tools)
self.tools_map = {tool.name: tool for tool in tools}
async def aprocess(self, message: str):
"""Process message with automatic tool execution"""
# Initial LLM call
response = await self.llm_with_tools.ainvoke(message)
if response.tool_calls:
# Execute tools
tool_results = []
for tool_call in response.tool_calls:
tool = self.tools_map.get(tool_call["name"])
if tool:
result = await asyncio.to_thread(
tool.func,
**tool_call["args"]
)
tool_results.append({
"tool_call_id": tool_call.get("id", ""),
"result": result
})
# Create follow-up message with results
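                # Simplified: for full fidelity, append the assistant message and per-call
                # ToolMessage objects to the original message list instead of a plain string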
follow_up = f"Tool results: {json.dumps(tool_results)}. Please provide a natural language response."
final_response = await self.llm.ainvoke(follow_up)
return final_response.content
return response.content
# Usage example (aprocess is a coroutine, so run it from an async context or via asyncio.run)
agent = MultiProviderAgent("openai", api_key=os.getenv("OPENAI_API_KEY"))
result = asyncio.run(agent.aprocess("What's the weather in Tokyo and search for sushi restaurants"))
LangChain Streaming with Function Calls
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
async def stream_with_functions():
llm = ChatOpenAI(
model="gpt-4-turbo-preview",
streaming=True,
callbacks=[StreamingStdOutCallbackHandler()]
).bind_tools([weather_tool])
    # Stream responses; partial tool-call fragments arrive on tool_call_chunks
    async for chunk in llm.astream("What's the weather in Paris?"):
        if chunk.tool_call_chunks:
            for tool_call in chunk.tool_call_chunks:
                # Handle streaming tool-call fragments (name and args may be partial)
                print(f"Calling: {tool_call}")
Real-World Patterns
1. Conversation Memory
class StatefulAgent(AIAgent):
def __init__(self, api_key: str):
super().__init__(api_key)
self.conversation_history = []
self.function_call_history = []
def remember_function_calls(self, function_name: str, result: Any):
self.function_call_history.append({
"timestamp": datetime.now(),
"function": function_name,
"result": result
})
2. Function Approval System
def requires_approval(func_name: str, args: dict) -> bool:
SENSITIVE_FUNCTIONS = ["delete_user", "send_email", "make_payment"]
return func_name in SENSITIVE_FUNCTIONS
class SafeAgent(ProductionAIAgent):
async def execute_with_approval(self, func_name: str, args: dict):
if requires_approval(func_name, args):
approval = await get_user_approval(func_name, args)
if not approval:
return {"error": "Function call rejected by user"}
return await self.execute_function_with_retry(func_name, args)
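The snippet above assumes a get_user_approval coroutine exists. A minimal, hypothetical stand-in that asks for confirmation on the console (a real system would route this to a UI, Slack, or a ticket queue):
async def get_user_approval(func_name: str, args: dict) -> bool:
    """Hypothetical approval hook: block on a console prompt without freezing the event loop."""
    prompt = f"Allow call to {func_name} with args {json.dumps(args)}? [y/N] "
    answer = await asyncio.to_thread(input, prompt)
    return answer.strip().lower() == "y"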
3. Parallel Function Execution
import asyncio
import json
async def execute_function_async(func_name: str, args: dict):
"""Execute a function asynchronously"""
func = {
"get_current_weather": get_current_weather,
"search_database": search_database
}.get(func_name)
if not func:
return {"error": f"Unknown function: {func_name}"}
try:
# Run sync function in thread pool
result = await asyncio.to_thread(func, **args)
return result
except Exception as e:
return {"error": str(e)}
async def execute_parallel_functions(tool_calls):
"""Execute multiple function calls in parallel"""
tasks = []
for tool_call in tool_calls:
task = execute_function_async(
tool_call.function.name,
json.loads(tool_call.function.arguments)
)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
processed_results.append({"error": str(result)})
else:
processed_results.append(result)
return processed_results
Common Pitfalls & Solutions
- Infinite Loops: Set max function calls per conversation
- Hallucinated Functions: Validate function names before execution (see the sketch after this list)
- Schema Drift: Version your function schemas
- Cost Explosion: Implement token/cost limits
- Latency: Use streaming and parallel execution
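A minimal sketch of guarding against the first two pitfalls, reusing the tool registry from earlier (the constants and helper names here are illustrative, not part of any SDK):
MAX_TOOL_ROUNDS = 5  # illustrative per-conversation budget

def validate_tool_call(tool_call, available_functions: dict):
    """Return an error dict for hallucinated names or malformed arguments, else None."""
    name = tool_call.function.name
    if name not in available_functions:
        return {"error": f"Unknown function requested: {name}"}
    try:
        json.loads(tool_call.function.arguments)
    except json.JSONDecodeError:
        return {"error": f"Malformed arguments for {name}"}
    return None

def within_budget(rounds_used: int) -> bool:
    """Stop the request/execute loop once the round budget is spent."""
    return rounds_used < MAX_TOOL_ROUNDS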
LangChain-Specific Gotchas
- Tool Naming: Keep tool names consistent across providers
- Memory Overhead: Clear or trim conversation memory periodically (see the sketch after this list)
- Parsing Errors: Use handle_parsing_errors=True in agents
- Async Mixing: Don't mix sync and async tools in the same agent
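For the memory-overhead gotcha, a small sketch against the ConversationBufferMemory used earlier: trim the stored messages once they pass a budget (the threshold is illustrative), or clear them outright.
MAX_REMEMBERED_MESSAGES = 20  # illustrative threshold

def trim_memory(memory: ConversationBufferMemory) -> None:
    """Keep only the most recent messages to bound prompt size and token cost."""
    messages = memory.chat_memory.messages
    if len(messages) > MAX_REMEMBERED_MESSAGES:
        memory.chat_memory.messages = messages[-MAX_REMEMBERED_MESSAGES:]
        # or wipe everything: memory.clear()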
Performance Optimization
# Cache function results
from functools import lru_cache
import hashlib
@lru_cache(maxsize=100)
def cached_weather_lookup(location: str):
    """Cache repeated weather lookups (note: lru_cache has no expiry; use a TTL cache if results must go stale)"""
    return get_current_weather(location)
# Batch similar function calls
def batch_database_searches(queries: List[str]) -> List[Dict[str, Any]]:
"""Execute multiple searches in one go"""
results = []
for query in queries:
results.extend(search_database(query))
return results
# Implement request coalescing
class FunctionCallOptimizer:
def __init__(self):
self.pending_calls = {}
self.call_results = {}
async def optimized_call(self, func_name: str, args: dict):
# Create unique key for the call
call_key = f"{func_name}:{hashlib.md5(json.dumps(args, sort_keys=True).encode()).hexdigest()}"
# Check if we're already processing this call
if call_key in self.pending_calls:
# Wait for the existing call to complete
return await self.pending_calls[call_key]
# Start new call
future = asyncio.create_task(self._execute_call(func_name, args))
self.pending_calls[call_key] = future
try:
result = await future
self.call_results[call_key] = result
return result
finally:
del self.pending_calls[call_key]
async def _execute_call(self, func_name: str, args: dict):
# Execute the actual function
func = {
"get_current_weather": get_current_weather,
"search_database": search_database
}.get(func_name)
if not func:
raise ValueError(f"Unknown function: {func_name}")
return await asyncio.to_thread(func, **args)
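A quick way to see the coalescing behavior: fire two identical calls concurrently and confirm they resolve from one underlying execution (a sketch; output depends on the stub functions above).
async def demo_coalescing():
    optimizer = FunctionCallOptimizer()
    # Both awaits share a single underlying call to get_current_weather
    results = await asyncio.gather(
        optimizer.optimized_call("get_current_weather", {"location": "Paris"}),
        optimizer.optimized_call("get_current_weather", {"location": "Paris"}),
    )
    print(results)

# asyncio.run(demo_coalescing())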
Conclusion
Function calling transforms LLMs into powerful agents. Start simple with OpenAI’s implementation, add error handling and retries, then expand to multi-provider support. LangChain significantly simplifies cross-provider compatibility and provides battle-tested patterns for production use.
Key takeaways:
- Direct API usage gives you fine control
- LangChain provides convenience and standardization
- Always implement retry logic and error handling
- Test with multiple providers to avoid vendor lock-in
Next steps: Implement function chaining, add observability with OpenTelemetry, and explore advanced agent frameworks like AutoGen or CrewAI for multi-agent workflows.