AdvService 作用

AdvService 负责将数据广播到tron网络当中。
基础框架是netty,在此之上开发AdvService对业务进行了封装。

数据包括:

  1. 交易
  2. 区块

需要注意的是,tronAdvService的这套广播逻辑,不是单向广播,而是双向互动。
啥意思,就是说,一般理解,一条数据广播出去后,就广播到对方节上了。
但是tron的广播不是这样,而是先广播一个交易ID到目录节点上,目标节点收到ID后,再发一条请求接取的网络请求,把数据接回去!!!!
是不是有点反直觉!!!

AdvService 主要成员

invToFetch:
invToSpread: 待广播的数据:交易、区块
invToFetchCache:

主要方法

consumerInvToSpread: 处理发送队列
consumerInvToFetch: 处理拉取队列
broadcast: 广播

处理流程

  1. broadcast: 构建广播消息体:只包含ID
    1. 将数据添加入trxCache/blockCache
    2. 封装item
    3. 保存待发送消息: invToSpread.put(item)
  2. consumerInvToSpread: 处理 invToSpread

广播 id

需要发送的数据如:交易区块,通过调用AdvService.broadcastid广播。
但是广播并不是一调用broadcast就发送出去的,还需要在各个队列中导来导去好几次。
调用栈

1
2
3
4
5
6
broadcast()
\--consumerInvToSpread()
\--invSender.sendInv();
\--peer.sendMessage(new InventoryMessage(value, key));
\--msgQueue.sendMessage(message);
\--requestQueue.add(new MessageRoundTrip(msg));

步骤:

  1. 判断block/trx
    blockCache //接收到的数据进缓存
    trxCache //接收到的数据进缓存
  2. 将数据装成 item
  3. invToSpread.put(item)
  4. consumerInvToSpread //处理 invToSpread 的中数据

具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public void broadcast(Message msg) {
// 如果是 fastForward 节点不广播
if (fastForward) {
return;
}

// MAX_SPREAD_SIZE = 1_000
if (invToSpread.size() > MAX_SPREAD_SIZE) {
logger.warn("Drop message, type: {}, ID: {}", msg.getType(), msg.getMessageId());
return;
}

Item item;
if (msg instanceof BlockMessage) {
BlockMessage blockMsg = (BlockMessage) msg;
// 注意这里,是id,而不是区块本身
item = new Item(blockMsg.getMessageId(), InventoryType.BLOCK);
logger.info("Ready to broadcast block {}", blockMsg.getBlockId().getString());
// 把Block中的交易放到trxCache
blockMsg.getBlockCapsule().getTransactions().forEach(transactionCapsule -> {
Sha256Hash tid = transactionCapsule.getTransactionId();
invToSpread.remove(tid);
trxCache.put(new Item(tid, InventoryType.TRX),
new TransactionMessage(transactionCapsule.getInstance()));
});
blockCache.put(item, msg);
} else if (msg instanceof TransactionMessage) {
TransactionMessage trxMsg = (TransactionMessage) msg;
// getMessageId 是交易id
item = new Item(trxMsg.getMessageId(), InventoryType.TRX);
trxCount.add();
trxCache.put(item, new TransactionMessage(trxMsg.getTransactionCapsule().getInstance()));
} else {
logger.error("Adv item is neither block nor trx, type: {}", msg.getType());
return;
}

invToSpread.put(item, System.currentTimeMillis());

if (InventoryType.BLOCK.equals(item.getType())) {
consumerInvToSpread();
}
}

@Override
public Sha256Hash getMessageId() {
return this.transactionCapsule.getTransactionId();
}

@Override
public Sha256Hash getMessageId() {
return getBlockCapsule().getBlockId();
}

consumerInvToSpread 中处理

处理过程:

  1. 拿到所有活跃PeerConnection
  2. 创建InvSender
  3. 把数据copy到 peer 的AdvInvSpread
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private synchronized void consumerInvToSpread() {
// 判断 peer 状态,这段代码不能忽略
// peer.isNeedSyncFromPeer() 表示这个peer的状态,正在同步,区块状态不完整,不完整的不要广播给它
// peer.isNeedSyncFromUs() 表示peer,正在给别的节点同步数据:
// 区块还不完整,那么链的状态也是不完整的,广播过去的交易,它们也处理不了
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs())
.collect(Collectors.toList());

if (invToSpread.isEmpty() || peers.isEmpty()) {
return;
}

InvSender invSender = new InvSender();

invToSpread.forEach((item, time) -> peers.forEach(peer -> {
if (peer.getAdvInvReceive().getIfPresent(item) == null
&& peer.getAdvInvSpread().getIfPresent(item) == null
&& !(item.getType().equals(InventoryType.BLOCK) // 如果是Block 且 超过3秒就不广播了
&& System.currentTimeMillis() - time > BLOCK_PRODUCED_INTERVAL)) {
// 把交易塞到 peer 的 AdvInvSpread 队列
peer.getAdvInvSpread().put(item, Time.getCurrentMillis());
invSender.add(item, peer);
}
// 移除提交易
invToSpread.remove(item);
}));

invSender.sendInv();
}

//TODO