前言

分析tron是如何接收到交易,并在接收到交易后,后续是如何处理的,交易处理细节可以看看:tron 交易处理--交易执行逻辑

接收交易

节点使用netty进行P2P连接,主要使用到的类:

  1. TransactionsMsgHandler: netty Handler处理器
  2. TronNetService: 消息分发
  3. AdvService: 消息广播
  4. FetchInvDataMsgHandler: 消息拉取

交易处理调用栈:

1
2
3
TronNetHandler.channelRead0 接收消息
\--TronNetService.onMessage 分发消息
\--transactionsMsgHandler.processMessage; 具体业务处理

TronNetService.onMessage 分发消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
protected void onMessage(PeerConnection peer, TronMessage msg) {
long startTime = System.currentTimeMillis();
try {
switch (msg.getType()) {
case SYNC_BLOCK_CHAIN:
syncBlockChainMsgHandler.processMessage(peer, msg);
break;
case BLOCK_CHAIN_INVENTORY:
chainInventoryMsgHandler.processMessage(peer, msg);
break;
case INVENTORY:
inventoryMsgHandler.processMessage(peer, msg);
break;
case FETCH_INV_DATA:
fetchInvDataMsgHandler.processMessage(peer, msg);
break;
case BLOCK:
blockMsgHandler.processMessage(peer, msg);
break;
case TRXS:
// 交易处理入口
transactionsMsgHandler.processMessage(peer, msg);
break;
case PBFT_COMMIT_MSG:
pbftDataSyncHandler.processMessage(peer, msg);
break;
default:
throw new P2pException(TypeEnum.NO_SUCH_MESSAGE, msg.getType().toString());
}
} catch (Exception e) {
processException(peer, msg, e);
} finally {
long costs = System.currentTimeMillis() - startTime;
if (costs > DURATION_STEP) {
logger.info("Message processing costs {} ms, peer: {}, type: {}, time tag: {}",
costs, peer.getInetAddress(), msg.getType(), getTimeTag(costs));
Metrics.histogramObserve(MetricKeys.Histogram.MESSAGE_PROCESS_LATENCY,
costs / Metrics.MILLISECONDS_PER_SECOND, msg.getType().name());
}
}
}

TransactionsMsgHandler

接收到的交易先放在线程池: trxHandlePool
再由trxHandlePool调用handleTransaction处理交易。
普通交易智能合约的交易,处理还不一样。

先看下交易缓冲池:

1
2
3
4
5
6
// 无界队列
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue();
// 工作线程数
private int threadNum = Args.getInstance().getValidateSignThreadNum();
private ExecutorService trxHandlePool = new ThreadPoolExecutor(threadNum, threadNum, 0L,
TimeUnit.MILLISECONDS, queue);

processMessage 消息处理入口

区分普通交易和合约交易,另外会统计队列大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Override
public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException {
TransactionsMessage transactionsMessage = (TransactionsMessage) msg;
check(peer, transactionsMessage);
int smartContractQueueSize = 0;
int trxHandlePoolQueueSize = 0;
int dropSmartContractCount = 0;

// 遍历交易
for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
int type = trx.getRawData().getContract(0).getType().getNumber();
// 合约类型交易
if (type == ContractType.TriggerSmartContract_VALUE
|| type == ContractType.CreateSmartContract_VALUE) {
// 合约类型交易没有直接执行,而是添加到了 smartContractQueue 队列当中
// 注意,这里用的是 !offer,也就是说插入失败了,超过限制
// MAX_TRX_SIZE = 50_000
if (!smartContractQueue.offer(new TrxEvent(peer, new TransactionMessage(trx)))) {
smartContractQueueSize = smartContractQueue.size();
// queue 是线程池的队列长度
trxHandlePoolQueueSize = queue.size();
dropSmartContractCount++;
}
// 没有 else 处理,那这笔交易就丢掉了!!!
} else {
// 普通交易
trxHandlePool.submit(() -> handleTransaction(peer, new TransactionMessage(trx)));
}
}

