From 163ad9ae81ea1c910a739540f86adac09ac3144c Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Wed, 8 Apr 2026 15:32:12 -0300 Subject: [PATCH 1/7] Add wait to DSL Signed-off-by: Matheus Cruz --- .../fluent/spec/BaseTaskItemListBuilder.java | 1 + .../fluent/spec/DoTaskBuilder.java | 6 ++ .../fluent/spec/TaskItemListBuilder.java | 8 ++ .../fluent/spec/WaitTaskBuilder.java | 83 +++++++++++++++++ .../fluent/spec/dsl/DSL.java | 46 ++++++++++ .../fluent/spec/spi/DoFluent.java | 2 + .../fluent/spec/spi/WaitFluent.java | 28 ++++++ .../spec/TaskItemDefaultNamingTest.java | 18 ++++ .../fluent/spec/dsl/DSLTest.java | 16 ++++ .../impl/executors/WaitExecutor.java | 11 +-- impl/test/pom.xml | 5 + .../impl/test/WaitTest.java | 91 +++++++++++++++++++ 12 files changed, 309 insertions(+), 6 deletions(-) create mode 100644 fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/WaitTaskBuilder.java create mode 100644 fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/WaitFluent.java create mode 100644 impl/test/src/test/java/io/serverlessworkflow/impl/test/WaitTest.java diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseTaskItemListBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseTaskItemListBuilder.java index 0133a98d4..e42b23b6b 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseTaskItemListBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseTaskItemListBuilder.java @@ -42,6 +42,7 @@ public abstract class BaseTaskItemListBuilder itemsConfigurer) { + this.listBuilder().wait(name, itemsConfigurer); + return this; + } + @Override public DoTaskBuilder switchCase(String name, Consumer itemsConfigurer) { this.listBuilder().switchCase(name, itemsConfigurer); diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskItemListBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskItemListBuilder.java index cf1c6845c..aa5c3bf84 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskItemListBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskItemListBuilder.java @@ -56,6 +56,14 @@ public TaskItemListBuilder set(String name, final String expr) { return this.set(name, s -> s.expr(expr)); } + @Override + public TaskItemListBuilder wait(String name, Consumer itemsConfigurer) { + name = defaultNameAndRequireConfig(name, itemsConfigurer, TYPE_WAIT); + final WaitTaskBuilder waitBuilder = new WaitTaskBuilder(); + itemsConfigurer.accept(waitBuilder); + return addTaskItem(new TaskItem(name, new Task().withWaitTask(waitBuilder.build()))); + } + @Override public TaskItemListBuilder forEach( String name, Consumer> itemsConfigurer) { diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/WaitTaskBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/WaitTaskBuilder.java new file mode 100644 index 000000000..6964085d3 --- /dev/null +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/WaitTaskBuilder.java @@ -0,0 +1,83 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.spec; + +import io.serverlessworkflow.api.types.DurationInline; +import io.serverlessworkflow.api.types.TimeoutAfter; +import io.serverlessworkflow.api.types.WaitTask; +import java.time.Duration; +import java.util.Objects; +import java.util.function.Consumer; + +public class WaitTaskBuilder extends TaskBaseBuilder { + + private final WaitTask waitTask; + + public WaitTaskBuilder() { + this.waitTask = new WaitTask(); + setTask(this.waitTask); + } + + @Override + protected WaitTaskBuilder self() { + return this; + } + + public WaitTaskBuilder wait(Consumer waitConsumer) { + final TimeoutBuilder timeoutBuilder = new TimeoutBuilder(); + waitConsumer.accept(timeoutBuilder); + this.waitTask.setWait(timeoutBuilder.build().getAfter()); + return this; + } + + public WaitTaskBuilder wait(String durationExpression) { + this.waitTask.setWait(new TimeoutAfter().withDurationExpression(durationExpression)); + return this; + } + + public WaitTaskBuilder wait(Duration duration) { + Objects.requireNonNull(duration, "duration must not be null"); + if (duration.isNegative()) { + throw new IllegalArgumentException("duration must not be negative"); + } + + long millis = duration.toMillis(); + + int days = Math.toIntExact(millis / 86_400_000L); + millis %= 86_400_000L; + int hours = Math.toIntExact(millis / 3_600_000L); + millis %= 3_600_000L; + int minutes = Math.toIntExact(millis / 60_000L); + millis %= 60_000L; + int seconds = Math.toIntExact(millis / 1_000L); + int milliseconds = Math.toIntExact(millis % 1_000L); + + this.waitTask.setWait( + new TimeoutAfter() + .withDurationInline( + new DurationInline() + .withDays(days) + .withHours(hours) + .withMinutes(minutes) + .withSeconds(seconds) + .withMilliseconds(milliseconds))); + return this; + } + + public WaitTask build() { + return this.waitTask; + } +} diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/DSL.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/DSL.java index 45b80a3ce..1eda05284 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/DSL.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/DSL.java @@ -797,6 +797,52 @@ public static TasksConfigurer listen(String name, ListenConfigurer configurer) { return list -> list.listen(name, configurer); } + /** + * Create a {@link TasksConfigurer} that adds a {@code wait} task configured with an inline + * duration builder. + * + * @param duration timeout builder consumer + * @return a {@link TasksConfigurer} that adds a WaitTask + */ + public static TasksConfigurer wait(Consumer duration) { + return list -> list.wait(w -> w.wait(duration)); + } + + /** + * Create a {@link TasksConfigurer} that adds a named {@code wait} task configured with an inline + * duration builder. + * + * @param name task name + * @param duration timeout builder consumer + * @return a {@link TasksConfigurer} that adds a WaitTask + */ + public static TasksConfigurer wait(String name, Consumer duration) { + return list -> list.wait(name, w -> w.wait(duration)); + } + + /** + * Create a {@link TasksConfigurer} that adds a {@code wait} task configured with a duration + * expression. + * + * @param durationExpression duration expression + * @return a {@link TasksConfigurer} that adds a WaitTask + */ + public static TasksConfigurer wait(String durationExpression) { + return list -> list.wait(w -> w.wait(durationExpression)); + } + + /** + * Create a {@link TasksConfigurer} that adds a named {@code wait} task configured with a duration + * expression. + * + * @param name task name + * @param durationExpression duration expression + * @return a {@link TasksConfigurer} that adds a WaitTask + */ + public static TasksConfigurer wait(String name, String durationExpression) { + return list -> list.wait(name, w -> w.wait(durationExpression)); + } + /** * Create a {@link TasksConfigurer} that adds a {@code forEach} task. * diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/DoFluent.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/DoFluent.java index 4d1048fbf..57958b2d6 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/DoFluent.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/DoFluent.java @@ -26,6 +26,7 @@ import io.serverlessworkflow.fluent.spec.SwitchTaskBuilder; import io.serverlessworkflow.fluent.spec.TaskItemListBuilder; import io.serverlessworkflow.fluent.spec.TryTaskBuilder; +import io.serverlessworkflow.fluent.spec.WaitTaskBuilder; import io.serverlessworkflow.fluent.spec.WorkflowTaskBuilder; /** @@ -44,6 +45,7 @@ public interface DoFluent ForEachFluent, T>, ForkFluent, ListenFluent, + WaitFluent, RaiseFluent, CallOpenAPIFluent, WorkflowFluent {} diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/WaitFluent.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/WaitFluent.java new file mode 100644 index 000000000..a9b1df4e1 --- /dev/null +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/WaitFluent.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.spec.spi; + +import io.serverlessworkflow.fluent.spec.TaskBaseBuilder; +import java.util.function.Consumer; + +public interface WaitFluent, LIST> { + + LIST wait(String name, Consumer itemsConfigurer); + + default LIST wait(Consumer itemsConfigurer) { + return this.wait(null, itemsConfigurer); + } +} diff --git a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/TaskItemDefaultNamingTest.java b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/TaskItemDefaultNamingTest.java index 9d130d71f..e22489e7b 100644 --- a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/TaskItemDefaultNamingTest.java +++ b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/TaskItemDefaultNamingTest.java @@ -25,6 +25,7 @@ import io.serverlessworkflow.api.types.TryTask; import io.serverlessworkflow.api.types.TryTaskCatch; import io.serverlessworkflow.api.types.Workflow; +import java.time.Duration; import java.util.List; import org.junit.jupiter.api.Test; @@ -128,6 +129,23 @@ void testNestedForEachTaskAutoNaming() { assertEquals("set-1", nestedForItems.get(1).getName()); } + @Test + void testWaitTaskAutoNamingAndDuration() { + Workflow wf = + WorkflowBuilder.workflow("flowAutoNameWait") + .tasks(d -> d.wait(null, w -> w.wait(Duration.ofSeconds(2).plusMillis(250)))) + .build(); + + List items = wf.getDo(); + assertNotNull(items, "Do list must not be null"); + assertEquals(1, items.size(), "There should be one wait task"); + assertEquals("wait-0", items.get(0).getName(), "Wait task should be wait-0"); + assertEquals( + 2, items.get(0).getTask().getWaitTask().getWait().getDurationInline().getSeconds()); + assertEquals( + 250, items.get(0).getTask().getWaitTask().getWait().getDurationInline().getMilliseconds()); + } + @Test void testNestedForkTaskAutoNaming() { Workflow wf = diff --git a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java index e8a5c3fcb..70118019a 100644 --- a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java +++ b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java @@ -116,6 +116,22 @@ public void when_listen_all_then_emit() { assertThat(evProps.getDatacontenttype()).isNull(); } + @Test + public void when_wait_task_from_dsl_helpers() { + Workflow wf = + WorkflowBuilder.workflow("myFlow", "myNs", "1.2.3") + .tasks(DSL.wait("PT5S"), DSL.wait("pause", DSL.timeoutSeconds(2))) + .build(); + + assertThat(wf.getDo()).hasSize(2); + assertThat(wf.getDo().get(0).getTask().getWaitTask()).isNotNull(); + assertThat(wf.getDo().get(0).getTask().getWaitTask().getWait().getDurationExpression()) + .isEqualTo("PT5S"); + assertThat(wf.getDo().get(1).getName()).isEqualTo("pause"); + assertThat(wf.getDo().get(1).getTask().getWaitTask().getWait().getDurationInline().getSeconds()) + .isEqualTo(2); + } + @Test public void when_listen_any_with_until() { final String untilExpr = "$.count > 0"; diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java index d649f47eb..40838dacf 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java @@ -45,12 +45,11 @@ protected WaitExecutorBuilder( } private Duration toLong(DurationInline durationInline) { - Duration duration = Duration.ofMillis(durationInline.getMilliseconds()); - duration.plus(Duration.ofSeconds(durationInline.getSeconds())); - duration.plus(Duration.ofMinutes(durationInline.getMinutes())); - duration.plus(Duration.ofHours(durationInline.getHours())); - duration.plus(Duration.ofDays(durationInline.getDays())); - return duration; + return Duration.ofMillis(durationInline.getMilliseconds()) + .plusSeconds(durationInline.getSeconds()) + .plusMinutes(durationInline.getMinutes()) + .plusHours(durationInline.getHours()) + .plusDays(durationInline.getDays()); } @Override diff --git a/impl/test/pom.xml b/impl/test/pom.xml index 49ad1d893..cc238e7f5 100644 --- a/impl/test/pom.xml +++ b/impl/test/pom.xml @@ -95,6 +95,11 @@ grpc-netty test + + org.awaitility + awaitility + test + diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/WaitTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/WaitTest.java new file mode 100644 index 000000000..e0c9f4330 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/WaitTest.java @@ -0,0 +1,91 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.spec.WorkflowBuilder; +import io.serverlessworkflow.fluent.spec.dsl.DSL; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import java.time.Duration; +import java.util.Map; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; + +@Execution(ExecutionMode.CONCURRENT) +public class WaitTest { + + @Test + void waitTaskBuilder_should_wait_5_seconds() { + Workflow workflow = + WorkflowBuilder.workflow("waitTaskBuilder", "wait") + .tasks( + t -> + t.wait( + "wait5Seconds", + w -> w.wait(timeout -> timeout.duration(d -> d.seconds(5)))) + .set("setFinished", s -> s.put("finished", true))) + .build(); + + assertThatExecuteAtLeast5Seconds(workflow); + } + + @Test + void durationExpressions_should_wait_5_seconds() { + Workflow workflow = WorkflowBuilder.workflow("durationExpr", "wait") + .tasks(DSL.wait("PT5S"), DSL.set("setFinished", s -> s.put("finished", true))) + .build(); + + assertThatExecuteAtLeast5Seconds(workflow); + } + + @Test + void namedDurationExpressions_should_wait_5_seconds() { + Workflow workflow = WorkflowBuilder.workflow("durationExpr", "wait") + .tasks(DSL.wait("named","PT5S"), DSL.set("setFinished", s -> s.put("finished", true))) + .build(); + + assertThatExecuteAtLeast5Seconds(workflow); + } + + @Test + void timeoutBuilder_should_wait_5_seconds() { + Workflow workflow = WorkflowBuilder.workflow("durationExpr", "wait") + .tasks(DSL.wait(timeoutBuilder -> timeoutBuilder.duration(durationInlineBuilder -> durationInlineBuilder.seconds(5))), DSL.set("setFinished", s -> s.put("finished", true))) + .build(); + + assertThatExecuteAtLeast5Seconds(workflow); + } + + private static void assertThatExecuteAtLeast5Seconds(Workflow workflow) { + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + WorkflowDefinition definition = app.workflowDefinition(workflow); + long startNanos = System.nanoTime(); + WorkflowModel model = definition.instance(Map.of()).start().join(); + long elapsedMillis = Duration.ofNanos(System.nanoTime() - startNanos).toMillis(); + + assertEquals(true, model.asMap().orElseThrow().get("finished")); + assertTrue( + elapsedMillis >= 5_000, + "Workflow should wait at least 5 seconds, but waited " + elapsedMillis + " ms"); + } + } +} From 1504a5ee2e5bc5e0adba2e7d2237480f7d44d41d Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Thu, 9 Apr 2026 09:22:28 -0300 Subject: [PATCH 2/7] Use LifeCycleEventsTest to test DSL Signed-off-by: Matheus Cruz --- impl/test/pom.xml | 5 - .../impl/test/LifeCycleEventsTest.java | 111 ++++++++++++++---- .../impl/test/WaitTest.java | 91 -------------- 3 files changed, 86 insertions(+), 121 deletions(-) delete mode 100644 impl/test/src/test/java/io/serverlessworkflow/impl/test/WaitTest.java diff --git a/impl/test/pom.xml b/impl/test/pom.xml index cc238e7f5..49ad1d893 100644 --- a/impl/test/pom.xml +++ b/impl/test/pom.xml @@ -95,11 +95,6 @@ grpc-netty test - - org.awaitility - awaitility - test - diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java index 3af374aa1..92d9ae65c 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java @@ -23,6 +23,8 @@ import io.cloudevents.core.data.PojoCloudEventData; import io.serverlessworkflow.api.WorkflowReader; import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.spec.WorkflowBuilder; +import io.serverlessworkflow.fluent.spec.dsl.DSL; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowModel; @@ -51,10 +53,14 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; class LifeCycleEventsTest { @@ -113,13 +119,33 @@ void simpleWorkflow() throws IOException { assertThat(taskStartedEvent.startedAt()).isBefore(taskCompletedEvent.completedAt()); } - @Test - void testSuspendResumeNotWait() + @ParameterizedTest(name = "{0}") + @MethodSource("waitSetWorkflowSources") + void testSuspendResumeNotWait(String sourceName, WorkflowSource source) throws IOException, ExecutionException, InterruptedException, TimeoutException { - WorkflowInstance instance = - appl.workflowDefinition( - WorkflowReader.readWorkflowFromClasspath("workflows-samples/wait-set.yaml")) - .instance(Map.of()); + doTestSuspendResumeNotWait(waitSetInstance(source)); + } + + private static Stream waitSetWorkflowSources() { + return Stream.of( + Arguments.of("dsl", WorkflowSource.DSL), + Arguments.of("yaml", WorkflowSource.YAML)); + } + + private static Workflow waitTestWorkflow() { + return WorkflowBuilder.workflow("wait-test", "test", "0.1.0") + .tasks( + // wait 500 ms + DSL.wait( + "waitABit", + timeoutBuilder -> + timeoutBuilder.duration(durationBuilder -> durationBuilder.milliseconds(500))), + DSL.set("useExpression", setTaskBuilder -> setTaskBuilder.put("name", "Javierito"))) + .build(); + } + + private void doTestSuspendResumeNotWait(WorkflowInstance instance) + throws InterruptedException, ExecutionException, TimeoutException { CompletableFuture future = instance.start(); instance.suspend(); assertThat(instance.status()).isEqualTo(WorkflowStatus.SUSPENDED); @@ -136,13 +162,21 @@ void testSuspendResumeNotWait() .isBeforeOrEqualTo(workflowResumedEvent.resumedAt()); } - @Test - void testSuspendResumeWait() + @ParameterizedTest(name = "{0}") + @MethodSource("waitSetWorkflowSourcesForSuspendResumeWait") + void testSuspendResumeWait(String sourceName, WorkflowSource source) throws IOException, ExecutionException, InterruptedException, TimeoutException { - WorkflowInstance instance = - appl.workflowDefinition( - WorkflowReader.readWorkflowFromClasspath("workflows-samples/wait-set.yaml")) - .instance(Map.of()); + doTestSuspendResumeWait(waitSetInstance(source)); + } + + private static Stream waitSetWorkflowSourcesForSuspendResumeWait() { + return Stream.of( + Arguments.of("dsl", WorkflowSource.DSL), + Arguments.of("yaml", WorkflowSource.YAML)); + } + + private void doTestSuspendResumeWait(WorkflowInstance instance) + throws InterruptedException, ExecutionException, TimeoutException { CompletableFuture future = instance.start(); assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING); instance.suspend(); @@ -160,12 +194,19 @@ void testSuspendResumeWait() assertThat(workflowSuspendedEvent.suspendedAt()).isBefore(workflowResumedEvent.resumedAt()); } - @Test - void testCancel() throws IOException, InterruptedException { - WorkflowInstance instance = - appl.workflowDefinition( - WorkflowReader.readWorkflowFromClasspath("workflows-samples/wait-set.yaml")) - .instance(Map.of()); + @ParameterizedTest(name = "{0}") + @MethodSource("waitSetWorkflowSourcesForCancel") + void testCancel(String sourceName, WorkflowSource source) throws IOException { + doTestCancel(waitSetInstance(source)); + } + + private static Stream waitSetWorkflowSourcesForCancel() { + return Stream.of( + Arguments.of("dsl", WorkflowSource.DSL), + Arguments.of("yaml", WorkflowSource.YAML)); + } + + private void doTestCancel(WorkflowInstance instance) { CompletableFuture future = instance.start(); instance.cancel(); assertThat(catchThrowableOfType(ExecutionException.class, () -> future.get().asMap())) @@ -175,13 +216,19 @@ void testCancel() throws IOException, InterruptedException { assertPojoInCE("io.serverlessworkflow.workflow.cancelled.v1", WorkflowCancelledCEData.class); } - @Test - void testSuspendResumeTimeout() - throws IOException, ExecutionException, InterruptedException, TimeoutException { - WorkflowInstance instance = - appl.workflowDefinition( - WorkflowReader.readWorkflowFromClasspath("workflows-samples/wait-set.yaml")) - .instance(Map.of()); + @ParameterizedTest(name = "{0}") + @MethodSource("waitSetWorkflowSourcesForSuspendResumeTimeout") + void testSuspendResumeTimeout(String sourceName, WorkflowSource source) throws IOException { + doTestSuspendResumeTimeout(waitSetInstance(source)); + } + + private static Stream waitSetWorkflowSourcesForSuspendResumeTimeout() { + return Stream.of( + Arguments.of("dsl", WorkflowSource.DSL), + Arguments.of("yaml", WorkflowSource.YAML)); + } + + private static void doTestSuspendResumeTimeout(WorkflowInstance instance) { CompletableFuture future = instance.start(); instance.suspend(); assertThat(catchThrowableOfType(TimeoutException.class, () -> future.get(1, TimeUnit.SECONDS))) @@ -220,4 +267,18 @@ private T assertPojoInCE(String type, Class clazz) { assertThat(pojo).isInstanceOf(clazz); return clazz.cast(pojo); } + + private static WorkflowInstance waitSetInstance(WorkflowSource source) throws IOException { + return switch (source) { + case DSL -> appl.workflowDefinition(waitTestWorkflow()).instance(Map.of()); + case YAML -> appl.workflowDefinition( + WorkflowReader.readWorkflowFromClasspath("workflows-samples/wait-set.yaml")) + .instance(Map.of()); + }; + } + + private enum WorkflowSource { + DSL, + YAML + } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/WaitTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/WaitTest.java deleted file mode 100644 index e0c9f4330..000000000 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/WaitTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.test; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import io.serverlessworkflow.api.types.Workflow; -import io.serverlessworkflow.fluent.spec.WorkflowBuilder; -import io.serverlessworkflow.fluent.spec.dsl.DSL; -import io.serverlessworkflow.impl.WorkflowApplication; -import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowModel; -import java.time.Duration; -import java.util.Map; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.parallel.Execution; -import org.junit.jupiter.api.parallel.ExecutionMode; - -@Execution(ExecutionMode.CONCURRENT) -public class WaitTest { - - @Test - void waitTaskBuilder_should_wait_5_seconds() { - Workflow workflow = - WorkflowBuilder.workflow("waitTaskBuilder", "wait") - .tasks( - t -> - t.wait( - "wait5Seconds", - w -> w.wait(timeout -> timeout.duration(d -> d.seconds(5)))) - .set("setFinished", s -> s.put("finished", true))) - .build(); - - assertThatExecuteAtLeast5Seconds(workflow); - } - - @Test - void durationExpressions_should_wait_5_seconds() { - Workflow workflow = WorkflowBuilder.workflow("durationExpr", "wait") - .tasks(DSL.wait("PT5S"), DSL.set("setFinished", s -> s.put("finished", true))) - .build(); - - assertThatExecuteAtLeast5Seconds(workflow); - } - - @Test - void namedDurationExpressions_should_wait_5_seconds() { - Workflow workflow = WorkflowBuilder.workflow("durationExpr", "wait") - .tasks(DSL.wait("named","PT5S"), DSL.set("setFinished", s -> s.put("finished", true))) - .build(); - - assertThatExecuteAtLeast5Seconds(workflow); - } - - @Test - void timeoutBuilder_should_wait_5_seconds() { - Workflow workflow = WorkflowBuilder.workflow("durationExpr", "wait") - .tasks(DSL.wait(timeoutBuilder -> timeoutBuilder.duration(durationInlineBuilder -> durationInlineBuilder.seconds(5))), DSL.set("setFinished", s -> s.put("finished", true))) - .build(); - - assertThatExecuteAtLeast5Seconds(workflow); - } - - private static void assertThatExecuteAtLeast5Seconds(Workflow workflow) { - try (WorkflowApplication app = WorkflowApplication.builder().build()) { - WorkflowDefinition definition = app.workflowDefinition(workflow); - long startNanos = System.nanoTime(); - WorkflowModel model = definition.instance(Map.of()).start().join(); - long elapsedMillis = Duration.ofNanos(System.nanoTime() - startNanos).toMillis(); - - assertEquals(true, model.asMap().orElseThrow().get("finished")); - assertTrue( - elapsedMillis >= 5_000, - "Workflow should wait at least 5 seconds, but waited " + elapsedMillis + " ms"); - } - } -} From b9c9b749101bf34416db34ef1c8bcea4b7e05613 Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Thu, 9 Apr 2026 09:39:18 -0300 Subject: [PATCH 3/7] Add DSL test to ForkWaitTest class Signed-off-by: Matheus Cruz --- .../impl/test/ForkWaitTest.java | 73 +++++++++++++++---- .../impl/test/LifeCycleEventsTest.java | 41 +++++------ 2 files changed, 78 insertions(+), 36 deletions(-) diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/ForkWaitTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ForkWaitTest.java index 8cf95fb8d..6962b90f5 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/ForkWaitTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ForkWaitTest.java @@ -20,6 +20,8 @@ import static org.awaitility.Awaitility.await; import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.spec.WorkflowBuilder; +import io.serverlessworkflow.fluent.spec.dsl.DSL; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowModel; @@ -30,38 +32,39 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; +import java.util.stream.Stream; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; class ForkWaitTest { private static WorkflowApplication appl; @BeforeAll - static void init() throws IOException { + static void init() { appl = WorkflowApplication.builder().build(); } @AfterAll - static void tearDown() throws IOException { + static void tearDown() { appl.close(); } - @Test - void testForkWait() throws IOException, InterruptedException, ExecutionException { + @ParameterizedTest(name = "{0}") + @MethodSource("forkWaitWorkflowSources") + void testForkWait(String sourceName, WorkflowSource source) throws IOException { assertModel( - appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/fork-wait.yaml")) - .instance(Map.of()) - .start() - .join()); + appl.workflowDefinition(forkWaitWorkflow(source)).instance(Map.of()).start().join()); } - @Test - void testForkWaitWithSuspend() throws IOException, InterruptedException { - Workflow workflow = readWorkflowFromClasspath("workflows-samples/fork-wait.yaml"); - WorkflowInstance instance = appl.workflowDefinition(workflow).instance(Map.of()); + @ParameterizedTest(name = "{0}") + @MethodSource("forkWaitWorkflowSources") + void testForkWaitWithSuspend(String sourceName, WorkflowSource source) throws IOException { + WorkflowInstance instance = + appl.workflowDefinition(forkWaitWorkflow(source)).instance(Map.of()); CompletableFuture future = instance.start(); await() .pollDelay(Duration.ofMillis(5)) @@ -75,6 +78,43 @@ void testForkWaitWithSuspend() throws IOException, InterruptedException { assertModel(model); } + private static Stream forkWaitWorkflowSources() { + return Stream.of( + Arguments.of("yaml", WorkflowSource.YAML), Arguments.of("dsl", WorkflowSource.DSL)); + } + + private static Workflow forkWaitWorkflow(WorkflowSource source) throws IOException { + if (source == WorkflowSource.DSL) { + return forkWaitWorkflow(); + } + return readWorkflowFromClasspath("workflows-samples/fork-wait.yaml"); + } + + private static Workflow forkWaitWorkflow() { + return WorkflowBuilder.workflow("fork-wait", "test", "0.1.0") + .tasks( + DSL.fork( + "incrParallel", + forkTaskBuilder -> + forkTaskBuilder + .compete(false) + .branches( + b -> + b.wait( + "waitABit", + waitTaskBuilder -> + waitTaskBuilder.wait(Duration.ofMillis(90))) + .set("helloBranch", s -> s.put("value", 1))) + .branches( + b -> + b.wait( + "waitABit", + waitTaskBuilder -> + waitTaskBuilder.wait(Duration.ofMillis(90))) + .set("byeBranch", s -> s.put("value", 2))))) + .build(); + } + private void assertModel(WorkflowModel current) { assertThat((Collection>) current.asJavaObject()) .containsExactlyInAnyOrderElementsOf( @@ -82,4 +122,9 @@ private void assertModel(WorkflowModel current) { Map.of("helloBranch", Map.of("value", 1)), Map.of("byeBranch", Map.of("value", 2)))); } + + private enum WorkflowSource { + YAML, + DSL + } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java index 92d9ae65c..2f28838cd 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java @@ -128,20 +128,7 @@ void testSuspendResumeNotWait(String sourceName, WorkflowSource source) private static Stream waitSetWorkflowSources() { return Stream.of( - Arguments.of("dsl", WorkflowSource.DSL), - Arguments.of("yaml", WorkflowSource.YAML)); - } - - private static Workflow waitTestWorkflow() { - return WorkflowBuilder.workflow("wait-test", "test", "0.1.0") - .tasks( - // wait 500 ms - DSL.wait( - "waitABit", - timeoutBuilder -> - timeoutBuilder.duration(durationBuilder -> durationBuilder.milliseconds(500))), - DSL.set("useExpression", setTaskBuilder -> setTaskBuilder.put("name", "Javierito"))) - .build(); + Arguments.of("dsl", WorkflowSource.DSL), Arguments.of("yaml", WorkflowSource.YAML)); } private void doTestSuspendResumeNotWait(WorkflowInstance instance) @@ -171,8 +158,7 @@ void testSuspendResumeWait(String sourceName, WorkflowSource source) private static Stream waitSetWorkflowSourcesForSuspendResumeWait() { return Stream.of( - Arguments.of("dsl", WorkflowSource.DSL), - Arguments.of("yaml", WorkflowSource.YAML)); + Arguments.of("dsl", WorkflowSource.DSL), Arguments.of("yaml", WorkflowSource.YAML)); } private void doTestSuspendResumeWait(WorkflowInstance instance) @@ -202,8 +188,7 @@ void testCancel(String sourceName, WorkflowSource source) throws IOException { private static Stream waitSetWorkflowSourcesForCancel() { return Stream.of( - Arguments.of("dsl", WorkflowSource.DSL), - Arguments.of("yaml", WorkflowSource.YAML)); + Arguments.of("dsl", WorkflowSource.DSL), Arguments.of("yaml", WorkflowSource.YAML)); } private void doTestCancel(WorkflowInstance instance) { @@ -224,8 +209,7 @@ void testSuspendResumeTimeout(String sourceName, WorkflowSource source) throws I private static Stream waitSetWorkflowSourcesForSuspendResumeTimeout() { return Stream.of( - Arguments.of("dsl", WorkflowSource.DSL), - Arguments.of("yaml", WorkflowSource.YAML)); + Arguments.of("dsl", WorkflowSource.DSL), Arguments.of("yaml", WorkflowSource.YAML)); } private static void doTestSuspendResumeTimeout(WorkflowInstance instance) { @@ -271,12 +255,25 @@ private T assertPojoInCE(String type, Class clazz) { private static WorkflowInstance waitSetInstance(WorkflowSource source) throws IOException { return switch (source) { case DSL -> appl.workflowDefinition(waitTestWorkflow()).instance(Map.of()); - case YAML -> appl.workflowDefinition( - WorkflowReader.readWorkflowFromClasspath("workflows-samples/wait-set.yaml")) + case YAML -> + appl.workflowDefinition( + WorkflowReader.readWorkflowFromClasspath("workflows-samples/wait-set.yaml")) .instance(Map.of()); }; } + private static Workflow waitTestWorkflow() { + return WorkflowBuilder.workflow("wait-test", "test", "0.1.0") + .tasks( + // wait 500 ms + DSL.wait( + "waitABit", + timeoutBuilder -> + timeoutBuilder.duration(durationBuilder -> durationBuilder.milliseconds(500))), + DSL.set("useExpression", setTaskBuilder -> setTaskBuilder.put("name", "Javierito"))) + .build(); + } + private enum WorkflowSource { DSL, YAML From 1c1d994ee6ffbaca32434d493e0e4ed0f4414d28 Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Thu, 9 Apr 2026 13:54:05 -0300 Subject: [PATCH 4/7] Refactor test and to allow named branches Signed-off-by: Matheus Cruz --- .../fluent/spec/ForkTaskBuilder.java | 31 +++++++- .../fluent/spec/spi/ForkTaskFluent.java | 6 ++ .../spec/TaskItemDefaultNamingTest.java | 41 ++++++++++ .../impl/test/ForkWaitTest.java | 40 ++++------ .../impl/test/LifeCycleEventsTest.java | 77 ++++++++----------- 5 files changed, 124 insertions(+), 71 deletions(-) diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/ForkTaskBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/ForkTaskBuilder.java index d3d363e15..ed4c1bde2 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/ForkTaskBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/ForkTaskBuilder.java @@ -15,11 +15,15 @@ */ package io.serverlessworkflow.fluent.spec; +import io.serverlessworkflow.api.types.DoTask; import io.serverlessworkflow.api.types.ForkTask; import io.serverlessworkflow.api.types.ForkTaskConfiguration; +import io.serverlessworkflow.api.types.Task; import io.serverlessworkflow.api.types.TaskItem; import io.serverlessworkflow.fluent.spec.spi.ForkTaskFluent; +import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.function.Consumer; public class ForkTaskBuilder extends TaskBaseBuilder @@ -58,7 +62,7 @@ public ForkTaskBuilder branches(Consumer branchesConsumer) if (existingBranches == null || existingBranches.isEmpty()) { this.forkTaskConfiguration.setBranches(newBranches); } else { - List merged = new java.util.ArrayList<>(existingBranches); + List merged = new ArrayList<>(existingBranches); merged.addAll(newBranches); this.forkTaskConfiguration.setBranches(merged); } @@ -66,6 +70,31 @@ public ForkTaskBuilder branches(Consumer branchesConsumer) return this; } + @Override + public ForkTaskBuilder branch(String name, Consumer branchConsumer) { + Objects.requireNonNull(branchConsumer, "Branch consumer must not be null"); + + List existingBranches = this.forkTaskConfiguration.getBranches(); + int currentOffset = (existingBranches == null) ? 0 : existingBranches.size(); + String branchName = (name == null || name.isBlank()) ? "branch-" + currentOffset : name; + + TaskItemListBuilder branchItems = new TaskItemListBuilder(0); + branchConsumer.accept(branchItems); + + TaskItem branchItem = + new TaskItem(branchName, new Task().withDoTask(new DoTask().withDo(branchItems.build()))); + + if (existingBranches == null || existingBranches.isEmpty()) { + this.forkTaskConfiguration.setBranches(List.of(branchItem)); + } else { + List merged = new ArrayList<>(existingBranches); + merged.add(branchItem); + this.forkTaskConfiguration.setBranches(merged); + } + + return this; + } + @Override public ForkTask build() { return this.forkTask.withFork(this.forkTaskConfiguration); diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/ForkTaskFluent.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/ForkTaskFluent.java index 31c40c9b1..39ff142d7 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/ForkTaskFluent.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/ForkTaskFluent.java @@ -27,5 +27,11 @@ public interface ForkTaskFluent< SELF branches(Consumer branchesConsumer); + default SELF branch(Consumer branchConsumer) { + return branch(null, branchConsumer); + } + + SELF branch(String name, Consumer branchConsumer); + ForkTask build(); } diff --git a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/TaskItemDefaultNamingTest.java b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/TaskItemDefaultNamingTest.java index e22489e7b..c4ddfa9a9 100644 --- a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/TaskItemDefaultNamingTest.java +++ b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/TaskItemDefaultNamingTest.java @@ -450,4 +450,45 @@ null, http().GET().endpoint("http://test")))))) assertEquals("set-1", nestedTasks.get(1).getName(), "Second tasks() call picks up index 1"); assertEquals("http-2", nestedTasks.get(2).getName(), "Third tasks() call picks up index 2"); } + + @Test + void testNamedForkBranchesWrapDoTaskAndPreserveBranchName() { + Workflow wf = + WorkflowBuilder.workflow("flowNamedForkBranch") + .tasks( + d -> + d.fork( + null, + f -> + f.branch( + "helloBranch", + b -> + b.wait(null, w -> w.wait(Duration.ofMillis(10))) + .set(null, s -> s.expr("$.value = 1"))) + .branch( + "byeBranch", + b -> + b.wait(null, w -> w.wait(Duration.ofMillis(10))) + .set(null, s -> s.expr("$.value = 2"))))) + .build(); + + List topItems = wf.getDo(); + assertEquals(1, topItems.size(), "Should have one top-level fork task"); + + ForkTaskConfiguration forkConfig = topItems.get(0).getTask().getForkTask().getFork(); + assertNotNull(forkConfig, "Fork configuration must not be null"); + + List branches = forkConfig.getBranches(); + assertEquals(2, branches.size(), "Should have two named branches"); + assertEquals("helloBranch", branches.get(0).getName()); + assertEquals("byeBranch", branches.get(1).getName()); + + List helloDo = branches.get(0).getTask().getDoTask().getDo(); + List byeDo = branches.get(1).getTask().getDoTask().getDo(); + + assertEquals("wait-0", helloDo.get(0).getName()); + assertEquals("set-1", helloDo.get(1).getName()); + assertEquals("wait-0", byeDo.get(0).getName()); + assertEquals("set-1", byeDo.get(1).getName()); + } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/ForkWaitTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ForkWaitTest.java index 6962b90f5..488d595c4 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/ForkWaitTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ForkWaitTest.java @@ -23,6 +23,7 @@ import io.serverlessworkflow.fluent.spec.WorkflowBuilder; import io.serverlessworkflow.fluent.spec.dsl.DSL; import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinitionId; import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowStatus; @@ -55,16 +56,14 @@ static void tearDown() { @ParameterizedTest(name = "{0}") @MethodSource("forkWaitWorkflowSources") - void testForkWait(String sourceName, WorkflowSource source) throws IOException { - assertModel( - appl.workflowDefinition(forkWaitWorkflow(source)).instance(Map.of()).start().join()); + void testForkWait(String sourceName, Workflow workflow) { + assertModel(appl.workflowDefinition(workflow).instance(Map.of()).start().join()); } @ParameterizedTest(name = "{0}") @MethodSource("forkWaitWorkflowSources") - void testForkWaitWithSuspend(String sourceName, WorkflowSource source) throws IOException { - WorkflowInstance instance = - appl.workflowDefinition(forkWaitWorkflow(source)).instance(Map.of()); + void testForkWaitWithSuspend(String sourceName, Workflow workflow) { + WorkflowInstance instance = appl.workflowDefinition(workflow).instance(Map.of()); CompletableFuture future = instance.start(); await() .pollDelay(Duration.ofMillis(5)) @@ -78,40 +77,36 @@ void testForkWaitWithSuspend(String sourceName, WorkflowSource source) throws IO assertModel(model); } - private static Stream forkWaitWorkflowSources() { + private static Stream forkWaitWorkflowSources() throws IOException { return Stream.of( - Arguments.of("yaml", WorkflowSource.YAML), Arguments.of("dsl", WorkflowSource.DSL)); - } - - private static Workflow forkWaitWorkflow(WorkflowSource source) throws IOException { - if (source == WorkflowSource.DSL) { - return forkWaitWorkflow(); - } - return readWorkflowFromClasspath("workflows-samples/fork-wait.yaml"); + readWorkflowFromClasspath("workflows-samples/fork-wait.yaml"), forkWaitWorkflow()) + .map(workflow -> Arguments.of(WorkflowDefinitionId.of(workflow).toString(":"), workflow)); } private static Workflow forkWaitWorkflow() { - return WorkflowBuilder.workflow("fork-wait", "test", "0.1.0") + return WorkflowBuilder.workflow("fork-wait-java-dsl", "test", "0.1.0") .tasks( DSL.fork( "incrParallel", forkTaskBuilder -> forkTaskBuilder .compete(false) - .branches( + .branch( + "helloBranch", b -> b.wait( "waitABit", waitTaskBuilder -> waitTaskBuilder.wait(Duration.ofMillis(90))) - .set("helloBranch", s -> s.put("value", 1))) - .branches( + .set("set", s -> s.put("value", 1))) + .branch( + "byeBranch", b -> b.wait( "waitABit", waitTaskBuilder -> waitTaskBuilder.wait(Duration.ofMillis(90))) - .set("byeBranch", s -> s.put("value", 2))))) + .set("set", s -> s.put("value", 2))))) .build(); } @@ -122,9 +117,4 @@ private void assertModel(WorkflowModel current) { Map.of("helloBranch", Map.of("value", 1)), Map.of("byeBranch", Map.of("value", 2)))); } - - private enum WorkflowSource { - YAML, - DSL - } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java index 2f28838cd..d54a42cc8 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java @@ -26,6 +26,8 @@ import io.serverlessworkflow.fluent.spec.WorkflowBuilder; import io.serverlessworkflow.fluent.spec.dsl.DSL; import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowDefinitionId; import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowStatus; @@ -121,18 +123,22 @@ void simpleWorkflow() throws IOException { @ParameterizedTest(name = "{0}") @MethodSource("waitSetWorkflowSources") - void testSuspendResumeNotWait(String sourceName, WorkflowSource source) - throws IOException, ExecutionException, InterruptedException, TimeoutException { - doTestSuspendResumeNotWait(waitSetInstance(source)); + void testSuspendResumeNotWait(String sourceName, Workflow workflow) + throws ExecutionException, InterruptedException, TimeoutException { + doTestSuspendResumeNotWait(workflow); } - private static Stream waitSetWorkflowSources() { + private static Stream waitSetWorkflowSources() throws IOException { return Stream.of( - Arguments.of("dsl", WorkflowSource.DSL), Arguments.of("yaml", WorkflowSource.YAML)); + WorkflowReader.readWorkflowFromClasspath("workflows-samples/wait-set.yaml"), + waitTestWorkflow()) + .map(workflow -> Arguments.of(WorkflowDefinitionId.of(workflow).toString(":"), workflow)); } - private void doTestSuspendResumeNotWait(WorkflowInstance instance) + private void doTestSuspendResumeNotWait(Workflow workflow) throws InterruptedException, ExecutionException, TimeoutException { + WorkflowDefinition def = appl.workflowDefinition(workflow); + WorkflowInstance instance = def.instance(Map.of()); CompletableFuture future = instance.start(); instance.suspend(); assertThat(instance.status()).isEqualTo(WorkflowStatus.SUSPENDED); @@ -150,19 +156,16 @@ private void doTestSuspendResumeNotWait(WorkflowInstance instance) } @ParameterizedTest(name = "{0}") - @MethodSource("waitSetWorkflowSourcesForSuspendResumeWait") - void testSuspendResumeWait(String sourceName, WorkflowSource source) - throws IOException, ExecutionException, InterruptedException, TimeoutException { - doTestSuspendResumeWait(waitSetInstance(source)); - } - - private static Stream waitSetWorkflowSourcesForSuspendResumeWait() { - return Stream.of( - Arguments.of("dsl", WorkflowSource.DSL), Arguments.of("yaml", WorkflowSource.YAML)); + @MethodSource("waitSetWorkflowSources") + void testSuspendResumeWait(String sourceName, Workflow workflow) + throws ExecutionException, InterruptedException, TimeoutException { + doTestSuspendResumeWait(workflow); } - private void doTestSuspendResumeWait(WorkflowInstance instance) + private void doTestSuspendResumeWait(Workflow workflow) throws InterruptedException, ExecutionException, TimeoutException { + WorkflowDefinition def = appl.workflowDefinition(workflow); + WorkflowInstance instance = def.instance(Map.of()); CompletableFuture future = instance.start(); assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING); instance.suspend(); @@ -181,17 +184,14 @@ private void doTestSuspendResumeWait(WorkflowInstance instance) } @ParameterizedTest(name = "{0}") - @MethodSource("waitSetWorkflowSourcesForCancel") - void testCancel(String sourceName, WorkflowSource source) throws IOException { - doTestCancel(waitSetInstance(source)); - } - - private static Stream waitSetWorkflowSourcesForCancel() { - return Stream.of( - Arguments.of("dsl", WorkflowSource.DSL), Arguments.of("yaml", WorkflowSource.YAML)); + @MethodSource("waitSetWorkflowSources") + void testCancel(String sourceName, Workflow workflow) throws IOException { + doTestCancel(workflow); } - private void doTestCancel(WorkflowInstance instance) { + private void doTestCancel(Workflow workflow) { + WorkflowDefinition def = appl.workflowDefinition(workflow); + WorkflowInstance instance = def.instance(Map.of()); CompletableFuture future = instance.start(); instance.cancel(); assertThat(catchThrowableOfType(ExecutionException.class, () -> future.get().asMap())) @@ -202,17 +202,14 @@ private void doTestCancel(WorkflowInstance instance) { } @ParameterizedTest(name = "{0}") - @MethodSource("waitSetWorkflowSourcesForSuspendResumeTimeout") - void testSuspendResumeTimeout(String sourceName, WorkflowSource source) throws IOException { - doTestSuspendResumeTimeout(waitSetInstance(source)); - } - - private static Stream waitSetWorkflowSourcesForSuspendResumeTimeout() { - return Stream.of( - Arguments.of("dsl", WorkflowSource.DSL), Arguments.of("yaml", WorkflowSource.YAML)); + @MethodSource("waitSetWorkflowSources") + void testSuspendResumeTimeout(String sourceName, Workflow workflow) { + doTestSuspendResumeTimeout(workflow); } - private static void doTestSuspendResumeTimeout(WorkflowInstance instance) { + private static void doTestSuspendResumeTimeout(Workflow workflow) { + WorkflowDefinition def = appl.workflowDefinition(workflow); + WorkflowInstance instance = def.instance(Map.of()); CompletableFuture future = instance.start(); instance.suspend(); assertThat(catchThrowableOfType(TimeoutException.class, () -> future.get(1, TimeUnit.SECONDS))) @@ -252,18 +249,8 @@ private T assertPojoInCE(String type, Class clazz) { return clazz.cast(pojo); } - private static WorkflowInstance waitSetInstance(WorkflowSource source) throws IOException { - return switch (source) { - case DSL -> appl.workflowDefinition(waitTestWorkflow()).instance(Map.of()); - case YAML -> - appl.workflowDefinition( - WorkflowReader.readWorkflowFromClasspath("workflows-samples/wait-set.yaml")) - .instance(Map.of()); - }; - } - private static Workflow waitTestWorkflow() { - return WorkflowBuilder.workflow("wait-test", "test", "0.1.0") + return WorkflowBuilder.workflow("wait-test-java-dsl", "test", "0.1.0") .tasks( // wait 500 ms DSL.wait( From fe71e872ef15c3a028e29edd1b9c1f99790ef7ea Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Thu, 9 Apr 2026 15:37:51 -0300 Subject: [PATCH 5/7] Update FuncForkTaskBuilder Signed-off-by: Matheus Cruz --- .../fluent/func/FuncForkTaskBuilder.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java index c57edbae5..630ab20f8 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java @@ -59,6 +59,19 @@ public FuncForkTaskBuilder branch( return branch(name, function, argParam, null); } + @Override + public FuncForkTaskBuilder branch(String name, Consumer branchConsumer) { + FuncTaskItemListBuilder branchItems = new FuncTaskItemListBuilder(0); + branchConsumer.accept(branchItems); + List builtBranchItems = branchItems.build(); + if (builtBranchItems.isEmpty()) { + return this; + } + String branchName = (name == null || name.isBlank()) ? "branch-" + this.items.size() : name; + this.items.add(new TaskItem(branchName, builtBranchItems.get(0).getTask())); + return this; + } + public FuncForkTaskBuilder branch( String name, Function function, Class argParam, Class returnClass) { if (name == null || name.isBlank()) { From fee59aabb5f11505222bb5fc4b4f8cf2692362f2 Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Thu, 9 Apr 2026 16:27:21 -0300 Subject: [PATCH 6/7] FuncForkTaskBuilder#branch method Signed-off-by: Matheus Cruz --- .../fluent/func/FuncForkTaskBuilder.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java index 630ab20f8..808a4cbea 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java @@ -15,6 +15,7 @@ */ package io.serverlessworkflow.fluent.func; +import io.serverlessworkflow.api.types.DoTask; import io.serverlessworkflow.api.types.ForkTask; import io.serverlessworkflow.api.types.ForkTaskConfiguration; import io.serverlessworkflow.api.types.Task; @@ -61,14 +62,12 @@ public FuncForkTaskBuilder branch( @Override public FuncForkTaskBuilder branch(String name, Consumer branchConsumer) { - FuncTaskItemListBuilder branchItems = new FuncTaskItemListBuilder(0); - branchConsumer.accept(branchItems); - List builtBranchItems = branchItems.build(); - if (builtBranchItems.isEmpty()) { - return this; + if (name == null || name.isBlank()) { + name = "branch-" + this.items.size(); } - String branchName = (name == null || name.isBlank()) ? "branch-" + this.items.size() : name; - this.items.add(new TaskItem(branchName, builtBranchItems.get(0).getTask())); + final FuncTaskItemListBuilder branchItems = new FuncTaskItemListBuilder(this.items); + this.items.add( + new TaskItem(name, new Task().withDoTask(new DoTask().withDo(branchItems.build())))); return this; } From a5896b7309f464807c3ced2791dfd9a1b19dee51 Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Thu, 9 Apr 2026 17:09:12 -0300 Subject: [PATCH 7/7] Remove dead code Signed-off-by: Matheus Cruz --- .../io/serverlessworkflow/impl/test/LifeCycleEventsTest.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java index d54a42cc8..4633e786a 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java @@ -260,9 +260,4 @@ private static Workflow waitTestWorkflow() { DSL.set("useExpression", setTaskBuilder -> setTaskBuilder.put("name", "Javierito"))) .build(); } - - private enum WorkflowSource { - DSL, - YAML - } }