diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/README.md b/core/src/main/java/io/temporal/samples/nexus_messaging/README.md new file mode 100644 index 00000000..066bc213 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/README.md @@ -0,0 +1,138 @@ +This sample shows how to expose a long-running workflow's queries, updates, and signals as Nexus +operations. The caller interacts only with the Nexus service; the workflow is a private +implementation detail. + +There are **two caller patterns** that share the same handler workflow (`GreetingWorkflow`): + +| | `caller/` (entity pattern) | `caller_remote/` (remote-start pattern) | +|---|---|---| +| **Who creates the workflow?** | The handler worker starts it on boot | The caller starts it via a `runFromRemote` Nexus operation | +| **Who knows the workflow ID?** | Only the handler | The caller chooses and passes it in every operation | +| **Nexus service** | `NexusGreetingService` — inputs carry only business data | `NexusRemoteGreetingService` — every input includes a `workflowId` | +| **When to use** | Single shared entity; callers don't need lifecycle control | Caller needs to create and target specific workflow instances | + +### Directory structure + +- `service/` — shared Nexus service definitions (`NexusGreetingService`, `NexusRemoteGreetingService`) and `Language` enum +- `handler/` — `GreetingWorkflow` and its implementation, `GreetingActivity`, both Nexus service impls (`NexusGreetingServiceImpl`, `NexusRemoteGreetingServiceImpl`), and the handler worker +- `caller/` — entity-pattern caller (workflow, worker, starter) +- `caller_remote/` — remote-start caller (workflow, worker, starter) + +### Running + +Start a Temporal server: + +```bash +temporal server start-dev +``` +Create the namespaces and Nexus endpoint: + +```bash +temporal operator namespace create --namespace nexus-messaging-handler-namespace +temporal operator namespace create --namespace nexus-messaging-caller-namespace + +temporal operator nexus endpoint create \ + --name nexus-messaging-nexus-endpoint \ + --target-namespace nexus-messaging-handler-namespace \ + --target-task-queue nexus-messaging-handler-task-queue +``` + +In one terminal, start the handler worker (shared by both patterns): + +```bash +./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexus_messaging.handler.HandlerWorker +``` + +#### Entity pattern + +In the second terminal, run the caller worker: + +```bash +./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexus_messaging.caller.CallerWorker +``` + +In the third terminal, start the caller workflow: + +```bash +./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexus_messaging.caller.CallerStarter +``` + +Expected output: + +``` +supported languages: [CHINESE, ENGLISH] +language changed: ENGLISH -> ARABIC +workflow approved +``` + +#### Remote-start pattern + +In a second terminal, run the remote caller worker: + +```bash +./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexus_messaging.caller_remote.CallerRemoteWorker +``` + +In a third terminal, start the remote caller workflow: + +```bash +./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexus_messaging.caller_remote.CallerRemoteStarter +``` + +Expected output: + +``` +started remote greeting workflow: nexus-messaging-remote-greeting-workflow +supported languages: [CHINESE, ENGLISH] +language changed: ENGLISH -> ARABIC +workflow approved +workflow result: مرحبا بالعالم +``` + +### How it works + +#### The handler (shared by both patterns) + +`GreetingWorkflow` is a long-running "entity" workflow that holds the current language and a map of +greetings. It exposes its state through standard Temporal primitives: + +- `getLanguages` / `getLanguage` — `@QueryMethod`s for reading state +- `setLanguage` — an `@UpdateMethod` for switching between already-loaded languages +- `setLanguageUsingActivity` — an `@UpdateMethod` that calls an activity to fetch a greeting for + a language not yet in the map (uses `WorkflowLock` to serialize concurrent activity calls) +- `approve` — a `@SignalMethod` that lets the workflow complete + +The workflow waits until approved and all in-flight update handlers have finished, then returns the +greeting in the current language. + +Both Nexus service implementations translate incoming Nexus operations into calls against +`GreetingWorkflow` stubs — queries, updates, and signals. The caller never interacts with the +workflow directly. + +#### Entity pattern (`caller/` + `NexusGreetingService`) + +The handler worker starts a single `GreetingWorkflow` on boot with a fixed workflow ID. +`NexusGreetingServiceImpl` holds that workflow ID in its constructor and routes every operation to +it. The caller's inputs contain only business data (language, name), not workflow IDs. + +`CallerWorkflowImpl` creates a `NexusGreetingService` stub and: +1. Queries for supported languages (`getLanguages` — backed by a `@QueryMethod`) +2. Changes the language to Arabic (`setLanguage` — backed by an `@UpdateMethod` that calls an activity) +3. Confirms the change via a second query (`getLanguage`) +4. Approves the workflow (`approve` — backed by a `@SignalMethod`) + +#### Remote-start pattern (`caller_remote/` + `NexusRemoteGreetingService`) + +No workflow is pre-started. Instead, `NexusRemoteGreetingService` adds a `runFromRemote` operation +that starts a new `GreetingWorkflow` with a caller-chosen workflow ID using +`WorkflowRunOperation`. Every other operation also includes the `workflowId` in its input so that +`NexusRemoteGreetingServiceImpl` can look up the right workflow stub. + +`CallerRemoteWorkflowImpl` creates a `NexusRemoteGreetingService` stub and: +1. Starts a remote `GreetingWorkflow` via `runFromRemote` and waits for it to be running +2. Queries, updates, and approves that workflow — same operations as the entity pattern, but each + input carries the workflow ID +3. Waits for the remote workflow to complete and returns its result (the greeting string) + +This pattern is useful when the caller needs to control the lifecycle of individual workflow +instances rather than sharing a single entity. diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/caller/CallerStarter.java b/core/src/main/java/io/temporal/samples/nexus_messaging/caller/CallerStarter.java new file mode 100644 index 00000000..95a3d025 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/caller/CallerStarter.java @@ -0,0 +1,30 @@ +package io.temporal.samples.nexus_messaging.caller; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowOptions; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.util.List; +import java.util.UUID; + +public class CallerStarter { + + public static void main(String[] args) { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient client = + WorkflowClient.newInstance( + service, + WorkflowClientOptions.newBuilder().setNamespace(CallerWorker.NAMESPACE).build()); + + CallerWorkflow workflow = + client.newWorkflowStub( + CallerWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId("nexus-messaging-caller-" + UUID.randomUUID()) + .setTaskQueue(CallerWorker.TASK_QUEUE) + .build()); + + List log = workflow.run(); + log.forEach(System.out::println); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/caller/CallerWorker.java b/core/src/main/java/io/temporal/samples/nexus_messaging/caller/CallerWorker.java new file mode 100644 index 00000000..8194503b --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/caller/CallerWorker.java @@ -0,0 +1,43 @@ +package io.temporal.samples.nexus_messaging.caller; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkflowImplementationOptions; +import io.temporal.workflow.NexusServiceOptions; +import java.util.Collections; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CallerWorker { + private static final Logger logger = LoggerFactory.getLogger(CallerWorker.class); + + public static final String NAMESPACE = "nexus-messaging-caller-namespace"; + public static final String TASK_QUEUE = "nexus-messaging-caller-task-queue"; + static final String NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint"; + + public static void main(String[] args) throws InterruptedException { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient client = + WorkflowClient.newInstance( + service, WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build()); + + WorkerFactory factory = WorkerFactory.newInstance(client); + Worker worker = factory.newWorker(TASK_QUEUE); + worker.registerWorkflowImplementationTypes( + WorkflowImplementationOptions.newBuilder() + .setNexusServiceOptions( + // The key must match the @Service-annotated interface name. + Collections.singletonMap( + "NexusGreetingService", + NexusServiceOptions.newBuilder().setEndpoint(NEXUS_ENDPOINT).build())) + .build(), + CallerWorkflowImpl.class); + + factory.start(); + logger.info("Caller worker started, ctrl+c to exit"); + Thread.currentThread().join(); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/caller/CallerWorkflow.java b/core/src/main/java/io/temporal/samples/nexus_messaging/caller/CallerWorkflow.java new file mode 100644 index 00000000..6c830d7d --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/caller/CallerWorkflow.java @@ -0,0 +1,11 @@ +package io.temporal.samples.nexus_messaging.caller; + +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.util.List; + +@WorkflowInterface +public interface CallerWorkflow { + @WorkflowMethod + List run(); +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/caller/CallerWorkflowImpl.java b/core/src/main/java/io/temporal/samples/nexus_messaging/caller/CallerWorkflowImpl.java new file mode 100644 index 00000000..ed07f99e --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/caller/CallerWorkflowImpl.java @@ -0,0 +1,62 @@ +package io.temporal.samples.nexus_messaging.caller; + +import io.temporal.failure.ApplicationFailure; +import io.temporal.samples.nexus_messaging.caller_remote.CallerRemoteWorkflowImpl; +import io.temporal.samples.nexus_messaging.service.Language; +import io.temporal.samples.nexus_messaging.service.NexusGreetingService; +import io.temporal.workflow.NexusOperationOptions; +import io.temporal.workflow.NexusServiceOptions; +import io.temporal.workflow.Workflow; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CallerWorkflowImpl implements CallerWorkflow { + + private static final Logger logger = LoggerFactory.getLogger(CallerRemoteWorkflowImpl.class); + + // The endpoint is configured at the worker level in CallerWorker; only operation options are + // set here. + NexusGreetingService greetingService = + Workflow.newNexusServiceStub( + NexusGreetingService.class, + NexusServiceOptions.newBuilder() + .setOperationOptions( + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build()) + .build()); + + @Override + public List run() { + List log = new ArrayList<>(); + + // 👉 Call a Nexus operation backed by a query against the entity workflow. + NexusGreetingService.GetLanguagesOutput languagesOutput = + greetingService.getLanguages(new NexusGreetingService.GetLanguagesInput(false)); + log.add("supported languages: " + languagesOutput.getLanguages()); + + // 👉 Call a Nexus operation backed by an update against the entity workflow. + Language previousLanguage = + greetingService.setLanguage(new NexusGreetingService.SetLanguageInput(Language.ARABIC)); + logger.info("Language changed from {} to {}", previousLanguage, Language.ARABIC); + + // 👉 Call a Nexus operation backed by a query to confirm the language change. + Language currentLanguage = + greetingService.getLanguage(new NexusGreetingService.GetLanguageInput()); + if (currentLanguage != Language.ARABIC) { + throw ApplicationFailure.newFailure( + "expected language ARABIC, got " + currentLanguage, "AssertionError"); + } + + log.add("language changed: " + previousLanguage.name() + " -> " + Language.ARABIC.name()); + + // 👉 Call a Nexus operation backed by a signal against the entity workflow. + greetingService.approve(new NexusGreetingService.ApproveInput("caller")); + log.add("workflow approved"); + + return log; + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/caller_remote/CallerRemoteStarter.java b/core/src/main/java/io/temporal/samples/nexus_messaging/caller_remote/CallerRemoteStarter.java new file mode 100644 index 00000000..ad20ab2a --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/caller_remote/CallerRemoteStarter.java @@ -0,0 +1,30 @@ +package io.temporal.samples.nexus_messaging.caller_remote; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowOptions; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.util.List; +import java.util.UUID; + +public class CallerRemoteStarter { + + public static void main(String[] args) { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient client = + WorkflowClient.newInstance( + service, + WorkflowClientOptions.newBuilder().setNamespace(CallerRemoteWorker.NAMESPACE).build()); + + CallerRemoteWorkflow workflow = + client.newWorkflowStub( + CallerRemoteWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId("nexus-messaging-remote-caller-" + UUID.randomUUID()) + .setTaskQueue(CallerRemoteWorker.TASK_QUEUE) + .build()); + + List log = workflow.run(); + log.forEach(System.out::println); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/caller_remote/CallerRemoteWorker.java b/core/src/main/java/io/temporal/samples/nexus_messaging/caller_remote/CallerRemoteWorker.java new file mode 100644 index 00000000..4aa850cc --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/caller_remote/CallerRemoteWorker.java @@ -0,0 +1,43 @@ +package io.temporal.samples.nexus_messaging.caller_remote; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkflowImplementationOptions; +import io.temporal.workflow.NexusServiceOptions; +import java.util.Collections; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CallerRemoteWorker { + private static final Logger logger = LoggerFactory.getLogger(CallerRemoteWorker.class); + + public static final String NAMESPACE = "nexus-messaging-caller-namespace"; + public static final String TASK_QUEUE = "nexus-messaging-caller-remote-task-queue"; + static final String NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint"; + + public static void main(String[] args) throws InterruptedException { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient client = + WorkflowClient.newInstance( + service, WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build()); + + WorkerFactory factory = WorkerFactory.newInstance(client); + Worker worker = factory.newWorker(TASK_QUEUE); + worker.registerWorkflowImplementationTypes( + WorkflowImplementationOptions.newBuilder() + .setNexusServiceOptions( + // The key must match the @Service-annotated interface name. + Collections.singletonMap( + "NexusRemoteGreetingService", + NexusServiceOptions.newBuilder().setEndpoint(NEXUS_ENDPOINT).build())) + .build(), + CallerRemoteWorkflowImpl.class); + + factory.start(); + logger.info("Caller remote worker started, ctrl+c to exit"); + Thread.currentThread().join(); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/caller_remote/CallerRemoteWorkflow.java b/core/src/main/java/io/temporal/samples/nexus_messaging/caller_remote/CallerRemoteWorkflow.java new file mode 100644 index 00000000..bd72d4ac --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/caller_remote/CallerRemoteWorkflow.java @@ -0,0 +1,11 @@ +package io.temporal.samples.nexus_messaging.caller_remote; + +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.util.List; + +@WorkflowInterface +public interface CallerRemoteWorkflow { + @WorkflowMethod + List run(); +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/caller_remote/CallerRemoteWorkflowImpl.java b/core/src/main/java/io/temporal/samples/nexus_messaging/caller_remote/CallerRemoteWorkflowImpl.java new file mode 100644 index 00000000..16ee4d9c --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/caller_remote/CallerRemoteWorkflowImpl.java @@ -0,0 +1,78 @@ +package io.temporal.samples.nexus_messaging.caller_remote; + +import io.temporal.samples.nexus_messaging.service.Language; +import io.temporal.samples.nexus_messaging.service.NexusGreetingService; +import io.temporal.samples.nexus_messaging.service.NexusRemoteGreetingService; +import io.temporal.workflow.NexusOperationHandle; +import io.temporal.workflow.NexusOperationOptions; +import io.temporal.workflow.NexusServiceOptions; +import io.temporal.workflow.Workflow; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CallerRemoteWorkflowImpl implements CallerRemoteWorkflow { + + private static final Logger logger = LoggerFactory.getLogger(CallerRemoteWorkflowImpl.class); + + private static final String REMOTE_WORKFLOW_ID = "nexus-messaging-remote-greeting-workflow"; + + NexusRemoteGreetingService greetingRemoteService = + Workflow.newNexusServiceStub( + NexusRemoteGreetingService.class, + NexusServiceOptions.newBuilder() + .setOperationOptions( + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build()) + .build()); + + @Override + public List run() { + List log = new ArrayList<>(); + + // 👉 Async Nexus operation — starts a workflow on the handler and returns a handle. + // Unlike the sync operations below (getLanguages, setLanguage, etc.), this does not block + // until the workflow completes. It is backed by WorkflowRunOperation on the handler side. + NexusOperationHandle handle = + Workflow.startNexusOperation( + greetingRemoteService::runFromRemote, + new NexusRemoteGreetingService.RunFromRemoteInput(REMOTE_WORKFLOW_ID)); + // Wait for the operation to be started (workflow is now running on the handler). + handle.getExecution().get(); + log.add("started remote greeting workflow: " + REMOTE_WORKFLOW_ID); + + // Query the remote workflow for supported languages. + // Output types (e.g. GetLanguagesOutput) are defined on NexusGreetingService and shared by + // both service interfaces. + NexusGreetingService.GetLanguagesOutput languagesOutput = + greetingRemoteService.getLanguages( + new NexusRemoteGreetingService.GetLanguagesInput(false, REMOTE_WORKFLOW_ID)); + log.add("supported languages: " + languagesOutput.getLanguages()); + + // Update the language on the remote workflow. + Language previousLanguage = + greetingRemoteService.setLanguage( + new NexusRemoteGreetingService.SetLanguageInput(Language.ARABIC, REMOTE_WORKFLOW_ID)); + logger.info("Language changed from {} to {}", previousLanguage, Language.ARABIC); + + // Confirm the change by querying. + Language currentLanguage = + greetingRemoteService.getLanguage( + new NexusRemoteGreetingService.GetLanguageInput(REMOTE_WORKFLOW_ID)); + log.add("language changed: " + previousLanguage.name() + " -> " + currentLanguage.name()); + + // Approve the remote workflow so it can complete. + greetingRemoteService.approve( + new NexusRemoteGreetingService.ApproveInput("remote-caller", REMOTE_WORKFLOW_ID)); + log.add("workflow approved"); + + // Wait for the remote workflow to finish and return its result. + String result = handle.getResult().get(); + log.add("workflow result: " + result); + + return log; + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/handler/GreetingActivity.java b/core/src/main/java/io/temporal/samples/nexus_messaging/handler/GreetingActivity.java new file mode 100644 index 00000000..83f97d1f --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/handler/GreetingActivity.java @@ -0,0 +1,12 @@ +package io.temporal.samples.nexus_messaging.handler; + +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.samples.nexus_messaging.service.Language; + +@ActivityInterface +public interface GreetingActivity { + // Simulates a call to a remote greeting service. Returns null if the language is not supported. + @ActivityMethod + String callGreetingService(Language language); +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/handler/GreetingActivityImpl.java b/core/src/main/java/io/temporal/samples/nexus_messaging/handler/GreetingActivityImpl.java new file mode 100644 index 00000000..e7927eba --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/handler/GreetingActivityImpl.java @@ -0,0 +1,25 @@ +package io.temporal.samples.nexus_messaging.handler; + +import io.temporal.samples.nexus_messaging.service.Language; +import java.util.EnumMap; +import java.util.Map; + +public class GreetingActivityImpl implements GreetingActivity { + + private static final Map GREETINGS = new EnumMap<>(Language.class); + + static { + GREETINGS.put(Language.ARABIC, "مرحبا بالعالم"); + GREETINGS.put(Language.CHINESE, "你好,世界"); + GREETINGS.put(Language.ENGLISH, "Hello, world"); + GREETINGS.put(Language.FRENCH, "Bonjour, monde"); + GREETINGS.put(Language.HINDI, "नमस्ते दुनिया"); + GREETINGS.put(Language.PORTUGUESE, "Olá mundo"); + GREETINGS.put(Language.SPANISH, "Hola mundo"); + } + + @Override + public String callGreetingService(Language language) { + return GREETINGS.get(language); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/handler/GreetingWorkflow.java b/core/src/main/java/io/temporal/samples/nexus_messaging/handler/GreetingWorkflow.java new file mode 100644 index 00000000..786309f3 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/handler/GreetingWorkflow.java @@ -0,0 +1,63 @@ +package io.temporal.samples.nexus_messaging.handler; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.temporal.samples.nexus_messaging.service.Language; +import io.temporal.samples.nexus_messaging.service.NexusGreetingService; +import io.temporal.workflow.QueryMethod; +import io.temporal.workflow.SignalMethod; +import io.temporal.workflow.UpdateMethod; +import io.temporal.workflow.UpdateValidatorMethod; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +/** + * A long-running "entity" workflow that backs the NexusGreetingService Nexus operations. The + * workflow exposes queries, an update, and a signal. These are private implementation details of + * the Nexus service: the caller only interacts via Nexus operations. + */ +@WorkflowInterface +public interface GreetingWorkflow { + + class ApproveInput { + private final String name; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public ApproveInput(@JsonProperty("name") String name) { + this.name = name; + } + + @JsonProperty("name") + public String getName() { + return name; + } + } + + @WorkflowMethod + String run(); + + // Returns the languages currently supported by the workflow. + @QueryMethod + NexusGreetingService.GetLanguagesOutput getLanguages( + NexusGreetingService.GetLanguagesInput input); + + // Returns the currently active language. + @QueryMethod + Language getLanguage(); + + // Approves the workflow, allowing it to complete. + @SignalMethod + void approve(ApproveInput input); + + // Changes the active language synchronously (only supports languages already in the greetings + // map). + @UpdateMethod + Language setLanguage(NexusGreetingService.SetLanguageInput input); + + @UpdateValidatorMethod(updateName = "setLanguage") + void validateSetLanguage(NexusGreetingService.SetLanguageInput input); + + // Changes the active language, calling an activity to fetch a greeting for new languages. + @UpdateMethod + Language setLanguageUsingActivity(NexusGreetingService.SetLanguageInput input); +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/handler/GreetingWorkflowImpl.java b/core/src/main/java/io/temporal/samples/nexus_messaging/handler/GreetingWorkflowImpl.java new file mode 100644 index 00000000..c5ca5596 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/handler/GreetingWorkflowImpl.java @@ -0,0 +1,110 @@ +package io.temporal.samples.nexus_messaging.handler; + +import io.temporal.activity.ActivityOptions; +import io.temporal.failure.ApplicationFailure; +import io.temporal.samples.nexus_messaging.service.Language; +import io.temporal.samples.nexus_messaging.service.NexusGreetingService; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowLock; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GreetingWorkflowImpl implements GreetingWorkflow { + + private boolean approvedForRelease = false; + private final Map greetings = new EnumMap<>(Language.class); + private Language language = Language.ENGLISH; + + private static final Logger logger = LoggerFactory.getLogger(GreetingWorkflowImpl.class); + + // Used to serialize concurrent setLanguageUsingActivity calls so that only one activity runs at + // a time per update handler execution. + private final WorkflowLock lock = Workflow.newWorkflowLock(); + + private final GreetingActivity greetingActivity = + Workflow.newActivityStub( + GreetingActivity.class, + ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build()); + + public GreetingWorkflowImpl() { + greetings.put(Language.CHINESE, "你好,世界"); + greetings.put(Language.ENGLISH, "Hello, world"); + } + + @Override + public String run() { + // Wait until approved and all in-flight update handlers have finished. + Workflow.await(() -> approvedForRelease && Workflow.isEveryHandlerFinished()); + return greetings.get(language); + } + + @Override + public NexusGreetingService.GetLanguagesOutput getLanguages( + NexusGreetingService.GetLanguagesInput input) { + List result; + if (input.isIncludeUnsupported()) { + result = new ArrayList<>(Arrays.asList(Language.values())); + } else { + result = new ArrayList<>(greetings.keySet()); + } + Collections.sort(result); + return new NexusGreetingService.GetLanguagesOutput(result); + } + + @Override + public Language getLanguage() { + return language; + } + + @Override + public void approve(ApproveInput input) { + logger.info("Approval signal received"); + approvedForRelease = true; + } + + @Override + public Language setLanguage(NexusGreetingService.SetLanguageInput input) { + logger.info("setLanguage update received"); + Language previous = language; + language = input.getLanguage(); + return previous; + } + + @Override + public void validateSetLanguage(NexusGreetingService.SetLanguageInput input) { + logger.info("validateSetLanguage called"); + if (!greetings.containsKey(input.getLanguage())) { + throw new IllegalArgumentException(input.getLanguage().name() + " is not supported"); + } + } + + @Override + public Language setLanguageUsingActivity(NexusGreetingService.SetLanguageInput input) { + if (!greetings.containsKey(input.getLanguage())) { + // Use a lock so that if this handler is called concurrently, each call executes its activity + // only after the previous one has completed. This ensures updates are processed in order. + lock.lock(); + try { + String greeting = greetingActivity.callGreetingService(input.getLanguage()); + if (greeting == null) { + throw ApplicationFailure.newFailure( + "Greeting service does not support " + input.getLanguage().name(), + "UnsupportedLanguage"); + } + greetings.put(input.getLanguage(), greeting); + } finally { + lock.unlock(); + } + } + Language previous = language; + language = input.getLanguage(); + return previous; + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/handler/HandlerWorker.java b/core/src/main/java/io/temporal/samples/nexus_messaging/handler/HandlerWorker.java new file mode 100644 index 00000000..e23fff61 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/handler/HandlerWorker.java @@ -0,0 +1,52 @@ +package io.temporal.samples.nexus_messaging.handler; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowExecutionAlreadyStarted; +import io.temporal.client.WorkflowOptions; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HandlerWorker { + private static final Logger logger = LoggerFactory.getLogger(HandlerWorker.class); + + public static final String NAMESPACE = "nexus-messaging-handler-namespace"; + public static final String TASK_QUEUE = "nexus-messaging-handler-task-queue"; + static final String WORKFLOW_ID = "nexus-messaging-greeting-workflow"; + + public static void main(String[] args) throws InterruptedException { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient client = + WorkflowClient.newInstance( + service, WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build()); + + // Start the long-running entity workflow that backs the Nexus service, if not already running. + GreetingWorkflow greetingWorkflow = + client.newWorkflowStub( + GreetingWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID) + .setTaskQueue(TASK_QUEUE) + .build()); + try { + WorkflowClient.start(greetingWorkflow::run); + logger.info("Started greeting workflow: {}", WORKFLOW_ID); + } catch (WorkflowExecutionAlreadyStarted e) { + logger.info("Greeting workflow already running: {}", WORKFLOW_ID); + } + + WorkerFactory factory = WorkerFactory.newInstance(client); + Worker worker = factory.newWorker(TASK_QUEUE); + worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class); + worker.registerActivitiesImplementations(new GreetingActivityImpl()); + worker.registerNexusServiceImplementation(new NexusGreetingServiceImpl(WORKFLOW_ID)); + worker.registerNexusServiceImplementation(new NexusRemoteGreetingServiceImpl()); + + factory.start(); + logger.info("Handler worker started, ctrl+c to exit"); + Thread.currentThread().join(); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/handler/NexusGreetingServiceImpl.java b/core/src/main/java/io/temporal/samples/nexus_messaging/handler/NexusGreetingServiceImpl.java new file mode 100644 index 00000000..0926f6ce --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/handler/NexusGreetingServiceImpl.java @@ -0,0 +1,81 @@ +package io.temporal.samples.nexus_messaging.handler; + +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.nexus.Nexus; +import io.temporal.samples.nexus_messaging.service.Language; +import io.temporal.samples.nexus_messaging.service.NexusGreetingService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Nexus operation handler implementation. Each operation is backed by the long-running + * GreetingWorkflow entity. The operations are synchronous (sync_operation) because queries and + * updates against a running workflow complete quickly. + */ +@ServiceImpl(service = NexusGreetingService.class) +public class NexusGreetingServiceImpl { + + private static final Logger logger = LoggerFactory.getLogger(NexusGreetingServiceImpl.class); + + private final String workflowId; + + public NexusGreetingServiceImpl(String workflowId) { + this.workflowId = workflowId; + } + + private GreetingWorkflow getWorkflowStub() { + return Nexus.getOperationContext() + .getWorkflowClient() + .newWorkflowStub(GreetingWorkflow.class, workflowId); + } + + // 👉 Backed by a query against the long-running entity workflow. + @OperationImpl + public OperationHandler< + NexusGreetingService.GetLanguagesInput, NexusGreetingService.GetLanguagesOutput> + getLanguages() { + return OperationHandler.sync( + (ctx, details, input) -> { + logger.info("Query for GetLanguages was received"); + return getWorkflowStub().getLanguages(input); + }); + } + + // 👉 Backed by a query against the long-running entity workflow. + @OperationImpl + public OperationHandler getLanguage() { + return OperationHandler.sync( + (ctx, details, input) -> { + logger.info("Query for GetLanguage was received"); + return getWorkflowStub().getLanguage(); + }); + } + + // 👉 Backed by an update against the long-running entity workflow. Routes to + // setLanguageUsingActivity (not setLanguage) so that new languages not already in the greetings + // map can be fetched via an activity. Although updates can run for an arbitrarily long time, when + // exposed via a sync Nexus operation the update should complete quickly (sync operations must + // finish in under 10s). + @OperationImpl + public OperationHandler setLanguage() { + return OperationHandler.sync( + (ctx, details, input) -> { + logger.info("Update for SetLanguage was received"); + return getWorkflowStub().setLanguageUsingActivity(input); + }); + } + + // 👉 Backed by a signal against the long-running entity workflow. + @OperationImpl + public OperationHandler + approve() { + return OperationHandler.sync( + (ctx, details, input) -> { + logger.info("Signal for Approve was received"); + getWorkflowStub().approve(new GreetingWorkflow.ApproveInput(input.getName())); + return new NexusGreetingService.ApproveOutput(); + }); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/handler/NexusRemoteGreetingServiceImpl.java b/core/src/main/java/io/temporal/samples/nexus_messaging/handler/NexusRemoteGreetingServiceImpl.java new file mode 100644 index 00000000..42867f58 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/handler/NexusRemoteGreetingServiceImpl.java @@ -0,0 +1,104 @@ +package io.temporal.samples.nexus_messaging.handler; + +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.Nexus; +import io.temporal.nexus.WorkflowHandle; +import io.temporal.nexus.WorkflowRunOperation; +import io.temporal.samples.nexus_messaging.service.Language; +import io.temporal.samples.nexus_messaging.service.NexusGreetingService; +import io.temporal.samples.nexus_messaging.service.NexusRemoteGreetingService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Nexus operation handler for the remote-start pattern. Unlike {@link NexusGreetingServiceImpl}, + * this implementation does not hold a fixed workflow ID. Instead, each operation receives the + * target workflow ID in its input, and {@code runFromRemote} starts a brand-new GreetingWorkflow. + */ +@ServiceImpl(service = NexusRemoteGreetingService.class) +public class NexusRemoteGreetingServiceImpl { + + private static final Logger logger = + LoggerFactory.getLogger(NexusRemoteGreetingServiceImpl.class); + + private GreetingWorkflow getWorkflowStub(String workflowId) { + return Nexus.getOperationContext() + .getWorkflowClient() + .newWorkflowStub(GreetingWorkflow.class, workflowId); + } + + // Starts a new GreetingWorkflow with the caller-specified workflow ID. This is an async + // Nexus operation backed by WorkflowRunOperation. + // + // fromWorkflowHandle (rather than fromWorkflowMethod) is used here because the Nexus operation + // input (RunFromRemoteInput) differs from the workflow method parameters — run() takes no args. + // The input is consumed to set the workflow ID on the stub; the workflow itself is invoked + // with no arguments via the ::run method reference. + @OperationImpl + public OperationHandler runFromRemote() { + return WorkflowRunOperation.fromWorkflowHandle( + (ctx, details, input) -> { + logger.info("RunFromRemote was received for workflow {}", input.getWorkflowId()); + return WorkflowHandle.fromWorkflowMethod( + Nexus.getOperationContext() + .getWorkflowClient() + .newWorkflowStub( + GreetingWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(input.getWorkflowId()) + .setTaskQueue(HandlerWorker.TASK_QUEUE) + .build()) + ::run); + }); + } + + @OperationImpl + public OperationHandler< + NexusRemoteGreetingService.GetLanguagesInput, NexusGreetingService.GetLanguagesOutput> + getLanguages() { + return OperationHandler.sync( + (ctx, details, input) -> { + logger.info("Query for GetLanguages was received for workflow {}", input.getWorkflowId()); + return getWorkflowStub(input.getWorkflowId()) + .getLanguages( + new NexusGreetingService.GetLanguagesInput(input.isIncludeUnsupported())); + }); + } + + @OperationImpl + public OperationHandler getLanguage() { + return OperationHandler.sync( + (ctx, details, input) -> { + logger.info("Query for GetLanguage was received for workflow {}", input.getWorkflowId()); + return getWorkflowStub(input.getWorkflowId()).getLanguage(); + }); + } + + // Uses setLanguageUsingActivity so that new languages are fetched via an activity. + @OperationImpl + public OperationHandler setLanguage() { + return OperationHandler.sync( + (ctx, details, input) -> { + logger.info("Update for SetLanguage was received for workflow {}", input.getWorkflowId()); + return getWorkflowStub(input.getWorkflowId()) + .setLanguageUsingActivity( + new NexusGreetingService.SetLanguageInput(input.getLanguage())); + }); + } + + @OperationImpl + public OperationHandler< + NexusRemoteGreetingService.ApproveInput, NexusGreetingService.ApproveOutput> + approve() { + return OperationHandler.sync( + (ctx, details, input) -> { + logger.info("Signal for Approve was received for workflow {}", input.getWorkflowId()); + getWorkflowStub(input.getWorkflowId()) + .approve(new GreetingWorkflow.ApproveInput(input.getName())); + return new NexusGreetingService.ApproveOutput(); + }); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/service/Language.java b/core/src/main/java/io/temporal/samples/nexus_messaging/service/Language.java new file mode 100644 index 00000000..3c81c7c5 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/service/Language.java @@ -0,0 +1,11 @@ +package io.temporal.samples.nexus_messaging.service; + +public enum Language { + ARABIC, + CHINESE, + ENGLISH, + FRENCH, + HINDI, + PORTUGUESE, + SPANISH +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/service/NexusGreetingService.java b/core/src/main/java/io/temporal/samples/nexus_messaging/service/NexusGreetingService.java new file mode 100644 index 00000000..841efb76 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/service/NexusGreetingService.java @@ -0,0 +1,100 @@ +package io.temporal.samples.nexus_messaging.service; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import java.util.List; + +/** + * Nexus service definition. Shared between the handler and caller. The caller uses this to create a + * type-safe Nexus client stub; the handler implements the operations. + */ +@Service +public interface NexusGreetingService { + + class GetLanguagesInput { + private final boolean includeUnsupported; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public GetLanguagesInput(@JsonProperty("includeUnsupported") boolean includeUnsupported) { + this.includeUnsupported = includeUnsupported; + } + + @JsonProperty("includeUnsupported") + public boolean isIncludeUnsupported() { + return includeUnsupported; + } + } + + class GetLanguagesOutput { + private final List languages; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public GetLanguagesOutput(@JsonProperty("languages") List languages) { + this.languages = languages; + } + + @JsonProperty("languages") + public List getLanguages() { + return languages; + } + } + + @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY) + class GetLanguageInput { + @JsonCreator + public GetLanguageInput() {} + } + + class ApproveInput { + private final String name; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public ApproveInput(@JsonProperty("name") String name) { + this.name = name; + } + + @JsonProperty("name") + public String getName() { + return name; + } + } + + @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY) + class ApproveOutput { + @JsonCreator + public ApproveOutput() {} + } + + class SetLanguageInput { + private final Language language; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public SetLanguageInput(@JsonProperty("language") Language language) { + this.language = language; + } + + @JsonProperty("language") + public Language getLanguage() { + return language; + } + } + + // Returns the languages supported by the greeting workflow. + @Operation + GetLanguagesOutput getLanguages(GetLanguagesInput input); + + // Returns the currently active language. + @Operation + Language getLanguage(GetLanguageInput input); + + // Changes the active language, returning the previous one. + @Operation + Language setLanguage(SetLanguageInput input); + + // Approves the workflow, allowing it to complete. + @Operation + ApproveOutput approve(ApproveInput input); +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/service/NexusRemoteGreetingService.java b/core/src/main/java/io/temporal/samples/nexus_messaging/service/NexusRemoteGreetingService.java new file mode 100644 index 00000000..60447a61 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/service/NexusRemoteGreetingService.java @@ -0,0 +1,134 @@ +package io.temporal.samples.nexus_messaging.service; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.nexusrpc.Operation; +import io.nexusrpc.Service; + +/** + * Nexus service definition for the remote-start pattern. Unlike {@link NexusGreetingService}, every + * operation includes a {@code workflowId} so the caller controls which workflow instance is + * targeted. This also exposes a {@code runFromRemote} operation that starts a new GreetingWorkflow. + */ +@Service +public interface NexusRemoteGreetingService { + + class RunFromRemoteInput { + private final String workflowId; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public RunFromRemoteInput(@JsonProperty("workflowId") String workflowId) { + this.workflowId = workflowId; + } + + @JsonProperty("workflowId") + public String getWorkflowId() { + return workflowId; + } + } + + class GetLanguagesInput { + private final boolean includeUnsupported; + private final String workflowId; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public GetLanguagesInput( + @JsonProperty("includeUnsupported") boolean includeUnsupported, + @JsonProperty("workflowId") String workflowId) { + this.includeUnsupported = includeUnsupported; + this.workflowId = workflowId; + } + + @JsonProperty("includeUnsupported") + public boolean isIncludeUnsupported() { + return includeUnsupported; + } + + @JsonProperty("workflowId") + public String getWorkflowId() { + return workflowId; + } + } + + @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY) + class GetLanguageInput { + private final String workflowId; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public GetLanguageInput(@JsonProperty("workflowId") String workflowId) { + this.workflowId = workflowId; + } + + @JsonProperty("workflowId") + public String getWorkflowId() { + return workflowId; + } + } + + class SetLanguageInput { + private final Language language; + private final String workflowId; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public SetLanguageInput( + @JsonProperty("language") Language language, + @JsonProperty("workflowId") String workflowId) { + this.language = language; + this.workflowId = workflowId; + } + + @JsonProperty("language") + public Language getLanguage() { + return language; + } + + @JsonProperty("workflowId") + public String getWorkflowId() { + return workflowId; + } + } + + class ApproveInput { + private final String name; + private final String workflowId; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public ApproveInput( + @JsonProperty("name") String name, @JsonProperty("workflowId") String workflowId) { + this.name = name; + this.workflowId = workflowId; + } + + @JsonProperty("name") + public String getName() { + return name; + } + + @JsonProperty("workflowId") + public String getWorkflowId() { + return workflowId; + } + } + + // Starts a new GreetingWorkflow with the given workflow ID. This is an asynchronous Nexus + // operation: the caller receives a handle and can wait for the workflow to complete. + @Operation + String runFromRemote(RunFromRemoteInput input); + + // Returns the languages supported by the specified workflow. + @Operation + NexusGreetingService.GetLanguagesOutput getLanguages(GetLanguagesInput input); + + // Returns the currently active language of the specified workflow. + @Operation + Language getLanguage(GetLanguageInput input); + + // Changes the active language on the specified workflow, returning the previous one. + @Operation + Language setLanguage(SetLanguageInput input); + + // Approves the specified workflow, allowing it to complete. + @Operation + NexusGreetingService.ApproveOutput approve(ApproveInput input); +}