// 上面没有else处理,但是这里加了判断,会打印出队列长度
if (dropSmartContractCount > 0) {
logger.warn("Add smart contract failed, drop count: {}, queueSize {}:{}",
dropSmartContractCount, smartContractQueueSize, trxHandlePoolQueueSize);
}
}

智能合约处理 handleSmartContract

智能合约交易,会有单独的线程来处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void handleSmartContract() {
// 这是个单线程的延时处理线程池
// 也就是智能合约的交易,20ms执行一次
smartContractExecutor.scheduleWithFixedDelay(() -> {
try {
// 限制 MAX_SMART_CONTRACT_SUBMIT_SIZE = 100
// 那 queue 里数据多了,还执行不了!!
// 也就是 queue 一定要先消费到 < MAX_SMART_CONTRACT_SUBMIT_SIZE
while (queue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE) {
TrxEvent event = smartContractQueue.take();
trxHandlePool.submit(() -> handleTransaction(event.getPeer(), event.getMsg()));
}
} catch (InterruptedException e) {
logger.warn("Handle smart server interrupted");
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.error("Handle smart contract exception", e);
}
}, 1000, 20, TimeUnit.MILLISECONDS);
}

交易处理、广播 TransactionsMsgHandler.handleTransaction

调用栈

1
2
TransactionsMsgHandler.handleTransaction
\--AdvService.broadcast: 广播服务

在这里可以看到,每个tron节点在接到到交易到后:

  1. 先自己处理
  2. 再广播交易

广播也挺复杂,单独写个博客细扣。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void handleTransaction(PeerConnection peer, TransactionMessage trx) {
if (peer.isDisconnect()) {
logger.warn("Drop trx {} from {}, peer is disconnect", trx.getMessageId(),
peer.getInetAddress());
return;
}

// 广播队列验重
if (advService.getMessage(new Item(trx.getMessageId(), InventoryType.TRX)) != null) {
return;
}

try {
tronNetDelegate.pushTransaction(trx.getTransactionCapsule());
// 广播交易
advService.broadcast(trx);
} catch (P2pException e) {
logger.warn("Trx {} from peer {} process failed. type: {}, reason: {}",
trx.getMessageId(), peer.getInetAddress(), e.getType(), e.getMessage());
// 如果是 BAD_TRX 断开连接
if (e.getType().equals(TypeEnum.BAD_TRX)) {
peer.disconnect(ReasonCode.BAD_TX);
}
} catch (Exception e) {
logger.error("Trx {} from peer {} process failed", trx.getMessageId(), peer.getInetAddress(),
e);
}
}

广播数据 AdvService.broadcast

首先要明确一个点:广播过去的,并示是交易,而是交易ID!!
广播的方式并不是把交易直接广播到其它节点,而是广播ID,然后其它节点到这个节点来拉取交易信息!!

广播缓存,使用guave cache,最老淘汰机制,如果超过MAX_TRX_CACHE_SIZE大小则老数据会丢弃,已经验证过这个场景,不过一般超达不到这个限制,只有在极端测试环境下能达到。
数据也就保留1H,也就是超时就丢弃。

重要成员变量

1
2
3
4
5
6
7
8
9
10
11
12
private final int MAX_TRX_CACHE_SIZE = 50_000;
// 广播缓存,MAX_TRX_CACHE_SIZE
// 提供缓存供外部获取、验重等作用
private Cache<Item, Message> trxCache = CacheBuilder.newBuilder()
.maximumSize(MAX_TRX_CACHE_SIZE).expireAfterWrite(1, TimeUnit.HOURS)
.recordStats().build();

// invToSpread 最大限制
private final int MAX_SPREAD_SIZE = 1_000
// 待发送队列
private ConcurrentHashMap<Item, Long> invToSpread = new ConcurrentHashMap<>();

