"""
Anthropic Python SDK – Generic, SDK-aligned scaffold (single file)

This file mirrors patterns from the official anthropic-sdk-python repository's README
and helpers/docs. It provides:
  • Sync & Async clients
  • Plain messages API helpers
  • Streaming (context manager + text_stream lens)
  • Tool helpers via @beta_tool and tool_runner
  • Token counting utility
  • Message Batches helpers
  • Robust error handling, retry/timeout configuration hooks
  • Minimal Bedrock/Vertex adapters (optional) to show parity

Requires: pip install anthropic
Optionals: pip install anthropic[aiohttp]  # async aiohttp backend

Environment:
  ANTHROPIC_API_KEY=... (required)
  ANTHROPIC_MODEL_ID=claude-sonnet-4-5-20250929 (optional; override as needed)
"""
from __future__ import annotations

import os
import json
import logging
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional

import anthropic
from anthropic import Anthropic, AsyncAnthropic

# =====================
# Configuration
# =====================
# Credentials and model selection are read from the environment once, at import time.
API_KEY = os.getenv("ANTHROPIC_API_KEY")
MODEL = os.getenv("ANTHROPIC_MODEL_ID", "claude-sonnet-4-5-20250929")

# Fail fast: importing this module without a key (unset or empty) is an error.
if not API_KEY:
    raise RuntimeError("ANTHROPIC_API_KEY is not set")

# Module-level logger (SDK logging is opt-in via ANTHROPIC_LOG=info)
log = logging.getLogger(__name__)

# Shared sync/async clients used by every helper below. Retries and timeouts
# can be configured through the constructor or .with_options when needed.
client = Anthropic(api_key=API_KEY)
async_client = AsyncAnthropic(api_key=API_KEY)


# =====================
# Basic Messages API (sync)
# =====================

def ask(prompt: str, *, max_tokens: int = 1024) -> str:
    """Send one user prompt through the sync Messages API.

    Args:
        prompt: Text sent as the only user message of the request.
        max_tokens: Forwarded as the request's ``max_tokens``.

    Returns:
        Whatever ``_extract_text`` produces from the response's content blocks.
    """
    user_turn = {"role": "user", "content": prompt}
    response = client.messages.create(
        model=MODEL,
        max_tokens=max_tokens,
        messages=[user_turn],
    )
    return _extract_text(response.content)


# =====================
# Basic Messages API (async)
# =====================

async def aask(prompt: str, *, max_tokens: int = 1024) -> str:
    """Send one user prompt through the async Messages API.

    Args:
        prompt: Text sent as the only user message of the request.
        max_tokens: Forwarded as the request's ``max_tokens``.

    Returns:
        Whatever ``_extract_text`` produces from the response's content blocks.
    """
    user_turn = {"role": "user", "content": prompt}
    response = await async_client.messages.create(
        model=MODEL,
        max_tokens=max_tokens,
        messages=[user_turn],
    )
    return _extract_text(response.content)


# =====================
# Streaming helpers (sync + async)
# =====================

def stream_text(prompt: str, *, max_tokens: int = 1024) -> str:
    """Stream a reply to stdout, then return the text of the final message.

    Args:
        prompt: Text sent as the only user message of the request.
        max_tokens: Forwarded as the request's ``max_tokens``.

    Returns:
        The result of ``_extract_text`` applied to the final message's content.
    """
    request = {
        "model": MODEL,
        "max_tokens": max_tokens,
        "messages": [{"role": "user", "content": prompt}],
    }
    with client.messages.stream(**request) as stream:
        # Echo each text fragment as soon as it arrives.
        for fragment in stream.text_stream:
            print(fragment, end="", flush=True)
        # Terminate the streamed line before returning.
        print()
        completed = stream.get_final_message()
        return _extract_text(completed.content)


async def astream_text(prompt: str, *, max_tokens: int = 1024) -> str:
    """Async counterpart of ``stream_text``: print fragments, return final text.

    Args:
        prompt: Text sent as the only user message of the request.
        max_tokens: Forwarded as the request's ``max_tokens``.

    Returns:
        The result of ``_extract_text`` applied to the final message's content.
    """
    request = {
        "model": MODEL,
        "max_tokens": max_tokens,
        "messages": [{"role": "user", "content": prompt}],
    }
    async with async_client.messages.stream(**request) as stream:
        # Echo each text fragment as soon as it arrives.
        async for fragment in stream.text_stream:
            print(fragment, end="", flush=True)
        # Terminate the streamed line before returning.
        print()
        completed = await stream.get_final_message()
        return _extract_text(completed.content)


