启动流程

Tendermint 的启动流程比较清析明了,各业务启动流程都在对应的实现代码,主启动流程加载所需配置,由各启动实现类启动自己对应业务,如节点启动相关在 nodeImpl,共识相关处理是 state 中进行处理。

流程大致:

  1. 加载配置 node.NewDefault
  2. 启动运行 Start
  3. 启动相关实现的 OnStart

先看启动流程

服务启动

启动入口代码,这里使用到了一个命令行工具:cobra
代码位置:cmd/tendermint/main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func main() {
...省略部份代码
// NOTE:
// Users wishing to:
// * Use an external signer for their validators
// * Supply an in-proc abci app
// * Supply a genesis doc file from another source
// * Provide their own DB implementation
// can copy this file and use something other than the
// node.NewDefault function
// 创建节点为默认动行节点,这里是函数引用,并未执行
// 在 cmd.NewRunNodeCmd 调用
nodeFunc := node.NewDefault

// Create & start node
// 主要方法 cmd.NewRunNodeCmd
rootCmd.AddCommand(cmd.NewRunNodeCmd(nodeFunc))

cmd := cli.PrepareBaseCmd(rootCmd, "TM", os.ExpandEnv(filepath.Join("$HOME", config.DefaultTendermintDir)))
if err := cmd.Execute(); err != nil {
panic(err)
}
}

创建运行节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// NewRunNodeCmd returns the command that allows the CLI to start a node.
// It can be used with a custom PrivValidator and in-process ABCI application.
func NewRunNodeCmd(nodeProvider cfg.ServiceProvider) *cobra.Command {
// 添加到命令行
cmd := &cobra.Command{
Use: "start",
Aliases: []string{"node", "run"},
Short: "Run the tendermint node",
RunE: func(cmd *cobra.Command, args []string) error {
if err := checkGenesisHash(config); err != nil {
return err
}
// 这里调用 node.NewDefault 这个方法实现
// 创建了节点
n, err := nodeProvider(config, logger)
if err != nil {
return fmt.Errorf("failed to create node: %w", err)
}
// 启动服务
if err := n.Start(); err != nil {
return fmt.Errorf("failed to start node: %w", err)
}

logger.Info("started node", "node", n.String())

// Stop upon receiving SIGTERM or CTRL-C.
tmos.TrapSignal(logger, func() {
if n.IsRunning() {
if err := n.Stop(); err != nil {
logger.Error("unable to stop the node", "error", err)
}
}
})

// Run forever.
select {}
},
}

AddNodeFlags(cmd)
return cmd
}

启动需的默认配置一目了然

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// DefaultConfig returns a default configuration for a Tendermint node
func DefaultConfig() *Config {
return &Config{
BaseConfig: DefaultBaseConfig(),
RPC: DefaultRPCConfig(),
P2P: DefaultP2PConfig(),
Mempool: DefaultMempoolConfig(),
StateSync: DefaultStateSyncConfig(),
BlockSync: DefaultBlockSyncConfig(),
Consensus: DefaultConsensusConfig(),
TxIndex: DefaultTxIndexConfig(),
Instrumentation: DefaultInstrumentationConfig(),
PrivValidator: DefaultPrivValidatorConfig(),
}
}

启动服务

启动服务接口 Service 主要实现类是BaseService

service.go

1
2
3
4
5
6
7
8
// Service defines a service that can be started, stopped, and reset.
type Service interface {
// Start the service.
// If it's already started or stopped, will return an error.
// If OnStart() returns an error, it's returned by Start()
Start() error
OnStart() error
}

启动 node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Start implements Service by calling OnStart (if defined). An error will be
// returned if the service is already running or stopped. Not to start the
// stopped service, you need to call Reset.
func (bs *BaseService) Start() error {
if atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
if atomic.LoadUint32(&bs.stopped) == 1 {
bs.Logger.Error("not starting service; already stopped", "service", bs.name, "impl", bs.impl.String())
atomic.StoreUint32(&bs.started, 0)
return ErrAlreadyStopped
}

bs.Logger.Info("starting service", "service", bs.name, "impl", bs.impl.String())
// 启动节点。BaseService 有很多实,都实现 OnStart。
// 服务启动是:node.go OnStart
// 共识启动是: state.go OnStart
if err := bs.impl.OnStart(); err != nil {
// revert flag
atomic.StoreUint32(&bs.started, 0)
return err
}
return nil
}

bs.Logger.Debug("not starting service; already started", "service", bs.name, "impl", bs.impl.String())
return ErrAlreadyStarted
}

nodeImpl 实现启动流程,总的来说还是比较清晰。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148

