diff --git a/iexec-task-api/src/main/java/com/iexec/core/metric/PlatformMetric.java b/iexec-task-api/src/main/java/com/iexec/core/metric/PlatformMetric.java index 27876864c..750ff0161 100644 --- a/iexec-task-api/src/main/java/com/iexec/core/metric/PlatformMetric.java +++ b/iexec-task-api/src/main/java/com/iexec/core/metric/PlatformMetric.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2025 IEXEC BLOCKCHAIN TECH + * Copyright 2020-2026 IEXEC BLOCKCHAIN TECH * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +38,10 @@ public class PlatformMetric { LatestBlockMetric latestBlockMetric; long dealEventsCount; long dealsCount; + /** + * @deprecated obsolete after deal watching simplification + */ + @Deprecated(forRemoval = true) long replayDealsCount; BigInteger latestBlockNumberWithDeal; diff --git a/src/main/java/com/iexec/core/chain/DealEvent.java b/src/main/java/com/iexec/core/chain/DealEvent.java deleted file mode 100644 index cf65843f7..000000000 --- a/src/main/java/com/iexec/core/chain/DealEvent.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2020-2023 IEXEC BLOCKCHAIN TECH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.iexec.core.chain; - -import com.iexec.commons.poco.contract.generated.IexecHubContract; -import com.iexec.commons.poco.utils.BytesUtils; -import lombok.Getter; - -import java.math.BigInteger; - -@Getter -public class DealEvent { - - private final String chainDealId; - private final BigInteger blockNumber; - - public DealEvent(IexecHubContract.SchedulerNoticeEventResponse schedulerNoticeEventResponse) { - this.chainDealId = BytesUtils.bytesToString(schedulerNoticeEventResponse.dealid); - this.blockNumber = schedulerNoticeEventResponse.log.getBlockNumber() != null ? - schedulerNoticeEventResponse.log.getBlockNumber() : BigInteger.ZERO; - } - -} diff --git a/src/main/java/com/iexec/core/chain/DealWatcherService.java b/src/main/java/com/iexec/core/chain/DealWatcherService.java index 277c4fd9a..6b33dbb3e 100644 --- a/src/main/java/com/iexec/core/chain/DealWatcherService.java +++ b/src/main/java/com/iexec/core/chain/DealWatcherService.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2025 IEXEC BLOCKCHAIN TECH + * Copyright 2020-2026 IEXEC BLOCKCHAIN TECH * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,36 +17,32 @@ package com.iexec.core.chain; import com.iexec.commons.poco.chain.ChainDeal; -import com.iexec.commons.poco.contract.generated.IexecHubContract; +import com.iexec.commons.poco.encoding.LogTopic; import com.iexec.commons.poco.tee.TeeUtils; -import com.iexec.commons.poco.utils.BytesUtils; -import com.iexec.core.chain.event.ChainConnectedEvent; -import com.iexec.core.chain.event.ChainDisconnectedEvent; +import com.iexec.core.chain.event.DealEvent; import com.iexec.core.configuration.ConfigurationService; import com.iexec.core.task.Task; import com.iexec.core.task.TaskService; import com.iexec.core.task.event.TaskCreatedEvent; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; -import io.reactivex.disposables.Disposable; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; -import org.springframework.context.event.EventListener; -import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import org.web3j.abi.EventEncoder; import org.web3j.protocol.core.DefaultBlockParameter; -import org.web3j.protocol.core.DefaultBlockParameterName; import org.web3j.protocol.core.methods.request.EthFilter; +import org.web3j.protocol.core.methods.response.EthLog; +import org.web3j.protocol.core.methods.response.Log; import org.web3j.utils.Numeric; import java.math.BigInteger; import java.util.List; import java.util.Optional; -import static com.iexec.commons.poco.contract.generated.IexecHubContract.SCHEDULERNOTICE_EVENT; +import static com.iexec.commons.poco.chain.Web3jAbstractService.toEthereumAddress; @Slf4j @Service @@ -64,36 +60,43 @@ public class DealWatcherService { private final ApplicationEventPublisher applicationEventPublisher; private final TaskService taskService; private final Web3jService web3jService; + private final BigInteger maxLogsBatchSize; // internal variables - private boolean outOfService = true; - private Disposable dealEventsSubscription; - private Disposable dealEventSubscriptionReplay; @Getter private BigInteger latestBlockNumberWithDeal = BigInteger.ZERO; @Getter private long dealEventsCount = 0; @Getter private long dealsCount = 0; + /** + * @deprecated obsolete after deal watching simplification + */ + @Deprecated(forRemoval = true) @Getter private long replayDealsCount = 0; - private final Counter dealEventsCounter; private final Counter dealsCounter; + /** + * @deprecated obsolete after deal watching simplification + */ + @Deprecated(forRemoval = true) private final Counter replayDealsCounter; private final Counter latestBlockNumberWithDealCounter; - public DealWatcherService(ChainConfig chainConfig, - IexecHubService iexecHubService, - ConfigurationService configurationService, - ApplicationEventPublisher applicationEventPublisher, - TaskService taskService, - Web3jService web3jService) { + public DealWatcherService(final ChainConfig chainConfig, + final IexecHubService iexecHubService, + final ConfigurationService configurationService, + final ApplicationEventPublisher applicationEventPublisher, + final TaskService taskService, + final Web3jService web3jService, + @Value("${chain.max-logs-batch-size}") final BigInteger maxLogsBatchSize) { this.chainConfig = chainConfig; this.iexecHubService = iexecHubService; this.configurationService = configurationService; this.applicationEventPublisher = applicationEventPublisher; this.taskService = taskService; this.web3jService = web3jService; + this.maxLogsBatchSize = maxLogsBatchSize; this.dealEventsCounter = Metrics.counter(METRIC_DEALS_EVENTS_COUNT); this.dealsCounter = Metrics.counter(METRIC_DEALS_COUNT); @@ -101,79 +104,24 @@ public DealWatcherService(ChainConfig chainConfig, this.latestBlockNumberWithDealCounter = Metrics.counter(METRIC_DEALS_LAST_BLOCK); } - /** - * This should be non-blocking to liberate - * the main thread, since deals can have - * a large number of tasks (BoT). - */ - @Async - @EventListener(ChainConnectedEvent.class) - public void run() { - outOfService = false; - disposeSubscription(dealEventsSubscription); - dealEventsSubscription = subscribeToDealEventFromOneBlockToLatest(configurationService.getLastSeenBlockWithDeal()); - } - - /** - * Dispose of deal watching subscription. - */ - @Async - @EventListener(ChainDisconnectedEvent.class) - public void stop() { - outOfService = true; - disposeSubscription(dealEventsSubscription); - } - - /** - * Dispose of a {@link Disposable} subscription if it exists and is not already disposed. - * - * @param subscription Deal event subscription to dispose of. - */ - private void disposeSubscription(Disposable subscription) { - if (subscription != null && !subscription.isDisposed()) { - subscription.dispose(); - } - } - - /** - * Subscribe to onchain deal events from - * a given block to the latest block. - * - * @param from start block - * @return disposable subscription - */ - Disposable subscribeToDealEventFromOneBlockToLatest(BigInteger from) { - log.info("Watcher DealEvent started [from:{}, to:{}]", from, "latest"); - EthFilter filter = createDealEventFilter(from, null); - return iexecHubService.getDealEventObservable(filter) - .map(this::schedulerNoticeToDealEvent) - .subscribe(dealEvent -> dealEvent.ifPresent(event -> onDealEvent(event, "start"))); - } - /** * Update last seen block in the database * and run {@link DealEvent} handler. * - * @param dealEvent + * @param dealEvent deal event to process */ - private void onDealEvent(DealEvent dealEvent, String watcher) { - if ("replay".equals(watcher)) { - replayDealsCount++; - replayDealsCounter.increment(); - } else { - dealsCount++; - dealsCounter.increment(); - } - String dealId = dealEvent.getChainDealId(); - BigInteger dealBlock = dealEvent.getBlockNumber(); - log.info("Received deal [dealId:{}, block:{}, watcher: {}]", - dealId, dealBlock, watcher); - if (dealBlock.equals(BigInteger.ZERO)) { - log.warn("Deal block number is empty, fetching later blockchain " + - "events will be more expensive [chainDealId:{}, dealBlock:{}, " + - "lastBlock:{}]", dealId, dealBlock, web3jService.getLatestBlockNumber()); - } + void onDealEvent(final DealEvent dealEvent) { + dealsCount++; + dealsCounter.increment(); + final String dealId = dealEvent.getChainDealId(); + final BigInteger dealBlock = dealEvent.getBlockNumber(); + log.info("Received deal [dealId:{}, block:{}]", dealId, dealBlock); this.handleDeal(dealEvent); + if (latestBlockNumberWithDeal.compareTo(dealBlock) < 0) { + double deltaBlocksNumber = dealBlock.subtract(latestBlockNumberWithDeal).doubleValue(); + latestBlockNumberWithDeal = dealBlock; + latestBlockNumberWithDealCounter.increment(deltaBlocksNumber); + } if (configurationService.getLastSeenBlockWithDeal().compareTo(dealBlock) < 0) { configurationService.setLastSeenBlockWithDeal(dealBlock); } @@ -184,7 +132,7 @@ private void onDealEvent(DealEvent dealEvent, String watcher) { * * @param dealEvent Object representing PoCo SchedulerNoticeEvent */ - private void handleDeal(DealEvent dealEvent) { + private void handleDeal(final DealEvent dealEvent) { String chainDealId = dealEvent.getChainDealId(); Optional oChainDeal = iexecHubService.getChainDealWithDetails(chainDealId); if (oChainDeal.isEmpty()) { @@ -238,7 +186,7 @@ boolean shouldProcessDeal(ChainDeal chainDeal) { // do not process deals with TEE tag but trust not in {0,1}. final String tag = chainDeal.getTag(); final BigInteger trust = chainDeal.getTrust(); - if (TeeUtils.isTeeTag(tag) + if (TeeUtils.getTeeFramework(tag) != null && !CORRECT_TEE_TRUSTS.contains(trust)) { log.error("Deal with TEE tag and trust not zero nor one [chainDealId:{}, tag:{}, trust:{}]", chainDealId, tag, trust); @@ -265,65 +213,57 @@ private void publishTaskCreatedEvent(final Task task) { */ @Scheduled(fixedRateString = "#{@cronConfiguration.getDealReplay()}") void replayDealEvent() { - disposeSubscription(dealEventSubscriptionReplay); - if (outOfService) { - log.info("OUT-OF-SERVICE do not create replay subscription"); - return; - } - final BigInteger lastSeenBlockWithDeal = configurationService.getLastSeenBlockWithDeal(); - final BigInteger replayFromBlock = configurationService.getFromReplay(); - if (replayFromBlock.compareTo(lastSeenBlockWithDeal) >= 0) { + final BigInteger lastSeenBlock = BigInteger.valueOf(web3jService.getLatestBlockNumber()); + final BigInteger from = configurationService.getFromReplay(); + if (from.compareTo(lastSeenBlock) >= 0) { return; } - dealEventSubscriptionReplay = subscribeToDealEventInRange(replayFromBlock, lastSeenBlockWithDeal); - configurationService.setFromReplay(lastSeenBlockWithDeal); + final BigInteger to = from.add(maxLogsBatchSize).subtract(BigInteger.ONE).min(lastSeenBlock); + subscribeToDealEventInRange(from, to); } /** - * Subscribe to onchain deal events for - * a fixed range of blocks. + * Subscribe to on-chain deal events for a fixed range of blocks. * * @param from start block * @param to end block - * @return disposable subscription */ - private Disposable subscribeToDealEventInRange(BigInteger from, BigInteger to) { - log.info("Replay Watcher DealEvent started [from:{}, to:{}]", - from, (to == null) ? "latest" : to); - EthFilter filter = createDealEventFilter(from, to); - return iexecHubService.getDealEventObservable(filter) - .map(this::schedulerNoticeToDealEvent) - .subscribe(dealEvent -> dealEvent.ifPresent(event -> onDealEvent(event, "replay"))); + private void subscribeToDealEventInRange(final BigInteger from, final BigInteger to) { + try { + log.info("Query SchedulerNoticeEvent on range [from:{}, to:{}]", from, to); + final EthFilter filter = createDealEventFilter(from, to); + web3jService.getWeb3j().ethGetLogs(filter).send() + .getLogs() + .forEach(this::schedulerNoticeToDealEvent); + configurationService.setFromReplay(to); + } catch (Exception e) { + log.warn("Communication failed", e); + } } - EthFilter createDealEventFilter(BigInteger from, BigInteger to) { + EthFilter createDealEventFilter(final BigInteger from, final BigInteger to) { final DefaultBlockParameter fromBlock = DefaultBlockParameter.valueOf(from); - final DefaultBlockParameter toBlock = to == null - ? DefaultBlockParameterName.LATEST - : DefaultBlockParameter.valueOf(to); + final DefaultBlockParameter toBlock = DefaultBlockParameter.valueOf(to); final EthFilter filter = new EthFilter(fromBlock, toBlock, chainConfig.getHubAddress()); final BigInteger poolAddressBigInt = Numeric.toBigInt(chainConfig.getPoolAddress()); - filter.addSingleTopic(EventEncoder.encode(SCHEDULERNOTICE_EVENT)); + filter.addSingleTopic(LogTopic.SCHEDULER_NOTICE_EVENT); filter.addSingleTopic(Numeric.toHexStringWithPrefixZeroPadded(poolAddressBigInt, 64)); return filter; } - Optional schedulerNoticeToDealEvent(IexecHubContract.SchedulerNoticeEventResponse schedulerNotice) { + void schedulerNoticeToDealEvent(final EthLog.LogResult ethLog) { dealEventsCount++; dealEventsCounter.increment(); - BigInteger noticeBlockNumber = schedulerNotice.log.getBlockNumber(); - if (latestBlockNumberWithDeal.compareTo(noticeBlockNumber) < 0) { - double deltaBlocksNumber = noticeBlockNumber.subtract(latestBlockNumberWithDeal).doubleValue(); - latestBlockNumberWithDeal = noticeBlockNumber; - latestBlockNumberWithDealCounter.increment(deltaBlocksNumber); - } - log.info("Received new deal [blockNumber:{}, chainDealId:{}, dealEventsCount:{}]", - schedulerNotice.log.getBlockNumber(), BytesUtils.bytesToString(schedulerNotice.dealid), dealEventsCount); - if (schedulerNotice.workerpool.equalsIgnoreCase(chainConfig.getPoolAddress())) { - return Optional.of(new DealEvent(schedulerNotice)); + final BigInteger noticeBlockNumber = ethLog.get().getBlockNumber(); + final String dealId = ethLog.get().getData(); + final String eventPoolAddress = toEthereumAddress(ethLog.get().getTopics().get(1)); + if (eventPoolAddress.equalsIgnoreCase(chainConfig.getPoolAddress())) { + log.info("SchedulerNoticeEvent received [blockNumber:{}, chainDealId:{}, dealEventsCount:{}]", + noticeBlockNumber, dealId, dealEventsCount); + onDealEvent(new DealEvent(this, dealId, noticeBlockNumber)); + } else { + log.warn("SchedulerNoticeEvent should not have been received [blockNumber:{}, chainDealId:{}, dealEventsCount:{}]", + noticeBlockNumber, dealId, dealEventsCount); } - log.warn("This deal event should not have been received [blockNumber:{}, chainDealId:{}, dealEventsCount:{}]", - schedulerNotice.log.getBlockNumber(), BytesUtils.bytesToString(schedulerNotice.dealid), dealEventsCount); - return Optional.empty(); } } diff --git a/src/main/java/com/iexec/core/chain/IexecHubService.java b/src/main/java/com/iexec/core/chain/IexecHubService.java index a5ab75e84..e8c90d5b3 100644 --- a/src/main/java/com/iexec/core/chain/IexecHubService.java +++ b/src/main/java/com/iexec/core/chain/IexecHubService.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2025 IEXEC BLOCKCHAIN TECH + * Copyright 2020-2026 IEXEC BLOCKCHAIN TECH * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,9 +18,7 @@ import com.iexec.common.lifecycle.purge.Purgeable; import com.iexec.commons.poco.chain.*; -import com.iexec.commons.poco.contract.generated.IexecHubContract; import com.iexec.commons.poco.encoding.LogTopic; -import io.reactivex.Flowable; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -120,10 +118,6 @@ public Date getChainDealFinalDeadline(ChainDeal chainDeal) { return new Date(startTime + maxTime * 10); } - Flowable getDealEventObservable(EthFilter filter) { - return iexecHubContract.schedulerNoticeEventFlowable(filter); - } - public boolean hasEnoughGas() { final boolean hasEnoughGas = hasEnoughGas(signerService.getAddress()); log.debug("Gas status [hasEnoughGas:{}]", hasEnoughGas); diff --git a/src/main/java/com/iexec/core/chain/event/DealEvent.java b/src/main/java/com/iexec/core/chain/event/DealEvent.java new file mode 100644 index 000000000..31c4acc7d --- /dev/null +++ b/src/main/java/com/iexec/core/chain/event/DealEvent.java @@ -0,0 +1,38 @@ +/* + * Copyright 2020-2026 IEXEC BLOCKCHAIN TECH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.iexec.core.chain.event; + +import lombok.EqualsAndHashCode; +import lombok.Value; +import org.springframework.context.ApplicationEvent; + +import java.math.BigInteger; + +@Value +@EqualsAndHashCode(callSuper = true) +public class DealEvent extends ApplicationEvent { + + String chainDealId; + BigInteger blockNumber; + + public DealEvent(final Object source, final String chainDealId, final BigInteger blockNumber) { + super(source); + this.chainDealId = chainDealId; + this.blockNumber = blockNumber; + } + +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 53650536b..5c68a9b88 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -20,7 +20,7 @@ mongock: cron: # all in milliseconds metrics.refresh.period: 20000 #20s - deal.replay: 60000 # 1m + deal.replay: 10000 # 10s detector: worker-lost: 30000 # 30s chain: @@ -51,6 +51,7 @@ wallet: password: ${IEXEC_CORE_WALLET_PASSWORD:whatever} chain: + max-logs-batch-size: 1000 node-address: ${IEXEC_BLOCKCHAIN_NODE_ADDRESS:http://localhost:8545} out-of-service-threshold: PT15S pool-address: ${POOL_ADDRESS:0x365E7BABAa85eC61Dffe5b520763062e6C29dA27} diff --git a/src/test/java/com/iexec/core/chain/DealWatcherServiceTests.java b/src/test/java/com/iexec/core/chain/DealWatcherServiceTests.java index 81c673554..735a96464 100644 --- a/src/test/java/com/iexec/core/chain/DealWatcherServiceTests.java +++ b/src/test/java/com/iexec/core/chain/DealWatcherServiceTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2025 IEXEC BLOCKCHAIN TECH + * Copyright 2020-2026 IEXEC BLOCKCHAIN TECH * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,8 +20,8 @@ import com.iexec.commons.poco.chain.ChainCategory; import com.iexec.commons.poco.chain.ChainDeal; import com.iexec.commons.poco.chain.DealParams; -import com.iexec.commons.poco.contract.generated.IexecHubContract; -import com.iexec.commons.poco.utils.BytesUtils; +import com.iexec.commons.poco.encoding.LogTopic; +import com.iexec.core.chain.event.DealEvent; import com.iexec.core.configuration.ConfigurationService; import com.iexec.core.task.Task; import com.iexec.core.task.TaskService; @@ -29,8 +29,6 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import io.reactivex.Flowable; -import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -44,9 +42,14 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.context.ApplicationEventPublisher; import org.springframework.test.util.ReflectionTestUtils; -import org.web3j.protocol.core.methods.response.Log; +import org.web3j.protocol.Web3j; +import org.web3j.protocol.core.Request; +import org.web3j.protocol.core.methods.response.EthLog; +import org.web3j.utils.Numeric; +import java.io.IOException; import java.math.BigInteger; +import java.util.List; import java.util.Optional; import java.util.stream.Stream; @@ -59,7 +62,8 @@ @ExtendWith(MockitoExtension.class) class DealWatcherServiceTests { - private static final String OUT_OF_SERVICE_FIELD_NAME = "outOfService"; + private static final String DEAL_ID = "0x0"; + @Mock private ChainConfig chainConfig; @Mock @@ -74,19 +78,18 @@ class DealWatcherServiceTests { @Mock private TaskService taskService; + @Mock + private Web3jService web3jService; + + @Mock + private Web3j web3j; + + @Mock + private Request ethLogRequest; + @InjectMocks private DealWatcherService dealWatcherService; - private IexecHubContract.SchedulerNoticeEventResponse createSchedulerNotice(BigInteger noticeBlockNumber) { - IexecHubContract.SchedulerNoticeEventResponse schedulerNotice = new IexecHubContract.SchedulerNoticeEventResponse(); - schedulerNotice.workerpool = "0x1"; - schedulerNotice.dealid = "chainDealId".getBytes(); - Log schedulerNoticeLog = new Log(); - schedulerNoticeLog.setBlockNumber(noticeBlockNumber.toString()); - schedulerNotice.log = schedulerNoticeLog; - return schedulerNotice; - } - @BeforeAll static void initRegistry() { Metrics.globalRegistry.add(new SimpleMeterRegistry()); @@ -94,7 +97,7 @@ static void initRegistry() { void initMocks() { when(chainConfig.getHubAddress()).thenReturn("hubAddress"); - when(chainConfig.getPoolAddress()).thenReturn("0x1"); + when(chainConfig.getPoolAddress()).thenReturn("0xe0cdddbd7956622874d4eb796e8ec600ad2ff14f"); } @AfterEach @@ -109,56 +112,36 @@ void shouldReturnZeroForAllCountersWhereNothingHasAppended() { Counter dealsReplayCounter = Metrics.globalRegistry.find(DealWatcherService.METRIC_DEALS_REPLAY_COUNT).counter(); Counter lastBlockCounter = Metrics.globalRegistry.find(DealWatcherService.METRIC_DEALS_LAST_BLOCK).counter(); - Assertions.assertThat(dealsCounter).isNotNull(); - Assertions.assertThat(dealsEventsCounter).isNotNull(); - Assertions.assertThat(dealsReplayCounter).isNotNull(); - Assertions.assertThat(lastBlockCounter).isNotNull(); - - Assertions.assertThat(dealsCounter.count()).isZero(); - Assertions.assertThat(dealsEventsCounter.count()).isZero(); - Assertions.assertThat(dealsReplayCounter.count()).isZero(); - Assertions.assertThat(lastBlockCounter.count()).isZero(); - } + assertThat(dealsCounter).isNotNull(); + assertThat(dealsEventsCounter).isNotNull(); + assertThat(dealsReplayCounter).isNotNull(); + assertThat(lastBlockCounter).isNotNull(); - @Test - void shouldRunAndStop() { - BigInteger blockNumber = BigInteger.TEN; - initMocks(); - when(configurationService.getLastSeenBlockWithDeal()).thenReturn(blockNumber); - when(iexecHubService.getDealEventObservable(any())).thenReturn(Flowable.empty()); - dealWatcherService.run(); - verify(iexecHubService).getDealEventObservable(any()); - assertThat(ReflectionTestUtils.getField(dealWatcherService, DealWatcherService.class, OUT_OF_SERVICE_FIELD_NAME)) - .isEqualTo(false); - dealWatcherService.stop(); - assertThat(ReflectionTestUtils.getField(dealWatcherService, DealWatcherService.class, OUT_OF_SERVICE_FIELD_NAME)) - .isEqualTo(true); + assertThat(dealsCounter.count()).isZero(); + assertThat(dealsEventsCounter.count()).isZero(); + assertThat(dealsReplayCounter.count()).isZero(); + assertThat(lastBlockCounter.count()).isZero(); } - // region subscribeToDealEventFromOneBlockToLatest + // region onDealEvent @Test void shouldUpdateLastSeenBlockWhenOneDeal() { BigInteger from = BigInteger.valueOf(0); BigInteger blockOfDeal = BigInteger.valueOf(3); - IexecHubContract.SchedulerNoticeEventResponse schedulerNotice = createSchedulerNotice(blockOfDeal); - - initMocks(); when(configurationService.getLastSeenBlockWithDeal()).thenReturn(from); - when(iexecHubService.getDealEventObservable(any())).thenReturn(Flowable.just(schedulerNotice)); - dealWatcherService.subscribeToDealEventFromOneBlockToLatest(from); + dealWatcherService.onDealEvent(new DealEvent(this, DEAL_ID, blockOfDeal)); verify(configurationService).setLastSeenBlockWithDeal(blockOfDeal); Counter lastBlockCounter = Metrics.globalRegistry.find(DealWatcherService.METRIC_DEALS_LAST_BLOCK).counter(); Counter dealsCounter = Metrics.globalRegistry.find(DealWatcherService.METRIC_DEALS_COUNT).counter(); Counter dealsEventsCounter = Metrics.globalRegistry.find(DealWatcherService.METRIC_DEALS_EVENTS_COUNT).counter(); - Assertions.assertThat(lastBlockCounter).isNotNull(); - Assertions.assertThat(dealsCounter).isNotNull(); - Assertions.assertThat(dealsEventsCounter).isNotNull(); - Assertions.assertThat(lastBlockCounter.count()).isEqualTo(blockOfDeal.doubleValue()); - Assertions.assertThat(dealsCounter.count()).isEqualTo(1); - Assertions.assertThat(dealsEventsCounter.count()).isEqualTo(1); - + assertThat(lastBlockCounter).isNotNull(); + assertThat(dealsCounter).isNotNull(); + assertThat(dealsEventsCounter).isNotNull(); + assertThat(lastBlockCounter.count()).isEqualTo(blockOfDeal.doubleValue()); + assertThat(dealsCounter.count()).isOne(); + assertThat(dealsEventsCounter.count()).isZero(); } @Test @@ -179,13 +162,8 @@ void shouldUpdateLastSeenBlockWhenOneDealAndCreateTask() { BigInteger from = BigInteger.valueOf(0); BigInteger blockOfDeal = BigInteger.valueOf(3); - IexecHubContract.SchedulerNoticeEventResponse schedulerNotice = createSchedulerNotice(blockOfDeal); - Task task = new Task(); - - initMocks(); - when(iexecHubService.getDealEventObservable(any())).thenReturn(Flowable.just(schedulerNotice)); - when(iexecHubService.getChainDealWithDetails(BytesUtils.bytesToString(schedulerNotice.dealid))).thenReturn(Optional.of(chainDeal)); + when(iexecHubService.getChainDealWithDetails(DEAL_ID)).thenReturn(Optional.of(chainDeal)); when(iexecHubService.isBeforeContributionDeadline(chainDeal)).thenReturn(true); when(taskService.addTask(any(), anyInt(), anyLong(), any(), any(), anyInt(), anyLong(), any(), any(), any())) .thenReturn(Optional.of(task)); @@ -193,7 +171,7 @@ void shouldUpdateLastSeenBlockWhenOneDealAndCreateTask() { ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(TaskCreatedEvent.class); - dealWatcherService.subscribeToDealEventFromOneBlockToLatest(from); + dealWatcherService.onDealEvent(new DealEvent(this, DEAL_ID, blockOfDeal)); verify(configurationService).setLastSeenBlockWithDeal(blockOfDeal); verify(applicationEventPublisher).publishEvent(any(TaskCreatedEvent.class)); @@ -217,15 +195,11 @@ void shouldUpdateLastSeenBlockWhenOneDealAndNotCreateTaskSinceDealIsExpired() { BigInteger from = BigInteger.valueOf(0); BigInteger blockOfDeal = BigInteger.valueOf(3); - IexecHubContract.SchedulerNoticeEventResponse schedulerNotice = createSchedulerNotice(blockOfDeal); - - initMocks(); - when(iexecHubService.getDealEventObservable(any())).thenReturn(Flowable.just(schedulerNotice)); - when(iexecHubService.getChainDealWithDetails(BytesUtils.bytesToString(schedulerNotice.dealid))).thenReturn(Optional.of(chainDeal)); + when(iexecHubService.getChainDealWithDetails(DEAL_ID)).thenReturn(Optional.of(chainDeal)); when(iexecHubService.isBeforeContributionDeadline(chainDeal)).thenReturn(false); when(configurationService.getLastSeenBlockWithDeal()).thenReturn(from); - dealWatcherService.subscribeToDealEventFromOneBlockToLatest(from); + dealWatcherService.onDealEvent(new DealEvent(this, DEAL_ID, blockOfDeal)); verify(configurationService).setLastSeenBlockWithDeal(blockOfDeal); verifyNoInteractions(taskService, applicationEventPublisher); @@ -235,19 +209,14 @@ void shouldUpdateLastSeenBlockWhenOneDealAndNotCreateTaskSinceDealIsExpired() { void shouldUpdateLastSeenBlockWhenOneDealAndNotCreateTaskSinceBotSizeIsZero() { BigInteger from = BigInteger.valueOf(0); BigInteger blockOfDeal = BigInteger.valueOf(3); - IexecHubContract.SchedulerNoticeEventResponse schedulerNotice = createSchedulerNotice(blockOfDeal); - ChainDeal chainDeal = ChainDeal.builder() .botFirst(BigInteger.valueOf(0)) .botSize(BigInteger.valueOf(0)) .build(); - - initMocks(); - when(iexecHubService.getDealEventObservable(any())).thenReturn(Flowable.just(schedulerNotice)); - when(iexecHubService.getChainDealWithDetails(BytesUtils.bytesToString(schedulerNotice.dealid))).thenReturn(Optional.of(chainDeal)); + when(iexecHubService.getChainDealWithDetails(DEAL_ID)).thenReturn(Optional.of(chainDeal)); when(configurationService.getLastSeenBlockWithDeal()).thenReturn(from); - dealWatcherService.subscribeToDealEventFromOneBlockToLatest(from); + dealWatcherService.onDealEvent(new DealEvent(this, DEAL_ID, blockOfDeal)); verify(configurationService).setLastSeenBlockWithDeal(blockOfDeal); verifyNoInteractions(taskService, applicationEventPublisher); @@ -257,53 +226,27 @@ void shouldUpdateLastSeenBlockWhenOneDealAndNotCreateTaskSinceBotSizeIsZero() { void shouldUpdateLastSeenBlockWhenOneDealButNotCreateTaskSinceExceptionThrown() { BigInteger from = BigInteger.valueOf(0); BigInteger blockOfDeal = BigInteger.valueOf(3); - IexecHubContract.SchedulerNoticeEventResponse schedulerNotice = createSchedulerNotice(blockOfDeal); - ChainDeal chainDeal = ChainDeal.builder() .botFirst(BigInteger.valueOf(0)) .botSize(BigInteger.valueOf(1)) .build(); - initMocks(); - when(iexecHubService.getDealEventObservable(any())).thenReturn(Flowable.just(schedulerNotice)); - when(iexecHubService.getChainDealWithDetails(BytesUtils.bytesToString(schedulerNotice.dealid))).thenReturn(Optional.of(chainDeal)); + when(iexecHubService.getChainDealWithDetails(DEAL_ID)).thenReturn(Optional.of(chainDeal)); when(configurationService.getLastSeenBlockWithDeal()).thenReturn(from); - dealWatcherService.subscribeToDealEventFromOneBlockToLatest(from); + dealWatcherService.onDealEvent(new DealEvent(this, DEAL_ID, blockOfDeal)); verify(configurationService).setLastSeenBlockWithDeal(blockOfDeal); verifyNoInteractions(taskService, applicationEventPublisher); } - @Test - void shouldUpdateLastSeenBlockTwiceWhenTwoDeals() { - BigInteger from = BigInteger.valueOf(0); - BigInteger blockOfDeal1 = BigInteger.valueOf(3); - BigInteger blockOfDeal2 = BigInteger.valueOf(5); - IexecHubContract.SchedulerNoticeEventResponse schedulerNotice1 = createSchedulerNotice(blockOfDeal1); - IexecHubContract.SchedulerNoticeEventResponse schedulerNotice2 = createSchedulerNotice(blockOfDeal2); - - initMocks(); - when(configurationService.getLastSeenBlockWithDeal()).thenReturn(from); - when(iexecHubService.getDealEventObservable(any())).thenReturn(Flowable.just(schedulerNotice1, schedulerNotice2)); - - dealWatcherService.subscribeToDealEventFromOneBlockToLatest(from); - - verify(configurationService).setLastSeenBlockWithDeal(blockOfDeal1); - verify(configurationService).setLastSeenBlockWithDeal(blockOfDeal2); - } - @Test void shouldNotUpdateLastSeenBlockWhenReceivingOldMissedDeal() { BigInteger from = BigInteger.valueOf(5); BigInteger blockOfDeal = BigInteger.valueOf(3); - IexecHubContract.SchedulerNoticeEventResponse schedulerNotice = createSchedulerNotice(blockOfDeal); - - initMocks(); when(configurationService.getLastSeenBlockWithDeal()).thenReturn(from); - when(iexecHubService.getDealEventObservable(any())).thenReturn(Flowable.just(schedulerNotice)); - dealWatcherService.subscribeToDealEventFromOneBlockToLatest(from); + dealWatcherService.onDealEvent(new DealEvent(this, DEAL_ID, blockOfDeal)); verify(configurationService).getLastSeenBlockWithDeal(); verify(configurationService, never()).setLastSeenBlockWithDeal(blockOfDeal); @@ -312,41 +255,40 @@ void shouldNotUpdateLastSeenBlockWhenReceivingOldMissedDeal() { // region replayDealEvent @Test - void shouldReplayAllEventInRange() { - ReflectionTestUtils.setField(dealWatcherService, "outOfService", false); - BigInteger blockOfDeal = BigInteger.valueOf(3); - IexecHubContract.SchedulerNoticeEventResponse schedulerNotice = createSchedulerNotice(blockOfDeal); + void shouldReplayAllEventInRange() throws IOException { + ReflectionTestUtils.setField(dealWatcherService, "maxLogsBatchSize", BigInteger.valueOf(100)); + final BigInteger blockOfDeal = BigInteger.valueOf(3); + final EthLog.LogObject logObject = new EthLog.LogObject(); + logObject.setBlockNumber("0x3"); + logObject.setTopics(List.of(LogTopic.SCHEDULER_NOTICE_EVENT, Numeric.toHexStringNoPrefixZeroPadded(Numeric.toBigInt("0xe0cdddbd7956622874d4eb796e8ec600ad2ff14f"), 64))); + final EthLog ethLog = new EthLog(); + ethLog.setResult(List.of(logObject)); initMocks(); - when(configurationService.getLastSeenBlockWithDeal()).thenReturn(BigInteger.TEN); + when(web3jService.getLatestBlockNumber()).thenReturn(10L); when(configurationService.getFromReplay()).thenReturn(BigInteger.ZERO); - when(iexecHubService.getDealEventObservable(any())).thenReturn(Flowable.just(schedulerNotice)); + when(web3jService.getWeb3j()).thenReturn(web3j); + doReturn(ethLogRequest).when(web3j).ethGetLogs(any()); + when(ethLogRequest.send()).thenReturn(ethLog); dealWatcherService.replayDealEvent(); verify(iexecHubService).getChainDealWithDetails(any()); Counter lastBlockCounter = Metrics.globalRegistry.find(DealWatcherService.METRIC_DEALS_LAST_BLOCK).counter(); - Counter dealsReplayCounter = Metrics.globalRegistry.find(DealWatcherService.METRIC_DEALS_REPLAY_COUNT).counter(); - Assertions.assertThat(lastBlockCounter).isNotNull(); - Assertions.assertThat(dealsReplayCounter).isNotNull(); - Assertions.assertThat(lastBlockCounter.count()).isEqualTo(blockOfDeal.doubleValue()); - Assertions.assertThat(dealsReplayCounter.count()).isEqualTo(1); + Counter dealsCounter = Metrics.globalRegistry.find(DealWatcherService.METRIC_DEALS_COUNT).counter(); + assertThat(lastBlockCounter).isNotNull(); + assertThat(dealsCounter).isNotNull(); + assertThat(lastBlockCounter.count()).isEqualTo(blockOfDeal.doubleValue()); + assertThat(dealsCounter.count()).isEqualTo(1); } @Test void shouldNotReplayIfFromReplayEqualsLastSeenBlock() { - ReflectionTestUtils.setField(dealWatcherService, OUT_OF_SERVICE_FIELD_NAME, false); - when(configurationService.getLastSeenBlockWithDeal()).thenReturn(BigInteger.ZERO); + when(web3jService.getLatestBlockNumber()).thenReturn(0L); when(configurationService.getFromReplay()).thenReturn(BigInteger.ZERO); dealWatcherService.replayDealEvent(); verifyNoInteractions(iexecHubService); } - - @Test - void shouldNotReplayIfOutOfService() { - dealWatcherService.replayDealEvent(); - verifyNoInteractions(configurationService, iexecHubService); - } // endregion // region shouldProcessDeal