# =====================
# Stateful conversation (sync)
# =====================

@dataclass
class Conversation:
    """Stateful multi-turn chat built on the sync Messages API.

    The system prompt is stored separately from ``history`` because the
    Messages API has no "system" role inside ``messages``; it is sent through
    the top-level ``system`` request parameter instead.
    """

    # Alternating user/assistant turns, in the shape passed as ``messages``.
    history: List[Dict[str, Any]]
    # Optional system prompt; ``None`` means no ``system`` parameter is sent.
    system_prompt: Optional[str] = None

    @classmethod
    def start(cls, system_prompt: Optional[str] = None) -> "Conversation":
        """Create an empty conversation, optionally with a system prompt.

        Args:
            system_prompt: Instructions to send with every request. An empty
                string is treated the same as ``None``.

        Returns:
            A new ``Conversation`` with an empty ``history``.
        """
        return cls(history=[], system_prompt=system_prompt or None)

    def _system_kwargs(self) -> Dict[str, Any]:
        """Return the extra request kwargs that carry the system prompt, if any."""
        if self.system_prompt:
            return {"system": self.system_prompt}
        return {}

    def chat(self, user_message: str, *, max_tokens: int = 2048) -> str:
        """Send one user turn and record the assistant's text reply.

        Args:
            user_message: Text appended to ``history`` as a user turn.
            max_tokens: Forwarded as the request's ``max_tokens``.

        Returns:
            The text extracted from the response by ``_extract_text``.

        Raises:
            Any exception raised by the request is re-raised after the pending
            user turn has been removed from ``history``.
        """
        self.history.append({"role": "user", "content": user_message})
        try:
            msg = client.messages.create(
                model=MODEL,
                max_tokens=max_tokens,
                messages=self.history,
                **self._system_kwargs(),
            )
        except Exception:
            # Roll back the unanswered user turn so a retry does not send it twice.
            self.history.pop()
            raise
        text = _extract_text(msg.content)
        self.history.append({"role": "assistant", "content": text})
        return text


# =====================
# Tool helpers (@beta_tool + tool_runner)
# =====================
# The SDK exposes a decorator and a runner that orchestrates tool calls.
# Keep tools generic — not tied to any specific business logic.

from anthropic import beta_tool  # type: ignore

@beta_tool
def echo(payload: dict) -> str:
    """Echo any JSON payload back as a JSON string."""
    # NOTE(review): the docstring above is presumably consumed by @beta_tool as
    # the tool description, so its text is left untouched - confirm before editing.
    wrapped = {"echo": payload}  # the input is nested under a single "echo" key
    return json.dumps(wrapped)

@beta_tool
def time_now(_: dict | None = None) -> str:
    """Return current UTC timestamp as JSON."""
    # NOTE(review): the docstring above is presumably consumed by @beta_tool as
    # the tool description, so its text is left untouched - confirm before editing.
    from datetime import datetime, timezone

    # The argument is ignored; the result is a timezone-aware ISO-8601 string.
    stamp = datetime.now(timezone.utc).isoformat()
    return json.dumps({"now_utc": stamp})


def run_with_tools(prompt: str, *, tools: Optional[list] = None, max_tokens: int = 2048) -> str:
    """Drive the SDK tool runner to completion and return the last message's text.

    Args:
        prompt: Text sent as the initial user message.
        tools: Tools handed to the runner; when omitted or empty, the
            module's ``echo`` and ``time_now`` tools are used.
        max_tokens: Forwarded as the request's ``max_tokens``.

    Returns:
        Text extracted from the last message the runner yields, or ``""`` if
        the runner yields nothing.
    """
    if not tools:
        tools = [echo, time_now]
    runner = client.beta.messages.tool_runner(
        model=MODEL,
        max_tokens=max_tokens,
        tools=tools,
        messages=[{"role": "user", "content": prompt}],
    )
    final_text = ""
    # Only the text of the most recent message yielded by the runner is kept.
    for turn in runner:
        final_text = _extract_text(turn.content)
    return final_text


# =====================
# Token counting utility
# =====================

def count_tokens_for(prompt: str) -> int:
    """Ask the API how many input tokens a single-user-message request would use.

    Args:
        prompt: Text used as the only user message.

    Returns:
        The ``input_tokens`` attribute of the count response as an ``int``,
        or ``0`` when the response has no such attribute.
    """
    payload = [{"role": "user", "content": prompt}]
    tally = client.messages.count_tokens(model=MODEL, messages=payload)
    return int(getattr(tally, "input_tokens", 0))


