Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
package io.serverlessworkflow.impl;

import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionCompletableListener;
import io.serverlessworkflow.impl.scheduler.Cancellable;
import io.serverlessworkflow.impl.scheduler.WorkflowScheduler;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class SchedulerListener implements WorkflowExecutionListener, AutoCloseable {
class SchedulerListener implements WorkflowExecutionCompletableListener {

private final WorkflowScheduler scheduler;
private final Map<WorkflowDefinition, WorkflowValueResolver<Duration>> afterMap =
Expand All @@ -39,7 +40,7 @@ public void addAfter(WorkflowDefinition definition, WorkflowValueResolver<Durati
}

@Override
public void onWorkflowCompleted(WorkflowCompletedEvent ev) {
public CompletableFuture<?> onWorkflowCompleted(WorkflowCompletedEvent ev) {
WorkflowDefinition workflowDefinition = (WorkflowDefinition) ev.workflowContext().definition();
WorkflowValueResolver<Duration> after = afterMap.get(workflowDefinition);
if (after != null) {
Expand All @@ -49,6 +50,7 @@ public void onWorkflowCompleted(WorkflowCompletedEvent ev) {
workflowDefinition,
after.apply((WorkflowContext) ev.workflowContext(), null, ev.output())));
}
return CompletableFuture.completedFuture(null);
}

public void removeAfter(WorkflowDefinition definition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import io.serverlessworkflow.impl.executors.TaskExecutorFactory;
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
import io.serverlessworkflow.impl.expressions.RuntimeDescriptor;
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionCompletableListener;
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListenerAdapter;
import io.serverlessworkflow.impl.resources.DefaultResourceLoaderFactory;
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
import io.serverlessworkflow.impl.resources.ResourceLoaderFactory;
Expand Down Expand Up @@ -67,7 +69,7 @@ public class WorkflowApplication implements AutoCloseable {
private final ResourceLoaderFactory resourceLoaderFactory;
private final SchemaValidatorFactory schemaValidatorFactory;
private final WorkflowInstanceIdFactory idFactory;
private final Collection<WorkflowExecutionListener> listeners;
private final Collection<WorkflowExecutionCompletableListener> listeners;
private final Map<WorkflowDefinitionId, WorkflowDefinition> definitions;
private final WorkflowPositionFactory positionFactory;
private final ExecutorServiceFactory executorFactory;
Expand Down Expand Up @@ -137,7 +139,7 @@ public ResourceLoaderFactory resourceLoaderFactory() {
return resourceLoaderFactory;
}

public Collection<WorkflowExecutionListener> listeners() {
public Collection<WorkflowExecutionCompletableListener> listeners() {
return listeners;
}

Expand Down Expand Up @@ -175,8 +177,10 @@ public SchemaValidator getValidator(SchemaInline inline) {
private String id;
private TaskExecutorFactory taskFactory;
private Collection<ExpressionFactory> exprFactories = new HashSet<>();
private List<WorkflowExecutionListener> listeners =
loadFromServiceLoader(WorkflowExecutionListener.class);
private List<WorkflowExecutionCompletableListener> listeners =
ServiceLoader.load(WorkflowExecutionListener.class).stream()
.map(v -> new WorkflowExecutionListenerAdapter(v.get()))
.collect(Collectors.toList());
private List<CallableTaskProxyBuilder> callableProxyBuilders =
loadFromServiceLoader(CallableTaskProxyBuilder.class);
private ResourceLoaderFactory resourceLoaderFactory = DefaultResourceLoaderFactory.get();
Expand Down Expand Up @@ -212,6 +216,11 @@ public Builder withId(String id) {
}

public Builder withListener(WorkflowExecutionListener listener) {
listeners.add(new WorkflowExecutionListenerAdapter(listener));
return this;
}

public Builder withListener(WorkflowExecutionCompletableListener listener) {
listeners.add(listener);
return this;
}
Expand Down Expand Up @@ -414,7 +423,7 @@ public void close() {
}
definitions.clear();

for (WorkflowExecutionListener listener : listeners) {
for (WorkflowExecutionCompletableListener listener : listeners) {
safeClose(listener);
}
listeners.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,30 +70,43 @@ public CompletableFuture<WorkflowModel> start() {
return startExecution(
() -> {
startedAt = Instant.now();
publishEvent(
return publishEvent(
workflowContext, l -> l.onWorkflowStarted(new WorkflowStartedEvent(workflowContext)));
});
}

protected final CompletableFuture<WorkflowModel> startExecution(Runnable runnable) {
protected final CompletableFuture<WorkflowModel> startExecution(
Supplier<CompletableFuture<?>> runnable) {
CompletableFuture<WorkflowModel> future = futureRef.get();
if (future != null) {
return future;
}
status(WorkflowStatus.RUNNING);
runnable.run();

future =
TaskExecutorHelper.processTaskList(
workflowContext.definition().startTask(),
workflowContext,
Optional.empty(),
workflowContext
.definition()
.inputFilter()
.map(f -> f.apply(workflowContext, null, input))
.orElse(input))
.whenComplete(this::whenCompleted)
.thenApply(this::whenSuccess);
runnable
.get()
.thenCompose(
v ->
TaskExecutorHelper.processTaskList(
workflowContext.definition().startTask(),
workflowContext,
Optional.empty(),
workflowContext
.definition()
.inputFilter()
.map(f -> f.apply(workflowContext, null, input))
.orElse(input))
.whenComplete(this::whenCompleted)
.thenApply(this::whenSuccess)
.thenCompose(
model ->
publishEvent(
workflowContext,
l ->
l.onWorkflowCompleted(
new WorkflowCompletedEvent(workflowContext, model)))
.thenApply(__ -> model)));
futureRef.set(future);
return future;
}
Expand Down Expand Up @@ -126,9 +139,6 @@ private WorkflowModel whenSuccess(WorkflowModel node) {
.orElse(node);
workflowContext.definition().outputSchemaValidator().ifPresent(v -> v.validate(output));
status(WorkflowStatus.COMPLETED);
publishEvent(
workflowContext,
l -> l.onWorkflowCompleted(new WorkflowCompletedEvent(workflowContext, output)));
return output;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,17 +213,24 @@ public CompletableFuture<TaskContext> apply(
completable =
completable
.thenCompose(workflowContext.instance()::suspendedCheck)
.thenCompose(
t -> {
CompletableFuture<?> events =
t.isRetrying()
? publishEvent(
workflowContext,
l ->
l.onTaskRetried(
new TaskRetriedEvent(workflowContext, taskContext)))
: publishEvent(
workflowContext,
l ->
l.onTaskStarted(
new TaskStartedEvent(workflowContext, taskContext)));
return events.thenApply(v -> t);
})
.thenApply(
t -> {
if (t.isRetrying()) {
publishEvent(
workflowContext,
l -> l.onTaskRetried(new TaskRetriedEvent(workflowContext, taskContext)));
} else {
publishEvent(
workflowContext,
l -> l.onTaskStarted(new TaskStartedEvent(workflowContext, taskContext)));
}
inputSchemaValidator.ifPresent(s -> s.validate(t.rawInput()));
inputProcessor.ifPresent(
p -> taskContext.input(p.apply(workflowContext, t, t.rawInput())));
Expand Down Expand Up @@ -251,13 +258,16 @@ public CompletableFuture<TaskContext> apply(
p.apply(workflowContext, t, workflowContext.context())));
contextSchemaValidator.ifPresent(s -> s.validate(workflowContext.context()));
t.completedAt(Instant.now());
publishEvent(
workflowContext,
l ->
l.onTaskCompleted(
new TaskCompletedEvent(workflowContext, taskContext)));
return t;
});
})
.thenCompose(
t ->
publishEvent(
workflowContext,
l ->
l.onTaskCompleted(
new TaskCompletedEvent(workflowContext, taskContext)))
.thenApply(__ -> t));
if (timeout.isPresent()) {
completable =
completable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,31 @@
package io.serverlessworkflow.impl.lifecycle;

import io.serverlessworkflow.impl.WorkflowContext;
import java.util.function.Consumer;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LifecycleEventsUtils {

private LifecycleEventsUtils() {}

private static final Logger logger = LoggerFactory.getLogger(LifecycleEventsUtils.class);

public static <T extends TaskEvent> void publishEvent(
WorkflowContext workflowContext, Consumer<WorkflowExecutionListener> consumer) {
workflowContext
.definition()
.application()
.listeners()
.forEach(
v -> {
try {
consumer.accept(v);
} catch (Exception ex) {
logger.error("Error processing listener. Ignoring and going on", ex);
}
});
private LifecycleEventsUtils() {}

public static CompletableFuture<?> publishEvent(
WorkflowContext workflowContext,
Function<WorkflowExecutionCompletableListener, CompletableFuture<?>> function) {
return CompletableFuture.allOf(
workflowContext.definition().application().listeners().stream()
.map(
v ->
function
.apply(v)
.exceptionally(
ex -> {
logger.error("Error while executing listener", ex);
return null;
}))
.toArray(CompletableFuture[]::new));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.lifecycle;

import io.serverlessworkflow.impl.ServicePriority;
import java.util.concurrent.CompletableFuture;

public interface WorkflowExecutionCompletableListener extends AutoCloseable, ServicePriority {

default CompletableFuture<?> onWorkflowStarted(WorkflowStartedEvent ev) {
return CompletableFuture.completedFuture(null);
}

default CompletableFuture<?> onWorkflowSuspended(WorkflowSuspendedEvent ev) {
return CompletableFuture.completedFuture(null);
}

default CompletableFuture<?> onWorkflowResumed(WorkflowResumedEvent ev) {
return CompletableFuture.completedFuture(null);
}

default CompletableFuture<?> onWorkflowCompleted(WorkflowCompletedEvent ev) {
return CompletableFuture.completedFuture(null);
}

default CompletableFuture<?> onWorkflowFailed(WorkflowFailedEvent ev) {
return CompletableFuture.completedFuture(null);
}

default CompletableFuture<?> onWorkflowCancelled(WorkflowCancelledEvent ev) {
return CompletableFuture.completedFuture(null);
}

default CompletableFuture<?> onTaskStarted(TaskStartedEvent ev) {
return CompletableFuture.completedFuture(null);
}

default CompletableFuture<?> onTaskCompleted(TaskCompletedEvent ev) {
return CompletableFuture.completedFuture(null);
}

default CompletableFuture<?> onTaskFailed(TaskFailedEvent ev) {
return CompletableFuture.completedFuture(null);
}

default CompletableFuture<?> onTaskCancelled(TaskCancelledEvent ev) {
return CompletableFuture.completedFuture(null);
}

default CompletableFuture<?> onTaskSuspended(TaskSuspendedEvent ev) {
return CompletableFuture.completedFuture(null);
}

default CompletableFuture<?> onTaskResumed(TaskResumedEvent ev) {
return CompletableFuture.completedFuture(null);
}

default CompletableFuture<?> onTaskRetried(TaskRetriedEvent ev) {
return CompletableFuture.completedFuture(null);
}

default CompletableFuture<?> onWorkflowStatusChanged(WorkflowStatusEvent ev) {
return CompletableFuture.completedFuture(null);
}

@Override
default void close() {}
}
Loading
Loading