Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2025 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2026 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,7 +38,11 @@
LatestBlockMetric latestBlockMetric;
long dealEventsCount;
long dealsCount;
/**
* @deprecated obsolete after deal watching simplification
*/
@Deprecated(forRemoval = true)
long replayDealsCount;

Check warning on line 45 in iexec-task-api/src/main/java/com/iexec/core/metric/PlatformMetric.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Do not forget to remove this deprecated code someday.

See more on https://sonarcloud.io/project/issues?id=com.iexec.core%3Aiexec-core&issues=AZ3USfIsqA9SeRODpHDj&open=AZ3USfIsqA9SeRODpHDj&pullRequest=774
BigInteger latestBlockNumberWithDeal;

// region backward compatibility
Expand Down
37 changes: 0 additions & 37 deletions src/main/java/com/iexec/core/chain/DealEvent.java

This file was deleted.

198 changes: 69 additions & 129 deletions src/main/java/com/iexec/core/chain/DealWatcherService.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2025 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2026 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,36 +17,32 @@
package com.iexec.core.chain;

import com.iexec.commons.poco.chain.ChainDeal;
import com.iexec.commons.poco.contract.generated.IexecHubContract;
import com.iexec.commons.poco.encoding.LogTopic;
import com.iexec.commons.poco.tee.TeeUtils;
import com.iexec.commons.poco.utils.BytesUtils;
import com.iexec.core.chain.event.ChainConnectedEvent;
import com.iexec.core.chain.event.ChainDisconnectedEvent;
import com.iexec.core.chain.event.DealEvent;
import com.iexec.core.configuration.ConfigurationService;
import com.iexec.core.task.Task;
import com.iexec.core.task.TaskService;
import com.iexec.core.task.event.TaskCreatedEvent;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.reactivex.disposables.Disposable;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.web3j.abi.EventEncoder;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.DefaultBlockParameterName;
import org.web3j.protocol.core.methods.request.EthFilter;
import org.web3j.protocol.core.methods.response.EthLog;
import org.web3j.protocol.core.methods.response.Log;
import org.web3j.utils.Numeric;

import java.math.BigInteger;
import java.util.List;
import java.util.Optional;

import static com.iexec.commons.poco.contract.generated.IexecHubContract.SCHEDULERNOTICE_EVENT;
import static com.iexec.commons.poco.chain.Web3jAbstractService.toEthereumAddress;

@Slf4j
@Service
Expand All @@ -64,116 +60,68 @@
private final ApplicationEventPublisher applicationEventPublisher;
private final TaskService taskService;
private final Web3jService web3jService;
private final BigInteger maxLogsBatchSize;
// internal variables
private boolean outOfService = true;
private Disposable dealEventsSubscription;
private Disposable dealEventSubscriptionReplay;
@Getter
private BigInteger latestBlockNumberWithDeal = BigInteger.ZERO;
@Getter
private long dealEventsCount = 0;
@Getter
private long dealsCount = 0;
/**
* @deprecated obsolete after deal watching simplification
*/
@Deprecated(forRemoval = true)
@Getter
private long replayDealsCount = 0;

Check warning on line 76 in src/main/java/com/iexec/core/chain/DealWatcherService.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Do not forget to remove this deprecated code someday.

See more on https://sonarcloud.io/project/issues?id=com.iexec.core%3Aiexec-core&issues=AZ3USfANqA9SeRODpHDc&open=AZ3USfANqA9SeRODpHDc&pullRequest=774

private final Counter dealEventsCounter;
private final Counter dealsCounter;
/**
* @deprecated obsolete after deal watching simplification
*/
@Deprecated(forRemoval = true)
private final Counter replayDealsCounter;

Check warning on line 83 in src/main/java/com/iexec/core/chain/DealWatcherService.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Do not forget to remove this deprecated code someday.

See more on https://sonarcloud.io/project/issues?id=com.iexec.core%3Aiexec-core&issues=AZ3USfANqA9SeRODpHDd&open=AZ3USfANqA9SeRODpHDd&pullRequest=774
private final Counter latestBlockNumberWithDealCounter;

