//注册 broker public RegisterBrokerResult registerBroker( final String clusterName, final String brokerAddr, final String brokerName, finallong brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel){ RegisterBrokerResult result = new RegisterBrokerResult(); try { try { this.lock.writeLock().lockInterruptibly(); //集群名称 Set<String> brokerNames = this.clusterAddrTable.get(clusterName); if (null == brokerNames) { //如果没有拿到 broker名,broker就用 clusterName brokerNames = new HashSet<String>(); this.clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName);
boolean registerFirst = false;
//brokerData 数据,第一次注册,并没有数据 BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>()); this.brokerAddrTable.put(brokerName, brokerData); } //key 是 0-n Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs(); //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT> //The same IP:PORT must only have one record in brokerAddrTable //slave 切换到 master:删除1,再将slave改为0,add到brokerAddrTable Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator(); while (it.hasNext()) { Entry<Long, String> item = it.next(); //brokerAddr 申请注册的 broker //去重,找到 IP:PORT 只允许一条存在,如果 IP:PORT 存在,ID不同,删除这一条 if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) { it.remove(); } } //上面删完,这里add进去,可以理解成更新操作 String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr);//返回旧值