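Preface
This post looks at how a tron node receives a transaction and what it does with the transaction afterwards. For the details of how a transaction is executed, see: tron transaction processing -- transaction execution logic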
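Receiving transactions
Nodes use netty for their P2P connections. The main classes involved are:
- TransactionsMsgHandler: handler for transaction messages received over netty
- TronNetService: message dispatch
- AdvService: message broadcast
- FetchInvDataMsgHandler: message fetching
Call stack for transaction processing:

    TronNetHandler.channelRead0                      receive the message
    \--TronNetService.onMessage                      dispatch the message
       \--transactionsMsgHandler.processMessage      actual business logic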
TronNetService.onMessage: message dispatch

    protected void onMessage(PeerConnection peer, TronMessage msg) {
      long startTime = System.currentTimeMillis();
      try {
        // Route each message type to its dedicated handler.
        switch (msg.getType()) {
          case SYNC_BLOCK_CHAIN:
            syncBlockChainMsgHandler.processMessage(peer, msg);
            break;
          case BLOCK_CHAIN_INVENTORY:
            chainInventoryMsgHandler.processMessage(peer, msg);
            break;
          case INVENTORY:
            inventoryMsgHandler.processMessage(peer, msg);
            break;
          case FETCH_INV_DATA:
            fetchInvDataMsgHandler.processMessage(peer, msg);
            break;
          case BLOCK:
            blockMsgHandler.processMessage(peer, msg);
            break;
          case TRXS:
            // Transactions end up here.
            transactionsMsgHandler.processMessage(peer, msg);
            break;
          case PBFT_COMMIT_MSG:
            pbftDataSyncHandler.processMessage(peer, msg);
            break;
          default:
            throw new P2pException(TypeEnum.NO_SUCH_MESSAGE, msg.getType().toString());
        }
      } catch (Exception e) {
        processException(peer, msg, e);
      } finally {
        // Log and record a metric only for slow messages.
        long costs = System.currentTimeMillis() - startTime;
        if (costs > DURATION_STEP) {
          logger.info("Message processing costs {} ms, peer: {}, type: {}, time tag: {}",
              costs, peer.getInetAddress(), msg.getType(), getTimeTag(costs));
          Metrics.histogramObserve(MetricKeys.Histogram.MESSAGE_PROCESS_LATENCY,
              costs / Metrics.MILLISECONDS_PER_SECOND, msg.getType().name());
        }
      }
    }
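The dispatch step in onMessage can be illustrated in isolation. The following is only a minimal sketch that swaps the switch statement for a lookup table; SketchMsgType and SketchDispatcher are hypothetical names invented for this illustration, payloads are plain strings, and none of it is java-tron code.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical message types: a subset of the cases handled in onMessage.
enum SketchMsgType { INVENTORY, FETCH_INV_DATA, BLOCK, TRXS }

// Hypothetical dispatcher that replaces the switch with a lookup table.
class SketchDispatcher {
  private final Map<SketchMsgType, Consumer<String>> handlers =
      new EnumMap<>(SketchMsgType.class);

  // Registers the handler that should receive messages of the given type.
  void register(SketchMsgType type, Consumer<String> handler) {
    handlers.put(type, handler);
  }

  // Routes the payload to its handler. Returns false when no handler is
  // registered, which is the point where onMessage throws NO_SUCH_MESSAGE.
  boolean dispatch(SketchMsgType type, String payload) {
    Consumer<String> handler = handlers.get(type);
    if (handler == null) {
      return false;
    }
    handler.accept(payload);
    return true;
  }
}
```

Registering a handler for SketchMsgType.TRXS and calling dispatch with that type corresponds to the TRXS case being forwarded to transactionsMsgHandler.processMessage.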
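TransactionsMsgHandler
Received transactions are first submitted to a thread pool, trxHandlePool, which then calls handleTransaction to process them.
Ordinary transactions and smart contract transactions are handled differently.
First, the transaction buffer pool:

    // Unbounded backlog of tasks waiting for a worker thread.
    private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    // The pool size is fixed and taken from the node configuration.
    private int threadNum = Args.getInstance().getValidateSignThreadNum();
    private ExecutorService trxHandlePool = new ThreadPoolExecutor(threadNum, threadNum, 0L,
        TimeUnit.MILLISECONDS, queue);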
processMessage: message handling entry point
It separates ordinary transactions from contract transactions, and records the queue sizes whenever a contract transaction has to be dropped.

    @Override
    public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException {
      TransactionsMessage transactionsMessage = (TransactionsMessage) msg;
      check(peer, transactionsMessage);
      int smartContractQueueSize = 0;
      int trxHandlePoolQueueSize = 0;
      int dropSmartContractCount = 0;

      for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
        int type = trx.getRawData().getContract(0).getType().getNumber();
        if (type == ContractType.TriggerSmartContract_VALUE
            || type == ContractType.CreateSmartContract_VALUE) {
          // Smart contract transactions wait in their own queue; offer() fails when it is full.
          if (!smartContractQueue.offer(new TrxEvent(peer, new TransactionMessage(trx)))) {
            smartContractQueueSize = smartContractQueue.size();
            trxHandlePoolQueueSize = queue.size();
            dropSmartContractCount++;
          }
        } else {
          // Ordinary transactions go straight to the thread pool.
          trxHandlePool.submit(() -> handleTransaction(peer, new TransactionMessage(trx)));
        }
      }

      if (dropSmartContractCount > 0) {
        logger.warn("Add smart contract failed, drop count: {}, queueSize {}:{}",
            dropSmartContractCount, smartContractQueueSize, trxHandlePoolQueueSize);
      }
    }
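The routing rule in processMessage can be tried out without any node running. The sketch below assumes that smartContractQueue is a bounded queue (its declaration is not shown above, but offer can only fail on a bounded one). SketchTrxRouter and its capacity parameter are hypothetical, transactions are reduced to their contract type number, and the two type numbers are the ones listed in the ContractType enum of the Transaction proto.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical router: a transaction is represented only by its contract type number.
class SketchTrxRouter {
  // Values from the ContractType enum in the Transaction proto.
  static final int CREATE_SMART_CONTRACT = 30;
  static final int TRIGGER_SMART_CONTRACT = 31;

  // Assumed to be bounded, so offer() can fail when it is full.
  final BlockingQueue<Integer> smartContractQueue;
  // Stands in for trxHandlePool.submit(...) in this sketch.
  final List<Integer> submittedDirectly = new ArrayList<>();

  SketchTrxRouter(int smartContractCapacity) {
    smartContractQueue = new LinkedBlockingQueue<>(smartContractCapacity);
  }

  // Routes every transaction and returns how many smart contract
  // transactions were dropped because the queue was full.
  int route(List<Integer> contractTypes) {
    int dropped = 0;
    for (int type : contractTypes) {
      if (type == CREATE_SMART_CONTRACT || type == TRIGGER_SMART_CONTRACT) {
        if (!smartContractQueue.offer(type)) {
          dropped++;
        }
      } else {
        submittedDirectly.add(type);
      }
    }
    return dropped;
  }
}
```

With a capacity of 2 and three smart contract transactions in one message, the third is dropped and counted, which is the situation the warning log in processMessage reports.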
Smart contract handling: handleSmartContract
Smart contract transactions are handled by a separate thread, which moves them from smartContractQueue into trxHandlePool:

    private void handleSmartContract() {
      smartContractExecutor.scheduleWithFixedDelay(() -> {
        try {
          // Feed the pool only while its backlog is below the limit.
          while (queue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE) {
            // take() blocks until a smart contract transaction is available.
            TrxEvent event = smartContractQueue.take();
            trxHandlePool.submit(() -> handleTransaction(event.getPeer(), event.getMsg()));
          }
        } catch (InterruptedException e) {
          logger.warn("Handle smart server interrupted");
          Thread.currentThread().interrupt();
        } catch (Exception e) {
          logger.error("Handle smart contract exception", e);
        }
      }, 1000, 20, TimeUnit.MILLISECONDS);
    }
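The throttling idea in handleSmartContract (only feed the pool while its backlog is below a limit) can be shown as a single pass over two queues. This is a simplified sketch, not the original logic: SketchFeeder is a hypothetical name, and it uses the non-blocking poll() where the original uses the blocking take(), so that one pass always returns.

```java
import java.util.Queue;

// Hypothetical helper showing one pass of the throttled hand-over.
class SketchFeeder {
  // Moves items from the holding queue to the backlog while the backlog is
  // below maxBacklog. Returns the number of items moved in this pass.
  static <T> int feedOnce(Queue<T> holding, Queue<T> backlog, int maxBacklog) {
    int moved = 0;
    while (backlog.size() < maxBacklog) {
      T next = holding.poll(); // non-blocking, unlike take() in the original
      if (next == null) {
        break; // nothing left to hand over
      }
      backlog.add(next);
      moved++;
    }
    return moved;
  }
}
```

In the original, this pass is repeated by smartContractExecutor with a fixed delay of 20 ms, so a backlog that is full now gets another chance shortly afterwards.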
Transaction processing and broadcast: TransactionsMsgHandler.handleTransaction
Call stack

    TransactionsMsgHandler.handleTransaction
    \--AdvService.broadcast      broadcast service

As the code shows, after a tron node receives a transaction it:
- first processes the transaction itself
- then broadcasts the transaction
Broadcasting is fairly involved and deserves a separate, more detailed post.

    private void handleTransaction(PeerConnection peer, TransactionMessage trx) {
      if (peer.isDisconnect()) {
        logger.warn("Drop trx {} from {}, peer is disconnect", trx.getMessageId(),
            peer.getInetAddress());
        return;
      }

      // Already cached by AdvService, so this transaction has been seen before.
      if (advService.getMessage(new Item(trx.getMessageId(), InventoryType.TRX)) != null) {
        return;
      }

      try {
        // Process locally first, then broadcast.
        tronNetDelegate.pushTransaction(trx.getTransactionCapsule());
        advService.broadcast(trx);
      } catch (P2pException e) {
        logger.warn("Trx {} from peer {} process failed. type: {}, reason: {}",
            trx.getMessageId(), peer.getInetAddress(), e.getType(), e.getMessage());
        // A bad transaction disconnects the peer that sent it.
        if (e.getType().equals(TypeEnum.BAD_TRX)) {
          peer.disconnect(ReasonCode.BAD_TX);
        }
      } catch (Exception e) {
        logger.error("Trx {} from peer {} process failed", trx.getMessageId(),
            peer.getInetAddress(), e);
      }
    }
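The order of steps in handleTransaction (skip duplicates, process locally, broadcast only on success) can be condensed into a small model. Everything here is hypothetical: SketchTrxHandler, the string IDs, and the Predicate that stands in for tronNetDelegate.pushTransaction are assumptions made for illustration, and the peer-disconnect handling is left out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical model of the handleTransaction flow, using string IDs.
class SketchTrxHandler {
  // Stands in for the lookup advService.getMessage(...) != null.
  private final Set<String> known = new HashSet<>();
  // Stands in for advService.broadcast(...).
  final List<String> broadcasted = new ArrayList<>();
  // Stands in for tronNetDelegate.pushTransaction(...); false means rejected.
  private final Predicate<String> pushTransaction;

  SketchTrxHandler(Predicate<String> pushTransaction) {
    this.pushTransaction = pushTransaction;
  }

  // Returns "DUPLICATE", "REJECTED", or "BROADCAST".
  String handle(String trxId) {
    if (known.contains(trxId)) {
      return "DUPLICATE";
    }
    if (!pushTransaction.test(trxId)) {
      return "REJECTED"; // a failed transaction is never broadcast
    }
    known.add(trxId); // in the original, broadcast() fills the cache
    broadcasted.add(trxId);
    return "BROADCAST";
  }
}
```

Handling the same ID twice yields BROADCAST and then DUPLICATE, which is why a transaction that arrives from several peers is relayed only once.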
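Broadcasting data: AdvService.broadcast
One point needs to be clear first: what gets broadcast is not the transaction itself but the transaction ID.
The node does not push the transaction directly to other nodes. It broadcasts the ID, and the other nodes then come back to this node to fetch the transaction data.
The broadcast cache is a Guava cache that evicts the oldest entries: once it grows beyond MAX_TRX_CACHE_SIZE, old data is discarded. I have verified this scenario, but the limit is not normally reached; it only happens in extreme test environments.
Entries are also kept for only 1 hour and are discarded once they expire.
Important member variables

    // Upper bound on the number of cached transactions.
    private final int MAX_TRX_CACHE_SIZE = 50_000;

    // Transactions that have been announced, kept so that peers can fetch them.
    private Cache<Item, Message> trxCache = CacheBuilder.newBuilder()
        .maximumSize(MAX_TRX_CACHE_SIZE).expireAfterWrite(1, TimeUnit.HOURS)
        .recordStats().build();

    // Upper bound on the number of items waiting to be announced.
    private final int MAX_SPREAD_SIZE = 1_000;

    // Items waiting to be announced, mapped to the time they were added.
    private ConcurrentHashMap<Item, Long> invToSpread = new ConcurrentHashMap<>();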
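The two limits on trxCache (a maximum size and a lifetime after write) can be imitated with the standard library alone. The sketch below is not Guava and not how Guava is implemented: SketchBoundedCache is a hypothetical class built on LinkedHashMap, it evicts strictly in insertion order, and the current time is passed in explicitly so that the behaviour is easy to check.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a size-bounded cache with expire-after-write.
class SketchBoundedCache<K, V> {
  // A cached value together with the time it was written.
  private static final class Stamped<T> {
    final T value;
    final long writtenAt;

    Stamped(T value, long writtenAt) {
      this.value = value;
      this.writtenAt = writtenAt;
    }
  }

  private final int maxSize;
  private final long ttlMillis;
  private final LinkedHashMap<K, Stamped<V>> map;

  SketchBoundedCache(int maxSize, long ttlMillis) {
    this.maxSize = maxSize;
    this.ttlMillis = ttlMillis;
    this.map = new LinkedHashMap<K, Stamped<V>>() {
      @Override
      protected boolean removeEldestEntry(Map.Entry<K, Stamped<V>> eldest) {
        // Drop the oldest entry once the size limit is exceeded.
        return size() > SketchBoundedCache.this.maxSize;
      }
    };
  }

  void put(K key, V value, long nowMillis) {
    map.put(key, new Stamped<>(value, nowMillis));
  }

  // Returns the value, or null if it is missing, evicted, or expired.
  V getIfPresent(K key, long nowMillis) {
    Stamped<V> entry = map.get(key);
    if (entry == null) {
      return null;
    }
    if (nowMillis - entry.writtenAt >= ttlMillis) {
      map.remove(key);
      return null;
    }
    return entry.value;
  }
}
```

With a size limit of 2, inserting a third entry evicts the first one; an entry written at time 0 with a lifetime of 1000 ms is no longer returned at time 1000. A peer that asks for such an entry later can no longer be served from the cache.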
Broadcast logic
Both block and transaction data can be broadcast.

    public void broadcast(Message msg) {

      if (fastForward) {
        return;
      }

      // Too many items are already waiting to be announced: drop this one.
      if (invToSpread.size() > MAX_SPREAD_SIZE) {
        logger.warn("Drop message, type: {}, ID: {}", msg.getType(), msg.getMessageId());
        return;
      }

      Item item;
      if (msg instanceof BlockMessage) {
        BlockMessage blockMsg = (BlockMessage) msg;
        item = new Item(blockMsg.getMessageId(), InventoryType.BLOCK);
        logger.info("Ready to broadcast block {}", blockMsg.getBlockId().getString());
        // Transactions contained in the block no longer need to be announced separately.
        blockMsg.getBlockCapsule().getTransactions().forEach(transactionCapsule -> {
          Sha256Hash tid = transactionCapsule.getTransactionId();
          invToSpread.remove(tid);
          trxCache.put(new Item(tid, InventoryType.TRX),
              new TransactionMessage(transactionCapsule.getInstance()));
        });
        blockCache.put(item, msg);
      } else if (msg instanceof TransactionMessage) {
        TransactionMessage trxMsg = (TransactionMessage) msg;
        item = new Item(trxMsg.getMessageId(), InventoryType.TRX);
        trxCount.add();
        // Cache the transaction so that peers can fetch it by ID later.
        trxCache.put(item, new TransactionMessage(trxMsg.getTransactionCapsule().getInstance()));
      } else {
        logger.error("Adv item is neither block nor trx, type: {}", msg.getType());
        return;
      }

      invToSpread.put(item, System.currentTimeMillis());

      // Blocks are announced immediately; transactions wait for the scheduled run.
      if (InventoryType.BLOCK.equals(item.getType())) {
        consumerInvToSpread();
      }
    }
Fetching data: FetchInvDataMsgHandler
Suppose the transaction above is broadcast from node A to node B. When node B receives the announcement, it comes back to fetch the actual transaction data.
Node B sends a FETCH_INV_DATA message to node A to get the data.
The core method is FetchInvDataMsgHandler.processMessage:

    for (Sha256Hash hash : fetchInvDataMsg.getHashList()) {
      Item item = new Item(hash, type);
      // Look in the broadcast cache first.
      Message message = advService.getMessage(item);
      if (message == null) {
        try {
          // Not cached any more: fall back to the database.
          message = tronNetDelegate.getData(hash, type);
        } catch (Exception e) {
          throw new P2pException(TypeEnum.DB_ITEM_NOT_FOUND,
              "Fetch item " + item + " failed. reason: " + e.getMessage());
        }
      }
      ···
    }
FetchInvDataMsgHandler.processMessage reads the previously cached data from the trxCache in AdvService, which completes the flow from broadcast to data retrieval.
AdvService.getMessage

    public Message getMessage(Item item) {
      // Transactions and blocks are kept in separate caches.
      if (item.getType() == InventoryType.TRX) {
        return trxCache.getIfPresent(item);
      } else {
        return blockCache.getIfPresent(item);
      }
    }
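The announce-then-fetch exchange between node A and node B can be modelled with two in-memory objects. This is a rough sketch under several assumptions: SketchNode is a hypothetical class, IDs and payloads are plain strings, the network is replaced by direct method calls, and the database fallback of tronNetDelegate.getData is represented by a second map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical node: only the ID is announced, the payload is fetched on demand.
class SketchNode {
  // Stands in for AdvService.trxCache.
  private final Map<String, String> trxCache = new HashMap<>();
  // Stands in for the database behind tronNetDelegate.getData.
  private final Map<String, String> db = new HashMap<>();

  void storeInDb(String id, String payload) {
    db.put(id, payload);
  }

  // Caches the payload and returns what goes on the wire: the ID only.
  String broadcast(String id, String payload) {
    trxCache.put(id, payload);
    return id;
  }

  // Serves a fetch request: cache first, then the database.
  String onFetch(String id) {
    String payload = trxCache.get(id);
    if (payload == null) {
      payload = db.get(id);
    }
    if (payload == null) {
      // The original reports this case as DB_ITEM_NOT_FOUND.
      throw new IllegalStateException("Fetch item " + id + " failed");
    }
    return payload;
  }

  // Reacts to an announcement by fetching the payload from the announcer.
  String onInventory(String id, SketchNode announcer) {
    String payload = announcer.onFetch(id);
    trxCache.put(id, payload);
    return payload;
  }
}
```

Node A calls broadcast and hands only the ID to node B; node B calls onInventory with that ID and receives the payload from A's cache.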
Sending: consumerInvToSpread
Data is sent by the consumerInvToSpread method, which is triggered in two ways:
- spreadExecutor runs it on a schedule
- broadcast calls it immediately when the item type is InventoryType.BLOCK

    private synchronized void consumerInvToSpread() {

      // Only peers that are not syncing in either direction receive announcements.
      List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
          .filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs())
          .collect(Collectors.toList());

      if (invToSpread.isEmpty() || peers.isEmpty()) {
        return;
      }

      InvSender invSender = new InvSender();

      invToSpread.forEach((item, time) -> peers.forEach(peer -> {
        // Skip items the peer already sent us, items already announced to it,
        // and blocks older than one block interval.
        if (peer.getAdvInvReceive().getIfPresent(item) == null
            && peer.getAdvInvSpread().getIfPresent(item) == null
            && !(item.getType().equals(InventoryType.BLOCK)
            && System.currentTimeMillis() - time > BLOCK_PRODUCED_INTERVAL)) {
          peer.getAdvInvSpread().put(item, Time.getCurrentMillis());
          invSender.add(item, peer);
        }
        invToSpread.remove(item);
      }));
      invSender.sendInv();
    }
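The per-peer filter in consumerInvToSpread (do not announce an item to a peer that sent it to us or that has already been told about it) can be sketched as follows. SketchPeer and SketchSpreader are hypothetical, items are string IDs, the two per-peer caches are plain sets, and the check that skips stale blocks is deliberately left out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical peer with the two per-peer item sets.
class SketchPeer {
  final String name;
  final Set<String> received = new HashSet<>(); // stands in for getAdvInvReceive()
  final Set<String> spread = new HashSet<>();   // stands in for getAdvInvSpread()

  SketchPeer(String name) {
    this.name = name;
  }
}

class SketchSpreader {
  // Decides which pending IDs each peer should be told about and empties the
  // pending map. Returns a map from peer name to the IDs to announce.
  static Map<String, List<String>> spread(Map<String, Long> pending, List<SketchPeer> peers) {
    Map<String, List<String>> toSend = new LinkedHashMap<>();
    for (String id : new ArrayList<>(pending.keySet())) {
      for (SketchPeer peer : peers) {
        if (!peer.received.contains(id) && !peer.spread.contains(id)) {
          peer.spread.add(id); // remember the announcement for next time
          toSend.computeIfAbsent(peer.name, k -> new ArrayList<>()).add(id);
        }
      }
      pending.remove(id);
    }
    return toSend;
  }
}
```

If peer p1 already sent us t1, then a pending set of t1 and t2 results in only t2 being announced to p1, while a peer that knows neither is told about both.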
Transaction structure
The tron chain uses protobuf for serialization and deserialization. Here is the structure of Transaction:

    message Transaction {
      message Contract {
        enum ContractType {
          AccountCreateContract = 0;
          TransferContract = 1;
          TransferAssetContract = 2;
          VoteAssetContract = 3;
          VoteWitnessContract = 4;
          WitnessCreateContract = 5;
          AssetIssueContract = 6;
          WitnessUpdateContract = 8;
          ParticipateAssetIssueContract = 9;
          AccountUpdateContract = 10;
          FreezeBalanceContract = 11;
          UnfreezeBalanceContract = 12;
          WithdrawBalanceContract = 13;
          UnfreezeAssetContract = 14;
          UpdateAssetContract = 15;
          ProposalCreateContract = 16;
          ProposalApproveContract = 17;
          ProposalDeleteContract = 18;
          SetAccountIdContract = 19;
          CustomContract = 20;
          CreateSmartContract = 30;
          TriggerSmartContract = 31;
          GetContract = 32;
          UpdateSettingContract = 33;
          ExchangeCreateContract = 41;
          ExchangeInjectContract = 42;
          ExchangeWithdrawContract = 43;
          ExchangeTransactionContract = 44;
          UpdateEnergyLimitContract = 45;
          AccountPermissionUpdateContract = 46;
          ClearABIContract = 48;
          UpdateBrokerageContract = 49;
          ShieldedTransferContract = 51;
          MarketSellAssetContract = 52;
          MarketCancelOrderContract = 53;
        }
        ContractType type = 1;
        google.protobuf.Any parameter = 2;
        bytes provider = 3;
        bytes ContractName = 4;
        int32 Permission_id = 5;
      }

      message Result {
        enum code {
          SUCESS = 0;
          FAILED = 1;
        }
        enum contractResult {
          DEFAULT = 0;
          SUCCESS = 1;
          REVERT = 2;
          BAD_JUMP_DESTINATION = 3;
          OUT_OF_MEMORY = 4;
          PRECOMPILED_CONTRACT = 5;
          STACK_TOO_SMALL = 6;
          STACK_TOO_LARGE = 7;
          ILLEGAL_OPERATION = 8;
          STACK_OVERFLOW = 9;
          OUT_OF_ENERGY = 10;
          OUT_OF_TIME = 11;
          JVM_STACK_OVER_FLOW = 12;
          UNKNOWN = 13;
          TRANSFER_FAILED = 14;
          INVALID_CODE = 15;
        }
        int64 fee = 1;
        code ret = 2;
        contractResult contractRet = 3;

        string assetIssueID = 14;
        int64 withdraw_amount = 15;
        int64 unfreeze_amount = 16;
        int64 exchange_received_amount = 18;
        int64 exchange_inject_another_amount = 19;
        int64 exchange_withdraw_another_amount = 20;
        int64 exchange_id = 21;
        int64 shielded_transaction_fee = 22;

        bytes orderId = 25;
        repeated MarketOrderDetail orderDetails = 26;
      }

      message raw {
        bytes ref_block_bytes = 1;
        int64 ref_block_num = 3;
        bytes ref_block_hash = 4;
        int64 expiration = 8;
        repeated authority auths = 9;
        bytes data = 10;
        repeated Contract contract = 11;
        bytes scripts = 12;
        int64 timestamp = 14;
        int64 fee_limit = 18;
      }

      raw raw_data = 1;
      repeated bytes signature = 2;
      repeated Result ret = 5;
    }
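Transaction broadcast code:
TronNetService.java
AdvService.java
Summary
The point of understanding this code is to know how transactions are received, processed, and broadcast, and how they are handled and passed between all nodes.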