public DealWatcherService(ChainConfig chainConfig,
IexecHubService iexecHubService,
ConfigurationService configurationService,
ApplicationEventPublisher applicationEventPublisher,
TaskService taskService,
Web3jService web3jService) {
public DealWatcherService(final ChainConfig chainConfig,
final IexecHubService iexecHubService,
final ConfigurationService configurationService,
final ApplicationEventPublisher applicationEventPublisher,
final TaskService taskService,
final Web3jService web3jService,
@Value("${chain.max-logs-batch-size}") final BigInteger maxLogsBatchSize) {
this.chainConfig = chainConfig;
this.iexecHubService = iexecHubService;
this.configurationService = configurationService;
this.applicationEventPublisher = applicationEventPublisher;
this.taskService = taskService;
this.web3jService = web3jService;
this.maxLogsBatchSize = maxLogsBatchSize;

this.dealEventsCounter = Metrics.counter(METRIC_DEALS_EVENTS_COUNT);
this.dealsCounter = Metrics.counter(METRIC_DEALS_COUNT);
this.replayDealsCounter = Metrics.counter(METRIC_DEALS_REPLAY_COUNT);

Check warning on line 103 in src/main/java/com/iexec/core/chain/DealWatcherService.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this call to a deprecated field, it has been marked for removal.

See more on https://sonarcloud.io/project/issues?id=com.iexec.core%3Aiexec-core&issues=AZ3USfANqA9SeRODpHDe&open=AZ3USfANqA9SeRODpHDe&pullRequest=774
this.latestBlockNumberWithDealCounter = Metrics.counter(METRIC_DEALS_LAST_BLOCK);
}

/**
* This should be non-blocking to liberate
* the main thread, since deals can have
* a large number of tasks (BoT).
*/
@Async
@EventListener(ChainConnectedEvent.class)
public void run() {
outOfService = false;
disposeSubscription(dealEventsSubscription);
dealEventsSubscription = subscribeToDealEventFromOneBlockToLatest(configurationService.getLastSeenBlockWithDeal());
}

/**
* Dispose of deal watching subscription.
*/
@Async
@EventListener(ChainDisconnectedEvent.class)
public void stop() {
outOfService = true;
disposeSubscription(dealEventsSubscription);
}

/**
* Dispose of a {@link Disposable} subscription if it exists and is not already disposed.
*
* @param subscription Deal event subscription to dispose of.
*/
private void disposeSubscription(Disposable subscription) {
if (subscription != null && !subscription.isDisposed()) {
subscription.dispose();
}
}

/**
* Subscribe to onchain deal events from
* a given block to the latest block.
*
* @param from start block
* @return disposable subscription
*/
Disposable subscribeToDealEventFromOneBlockToLatest(BigInteger from) {
log.info("Watcher DealEvent started [from:{}, to:{}]", from, "latest");
EthFilter filter = createDealEventFilter(from, null);
return iexecHubService.getDealEventObservable(filter)
.map(this::schedulerNoticeToDealEvent)
.subscribe(dealEvent -> dealEvent.ifPresent(event -> onDealEvent(event, "start")));
}

/**
* Update last seen block in the database
* and run {@link DealEvent} handler.
*
* @param dealEvent
* @param dealEvent deal event to process
*/
private void onDealEvent(DealEvent dealEvent, String watcher) {
if ("replay".equals(watcher)) {
replayDealsCount++;
replayDealsCounter.increment();
} else {
dealsCount++;
dealsCounter.increment();
}
String dealId = dealEvent.getChainDealId();
BigInteger dealBlock = dealEvent.getBlockNumber();
log.info("Received deal [dealId:{}, block:{}, watcher: {}]",
dealId, dealBlock, watcher);
if (dealBlock.equals(BigInteger.ZERO)) {
log.warn("Deal block number is empty, fetching later blockchain " +
"events will be more expensive [chainDealId:{}, dealBlock:{}, " +
"lastBlock:{}]", dealId, dealBlock, web3jService.getLatestBlockNumber());
}
void onDealEvent(final DealEvent dealEvent) {
dealsCount++;
dealsCounter.increment();
final String dealId = dealEvent.getChainDealId();
final BigInteger dealBlock = dealEvent.getBlockNumber();
log.info("Received deal [dealId:{}, block:{}]", dealId, dealBlock);
this.handleDeal(dealEvent);
if (latestBlockNumberWithDeal.compareTo(dealBlock) < 0) {
double deltaBlocksNumber = dealBlock.subtract(latestBlockNumberWithDeal).doubleValue();
latestBlockNumberWithDeal = dealBlock;
latestBlockNumberWithDealCounter.increment(deltaBlocksNumber);
}
if (configurationService.getLastSeenBlockWithDeal().compareTo(dealBlock) < 0) {
configurationService.setLastSeenBlockWithDeal(dealBlock);
}
Expand All @@ -184,7 +132,7 @@
*
* @param dealEvent Object representing PoCo SchedulerNoticeEvent
*/
private void handleDeal(DealEvent dealEvent) {
private void handleDeal(final DealEvent dealEvent) {
String chainDealId = dealEvent.getChainDealId();
Optional<ChainDeal> oChainDeal = iexecHubService.getChainDealWithDetails(chainDealId);
if (oChainDeal.isEmpty()) {
Expand Down Expand Up @@ -238,7 +186,7 @@
// do not process deals with TEE tag but trust not in {0,1}.
final String tag = chainDeal.getTag();
final BigInteger trust = chainDeal.getTrust();
if (TeeUtils.isTeeTag(tag)
if (TeeUtils.getTeeFramework(tag) != null
&& !CORRECT_TEE_TRUSTS.contains(trust)) {
log.error("Deal with TEE tag and trust not zero nor one [chainDealId:{}, tag:{}, trust:{}]",
chainDealId, tag, trust);
Expand All @@ -265,65 +213,57 @@
*/
@Scheduled(fixedRateString = "#{@cronConfiguration.getDealReplay()}")
void replayDealEvent() {
disposeSubscription(dealEventSubscriptionReplay);
if (outOfService) {
log.info("OUT-OF-SERVICE do not create replay subscription");
return;
}
final BigInteger lastSeenBlockWithDeal = configurationService.getLastSeenBlockWithDeal();
final BigInteger replayFromBlock = configurationService.getFromReplay();
if (replayFromBlock.compareTo(lastSeenBlockWithDeal) >= 0) {
final BigInteger lastSeenBlock = BigInteger.valueOf(web3jService.getLatestBlockNumber());
final BigInteger from = configurationService.getFromReplay();
if (from.compareTo(lastSeenBlock) >= 0) {
return;
}
dealEventSubscriptionReplay = subscribeToDealEventInRange(replayFromBlock, lastSeenBlockWithDeal);
configurationService.setFromReplay(lastSeenBlockWithDeal);
final BigInteger to = from.add(maxLogsBatchSize).subtract(BigInteger.ONE).min(lastSeenBlock);
subscribeToDealEventInRange(from, to);
}

/**
* Subscribe to onchain deal events for
* a fixed range of blocks.
* Subscribe to on-chain deal events for a fixed range of blocks.
*
* @param from start block
* @param to end block
* @return disposable subscription
*/
private Disposable subscribeToDealEventInRange(BigInteger from, BigInteger to) {
log.info("Replay Watcher DealEvent started [from:{}, to:{}]",
from, (to == null) ? "latest" : to);
EthFilter filter = createDealEventFilter(from, to);
return iexecHubService.getDealEventObservable(filter)
.map(this::schedulerNoticeToDealEvent)
.subscribe(dealEvent -> dealEvent.ifPresent(event -> onDealEvent(event, "replay")));
private void subscribeToDealEventInRange(final BigInteger from, final BigInteger to) {
try {
log.info("Query SchedulerNoticeEvent on range [from:{}, to:{}]", from, to);
final EthFilter filter = createDealEventFilter(from, to);
web3jService.getWeb3j().ethGetLogs(filter).send()
.getLogs()
.forEach(this::schedulerNoticeToDealEvent);
configurationService.setFromReplay(to);
} catch (Exception e) {
log.warn("Communication failed", e);
}
}

EthFilter createDealEventFilter(BigInteger from, BigInteger to) {
EthFilter createDealEventFilter(final BigInteger from, final BigInteger to) {
final DefaultBlockParameter fromBlock = DefaultBlockParameter.valueOf(from);
final DefaultBlockParameter toBlock = to == null
? DefaultBlockParameterName.LATEST
: DefaultBlockParameter.valueOf(to);
final DefaultBlockParameter toBlock = DefaultBlockParameter.valueOf(to);
final EthFilter filter = new EthFilter(fromBlock, toBlock, chainConfig.getHubAddress());
final BigInteger poolAddressBigInt = Numeric.toBigInt(chainConfig.getPoolAddress());
filter.addSingleTopic(EventEncoder.encode(SCHEDULERNOTICE_EVENT));
filter.addSingleTopic(LogTopic.SCHEDULER_NOTICE_EVENT);
filter.addSingleTopic(Numeric.toHexStringWithPrefixZeroPadded(poolAddressBigInt, 64));
return filter;
}

Optional<DealEvent> schedulerNoticeToDealEvent(IexecHubContract.SchedulerNoticeEventResponse schedulerNotice) {
void schedulerNoticeToDealEvent(final EthLog.LogResult<Log> ethLog) {
dealEventsCount++;
dealEventsCounter.increment();
BigInteger noticeBlockNumber = schedulerNotice.log.getBlockNumber();
if (latestBlockNumberWithDeal.compareTo(noticeBlockNumber) < 0) {
double deltaBlocksNumber = noticeBlockNumber.subtract(latestBlockNumberWithDeal).doubleValue();
latestBlockNumberWithDeal = noticeBlockNumber;
latestBlockNumberWithDealCounter.increment(deltaBlocksNumber);
}
log.info("Received new deal [blockNumber:{}, chainDealId:{}, dealEventsCount:{}]",
schedulerNotice.log.getBlockNumber(), BytesUtils.bytesToString(schedulerNotice.dealid), dealEventsCount);
if (schedulerNotice.workerpool.equalsIgnoreCase(chainConfig.getPoolAddress())) {
return Optional.of(new DealEvent(schedulerNotice));
final BigInteger noticeBlockNumber = ethLog.get().getBlockNumber();
final String dealId = ethLog.get().getData();
final String eventPoolAddress = toEthereumAddress(ethLog.get().getTopics().get(1));
if (eventPoolAddress.equalsIgnoreCase(chainConfig.getPoolAddress())) {
log.info("SchedulerNoticeEvent received [blockNumber:{}, chainDealId:{}, dealEventsCount:{}]",
noticeBlockNumber, dealId, dealEventsCount);
onDealEvent(new DealEvent(this, dealId, noticeBlockNumber));
} else {
log.warn("SchedulerNoticeEvent should not have been received [blockNumber:{}, chainDealId:{}, dealEventsCount:{}]",
noticeBlockNumber, dealId, dealEventsCount);
}
log.warn("This deal event should not have been received [blockNumber:{}, chainDealId:{}, dealEventsCount:{}]",
schedulerNotice.log.getBlockNumber(), BytesUtils.bytesToString(schedulerNotice.dealid), dealEventsCount);
return Optional.empty();
}
}
8 changes: 1 addition & 7 deletions src/main/java/com/iexec/core/chain/IexecHubService.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2025 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2026 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,9 +18,7 @@

import com.iexec.common.lifecycle.purge.Purgeable;
import com.iexec.commons.poco.chain.*;
import com.iexec.commons.poco.contract.generated.IexecHubContract;
import com.iexec.commons.poco.encoding.LogTopic;
import io.reactivex.Flowable;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -120,10 +118,6 @@ public Date getChainDealFinalDeadline(ChainDeal chainDeal) {
return new Date(startTime + maxTime * 10);
}

Flowable<IexecHubContract.SchedulerNoticeEventResponse> getDealEventObservable(EthFilter filter) {
return iexecHubContract.schedulerNoticeEventFlowable(filter);
}

public boolean hasEnoughGas() {
final boolean hasEnoughGas = hasEnoughGas(signerService.getAddress());
log.debug("Gas status [hasEnoughGas:{}]", hasEnoughGas);
Expand Down
Loading
Loading