AdvService 作用
AdvService
负责将数据广播到tron
网络当中。
基础框架是netty
,在此之上开发AdvService
对业务进行了封装。
数据包括:
- 交易
- 区块
需要注意的是,tron
的AdvService
的这套广播逻辑,不是单向广播,而是双向互动。
啥意思,就是说,一般理解,一条数据广播出去后,就广播到对方节上了。
但是tron的广播不是这样,而是先广播一个交易ID到目录节点上,目标节点收到ID后,再发一条请求接取的网络请求,把数据接回去!!!!
是不是有点反直觉!!!
AdvService 主要成员
invToFetch:
invToSpread: 待广播的数据:交易、区块
invToFetchCache:
主要方法
consumerInvToSpread: 处理发送队列
consumerInvToFetch: 处理拉取队列
broadcast: 广播
处理流程
- broadcast: 构建广播消息体:只包含ID
- 将数据添加入trxCache/blockCache
- 封装item
- 保存待发送消息: invToSpread.put(item)
- consumerInvToSpread: 处理 invToSpread
广播 id
需要发送的数据如:交易、区块,通过调用AdvService.broadcast
将id
广播。
但是广播并不是一调用broadcast
就发送出去的,还需要在各个队列中导来导去好几次。
调用栈
1 2 3 4 5 6
| broadcast() \--consumerInvToSpread() \--invSender.sendInv(); \--peer.sendMessage(new InventoryMessage(value, key)); \--msgQueue.sendMessage(message); \--requestQueue.add(new MessageRoundTrip(msg));
|
步骤:
- 判断block/trx
blockCache //接收到的数据进缓存
trxCache //接收到的数据进缓存
- 将数据装成 item
- invToSpread.put(item)
- consumerInvToSpread //处理 invToSpread 的中数据
具体实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| public void broadcast(Message msg) { if (fastForward) { return; }
if (invToSpread.size() > MAX_SPREAD_SIZE) { logger.warn("Drop message, type: {}, ID: {}", msg.getType(), msg.getMessageId()); return; }
Item item; if (msg instanceof BlockMessage) { BlockMessage blockMsg = (BlockMessage) msg; item = new Item(blockMsg.getMessageId(), InventoryType.BLOCK); logger.info("Ready to broadcast block {}", blockMsg.getBlockId().getString()); blockMsg.getBlockCapsule().getTransactions().forEach(transactionCapsule -> { Sha256Hash tid = transactionCapsule.getTransactionId(); invToSpread.remove(tid); trxCache.put(new Item(tid, InventoryType.TRX), new TransactionMessage(transactionCapsule.getInstance())); }); blockCache.put(item, msg); } else if (msg instanceof TransactionMessage) { TransactionMessage trxMsg = (TransactionMessage) msg; item = new Item(trxMsg.getMessageId(), InventoryType.TRX); trxCount.add(); trxCache.put(item, new TransactionMessage(trxMsg.getTransactionCapsule().getInstance())); } else { logger.error("Adv item is neither block nor trx, type: {}", msg.getType()); return; }
invToSpread.put(item, System.currentTimeMillis());
if (InventoryType.BLOCK.equals(item.getType())) { consumerInvToSpread(); } }
@Override public Sha256Hash getMessageId() { return this.transactionCapsule.getTransactionId(); }
@Override public Sha256Hash getMessageId() { return getBlockCapsule().getBlockId(); }
|
consumerInvToSpread 中处理
处理过程:
- 拿到所有活跃PeerConnection
- 创建InvSender
- 把数据copy到 peer 的AdvInvSpread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| private synchronized void consumerInvToSpread() { List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream() .filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs()) .collect(Collectors.toList());
if (invToSpread.isEmpty() || peers.isEmpty()) { return; }
InvSender invSender = new InvSender();
invToSpread.forEach((item, time) -> peers.forEach(peer -> { if (peer.getAdvInvReceive().getIfPresent(item) == null && peer.getAdvInvSpread().getIfPresent(item) == null && !(item.getType().equals(InventoryType.BLOCK) && System.currentTimeMillis() - time > BLOCK_PRODUCED_INTERVAL)) { peer.getAdvInvSpread().put(item, Time.getCurrentMillis()); invSender.add(item, peer); } invToSpread.remove(item); }));
invSender.sendInv(); }
|
//TODO