广播逻辑
可以广播blocktransaction数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public void broadcast(Message msg) {

if (fastForward) {
return;
}

// 校验交易缓存大小,这里会限制,不过一般超不过这个限制,可以适当调大或调小
if (invToSpread.size() > MAX_SPREAD_SIZE) {
logger.warn("Drop message, type: {}, ID: {}", msg.getType(), msg.getMessageId());
return;
}

Item item;
if (msg instanceof BlockMessage) {
BlockMessage blockMsg = (BlockMessage) msg;
item = new Item(blockMsg.getMessageId(), InventoryType.BLOCK);
logger.info("Ready to broadcast block {}", blockMsg.getBlockId().getString());
blockMsg.getBlockCapsule().getTransactions().forEach(transactionCapsule -> {
Sha256Hash tid = transactionCapsule.getTransactionId();
invToSpread.remove(tid);
trxCache.put(new Item(tid, InventoryType.TRX),
new TransactionMessage(transactionCapsule.getInstance()));
});
blockCache.put(item, msg);
} else if (msg instanceof TransactionMessage) {
TransactionMessage trxMsg = (TransactionMessage) msg;
// 注意,trxMsg.getMessageId() 是交易id: transactionCapsule.getTransactionId()
// 也就是这里构建了一条广播消息的Item,包含了:交易ID、交易类型 TRX
item = new Item(trxMsg.getMessageId(), InventoryType.TRX);
trxCount.add();
trxCache.put(item, new TransactionMessage(trxMsg.getTransactionCapsule().getInstance()));
} else {
logger.error("Adv item is neither block nor trx, type: {}", msg.getType());
return;
}

invToSpread.put(item, System.currentTimeMillis());

if (InventoryType.BLOCK.equals(item.getType())) {
consumerInvToSpread();
}
}

拉取数据 FetchInvDataMsgHandler

假设上面的的交易通过节点A广播到了节点B,节点B收到消息后,就会来拉取直正的交易数据。
B 节点会发送 FETCH_INV_DATA 类型消息来A节点获数据。
核心方法在:FetchInvDataMsgHandler.processMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
for (Sha256Hash hash : fetchInvDataMsg.getHashList()) {
Item item = new Item(hash, type);
// 遍历 advService 的缓存数据,getMessage 中包含之前已发送的数据
Message message = advService.getMessage(item);
if (message == null) {
try {
// type: block、trx
message = tronNetDelegate.getData(hash, type);
} catch (Exception e) {
throw new P2pException(TypeEnum.DB_ITEM_NOT_FOUND,
"Fetch item " + item + " failed. reason: " + e.getMessage());
}
}
···
}

FetchInvDataMsgHandler.processMessageAdvServicetrxCache中拉取之前缓存的数据,这样就完成了一个广播到获取数据的流程。

AdvService.getMessage

1
2
3
4
5
6
7
public Message getMessage(Item item) {
if (item.getType() == InventoryType.TRX) {
return trxCache.getIfPresent(item);
} else {
return blockCache.getIfPresent(item);
}
}

发送 consumerInvToSpread

发送数据由: consumerInvToSpread 方法执行,通过:

  1. spreadExecutor 定时执行
  2. broadcast 中判断类型为InventoryType.BLOCK则立即发送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private synchronized void consumerInvToSpread() {

List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs())
.collect(Collectors.toList());

if (invToSpread.isEmpty() || peers.isEmpty()) {
return;
}

InvSender invSender = new InvSender();

invToSpread.forEach((item, time) -> peers.forEach(peer -> {
if (peer.getAdvInvReceive().getIfPresent(item) == null
&& peer.getAdvInvSpread().getIfPresent(item) == null
&& !(item.getType().equals(InventoryType.BLOCK)
&& System.currentTimeMillis() - time > BLOCK_PRODUCED_INTERVAL)) {
peer.getAdvInvSpread().put(item, Time.getCurrentMillis());
invSender.add(item, peer);
}
// 移除本次发送的数据,这样才不会越来越大
invToSpread.remove(item);
}));
// 发送
invSender.sendInv();
}

