diff --git a/controlplane/worker/create_checkpoint.go b/controlplane/worker/create_checkpoint.go index 1a259735..5a4a054d 100644 --- a/controlplane/worker/create_checkpoint.go +++ b/controlplane/worker/create_checkpoint.go @@ -31,6 +31,8 @@ import ( "github.com/spacechunks/explorer/controlplane/node" "github.com/spacechunks/explorer/internal/resource" "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type CreateCheckpointWorkerConfig struct { @@ -100,7 +102,7 @@ func (w *CreateCheckpointWorker) Work(ctx context.Context, riverJob *river.Job[j return fmt.Errorf("validate args: %w", err) } - // TODO: check if checkpoint already exists in repo. + // TODO: check if checkpoint already exists in repo. -> should happen on platformd side // it could happen that things take too long // and the job times out, but in the background // platformd still executes the checkpointing @@ -138,7 +140,11 @@ func (w *CreateCheckpointWorker) Work(ctx context.Context, riverJob *river.Job[j CheckpointId: resp.CheckpointId, }) if err != nil { - // TODO: if error is not found return directly + if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound { + w.logger.WarnContext(ctx, "checkpoint not available anymore, retrying") + return err + } + w.logger.ErrorContext(ctx, "checkpoint status error", "err", err) continue } diff --git a/controlplane/worker/create_checkpoint_test.go b/controlplane/worker/create_checkpoint_test.go index ecfd9f68..34094f29 100644 --- a/controlplane/worker/create_checkpoint_test.go +++ b/controlplane/worker/create_checkpoint_test.go @@ -36,6 +36,8 @@ import ( "github.com/spacechunks/explorer/test" mocky "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func TestCreateCheckpointWorker(t *testing.T) { @@ -141,3 +143,62 @@ func TestCreateCheckpointWorker(t *testing.T) { }) } } + +func TestCheckpointWorkerReturnsIfNotFound(t *testing.T) { + var ( + ctx = context.Background() + logger = slog.New(slog.NewTextHandler(os.Stdout, nil)) + mockNodeRepo = mock.NewMockNodeRepository(t) + mockChunkRepo = mock.NewMockChunkRepository(t) + mockClient = mock.NewMockV1alpha1CheckpointServiceClient(t) + newClient = func(_ string) (checkpointv1alpha1.CheckpointServiceClient, error) { + return mockClient, nil + } + baseImgURL = "some-url" + checkID = "checkpoint-id" + flavorVersionID = test.NewUUIDv7(t) + ) + + mockNodeRepo.EXPECT(). + RandomNode(mocky.Anything). + Return(node.Node{}, nil) // return value doesn't matter + + mockClient.EXPECT(). + CreateCheckpoint(mocky.Anything, &checkpointv1alpha1.CreateCheckpointRequest{ + BaseImageUrl: baseImgURL, + }). + Return(&checkpointv1alpha1.CreateCheckpointResponse{ + CheckpointId: checkID, + }, nil) + + mockClient.EXPECT(). + CheckpointStatus(mocky.Anything, &checkpointv1alpha1.CheckpointStatusRequest{ + CheckpointId: checkID, + }). + Return(nil, status.Errorf(codes.NotFound, "checkpoint not found")) + + w := worker.NewCheckpointWorker( + logger, + newClient, + mockNodeRepo, + mockChunkRepo, + worker.CreateCheckpointWorkerConfig{ + Timeout: 10 * time.Second, + StatusCheckInterval: 5 * time.Millisecond, + }, + ) + + riverJob := &river.Job[job.CreateCheckpoint]{ + JobRow: &rivertype.JobRow{ + Attempt: 0, + MaxAttempts: 5, + }, + Args: job.CreateCheckpoint{ + FlavorVersionID: flavorVersionID, + BaseImageURL: baseImgURL, + }, + } + + err := w.Work(ctx, riverJob) + require.ErrorIs(t, err, status.Error(codes.NotFound, "checkpoint not found")) +}