-
Notifications
You must be signed in to change notification settings - Fork 180
Sample code for Nexus messaging #776
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Evanthx
wants to merge
5
commits into
main
Choose a base branch
from
signals-nexus-java
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
138 changes: 138 additions & 0 deletions
138
core/src/main/java/io/temporal/samples/nexus_messaging/README.md
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,138 @@ | ||
| This sample shows how to expose a long-running workflow's queries, updates, and signals as Nexus | ||
| operations. The caller interacts only with the Nexus service; the workflow is a private | ||
| implementation detail. | ||
|
|
||
| There are **two caller patterns** that share the same handler workflow (`GreetingWorkflow`): | ||
|
|
||
| | | `caller/` (entity pattern) | `caller_remote/` (remote-start pattern) | | ||
| |---|---|---| | ||
| | **Who creates the workflow?** | The handler worker starts it on boot | The caller starts it via a `runFromRemote` Nexus operation | | ||
| | **Who knows the workflow ID?** | Only the handler | The caller chooses and passes it in every operation | | ||
| | **Nexus service** | `NexusGreetingService` — inputs carry only business data | `NexusRemoteGreetingService` — every input includes a `workflowId` | | ||
| | **When to use** | Single shared entity; callers don't need lifecycle control | Caller needs to create and target specific workflow instances | | ||
|
|
||
| ### Directory structure | ||
|
|
||
| - `service/` — shared Nexus service definitions (`NexusGreetingService`, `NexusRemoteGreetingService`) and `Language` enum | ||
| - `handler/` — `GreetingWorkflow` and its implementation, `GreetingActivity`, both Nexus service impls (`NexusGreetingServiceImpl`, `NexusRemoteGreetingServiceImpl`), and the handler worker | ||
| - `caller/` — entity-pattern caller (workflow, worker, starter) | ||
| - `caller_remote/` — remote-start caller (workflow, worker, starter) | ||
|
|
||
| ### Running | ||
|
|
||
| Start a Temporal server: | ||
|
|
||
| ```bash | ||
| temporal server start-dev | ||
| ``` | ||
| Create the namespaces and Nexus endpoint: | ||
|
|
||
| ```bash | ||
| temporal operator namespace create --namespace nexus-messaging-handler-namespace | ||
| temporal operator namespace create --namespace nexus-messaging-caller-namespace | ||
|
|
||
| temporal operator nexus endpoint create \ | ||
| --name nexus-messaging-nexus-endpoint \ | ||
| --target-namespace nexus-messaging-handler-namespace \ | ||
| --target-task-queue nexus-messaging-handler-task-queue | ||
| ``` | ||
|
|
||
| In one terminal, start the handler worker (shared by both patterns): | ||
|
|
||
| ```bash | ||
| ./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexus_messaging.handler.HandlerWorker | ||
| ``` | ||
|
|
||
| #### Entity pattern | ||
|
|
||
| In the second terminal, run the caller worker: | ||
|
|
||
| ```bash | ||
| ./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexus_messaging.caller.CallerWorker | ||
| ``` | ||
|
|
||
| In the third terminal, start the caller workflow: | ||
|
|
||
| ```bash | ||
| ./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexus_messaging.caller.CallerStarter | ||
| ``` | ||
|
|
||
| Expected output: | ||
|
|
||
| ``` | ||
| supported languages: [CHINESE, ENGLISH] | ||
| language changed: ENGLISH -> ARABIC | ||
| workflow approved | ||
| ``` | ||
|
|
||
| #### Remote-start pattern | ||
|
|
||
| In a second terminal, run the remote caller worker: | ||
|
|
||
| ```bash | ||
| ./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexus_messaging.caller_remote.CallerRemoteWorker | ||
| ``` | ||
|
|
||
| In a third terminal, start the remote caller workflow: | ||
|
|
||
| ```bash | ||
| ./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexus_messaging.caller_remote.CallerRemoteStarter | ||
| ``` | ||
|
|
||
| Expected output: | ||
|
|
||
| ``` | ||
| started remote greeting workflow: nexus-messaging-remote-greeting-workflow | ||
| supported languages: [CHINESE, ENGLISH] | ||
| language changed: ENGLISH -> ARABIC | ||
| workflow approved | ||
| workflow result: مرحبا بالعالم | ||
| ``` | ||
|
|
||
| ### How it works | ||
|
|
||
| #### The handler (shared by both patterns) | ||
|
|
||
| `GreetingWorkflow` is a long-running "entity" workflow that holds the current language and a map of | ||
| greetings. It exposes its state through standard Temporal primitives: | ||
|
|
||
| - `getLanguages` / `getLanguage` — `@QueryMethod`s for reading state | ||
| - `setLanguage` — an `@UpdateMethod` for switching between already-loaded languages | ||
| - `setLanguageUsingActivity` — an `@UpdateMethod` that calls an activity to fetch a greeting for | ||
| a language not yet in the map (uses `WorkflowLock` to serialize concurrent activity calls) | ||
| - `approve` — a `@SignalMethod` that lets the workflow complete | ||
|
|
||
| The workflow waits until approved and all in-flight update handlers have finished, then returns the | ||
| greeting in the current language. | ||
|
|
||
| Both Nexus service implementations translate incoming Nexus operations into calls against | ||
| `GreetingWorkflow` stubs — queries, updates, and signals. The caller never interacts with the | ||
| workflow directly. | ||
|
|
||
| #### Entity pattern (`caller/` + `NexusGreetingService`) | ||
|
|
||
| The handler worker starts a single `GreetingWorkflow` on boot with a fixed workflow ID. | ||
| `NexusGreetingServiceImpl` holds that workflow ID in its constructor and routes every operation to | ||
| it. The caller's inputs contain only business data (language, name), not workflow IDs. | ||
|
|
||
| `CallerWorkflowImpl` creates a `NexusGreetingService` stub and: | ||
| 1. Queries for supported languages (`getLanguages` — backed by a `@QueryMethod`) | ||
| 2. Changes the language to Arabic (`setLanguage` — backed by an `@UpdateMethod` that calls an activity) | ||
| 3. Confirms the change via a second query (`getLanguage`) | ||
| 4. Approves the workflow (`approve` — backed by a `@SignalMethod`) | ||
|
|
||
| #### Remote-start pattern (`caller_remote/` + `NexusRemoteGreetingService`) | ||
|
|
||
| No workflow is pre-started. Instead, `NexusRemoteGreetingService` adds a `runFromRemote` operation | ||
| that starts a new `GreetingWorkflow` with a caller-chosen workflow ID using | ||
| `WorkflowRunOperation`. Every other operation also includes the `workflowId` in its input so that | ||
| `NexusRemoteGreetingServiceImpl` can look up the right workflow stub. | ||
|
|
||
| `CallerRemoteWorkflowImpl` creates a `NexusRemoteGreetingService` stub and: | ||
| 1. Starts a remote `GreetingWorkflow` via `runFromRemote` and waits for it to be running | ||
| 2. Queries, updates, and approves that workflow — same operations as the entity pattern, but each | ||
| input carries the workflow ID | ||
| 3. Waits for the remote workflow to complete and returns its result (the greeting string) | ||
|
|
||
| This pattern is useful when the caller needs to control the lifecycle of individual workflow | ||
| instances rather than sharing a single entity. | ||
30 changes: 30 additions & 0 deletions
30
core/src/main/java/io/temporal/samples/nexus_messaging/caller/CallerStarter.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| package io.temporal.samples.nexus_messaging.caller; | ||
|
|
||
| import io.temporal.client.WorkflowClient; | ||
| import io.temporal.client.WorkflowClientOptions; | ||
| import io.temporal.client.WorkflowOptions; | ||
| import io.temporal.serviceclient.WorkflowServiceStubs; | ||
| import java.util.List; | ||
| import java.util.UUID; | ||
|
|
||
| public class CallerStarter { | ||
|
|
||
| public static void main(String[] args) { | ||
| WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); | ||
| WorkflowClient client = | ||
| WorkflowClient.newInstance( | ||
| service, | ||
| WorkflowClientOptions.newBuilder().setNamespace(CallerWorker.NAMESPACE).build()); | ||
|
|
||
| CallerWorkflow workflow = | ||
| client.newWorkflowStub( | ||
| CallerWorkflow.class, | ||
| WorkflowOptions.newBuilder() | ||
| .setWorkflowId("nexus-messaging-caller-" + UUID.randomUUID()) | ||
| .setTaskQueue(CallerWorker.TASK_QUEUE) | ||
| .build()); | ||
|
|
||
| List<String> log = workflow.run(); | ||
| log.forEach(System.out::println); | ||
| } | ||
| } |
43 changes: 43 additions & 0 deletions
43
core/src/main/java/io/temporal/samples/nexus_messaging/caller/CallerWorker.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| package io.temporal.samples.nexus_messaging.caller; | ||
|
|
||
| import io.temporal.client.WorkflowClient; | ||
| import io.temporal.client.WorkflowClientOptions; | ||
| import io.temporal.serviceclient.WorkflowServiceStubs; | ||
| import io.temporal.worker.Worker; | ||
| import io.temporal.worker.WorkerFactory; | ||
| import io.temporal.worker.WorkflowImplementationOptions; | ||
| import io.temporal.workflow.NexusServiceOptions; | ||
| import java.util.Collections; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| public class CallerWorker { | ||
| private static final Logger logger = LoggerFactory.getLogger(CallerWorker.class); | ||
|
|
||
| public static final String NAMESPACE = "nexus-messaging-caller-namespace"; | ||
| public static final String TASK_QUEUE = "nexus-messaging-caller-task-queue"; | ||
| static final String NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint"; | ||
|
|
||
| public static void main(String[] args) throws InterruptedException { | ||
| WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); | ||
| WorkflowClient client = | ||
| WorkflowClient.newInstance( | ||
| service, WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build()); | ||
|
|
||
| WorkerFactory factory = WorkerFactory.newInstance(client); | ||
| Worker worker = factory.newWorker(TASK_QUEUE); | ||
| worker.registerWorkflowImplementationTypes( | ||
| WorkflowImplementationOptions.newBuilder() | ||
| .setNexusServiceOptions( | ||
| // The key must match the @Service-annotated interface name. | ||
| Collections.singletonMap( | ||
| "NexusGreetingService", | ||
| NexusServiceOptions.newBuilder().setEndpoint(NEXUS_ENDPOINT).build())) | ||
| .build(), | ||
| CallerWorkflowImpl.class); | ||
|
|
||
| factory.start(); | ||
| logger.info("Caller worker started, ctrl+c to exit"); | ||
| Thread.currentThread().join(); | ||
| } | ||
| } |
11 changes: 11 additions & 0 deletions
11
core/src/main/java/io/temporal/samples/nexus_messaging/caller/CallerWorkflow.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| package io.temporal.samples.nexus_messaging.caller; | ||
|
|
||
| import io.temporal.workflow.WorkflowInterface; | ||
| import io.temporal.workflow.WorkflowMethod; | ||
| import java.util.List; | ||
|
|
||
| @WorkflowInterface | ||
| public interface CallerWorkflow { | ||
| @WorkflowMethod | ||
| List<String> run(); | ||
| } |
62 changes: 62 additions & 0 deletions
62
core/src/main/java/io/temporal/samples/nexus_messaging/caller/CallerWorkflowImpl.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| package io.temporal.samples.nexus_messaging.caller; | ||
|
|
||
| import io.temporal.failure.ApplicationFailure; | ||
| import io.temporal.samples.nexus_messaging.caller_remote.CallerRemoteWorkflowImpl; | ||
| import io.temporal.samples.nexus_messaging.service.Language; | ||
| import io.temporal.samples.nexus_messaging.service.NexusGreetingService; | ||
| import io.temporal.workflow.NexusOperationOptions; | ||
| import io.temporal.workflow.NexusServiceOptions; | ||
| import io.temporal.workflow.Workflow; | ||
| import java.time.Duration; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| public class CallerWorkflowImpl implements CallerWorkflow { | ||
|
|
||
| private static final Logger logger = LoggerFactory.getLogger(CallerRemoteWorkflowImpl.class); | ||
|
|
||
| // The endpoint is configured at the worker level in CallerWorker; only operation options are | ||
| // set here. | ||
| NexusGreetingService greetingService = | ||
| Workflow.newNexusServiceStub( | ||
| NexusGreetingService.class, | ||
| NexusServiceOptions.newBuilder() | ||
| .setOperationOptions( | ||
| NexusOperationOptions.newBuilder() | ||
| .setScheduleToCloseTimeout(Duration.ofSeconds(10)) | ||
| .build()) | ||
| .build()); | ||
|
|
||
| @Override | ||
| public List<String> run() { | ||
| List<String> log = new ArrayList<>(); | ||
|
|
||
| // 👉 Call a Nexus operation backed by a query against the entity workflow. | ||
| NexusGreetingService.GetLanguagesOutput languagesOutput = | ||
| greetingService.getLanguages(new NexusGreetingService.GetLanguagesInput(false)); | ||
| log.add("supported languages: " + languagesOutput.getLanguages()); | ||
|
|
||
| // 👉 Call a Nexus operation backed by an update against the entity workflow. | ||
| Language previousLanguage = | ||
| greetingService.setLanguage(new NexusGreetingService.SetLanguageInput(Language.ARABIC)); | ||
| logger.info("Language changed from {} to {}", previousLanguage, Language.ARABIC); | ||
|
|
||
| // 👉 Call a Nexus operation backed by a query to confirm the language change. | ||
| Language currentLanguage = | ||
| greetingService.getLanguage(new NexusGreetingService.GetLanguageInput()); | ||
| if (currentLanguage != Language.ARABIC) { | ||
| throw ApplicationFailure.newFailure( | ||
| "expected language ARABIC, got " + currentLanguage, "AssertionError"); | ||
| } | ||
|
|
||
| log.add("language changed: " + previousLanguage.name() + " -> " + Language.ARABIC.name()); | ||
|
|
||
| // 👉 Call a Nexus operation backed by a signal against the entity workflow. | ||
| greetingService.approve(new NexusGreetingService.ApproveInput("caller")); | ||
| log.add("workflow approved"); | ||
|
|
||
| return log; | ||
| } | ||
| } |
30 changes: 30 additions & 0 deletions
30
.../src/main/java/io/temporal/samples/nexus_messaging/caller_remote/CallerRemoteStarter.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| package io.temporal.samples.nexus_messaging.caller_remote; | ||
|
|
||
| import io.temporal.client.WorkflowClient; | ||
| import io.temporal.client.WorkflowClientOptions; | ||
| import io.temporal.client.WorkflowOptions; | ||
| import io.temporal.serviceclient.WorkflowServiceStubs; | ||
| import java.util.List; | ||
| import java.util.UUID; | ||
|
|
||
| public class CallerRemoteStarter { | ||
|
|
||
| public static void main(String[] args) { | ||
| WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); | ||
| WorkflowClient client = | ||
| WorkflowClient.newInstance( | ||
| service, | ||
| WorkflowClientOptions.newBuilder().setNamespace(CallerRemoteWorker.NAMESPACE).build()); | ||
|
|
||
| CallerRemoteWorkflow workflow = | ||
| client.newWorkflowStub( | ||
| CallerRemoteWorkflow.class, | ||
| WorkflowOptions.newBuilder() | ||
| .setWorkflowId("nexus-messaging-remote-caller-" + UUID.randomUUID()) | ||
| .setTaskQueue(CallerRemoteWorker.TASK_QUEUE) | ||
| .build()); | ||
|
|
||
| List<String> log = workflow.run(); | ||
| log.forEach(System.out::println); | ||
| } | ||
| } |
43 changes: 43 additions & 0 deletions
43
core/src/main/java/io/temporal/samples/nexus_messaging/caller_remote/CallerRemoteWorker.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| package io.temporal.samples.nexus_messaging.caller_remote; | ||
|
|
||
| import io.temporal.client.WorkflowClient; | ||
| import io.temporal.client.WorkflowClientOptions; | ||
| import io.temporal.serviceclient.WorkflowServiceStubs; | ||
| import io.temporal.worker.Worker; | ||
| import io.temporal.worker.WorkerFactory; | ||
| import io.temporal.worker.WorkflowImplementationOptions; | ||
| import io.temporal.workflow.NexusServiceOptions; | ||
| import java.util.Collections; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| public class CallerRemoteWorker { | ||
| private static final Logger logger = LoggerFactory.getLogger(CallerRemoteWorker.class); | ||
|
|
||
| public static final String NAMESPACE = "nexus-messaging-caller-namespace"; | ||
| public static final String TASK_QUEUE = "nexus-messaging-caller-remote-task-queue"; | ||
| static final String NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint"; | ||
|
|
||
| public static void main(String[] args) throws InterruptedException { | ||
| WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); | ||
| WorkflowClient client = | ||
| WorkflowClient.newInstance( | ||
| service, WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build()); | ||
|
|
||
| WorkerFactory factory = WorkerFactory.newInstance(client); | ||
| Worker worker = factory.newWorker(TASK_QUEUE); | ||
| worker.registerWorkflowImplementationTypes( | ||
| WorkflowImplementationOptions.newBuilder() | ||
| .setNexusServiceOptions( | ||
| // The key must match the @Service-annotated interface name. | ||
| Collections.singletonMap( | ||
| "NexusRemoteGreetingService", | ||
| NexusServiceOptions.newBuilder().setEndpoint(NEXUS_ENDPOINT).build())) | ||
| .build(), | ||
| CallerRemoteWorkflowImpl.class); | ||
|
|
||
| factory.start(); | ||
| logger.info("Caller remote worker started, ctrl+c to exit"); | ||
| Thread.currentThread().join(); | ||
| } | ||
| } |
11 changes: 11 additions & 0 deletions
11
...src/main/java/io/temporal/samples/nexus_messaging/caller_remote/CallerRemoteWorkflow.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| package io.temporal.samples.nexus_messaging.caller_remote; | ||
|
|
||
| import io.temporal.workflow.WorkflowInterface; | ||
| import io.temporal.workflow.WorkflowMethod; | ||
| import java.util.List; | ||
|
|
||
| @WorkflowInterface | ||
| public interface CallerRemoteWorkflow { | ||
| @WorkflowMethod | ||
| List<String> run(); | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I would split these up into two separate samples since the amount of code here could be overwhelming and make it harder to follow