From cbb117cd677a4823c6dfe4da7b25390b9167c992 Mon Sep 17 00:00:00 2001 From: salaboy Date: Mon, 6 Apr 2026 22:43:23 +0100 Subject: [PATCH] adding sdk tracing capabilities on spring Signed-off-by: salaboy --- .../dapr-spring-boot-4-autoconfigure/pom.xml | 10 + .../DaprClientSB4AutoConfiguration.java | 18 +- .../client/ObservationDaprClient.java | 754 ++++++++++++++++++ .../client/ObservationDaprWorkflowClient.java | 293 +++++++ .../dapr-spring-boot-autoconfigure/pom.xml | 10 + .../client/DaprClientAutoConfiguration.java | 19 +- .../client/ObservationDaprClient.java | 754 ++++++++++++++++++ .../client/ObservationDaprWorkflowClient.java | 293 +++++++ ...lientObservationAutoConfigurationTest.java | 159 ++++ .../client/ObservationDaprClientTest.java | 367 +++++++++ .../ObservationDaprWorkflowClientTest.java | 232 ++++++ 11 files changed, 2903 insertions(+), 6 deletions(-) create mode 100644 dapr-spring/dapr-spring-boot-4-autoconfigure/src/main/java/io/dapr/spring/boot4/autoconfigure/client/ObservationDaprClient.java create mode 100644 dapr-spring/dapr-spring-boot-4-autoconfigure/src/main/java/io/dapr/spring/boot4/autoconfigure/client/ObservationDaprWorkflowClient.java create mode 100644 dapr-spring/dapr-spring-boot-autoconfigure/src/main/java/io/dapr/spring/boot/autoconfigure/client/ObservationDaprClient.java create mode 100644 dapr-spring/dapr-spring-boot-autoconfigure/src/main/java/io/dapr/spring/boot/autoconfigure/client/ObservationDaprWorkflowClient.java create mode 100644 dapr-spring/dapr-spring-boot-autoconfigure/src/test/java/io/dapr/spring/boot/autoconfigure/client/DaprClientObservationAutoConfigurationTest.java create mode 100644 dapr-spring/dapr-spring-boot-autoconfigure/src/test/java/io/dapr/spring/boot/autoconfigure/client/ObservationDaprClientTest.java create mode 100644 dapr-spring/dapr-spring-boot-autoconfigure/src/test/java/io/dapr/spring/boot/autoconfigure/client/ObservationDaprWorkflowClientTest.java diff --git a/dapr-spring/dapr-spring-boot-4-autoconfigure/pom.xml b/dapr-spring/dapr-spring-boot-4-autoconfigure/pom.xml index c5b93b8f02..9ffe09faa8 100644 --- a/dapr-spring/dapr-spring-boot-4-autoconfigure/pom.xml +++ b/dapr-spring/dapr-spring-boot-4-autoconfigure/pom.xml @@ -76,6 +76,16 @@ spring-data-keyvalue true + + io.micrometer + micrometer-observation + true + + + io.micrometer + micrometer-observation-test + test + org.springframework.boot spring-boot-starter-web diff --git a/dapr-spring/dapr-spring-boot-4-autoconfigure/src/main/java/io/dapr/spring/boot4/autoconfigure/client/DaprClientSB4AutoConfiguration.java b/dapr-spring/dapr-spring-boot-4-autoconfigure/src/main/java/io/dapr/spring/boot4/autoconfigure/client/DaprClientSB4AutoConfiguration.java index cbe107ebb4..d16cff207e 100644 --- a/dapr-spring/dapr-spring-boot-4-autoconfigure/src/main/java/io/dapr/spring/boot4/autoconfigure/client/DaprClientSB4AutoConfiguration.java +++ b/dapr-spring/dapr-spring-boot-4-autoconfigure/src/main/java/io/dapr/spring/boot4/autoconfigure/client/DaprClientSB4AutoConfiguration.java @@ -24,6 +24,7 @@ import io.dapr.spring.boot.properties.client.DaprConnectionDetails; import io.dapr.workflows.client.DaprWorkflowClient; import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; +import io.micrometer.observation.ObservationRegistry; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; @@ -91,14 +92,25 @@ DaprClientBuilder daprClientBuilder(DaprConnectionDetails daprConnectionDetails, @Bean @ConditionalOnMissingBean - DaprClient daprClient(DaprClientBuilder daprClientBuilder) { - return daprClientBuilder.build(); + DaprClient daprClient(DaprClientBuilder daprClientBuilder, + ObjectProvider observationRegistryProvider) { + DaprClient client = daprClientBuilder.build(); + ObservationRegistry registry = observationRegistryProvider.getIfAvailable(); + if (registry != null && !registry.isNoop()) { + return new ObservationDaprClient(client, registry); + } + return client; } @Bean @ConditionalOnMissingBean - DaprWorkflowClient daprWorkflowClient(DaprConnectionDetails daprConnectionDetails) { + DaprWorkflowClient daprWorkflowClient(DaprConnectionDetails daprConnectionDetails, + ObjectProvider observationRegistryProvider) { Properties properties = createPropertiesFromConnectionDetails(daprConnectionDetails); + ObservationRegistry registry = observationRegistryProvider.getIfAvailable(); + if (registry != null && !registry.isNoop()) { + return new ObservationDaprWorkflowClient(properties, registry); + } return new DaprWorkflowClient(properties); } diff --git a/dapr-spring/dapr-spring-boot-4-autoconfigure/src/main/java/io/dapr/spring/boot4/autoconfigure/client/ObservationDaprClient.java b/dapr-spring/dapr-spring-boot-4-autoconfigure/src/main/java/io/dapr/spring/boot4/autoconfigure/client/ObservationDaprClient.java new file mode 100644 index 0000000000..c0a321b337 --- /dev/null +++ b/dapr-spring/dapr-spring-boot-4-autoconfigure/src/main/java/io/dapr/spring/boot4/autoconfigure/client/ObservationDaprClient.java @@ -0,0 +1,754 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.spring.boot4.autoconfigure.client; + +import io.dapr.client.DaprClient; +import io.dapr.client.domain.BulkPublishRequest; +import io.dapr.client.domain.BulkPublishResponse; +import io.dapr.client.domain.ConfigurationItem; +import io.dapr.client.domain.DaprMetadata; +import io.dapr.client.domain.DeleteJobRequest; +import io.dapr.client.domain.DeleteStateRequest; +import io.dapr.client.domain.ExecuteStateTransactionRequest; +import io.dapr.client.domain.GetBulkSecretRequest; +import io.dapr.client.domain.GetBulkStateRequest; +import io.dapr.client.domain.GetConfigurationRequest; +import io.dapr.client.domain.GetJobRequest; +import io.dapr.client.domain.GetJobResponse; +import io.dapr.client.domain.GetSecretRequest; +import io.dapr.client.domain.GetStateRequest; +import io.dapr.client.domain.HttpExtension; +import io.dapr.client.domain.InvokeBindingRequest; +import io.dapr.client.domain.InvokeMethodRequest; +import io.dapr.client.domain.PublishEventRequest; +import io.dapr.client.domain.SaveStateRequest; +import io.dapr.client.domain.ScheduleJobRequest; +import io.dapr.client.domain.State; +import io.dapr.client.domain.StateOptions; +import io.dapr.client.domain.SubscribeConfigurationRequest; +import io.dapr.client.domain.SubscribeConfigurationResponse; +import io.dapr.client.domain.TransactionalStateOperation; +import io.dapr.client.domain.UnsubscribeConfigurationRequest; +import io.dapr.client.domain.UnsubscribeConfigurationResponse; +import io.dapr.utils.TypeRef; +import io.grpc.Channel; +import io.grpc.stub.AbstractStub; +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationRegistry; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * A {@link DaprClient} decorator that creates Micrometer Observation spans (bridged to OpenTelemetry) + * for each non-deprecated method call. Consumers continue to use {@link DaprClient} as-is; no code + * changes are required on their side. + * + *

Deprecated methods are delegated directly without any observation. + */ +public class ObservationDaprClient implements DaprClient { + + private final DaprClient delegate; + private final ObservationRegistry observationRegistry; + + /** + * Creates a new {@code ObservationDaprClient}. + * + * @param delegate the underlying {@link DaprClient} to delegate calls to + * @param observationRegistry the Micrometer {@link ObservationRegistry} used to create spans + */ + public ObservationDaprClient(DaprClient delegate, ObservationRegistry observationRegistry) { + this.delegate = delegate; + this.observationRegistry = observationRegistry; + } + + // ------------------------------------------------------------------------- + // Internal helpers + // ------------------------------------------------------------------------- + + private Mono observe(Observation obs, Supplier> monoSupplier) { + obs.start(); + return monoSupplier.get() + .doOnError(obs::error) + .doFinally(signal -> obs.stop()); + } + + private Flux observeFlux(Observation obs, Supplier> fluxSupplier) { + obs.start(); + return fluxSupplier.get() + .doOnError(obs::error) + .doFinally(signal -> obs.stop()); + } + + private Observation observation(String name) { + return Observation.createNotStarted(name, observationRegistry); + } + + // ------------------------------------------------------------------------- + // Lifecycle + // ------------------------------------------------------------------------- + + @Override + public Mono waitForSidecar(int timeoutInMilliseconds) { + return observe(observation("dapr.client.wait_for_sidecar"), + () -> delegate.waitForSidecar(timeoutInMilliseconds)); + } + + @Override + public Mono shutdown() { + return observe(observation("dapr.client.shutdown"), + () -> delegate.shutdown()); + } + + @Override + public void close() throws Exception { + delegate.close(); + } + + // ------------------------------------------------------------------------- + // Pub/Sub + // ------------------------------------------------------------------------- + + @Override + public Mono publishEvent(String pubsubName, String topicName, Object data) { + return observe( + observation("dapr.client.publish_event") + .highCardinalityKeyValue("dapr.pubsub.name", pubsubName) + .highCardinalityKeyValue("dapr.topic.name", topicName), + () -> delegate.publishEvent(pubsubName, topicName, data)); + } + + @Override + public Mono publishEvent(String pubsubName, String topicName, Object data, + Map metadata) { + return observe( + observation("dapr.client.publish_event") + .highCardinalityKeyValue("dapr.pubsub.name", pubsubName) + .highCardinalityKeyValue("dapr.topic.name", topicName), + () -> delegate.publishEvent(pubsubName, topicName, data, metadata)); + } + + @Override + public Mono publishEvent(PublishEventRequest request) { + return observe( + observation("dapr.client.publish_event") + .highCardinalityKeyValue("dapr.pubsub.name", request.getPubsubName()) + .highCardinalityKeyValue("dapr.topic.name", request.getTopic()), + () -> delegate.publishEvent(request)); + } + + @Override + public Mono> publishEvents(BulkPublishRequest request) { + return observe( + observation("dapr.client.publish_events") + .highCardinalityKeyValue("dapr.pubsub.name", request.getPubsubName()) + .highCardinalityKeyValue("dapr.topic.name", request.getTopic()), + () -> delegate.publishEvents(request)); + } + + @Override + public Mono> publishEvents(String pubsubName, String topicName, + String contentType, List events) { + return observe( + observation("dapr.client.publish_events") + .highCardinalityKeyValue("dapr.pubsub.name", pubsubName) + .highCardinalityKeyValue("dapr.topic.name", topicName), + () -> delegate.publishEvents(pubsubName, topicName, contentType, events)); + } + + @Override + @SuppressWarnings("unchecked") + public Mono> publishEvents(String pubsubName, String topicName, + String contentType, T... events) { + return observe( + observation("dapr.client.publish_events") + .highCardinalityKeyValue("dapr.pubsub.name", pubsubName) + .highCardinalityKeyValue("dapr.topic.name", topicName), + () -> delegate.publishEvents(pubsubName, topicName, contentType, events)); + } + + @Override + public Mono> publishEvents(String pubsubName, String topicName, + String contentType, + Map requestMetadata, + List events) { + return observe( + observation("dapr.client.publish_events") + .highCardinalityKeyValue("dapr.pubsub.name", pubsubName) + .highCardinalityKeyValue("dapr.topic.name", topicName), + () -> delegate.publishEvents(pubsubName, topicName, contentType, requestMetadata, events)); + } + + @Override + @SuppressWarnings("unchecked") + public Mono> publishEvents(String pubsubName, String topicName, + String contentType, + Map requestMetadata, + T... events) { + return observe( + observation("dapr.client.publish_events") + .highCardinalityKeyValue("dapr.pubsub.name", pubsubName) + .highCardinalityKeyValue("dapr.topic.name", topicName), + () -> delegate.publishEvents(pubsubName, topicName, contentType, requestMetadata, events)); + } + + // ------------------------------------------------------------------------- + // Service Invocation — all deprecated, delegate directly + // ------------------------------------------------------------------------- + + @Override + @Deprecated + public Mono invokeMethod(String appId, String methodName, Object data, + HttpExtension httpExtension, Map metadata, + TypeRef type) { + return delegate.invokeMethod(appId, methodName, data, httpExtension, metadata, type); + } + + @Override + @Deprecated + public Mono invokeMethod(String appId, String methodName, Object request, + HttpExtension httpExtension, Map metadata, + Class clazz) { + return delegate.invokeMethod(appId, methodName, request, httpExtension, metadata, clazz); + } + + @Override + @Deprecated + public Mono invokeMethod(String appId, String methodName, Object request, + HttpExtension httpExtension, TypeRef type) { + return delegate.invokeMethod(appId, methodName, request, httpExtension, type); + } + + @Override + @Deprecated + public Mono invokeMethod(String appId, String methodName, Object request, + HttpExtension httpExtension, Class clazz) { + return delegate.invokeMethod(appId, methodName, request, httpExtension, clazz); + } + + @Override + @Deprecated + public Mono invokeMethod(String appId, String methodName, HttpExtension httpExtension, + Map metadata, TypeRef type) { + return delegate.invokeMethod(appId, methodName, httpExtension, metadata, type); + } + + @Override + @Deprecated + public Mono invokeMethod(String appId, String methodName, HttpExtension httpExtension, + Map metadata, Class clazz) { + return delegate.invokeMethod(appId, methodName, httpExtension, metadata, clazz); + } + + @Override + @Deprecated + public Mono invokeMethod(String appId, String methodName, Object request, + HttpExtension httpExtension, Map metadata) { + return delegate.invokeMethod(appId, methodName, request, httpExtension, metadata); + } + + @Override + @Deprecated + public Mono invokeMethod(String appId, String methodName, Object request, + HttpExtension httpExtension) { + return delegate.invokeMethod(appId, methodName, request, httpExtension); + } + + @Override + @Deprecated + public Mono invokeMethod(String appId, String methodName, HttpExtension httpExtension, + Map metadata) { + return delegate.invokeMethod(appId, methodName, httpExtension, metadata); + } + + @Override + @Deprecated + public Mono invokeMethod(String appId, String methodName, byte[] request, + HttpExtension httpExtension, Map metadata) { + return delegate.invokeMethod(appId, methodName, request, httpExtension, metadata); + } + + @Override + @Deprecated + public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef type) { + return delegate.invokeMethod(invokeMethodRequest, type); + } + + // ------------------------------------------------------------------------- + // Bindings + // ------------------------------------------------------------------------- + + @Override + public Mono invokeBinding(String bindingName, String operation, Object data) { + return observe( + observation("dapr.client.invoke_binding") + .highCardinalityKeyValue("dapr.binding.name", bindingName) + .highCardinalityKeyValue("dapr.binding.operation", operation), + () -> delegate.invokeBinding(bindingName, operation, data)); + } + + @Override + public Mono invokeBinding(String bindingName, String operation, byte[] data, + Map metadata) { + return observe( + observation("dapr.client.invoke_binding") + .highCardinalityKeyValue("dapr.binding.name", bindingName) + .highCardinalityKeyValue("dapr.binding.operation", operation), + () -> delegate.invokeBinding(bindingName, operation, data, metadata)); + } + + @Override + public Mono invokeBinding(String bindingName, String operation, Object data, + TypeRef type) { + return observe( + observation("dapr.client.invoke_binding") + .highCardinalityKeyValue("dapr.binding.name", bindingName) + .highCardinalityKeyValue("dapr.binding.operation", operation), + () -> delegate.invokeBinding(bindingName, operation, data, type)); + } + + @Override + public Mono invokeBinding(String bindingName, String operation, Object data, + Class clazz) { + return observe( + observation("dapr.client.invoke_binding") + .highCardinalityKeyValue("dapr.binding.name", bindingName) + .highCardinalityKeyValue("dapr.binding.operation", operation), + () -> delegate.invokeBinding(bindingName, operation, data, clazz)); + } + + @Override + public Mono invokeBinding(String bindingName, String operation, Object data, + Map metadata, TypeRef type) { + return observe( + observation("dapr.client.invoke_binding") + .highCardinalityKeyValue("dapr.binding.name", bindingName) + .highCardinalityKeyValue("dapr.binding.operation", operation), + () -> delegate.invokeBinding(bindingName, operation, data, metadata, type)); + } + + @Override + public Mono invokeBinding(String bindingName, String operation, Object data, + Map metadata, Class clazz) { + return observe( + observation("dapr.client.invoke_binding") + .highCardinalityKeyValue("dapr.binding.name", bindingName) + .highCardinalityKeyValue("dapr.binding.operation", operation), + () -> delegate.invokeBinding(bindingName, operation, data, metadata, clazz)); + } + + @Override + public Mono invokeBinding(InvokeBindingRequest request) { + return observe( + observation("dapr.client.invoke_binding") + .highCardinalityKeyValue("dapr.binding.name", request.getName()) + .highCardinalityKeyValue("dapr.binding.operation", request.getOperation()), + () -> delegate.invokeBinding(request)); + } + + @Override + public Mono invokeBinding(InvokeBindingRequest request, TypeRef type) { + return observe( + observation("dapr.client.invoke_binding") + .highCardinalityKeyValue("dapr.binding.name", request.getName()) + .highCardinalityKeyValue("dapr.binding.operation", request.getOperation()), + () -> delegate.invokeBinding(request, type)); + } + + // ------------------------------------------------------------------------- + // State Management + // ------------------------------------------------------------------------- + + @Override + public Mono> getState(String storeName, State state, TypeRef type) { + return observe( + observation("dapr.client.get_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", state.getKey()), + () -> delegate.getState(storeName, state, type)); + } + + @Override + public Mono> getState(String storeName, State state, Class clazz) { + return observe( + observation("dapr.client.get_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", state.getKey()), + () -> delegate.getState(storeName, state, clazz)); + } + + @Override + public Mono> getState(String storeName, String key, TypeRef type) { + return observe( + observation("dapr.client.get_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", key), + () -> delegate.getState(storeName, key, type)); + } + + @Override + public Mono> getState(String storeName, String key, Class clazz) { + return observe( + observation("dapr.client.get_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", key), + () -> delegate.getState(storeName, key, clazz)); + } + + @Override + public Mono> getState(String storeName, String key, StateOptions options, + TypeRef type) { + return observe( + observation("dapr.client.get_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", key), + () -> delegate.getState(storeName, key, options, type)); + } + + @Override + public Mono> getState(String storeName, String key, StateOptions options, + Class clazz) { + return observe( + observation("dapr.client.get_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", key), + () -> delegate.getState(storeName, key, options, clazz)); + } + + @Override + public Mono> getState(GetStateRequest request, TypeRef type) { + return observe( + observation("dapr.client.get_state") + .highCardinalityKeyValue("dapr.store.name", request.getStoreName()) + .highCardinalityKeyValue("dapr.state.key", request.getKey()), + () -> delegate.getState(request, type)); + } + + @Override + public Mono>> getBulkState(String storeName, List keys, + TypeRef type) { + return observe( + observation("dapr.client.get_bulk_state") + .highCardinalityKeyValue("dapr.store.name", storeName), + () -> delegate.getBulkState(storeName, keys, type)); + } + + @Override + public Mono>> getBulkState(String storeName, List keys, + Class clazz) { + return observe( + observation("dapr.client.get_bulk_state") + .highCardinalityKeyValue("dapr.store.name", storeName), + () -> delegate.getBulkState(storeName, keys, clazz)); + } + + @Override + public Mono>> getBulkState(GetBulkStateRequest request, TypeRef type) { + return observe( + observation("dapr.client.get_bulk_state") + .highCardinalityKeyValue("dapr.store.name", request.getStoreName()), + () -> delegate.getBulkState(request, type)); + } + + @Override + public Mono executeStateTransaction(String storeName, + List> operations) { + return observe( + observation("dapr.client.execute_state_transaction") + .highCardinalityKeyValue("dapr.store.name", storeName), + () -> delegate.executeStateTransaction(storeName, operations)); + } + + @Override + public Mono executeStateTransaction(ExecuteStateTransactionRequest request) { + return observe( + observation("dapr.client.execute_state_transaction") + .highCardinalityKeyValue("dapr.store.name", request.getStateStoreName()), + () -> delegate.executeStateTransaction(request)); + } + + @Override + public Mono saveBulkState(String storeName, List> states) { + return observe( + observation("dapr.client.save_bulk_state") + .highCardinalityKeyValue("dapr.store.name", storeName), + () -> delegate.saveBulkState(storeName, states)); + } + + @Override + public Mono saveBulkState(SaveStateRequest request) { + return observe( + observation("dapr.client.save_bulk_state") + .highCardinalityKeyValue("dapr.store.name", request.getStoreName()), + () -> delegate.saveBulkState(request)); + } + + @Override + public Mono saveState(String storeName, String key, Object value) { + return observe( + observation("dapr.client.save_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", key), + () -> delegate.saveState(storeName, key, value)); + } + + @Override + public Mono saveState(String storeName, String key, String etag, Object value, + StateOptions options) { + return observe( + observation("dapr.client.save_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", key), + () -> delegate.saveState(storeName, key, etag, value, options)); + } + + @Override + public Mono saveState(String storeName, String key, String etag, Object value, + Map meta, StateOptions options) { + return observe( + observation("dapr.client.save_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", key), + () -> delegate.saveState(storeName, key, etag, value, meta, options)); + } + + @Override + public Mono deleteState(String storeName, String key) { + return observe( + observation("dapr.client.delete_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", key), + () -> delegate.deleteState(storeName, key)); + } + + @Override + public Mono deleteState(String storeName, String key, String etag, + StateOptions options) { + return observe( + observation("dapr.client.delete_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", key), + () -> delegate.deleteState(storeName, key, etag, options)); + } + + @Override + public Mono deleteState(DeleteStateRequest request) { + return observe( + observation("dapr.client.delete_state") + .highCardinalityKeyValue("dapr.store.name", request.getStateStoreName()) + .highCardinalityKeyValue("dapr.state.key", request.getKey()), + () -> delegate.deleteState(request)); + } + + // ------------------------------------------------------------------------- + // Secrets + // ------------------------------------------------------------------------- + + @Override + public Mono> getSecret(String storeName, String secretName, + Map metadata) { + return observe( + observation("dapr.client.get_secret") + .highCardinalityKeyValue("dapr.secret.store", storeName) + .highCardinalityKeyValue("dapr.secret.name", secretName), + () -> delegate.getSecret(storeName, secretName, metadata)); + } + + @Override + public Mono> getSecret(String storeName, String secretName) { + return observe( + observation("dapr.client.get_secret") + .highCardinalityKeyValue("dapr.secret.store", storeName) + .highCardinalityKeyValue("dapr.secret.name", secretName), + () -> delegate.getSecret(storeName, secretName)); + } + + @Override + public Mono> getSecret(GetSecretRequest request) { + return observe( + observation("dapr.client.get_secret") + .highCardinalityKeyValue("dapr.secret.store", request.getStoreName()) + .highCardinalityKeyValue("dapr.secret.name", request.getKey()), + () -> delegate.getSecret(request)); + } + + @Override + public Mono>> getBulkSecret(String storeName) { + return observe( + observation("dapr.client.get_bulk_secret") + .highCardinalityKeyValue("dapr.secret.store", storeName), + () -> delegate.getBulkSecret(storeName)); + } + + @Override + public Mono>> getBulkSecret(String storeName, + Map metadata) { + return observe( + observation("dapr.client.get_bulk_secret") + .highCardinalityKeyValue("dapr.secret.store", storeName), + () -> delegate.getBulkSecret(storeName, metadata)); + } + + @Override + public Mono>> getBulkSecret(GetBulkSecretRequest request) { + return observe( + observation("dapr.client.get_bulk_secret") + .highCardinalityKeyValue("dapr.secret.store", request.getStoreName()), + () -> delegate.getBulkSecret(request)); + } + + // ------------------------------------------------------------------------- + // Configuration + // ------------------------------------------------------------------------- + + @Override + public Mono getConfiguration(String storeName, String key) { + return observe( + observation("dapr.client.get_configuration") + .highCardinalityKeyValue("dapr.configuration.store", storeName) + .highCardinalityKeyValue("dapr.configuration.key", key), + () -> delegate.getConfiguration(storeName, key)); + } + + @Override + public Mono getConfiguration(String storeName, String key, + Map metadata) { + return observe( + observation("dapr.client.get_configuration") + .highCardinalityKeyValue("dapr.configuration.store", storeName) + .highCardinalityKeyValue("dapr.configuration.key", key), + () -> delegate.getConfiguration(storeName, key, metadata)); + } + + @Override + public Mono> getConfiguration(String storeName, String... keys) { + return observe( + observation("dapr.client.get_configuration") + .highCardinalityKeyValue("dapr.configuration.store", storeName), + () -> delegate.getConfiguration(storeName, keys)); + } + + @Override + public Mono> getConfiguration(String storeName, List keys, + Map metadata) { + return observe( + observation("dapr.client.get_configuration") + .highCardinalityKeyValue("dapr.configuration.store", storeName), + () -> delegate.getConfiguration(storeName, keys, metadata)); + } + + @Override + public Mono> getConfiguration(GetConfigurationRequest request) { + return observe( + observation("dapr.client.get_configuration") + .highCardinalityKeyValue("dapr.configuration.store", request.getStoreName()), + () -> delegate.getConfiguration(request)); + } + + @Override + public Flux subscribeConfiguration(String storeName, + String... keys) { + return observeFlux( + observation("dapr.client.subscribe_configuration") + .highCardinalityKeyValue("dapr.configuration.store", storeName), + () -> delegate.subscribeConfiguration(storeName, keys)); + } + + @Override + public Flux subscribeConfiguration(String storeName, + List keys, + Map metadata) { + return observeFlux( + observation("dapr.client.subscribe_configuration") + .highCardinalityKeyValue("dapr.configuration.store", storeName), + () -> delegate.subscribeConfiguration(storeName, keys, metadata)); + } + + @Override + public Flux subscribeConfiguration( + SubscribeConfigurationRequest request) { + return observeFlux( + observation("dapr.client.subscribe_configuration") + .highCardinalityKeyValue("dapr.configuration.store", request.getStoreName()), + () -> delegate.subscribeConfiguration(request)); + } + + @Override + public Mono unsubscribeConfiguration(String id, + String storeName) { + return observe( + observation("dapr.client.unsubscribe_configuration") + .highCardinalityKeyValue("dapr.configuration.store", storeName), + () -> delegate.unsubscribeConfiguration(id, storeName)); + } + + @Override + public Mono unsubscribeConfiguration( + UnsubscribeConfigurationRequest request) { + return observe( + observation("dapr.client.unsubscribe_configuration") + .highCardinalityKeyValue("dapr.configuration.store", request.getStoreName()), + () -> delegate.unsubscribeConfiguration(request)); + } + + // ------------------------------------------------------------------------- + // gRPC Stub — no remote call at creation time, no observation needed + // ------------------------------------------------------------------------- + + @Override + public > T newGrpcStub(String appId, Function stubBuilder) { + return delegate.newGrpcStub(appId, stubBuilder); + } + + // ------------------------------------------------------------------------- + // Metadata + // ------------------------------------------------------------------------- + + @Override + public Mono getMetadata() { + return observe(observation("dapr.client.get_metadata"), () -> delegate.getMetadata()); + } + + // ------------------------------------------------------------------------- + // Jobs + // ------------------------------------------------------------------------- + + @Override + public Mono scheduleJob(ScheduleJobRequest scheduleJobRequest) { + return observe( + observation("dapr.client.schedule_job") + .highCardinalityKeyValue("dapr.job.name", scheduleJobRequest.getName()), + () -> delegate.scheduleJob(scheduleJobRequest)); + } + + @Override + public Mono getJob(GetJobRequest getJobRequest) { + return observe( + observation("dapr.client.get_job") + .highCardinalityKeyValue("dapr.job.name", getJobRequest.getName()), + () -> delegate.getJob(getJobRequest)); + } + + @Override + public Mono deleteJob(DeleteJobRequest deleteJobRequest) { + return observe( + observation("dapr.client.delete_job") + .highCardinalityKeyValue("dapr.job.name", deleteJobRequest.getName()), + () -> delegate.deleteJob(deleteJobRequest)); + } +} diff --git a/dapr-spring/dapr-spring-boot-4-autoconfigure/src/main/java/io/dapr/spring/boot4/autoconfigure/client/ObservationDaprWorkflowClient.java b/dapr-spring/dapr-spring-boot-4-autoconfigure/src/main/java/io/dapr/spring/boot4/autoconfigure/client/ObservationDaprWorkflowClient.java new file mode 100644 index 0000000000..a9de16c546 --- /dev/null +++ b/dapr-spring/dapr-spring-boot-4-autoconfigure/src/main/java/io/dapr/spring/boot4/autoconfigure/client/ObservationDaprWorkflowClient.java @@ -0,0 +1,293 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.spring.boot4.autoconfigure.client; + +import io.dapr.config.Properties; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.NewWorkflowOptions; +import io.dapr.workflows.client.WorkflowState; +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationRegistry; + +import javax.annotation.Nullable; +import java.time.Duration; +import java.util.concurrent.TimeoutException; + +/** + * A {@link DaprWorkflowClient} subclass that creates Micrometer Observation spans (bridged to + * OpenTelemetry) for each non-deprecated method call. + * + *

Because this class extends {@link DaprWorkflowClient}, consumers can keep injecting + * {@code DaprWorkflowClient} without any code changes. Deprecated methods fall through to the + * parent implementation without any observation. + * + *

Constructor note: calling {@code super(properties)} eagerly creates a gRPC + * {@code ManagedChannel}, but the actual TCP connection is established lazily on the first RPC call, + * so construction succeeds even when the Dapr sidecar is not yet available. + */ +public class ObservationDaprWorkflowClient extends DaprWorkflowClient { + + private final ObservationRegistry observationRegistry; + + /** + * Creates a new {@code ObservationDaprWorkflowClient}. + * + * @param properties connection properties for the underlying gRPC channel + * @param observationRegistry the Micrometer {@link ObservationRegistry} used to create spans + */ + public ObservationDaprWorkflowClient(Properties properties, + ObservationRegistry observationRegistry) { + super(properties); + this.observationRegistry = observationRegistry; + } + + // ------------------------------------------------------------------------- + // Internal helpers + // ------------------------------------------------------------------------- + + private Observation observation(String name) { + return Observation.createNotStarted(name, observationRegistry); + } + + // ------------------------------------------------------------------------- + // scheduleNewWorkflow — only String-based "leaf" overloads are overridden. + // Class-based overloads in the parent delegate to this.scheduleNewWorkflow(String, ...) + // via dynamic dispatch, so they naturally pick up these observations. + // ------------------------------------------------------------------------- + + @Override + public String scheduleNewWorkflow(String name) { + Observation obs = observation("dapr.workflow.schedule") + .highCardinalityKeyValue("dapr.workflow.name", name) + .start(); + try { + return super.scheduleNewWorkflow(name); + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + @Override + public String scheduleNewWorkflow(String name, Object input) { + Observation obs = observation("dapr.workflow.schedule") + .highCardinalityKeyValue("dapr.workflow.name", name) + .start(); + try { + return super.scheduleNewWorkflow(name, input); + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + @Override + public String scheduleNewWorkflow(String name, Object input, + String instanceId) { + Observation obs = observation("dapr.workflow.schedule") + .highCardinalityKeyValue("dapr.workflow.name", name) + .highCardinalityKeyValue("dapr.workflow.instance_id", + instanceId != null ? instanceId : "") + .start(); + try { + return super.scheduleNewWorkflow(name, input, instanceId); + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + @Override + public String scheduleNewWorkflow(String name, + NewWorkflowOptions options) { + String instanceId = options != null && options.getInstanceId() != null + ? options.getInstanceId() : ""; + Observation obs = observation("dapr.workflow.schedule") + .highCardinalityKeyValue("dapr.workflow.name", name) + .highCardinalityKeyValue("dapr.workflow.instance_id", instanceId) + .start(); + try { + return super.scheduleNewWorkflow(name, options); + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + // ------------------------------------------------------------------------- + // Lifecycle operations + // ------------------------------------------------------------------------- + + @Override + public void suspendWorkflow(String workflowInstanceId, @Nullable String reason) { + Observation obs = observation("dapr.workflow.suspend") + .highCardinalityKeyValue("dapr.workflow.instance_id", workflowInstanceId) + .start(); + try { + super.suspendWorkflow(workflowInstanceId, reason); + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + @Override + public void resumeWorkflow(String workflowInstanceId, @Nullable String reason) { + Observation obs = observation("dapr.workflow.resume") + .highCardinalityKeyValue("dapr.workflow.instance_id", workflowInstanceId) + .start(); + try { + super.resumeWorkflow(workflowInstanceId, reason); + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + @Override + public void terminateWorkflow(String workflowInstanceId, @Nullable Object output) { + Observation obs = observation("dapr.workflow.terminate") + .highCardinalityKeyValue("dapr.workflow.instance_id", workflowInstanceId) + .start(); + try { + super.terminateWorkflow(workflowInstanceId, output); + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + // ------------------------------------------------------------------------- + // State queries + // ------------------------------------------------------------------------- + + @Override + @Nullable + public WorkflowState getWorkflowState(String instanceId, boolean getInputsAndOutputs) { + Observation obs = observation("dapr.workflow.get_state") + .highCardinalityKeyValue("dapr.workflow.instance_id", instanceId) + .start(); + try { + return super.getWorkflowState(instanceId, getInputsAndOutputs); + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + // ------------------------------------------------------------------------- + // Waiting + // ------------------------------------------------------------------------- + + @Override + @Nullable + public WorkflowState waitForWorkflowStart(String instanceId, Duration timeout, + boolean getInputsAndOutputs) throws TimeoutException { + Observation obs = observation("dapr.workflow.wait_start") + .highCardinalityKeyValue("dapr.workflow.instance_id", instanceId) + .start(); + try { + return super.waitForWorkflowStart(instanceId, timeout, getInputsAndOutputs); + } catch (TimeoutException e) { + obs.error(e); + throw e; + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + @Override + @Nullable + public WorkflowState waitForWorkflowCompletion(String instanceId, Duration timeout, + boolean getInputsAndOutputs) + throws TimeoutException { + Observation obs = observation("dapr.workflow.wait_completion") + .highCardinalityKeyValue("dapr.workflow.instance_id", instanceId) + .start(); + try { + return super.waitForWorkflowCompletion(instanceId, timeout, getInputsAndOutputs); + } catch (TimeoutException e) { + obs.error(e); + throw e; + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + // ------------------------------------------------------------------------- + // Events + // ------------------------------------------------------------------------- + + @Override + public void raiseEvent(String workflowInstanceId, String eventName, Object eventPayload) { + Observation obs = observation("dapr.workflow.raise_event") + .highCardinalityKeyValue("dapr.workflow.instance_id", workflowInstanceId) + .highCardinalityKeyValue("dapr.workflow.event_name", eventName) + .start(); + try { + super.raiseEvent(workflowInstanceId, eventName, eventPayload); + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + // ------------------------------------------------------------------------- + // Cleanup + // ------------------------------------------------------------------------- + + @Override + public boolean purgeWorkflow(String workflowInstanceId) { + Observation obs = observation("dapr.workflow.purge") + .highCardinalityKeyValue("dapr.workflow.instance_id", workflowInstanceId) + .start(); + try { + return super.purgeWorkflow(workflowInstanceId); + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + // Deprecated methods (getInstanceState, waitForInstanceStart, waitForInstanceCompletion, + // purgeInstance) are intentionally not overridden — they fall through to the parent + // implementation without any observation. +} diff --git a/dapr-spring/dapr-spring-boot-autoconfigure/pom.xml b/dapr-spring/dapr-spring-boot-autoconfigure/pom.xml index 59ad076197..240915674d 100644 --- a/dapr-spring/dapr-spring-boot-autoconfigure/pom.xml +++ b/dapr-spring/dapr-spring-boot-autoconfigure/pom.xml @@ -53,6 +53,16 @@ org.springframework spring-context + + io.micrometer + micrometer-observation + true + + + io.micrometer + micrometer-observation-test + test + org.springframework.boot spring-boot-starter-web diff --git a/dapr-spring/dapr-spring-boot-autoconfigure/src/main/java/io/dapr/spring/boot/autoconfigure/client/DaprClientAutoConfiguration.java b/dapr-spring/dapr-spring-boot-autoconfigure/src/main/java/io/dapr/spring/boot/autoconfigure/client/DaprClientAutoConfiguration.java index 041e300d68..6b6dcac34b 100644 --- a/dapr-spring/dapr-spring-boot-autoconfigure/src/main/java/io/dapr/spring/boot/autoconfigure/client/DaprClientAutoConfiguration.java +++ b/dapr-spring/dapr-spring-boot-autoconfigure/src/main/java/io/dapr/spring/boot/autoconfigure/client/DaprClientAutoConfiguration.java @@ -23,6 +23,8 @@ import io.dapr.spring.boot.properties.client.DaprConnectionDetails; import io.dapr.workflows.client.DaprWorkflowClient; import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; +import io.micrometer.observation.ObservationRegistry; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -81,14 +83,25 @@ DaprClientBuilder daprClientBuilder(DaprConnectionDetails daprConnectionDetails) @Bean @ConditionalOnMissingBean - DaprClient daprClient(DaprClientBuilder daprClientBuilder) { - return daprClientBuilder.build(); + DaprClient daprClient(DaprClientBuilder daprClientBuilder, + ObjectProvider observationRegistryProvider) { + DaprClient client = daprClientBuilder.build(); + ObservationRegistry registry = observationRegistryProvider.getIfAvailable(); + if (registry != null && !registry.isNoop()) { + return new ObservationDaprClient(client, registry); + } + return client; } @Bean @ConditionalOnMissingBean - DaprWorkflowClient daprWorkflowClient(DaprConnectionDetails daprConnectionDetails) { + DaprWorkflowClient daprWorkflowClient(DaprConnectionDetails daprConnectionDetails, + ObjectProvider observationRegistryProvider) { Properties properties = createPropertiesFromConnectionDetails(daprConnectionDetails); + ObservationRegistry registry = observationRegistryProvider.getIfAvailable(); + if (registry != null && !registry.isNoop()) { + return new ObservationDaprWorkflowClient(properties, registry); + } return new DaprWorkflowClient(properties); } diff --git a/dapr-spring/dapr-spring-boot-autoconfigure/src/main/java/io/dapr/spring/boot/autoconfigure/client/ObservationDaprClient.java b/dapr-spring/dapr-spring-boot-autoconfigure/src/main/java/io/dapr/spring/boot/autoconfigure/client/ObservationDaprClient.java new file mode 100644 index 0000000000..4fa1839f2c --- /dev/null +++ b/dapr-spring/dapr-spring-boot-autoconfigure/src/main/java/io/dapr/spring/boot/autoconfigure/client/ObservationDaprClient.java @@ -0,0 +1,754 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.spring.boot.autoconfigure.client; + +import io.dapr.client.DaprClient; +import io.dapr.client.domain.BulkPublishRequest; +import io.dapr.client.domain.BulkPublishResponse; +import io.dapr.client.domain.ConfigurationItem; +import io.dapr.client.domain.DaprMetadata; +import io.dapr.client.domain.DeleteJobRequest; +import io.dapr.client.domain.DeleteStateRequest; +import io.dapr.client.domain.ExecuteStateTransactionRequest; +import io.dapr.client.domain.GetBulkSecretRequest; +import io.dapr.client.domain.GetBulkStateRequest; +import io.dapr.client.domain.GetConfigurationRequest; +import io.dapr.client.domain.GetJobRequest; +import io.dapr.client.domain.GetJobResponse; +import io.dapr.client.domain.GetSecretRequest; +import io.dapr.client.domain.GetStateRequest; +import io.dapr.client.domain.HttpExtension; +import io.dapr.client.domain.InvokeBindingRequest; +import io.dapr.client.domain.InvokeMethodRequest; +import io.dapr.client.domain.PublishEventRequest; +import io.dapr.client.domain.SaveStateRequest; +import io.dapr.client.domain.ScheduleJobRequest; +import io.dapr.client.domain.State; +import io.dapr.client.domain.StateOptions; +import io.dapr.client.domain.SubscribeConfigurationRequest; +import io.dapr.client.domain.SubscribeConfigurationResponse; +import io.dapr.client.domain.TransactionalStateOperation; +import io.dapr.client.domain.UnsubscribeConfigurationRequest; +import io.dapr.client.domain.UnsubscribeConfigurationResponse; +import io.dapr.utils.TypeRef; +import io.grpc.Channel; +import io.grpc.stub.AbstractStub; +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationRegistry; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * A {@link DaprClient} decorator that creates Micrometer Observation spans (bridged to OpenTelemetry) + * for each non-deprecated method call. Consumers continue to use {@link DaprClient} as-is; no code + * changes are required on their side. + * + *

Deprecated methods are delegated directly without any observation. + */ +public class ObservationDaprClient implements DaprClient { + + private final DaprClient delegate; + private final ObservationRegistry observationRegistry; + + /** + * Creates a new {@code ObservationDaprClient}. + * + * @param delegate the underlying {@link DaprClient} to delegate calls to + * @param observationRegistry the Micrometer {@link ObservationRegistry} used to create spans + */ + public ObservationDaprClient(DaprClient delegate, ObservationRegistry observationRegistry) { + this.delegate = delegate; + this.observationRegistry = observationRegistry; + } + + // ------------------------------------------------------------------------- + // Internal helpers + // ------------------------------------------------------------------------- + + private Mono observe(Observation obs, Supplier> monoSupplier) { + obs.start(); + return monoSupplier.get() + .doOnError(obs::error) + .doFinally(signal -> obs.stop()); + } + + private Flux observeFlux(Observation obs, Supplier> fluxSupplier) { + obs.start(); + return fluxSupplier.get() + .doOnError(obs::error) + .doFinally(signal -> obs.stop()); + } + + private Observation observation(String name) { + return Observation.createNotStarted(name, observationRegistry); + } + + // ------------------------------------------------------------------------- + // Lifecycle + // ------------------------------------------------------------------------- + + @Override + public Mono waitForSidecar(int timeoutInMilliseconds) { + return observe(observation("dapr.client.wait_for_sidecar"), + () -> delegate.waitForSidecar(timeoutInMilliseconds)); + } + + @Override + public Mono shutdown() { + return observe(observation("dapr.client.shutdown"), + () -> delegate.shutdown()); + } + + @Override + public void close() throws Exception { + delegate.close(); + } + + // ------------------------------------------------------------------------- + // Pub/Sub + // ------------------------------------------------------------------------- + + @Override + public Mono publishEvent(String pubsubName, String topicName, Object data) { + return observe( + observation("dapr.client.publish_event") + .highCardinalityKeyValue("dapr.pubsub.name", pubsubName) + .highCardinalityKeyValue("dapr.topic.name", topicName), + () -> delegate.publishEvent(pubsubName, topicName, data)); + } + + @Override + public Mono publishEvent(String pubsubName, String topicName, Object data, + Map metadata) { + return observe( + observation("dapr.client.publish_event") + .highCardinalityKeyValue("dapr.pubsub.name", pubsubName) + .highCardinalityKeyValue("dapr.topic.name", topicName), + () -> delegate.publishEvent(pubsubName, topicName, data, metadata)); + } + + @Override + public Mono publishEvent(PublishEventRequest request) { + return observe( + observation("dapr.client.publish_event") + .highCardinalityKeyValue("dapr.pubsub.name", request.getPubsubName()) + .highCardinalityKeyValue("dapr.topic.name", request.getTopic()), + () -> delegate.publishEvent(request)); + } + + @Override + public Mono> publishEvents(BulkPublishRequest request) { + return observe( + observation("dapr.client.publish_events") + .highCardinalityKeyValue("dapr.pubsub.name", request.getPubsubName()) + .highCardinalityKeyValue("dapr.topic.name", request.getTopic()), + () -> delegate.publishEvents(request)); + } + + @Override + public Mono> publishEvents(String pubsubName, String topicName, + String contentType, List events) { + return observe( + observation("dapr.client.publish_events") + .highCardinalityKeyValue("dapr.pubsub.name", pubsubName) + .highCardinalityKeyValue("dapr.topic.name", topicName), + () -> delegate.publishEvents(pubsubName, topicName, contentType, events)); + } + + @Override + @SuppressWarnings("unchecked") + public Mono> publishEvents(String pubsubName, String topicName, + String contentType, T... events) { + return observe( + observation("dapr.client.publish_events") + .highCardinalityKeyValue("dapr.pubsub.name", pubsubName) + .highCardinalityKeyValue("dapr.topic.name", topicName), + () -> delegate.publishEvents(pubsubName, topicName, contentType, events)); + } + + @Override + public Mono> publishEvents(String pubsubName, String topicName, + String contentType, + Map requestMetadata, + List events) { + return observe( + observation("dapr.client.publish_events") + .highCardinalityKeyValue("dapr.pubsub.name", pubsubName) + .highCardinalityKeyValue("dapr.topic.name", topicName), + () -> delegate.publishEvents(pubsubName, topicName, contentType, requestMetadata, events)); + } + + @Override + @SuppressWarnings("unchecked") + public Mono> publishEvents(String pubsubName, String topicName, + String contentType, + Map requestMetadata, + T... events) { + return observe( + observation("dapr.client.publish_events") + .highCardinalityKeyValue("dapr.pubsub.name", pubsubName) + .highCardinalityKeyValue("dapr.topic.name", topicName), + () -> delegate.publishEvents(pubsubName, topicName, contentType, requestMetadata, events)); + } + + // ------------------------------------------------------------------------- + // Service Invocation — all deprecated, delegate directly + // ------------------------------------------------------------------------- + + @Override + @Deprecated + public Mono invokeMethod(String appId, String methodName, Object data, + HttpExtension httpExtension, Map metadata, + TypeRef type) { + return delegate.invokeMethod(appId, methodName, data, httpExtension, metadata, type); + } + + @Override + @Deprecated + public Mono invokeMethod(String appId, String methodName, Object request, + HttpExtension httpExtension, Map metadata, + Class clazz) { + return delegate.invokeMethod(appId, methodName, request, httpExtension, metadata, clazz); + } + + @Override + @Deprecated + public Mono invokeMethod(String appId, String methodName, Object request, + HttpExtension httpExtension, TypeRef type) { + return delegate.invokeMethod(appId, methodName, request, httpExtension, type); + } + + @Override + @Deprecated + public Mono invokeMethod(String appId, String methodName, Object request, + HttpExtension httpExtension, Class clazz) { + return delegate.invokeMethod(appId, methodName, request, httpExtension, clazz); + } + + @Override + @Deprecated + public Mono invokeMethod(String appId, String methodName, HttpExtension httpExtension, + Map metadata, TypeRef type) { + return delegate.invokeMethod(appId, methodName, httpExtension, metadata, type); + } + + @Override + @Deprecated + public Mono invokeMethod(String appId, String methodName, HttpExtension httpExtension, + Map metadata, Class clazz) { + return delegate.invokeMethod(appId, methodName, httpExtension, metadata, clazz); + } + + @Override + @Deprecated + public Mono invokeMethod(String appId, String methodName, Object request, + HttpExtension httpExtension, Map metadata) { + return delegate.invokeMethod(appId, methodName, request, httpExtension, metadata); + } + + @Override + @Deprecated + public Mono invokeMethod(String appId, String methodName, Object request, + HttpExtension httpExtension) { + return delegate.invokeMethod(appId, methodName, request, httpExtension); + } + + @Override + @Deprecated + public Mono invokeMethod(String appId, String methodName, HttpExtension httpExtension, + Map metadata) { + return delegate.invokeMethod(appId, methodName, httpExtension, metadata); + } + + @Override + @Deprecated + public Mono invokeMethod(String appId, String methodName, byte[] request, + HttpExtension httpExtension, Map metadata) { + return delegate.invokeMethod(appId, methodName, request, httpExtension, metadata); + } + + @Override + @Deprecated + public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef type) { + return delegate.invokeMethod(invokeMethodRequest, type); + } + + // ------------------------------------------------------------------------- + // Bindings + // ------------------------------------------------------------------------- + + @Override + public Mono invokeBinding(String bindingName, String operation, Object data) { + return observe( + observation("dapr.client.invoke_binding") + .highCardinalityKeyValue("dapr.binding.name", bindingName) + .highCardinalityKeyValue("dapr.binding.operation", operation), + () -> delegate.invokeBinding(bindingName, operation, data)); + } + + @Override + public Mono invokeBinding(String bindingName, String operation, byte[] data, + Map metadata) { + return observe( + observation("dapr.client.invoke_binding") + .highCardinalityKeyValue("dapr.binding.name", bindingName) + .highCardinalityKeyValue("dapr.binding.operation", operation), + () -> delegate.invokeBinding(bindingName, operation, data, metadata)); + } + + @Override + public Mono invokeBinding(String bindingName, String operation, Object data, + TypeRef type) { + return observe( + observation("dapr.client.invoke_binding") + .highCardinalityKeyValue("dapr.binding.name", bindingName) + .highCardinalityKeyValue("dapr.binding.operation", operation), + () -> delegate.invokeBinding(bindingName, operation, data, type)); + } + + @Override + public Mono invokeBinding(String bindingName, String operation, Object data, + Class clazz) { + return observe( + observation("dapr.client.invoke_binding") + .highCardinalityKeyValue("dapr.binding.name", bindingName) + .highCardinalityKeyValue("dapr.binding.operation", operation), + () -> delegate.invokeBinding(bindingName, operation, data, clazz)); + } + + @Override + public Mono invokeBinding(String bindingName, String operation, Object data, + Map metadata, TypeRef type) { + return observe( + observation("dapr.client.invoke_binding") + .highCardinalityKeyValue("dapr.binding.name", bindingName) + .highCardinalityKeyValue("dapr.binding.operation", operation), + () -> delegate.invokeBinding(bindingName, operation, data, metadata, type)); + } + + @Override + public Mono invokeBinding(String bindingName, String operation, Object data, + Map metadata, Class clazz) { + return observe( + observation("dapr.client.invoke_binding") + .highCardinalityKeyValue("dapr.binding.name", bindingName) + .highCardinalityKeyValue("dapr.binding.operation", operation), + () -> delegate.invokeBinding(bindingName, operation, data, metadata, clazz)); + } + + @Override + public Mono invokeBinding(InvokeBindingRequest request) { + return observe( + observation("dapr.client.invoke_binding") + .highCardinalityKeyValue("dapr.binding.name", request.getName()) + .highCardinalityKeyValue("dapr.binding.operation", request.getOperation()), + () -> delegate.invokeBinding(request)); + } + + @Override + public Mono invokeBinding(InvokeBindingRequest request, TypeRef type) { + return observe( + observation("dapr.client.invoke_binding") + .highCardinalityKeyValue("dapr.binding.name", request.getName()) + .highCardinalityKeyValue("dapr.binding.operation", request.getOperation()), + () -> delegate.invokeBinding(request, type)); + } + + // ------------------------------------------------------------------------- + // State Management + // ------------------------------------------------------------------------- + + @Override + public Mono> getState(String storeName, State state, TypeRef type) { + return observe( + observation("dapr.client.get_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", state.getKey()), + () -> delegate.getState(storeName, state, type)); + } + + @Override + public Mono> getState(String storeName, State state, Class clazz) { + return observe( + observation("dapr.client.get_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", state.getKey()), + () -> delegate.getState(storeName, state, clazz)); + } + + @Override + public Mono> getState(String storeName, String key, TypeRef type) { + return observe( + observation("dapr.client.get_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", key), + () -> delegate.getState(storeName, key, type)); + } + + @Override + public Mono> getState(String storeName, String key, Class clazz) { + return observe( + observation("dapr.client.get_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", key), + () -> delegate.getState(storeName, key, clazz)); + } + + @Override + public Mono> getState(String storeName, String key, StateOptions options, + TypeRef type) { + return observe( + observation("dapr.client.get_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", key), + () -> delegate.getState(storeName, key, options, type)); + } + + @Override + public Mono> getState(String storeName, String key, StateOptions options, + Class clazz) { + return observe( + observation("dapr.client.get_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", key), + () -> delegate.getState(storeName, key, options, clazz)); + } + + @Override + public Mono> getState(GetStateRequest request, TypeRef type) { + return observe( + observation("dapr.client.get_state") + .highCardinalityKeyValue("dapr.store.name", request.getStoreName()) + .highCardinalityKeyValue("dapr.state.key", request.getKey()), + () -> delegate.getState(request, type)); + } + + @Override + public Mono>> getBulkState(String storeName, List keys, + TypeRef type) { + return observe( + observation("dapr.client.get_bulk_state") + .highCardinalityKeyValue("dapr.store.name", storeName), + () -> delegate.getBulkState(storeName, keys, type)); + } + + @Override + public Mono>> getBulkState(String storeName, List keys, + Class clazz) { + return observe( + observation("dapr.client.get_bulk_state") + .highCardinalityKeyValue("dapr.store.name", storeName), + () -> delegate.getBulkState(storeName, keys, clazz)); + } + + @Override + public Mono>> getBulkState(GetBulkStateRequest request, TypeRef type) { + return observe( + observation("dapr.client.get_bulk_state") + .highCardinalityKeyValue("dapr.store.name", request.getStoreName()), + () -> delegate.getBulkState(request, type)); + } + + @Override + public Mono executeStateTransaction(String storeName, + List> operations) { + return observe( + observation("dapr.client.execute_state_transaction") + .highCardinalityKeyValue("dapr.store.name", storeName), + () -> delegate.executeStateTransaction(storeName, operations)); + } + + @Override + public Mono executeStateTransaction(ExecuteStateTransactionRequest request) { + return observe( + observation("dapr.client.execute_state_transaction") + .highCardinalityKeyValue("dapr.store.name", request.getStateStoreName()), + () -> delegate.executeStateTransaction(request)); + } + + @Override + public Mono saveBulkState(String storeName, List> states) { + return observe( + observation("dapr.client.save_bulk_state") + .highCardinalityKeyValue("dapr.store.name", storeName), + () -> delegate.saveBulkState(storeName, states)); + } + + @Override + public Mono saveBulkState(SaveStateRequest request) { + return observe( + observation("dapr.client.save_bulk_state") + .highCardinalityKeyValue("dapr.store.name", request.getStoreName()), + () -> delegate.saveBulkState(request)); + } + + @Override + public Mono saveState(String storeName, String key, Object value) { + return observe( + observation("dapr.client.save_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", key), + () -> delegate.saveState(storeName, key, value)); + } + + @Override + public Mono saveState(String storeName, String key, String etag, Object value, + StateOptions options) { + return observe( + observation("dapr.client.save_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", key), + () -> delegate.saveState(storeName, key, etag, value, options)); + } + + @Override + public Mono saveState(String storeName, String key, String etag, Object value, + Map meta, StateOptions options) { + return observe( + observation("dapr.client.save_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", key), + () -> delegate.saveState(storeName, key, etag, value, meta, options)); + } + + @Override + public Mono deleteState(String storeName, String key) { + return observe( + observation("dapr.client.delete_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", key), + () -> delegate.deleteState(storeName, key)); + } + + @Override + public Mono deleteState(String storeName, String key, String etag, + StateOptions options) { + return observe( + observation("dapr.client.delete_state") + .highCardinalityKeyValue("dapr.store.name", storeName) + .highCardinalityKeyValue("dapr.state.key", key), + () -> delegate.deleteState(storeName, key, etag, options)); + } + + @Override + public Mono deleteState(DeleteStateRequest request) { + return observe( + observation("dapr.client.delete_state") + .highCardinalityKeyValue("dapr.store.name", request.getStateStoreName()) + .highCardinalityKeyValue("dapr.state.key", request.getKey()), + () -> delegate.deleteState(request)); + } + + // ------------------------------------------------------------------------- + // Secrets + // ------------------------------------------------------------------------- + + @Override + public Mono> getSecret(String storeName, String secretName, + Map metadata) { + return observe( + observation("dapr.client.get_secret") + .highCardinalityKeyValue("dapr.secret.store", storeName) + .highCardinalityKeyValue("dapr.secret.name", secretName), + () -> delegate.getSecret(storeName, secretName, metadata)); + } + + @Override + public Mono> getSecret(String storeName, String secretName) { + return observe( + observation("dapr.client.get_secret") + .highCardinalityKeyValue("dapr.secret.store", storeName) + .highCardinalityKeyValue("dapr.secret.name", secretName), + () -> delegate.getSecret(storeName, secretName)); + } + + @Override + public Mono> getSecret(GetSecretRequest request) { + return observe( + observation("dapr.client.get_secret") + .highCardinalityKeyValue("dapr.secret.store", request.getStoreName()) + .highCardinalityKeyValue("dapr.secret.name", request.getKey()), + () -> delegate.getSecret(request)); + } + + @Override + public Mono>> getBulkSecret(String storeName) { + return observe( + observation("dapr.client.get_bulk_secret") + .highCardinalityKeyValue("dapr.secret.store", storeName), + () -> delegate.getBulkSecret(storeName)); + } + + @Override + public Mono>> getBulkSecret(String storeName, + Map metadata) { + return observe( + observation("dapr.client.get_bulk_secret") + .highCardinalityKeyValue("dapr.secret.store", storeName), + () -> delegate.getBulkSecret(storeName, metadata)); + } + + @Override + public Mono>> getBulkSecret(GetBulkSecretRequest request) { + return observe( + observation("dapr.client.get_bulk_secret") + .highCardinalityKeyValue("dapr.secret.store", request.getStoreName()), + () -> delegate.getBulkSecret(request)); + } + + // ------------------------------------------------------------------------- + // Configuration + // ------------------------------------------------------------------------- + + @Override + public Mono getConfiguration(String storeName, String key) { + return observe( + observation("dapr.client.get_configuration") + .highCardinalityKeyValue("dapr.configuration.store", storeName) + .highCardinalityKeyValue("dapr.configuration.key", key), + () -> delegate.getConfiguration(storeName, key)); + } + + @Override + public Mono getConfiguration(String storeName, String key, + Map metadata) { + return observe( + observation("dapr.client.get_configuration") + .highCardinalityKeyValue("dapr.configuration.store", storeName) + .highCardinalityKeyValue("dapr.configuration.key", key), + () -> delegate.getConfiguration(storeName, key, metadata)); + } + + @Override + public Mono> getConfiguration(String storeName, String... keys) { + return observe( + observation("dapr.client.get_configuration") + .highCardinalityKeyValue("dapr.configuration.store", storeName), + () -> delegate.getConfiguration(storeName, keys)); + } + + @Override + public Mono> getConfiguration(String storeName, List keys, + Map metadata) { + return observe( + observation("dapr.client.get_configuration") + .highCardinalityKeyValue("dapr.configuration.store", storeName), + () -> delegate.getConfiguration(storeName, keys, metadata)); + } + + @Override + public Mono> getConfiguration(GetConfigurationRequest request) { + return observe( + observation("dapr.client.get_configuration") + .highCardinalityKeyValue("dapr.configuration.store", request.getStoreName()), + () -> delegate.getConfiguration(request)); + } + + @Override + public Flux subscribeConfiguration(String storeName, + String... keys) { + return observeFlux( + observation("dapr.client.subscribe_configuration") + .highCardinalityKeyValue("dapr.configuration.store", storeName), + () -> delegate.subscribeConfiguration(storeName, keys)); + } + + @Override + public Flux subscribeConfiguration(String storeName, + List keys, + Map metadata) { + return observeFlux( + observation("dapr.client.subscribe_configuration") + .highCardinalityKeyValue("dapr.configuration.store", storeName), + () -> delegate.subscribeConfiguration(storeName, keys, metadata)); + } + + @Override + public Flux subscribeConfiguration( + SubscribeConfigurationRequest request) { + return observeFlux( + observation("dapr.client.subscribe_configuration") + .highCardinalityKeyValue("dapr.configuration.store", request.getStoreName()), + () -> delegate.subscribeConfiguration(request)); + } + + @Override + public Mono unsubscribeConfiguration(String id, + String storeName) { + return observe( + observation("dapr.client.unsubscribe_configuration") + .highCardinalityKeyValue("dapr.configuration.store", storeName), + () -> delegate.unsubscribeConfiguration(id, storeName)); + } + + @Override + public Mono unsubscribeConfiguration( + UnsubscribeConfigurationRequest request) { + return observe( + observation("dapr.client.unsubscribe_configuration") + .highCardinalityKeyValue("dapr.configuration.store", request.getStoreName()), + () -> delegate.unsubscribeConfiguration(request)); + } + + // ------------------------------------------------------------------------- + // gRPC Stub — no remote call at creation time, no observation needed + // ------------------------------------------------------------------------- + + @Override + public > T newGrpcStub(String appId, Function stubBuilder) { + return delegate.newGrpcStub(appId, stubBuilder); + } + + // ------------------------------------------------------------------------- + // Metadata + // ------------------------------------------------------------------------- + + @Override + public Mono getMetadata() { + return observe(observation("dapr.client.get_metadata"), () -> delegate.getMetadata()); + } + + // ------------------------------------------------------------------------- + // Jobs + // ------------------------------------------------------------------------- + + @Override + public Mono scheduleJob(ScheduleJobRequest scheduleJobRequest) { + return observe( + observation("dapr.client.schedule_job") + .highCardinalityKeyValue("dapr.job.name", scheduleJobRequest.getName()), + () -> delegate.scheduleJob(scheduleJobRequest)); + } + + @Override + public Mono getJob(GetJobRequest getJobRequest) { + return observe( + observation("dapr.client.get_job") + .highCardinalityKeyValue("dapr.job.name", getJobRequest.getName()), + () -> delegate.getJob(getJobRequest)); + } + + @Override + public Mono deleteJob(DeleteJobRequest deleteJobRequest) { + return observe( + observation("dapr.client.delete_job") + .highCardinalityKeyValue("dapr.job.name", deleteJobRequest.getName()), + () -> delegate.deleteJob(deleteJobRequest)); + } +} diff --git a/dapr-spring/dapr-spring-boot-autoconfigure/src/main/java/io/dapr/spring/boot/autoconfigure/client/ObservationDaprWorkflowClient.java b/dapr-spring/dapr-spring-boot-autoconfigure/src/main/java/io/dapr/spring/boot/autoconfigure/client/ObservationDaprWorkflowClient.java new file mode 100644 index 0000000000..40351b504f --- /dev/null +++ b/dapr-spring/dapr-spring-boot-autoconfigure/src/main/java/io/dapr/spring/boot/autoconfigure/client/ObservationDaprWorkflowClient.java @@ -0,0 +1,293 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.spring.boot.autoconfigure.client; + +import io.dapr.config.Properties; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.NewWorkflowOptions; +import io.dapr.workflows.client.WorkflowState; +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationRegistry; + +import javax.annotation.Nullable; +import java.time.Duration; +import java.util.concurrent.TimeoutException; + +/** + * A {@link DaprWorkflowClient} subclass that creates Micrometer Observation spans (bridged to + * OpenTelemetry) for each non-deprecated method call. + * + *

Because this class extends {@link DaprWorkflowClient}, consumers can keep injecting + * {@code DaprWorkflowClient} without any code changes. Deprecated methods fall through to the + * parent implementation without any observation. + * + *

Constructor note: calling {@code super(properties)} eagerly creates a gRPC + * {@code ManagedChannel}, but the actual TCP connection is established lazily on the first RPC call, + * so construction succeeds even when the Dapr sidecar is not yet available. + */ +public class ObservationDaprWorkflowClient extends DaprWorkflowClient { + + private final ObservationRegistry observationRegistry; + + /** + * Creates a new {@code ObservationDaprWorkflowClient}. + * + * @param properties connection properties for the underlying gRPC channel + * @param observationRegistry the Micrometer {@link ObservationRegistry} used to create spans + */ + public ObservationDaprWorkflowClient(Properties properties, + ObservationRegistry observationRegistry) { + super(properties); + this.observationRegistry = observationRegistry; + } + + // ------------------------------------------------------------------------- + // Internal helpers + // ------------------------------------------------------------------------- + + private Observation observation(String name) { + return Observation.createNotStarted(name, observationRegistry); + } + + // ------------------------------------------------------------------------- + // scheduleNewWorkflow — only String-based "leaf" overloads are overridden. + // Class-based overloads in the parent delegate to this.scheduleNewWorkflow(String, ...) + // via dynamic dispatch, so they naturally pick up these observations. + // ------------------------------------------------------------------------- + + @Override + public String scheduleNewWorkflow(String name) { + Observation obs = observation("dapr.workflow.schedule") + .highCardinalityKeyValue("dapr.workflow.name", name) + .start(); + try { + return super.scheduleNewWorkflow(name); + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + @Override + public String scheduleNewWorkflow(String name, Object input) { + Observation obs = observation("dapr.workflow.schedule") + .highCardinalityKeyValue("dapr.workflow.name", name) + .start(); + try { + return super.scheduleNewWorkflow(name, input); + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + @Override + public String scheduleNewWorkflow(String name, Object input, + String instanceId) { + Observation obs = observation("dapr.workflow.schedule") + .highCardinalityKeyValue("dapr.workflow.name", name) + .highCardinalityKeyValue("dapr.workflow.instance_id", + instanceId != null ? instanceId : "") + .start(); + try { + return super.scheduleNewWorkflow(name, input, instanceId); + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + @Override + public String scheduleNewWorkflow(String name, + NewWorkflowOptions options) { + String instanceId = options != null && options.getInstanceId() != null + ? options.getInstanceId() : ""; + Observation obs = observation("dapr.workflow.schedule") + .highCardinalityKeyValue("dapr.workflow.name", name) + .highCardinalityKeyValue("dapr.workflow.instance_id", instanceId) + .start(); + try { + return super.scheduleNewWorkflow(name, options); + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + // ------------------------------------------------------------------------- + // Lifecycle operations + // ------------------------------------------------------------------------- + + @Override + public void suspendWorkflow(String workflowInstanceId, @Nullable String reason) { + Observation obs = observation("dapr.workflow.suspend") + .highCardinalityKeyValue("dapr.workflow.instance_id", workflowInstanceId) + .start(); + try { + super.suspendWorkflow(workflowInstanceId, reason); + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + @Override + public void resumeWorkflow(String workflowInstanceId, @Nullable String reason) { + Observation obs = observation("dapr.workflow.resume") + .highCardinalityKeyValue("dapr.workflow.instance_id", workflowInstanceId) + .start(); + try { + super.resumeWorkflow(workflowInstanceId, reason); + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + @Override + public void terminateWorkflow(String workflowInstanceId, @Nullable Object output) { + Observation obs = observation("dapr.workflow.terminate") + .highCardinalityKeyValue("dapr.workflow.instance_id", workflowInstanceId) + .start(); + try { + super.terminateWorkflow(workflowInstanceId, output); + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + // ------------------------------------------------------------------------- + // State queries + // ------------------------------------------------------------------------- + + @Override + @Nullable + public WorkflowState getWorkflowState(String instanceId, boolean getInputsAndOutputs) { + Observation obs = observation("dapr.workflow.get_state") + .highCardinalityKeyValue("dapr.workflow.instance_id", instanceId) + .start(); + try { + return super.getWorkflowState(instanceId, getInputsAndOutputs); + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + // ------------------------------------------------------------------------- + // Waiting + // ------------------------------------------------------------------------- + + @Override + @Nullable + public WorkflowState waitForWorkflowStart(String instanceId, Duration timeout, + boolean getInputsAndOutputs) throws TimeoutException { + Observation obs = observation("dapr.workflow.wait_start") + .highCardinalityKeyValue("dapr.workflow.instance_id", instanceId) + .start(); + try { + return super.waitForWorkflowStart(instanceId, timeout, getInputsAndOutputs); + } catch (TimeoutException e) { + obs.error(e); + throw e; + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + @Override + @Nullable + public WorkflowState waitForWorkflowCompletion(String instanceId, Duration timeout, + boolean getInputsAndOutputs) + throws TimeoutException { + Observation obs = observation("dapr.workflow.wait_completion") + .highCardinalityKeyValue("dapr.workflow.instance_id", instanceId) + .start(); + try { + return super.waitForWorkflowCompletion(instanceId, timeout, getInputsAndOutputs); + } catch (TimeoutException e) { + obs.error(e); + throw e; + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + // ------------------------------------------------------------------------- + // Events + // ------------------------------------------------------------------------- + + @Override + public void raiseEvent(String workflowInstanceId, String eventName, Object eventPayload) { + Observation obs = observation("dapr.workflow.raise_event") + .highCardinalityKeyValue("dapr.workflow.instance_id", workflowInstanceId) + .highCardinalityKeyValue("dapr.workflow.event_name", eventName) + .start(); + try { + super.raiseEvent(workflowInstanceId, eventName, eventPayload); + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + // ------------------------------------------------------------------------- + // Cleanup + // ------------------------------------------------------------------------- + + @Override + public boolean purgeWorkflow(String workflowInstanceId) { + Observation obs = observation("dapr.workflow.purge") + .highCardinalityKeyValue("dapr.workflow.instance_id", workflowInstanceId) + .start(); + try { + return super.purgeWorkflow(workflowInstanceId); + } catch (RuntimeException e) { + obs.error(e); + throw e; + } finally { + obs.stop(); + } + } + + // Deprecated methods (getInstanceState, waitForInstanceStart, waitForInstanceCompletion, + // purgeInstance) are intentionally not overridden — they fall through to the parent + // implementation without any observation. +} diff --git a/dapr-spring/dapr-spring-boot-autoconfigure/src/test/java/io/dapr/spring/boot/autoconfigure/client/DaprClientObservationAutoConfigurationTest.java b/dapr-spring/dapr-spring-boot-autoconfigure/src/test/java/io/dapr/spring/boot/autoconfigure/client/DaprClientObservationAutoConfigurationTest.java new file mode 100644 index 0000000000..d73945b7c4 --- /dev/null +++ b/dapr-spring/dapr-spring-boot-autoconfigure/src/test/java/io/dapr/spring/boot/autoconfigure/client/DaprClientObservationAutoConfigurationTest.java @@ -0,0 +1,159 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.spring.boot.autoconfigure.client; + +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.workflows.client.DaprWorkflowClient; +import io.micrometer.observation.ObservationRegistry; +import io.micrometer.observation.tck.TestObservationRegistry; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Integration tests for the observation wiring in {@link DaprClientAutoConfiguration}. + * + *

Verifies two key requirements: + *

    + *
  1. When a non-noop {@link ObservationRegistry} is present, both {@code DaprClient} and + * {@code DaprWorkflowClient} beans are wrapped with observation decorators.
  2. + *
  3. Consumers inject the beans by their base types ({@code DaprClient}, + * {@code DaprWorkflowClient}) — no code changes needed.
  4. + *
+ */ +class DaprClientObservationAutoConfigurationTest { + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(DaprClientAutoConfiguration.class)); + + // ------------------------------------------------------------------------- + // Without ObservationRegistry — plain beans + // ------------------------------------------------------------------------- + + @Test + @DisplayName("DaprClient is a plain client when no ObservationRegistry is present") + void daprClientIsPlainWhenNoRegistry() { + contextRunner + .withBean(DaprClientBuilder.class, () -> mockBuilderReturningMockClient()) + .run(context -> { + assertThat(context).hasSingleBean(DaprClient.class); + assertThat(context.getBean(DaprClient.class)) + .isNotInstanceOf(ObservationDaprClient.class); + }); + } + + // ------------------------------------------------------------------------- + // With ObservationRegistry — wrapped beans + // ------------------------------------------------------------------------- + + @Test + @DisplayName("DaprClient is ObservationDaprClient when a non-noop ObservationRegistry is present") + void daprClientIsObservationWrappedWhenRegistryPresent() { + contextRunner + .withBean(DaprClientBuilder.class, () -> mockBuilderReturningMockClient()) + .withBean(ObservationRegistry.class, TestObservationRegistry::create) + .run(context -> { + assertThat(context).hasSingleBean(DaprClient.class); + assertThat(context.getBean(DaprClient.class)) + .isInstanceOf(ObservationDaprClient.class); + }); + } + + @Test + @DisplayName("DaprWorkflowClient is ObservationDaprWorkflowClient when a non-noop registry is present") + void daprWorkflowClientIsObservationWrappedWhenRegistryPresent() { + contextRunner + .withBean(DaprClientBuilder.class, () -> mockBuilderReturningMockClient()) + .withBean(ObservationRegistry.class, TestObservationRegistry::create) + .run(context -> { + assertThat(context).hasSingleBean(DaprWorkflowClient.class); + assertThat(context.getBean(DaprWorkflowClient.class)) + .isInstanceOf(ObservationDaprWorkflowClient.class); + }); + } + + // ------------------------------------------------------------------------- + // Transparency — beans remain injectable by base type + // ------------------------------------------------------------------------- + + @Test + @DisplayName("Consumers can inject DaprClient by its base type regardless of wrapping") + void daprClientInjectableByBaseType() { + contextRunner + .withBean(DaprClientBuilder.class, () -> mockBuilderReturningMockClient()) + .withBean(ObservationRegistry.class, TestObservationRegistry::create) + .run(context -> { + // Injecting as DaprClient works — no ClassCastException, no code changes needed + DaprClient client = context.getBean(DaprClient.class); + assertThat(client).isNotNull(); + assertThat(client).isInstanceOf(DaprClient.class); + }); + } + + @Test + @DisplayName("Consumers can inject DaprWorkflowClient by its base type regardless of wrapping") + void daprWorkflowClientInjectableByBaseType() { + contextRunner + .withBean(DaprClientBuilder.class, () -> mockBuilderReturningMockClient()) + .withBean(ObservationRegistry.class, TestObservationRegistry::create) + .run(context -> { + // Injecting as DaprWorkflowClient works — ObservationDaprWorkflowClient IS-A DaprWorkflowClient + DaprWorkflowClient workflowClient = context.getBean(DaprWorkflowClient.class); + assertThat(workflowClient).isNotNull(); + assertThat(workflowClient).isInstanceOf(DaprWorkflowClient.class); + }); + } + + @Test + @DisplayName("Noop ObservationRegistry results in plain (unwrapped) DaprClient") + void noopRegistryResultsInPlainClient() { + contextRunner + .withBean(DaprClientBuilder.class, () -> mockBuilderReturningMockClient()) + .withBean(ObservationRegistry.class, ObservationRegistry::create) // NOOP registry + .run(context -> { + assertThat(context).hasSingleBean(DaprClient.class); + assertThat(context.getBean(DaprClient.class)) + .isNotInstanceOf(ObservationDaprClient.class); + }); + } + + @Test + @DisplayName("User-provided DaprClient bean is not replaced by autoconfiguration") + void userProvidedDaprClientIsNotReplaced() { + DaprClient userClient = mock(DaprClient.class); + contextRunner + .withBean(ObservationRegistry.class, TestObservationRegistry::create) + .withBean(DaprClient.class, () -> userClient) + .run(context -> { + assertThat(context).hasSingleBean(DaprClient.class); + assertThat(context.getBean(DaprClient.class)).isSameAs(userClient); + }); + } + + // ------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + private static DaprClientBuilder mockBuilderReturningMockClient() { + DaprClientBuilder builder = mock(DaprClientBuilder.class); + when(builder.build()).thenReturn(mock(DaprClient.class)); + return builder; + } +} diff --git a/dapr-spring/dapr-spring-boot-autoconfigure/src/test/java/io/dapr/spring/boot/autoconfigure/client/ObservationDaprClientTest.java b/dapr-spring/dapr-spring-boot-autoconfigure/src/test/java/io/dapr/spring/boot/autoconfigure/client/ObservationDaprClientTest.java new file mode 100644 index 0000000000..76d9bafd24 --- /dev/null +++ b/dapr-spring/dapr-spring-boot-autoconfigure/src/test/java/io/dapr/spring/boot/autoconfigure/client/ObservationDaprClientTest.java @@ -0,0 +1,367 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.spring.boot.autoconfigure.client; + +import io.dapr.client.DaprClient; +import io.dapr.client.domain.DeleteStateRequest; +import io.dapr.client.domain.GetSecretRequest; +import io.dapr.client.domain.GetStateRequest; +import io.dapr.client.domain.InvokeBindingRequest; +import io.dapr.client.domain.PublishEventRequest; +import io.dapr.client.domain.ScheduleJobRequest; +import io.dapr.client.domain.State; +import io.micrometer.observation.tck.TestObservationRegistry; +import io.micrometer.observation.tck.TestObservationRegistryAssert; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link ObservationDaprClient}. + * + *

Verifies two key requirements: + *

    + *
  1. Each non-deprecated method creates a correctly named Micrometer Observation span.
  2. + *
  3. The wrapper is fully transparent: it implements {@link DaprClient}, so consumers + * keep injecting {@code DaprClient} without any code changes.
  4. + *
+ */ +@ExtendWith(MockitoExtension.class) +class ObservationDaprClientTest { + + @Mock + private DaprClient delegate; + + private TestObservationRegistry registry; + private ObservationDaprClient client; + + @BeforeEach + void setUp() { + registry = TestObservationRegistry.create(); + client = new ObservationDaprClient(delegate, registry); + } + + // ------------------------------------------------------------------------- + // Transparency — the wrapper IS-A DaprClient + // ------------------------------------------------------------------------- + + @Test + @DisplayName("ObservationDaprClient is assignable to DaprClient (transparent to consumers)") + void isAssignableToDaprClient() { + assertThat(client).isInstanceOf(DaprClient.class); + } + + // ------------------------------------------------------------------------- + // Pub/Sub + // ------------------------------------------------------------------------- + + @Test + @DisplayName("publishEvent(pubsubName, topicName, data) creates span dapr.client.publish_event") + void publishEventCreatesSpan() { + when(delegate.publishEvent("my-pubsub", "my-topic", "payload")).thenReturn(Mono.empty()); + + client.publishEvent("my-pubsub", "my-topic", "payload").block(); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.client.publish_event") + .that() + .hasHighCardinalityKeyValue("dapr.pubsub.name", "my-pubsub") + .hasHighCardinalityKeyValue("dapr.topic.name", "my-topic"); + } + + @Test + @DisplayName("publishEvent(PublishEventRequest) creates span dapr.client.publish_event") + void publishEventRequestCreatesSpan() { + PublishEventRequest request = new PublishEventRequest("my-pubsub", "my-topic", "payload"); + when(delegate.publishEvent(request)).thenReturn(Mono.empty()); + + client.publishEvent(request).block(); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.client.publish_event"); + } + + @Test + @DisplayName("publishEvent span records error when delegate throws") + void publishEventRecordsError() { + RuntimeException boom = new RuntimeException("publish failed"); + when(delegate.publishEvent("pubsub", "topic", "data")).thenReturn(Mono.error(boom)); + + assertThatThrownBy(() -> client.publishEvent("pubsub", "topic", "data").block()) + .isInstanceOf(RuntimeException.class); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.client.publish_event") + .that() + .hasError(); + } + + // ------------------------------------------------------------------------- + // Bindings + // ------------------------------------------------------------------------- + + @Test + @DisplayName("invokeBinding(name, operation, data) creates span dapr.client.invoke_binding") + void invokeBindingCreatesSpan() { + when(delegate.invokeBinding("my-binding", "create", "data")).thenReturn(Mono.empty()); + + client.invokeBinding("my-binding", "create", "data").block(); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.client.invoke_binding") + .that() + .hasHighCardinalityKeyValue("dapr.binding.name", "my-binding") + .hasHighCardinalityKeyValue("dapr.binding.operation", "create"); + } + + @Test + @DisplayName("invokeBinding(InvokeBindingRequest) creates span dapr.client.invoke_binding") + void invokeBindingRequestCreatesSpan() { + InvokeBindingRequest request = new InvokeBindingRequest("my-binding", "create"); + when(delegate.invokeBinding(request)).thenReturn(Mono.empty()); + + client.invokeBinding(request).block(); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.client.invoke_binding"); + } + + // ------------------------------------------------------------------------- + // State + // ------------------------------------------------------------------------- + + @Test + @DisplayName("getState(storeName, key, Class) creates span dapr.client.get_state") + void getStateCreatesSpan() { + when(delegate.getState("my-store", "my-key", String.class)).thenReturn(Mono.empty()); + + client.getState("my-store", "my-key", String.class).block(); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.client.get_state") + .that() + .hasHighCardinalityKeyValue("dapr.store.name", "my-store") + .hasHighCardinalityKeyValue("dapr.state.key", "my-key"); + } + + @Test + @DisplayName("saveState creates span dapr.client.save_state") + void saveStateCreatesSpan() { + when(delegate.saveState("my-store", "my-key", "value")).thenReturn(Mono.empty()); + + client.saveState("my-store", "my-key", "value").block(); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.client.save_state") + .that() + .hasHighCardinalityKeyValue("dapr.store.name", "my-store") + .hasHighCardinalityKeyValue("dapr.state.key", "my-key"); + } + + @Test + @DisplayName("deleteState creates span dapr.client.delete_state") + void deleteStateCreatesSpan() { + when(delegate.deleteState("my-store", "my-key")).thenReturn(Mono.empty()); + + client.deleteState("my-store", "my-key").block(); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.client.delete_state") + .that() + .hasHighCardinalityKeyValue("dapr.store.name", "my-store") + .hasHighCardinalityKeyValue("dapr.state.key", "my-key"); + } + + @Test + @DisplayName("getBulkState creates span dapr.client.get_bulk_state") + void getBulkStateCreatesSpan() { + when(delegate.getBulkState("my-store", List.of("k1", "k2"), String.class)) + .thenReturn(Mono.empty()); + + client.getBulkState("my-store", List.of("k1", "k2"), String.class).block(); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.client.get_bulk_state"); + } + + @Test + @DisplayName("executeStateTransaction creates span dapr.client.execute_state_transaction") + void executeStateTransactionCreatesSpan() { + when(delegate.executeStateTransaction("my-store", List.of())).thenReturn(Mono.empty()); + + client.executeStateTransaction("my-store", List.of()).block(); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.client.execute_state_transaction"); + } + + // ------------------------------------------------------------------------- + // Secrets + // ------------------------------------------------------------------------- + + @Test + @DisplayName("getSecret(storeName, secretName) creates span dapr.client.get_secret") + void getSecretCreatesSpan() { + when(delegate.getSecret("my-vault", "db-password")).thenReturn(Mono.empty()); + + client.getSecret("my-vault", "db-password").block(); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.client.get_secret") + .that() + .hasHighCardinalityKeyValue("dapr.secret.store", "my-vault") + .hasHighCardinalityKeyValue("dapr.secret.name", "db-password"); + } + + @Test + @DisplayName("getBulkSecret creates span dapr.client.get_bulk_secret") + void getBulkSecretCreatesSpan() { + when(delegate.getBulkSecret("my-vault")).thenReturn(Mono.empty()); + + client.getBulkSecret("my-vault").block(); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.client.get_bulk_secret"); + } + + // ------------------------------------------------------------------------- + // Configuration + // ------------------------------------------------------------------------- + + @Test + @DisplayName("getConfiguration creates span dapr.client.get_configuration") + void getConfigurationCreatesSpan() { + when(delegate.getConfiguration("my-config-store", "feature-flag")).thenReturn(Mono.empty()); + + client.getConfiguration("my-config-store", "feature-flag").block(); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.client.get_configuration") + .that() + .hasHighCardinalityKeyValue("dapr.configuration.store", "my-config-store"); + } + + @Test + @DisplayName("subscribeConfiguration creates span dapr.client.subscribe_configuration") + void subscribeConfigurationCreatesSpan() { + when(delegate.subscribeConfiguration("my-store", "k1")).thenReturn(Flux.empty()); + + client.subscribeConfiguration("my-store", "k1").blockLast(); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.client.subscribe_configuration"); + } + + @Test + @DisplayName("unsubscribeConfiguration creates span dapr.client.unsubscribe_configuration") + void unsubscribeConfigurationCreatesSpan() { + when(delegate.unsubscribeConfiguration("sub-id", "my-store")).thenReturn(Mono.empty()); + + client.unsubscribeConfiguration("sub-id", "my-store").block(); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.client.unsubscribe_configuration"); + } + + // ------------------------------------------------------------------------- + // Metadata & Lifecycle + // ------------------------------------------------------------------------- + + @Test + @DisplayName("getMetadata creates span dapr.client.get_metadata") + void getMetadataCreatesSpan() { + when(delegate.getMetadata()).thenReturn(Mono.empty()); + + client.getMetadata().block(); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.client.get_metadata"); + } + + @Test + @DisplayName("waitForSidecar creates span dapr.client.wait_for_sidecar") + void waitForSidecarCreatesSpan() { + when(delegate.waitForSidecar(5000)).thenReturn(Mono.empty()); + + client.waitForSidecar(5000).block(); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.client.wait_for_sidecar"); + } + + @Test + @DisplayName("shutdown creates span dapr.client.shutdown") + void shutdownCreatesSpan() { + when(delegate.shutdown()).thenReturn(Mono.empty()); + + client.shutdown().block(); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.client.shutdown"); + } + + // ------------------------------------------------------------------------- + // Jobs + // ------------------------------------------------------------------------- + + @Test + @DisplayName("scheduleJob creates span dapr.client.schedule_job") + void scheduleJobCreatesSpan() { + ScheduleJobRequest request = new ScheduleJobRequest("nightly-cleanup", + io.dapr.client.domain.JobSchedule.daily()); + when(delegate.scheduleJob(request)).thenReturn(Mono.empty()); + + client.scheduleJob(request).block(); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.client.schedule_job") + .that() + .hasHighCardinalityKeyValue("dapr.job.name", "nightly-cleanup"); + } + + // ------------------------------------------------------------------------- + // Deprecated methods — must NOT create spans + // ------------------------------------------------------------------------- + + @Test + @DisplayName("Deprecated invokeMethod delegates without creating a span") + @SuppressWarnings("deprecation") + void deprecatedInvokeMethodDoesNotCreateSpan() { + when(delegate.invokeMethod(anyString(), anyString(), nullable(Object.class), + any(io.dapr.client.domain.HttpExtension.class), any(Class.class))) + .thenReturn(Mono.empty()); + + client.invokeMethod("app", "method", (Object) null, + io.dapr.client.domain.HttpExtension.NONE, String.class).block(); + + // Registry must be empty — no spans for deprecated methods + TestObservationRegistryAssert.assertThat(registry).doesNotHaveAnyObservation(); + } +} diff --git a/dapr-spring/dapr-spring-boot-autoconfigure/src/test/java/io/dapr/spring/boot/autoconfigure/client/ObservationDaprWorkflowClientTest.java b/dapr-spring/dapr-spring-boot-autoconfigure/src/test/java/io/dapr/spring/boot/autoconfigure/client/ObservationDaprWorkflowClientTest.java new file mode 100644 index 0000000000..f60dd2cf5a --- /dev/null +++ b/dapr-spring/dapr-spring-boot-autoconfigure/src/test/java/io/dapr/spring/boot/autoconfigure/client/ObservationDaprWorkflowClientTest.java @@ -0,0 +1,232 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.spring.boot.autoconfigure.client; + +import io.dapr.config.Properties; +import io.dapr.workflows.client.DaprWorkflowClient; +import io.micrometer.observation.tck.TestObservationRegistry; +import io.micrometer.observation.tck.TestObservationRegistryAssert; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Unit tests for {@link ObservationDaprWorkflowClient}. + * + *

The gRPC {@code ManagedChannel} created in the parent constructor connects lazily, so + * instantiation succeeds without a running Dapr sidecar. Actual RPC calls will fail, but the + * observation lifecycle (start → error → stop) is still validated. + * + *

Verifies two key requirements: + *

    + *
  1. Each non-deprecated method creates a correctly named Micrometer Observation span + * (even when the underlying call fails due to no sidecar being available).
  2. + *
  3. The wrapper extends {@link DaprWorkflowClient}, so consumers keep injecting + * {@code DaprWorkflowClient} without any code changes.
  4. + *
+ */ +class ObservationDaprWorkflowClientTest { + + private TestObservationRegistry registry; + private ObservationDaprWorkflowClient client; + + @BeforeEach + void setUp() { + registry = TestObservationRegistry.create(); + // Properties with no gRPC endpoint — channel created lazily, no sidecar needed + client = new ObservationDaprWorkflowClient(new Properties(), registry); + } + + // ------------------------------------------------------------------------- + // Transparency — the wrapper IS-A DaprWorkflowClient + // ------------------------------------------------------------------------- + + @Test + @DisplayName("ObservationDaprWorkflowClient is assignable to DaprWorkflowClient (transparent)") + void isAssignableToDaprWorkflowClient() { + assertThat(client).isInstanceOf(DaprWorkflowClient.class); + } + + // ------------------------------------------------------------------------- + // scheduleNewWorkflow — span is created even when the call fails (no sidecar) + // ------------------------------------------------------------------------- + + @Test + @DisplayName("scheduleNewWorkflow(String) creates span dapr.workflow.schedule") + void scheduleNewWorkflowByNameCreatesSpan() { + // The gRPC call will fail — but the span must be recorded with an error + assertThatThrownBy(() -> client.scheduleNewWorkflow("MyWorkflow")) + .isInstanceOf(RuntimeException.class); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.workflow.schedule") + .that() + .hasHighCardinalityKeyValue("dapr.workflow.name", "MyWorkflow") + .hasError(); + } + + @Test + @DisplayName("scheduleNewWorkflow(Class) delegates to String overload — span is still created") + void scheduleNewWorkflowByClassDelegatesToStringOverload() { + // The Class-based overload in the parent calls this.scheduleNewWorkflow(canonicalName), + // which resolves to our overridden String-based method — the span is created naturally. + assertThatThrownBy(() -> client.scheduleNewWorkflow(DummyWorkflow.class)) + .isInstanceOf(RuntimeException.class); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.workflow.schedule") + .that() + .hasHighCardinalityKeyValue("dapr.workflow.name", DummyWorkflow.class.getCanonicalName()) + .hasError(); + } + + @Test + @DisplayName("scheduleNewWorkflow(String, Object, String) includes instance_id in span") + void scheduleNewWorkflowWithInstanceIdCreatesSpan() { + assertThatThrownBy(() -> client.scheduleNewWorkflow("MyWorkflow", null, "my-instance-123")) + .isInstanceOf(RuntimeException.class); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.workflow.schedule") + .that() + .hasHighCardinalityKeyValue("dapr.workflow.instance_id", "my-instance-123") + .hasError(); + } + + // ------------------------------------------------------------------------- + // Lifecycle operations + // ------------------------------------------------------------------------- + + @Test + @DisplayName("suspendWorkflow creates span dapr.workflow.suspend") + void suspendWorkflowCreatesSpan() { + assertThatThrownBy(() -> client.suspendWorkflow("instance-1", "pausing")) + .isInstanceOf(RuntimeException.class); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.workflow.suspend") + .that() + .hasHighCardinalityKeyValue("dapr.workflow.instance_id", "instance-1") + .hasError(); + } + + @Test + @DisplayName("resumeWorkflow creates span dapr.workflow.resume") + void resumeWorkflowCreatesSpan() { + assertThatThrownBy(() -> client.resumeWorkflow("instance-1", "resuming")) + .isInstanceOf(RuntimeException.class); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.workflow.resume") + .that() + .hasHighCardinalityKeyValue("dapr.workflow.instance_id", "instance-1") + .hasError(); + } + + @Test + @DisplayName("terminateWorkflow creates span dapr.workflow.terminate") + void terminateWorkflowCreatesSpan() { + assertThatThrownBy(() -> client.terminateWorkflow("instance-1", null)) + .isInstanceOf(RuntimeException.class); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.workflow.terminate") + .that() + .hasHighCardinalityKeyValue("dapr.workflow.instance_id", "instance-1") + .hasError(); + } + + // ------------------------------------------------------------------------- + // State queries + // ------------------------------------------------------------------------- + + @Test + @DisplayName("getWorkflowState creates span dapr.workflow.get_state") + void getWorkflowStateCreatesSpan() { + assertThatThrownBy(() -> client.getWorkflowState("instance-1", false)) + .isInstanceOf(RuntimeException.class); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.workflow.get_state") + .that() + .hasHighCardinalityKeyValue("dapr.workflow.instance_id", "instance-1") + .hasError(); + } + + // ------------------------------------------------------------------------- + // Events + // ------------------------------------------------------------------------- + + @Test + @DisplayName("raiseEvent creates span dapr.workflow.raise_event") + void raiseEventCreatesSpan() { + assertThatThrownBy(() -> client.raiseEvent("instance-1", "OrderPlaced", "payload")) + .isInstanceOf(RuntimeException.class); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.workflow.raise_event") + .that() + .hasHighCardinalityKeyValue("dapr.workflow.instance_id", "instance-1") + .hasHighCardinalityKeyValue("dapr.workflow.event_name", "OrderPlaced") + .hasError(); + } + + // ------------------------------------------------------------------------- + // Cleanup + // ------------------------------------------------------------------------- + + @Test + @DisplayName("purgeWorkflow creates span dapr.workflow.purge") + void purgeWorkflowCreatesSpan() { + assertThatThrownBy(() -> client.purgeWorkflow("instance-1")) + .isInstanceOf(RuntimeException.class); + + TestObservationRegistryAssert.assertThat(registry) + .hasObservationWithNameEqualTo("dapr.workflow.purge") + .that() + .hasHighCardinalityKeyValue("dapr.workflow.instance_id", "instance-1") + .hasError(); + } + + // ------------------------------------------------------------------------- + // Deprecated methods — must NOT create spans + // ------------------------------------------------------------------------- + + @Test + @DisplayName("Deprecated getInstanceState falls through to parent without creating a span") + @SuppressWarnings("deprecation") + void deprecatedGetInstanceStateDoesNotCreateSpan() { + // This will fail (no sidecar) but must not leave any observations in the registry + assertThatThrownBy(() -> client.getInstanceState("instance-1", false)) + .isInstanceOf(RuntimeException.class); + + // No spans should have been created for deprecated methods + TestObservationRegistryAssert.assertThat(registry).doesNotHaveAnyObservation(); + } + + // ------------------------------------------------------------------------- + // Dummy workflow implementation for type-based tests + // ------------------------------------------------------------------------- + + static class DummyWorkflow implements io.dapr.workflows.Workflow { + @Override + public io.dapr.workflows.WorkflowStub create() { + return ctx -> { + }; + } + } +}