Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,9 @@ public class CommonParameter {
@Setter
public double rateLimiterDisconnect; // clearParam: 1.0
@Getter
@Setter
public boolean rateLimiterApiNonBlocking = false;
@Getter
public RocksDbSettings rocksDBCustomSettings;
@Getter
public GenesisBlock genesisBlock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class EventConfig {
@Getter
@Setter
public static class NativeConfig {
private boolean useNativeQueue = true;
private boolean useNativeQueue = false;
private int bindport = 5555;
private int sendqueuelength = 1000;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class RateLimiterConfig {
private P2pRateLimitConfig p2p = new P2pRateLimitConfig();
private List<HttpRateLimitItem> http = new ArrayList<>();
private List<RpcRateLimitItem> rpc = new ArrayList<>();
private boolean apiNonBlocking = false;

@Getter
@Setter
Expand Down
3 changes: 2 additions & 1 deletion common/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ rate.limiter = {
global.qps = 50000
global.ip.qps = 10000
global.api.qps = 1000
apiNonBlocking = false
}

seed.node = {
Expand Down Expand Up @@ -758,7 +759,7 @@ event.subscribe = {
enable = false

native = {
useNativeQueue = true
useNativeQueue = false
bindport = 5555
sendqueuelength = 1000
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.tron.core.config.args;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import com.typesafe.config.Config;
Expand Down Expand Up @@ -29,6 +30,7 @@ public void testDefaults() {
assertEquals(1.0, rl.getP2p().getDisconnect(), 0.001);
assertTrue(rl.getHttp().isEmpty());
assertTrue(rl.getRpc().isEmpty());
assertFalse(rl.isApiNonBlocking());
}

@Test
Expand All @@ -40,7 +42,8 @@ public void testFromConfig() {
+ " http = [{ component = TestServlet, strategy = QpsRateLimiterAdapter,"
+ " paramString = \"qps=10\" }],"
+ " rpc = [{ component = TestRpc, strategy = GlobalPreemptibleAdapter,"
+ " paramString = \"permit=1\" }]"
+ " paramString = \"permit=1\" }],"
+ " apiNonBlocking = true"
+ "}");
RateLimiterConfig rl = RateLimiterConfig.fromConfig(config);
assertEquals(100, rl.getGlobal().getQps());
Expand All @@ -50,5 +53,6 @@ public void testFromConfig() {
assertEquals("TestServlet", rl.getHttp().get(0).getComponent());
assertEquals(1, rl.getRpc().size());
assertEquals("TestRpc", rl.getRpc().get(0).getComponent());
assertTrue(rl.isApiNonBlocking());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.beust.jcommander.internal.Sets;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import java.io.File;
import java.util.HashSet;
import java.util.List;
Expand All @@ -14,8 +15,10 @@
import org.bouncycastle.util.encoders.Hex;
import org.pf4j.CompoundPluginDescriptorFinder;
import org.pf4j.DefaultPluginManager;
import org.pf4j.DefaultVersionManager;
import org.pf4j.ManifestPluginDescriptorFinder;
import org.pf4j.PluginManager;
import org.pf4j.VersionManager;
import org.springframework.util.StringUtils;
import org.tron.common.logsfilter.nativequeue.NativeMessageQueue;
import org.tron.common.logsfilter.trigger.BlockLogTrigger;
Expand All @@ -29,6 +32,16 @@
@Slf4j
public class EventPluginLoader {

/**
* Minimum event-plugin Plugin-Version compatible with this node. Bumped to 3.0.0 to
* reject pre-fastjson-removal builds whose worker threads would fail with
* NoClassDefFoundError on com.alibaba.fastjson at runtime. The previous event-plugin
* release is 2.2.0, so 3.0.0 is the first version that ships the Jackson replacement.
*/
static final String MIN_PLUGIN_VERSION = "3.0.0";

private static final VersionManager VERSION_MANAGER = new DefaultVersionManager();

private static EventPluginLoader instance;

private long MAX_PENDING_SIZE = 50000;
Expand Down Expand Up @@ -457,6 +470,10 @@ protected CompoundPluginDescriptorFinder createPluginDescriptorFinder() {
return false;
}

if (!isPluginVersionSupported(pluginManager, pluginId)) {
return false;
}

pluginManager.startPlugins();

eventListeners = pluginManager.getExtensions(IPluginEventListener.class);
Expand All @@ -471,6 +488,21 @@ protected CompoundPluginDescriptorFinder createPluginDescriptorFinder() {
return true;
}

static boolean isPluginVersionSupported(PluginManager pm, String pluginId) {
String pluginVersion = pm.getPlugin(pluginId).getDescriptor().getVersion();
if (Strings.isNullOrEmpty(pluginVersion)) {
return false;
}
boolean isSupported = VERSION_MANAGER.compareVersions(pluginVersion, MIN_PLUGIN_VERSION) >= 0;

if (!isSupported) {
logger.error(
"event-plugin '{}' version {} is older than required {}, please upgrade event-plugin",
pluginId, pluginVersion, MIN_PLUGIN_VERSION);
}
return isSupported;
}

public void stopPlugin() {
if (Objects.nonNull(pluginManager)) {
pluginManager.stopPlugins();
Expand Down
11 changes: 11 additions & 0 deletions framework/src/main/java/org/tron/core/Wallet.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.tron.common.utils.Commons.getAssetIssueStoreFinal;
import static org.tron.common.utils.Commons.getExchangeStoreFinal;
import static org.tron.common.utils.WalletUtil.isConstant;
import static org.tron.core.Constant.PER_SIGN_LENGTH;
import static org.tron.core.capsule.utils.TransactionUtil.buildInternalTransaction;
import static org.tron.core.config.Parameter.ChainConstant.BLOCK_PRODUCED_INTERVAL;
import static org.tron.core.config.Parameter.ChainConstant.TRX_PRECISION;
Expand Down Expand Up @@ -505,6 +506,16 @@ public GrpcAPI.Return broadcastTransaction(Transaction signedTransaction) {
trx.setTime(System.currentTimeMillis());
Sha256Hash txID = trx.getTransactionId();
try {
for (ByteString sig : signedTransaction.getSignatureList()) {
if (sig.size() != PER_SIGN_LENGTH) {
String info = "Signature size is " + sig.size();
logger.warn("Broadcast transaction {} has failed, {}.", txID, info);
return builder.setResult(false).setCode(response_code.SIGERROR)
.setMessage(ByteString.copyFromUtf8("Validate signature error: " + info))
.build();
}
}

if (tronNetDelegate.isBlockUnsolidified()) {
logger.warn("Broadcast transaction {} has failed, block unsolidified.", txID);
return builder.setResult(false).setCode(response_code.BLOCK_UNSOLIDIFIED)
Expand Down
41 changes: 24 additions & 17 deletions framework/src/main/java/org/tron/core/config/args/Args.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,16 +167,19 @@ public static void setParam(final String[] args, final String confFileName) {
? cmd.shellConfFileName : confFileName;
Config config = Configuration.getByFileName(configFilePath);

// 2. Config overrides defaults
// 2. Config overrides defaults (event config bean is read here but not yet applied)
applyConfigParams(config);

// 3. CLI overrides Config (highest priority)
// 3. CLI overrides Config (highest priority, including --es → eventSubscribe)
applyCLIParams(cmd, jc);

// 4. Apply platform constraints (e.g. ARM64 forces RocksDB)
// 4. Apply event config after CLI
applyEventConfig(eventConfig);

// 5. Apply platform constraints (e.g. ARM64 forces RocksDB)
applyPlatformConstraints();

// 5. Init witness (depends on CLI witness flag)
// 6. Init witness (depends on CLI witness flag)
initLocalWitnesses(config, cmd);
}

Expand Down Expand Up @@ -217,7 +220,7 @@ private static void applyStorageConfig(StorageConfig sc) {
PARAMETER.storage.setIndexSwitch(
org.apache.commons.lang3.StringUtils.isNotEmpty(indexSwitch) ? indexSwitch : "on");
PARAMETER.storage.setTransactionHistorySwitch(sc.getTransHistory().getSwitch());
// contractParse is set in applyEventConfig — it belongs to event.subscribe domain
// contractParse is set in applyConfigParams alongside event config, not here
PARAMETER.storage.setCheckpointVersion(sc.getCheckpoint().getVersion());
PARAMETER.storage.setCheckpointSync(sc.getCheckpoint().isSync());

Expand Down Expand Up @@ -325,6 +328,7 @@ private static void applyRateLimiterConfig(RateLimiterConfig rl) {
PARAMETER.rateLimiterSyncBlockChain = rl.getP2p().getSyncBlockChain();
PARAMETER.rateLimiterFetchInvData = rl.getP2p().getFetchInvData();
PARAMETER.rateLimiterDisconnect = rl.getP2p().getDisconnect();
PARAMETER.rateLimiterApiNonBlocking = rl.isApiNonBlocking();

// HTTP/RPC rate limiter items: convert bean lists to business objects
RateLimiterInitialization initialization = new RateLimiterInitialization();
Expand All @@ -343,21 +347,21 @@ private static void applyRateLimiterConfig(RateLimiterConfig rl) {
PARAMETER.rateLimiterInitialization = initialization;
}

/**
* Package-private entry point only for tests
*/
static void applyEventConfig() {
applyEventConfig(eventConfig);
}

/**
* Bridge EventConfig bean values to CommonParameter fields.
* Converts EventConfig (raw bean) into EventPluginConfig and FilterQuery (business objects).
*/
private static void applyEventConfig(EventConfig ec) {
PARAMETER.eventSubscribe = ec.isEnable();
// contractParse belongs to event.subscribe but Storage object holds it
PARAMETER.storage.setContractParseSwitch(ec.isContractParse());

// PARAMETER.eventPluginConfig and PARAMETER.eventFilter are only consumed by
// Manager.startEventSubscribing(), which itself is gated by isEventSubscribe()
// (= ec.isEnable()) at Manager.java:564. When subscribe is disabled, building
// these objects has no observable effect — skip both early so PARAMETER stays
// consistent with the runtime intent.
if (!ec.isEnable()) {
// cmd parameter has higher priority
PARAMETER.eventSubscribe = PARAMETER.eventSubscribe || ec.isEnable();
if (!PARAMETER.eventSubscribe) {
return;
}

Expand Down Expand Up @@ -770,9 +774,12 @@ public static void applyConfigParams(

// node.shutdown — handled in applyNodeConfig

// Event config: bind from config.conf "event.subscribe" section
// Event config: read bean here; applyEventConfig() is called once in setParam()
// after applyCLIParams() so that --es is already reflected in eventSubscribe.
eventConfig = EventConfig.fromConfig(config);
applyEventConfig(eventConfig);
// contractParse is event-domain but must be set from config before CLI can
// override it with --contract-parse-enable (which runs in applyCLIParams).
PARAMETER.storage.setContractParseSwitch(eventConfig.isContractParse());

logConfig();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.tron.core.net.messagehandler;

import static org.tron.core.Constant.PER_SIGN_LENGTH;

import com.google.protobuf.ByteString;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -142,6 +145,12 @@ private void check(PeerConnection peer, TransactionsMessage msg) throws P2pExcep
throw new P2pException(TypeEnum.BAD_TRX,
"tx " + item.getHash() + " contract size should be greater than 0");
}
for (ByteString sig : trx.getSignatureList()) {
if (sig.size() != PER_SIGN_LENGTH) {
throw new P2pException(TypeEnum.BAD_TRX,
"tx " + item.getHash() + " signature size is " + sig.size());
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ public class PeerConnection {

public void setChannel(Channel channel) {
this.channel = channel;
if (relayNodes.stream().anyMatch(n -> n.getAddress().equals(channel.getInetAddress()))) {
if (relayNodes != null
&& relayNodes.stream().anyMatch(n -> n.getAddress().equals(channel.getInetAddress()))) {
this.isRelayPeer = true;
}
this.nodeStatistics = TronStatsManager.getNodeStatistics(channel.getInetAddress());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.tron.core.net.service.relay;

import static org.tron.core.Constant.PER_SIGN_LENGTH;

import com.google.protobuf.ByteString;
import java.net.InetSocketAddress;
import java.util.Arrays;
Expand Down Expand Up @@ -150,6 +152,12 @@ public boolean checkHelloMessage(HelloMessage message, Channel channel) {
return false;
}

if (msg.getSignature().size() != PER_SIGN_LENGTH) {
logger.warn("HelloMessage from {}, signature size is {}.",
channel.getInetAddress(), msg.getSignature().size());
return false;
}

boolean flag;
try {
Sha256Hash hash = Sha256Hash.of(CommonParameter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,10 @@ protected void service(HttpServletRequest req, HttpServletResponse resp)
IRateLimiter rateLimiter = container.get(KEY_PREFIX_HTTP, getClass().getSimpleName());

// Check per-endpoint first to avoid consuming global IP/QPS quota for requests
// that would be rejected by the per-endpoint limiter anyway.
boolean perEndpointAcquired = rateLimiter == null || rateLimiter.tryAcquire(runtimeData);
boolean acquireResource = perEndpointAcquired && GlobalRateLimiter.tryAcquire(runtimeData);
// that would be rejected by the per-endpoint limiter anyway. acquirePermit()
// chooses blocking or non-blocking semantics based on rate.limiter.apiNonBlocking.
boolean perEndpointAcquired = rateLimiter == null || rateLimiter.acquirePermit(runtimeData);
boolean acquireResource = perEndpointAcquired && GlobalRateLimiter.acquirePermit(runtimeData);

String contextPath = req.getContextPath();
String url = Strings.isNullOrEmpty(req.getServletPath())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,43 @@ public class GlobalRateLimiter {
public static boolean tryAcquire(RuntimeData runtimeData) {
String ip = runtimeData.getRemoteAddr();
if (!Strings.isNullOrEmpty(ip)) {
RateLimiter r;
try {
// cache.get is atomic: only one loader executes per key under concurrent requests,
// preventing multiple RateLimiter instances from being created for the same IP.
r = cache.get(ip, () -> RateLimiter.create(IP_QPS));
} catch (Exception e) {
logger.warn("Failed to load IP rate limiter for {}, denying request: {}",
ip, e.getMessage());
RateLimiter r = loadIpLimiter(ip);
if (r == null || !r.tryAcquire()) {
return false;
}
if (!r.tryAcquire()) {
}
return rateLimiter.tryAcquire();
}

public static boolean acquire(RuntimeData runtimeData) {
String ip = runtimeData.getRemoteAddr();
if (!Strings.isNullOrEmpty(ip)) {
RateLimiter r = loadIpLimiter(ip);
if (r == null) {
return false;
}
r.acquire();
}
rateLimiter.acquire();
return true;
}

public static boolean acquirePermit(RuntimeData runtimeData) {
return Args.getInstance().isRateLimiterApiNonBlocking()
? tryAcquire(runtimeData)
: acquire(runtimeData);
}

private static RateLimiter loadIpLimiter(String ip) {
try {
// cache.get is atomic: only one loader executes per key under concurrent requests,
// preventing multiple RateLimiter instances from being created for the same IP.
return cache.get(ip, () -> RateLimiter.create(IP_QPS));
} catch (Exception e) {
logger.warn("Failed to load IP rate limiter for {}, denying request: {}",
ip, e.getMessage());
return null;
}
return rateLimiter.tryAcquire();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,10 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,

RuntimeData runtimeData = new RuntimeData(call);
// Check per-endpoint first to avoid consuming global IP/QPS quota for requests
// that would be rejected by the per-endpoint limiter anyway.
boolean perEndpointAcquired = rateLimiter == null || rateLimiter.tryAcquire(runtimeData);
boolean acquireResource = perEndpointAcquired && GlobalRateLimiter.tryAcquire(runtimeData);
// that would be rejected by the per-endpoint limiter anyway. acquirePermit()
// chooses blocking or non-blocking semantics based on rate.limiter.apiNonBlocking.
boolean perEndpointAcquired = rateLimiter == null || rateLimiter.acquirePermit(runtimeData);
boolean acquireResource = perEndpointAcquired && GlobalRateLimiter.acquirePermit(runtimeData);

if (!acquireResource) {
// Release the per-endpoint permit when global rejected, to avoid semaphore leak.
Expand Down
Loading
Loading