# =====================
# Message Batches helpers (async is typical, sync supported by SDK as well)
# =====================

async def create_batch(requests: List[Dict[str, Any]]) -> Any:
    """Submit a Message Batch through the async client.

    Args:
        requests: Batch entries, passed unchanged as ``requests=``.

    Returns:
        The object returned by ``messages.batches.create``.
    """
    batches_api = async_client.messages.batches
    return await batches_api.create(requests=requests)


async def wait_and_collect_batch_results(
    batch_id: str,
    *,
    poll_interval: float = 5.0,
    timeout: Optional[float] = None,
) -> List[Dict[str, Any]]:
    """Wait for a Message Batch to finish, then collect its per-request results.

    The batch is polled with ``retrieve()`` until its ``processing_status`` is
    ``"ended"``; only then is ``results()`` streamed, because results are not
    available while a batch is still processing.

    Args:
        batch_id: Identifier of the batch to wait for.
        poll_interval: Seconds to sleep between status checks.
        timeout: Maximum seconds to wait for the batch to end; ``None`` waits
            indefinitely.

    Returns:
        One dict per batch entry. Succeeded entries look like
        ``{"custom_id", "content"}``; all others look like
        ``{"custom_id", "type", "error"}``, where ``error`` is ``None`` when
        the result carries no ``error`` attribute.

    Raises:
        TimeoutError: If ``timeout`` elapses before the batch has ended.
    """
    import asyncio  # local import keeps this helper self-contained

    batches_api = async_client.messages.batches
    loop = asyncio.get_running_loop()
    deadline = None if timeout is None else loop.time() + timeout

    # Poll until processing has finished; results() is only usable afterwards.
    while True:
        batch = await batches_api.retrieve(batch_id)
        if batch.processing_status == "ended":
            break
        if deadline is not None and loop.time() >= deadline:
            raise TimeoutError(f"Batch {batch_id} did not finish within {timeout} seconds")
        await asyncio.sleep(max(poll_interval, 0))

    out: List[Dict[str, Any]] = []
    async for entry in (await batches_api.results(batch_id)):
        if entry.result.type == "succeeded":
            out.append({
                "custom_id": entry.custom_id,
                "content": _extract_text(entry.result.message.content),
            })
        else:
            # Keep the result type so entries without an error payload can
            # still be told apart by the caller.
            out.append({
                "custom_id": entry.custom_id,
                "type": entry.result.type,
                "error": getattr(entry.result, "error", None),
            })
    return out


# =====================
# Bedrock & Vertex adapters (optional examples)
# =====================

try:
    from anthropic import AnthropicBedrock  # type: ignore

    def bedrock_hello() -> str:
        """Send a fixed "Hello!" prompt through a default AnthropicBedrock client.

        Returns:
            The text ``_extract_text`` pulls from the response content.
        """
        bedrock = AnthropicBedrock()
        reply = bedrock.messages.create(
            model="anthropic.claude-sonnet-4-5-20250929-v1:0",
            max_tokens=100,
            messages=[{"role": "user", "content": "Hello!"}],
        )
        return _extract_text(reply.content)
except Exception:  # pragma: no cover
    # Best-effort fallback: any failure while importing the adapter swaps in a stub.
    def bedrock_hello() -> str:
        """Stub used when the Bedrock client could not be imported."""
        return "Bedrock extra not installed"

try:
    from anthropic import AnthropicVertex  # type: ignore

    def vertex_hello() -> str:
        """Send a fixed "Hello!" prompt through a default AnthropicVertex client.

        Returns:
            The text ``_extract_text`` pulls from the response content.
        """
        vertex = AnthropicVertex()
        reply = vertex.messages.create(
            model="claude-sonnet-4@20250514",
            max_tokens=100,
            messages=[{"role": "user", "content": "Hello!"}],
        )
        return _extract_text(reply.content)
except Exception:  # pragma: no cover
    # Best-effort fallback: any failure while importing the adapter swaps in a stub.
    def vertex_hello() -> str:
        """Stub used when the Vertex client could not be imported."""
        return "Vertex extra not installed"


# =====================
# Error-handling patterns
# =====================