// OnStart starts the Node. It implements service.Service.
func (n *nodeImpl) OnStart() error {
now := tmtime.Now()
genTime := n.genesisDoc.GenesisTime
if genTime.After(now) {
n.Logger.Info("Genesis time is in the future. Sleeping until then...", "genTime", genTime)
time.Sleep(genTime.Sub(now))
}

// Start the RPC server before the P2P server
// so we can eg. receive txs for the first block
// 这里顺带说下,tendermint 的3种节点为类型
// ModeFull = "full" 数据转发节点
// ModeValidator = "validator" 数据验证节点
// ModeSeed = "seed" 用来做节点发现
if n.config.RPC.ListenAddress != "" && n.config.Mode != config.ModeSeed {
// 启动 RPC
listeners, err := n.startRPC()
if err != nil {
return err
}
n.rpcListeners = listeners
}

if n.config.Instrumentation.Prometheus &&
n.config.Instrumentation.PrometheusListenAddr != "" {
n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr)
}

// Start the transport.
addr, err := types.NewNetAddressString(n.nodeKey.ID.AddressString(n.config.P2P.ListenAddress))
if err != nil {
return err
}
if err := n.transport.Listen(p2p.NewEndpoint(addr)); err != nil {
return err
}

n.isListening = true

// p2p 路由
if err = n.router.Start(); err != nil {
return err
}

if n.config.Mode != config.ModeSeed {
if n.config.BlockSync.Enable {
// 开启区块同步
if err := n.bcReactor.Start(); err != nil {
return err
}
}

// Start the real consensus reactor separately since the switch uses the shim.
if err := n.consensusReactor.Start(); err != nil {
return err
}

// Start the real state sync reactor separately since the switch uses the shim.
if err := n.stateSyncReactor.Start(); err != nil {
return err
}

// Start the real mempool reactor separately since the switch uses the shim.
if err := n.mempoolReactor.Start(); err != nil {
return err
}

// Start the real evidence reactor separately since the switch uses the shim.
if err := n.evidenceReactor.Start(); err != nil {
return err
}
}

if err := n.pexReactor.Start(); err != nil {
return err
}

// Run state sync
// TODO: We shouldn't run state sync if we already have state that has a
// LastBlockHeight that is not InitialHeight
if n.stateSync {
bcR, ok := n.bcReactor.(consensus.BlockSyncReactor)
if !ok {
return fmt.Errorf("this blockchain reactor does not support switching from state sync")
}

// we need to get the genesis state to get parameters such as
state, err := sm.MakeGenesisState(n.genesisDoc)
if err != nil {
return fmt.Errorf("unable to derive state: %w", err)
}

// TODO: we may want to move these events within the respective
// reactors.
// At the beginning of the statesync start, we use the initialHeight as the event height
// because of the statesync doesn't have the concreate state height before fetched the snapshot.
d := types.EventDataStateSyncStatus{Complete: false, Height: state.InitialHeight}
if err := n.eventBus.PublishEventStateSyncStatus(d); err != nil {
n.eventBus.Logger.Error("failed to emit the statesync start event", "err", err)
}

// FIXME: We shouldn't allow state sync to silently error out without
// bubbling up the error and gracefully shutting down the rest of the node
go func() {
n.Logger.Info("starting state sync")
state, err := n.stateSyncReactor.Sync(context.TODO())
if err != nil {
n.Logger.Error("state sync failed; shutting down this node", "err", err)
// stop the node
if err := n.Stop(); err != nil {
n.Logger.Error("failed to shut down node", "err", err)
}
return
}

n.consensusReactor.SetStateSyncingMetrics(0)

d := types.EventDataStateSyncStatus{Complete: true, Height: state.LastBlockHeight}
if err := n.eventBus.PublishEventStateSyncStatus(d); err != nil {
n.eventBus.Logger.Error("failed to emit the statesync start event", "err", err)
}

// TODO: Some form of orchestrator is needed here between the state
// advancing reactors to be able to control which one of the three
// is running
if n.config.BlockSync.Enable {
// FIXME Very ugly to have these metrics bleed through here.
n.consensusReactor.SetBlockSyncingMetrics(1)
if err := bcR.SwitchToBlockSync(state); err != nil {
n.Logger.Error("failed to switch to block sync", "err", err)
return
}

d := types.EventDataBlockSyncStatus{Complete: false, Height: state.LastBlockHeight}
if err := n.eventBus.PublishEventBlockSyncStatus(d); err != nil {
n.eventBus.Logger.Error("failed to emit the block sync starting event", "err", err)
}

} else {
n.consensusReactor.SwitchToConsensus(state, true)
}
}()
}

return nil
}