
Base Classes

Bases: ABC

Abstract base class for LLM provider implementations.

Provides a unified interface for interacting with different LLM providers (OpenAI, Anthropic, Gemini) with automatic retry logic and cost tracking.

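Subclasses must implement the abstract methods _get_response_impl and _get_response_stream_impl, which the public get_response and get_response_stream methods wrap (get_response also runs the optional hook pipeline). Other methods have default implementations that can be overridden for provider-specific optimizations; for example, the default _get_json_schema_response raises StructuredOutputUnsupported, so providers override it to enable structured output.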

Attributes:

provider: The LLM provider name (e.g., "openai", "anthropic", "gemini").
model: The specific model identifier (e.g., "gpt-4o", "claude-sonnet-4-20250514").
input_cost: Cost per million input tokens in USD.
output_cost: Cost per million output tokens in USD.
supports_temperature_top_p: Whether the model supports the temperature/top_p parameters.
use_web_search: Whether to enable web search (Anthropic only).
api_key_hash: Truncated SHA-256 hash of the API key (for logging).
api_key_alias: Optional human-readable name for the API key.
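Because input_cost and output_cost are quoted per million tokens, the per-request cost is tokens × price ÷ 1,000,000, which is what _calculate_costs computes in the source listing. The worked example below assumes TOKENS_PER_MILLION equals 1,000,000 (as its name implies) and uses hypothetical prices of $3 and $15 per million tokens, not the actual pricing of any model.

```python
TOKENS_PER_MILLION = 1_000_000  # assumed value of the library constant


def calculate_costs(
    input_tokens: int,
    output_tokens: int,
    input_cost_per_m: float,
    output_cost_per_m: float,
) -> tuple[float, float, float]:
    """Return (input_cost, output_cost, total_cost) in USD, mirroring _calculate_costs."""
    input_cost = (input_tokens * input_cost_per_m) / TOKENS_PER_MILLION
    output_cost = (output_tokens * output_cost_per_m) / TOKENS_PER_MILLION
    return input_cost, output_cost, input_cost + output_cost


# Hypothetical prices: $3 per million input tokens, $15 per million output tokens.
inp, out, total = calculate_costs(1_200, 350, 3.0, 15.0)
print(f"input=${inp:.6f} output=${out:.6f} total=${total:.6f}")
# input=$0.003600 output=$0.005250 total=$0.008850
```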

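Example (uses top-level await, so run it in an async-capable REPL such as python -m asyncio or IPython):

>>> from majordomo_llm import get_llm_instance
>>> llm = get_llm_instance("anthropic", "claude-sonnet-4-20250514")
>>> response = await llm.get_response("What is 2+2?")
>>> print(response.content)
4
>>> print(f"Cost: ${response.total_cost:.6f}")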

Source code in majordomo_llm/base.py
class LLM(ABC):
    """Abstract base class for LLM provider implementations.

    Provides a unified interface for interacting with different LLM providers
    (OpenAI, Anthropic, Gemini) with automatic retry logic and cost tracking.

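    Subclasses must implement the abstract :meth:`_get_response_impl` and
    :meth:`_get_response_stream_impl` methods, which the public
    :meth:`get_response` and :meth:`get_response_stream` wrap. Other methods
    have default implementations that can be overridden for provider-specific
    optimizations.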

    Attributes:
        provider: The LLM provider name (e.g., "openai", "anthropic", "gemini").
        model: The specific model identifier (e.g., "gpt-4o", "claude-sonnet-4-20250514").
        input_cost: Cost per million input tokens in USD.
        output_cost: Cost per million output tokens in USD.
        supports_temperature_top_p: Whether the model supports temperature/top_p params.
        use_web_search: Whether to enable web search (Anthropic only).
        api_key_hash: Truncated SHA256 hash of the API key (for logging).
        api_key_alias: Optional human-readable name for the API key.

    Example:
        >>> from majordomo_llm import get_llm_instance
        >>> llm = get_llm_instance("anthropic", "claude-sonnet-4-20250514")
        >>> response = await llm.get_response("What is 2+2?")
        >>> print(response.content)
        4
        >>> print(f"Cost: ${response.total_cost:.6f}")
    """

    def __init__(
        self,
        provider: str,
        model: str,
        input_cost: float,
        output_cost: float,
        supports_temperature_top_p: bool = True,
        use_web_search: bool = False,
        api_key: str | None = None,
        api_key_alias: str | None = None,
        base_url: str | None = None,
        default_headers: dict[str, str] | None = None,
        hook_pipeline: HookPipeline | None = None,
    ) -> None:
        """Initialize the LLM instance.

        Args:
            provider: The LLM provider name.
            model: The model identifier.
            input_cost: Cost per million input tokens in USD.
            output_cost: Cost per million output tokens in USD.
            supports_temperature_top_p: Whether temperature/top_p are supported.
            use_web_search: Enable web search capability (Anthropic only).
            api_key: The API key (used to compute hash for logging).
            api_key_alias: Optional human-readable name for the API key.
            base_url: Optional custom base URL for routing through a proxy.
            default_headers: Optional headers sent with every request.
            hook_pipeline: Optional :class:`HookPipeline` that wraps every
                text-producing call. ``get_response_stream`` does not run
                hooks; streaming-chunk interception is deferred.
        """
        self.provider = provider
        self.model = model
        self.input_cost = input_cost
        self.output_cost = output_cost
        self.supports_temperature_top_p = supports_temperature_top_p
        self.use_web_search = use_web_search
        self.api_key_hash = _hash_api_key(api_key) if api_key else None
        self.api_key_alias = api_key_alias
        self.base_url = base_url
        self.default_headers = default_headers
        self.hook_pipeline = hook_pipeline
        self.deprecation_warning: str | None = None
        self.requested_model: str | None = None

    def get_full_model_name(self) -> str:
        """Get the fully qualified model name.

        Returns:
            Model name in the format "provider:model" (e.g., "anthropic:claude-sonnet-4-20250514").
        """
        return f"{self.provider}:{self.model}"

    def _calculate_costs(
        self, input_tokens: int, output_tokens: int
    ) -> tuple[float, float, float]:
        """Calculate costs for a request.

        Args:
            input_tokens: Number of input tokens.
            output_tokens: Number of output tokens.

        Returns:
            Tuple of (input_cost, output_cost, total_cost) in USD.
        """
        input_cost = (input_tokens * self.input_cost) / TOKENS_PER_MILLION
        output_cost = (output_tokens * self.output_cost) / TOKENS_PER_MILLION
        return input_cost, output_cost, input_cost + output_cost

    @abstractmethod
    async def _get_response_impl(
        self,
        user_prompt: str,
        system_prompt: str | None = None,
        temperature: float = 0.3,
        top_p: float = 1.0,
        extra_headers: dict[str, str] | None = None,
    ) -> LLMResponse:
        """Provider-specific implementation of ``get_response``.

        Providers apply ``@retry_provider_call`` here. The public
        :meth:`get_response` wraps this with the optional hook pipeline.
        """
        raise NotImplementedError()

    @abstractmethod
    async def _get_response_stream_impl(
        self,
        user_prompt: str,
        system_prompt: str | None = None,
        temperature: float = 0.3,
        top_p: float = 1.0,
        extra_headers: dict[str, str] | None = None,
    ) -> LLMStreamResponse:
        """Provider-specific implementation of ``get_response_stream``."""
        raise NotImplementedError()

    async def get_response(
        self,
        user_prompt: str,
        system_prompt: str | None = None,
        temperature: float = 0.3,
        top_p: float = 1.0,
        extra_headers: dict[str, str] | None = None,
        *,
        caller_metadata: dict[str, Any] | None = None,
    ) -> LLMResponse:
        """Get a plain text response from the LLM.

        Runs the optional :attr:`hook_pipeline` around the provider call.
        Hooks see the prompt before the call and the response text after.

        Args:
            user_prompt: The user's input prompt.
            system_prompt: Optional system prompt to set context/behavior.
            temperature: Sampling temperature (0.0-2.0). Lower is more deterministic.
            top_p: Nucleus sampling parameter (0.0-1.0).
            extra_headers: Optional per-request headers merged with default_headers.
            caller_metadata: Free-form dict forwarded to every hook via
                :class:`HookContext`. Unused when no pipeline is configured.

        Returns:
            LLMResponse containing the text content and usage metrics.

        Raises:
            HookBlocked: If a hook in the pipeline blocks the call.
            Exception: If the API request fails after retries.
        """
        async def impl(prompt: str) -> LLMResponse:
            return await self._get_response_impl(
                prompt, system_prompt, temperature, top_p, extra_headers=extra_headers
            )

        return await self._run_hooks_returning_response(
            user_prompt, caller_metadata, impl
        )

    async def get_response_stream(
        self,
        user_prompt: str,
        system_prompt: str | None = None,
        temperature: float = 0.3,
        top_p: float = 1.0,
        extra_headers: dict[str, str] | None = None,
        *,
        caller_metadata: dict[str, Any] | None = None,
    ) -> LLMStreamResponse:
        """Get a streaming text response from the LLM.

        Hooks do not run on streaming responses; ``caller_metadata`` is
        accepted for API symmetry and ignored.
        """
        del caller_metadata
        return await self._get_response_stream_impl(
            user_prompt, system_prompt, temperature, top_p, extra_headers=extra_headers
        )

    async def _run_hooks_returning_response(
        self,
        prompt: str,
        caller_metadata: dict[str, Any] | None,
        impl: Callable[[str], Awaitable[LLMResponse]],
    ) -> LLMResponse:
        """Run the configured hook pipeline around an LLMResponse-returning call.

        Hooks operate on text. We capture the underlying ``LLMResponse`` so
        usage metrics survive even when the pipeline rewrites the content.
        """
        if self.hook_pipeline is None:
            return await impl(prompt)

        captured: LLMResponse | None = None

        async def call(modified_prompt: str) -> str:
            nonlocal captured
            captured = await impl(modified_prompt)
            return captured.content

        final_text = await self.hook_pipeline.run(
            prompt, call, caller_metadata=caller_metadata
        )
        assert captured is not None
        if final_text == captured.content:
            return captured
        return LLMResponse(
            content=final_text,
            input_tokens=captured.input_tokens,
            output_tokens=captured.output_tokens,
            cached_tokens=captured.cached_tokens,
            input_cost=captured.input_cost,
            output_cost=captured.output_cost,
            total_cost=captured.total_cost,
            response_time=captured.response_time,
            deprecation_warning=captured.deprecation_warning,
        )

    async def get_json_response(
        self,
        user_prompt: str,
        system_prompt: str | None = None,
        temperature: float = 0.3,
        top_p: float = 1.0,
        extra_headers: dict[str, str] | None = None,
        *,
        caller_metadata: dict[str, Any] | None = None,
    ) -> LLMJSONResponse:
        """Get a JSON response from the LLM.

        Automatically parses the LLM's text response as JSON.

        Args:
            user_prompt: The user's input prompt.
            system_prompt: Optional system prompt to set context/behavior.
            temperature: Sampling temperature (0.0-2.0). Lower is more deterministic.
            top_p: Nucleus sampling parameter (0.0-1.0).
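            extra_headers: Optional per-request headers merged with default_headers.
            caller_metadata: Free-form dict forwarded to every hook via
                :class:`HookContext`. Unused when no pipeline is configured.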

        Returns:
            LLMJSONResponse containing the parsed JSON dict and usage metrics.

        Raises:
            HookBlocked: If a hook in the pipeline blocks the call.
            ResponseParsingError: If the response cannot be parsed as JSON.
            Exception: If the API request fails after retries.
        """
        response = await self.get_response(
            user_prompt,
            system_prompt,
            temperature,
            top_p,
            extra_headers=extra_headers,
            caller_metadata=caller_metadata,
        )
        # Strip markdown code fencing if present
        content = response.content.replace("```json", "").replace("```", "").strip()
        try:
            parsed_content = json.loads(content)
        except json.JSONDecodeError as e:
            raise ResponseParsingError(
                f"Failed to parse JSON response: {e}",
                raw_content=response.content,
            ) from e
        return LLMJSONResponse(
            content=parsed_content,
            input_tokens=response.input_tokens,
            output_tokens=response.output_tokens,
            cached_tokens=response.cached_tokens,
            input_cost=response.input_cost,
            output_cost=response.output_cost,
            total_cost=response.total_cost,
            response_time=response.response_time,
        )

    async def get_structured_json_response(
        self,
        response_model: type[T],
        user_prompt: str,
        system_prompt: str | None = None,
        temperature: float = 0.3,
        top_p: float = 1.0,
        extra_headers: dict[str, str] | None = None,
        *,
        caller_metadata: dict[str, Any] | None = None,
    ) -> LLMStructuredResponse:
        """Get a structured response validated against a Pydantic model.

        Uses provider-specific mechanisms (tool calling, response schemas) to
        ensure the response conforms to the specified Pydantic model schema.

        Args:
            response_model: Pydantic model class defining the expected structure.
            user_prompt: The user's input prompt.
            system_prompt: Optional system prompt to set context/behavior.
            temperature: Sampling temperature (0.0-2.0). Lower is more deterministic.
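            top_p: Nucleus sampling parameter (0.0-1.0).
            extra_headers: Optional per-request headers merged with default_headers.
            caller_metadata: Free-form dict forwarded to every hook.

        Returns:
            LLMStructuredResponse containing the validated Pydantic model instance.

        Raises:
            HookBlocked: If a hook in the pipeline blocks the call.
            StructuredOutputUnsupported: If the provider does not implement
                JSON-schema structured output.
            pydantic.ValidationError: If the response doesn't match the model schema.
            Exception: If the API request fails after retries.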

        Example:
            >>> from pydantic import BaseModel
            >>> class Person(BaseModel):
            ...     name: str
            ...     age: int
            >>> response = await llm.get_structured_json_response(
            ...     response_model=Person,
            ...     user_prompt="Extract: John is 30 years old",
            ... )
            >>> print(response.content.name)
            John
        """
        response = await self.get_json_schema_response(
            user_prompt=user_prompt,
            response_schema=response_model.model_json_schema(),
            system_prompt=system_prompt,
            schema_name=response_model.__name__,
            schema_description=(
                f"Provide a structured response using the {response_model.__name__} schema"
            ),
            temperature=temperature,
            top_p=top_p,
            extra_headers=extra_headers,
            caller_metadata=caller_metadata,
        )
        parsed_content = response_model.model_validate_json(response.content)

        return LLMStructuredResponse(
            content=parsed_content,
            input_tokens=response.input_tokens,
            output_tokens=response.output_tokens,
            cached_tokens=response.cached_tokens,
            input_cost=response.input_cost,
            output_cost=response.output_cost,
            total_cost=response.total_cost,
            response_time=response.response_time,
        )

    async def get_json_schema_response(
        self,
        user_prompt: str,
        response_schema: dict[str, Any],
        system_prompt: str | None = None,
        schema_name: str = "Response",
        schema_description: str | None = None,
        temperature: float = 0.3,
        top_p: float = 1.0,
        extra_headers: dict[str, str] | None = None,
        *,
        caller_metadata: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> LLMResponse:
        """Get a structured JSON response validated against a raw JSON schema.

        Runs the optional :attr:`hook_pipeline` around the provider call.
        Hooks see the raw provider JSON text in ``after_call`` before
        downstream pydantic/JSON-schema parsing.

        Args:
            user_prompt: The user's input prompt.
            response_schema: Raw JSON schema dict defining the expected response.
            system_prompt: Optional system prompt to set context/behavior.
            schema_name: Provider-facing schema/tool name.
            schema_description: Optional provider-facing schema/tool description.
            temperature: Sampling temperature (0.0-2.0).
            top_p: Nucleus sampling parameter (0.0-1.0).
            extra_headers: Optional per-request headers merged with default_headers.
            caller_metadata: Free-form dict forwarded to every hook.
            **kwargs: Reserved for future provider-specific passthrough arguments.

        Returns:
            LLMResponse whose content is canonical JSON with sorted keys and no extra whitespace.

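        Raises:
            HookBlocked: If a hook in the pipeline blocks the call.
            StructuredOutputUnsupported: If the provider does not override
                ``_get_json_schema_response``.
            Exception: If the API request fails after retries.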
        """
        ensure_no_unexpected_kwargs(kwargs)

        async def impl(prompt: str) -> LLMResponse:
            return await self._get_json_schema_response_retried(
                user_prompt=prompt,
                response_schema=response_schema,
                system_prompt=system_prompt,
                schema_name=schema_name,
                schema_description=schema_description,
                temperature=temperature,
                top_p=top_p,
                extra_headers=extra_headers,
            )

        return await self._run_hooks_returning_response(
            user_prompt, caller_metadata, impl
        )

    @retry_provider_call
    async def _get_json_schema_response_retried(
        self,
        user_prompt: str,
        response_schema: dict[str, Any],
        system_prompt: str | None = None,
        schema_name: str = "Response",
        schema_description: str | None = None,
        temperature: float = 0.3,
        top_p: float = 1.0,
        extra_headers: dict[str, str] | None = None,
    ) -> LLMResponse:
        """Retry-wrapped delegate to the provider override.

        Sits inside the hook boundary so retries do not re-fire hooks.
        """
        return await self._get_json_schema_response(
            user_prompt=user_prompt,
            response_schema=response_schema,
            system_prompt=system_prompt,
            schema_name=schema_name,
            schema_description=schema_description,
            temperature=temperature,
            top_p=top_p,
            extra_headers=extra_headers,
        )

    async def _get_json_schema_response(
        self,
        user_prompt: str,
        response_schema: dict[str, Any],
        system_prompt: str | None = None,
        schema_name: str = "Response",
        schema_description: str | None = None,
        temperature: float = 0.3,
        top_p: float = 1.0,
        extra_headers: dict[str, str] | None = None,
    ) -> LLMResponse:
        """Provider-specific implementation for raw JSON-schema structured responses."""
        raise StructuredOutputUnsupported(self.provider, self.model)

    async def _get_structured_response(
        self,
        response_model: type[T],
        user_prompt: str,
        system_prompt: str | None = None,
        temperature: float = 0.3,
        top_p: float = 1.0,
        extra_headers: dict[str, str] | None = None,
    ) -> LLMJSONResponse:
        """Provider-specific implementation for structured responses.

        Default implementation delegates to :meth:`get_json_schema_response` with
        the model's JSON schema and parses the returned text with ``json.loads``.
        Providers may override this to use native structured output features.

        Args:
            response_model: Pydantic model class defining the expected structure.
            user_prompt: The user's input prompt.
            system_prompt: Optional system prompt to set context/behavior.
            temperature: Sampling temperature (0.0-2.0).
            top_p: Nucleus sampling parameter (0.0-1.0).
            extra_headers: Optional per-request headers merged with default_headers.

        Returns:
            LLMJSONResponse containing the parsed JSON content.
        """
        response = await self.get_json_schema_response(
            user_prompt=user_prompt,
            response_schema=response_model.model_json_schema(),
            system_prompt=system_prompt,
            schema_name=response_model.__name__,
            temperature=temperature,
            top_p=top_p,
            extra_headers=extra_headers,
        )
        return LLMJSONResponse(
            content=json.loads(response.content),
            input_tokens=response.input_tokens,
            output_tokens=response.output_tokens,
            cached_tokens=response.cached_tokens,
            input_cost=response.input_cost,
            output_cost=response.output_cost,
            total_cost=response.total_cost,
            response_time=response.response_time,
        )
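The _get_json_schema_response_retried docstring notes that the retry wrapper "sits inside the hook boundary so retries do not re-fire hooks". The sketch below shows that layering with a hypothetical simple_retry decorator that retries only on ConnectionError; it is not the library's retry_provider_call, whose retry policy is not documented on this page.

```python
import asyncio
import functools


def simple_retry(max_attempts: int = 3):
    """Hypothetical stand-in for retry_provider_call: retry on ConnectionError."""
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return await fn(*args, **kwargs)
                except ConnectionError:
                    if attempt == max_attempts:
                        raise  # give up after the last attempt
        return wrapper
    return decorator


class FlakyProvider:
    def __init__(self, failures: int) -> None:
        self.failures = failures   # number of calls that fail before success
        self.provider_calls = 0
        self.hook_runs = 0

    @simple_retry(max_attempts=3)
    async def _call_retried(self, prompt: str) -> str:
        # Inner layer: only this part is retried.
        self.provider_calls += 1
        if self.provider_calls <= self.failures:
            raise ConnectionError("transient failure")
        return f"answer to {prompt!r}"

    async def get_response(self, prompt: str) -> str:
        # Outer layer: "hooks" run once per public call, around the retried call.
        self.hook_runs += 1
        return await self._call_retried(prompt)


llm = FlakyProvider(failures=2)
print(asyncio.run(llm.get_response("ping")))  # answer to 'ping'
print(llm.provider_calls, llm.hook_runs)      # 3 1
```

Placing the retry outside the hooks instead would run before_call and after_call logic once per attempt, which matters for hooks that log, meter, or mutate state.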

__init__

__init__(provider, model, input_cost, output_cost, supports_temperature_top_p=True, use_web_search=False, api_key=None, api_key_alias=None, base_url=None, default_headers=None, hook_pipeline=None)

Initialize the LLM instance.

Parameters:

provider (str, required): The LLM provider name.
model (str, required): The model identifier.
input_cost (float, required): Cost per million input tokens in USD.
output_cost (float, required): Cost per million output tokens in USD.
supports_temperature_top_p (bool, default True): Whether temperature/top_p are supported.
use_web_search (bool, default False): Enable web search capability (Anthropic only).
api_key (str | None, default None): The API key (used to compute hash for logging).
api_key_alias (str | None, default None): Optional human-readable name for the API key.
base_url (str | None, default None): Optional custom base URL for routing through a proxy.
default_headers (dict[str, str] | None, default None): Optional headers sent with every request.
hook_pipeline (HookPipeline | None, default None): Optional HookPipeline that wraps every text-producing call. get_response_stream does not run hooks; streaming-chunk interception is deferred.
Source code in majordomo_llm/base.py
def __init__(
    self,
    provider: str,
    model: str,
    input_cost: float,
    output_cost: float,
    supports_temperature_top_p: bool = True,
    use_web_search: bool = False,
    api_key: str | None = None,
    api_key_alias: str | None = None,
    base_url: str | None = None,
    default_headers: dict[str, str] | None = None,
    hook_pipeline: HookPipeline | None = None,
) -> None:
    """Initialize the LLM instance.

    Args:
        provider: The LLM provider name.
        model: The model identifier.
        input_cost: Cost per million input tokens in USD.
        output_cost: Cost per million output tokens in USD.
        supports_temperature_top_p: Whether temperature/top_p are supported.
        use_web_search: Enable web search capability (Anthropic only).
        api_key: The API key (used to compute hash for logging).
        api_key_alias: Optional human-readable name for the API key.
        base_url: Optional custom base URL for routing through a proxy.
        default_headers: Optional headers sent with every request.
        hook_pipeline: Optional :class:`HookPipeline` that wraps every
            text-producing call. ``get_response_stream`` does not run
            hooks; streaming-chunk interception is deferred.
    """
    self.provider = provider
    self.model = model
    self.input_cost = input_cost
    self.output_cost = output_cost
    self.supports_temperature_top_p = supports_temperature_top_p
    self.use_web_search = use_web_search
    self.api_key_hash = _hash_api_key(api_key) if api_key else None
    self.api_key_alias = api_key_alias
    self.base_url = base_url
    self.default_headers = default_headers
    self.hook_pipeline = hook_pipeline
    self.deprecation_warning: str | None = None
    self.requested_model: str | None = None

get_full_model_name

get_full_model_name()

Get the fully qualified model name.

Returns:

str: Model name in the format "provider:model" (e.g., "anthropic:claude-sonnet-4-20250514").
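get_full_model_name simply joins provider and model with a colon. If you need to go the other way, split on the first colon only, because some model identifiers can themselves contain colons. split_full_model_name below is a hypothetical helper for illustration, not part of the documented API, and the colon-containing model ID in the example is made up.

```python
def full_model_name(provider: str, model: str) -> str:
    # Same format as get_full_model_name: "provider:model".
    return f"{provider}:{model}"


def split_full_model_name(full_name: str) -> tuple[str, str]:
    """Hypothetical inverse: split on the first ':' so colons in the model ID survive."""
    provider, sep, model = full_name.partition(":")
    if not sep or not provider or not model:
        raise ValueError(f"expected 'provider:model', got {full_name!r}")
    return provider, model


name = full_model_name("anthropic", "claude-sonnet-4-20250514")
print(name)  # anthropic:claude-sonnet-4-20250514
print(split_full_model_name("openai:ft:gpt-4o-mini:my-org"))  # ('openai', 'ft:gpt-4o-mini:my-org')
```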

Source code in majordomo_llm/base.py
def get_full_model_name(self) -> str:
    """Get the fully qualified model name.

    Returns:
        Model name in the format "provider:model" (e.g., "anthropic:claude-sonnet-4-20250514").
    """
    return f"{self.provider}:{self.model}"

get_json_response async

get_json_response(user_prompt, system_prompt=None, temperature=0.3, top_p=1.0, extra_headers=None, *, caller_metadata=None)

Get a JSON response from the LLM.

Automatically parses the LLM's text response as JSON.
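Parsing follows the steps in the source: remove ```json and ``` fences, strip whitespace, call json.loads, and wrap any JSONDecodeError in ResponseParsingError while keeping the raw text. The sketch reproduces that logic with a local ResponseParsingError stand-in (the library's class is assumed to accept raw_content as a keyword, as the source call suggests).

```python
import json


class ResponseParsingError(ValueError):
    """Local stand-in for the library exception; keeps the raw text for debugging."""

    def __init__(self, message: str, raw_content: str) -> None:
        super().__init__(message)
        self.raw_content = raw_content


def parse_json_text(text: str):
    # Same approach as get_json_response: drop markdown fences, then parse.
    content = text.replace("```json", "").replace("```", "").strip()
    try:
        return json.loads(content)
    except json.JSONDecodeError as e:
        raise ResponseParsingError(
            f"Failed to parse JSON response: {e}", raw_content=text
        ) from e


print(parse_json_text('```json\n{"answer": 4}\n```'))  # {'answer': 4}
```

Because the fence removal is a global string replacement, a literal triple backtick inside a JSON string value would also be removed before parsing.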

Parameters:

user_prompt (str, required): The user's input prompt.
system_prompt (str | None, default None): Optional system prompt to set context/behavior.
temperature (float, default 0.3): Sampling temperature (0.0-2.0). Lower is more deterministic.
top_p (float, default 1.0): Nucleus sampling parameter (0.0-1.0).
extra_headers (dict[str, str] | None, default None): Optional per-request headers merged with default_headers.
caller_metadata (dict[str, Any] | None, default None, keyword-only): Free-form dict forwarded to every hook via HookContext. Unused when no pipeline is configured.

Returns:

LLMJSONResponse: The parsed JSON dict and usage metrics.

Raises:

HookBlocked: If a hook in the pipeline blocks the call.
ResponseParsingError: If the response cannot be parsed as JSON.
Exception: If the API request fails after retries.

Source code in majordomo_llm/base.py
async def get_json_response(
    self,
    user_prompt: str,
    system_prompt: str | None = None,
    temperature: float = 0.3,
    top_p: float = 1.0,
    extra_headers: dict[str, str] | None = None,
    *,
    caller_metadata: dict[str, Any] | None = None,
) -> LLMJSONResponse:
    """Get a JSON response from the LLM.

    Automatically parses the LLM's text response as JSON.

    Args:
        user_prompt: The user's input prompt.
        system_prompt: Optional system prompt to set context/behavior.
        temperature: Sampling temperature (0.0-2.0). Lower is more deterministic.
        top_p: Nucleus sampling parameter (0.0-1.0).
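        extra_headers: Optional per-request headers merged with default_headers.
        caller_metadata: Free-form dict forwarded to every hook via
            :class:`HookContext`. Unused when no pipeline is configured.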

    Returns:
        LLMJSONResponse containing the parsed JSON dict and usage metrics.

    Raises:
        HookBlocked: If a hook in the pipeline blocks the call.
        ResponseParsingError: If the response cannot be parsed as JSON.
        Exception: If the API request fails after retries.
    """
    response = await self.get_response(
        user_prompt,
        system_prompt,
        temperature,
        top_p,
        extra_headers=extra_headers,
        caller_metadata=caller_metadata,
    )
    # Strip markdown code fencing if present
    content = response.content.replace("```json", "").replace("```", "").strip()
    try:
        parsed_content = json.loads(content)
    except json.JSONDecodeError as e:
        raise ResponseParsingError(
            f"Failed to parse JSON response: {e}",
            raw_content=response.content,
        ) from e
    return LLMJSONResponse(
        content=parsed_content,
        input_tokens=response.input_tokens,
        output_tokens=response.output_tokens,
        cached_tokens=response.cached_tokens,
        input_cost=response.input_cost,
        output_cost=response.output_cost,
        total_cost=response.total_cost,
        response_time=response.response_time,
    )

get_json_schema_response async

get_json_schema_response(user_prompt, response_schema, system_prompt=None, schema_name='Response', schema_description=None, temperature=0.3, top_p=1.0, extra_headers=None, *, caller_metadata=None, **kwargs)

Get a structured JSON response validated against a raw JSON schema.

Runs the optional hook_pipeline around the provider call. Hooks see the raw provider JSON text in after_call before downstream Pydantic/JSON-schema parsing.

Parameters:

user_prompt (str, required): The user's input prompt.
response_schema (dict[str, Any], required): Raw JSON schema dict defining the expected response.
system_prompt (str | None, default None): Optional system prompt to set context/behavior.
schema_name (str, default 'Response'): Provider-facing schema/tool name.
schema_description (str | None, default None): Optional provider-facing schema/tool description.
temperature (float, default 0.3): Sampling temperature (0.0-2.0).
top_p (float, default 1.0): Nucleus sampling parameter (0.0-1.0).
extra_headers (dict[str, str] | None, default None): Optional per-request headers merged with default_headers.
caller_metadata (dict[str, Any] | None, default None, keyword-only): Free-form dict forwarded to every hook.
**kwargs (Any, default {}): Reserved for future provider-specific passthrough arguments.

Returns:

LLMResponse: Response whose content is canonical JSON with sorted keys and no extra whitespace.

Raises:

HookBlocked: If a hook in the pipeline blocks the call.
StructuredOutputUnsupported: If the provider does not implement JSON-schema structured output.
Exception: If the API request fails after retries.
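"Canonical JSON with sorted keys and no extra whitespace" means two logically equal objects serialize to the same string, so hooks and callers can compare or cache the text directly. The sketch assumes canonicalization along the lines of json.dumps with sort_keys=True and compact separators; the library's exact settings (for example ensure_ascii) are not shown on this page.

```python
import json


def canonical_json(obj) -> str:
    # Sorted keys plus compact separators give one stable string per value.
    return json.dumps(obj, sort_keys=True, separators=(",", ":"))


a = canonical_json({"b": 1, "a": [1, 2]})
b = canonical_json({"a": [1, 2], "b": 1})
print(a)  # {"a":[1,2],"b":1}
```

Note that list order is preserved: only object keys are sorted, so [1, 2] and [2, 1] still produce different strings.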

Source code in majordomo_llm/base.py
async def get_json_schema_response(
    self,
    user_prompt: str,
    response_schema: dict[str, Any],
    system_prompt: str | None = None,
    schema_name: str = "Response",
    schema_description: str | None = None,
    temperature: float = 0.3,
    top_p: float = 1.0,
    extra_headers: dict[str, str] | None = None,
    *,
    caller_metadata: dict[str, Any] | None = None,
    **kwargs: Any,
) -> LLMResponse:
    """Get a structured JSON response validated against a raw JSON schema.

    Runs the optional :attr:`hook_pipeline` around the provider call.
    Hooks see the raw provider JSON text in ``after_call`` before
    downstream pydantic/JSON-schema parsing.

    Args:
        user_prompt: The user's input prompt.
        response_schema: Raw JSON schema dict defining the expected response.
        system_prompt: Optional system prompt to set context/behavior.
        schema_name: Provider-facing schema/tool name.
        schema_description: Optional provider-facing schema/tool description.
        temperature: Sampling temperature (0.0-2.0).
        top_p: Nucleus sampling parameter (0.0-1.0).
        extra_headers: Optional per-request headers merged with default_headers.
        caller_metadata: Free-form dict forwarded to every hook.
        **kwargs: Reserved for future provider-specific passthrough arguments.

    Returns:
        LLMResponse whose content is canonical JSON with sorted keys and no extra whitespace.

    Raises:
        HookBlocked: If a hook in the pipeline blocks the call.
    """
    ensure_no_unexpected_kwargs(kwargs)

    async def impl(prompt: str) -> LLMResponse:
        return await self._get_json_schema_response_retried(
            user_prompt=prompt,
            response_schema=response_schema,
            system_prompt=system_prompt,
            schema_name=schema_name,
            schema_description=schema_description,
            temperature=temperature,
            top_p=top_p,
            extra_headers=extra_headers,
        )

    return await self._run_hooks_returning_response(
        user_prompt, caller_metadata, impl
    )
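The Returns entry above says the content is canonical JSON with sorted keys and no extra whitespace. The body of _get_json_schema_response_retried is not shown in this section, so the sketch below assumes that "canonical" means what json.dumps produces with sort_keys=True and compact separators. canonicalize is a hypothetical helper for illustration, not part of majordomo_llm.

```python
import json
from typing import Any


def canonicalize(raw_json: str) -> str:
    """Hypothetical helper: re-serialize provider JSON into a canonical form.

    Assumption: "canonical" means keys sorted at every nesting level and no
    whitespace between tokens. The library's exact settings may differ.
    """
    data: Any = json.loads(raw_json)
    return json.dumps(data, sort_keys=True, separators=(",", ":"))


# Two provider outputs that differ only in key order and spacing...
a = '{"name": "John",  "age": 30}'
b = '{\n  "age": 30,\n  "name": "John"\n}'

# ...normalize to the same string, so they can be compared or hashed directly.
print(canonicalize(a))  # {"age":30,"name":"John"}
print(canonicalize(a) == canonicalize(b))  # True
```

A stable byte-for-byte form like this is what makes it safe to cache, diff, or hash structured responses, since equivalent objects always serialize identically.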

get_response async

get_response(user_prompt, system_prompt=None, temperature=0.3, top_p=1.0, extra_headers=None, *, caller_metadata=None)

Get a plain text response from the LLM.

Runs the optional hook_pipeline attribute around the provider call. Hooks see the prompt before the call and the response text after it.

Parameters:

Name Type Description Default
user_prompt str

The user's input prompt.

required
system_prompt str | None

Optional system prompt to set context/behavior.

None
temperature float

Sampling temperature (0.0-2.0). Lower is more deterministic.

0.3
top_p float

Nucleus sampling parameter (0.0-1.0).

1.0
extra_headers dict[str, str] | None

Optional per-request headers merged with default_headers.

None
caller_metadata dict[str, Any] | None

Free-form dict forwarded to every hook via HookContext. Unused when no pipeline is configured.

None

Returns:

Type Description
LLMResponse

LLMResponse containing the text content and usage metrics.

Raises:

Type Description
HookBlocked

If a hook in the pipeline blocks the call.

Exception

If the API request fails after retries.

Source code in majordomo_llm/base.py
async def get_response(
    self,
    user_prompt: str,
    system_prompt: str | None = None,
    temperature: float = 0.3,
    top_p: float = 1.0,
    extra_headers: dict[str, str] | None = None,
    *,
    caller_metadata: dict[str, Any] | None = None,
) -> LLMResponse:
    """Get a plain text response from the LLM.

    Runs the optional :attr:`hook_pipeline` around the provider call.
    Hooks see the prompt before the call and the response text after.

    Args:
        user_prompt: The user's input prompt.
        system_prompt: Optional system prompt to set context/behavior.
        temperature: Sampling temperature (0.0-2.0). Lower is more deterministic.
        top_p: Nucleus sampling parameter (0.0-1.0).
        extra_headers: Optional per-request headers merged with default_headers.
        caller_metadata: Free-form dict forwarded to every hook via
            :class:`HookContext`. Unused when no pipeline is configured.

    Returns:
        LLMResponse containing the text content and usage metrics.

    Raises:
        HookBlocked: If a hook in the pipeline blocks the call.
        Exception: If the API request fails after retries.
    """
    async def impl(prompt: str) -> LLMResponse:
        return await self._get_response_impl(
            prompt, system_prompt, temperature, top_p, extra_headers=extra_headers
        )

    return await self._run_hooks_returning_response(
        user_prompt, caller_metadata, impl
    )

get_response_stream async

get_response_stream(user_prompt, system_prompt=None, temperature=0.3, top_p=1.0, extra_headers=None, *, caller_metadata=None)

Get a streaming text response from the LLM.

Hooks do not run on streaming responses; caller_metadata is accepted for API symmetry and ignored.

Source code in majordomo_llm/base.py
async def get_response_stream(
    self,
    user_prompt: str,
    system_prompt: str | None = None,
    temperature: float = 0.3,
    top_p: float = 1.0,
    extra_headers: dict[str, str] | None = None,
    *,
    caller_metadata: dict[str, Any] | None = None,
) -> LLMStreamResponse:
    """Get a streaming text response from the LLM.

    Hooks do not run on streaming responses; ``caller_metadata`` is
    accepted for API symmetry and ignored.
    """
    del caller_metadata
    return await self._get_response_stream_impl(
        user_prompt, system_prompt, temperature, top_p, extra_headers=extra_headers
    )

get_structured_json_response async

get_structured_json_response(response_model, user_prompt, system_prompt=None, temperature=0.3, top_p=1.0, extra_headers=None, *, caller_metadata=None)

Get a structured response validated against a Pydantic model.

Uses provider-specific mechanisms (tool calling, response schemas) to ensure the response conforms to the specified Pydantic model schema.

Parameters:

Name Type Description Default
response_model type[T]

Pydantic model class defining the expected structure.

required
user_prompt str

The user's input prompt.

required
system_prompt str | None

Optional system prompt to set context/behavior.

None
temperature float

Sampling temperature (0.0-2.0). Lower is more deterministic.

0.3
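top_p float

Nucleus sampling parameter (0.0-1.0).

1.0
extra_headers dict[str, str] | None

Optional per-request headers merged with default_headers.

None
caller_metadata dict[str, Any] | None

Free-form dict forwarded to every hook (passed through to get_json_schema_response).

None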

Returns:

Type Description
LLMStructuredResponse

LLMStructuredResponse containing the validated Pydantic model instance.

Raises:

Type Description
HookBlocked

If a hook in the pipeline blocks the call.

pydantic.ValidationError

If the response doesn't match the model schema.

Exception

If the API request fails after retries.

Example

>>> from pydantic import BaseModel
>>> class Person(BaseModel):
...     name: str
...     age: int
>>> response = await llm.get_structured_json_response(
...     response_model=Person,
...     user_prompt="Extract: John is 30 years old",
... )
>>> print(response.content.name)
John

Source code in majordomo_llm/base.py
async def get_structured_json_response(
    self,
    response_model: type[T],
    user_prompt: str,
    system_prompt: str | None = None,
    temperature: float = 0.3,
    top_p: float = 1.0,
    extra_headers: dict[str, str] | None = None,
    *,
    caller_metadata: dict[str, Any] | None = None,
) -> LLMStructuredResponse:
    """Get a structured response validated against a Pydantic model.

    Uses provider-specific mechanisms (tool calling, response schemas) to
    ensure the response conforms to the specified Pydantic model schema.

    Args:
        response_model: Pydantic model class defining the expected structure.
        user_prompt: The user's input prompt.
        system_prompt: Optional system prompt to set context/behavior.
        temperature: Sampling temperature (0.0-2.0). Lower is more deterministic.
        top_p: Nucleus sampling parameter (0.0-1.0).

    Returns:
        LLMStructuredResponse containing the validated Pydantic model instance.

    Raises:
        pydantic.ValidationError: If the response doesn't match the model schema.
        Exception: If the API request fails after retries.

    Example:
        >>> from pydantic import BaseModel
        >>> class Person(BaseModel):
        ...     name: str
        ...     age: int
        >>> response = await llm.get_structured_json_response(
        ...     response_model=Person,
        ...     user_prompt="Extract: John is 30 years old",
        ... )
        >>> print(response.content.name)
        John
    """
    response = await self.get_json_schema_response(
        user_prompt=user_prompt,
        response_schema=response_model.model_json_schema(),
        system_prompt=system_prompt,
        schema_name=response_model.__name__,
        schema_description=(
            f"Provide a structured response using the {response_model.__name__} schema"
        ),
        temperature=temperature,
        top_p=top_p,
        extra_headers=extra_headers,
        caller_metadata=caller_metadata,
    )
    parsed_content = response_model.model_validate_json(response.content)

    return LLMStructuredResponse(
        content=parsed_content,
        input_tokens=response.input_tokens,
        output_tokens=response.output_tokens,
        cached_tokens=response.cached_tokens,
        input_cost=response.input_cost,
        output_cost=response.output_cost,
        total_cost=response.total_cost,
        response_time=response.response_time,
    )

Bases: Usage

Response from an LLM containing plain text content.

Inherits all usage metrics from Usage.

Attributes:

Name Type Description
content str

The text content of the LLM response.

deprecation_warning str | None

Warning if a deprecated model was auto-replaced.

Source code in majordomo_llm/base.py
@dataclass
class LLMResponse(Usage):
    """Response from an LLM containing plain text content.

    Inherits all usage metrics from :class:`Usage`.

    Attributes:
        content: The text content of the LLM response.
        deprecation_warning: Warning if a deprecated model was auto-replaced.
    """

    content: str
    deprecation_warning: str | None = None

Async-iterable wrapper around a streaming LLM response.

Yields text chunks as they arrive. After iteration completes, usage and cost data is available via the usage property.

Example

>>> stream = await llm.get_response_stream("Hello")
>>> async for chunk in stream:
...     print(chunk, end="")
>>> print(stream.usage.total_cost)

Source code in majordomo_llm/base.py
class LLMStreamResponse:
    """Async-iterable wrapper around a streaming LLM response.

    Yields text chunks as they arrive. After iteration completes, usage
    and cost data is available via the :attr:`usage` property.

    Example:
        >>> stream = await llm.get_response_stream("Hello")
        >>> async for chunk in stream:
        ...     print(chunk, end="")
        >>> print(stream.usage.total_cost)
    """

    def __init__(
        self,
        stream: AsyncIterator[str],
        state: _StreamState,
        llm: "LLM",
    ) -> None:
        self._stream = stream
        self._state = state
        self._llm = llm
        self._chunks: list[str] = []
        self._consumed = False
        self._usage: Usage | None = None
        self._on_complete: Callable[[Usage, str], None] | None = None
        self._on_error: Callable[[Exception], None] | None = None

    def __aiter__(self) -> "LLMStreamResponse":
        return self

    async def __anext__(self) -> str:
        try:
            chunk = await self._stream.__anext__()
            self._chunks.append(chunk)
            return chunk
        except StopAsyncIteration:
            self._finalize()
            raise
        except Exception as e:
            if self._on_error:
                self._on_error(e)
            raise

    def _finalize(self) -> None:
        if self._consumed:
            return
        self._consumed = True
        response_time = time.time() - self._state.start_time
        input_cost, output_cost, total_cost = self._llm._calculate_costs(
            self._state.input_tokens, self._state.output_tokens
        )
        self._usage = Usage(
            input_tokens=self._state.input_tokens,
            output_tokens=self._state.output_tokens,
            cached_tokens=self._state.cached_tokens,
            input_cost=input_cost,
            output_cost=output_cost,
            total_cost=total_cost,
            response_time=response_time,
        )
        if self._on_complete:
            self._on_complete(self._usage, "".join(self._chunks))

    @property
    def usage(self) -> Usage | None:
        """Usage metrics, available after the stream is fully consumed."""
        return self._usage

    async def collect(self) -> LLMResponse:
        """Consume the entire stream and return an :class:`LLMResponse`."""
        chunks: list[str] = []
        async for chunk in self:
            chunks.append(chunk)
        assert self._usage is not None
        return LLMResponse(
            content="".join(self._chunks),
            input_tokens=self._usage.input_tokens,
            output_tokens=self._usage.output_tokens,
            cached_tokens=self._usage.cached_tokens,
            input_cost=self._usage.input_cost,
            output_cost=self._usage.output_cost,
            total_cost=self._usage.total_cost,
            response_time=self._usage.response_time,
            deprecation_warning=self._llm.deprecation_warning,
        )
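The class above buffers each chunk as it passes through __anext__ and computes usage exactly once, when the underlying iterator raises StopAsyncIteration. The following is a stripped-down, runnable sketch of that pattern against a fake token stream. BufferedStream, SketchUsage, and fake_tokens are hypothetical names, and counting chunks stands in for the real token counts, which LLMStreamResponse reads from a provider-populated _StreamState.

```python
import asyncio
import time
from collections.abc import AsyncIterator
from dataclasses import dataclass


@dataclass
class SketchUsage:
    output_chunks: int
    response_time: float


class BufferedStream:
    """Minimal analogue of LLMStreamResponse: pass chunks through, record them,
    and finalize usage exactly once when the source is exhausted."""

    def __init__(self, source: AsyncIterator[str]) -> None:
        self._source = source
        self._chunks: list[str] = []
        self._start = time.time()
        self.usage: SketchUsage | None = None

    def __aiter__(self) -> "BufferedStream":
        return self

    async def __anext__(self) -> str:
        try:
            chunk = await self._source.__anext__()
        except StopAsyncIteration:
            self._finalize()  # runs only after the caller has drained the stream
            raise
        self._chunks.append(chunk)
        return chunk

    def _finalize(self) -> None:
        if self.usage is None:  # idempotent, like the _consumed flag above
            self.usage = SketchUsage(len(self._chunks), time.time() - self._start)

    async def collect(self) -> str:
        async for _ in self:  # __anext__ already buffers every chunk
            pass
        return "".join(self._chunks)


async def fake_tokens() -> AsyncIterator[str]:
    for piece in ["Hel", "lo", ", world"]:
        await asyncio.sleep(0)  # simulate waiting on the network
        yield piece


async def main() -> None:
    stream = BufferedStream(fake_tokens())
    print(stream.usage)            # None: nothing consumed yet
    print(await stream.collect())  # Hello, world
    assert stream.usage is not None
    print(stream.usage.output_chunks)  # 3


asyncio.run(main())
```

Because __anext__ already records every chunk, collect in this sketch only needs to drain the iterator; it also returns the full text if the caller consumed part of the stream before calling it.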

usage property

usage

Usage metrics, available after the stream is fully consumed.
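The costs in these usage metrics come from self._llm._calculate_costs(input_tokens, output_tokens), whose body is not shown here. Since the LLM class documents input_cost and output_cost as USD per million tokens, a plausible version of the arithmetic is tokens times rate divided by one million. calculate_costs below is a hypothetical re-derivation under that assumption; it ignores cached_tokens because _finalize does not pass them to the cost calculation.

```python
def calculate_costs(
    input_tokens: int,
    output_tokens: int,
    input_price_per_m: float,
    output_price_per_m: float,
) -> tuple[float, float, float]:
    """Hypothetical re-derivation of LLM._calculate_costs (real body not shown).

    Prices are USD per million tokens, matching the input_cost/output_cost
    attributes documented on the LLM class.
    """
    input_cost = input_tokens * input_price_per_m / 1_000_000
    output_cost = output_tokens * output_price_per_m / 1_000_000
    return input_cost, output_cost, input_cost + output_cost


# Illustrative prices only, not any provider's actual rate card:
# 2,000 input tokens at $3/M and 500 output tokens at $15/M.
print(calculate_costs(2_000, 500, 3.0, 15.0))
```

With these inputs the input cost is about $0.006, the output cost about $0.0075, and the total about $0.0135 (printed values may carry floating-point noise).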

collect async

collect()

Consume the entire stream and return an LLMResponse.


Bases: Usage

Response from an LLM containing parsed JSON content.

Inherits all usage metrics from Usage.

Attributes:

Name Type Description
content dict[str, Any]

The parsed JSON content as a Python dict.

Source code in majordomo_llm/base.py
@dataclass
class LLMJSONResponse(Usage):
    """Response from an LLM containing parsed JSON content.

    Inherits all usage metrics from :class:`Usage`.

    Attributes:
        content: The parsed JSON content as a Python dict.
    """

    content: dict[str, Any]

Bases: Usage

Response from an LLM containing a validated Pydantic model.

Inherits all usage metrics from Usage.

Attributes:

Name Type Description
content BaseModel

The validated Pydantic model instance.

Source code in majordomo_llm/base.py
@dataclass
class LLMStructuredResponse(Usage):
    """Response from an LLM containing a validated Pydantic model.

    Inherits all usage metrics from :class:`Usage`.

    Attributes:
        content: The validated Pydantic model instance.
    """

    content: BaseModel