def robust_ask(
    prompt: str,
    *,
    max_tokens: int = 1024,
    max_retries: int = 2,
    backoff_seconds: float = 1.0,
) -> str:
    """Call ``ask`` and retry connection and rate-limit failures with backoff.

    Args:
        prompt: Text forwarded to ``ask``.
        max_tokens: Forwarded to ``ask``.
        max_retries: Number of retries after the first attempt; must be >= 0.
        backoff_seconds: Base delay before a retry; it doubles after each
            failed attempt. Use ``0`` to retry immediately.

    Returns:
        The string returned by ``ask``.

    Raises:
        ValueError: If ``max_retries`` is negative. Without this check no
            attempt would be made and ``None`` would be returned silently.
        anthropic.APIConnectionError: If the final attempt has a connection failure.
        anthropic.RateLimitError: If the final attempt is rate limited.
        anthropic.APIStatusError: Immediately, for any other HTTP error status.
    """
    import time  # local import keeps this helper self-contained

    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")

    total_attempts = max_retries + 1
    for attempt in range(total_attempts):
        try:
            return ask(prompt, max_tokens=max_tokens)
        except anthropic.APIConnectionError as e:  # network / timeouts
            log.warning("Connection issue: %s (attempt %s/%s)", e, attempt + 1, total_attempts)
            if attempt == max_retries:
                raise
        except anthropic.RateLimitError as e:
            log.warning("Rate limited: %s (attempt %s/%s)", e, attempt + 1, total_attempts)
            if attempt == max_retries:
                raise
        except anthropic.APIStatusError as e:
            # Other HTTP errors are not retried here.
            log.error("HTTP %s: %s", getattr(e, "status_code", "?"), e.response)
            raise
        # Reached only after a retryable failure on a non-final attempt.
        time.sleep(max(backoff_seconds, 0) * (2 ** attempt))

    # The loop always returns or raises; this guards against an implicit None.
    raise RuntimeError("robust_ask exhausted its attempts without a result")


# =====================
# Utilities
# =====================

def _extract_text(blocks: Optional[Iterable[Any]]) -> str:
    """Concatenate the text of every text block in a content array.

    Blocks may be plain dicts (``{"type": "text", "text": ...}``) or objects
    exposing ``type`` and ``text`` attributes, which is how the SDK returns
    response content. Blocks of any other type are skipped, as are text
    blocks whose text is not a string.

    Args:
        blocks: Content blocks to scan; ``None`` or empty yields ``""``.

    Returns:
        The text of all text blocks joined in order, with no separator.
    """
    parts: List[str] = []
    for block in blocks or []:
        if isinstance(block, dict):
            kind, text = block.get("type"), block.get("text")
        else:
            # Typed SDK blocks expose the same fields as attributes.
            kind, text = getattr(block, "type", None), getattr(block, "text", None)
        if kind == "text" and isinstance(text, str):
            parts.append(text)
    return "".join(parts)
  

# Mayur Gandhi
#
# Regional Enterprise Technologist | Cloud Architect | AWS · GCP · Terraform · AIOps · Vault
#
# Professional Summary
# Experience
# Regional Enterprise Technologist, APJ
# CodeRabbit.ai
# September 2025 - Present
# Staff Engineer
# HashiCorp (IBM Company)
# February 2022 - August 2025
# Vice President
# Deutsche Bank
# July 2020 - January 2022
# Information Architect
# IBM GTS Labs
# November 2017 - July 2020
# Senior Vice President
# Yes Bank
# April 2015 - October 2017
# Platform Architect
# Atos
# October 2014 - April 2015
# Cloud Architect
# IBM Watson Labs
# September 2009 - September 2014
# Senior Engineer
# Tech Mahindra
# July 2008 - September 2009
# Consultant
# Siemens India (SISL)
# March 2007 - July 2008
# Assistant Manager
# HDFC Bank
# February 2005 - March 2007
# Skills & Technologies
# AWS
# GCP
# HashiCorp Vault
# Terraform
# Consul
# Nomad
# API Management
# DevOps
# Ansible
# OpenShift
# AIOps
# LLM
# AI Infrastructure
# Security Engineering
# Certifications
# Open Group Speaker
# IBM Recognised Speaker
# Google Certified Architect
# Google Certified ML Engineer
# IBM Certified Cloud Architect
# HashiCorp Certified Terraform Associate
# Public Speaking & Events
# Education
# Bachelor of Engineering - Computer Science
# Mumbai University
# 2004
# References
# Armon Dadgar - Co-Founder and CTO, HashiCorp (an IBM Company)
# Dave McJannet - Former CEO, HashiCorp (an IBM Company)
# Rajesh Thapar - CISO, NSE (National Stock Exchange)
# Anup Purohit - Former Global CIO, Wipro
# Erik Papir - VP, WW Engineering Leader, Palo Alto Networks
# Suhail Gulzar - Solutions Engineering Leader, Neo4j
# Chandresh Dedhia - CTO, State Bank of India Securities