Transaction 交易结构

tron链使用protobuf进行序列化和反序列化,观察一下Transaction的结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
message Transaction {
message Contract {
enum ContractType {
AccountCreateContract = 0;
// 普通交易
TransferContract = 1;
// TRC10资产交易
TransferAssetContract = 2;
VoteAssetContract = 3;
VoteWitnessContract = 4;
WitnessCreateContract = 5;
AssetIssueContract = 6;
// 7 呢?
WitnessUpdateContract = 8;
ParticipateAssetIssueContract = 9;
AccountUpdateContract = 10;
// 冻结
FreezeBalanceContract = 11;
// 解冻
UnfreezeBalanceContract = 12;
// 提取奖励
WithdrawBalanceContract = 13;
UnfreezeAssetContract = 14;
UpdateAssetContract = 15;
ProposalCreateContract = 16;
ProposalApproveContract = 17;
ProposalDeleteContract = 18;
SetAccountIdContract = 19;
CustomContract = 20;
CreateSmartContract = 30;
TriggerSmartContract = 31;
GetContract = 32;
UpdateSettingContract = 33;
ExchangeCreateContract = 41;
ExchangeInjectContract = 42;
ExchangeWithdrawContract = 43;
ExchangeTransactionContract = 44;
UpdateEnergyLimitContract = 45;
AccountPermissionUpdateContract = 46;
ClearABIContract = 48;
UpdateBrokerageContract = 49;
ShieldedTransferContract = 51;
MarketSellAssetContract = 52;
MarketCancelOrderContract = 53;
}
ContractType type = 1;
google.protobuf.Any parameter = 2;
bytes provider = 3;
bytes ContractName = 4;
int32 Permission_id = 5;
}

message Result {
enum code {
SUCESS = 0;
FAILED = 1;
}
enum contractResult {
DEFAULT = 0;
SUCCESS = 1;
REVERT = 2;
BAD_JUMP_DESTINATION = 3;
OUT_OF_MEMORY = 4;
PRECOMPILED_CONTRACT = 5;
STACK_TOO_SMALL = 6;
STACK_TOO_LARGE = 7;
ILLEGAL_OPERATION = 8;
STACK_OVERFLOW = 9;
OUT_OF_ENERGY = 10;
OUT_OF_TIME = 11;
JVM_STACK_OVER_FLOW = 12;
UNKNOWN = 13;
TRANSFER_FAILED = 14;
INVALID_CODE = 15;
}
int64 fee = 1;
code ret = 2;
contractResult contractRet = 3;

string assetIssueID = 14;
int64 withdraw_amount = 15;
int64 unfreeze_amount = 16;
int64 exchange_received_amount = 18;
int64 exchange_inject_another_amount = 19;
int64 exchange_withdraw_another_amount = 20;
int64 exchange_id = 21;
int64 shielded_transaction_fee = 22;


bytes orderId = 25;
repeated MarketOrderDetail orderDetails = 26;
}

message raw {
bytes ref_block_bytes = 1;
int64 ref_block_num = 3;
bytes ref_block_hash = 4;
int64 expiration = 8;
repeated authority auths = 9;
// data not used
bytes data = 10;
//only support size = 1, repeated list here for extension
repeated Contract contract = 11;
// scripts not used
bytes scripts = 12;
int64 timestamp = 14;
int64 fee_limit = 18;
}

raw raw_data = 1;
// only support size = 1, repeated list here for muti-sig extension
repeated bytes signature = 2;
repeated Result ret = 5;
}

交易广播播代码:

TronNetService.java
AdvService.java

总结

了解这块代码的意义在于知道交易是怎么接收、处理、广播的,了解交易在所以节点之间的处理、流转。