diff --git a/src/mcp/server/mcpserver/utilities/func_metadata.py b/src/mcp/server/mcpserver/utilities/func_metadata.py index 062b47d0f..8175e5185 100644 --- a/src/mcp/server/mcpserver/utilities/func_metadata.py +++ b/src/mcp/server/mcpserver/utilities/func_metadata.py @@ -155,7 +155,7 @@ def pre_parse_json(self, data: dict[str, Any]) -> dict[str, Any]: continue # Not JSON - skip if isinstance(pre_parsed, str | int | float): # This is likely that the raw value is e.g. `"hello"` which we - # Should really be parsed as '"hello"' in Python - but if we parse + # Should really be parsed as '"'hello'"' in Python - but if we parse # it as JSON it'll turn into just 'hello'. So we skip it. continue new_data[data_key] = pre_parsed @@ -206,19 +206,69 @@ def func_metadata( A FuncMetadata object containing: - arg_model: A Pydantic model representing the function's arguments - output_model: A Pydantic model for the return type if the output is structured - - wrap_output: Whether the function result needs to be wrapped in `{"result": ...}` for structured output. + - wrap_output: Whether the function result needs to be wrapped in {"result": ...} + for structured output. """ + sig = _get_function_signature(func) + arguments_model = _build_arg_model(sig, func.__name__, skip_names) + + if structured_output is False: + return FuncMetadata(arg_model=arguments_model) + + resolved = _resolve_return_annotation(sig, structured_output, func.__name__) + if resolved is None: + return FuncMetadata(arg_model=arguments_model) + + original_annotation, return_type_expr = resolved + + output_model, output_schema, wrap_output = _try_create_model_and_schema( + original_annotation, return_type_expr, func.__name__ + ) + + if output_model is None and structured_output is True: + raise InvalidSignature( + f"Function {func.__name__}: return type {return_type_expr} is not serializable for structured output" + ) + + return FuncMetadata( + arg_model=arguments_model, + output_schema=output_schema, + output_model=output_model, + wrap_output=wrap_output, + ) + + +def _get_function_signature(func: Callable[..., Any]) -> inspect.Signature: + """Get the signature of a function, raising InvalidSignature on failure.""" try: - sig = inspect.signature(func, eval_str=True) + return inspect.signature(func, eval_str=True) except NameError as e: # pragma: no cover - # This raise could perhaps be skipped, and we (MCPServer) just call - # model_rebuild right before using it 🤷 raise InvalidSignature(f"Unable to evaluate type annotations for callable {func.__name__!r}") from e + + +def _build_arg_model( + sig: inspect.Signature, + func_name: str, + skip_names: Sequence[str] = (), +) -> type[ArgModelBase]: + """Build a Pydantic model representing the function's arguments. + + Iterates over the function's parameters, handling type annotations, defaults, + and BaseModel attribute name conflicts (via aliasing). + + Args: + sig: The function's inspect.Signature. + func_name: The function's name (used for the model name). + skip_names: Parameter names to exclude from the model. + + Returns: + A dynamically created Pydantic model class. + """ params = sig.parameters dynamic_pydantic_model_params: dict[str, Any] = {} for param in params.values(): if param.name.startswith("_"): # pragma: no cover - raise InvalidSignature(f"Parameter {param.name} of {func.__name__} cannot start with '_'") + raise InvalidSignature(f"Parameter {param.name} of {func_name} cannot start with '_'") if param.name in skip_names: continue @@ -245,24 +295,38 @@ def func_metadata( else: dynamic_pydantic_model_params[field_name] = Annotated[(annotation, *field_metadata, Field(**field_kwargs))] - arguments_model = create_model( - f"{func.__name__}Arguments", + return create_model( + f"{func_name}Arguments", __base__=ArgModelBase, **dynamic_pydantic_model_params, ) - if structured_output is False: - return FuncMetadata(arg_model=arguments_model) - # set up structured output support based on return type annotation +def _resolve_return_annotation( + sig: inspect.Signature, + structured_output: bool | None, + func_name: str, +) -> tuple[Any, Any] | None: + """Resolve and validate the function's return type annotation for structured output. + + Handles special cases including CallToolResult, Annotated metadata, and Union types. + Args: + sig: The function's inspect.Signature. + structured_output: Whether structured output is requested (None for auto-detect). + func_name: The function's name (used for error messages). + + Returns: + A tuple of (original_annotation, type_expr) if structured output should be + attempted, or None if no structured output is needed. + """ if sig.return_annotation is inspect.Parameter.empty and structured_output is True: - raise InvalidSignature(f"Function {func.__name__}: return annotation required for structured output") + raise InvalidSignature(f"Function {func_name}: return annotation required for structured output") try: inspected_return_ann = inspect_annotation(sig.return_annotation, annotation_source=AnnotationSource.FUNCTION) except ForbiddenQualifier as e: - raise InvalidSignature(f"Function {func.__name__}: return annotation contains an invalid type qualifier") from e + raise InvalidSignature(f"Function {func_name}: return annotation contains an invalid type qualifier") from e return_type_expr = inspected_return_ann.type @@ -275,7 +339,7 @@ def func_metadata( # Check if CallToolResult appears in the union (excluding None for Optional check) if any(isinstance(arg, type) and issubclass(arg, CallToolResult) for arg in args if arg is not type(None)): raise InvalidSignature( - f"Function {func.__name__}: CallToolResult cannot be used in Union or Optional types. " + f"Function {func_name}: CallToolResult cannot be used in Union or Optional types. " "To return empty results, use: CallToolResult(content=[])" ) @@ -297,26 +361,11 @@ def func_metadata( # as being `ReturnType`: original_annotation = return_type_expr else: - return FuncMetadata(arg_model=arguments_model) + return None else: original_annotation = sig.return_annotation - output_model, output_schema, wrap_output = _try_create_model_and_schema( - original_annotation, return_type_expr, func.__name__ - ) - - if output_model is None and structured_output is True: - # Model creation failed or produced warnings - no structured output - raise InvalidSignature( - f"Function {func.__name__}: return type {return_type_expr} is not serializable for structured output" - ) - - return FuncMetadata( - arg_model=arguments_model, - output_schema=output_schema, - output_model=output_model, - wrap_output=wrap_output, - ) + return original_annotation, cast(Any, return_type_expr) def _try_create_model_and_schema( @@ -337,16 +386,46 @@ def _try_create_model_and_schema( Model and schema are None if warnings occur or creation fails. wrap_output is True if the result needs to be wrapped in {"result": ...} """ - model = None - wrap_output = False + model, wrap_output = _create_output_model(original_annotation, type_expr, func_name) + + if model is not None: + schema = _try_generate_strict_schema(model, type_expr, func_name) + if schema is None: + return None, None, False + return model, schema, wrap_output + + return None, None, False - # First handle special case: None + +def _create_output_model( + original_annotation: Any, + type_expr: Any, + func_name: str, +) -> tuple[type[BaseModel] | None, bool]: + """Create a Pydantic model for the function's return type. + + Dispatches to the appropriate model creation strategy based on the type: + - None -> wrapped model + - GenericAlias (list, dict, Union, etc.) -> wrapped or dict model + - BaseModel subclasses -> used directly + - TypedDict -> converted to Pydantic model + - Primitive types -> wrapped model + - Classes with type hints -> converted to Pydantic model + + Args: + original_annotation: The original return annotation. + type_expr: The underlying type expression. + func_name: The function's name. + + Returns: + A tuple of (model or None, wrap_output). + """ + # Special case: None if type_expr is None: - model = _create_wrapped_model(func_name, original_annotation) - wrap_output = True + return _create_wrapped_model(func_name, original_annotation), True # Handle GenericAlias types (list[str], dict[str, int], Union[str, int], etc.) - elif isinstance(type_expr, GenericAlias): + if isinstance(type_expr, GenericAlias): origin = get_origin(type_expr) # Special case: dict with string keys can use RootModel @@ -355,65 +434,63 @@ def _try_create_model_and_schema( if len(args) == 2 and args[0] is str: # TODO: should we use the original annotation? We are losing any potential `Annotated` # metadata for Pydantic here: - model = _create_dict_model(func_name, type_expr) + return _create_dict_model(func_name, type_expr), False else: # dict with non-str keys needs wrapping - model = _create_wrapped_model(func_name, original_annotation) - wrap_output = True + return _create_wrapped_model(func_name, original_annotation), True else: # All other generic types need wrapping (list, tuple, Union, Optional, etc.) - model = _create_wrapped_model(func_name, original_annotation) - wrap_output = True + return _create_wrapped_model(func_name, original_annotation), True # Handle regular type objects - elif isinstance(type_expr, type): + if isinstance(type_expr, type): type_annotation = cast(type[Any], type_expr) # Case 1: BaseModel subclasses (can be used directly) if issubclass(type_annotation, BaseModel): - model = type_annotation + return type_annotation, False # Case 2: TypedDicts: - elif is_typeddict(type_annotation): - model = _create_model_from_typeddict(type_annotation) + if is_typeddict(type_annotation): + return _create_model_from_typeddict(type_annotation), False # Case 3: Primitive types that need wrapping - elif type_annotation in (str, int, float, bool, bytes, type(None)): - model = _create_wrapped_model(func_name, original_annotation) - wrap_output = True + if type_annotation in (str, int, float, bool, bytes, type(None)): + return _create_wrapped_model(func_name, original_annotation), True # Case 4: Other class types (dataclasses, regular classes with annotations) - else: - type_hints = get_type_hints(type_annotation) - if type_hints: - # Classes with type hints can be converted to Pydantic models - model = _create_model_from_class(type_annotation, type_hints) - # Classes without type hints are not serializable - model remains None + type_hints = get_type_hints(type_annotation) + if type_hints: + # Classes with type hints can be converted to Pydantic models + return _create_model_from_class(type_annotation, type_hints), False + # Classes without type hints are not serializable + return None, False # Handle any other types not covered above - else: - # This includes typing constructs that aren't GenericAlias in Python 3.10 - # (e.g., Union, Optional in some Python versions) - model = _create_wrapped_model(func_name, original_annotation) - wrap_output = True - - if model: - # If we successfully created a model, try to get its schema - # Use StrictJsonSchema to raise exceptions instead of warnings - try: - schema = model.model_json_schema(schema_generator=StrictJsonSchema) - except (TypeError, ValueError, pydantic_core.SchemaError, pydantic_core.ValidationError) as e: - # These are expected errors when a type can't be converted to a Pydantic schema - # TypeError: When Pydantic can't handle the type - # ValueError: When there are issues with the type definition (including our custom warnings) - # SchemaError: When Pydantic can't build a schema - # ValidationError: When validation fails - logger.info(f"Cannot create schema for type {type_expr} in {func_name}: {type(e).__name__}: {e}") - return None, None, False + # This includes typing constructs that aren't GenericAlias in Python 3.10 + # (e.g., Union, Optional in some Python versions) + return _create_wrapped_model(func_name, original_annotation), True - return model, schema, wrap_output - return None, None, False +def _try_generate_strict_schema( + model: type[BaseModel], + type_expr: Any, + func_name: str, +) -> dict[str, Any] | None: + """Try to generate a JSON schema using StrictJsonSchema. + + Returns the schema dict on success, or None if the type cannot be serialized. + """ + try: + return model.model_json_schema(schema_generator=StrictJsonSchema) + except (TypeError, ValueError, pydantic_core.SchemaError, pydantic_core.ValidationError) as e: + # These are expected errors when a type can't be converted to a Pydantic schema + # TypeError: When Pydantic can't handle the type + # ValueError: When there are issues with the type definition (including our custom warnings) + # SchemaError: When Pydantic can't build a schema + # ValidationError: When validation fails + logger.info(f"Cannot create schema for type {type_expr} in {func_name}: {type(e).__name__}: {e}") + return None _no_default = object()