Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 153 additions & 76 deletions src/mcp/server/mcpserver/utilities/func_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def pre_parse_json(self, data: dict[str, Any]) -> dict[str, Any]:
continue # Not JSON - skip
if isinstance(pre_parsed, str | int | float):
# This is likely that the raw value is e.g. `"hello"` which we
# Should really be parsed as '"hello"' in Python - but if we parse
# Should really be parsed as '"'hello'"' in Python - but if we parse
# it as JSON it'll turn into just 'hello'. So we skip it.
continue
new_data[data_key] = pre_parsed
Expand Down Expand Up @@ -206,19 +206,69 @@ def func_metadata(
A FuncMetadata object containing:
- arg_model: A Pydantic model representing the function's arguments
- output_model: A Pydantic model for the return type if the output is structured
- wrap_output: Whether the function result needs to be wrapped in `{"result": ...}` for structured output.
- wrap_output: Whether the function result needs to be wrapped in {"result": ...}
for structured output.
"""
sig = _get_function_signature(func)
arguments_model = _build_arg_model(sig, func.__name__, skip_names)

if structured_output is False:
return FuncMetadata(arg_model=arguments_model)

resolved = _resolve_return_annotation(sig, structured_output, func.__name__)
if resolved is None:
return FuncMetadata(arg_model=arguments_model)

original_annotation, return_type_expr = resolved

output_model, output_schema, wrap_output = _try_create_model_and_schema(
original_annotation, return_type_expr, func.__name__
)

if output_model is None and structured_output is True:
raise InvalidSignature(
f"Function {func.__name__}: return type {return_type_expr} is not serializable for structured output"
)

return FuncMetadata(
arg_model=arguments_model,
output_schema=output_schema,
output_model=output_model,
wrap_output=wrap_output,
)


def _get_function_signature(func: Callable[..., Any]) -> inspect.Signature:
"""Get the signature of a function, raising InvalidSignature on failure."""
try:
sig = inspect.signature(func, eval_str=True)
return inspect.signature(func, eval_str=True)
except NameError as e: # pragma: no cover
# This raise could perhaps be skipped, and we (MCPServer) just call
# model_rebuild right before using it 🤷
raise InvalidSignature(f"Unable to evaluate type annotations for callable {func.__name__!r}") from e


def _build_arg_model(
sig: inspect.Signature,
func_name: str,
skip_names: Sequence[str] = (),
) -> type[ArgModelBase]:
"""Build a Pydantic model representing the function's arguments.

Iterates over the function's parameters, handling type annotations, defaults,
and BaseModel attribute name conflicts (via aliasing).

Args:
sig: The function's inspect.Signature.
func_name: The function's name (used for the model name).
skip_names: Parameter names to exclude from the model.

Returns:
A dynamically created Pydantic model class.
"""
params = sig.parameters
dynamic_pydantic_model_params: dict[str, Any] = {}
for param in params.values():
if param.name.startswith("_"): # pragma: no cover
raise InvalidSignature(f"Parameter {param.name} of {func.__name__} cannot start with '_'")
raise InvalidSignature(f"Parameter {param.name} of {func_name} cannot start with '_'")
if param.name in skip_names:
continue

Expand All @@ -245,24 +295,38 @@ def func_metadata(
else:
dynamic_pydantic_model_params[field_name] = Annotated[(annotation, *field_metadata, Field(**field_kwargs))]

arguments_model = create_model(
f"{func.__name__}Arguments",
return create_model(
f"{func_name}Arguments",
__base__=ArgModelBase,
**dynamic_pydantic_model_params,
)

if structured_output is False:
return FuncMetadata(arg_model=arguments_model)

# set up structured output support based on return type annotation
def _resolve_return_annotation(
sig: inspect.Signature,
structured_output: bool | None,
func_name: str,
) -> tuple[Any, Any] | None:
"""Resolve and validate the function's return type annotation for structured output.

Handles special cases including CallToolResult, Annotated metadata, and Union types.

Args:
sig: The function's inspect.Signature.
structured_output: Whether structured output is requested (None for auto-detect).
func_name: The function's name (used for error messages).

Returns:
A tuple of (original_annotation, type_expr) if structured output should be
attempted, or None if no structured output is needed.
"""
if sig.return_annotation is inspect.Parameter.empty and structured_output is True:
raise InvalidSignature(f"Function {func.__name__}: return annotation required for structured output")
raise InvalidSignature(f"Function {func_name}: return annotation required for structured output")

try:
inspected_return_ann = inspect_annotation(sig.return_annotation, annotation_source=AnnotationSource.FUNCTION)
except ForbiddenQualifier as e:
raise InvalidSignature(f"Function {func.__name__}: return annotation contains an invalid type qualifier") from e
raise InvalidSignature(f"Function {func_name}: return annotation contains an invalid type qualifier") from e

return_type_expr = inspected_return_ann.type

Expand All @@ -275,7 +339,7 @@ def func_metadata(
# Check if CallToolResult appears in the union (excluding None for Optional check)
if any(isinstance(arg, type) and issubclass(arg, CallToolResult) for arg in args if arg is not type(None)):
raise InvalidSignature(
f"Function {func.__name__}: CallToolResult cannot be used in Union or Optional types. "
f"Function {func_name}: CallToolResult cannot be used in Union or Optional types. "
"To return empty results, use: CallToolResult(content=[])"
)

Expand All @@ -297,26 +361,11 @@ def func_metadata(
# as being `ReturnType`:
original_annotation = return_type_expr
else:
return FuncMetadata(arg_model=arguments_model)
return None
else:
original_annotation = sig.return_annotation

output_model, output_schema, wrap_output = _try_create_model_and_schema(
original_annotation, return_type_expr, func.__name__
)

if output_model is None and structured_output is True:
# Model creation failed or produced warnings - no structured output
raise InvalidSignature(
f"Function {func.__name__}: return type {return_type_expr} is not serializable for structured output"
)

return FuncMetadata(
arg_model=arguments_model,
output_schema=output_schema,
output_model=output_model,
wrap_output=wrap_output,
)
return original_annotation, cast(Any, return_type_expr)


def _try_create_model_and_schema(
Expand All @@ -337,16 +386,46 @@ def _try_create_model_and_schema(
Model and schema are None if warnings occur or creation fails.
wrap_output is True if the result needs to be wrapped in {"result": ...}
"""
model = None
wrap_output = False
model, wrap_output = _create_output_model(original_annotation, type_expr, func_name)

if model is not None:
schema = _try_generate_strict_schema(model, type_expr, func_name)
if schema is None:
return None, None, False
return model, schema, wrap_output

return None, None, False

# First handle special case: None

def _create_output_model(
original_annotation: Any,
type_expr: Any,
func_name: str,
) -> tuple[type[BaseModel] | None, bool]:
"""Create a Pydantic model for the function's return type.

Dispatches to the appropriate model creation strategy based on the type:
- None -> wrapped model
- GenericAlias (list, dict, Union, etc.) -> wrapped or dict model
- BaseModel subclasses -> used directly
- TypedDict -> converted to Pydantic model
- Primitive types -> wrapped model
- Classes with type hints -> converted to Pydantic model

Args:
original_annotation: The original return annotation.
type_expr: The underlying type expression.
func_name: The function's name.

Returns:
A tuple of (model or None, wrap_output).
"""
# Special case: None
if type_expr is None:
model = _create_wrapped_model(func_name, original_annotation)
wrap_output = True
return _create_wrapped_model(func_name, original_annotation), True

# Handle GenericAlias types (list[str], dict[str, int], Union[str, int], etc.)
elif isinstance(type_expr, GenericAlias):
if isinstance(type_expr, GenericAlias):
origin = get_origin(type_expr)

# Special case: dict with string keys can use RootModel
Expand All @@ -355,65 +434,63 @@ def _try_create_model_and_schema(
if len(args) == 2 and args[0] is str:
# TODO: should we use the original annotation? We are losing any potential `Annotated`
# metadata for Pydantic here:
model = _create_dict_model(func_name, type_expr)
return _create_dict_model(func_name, type_expr), False
else:
# dict with non-str keys needs wrapping
model = _create_wrapped_model(func_name, original_annotation)
wrap_output = True
return _create_wrapped_model(func_name, original_annotation), True
else:
# All other generic types need wrapping (list, tuple, Union, Optional, etc.)
model = _create_wrapped_model(func_name, original_annotation)
wrap_output = True
return _create_wrapped_model(func_name, original_annotation), True

# Handle regular type objects
elif isinstance(type_expr, type):
if isinstance(type_expr, type):
type_annotation = cast(type[Any], type_expr)

# Case 1: BaseModel subclasses (can be used directly)
if issubclass(type_annotation, BaseModel):
model = type_annotation
return type_annotation, False

# Case 2: TypedDicts:
elif is_typeddict(type_annotation):
model = _create_model_from_typeddict(type_annotation)
if is_typeddict(type_annotation):
return _create_model_from_typeddict(type_annotation), False

# Case 3: Primitive types that need wrapping
elif type_annotation in (str, int, float, bool, bytes, type(None)):
model = _create_wrapped_model(func_name, original_annotation)
wrap_output = True
if type_annotation in (str, int, float, bool, bytes, type(None)):
return _create_wrapped_model(func_name, original_annotation), True

# Case 4: Other class types (dataclasses, regular classes with annotations)
else:
type_hints = get_type_hints(type_annotation)
if type_hints:
# Classes with type hints can be converted to Pydantic models
model = _create_model_from_class(type_annotation, type_hints)
# Classes without type hints are not serializable - model remains None
type_hints = get_type_hints(type_annotation)
if type_hints:
# Classes with type hints can be converted to Pydantic models
return _create_model_from_class(type_annotation, type_hints), False
# Classes without type hints are not serializable
return None, False

# Handle any other types not covered above
else:
# This includes typing constructs that aren't GenericAlias in Python 3.10
# (e.g., Union, Optional in some Python versions)
model = _create_wrapped_model(func_name, original_annotation)
wrap_output = True

if model:
# If we successfully created a model, try to get its schema
# Use StrictJsonSchema to raise exceptions instead of warnings
try:
schema = model.model_json_schema(schema_generator=StrictJsonSchema)
except (TypeError, ValueError, pydantic_core.SchemaError, pydantic_core.ValidationError) as e:
# These are expected errors when a type can't be converted to a Pydantic schema
# TypeError: When Pydantic can't handle the type
# ValueError: When there are issues with the type definition (including our custom warnings)
# SchemaError: When Pydantic can't build a schema
# ValidationError: When validation fails
logger.info(f"Cannot create schema for type {type_expr} in {func_name}: {type(e).__name__}: {e}")
return None, None, False
# This includes typing constructs that aren't GenericAlias in Python 3.10
# (e.g., Union, Optional in some Python versions)
return _create_wrapped_model(func_name, original_annotation), True

return model, schema, wrap_output

return None, None, False
def _try_generate_strict_schema(
model: type[BaseModel],
type_expr: Any,
func_name: str,
) -> dict[str, Any] | None:
"""Try to generate a JSON schema using StrictJsonSchema.

Returns the schema dict on success, or None if the type cannot be serialized.
"""
try:
return model.model_json_schema(schema_generator=StrictJsonSchema)
except (TypeError, ValueError, pydantic_core.SchemaError, pydantic_core.ValidationError) as e:
# These are expected errors when a type can't be converted to a Pydantic schema
# TypeError: When Pydantic can't handle the type
# ValueError: When there are issues with the type definition (including our custom warnings)
# SchemaError: When Pydantic can't build a schema
# ValidationError: When validation fails
logger.info(f"Cannot create schema for type {type_expr} in {func_name}: {type(e).__name__}: {e}")
return None


_no_default = object()
Expand Down
Loading