Skip to content

Commit

Permalink
feat(probe): probe support keep alive
Browse files Browse the repository at this point in the history
Signed-off-by: Rory Z <[email protected]>
  • Loading branch information
Rory-Z committed Nov 20, 2023
1 parent b0975a7 commit d43e737
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 10 deletions.
32 changes: 25 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,28 @@ type Metrics struct {
}

type Probe struct {
Target string `yaml:"target"`
Scheme string `yaml:"scheme,omitempty"`
ClientID string `yaml:"client_id,omitempty"`
Username string `yaml:"username,omitempty"`
Password string `yaml:"password,omitempty"`
Topic string `yaml:"topic,omitempty"`
QoS byte `yaml:"qos,omitempty"`
// Target is the address of the EMQX node to probe. Required.
Target string `yaml:"target"`
// Scheme is the protocol scheme of the EMQX node to probe.
// Enum: [mqtt | tcp | mqtts | ssl | tls | ws | wss]
// Default: tcp
Scheme string `yaml:"scheme,omitempty"`
// ClientID is the MQTT client ID to use when probing.
// Default: emqx_exporter_probe_<index>
ClientID string `yaml:"client_id,omitempty"`
// Username is the MQTT username to use when probing.
Username string `yaml:"username,omitempty"`
// Password is the MQTT password to use when probing.
Password string `yaml:"password,omitempty"`
// Topic is the MQTT topic to use when probing.
// Default: emqx-exporter-probe-<index>
Topic string `yaml:"topic,omitempty"`
// QoS is the MQTT QoS to use when probing.
// Default: 0
QoS byte `yaml:"qos,omitempty"`
// KeepAlive is the keep alive period in seconds. Defaults to 30 seconds.
KeepAlive int64 `yaml:"keep_alive,omitempty"`
// TLSClientConfig is the TLS configuration to use when probing.
TLSClientConfig *TLSClientConfig `yaml:"tls_config,omitempty"`
}

Expand Down Expand Up @@ -160,6 +175,9 @@ func (sc *SafeConfig) ReloadConfig(confFile string) (err error) {
if probe.Topic == "" {
probe.Topic = "emqx-exporter-probe-" + fmt.Sprintf("%d", index)
}
if probe.KeepAlive == 0 {
probe.KeepAlive = 30
}
c.Probes[index] = probe
}

Expand Down
1 change: 1 addition & 0 deletions config/example/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ probes:
password:
topic:
qos:
keep_alive:
10 changes: 7 additions & 3 deletions prober/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,18 @@ func init() {
select {
case <-context.Background().Done():
return
case <-time.After(5 * time.Second):
case <-time.After(60 * time.Second):
}
}
}()
}

func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) {
opt := mqtt.NewClientOptions().AddBroker(probe.Scheme + "://" + probe.Target).SetClientID(probe.ClientID).SetUsername(probe.Username).SetPassword(probe.Password)
opt := mqtt.NewClientOptions().AddBroker(probe.Scheme + "://" + probe.Target)
opt.SetClientID(probe.ClientID)
opt.SetUsername(probe.Username)
opt.SetPassword(probe.Password)
opt.SetKeepAlive(time.Duration(probe.KeepAlive) * time.Second)
if probe.TLSClientConfig != nil {
opt.SetTLSConfig(probe.TLSClientConfig.ToTLSConfig())
}
Expand Down Expand Up @@ -108,7 +112,7 @@ func ProbeMQTT(probe config.Probe, logger log.Logger) bool {
if msg == nil {
return false
}
case <-time.After(5 * time.Second):
case <-time.After(time.Duration(probe.KeepAlive) * time.Second):
return false
}

Expand Down

0 comments on commit d43e737

Please sign in to comment.