"""
Anthropic Python SDK – Generic, SDK-aligned scaffold (single file)
This file mirrors patterns from the official anthropic-sdk-python repository's README
and helpers/docs. It provides:
• Sync & Async clients
• Plain messages API helpers
• Streaming (context manager + text_stream lens)
• Tool helpers via @beta_tool and tool_runner
• Token counting utility
• Message Batches helpers
• Robust error handling, retry/timeout configuration hooks
• Minimal Bedrock/Vertex adapters (optional) to show parity
Requires: pip install anthropic
Optional: pip install anthropic[aiohttp]  # aiohttp transport for the async client
Environment:
ANTHROPIC_API_KEY=... (required)
ANTHROPIC_MODEL_ID=claude-sonnet-4-5-20250929 (optional; override as needed)
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import time
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional
import anthropic
from anthropic import Anthropic, AsyncAnthropic
# =====================
# Configuration
# =====================
API_KEY = os.environ.get("ANTHROPIC_API_KEY")
MODEL = os.environ.get("ANTHROPIC_MODEL_ID", "claude-sonnet-4-5-20250929")
if not API_KEY:
raise RuntimeError("ANTHROPIC_API_KEY is not set")
# Global logger (opt-in via ANTHROPIC_LOG=info)
log = logging.getLogger(__name__)
# Configure client-level retries/timeouts via ctor or .with_options on demand
client = Anthropic(api_key=API_KEY)
async_client = AsyncAnthropic(api_key=API_KEY)
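# Per-call overrides: with_options returns a copy of the client and accepts the
# same options as the constructor. A minimal sketch (max_retries/timeout are
# documented client options; the values here are illustrative):
def resilient_client(*, max_retries: int = 5, timeout: float = 30.0) -> Anthropic:
    """Copy of the global client with tighter retry/timeout settings."""
    return client.with_options(max_retries=max_retries, timeout=timeout)
# Optional aiohttp transport for the async client (pip install anthropic[aiohttp]).
# A minimal sketch, assuming the extra is installed; falls back to the default
# httpx transport otherwise:
def aiohttp_async_client() -> AsyncAnthropic:
    """AsyncAnthropic on the aiohttp backend when available."""
    try:
        from anthropic import DefaultAioHttpClient  # type: ignore
        return AsyncAnthropic(api_key=API_KEY, http_client=DefaultAioHttpClient())
    except ImportError:
        return AsyncAnthropic(api_key=API_KEY)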
# =====================
# Basic Messages API (sync)
# =====================
def ask(prompt: str, *, max_tokens: int = 1024) -> str:
"""Single-turn convenience wrapper using the Messages API (sync)."""
msg = client.messages.create(
model=MODEL,
max_tokens=max_tokens,
messages=[{"role": "user", "content": prompt}],
)
return _extract_text(msg.content)
# =====================
# Basic Messages API (async)
# =====================
async def aask(prompt: str, *, max_tokens: int = 1024) -> str:
"""Single-turn convenience wrapper using the Messages API (async)."""
msg = await async_client.messages.create(
model=MODEL,
max_tokens=max_tokens,
messages=[{"role": "user", "content": prompt}],
)
return _extract_text(msg.content)
# =====================
# Streaming helpers (sync + async)
# =====================
def stream_text(prompt: str, *, max_tokens: int = 1024) -> str:
"""Stream a response and return final assembled text if available."""
# Context manager exposes lenses like .text_stream and final message helpers.
with client.messages.stream(
model=MODEL,
max_tokens=max_tokens,
messages=[{"role": "user", "content": prompt}],
) as stream:
# Print incremental text as it's generated
for text in stream.text_stream:
print(text, end="", flush=True)
print()
# Retrieve the final message object
final = stream.get_final_message()
return _extract_text(final.content)
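# Lower-level streaming: the helper stream can also be iterated as typed events
# (the SDK emits e.g. a "text" event carrying each delta alongside the raw API
# events). A minimal sketch of the event-based loop:
def stream_events(prompt: str, *, max_tokens: int = 1024) -> None:
    """Print text deltas by iterating stream events instead of text_stream."""
    with client.messages.stream(
        model=MODEL,
        max_tokens=max_tokens,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        for event in stream:
            if event.type == "text":
                print(event.text, end="", flush=True)
        print()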
async def astream_text(prompt: str, *, max_tokens: int = 1024) -> str:
"""Async streaming variant using the same helpers."""
async with async_client.messages.stream(
model=MODEL,
max_tokens=max_tokens,
messages=[{"role": "user", "content": prompt}],
) as stream:
async for text in stream.text_stream:
print(text, end="", flush=True)
print()
final = await stream.get_final_message()
return _extract_text(final.content)
# =====================
# Stateful conversation (sync)
# =====================
@dataclass
class Conversation:
    history: List[Dict[str, Any]]
    system: Optional[str] = None
    @classmethod
    def start(cls, system_prompt: Optional[str] = None) -> "Conversation":
        # The Messages API has no "system" role; system prompts go in the
        # top-level `system` parameter, not the messages array.
        return cls(history=[], system=system_prompt)
    def chat(self, user_message: str, *, max_tokens: int = 2048) -> str:
        self.history.append({"role": "user", "content": user_message})
        kwargs: Dict[str, Any] = {"system": self.system} if self.system else {}
        msg = client.messages.create(
            model=MODEL,
            max_tokens=max_tokens,
            messages=self.history,
            **kwargs,
        )
        text = _extract_text(msg.content)
        self.history.append({"role": "assistant", "content": text})
        return text
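# Usage sketch (prompts are illustrative):
#   convo = Conversation.start(system_prompt="Reply in one sentence.")
#   convo.chat("What is the Messages API?")
#   convo.chat("How does streaming differ?")  # history carries both turns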
# =====================
# Tool helpers (@beta_tool + tool_runner)
# =====================
# The SDK exposes a decorator and a runner that orchestrates tool calls.
# Keep tools generic — not tied to any specific business logic.
from anthropic import beta_tool # type: ignore
@beta_tool
def echo(text: str) -> str:
    """Echo the given text back as a JSON string.

    Args:
        text: Arbitrary text to echo.
    """
    return json.dumps({"echo": text})
@beta_tool
def time_now() -> str:
    """Return the current UTC timestamp as JSON."""
    import datetime as _dt
    return json.dumps({"now_utc": _dt.datetime.now(_dt.timezone.utc).isoformat()})
def run_with_tools(prompt: str, *, tools: Optional[list] = None, max_tokens: int = 2048) -> str:
"""Iteratively run with tool_runner until the model stops calling tools, then return the final text."""
tools = tools or [echo, time_now]
runner = client.beta.messages.tool_runner(
model=MODEL,
max_tokens=max_tokens,
tools=tools,
messages=[{"role": "user", "content": prompt}],
)
last_text = ""
for message in runner: # each iteration performs a round-trip
last_text = _extract_text(message.content)
return last_text
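# If you only need the final message, the runner can drive the loop itself.
# A minimal sketch, assuming the beta tool runner exposes until_done() as in
# the SDK's tools helpers docs:
def run_with_tools_until_done(prompt: str, *, max_tokens: int = 2048) -> str:
    """Let the runner execute all tool round-trips and return the final text."""
    runner = client.beta.messages.tool_runner(
        model=MODEL,
        max_tokens=max_tokens,
        tools=[echo, time_now],
        messages=[{"role": "user", "content": prompt}],
    )
    return _extract_text(runner.until_done().content)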
# =====================
# Token counting utility
# =====================
def count_tokens_for(prompt: str) -> int:
    """Return the input token count for a prompt without creating a message."""
    count = client.messages.count_tokens(
        model=MODEL,
        messages=[{"role": "user", "content": prompt}],
    )
    return count.input_tokens
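# Example use of the counter: pre-flight budgeting before a paid call. The
# helper below and its default budget are illustrative, not SDK API:
def ask_within_budget(prompt: str, *, budget: int = 4096, max_tokens: int = 1024) -> str:
    """Refuse prompts whose input token count exceeds a budget."""
    n = count_tokens_for(prompt)
    if n > budget:
        raise ValueError(f"prompt uses {n} input tokens, over budget {budget}")
    return ask(prompt, max_tokens=max_tokens)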
# =====================
# Message Batches helpers (async is typical, sync supported by SDK as well)
# =====================
async def create_batch(requests: List[Dict[str, Any]]) -> Any:
    """Create a batch. Each request is {"custom_id": ..., "params": {...}},
    where params uses the same schema as messages.create()."""
    return await async_client.messages.batches.create(requests=requests)
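# Builder for one batch entry; the {custom_id, params} envelope is the Batches
# API request shape, with params mirroring messages.create():
def batch_request(custom_id: str, prompt: str, *, max_tokens: int = 1024) -> Dict[str, Any]:
    """Build a single batch request for a plain user prompt."""
    return {
        "custom_id": custom_id,
        "params": {
            "model": MODEL,
            "max_tokens": max_tokens,
            "messages": [{"role": "user", "content": prompt}],
        },
    }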
async def wait_and_collect_batch_results(batch_id: str, *, poll_seconds: float = 5.0) -> List[Dict[str, Any]]:
    """Poll until the batch has finished processing, then collect its results.
    Results are only available once processing_status == "ended"; results()
    then returns an async stream of {custom_id, result} entries.
    """
    while True:
        batch = await async_client.messages.batches.retrieve(batch_id)
        if batch.processing_status == "ended":
            break
        await asyncio.sleep(poll_seconds)
    out: List[Dict[str, Any]] = []
    async for entry in (await async_client.messages.batches.results(batch_id)):
        if entry.result.type == "succeeded":
            out.append({
                "custom_id": entry.custom_id,
                "content": _extract_text(entry.result.message.content),
            })
        else:
            out.append({
                "custom_id": entry.custom_id,
                "error": getattr(entry.result, "error", None),
            })
    return out
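# End-to-end sketch tying the batch helpers together (illustrative wrapper,
# not SDK API):
async def run_batch(prompts: Dict[str, str]) -> List[Dict[str, Any]]:
    """Create a batch from {custom_id: prompt} pairs and wait for all results."""
    batch = await create_batch([batch_request(cid, p) for cid, p in prompts.items()])
    return await wait_and_collect_batch_results(batch.id)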
# =====================
# Bedrock & Vertex adapters (optional examples)
# =====================
try:
from anthropic import AnthropicBedrock # type: ignore
def bedrock_hello() -> str:
bclient = AnthropicBedrock()
msg = bclient.messages.create(
model="anthropic.claude-sonnet-4-5-20250929-v1:0",
max_tokens=100,
messages=[{"role": "user", "content": "Hello!"}],
)
return _extract_text(msg.content)
except Exception: # pragma: no cover
def bedrock_hello() -> str:
return "Bedrock extra not installed"
try:
from anthropic import AnthropicVertex # type: ignore
def vertex_hello() -> str:
vclient = AnthropicVertex()
msg = vclient.messages.create(
model="claude-sonnet-4@20250514",
max_tokens=100,
messages=[{"role": "user", "content": "Hello!"}],
)
return _extract_text(msg.content)
except Exception: # pragma: no cover
def vertex_hello() -> str:
return "Vertex extra not installed"
# =====================
# Error-handling patterns
# =====================
def robust_ask(prompt: str, *, max_tokens: int = 1024, max_retries: int = 2) -> str:
    """Ask with explicit retry/backoff on top of the SDK's built-in retries."""
    for attempt in range(max_retries + 1):
        try:
            return ask(prompt, max_tokens=max_tokens)
        except anthropic.APIConnectionError as e:  # network / timeouts
            log.warning("Connection issue: %s (attempt %s/%s)", e, attempt + 1, max_retries + 1)
            if attempt == max_retries:
                raise
            time.sleep(2 ** attempt)  # simple exponential backoff
        except anthropic.RateLimitError as e:
            log.warning("Rate limited: %s (attempt %s/%s)", e, attempt + 1, max_retries + 1)
            if attempt == max_retries:
                raise
            time.sleep(2 ** attempt)
        except anthropic.APIStatusError as e:  # other 4xx/5xx: don't retry blindly
            log.error("HTTP %s: %s", e.status_code, e.response)
            raise
    raise AssertionError("unreachable")  # the loop always returns or raises
# =====================
# Utilities
# =====================
def _extract_text(blocks: Iterable[Any]) -> str:
    """Concatenate all text blocks in a content array (SDK objects or plain dicts).
    Note: messages.create() returns content as SDK block objects (e.g. TextBlock),
    not dicts, so both shapes are handled here.
    """
    parts: List[str] = []
    for b in blocks or []:
        if isinstance(b, dict):
            if b.get("type") == "text":
                parts.append(b.get("text", ""))
        elif getattr(b, "type", None) == "text":
            parts.append(getattr(b, "text", ""))
    return "".join(parts)