diff --git a/conf/conf.go b/conf/conf.go index 6d810d4f..b74cb350 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -43,6 +43,7 @@ import ( "github.com/megaease/easeprobe/probe/ssh" "github.com/megaease/easeprobe/probe/tcp" "github.com/megaease/easeprobe/probe/tls" + "github.com/megaease/easeprobe/probe/websocket" "github.com/invopop/jsonschema" log "github.com/sirupsen/logrus" @@ -138,17 +139,18 @@ type Settings struct { // Conf is Probe configuration type Conf struct { - Version string `yaml:"version" json:"version,omitempty" jsonschema:"title=Version,description=Version of the EaseProbe configuration"` - HTTP []http.HTTP `yaml:"http" json:"http,omitempty" jsonschema:"title=HTTP Probe,description=HTTP Probe Configuration"` - TCP []tcp.TCP `yaml:"tcp" json:"tcp,omitempty" jsonschema:"title=TCP Probe,description=TCP Probe Configuration"` - Shell []shell.Shell `yaml:"shell" json:"shell,omitempty" jsonschema:"title=Shell Probe,description=Shell Probe Configuration"` - Client []client.Client `yaml:"client" json:"client,omitempty" jsonschema:"title=Native Client Probe,description=Native Client Probe Configuration"` - SSH ssh.SSH `yaml:"ssh" json:"ssh,omitempty" jsonschema:"title=SSH Probe,description=SSH Probe Configuration"` - TLS []tls.TLS `yaml:"tls" json:"tls,omitempty" jsonschema:"title=TLS Probe,description=TLS Probe Configuration"` - Host host.Host `yaml:"host" json:"host,omitempty" jsonschema:"title=Host Probe,description=Host Probe Configuration"` - Ping []ping.Ping `yaml:"ping" json:"ping,omitempty" jsonschema:"title=Ping Probe,description=Ping Probe Configuration"` - Notify notify.Config `yaml:"notify" json:"notify,omitempty" jsonschema:"title=Notification,description=Notification Configuration"` - Settings Settings `yaml:"settings" json:"settings,omitempty" jsonschema:"title=Global Settings,description=EaseProbe Global configuration"` + Version string `yaml:"version" json:"version,omitempty" jsonschema:"title=Version,description=Version of the EaseProbe configuration"` + HTTP []http.HTTP `yaml:"http" json:"http,omitempty" jsonschema:"title=HTTP Probe,description=HTTP Probe Configuration"` + TCP []tcp.TCP `yaml:"tcp" json:"tcp,omitempty" jsonschema:"title=TCP Probe,description=TCP Probe Configuration"` + Shell []shell.Shell `yaml:"shell" json:"shell,omitempty" jsonschema:"title=Shell Probe,description=Shell Probe Configuration"` + Client []client.Client `yaml:"client" json:"client,omitempty" jsonschema:"title=Native Client Probe,description=Native Client Probe Configuration"` + SSH ssh.SSH `yaml:"ssh" json:"ssh,omitempty" jsonschema:"title=SSH Probe,description=SSH Probe Configuration"` + TLS []tls.TLS `yaml:"tls" json:"tls,omitempty" jsonschema:"title=TLS Probe,description=TLS Probe Configuration"` + Host host.Host `yaml:"host" json:"host,omitempty" jsonschema:"title=Host Probe,description=Host Probe Configuration"` + Ping []ping.Ping `yaml:"ping" json:"ping,omitempty" jsonschema:"title=Ping Probe,description=Ping Probe Configuration"` + WebSocket []websocket.WebSocket `yaml:"websocket" json:"websocket,omitempty" jsonschema:"title=WebSocket Probe,description=WebSocket Probe Configuration"` + Notify notify.Config `yaml:"notify" json:"notify,omitempty" jsonschema:"title=Notification,description=Notification Configuration"` + Settings Settings `yaml:"settings" json:"settings,omitempty" jsonschema:"title=Global Settings,description=EaseProbe Global configuration"` } // JSONSchema return the json schema of the configuration diff --git a/docs/Manual.md b/docs/Manual.md index 81fbfaaa..38f48e9b 100644 --- a/docs/Manual.md +++ b/docs/Manual.md @@ -41,6 +41,7 @@ EaseProbe has the following major modules: - [1.9.5 Kafka](#195-kafka) - [1.9.6 PostgreSQL](#196-postgresql) - [1.9.7 Zookeeper](#197-zookeeper) + - [1.10 WebSocket](#110-websocket) - [2. Notification](#2-notification) - [2.1 Slack](#21-slack) - [2.2 Discord](#22-discord) @@ -850,6 +851,29 @@ client: ca: /path/to/file.ca cert: /path/to/file.crt key: /path/to/file.key +``` +## 1.10 WebSocket + +The websocket probe uses `websocket` identifier, it pings a websocket server with Ping/Pong message type of the WebSocket Protocol. + +```yaml +websocket: + - name: asr-server + url: wss://example.com/asr/ + - name: tts-server + url: wss://example.com/tts/ + timeout: 5s + interval: 30s + headers: + Authorization: Bearer 2322f5d2-52d7-11ee-be56-0242ac120002 + proxy: http://192.168.18.7 + labels: + service: tts + idc: idc-a + + + + ``` diff --git a/go.mod b/go.mod index 428bcdfa..6004842a 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/go-redis/redis/v8 v8.11.5 github.com/go-sql-driver/mysql v1.7.1 github.com/go-zookeeper/zk v1.0.3 + github.com/gorilla/websocket v1.5.0 github.com/prometheus-community/pro-bing v0.3.0 github.com/prometheus/client_golang v1.16.0 github.com/segmentio/kafka-go v0.4.42 diff --git a/go.sum b/go.sum index b38e7a1a..2f3cc639 100644 --- a/go.sum +++ b/go.sum @@ -68,6 +68,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0 h1:i462o439ZjprVSFSZLZxcsoAe592sZB1rci2Z8j4wdk= github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0/go.mod h1:N0Wam8K1arqPXNWjMo21EXnBPOPp36vB07FNRdD2geA= diff --git a/metric/prometheus.go b/metric/prometheus.go index c2e95560..42a1ae20 100644 --- a/metric/prometheus.go +++ b/metric/prometheus.go @@ -118,8 +118,8 @@ func mergeLabels(labels []string, constLabels prometheus.Labels) []string { l := make([]string, 0, len(labels)+len(constLabels)) l = append(l, labels...) - for k, _ := range constLabels { - l = append(l, k) + for labelName := range constLabels { + l = append(l, labelName) } return l @@ -208,6 +208,8 @@ func RemoveInvalidChars(name string) string { return string(result) } +// AddConstLabels append user defined labels in the configuration file to the +// predefined label set. func AddConstLabels(labels prometheus.Labels, constLabels prometheus.Labels) prometheus.Labels { for k, v := range constLabels { labels[k] = v diff --git a/probe/websocket/ws.go b/probe/websocket/ws.go new file mode 100644 index 00000000..7513837c --- /dev/null +++ b/probe/websocket/ws.go @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2022, MegaEase + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package websocket + +import ( + "crypto/tls" + "fmt" + "net/http" + "net/url" + "time" + + "github.com/gorilla/websocket" + log "github.com/sirupsen/logrus" + + "github.com/megaease/easeprobe/global" + "github.com/megaease/easeprobe/probe/base" +) + +// WebSocket implements a Config for a websocket prober. +type WebSocket struct { + base.DefaultProbe `yaml:",inline"` + URL string `yaml:"url" json:"url" jsonschema:"format=uri,title=WebSocket URL,description=WebSocket URL to probe"` + Proxy string `yaml:"proxy" json:"proxy,omitempty" jsonschema:"format=url,title=Proxy Server,description=proxy to use for the HTTP request"` + Headers map[string]string `yaml:"headers,omitempty" json:"headers,omitempty" jsonschema:"title=HTTP Headers,description=HTTP headers for the initial HTTP request"` + + proxy *url.URL +} + +// Config Websocket config Object +func (h *WebSocket) Config(gConf global.ProbeSettings) error { + kind := "websocket" + tag := "" + name := h.ProbeName + h.DefaultProbe.Config(gConf, kind, tag, name, h.URL, h.DoProbe) + + url, err := url.Parse(h.URL) + if err != nil { + return err + } + + if url.Scheme != "ws" && url.Scheme != "wss" { + return fmt.Errorf(`the scheme should be "ws" or "wss", but got: %s`, url.Scheme) + } + + if h.Proxy != "" { + h.proxy, err = url.Parse(h.Proxy) + if err != nil { + return err + } + } + + return nil +} + +// DoProbe return the checking result +func (h *WebSocket) DoProbe() (bool, string) { + wsHeader := make(http.Header) + for k, v := range h.Headers { + wsHeader.Set(k, v) + } + + begin := time.Now() + remaining := h.ProbeTimeout + + var dial = websocket.DefaultDialer + dial.TLSClientConfig = &tls.Config{ + InsecureSkipVerify: true, + } + dial.HandshakeTimeout = remaining + if h.proxy != nil { + dial.Proxy = func(request *http.Request) (*url.URL, error) { + return h.proxy, nil + } + } + + ws, _, err := dial.Dial(h.URL, wsHeader) + if err != nil { + return false, err.Error() + } + + defer ws.Close() + + pingPongChan := make(chan struct{}) + ws.SetPongHandler(func(appData string) error { + pingPongChan <- struct{}{} + return nil + }) + + // doing nothing but trigger the read message loop to receive the + // Pong and Close messages from the server. exit after ws.Close() + go func() { + for { + _, _, err := ws.ReadMessage() + if err != nil { + break + } + } + }() + + remaining = h.ProbeTimeout - time.Since(begin) + err = ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(remaining)) + if err != nil { + return false, err.Error() + } + + remaining = h.ProbeTimeout - time.Since(begin) + t := time.NewTimer(remaining) + defer t.Stop() + + select { + case <-t.C: + return false, "ping timeout" + case <-pingPongChan: + remaining = h.ProbeTimeout - time.Since(begin) + closeCode := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") + // try to do a graceful close, but do not care the result + err := ws.WriteControl(websocket.CloseMessage, closeCode, time.Now().Add(remaining)) + if err != nil { + log.Error(err) + } + + return true, "" + } +} diff --git a/probe/websocket/ws_test.go b/probe/websocket/ws_test.go new file mode 100644 index 00000000..e0a0fe66 --- /dev/null +++ b/probe/websocket/ws_test.go @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2022, MegaEase + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package websocket + +import ( + "fmt" + "log" + "net/http" + "testing" + "time" + + "github.com/gorilla/websocket" + "github.com/stretchr/testify/assert" + + "github.com/megaease/easeprobe/global" + "github.com/megaease/easeprobe/probe/base" +) + +type TestCase struct { + URL string + Timeout time.Duration + Headers map[string]string + Want bool +} + +var ( + token = map[string]string{"Authorization": "token 123456"} +) + +func TestWSPing(t *testing.T) { + go func() { + err := http.ListenAndServe(":18080", &Handler{}) + if err != nil { + log.Fatal(err) + } + }() + + testcases := []TestCase{ + {URL: "ws://127.0.0.1:18080/right", Timeout: 500 * time.Millisecond, Headers: token, Want: false}, + {URL: "ws://127.0.0.1:18080/right", Timeout: 2000 * time.Millisecond, Headers: token, Want: true}, + {URL: "ws://127.0.0.1:18080/right", Timeout: 2000 * time.Millisecond, Headers: nil, Want: false}, + {URL: "ws://127.0.0.1:18080/wrong", Timeout: 2000 * time.Millisecond, Headers: token, Want: false}, + } + + for i, test := range testcases { + ws := WebSocket{ + DefaultProbe: base.DefaultProbe{ + ProbeTimeout: test.Timeout, + }, + URL: test.URL, + Headers: test.Headers, + } + ws.Config(global.ProbeSettings{}) + ok, _ := ws.DoProbe() + assert.Equalf(t, test.Want, ok, fmt.Sprintf("case %d", i)) + + } +} + +type Handler struct { +} + +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + var u = websocket.Upgrader{} // use default options + reqToken := r.Header.Get("Authorization") + + if r.URL.Path != "/right" { + w.WriteHeader(http.StatusNotFound) + return + } + + if reqToken != token["Authorization"] { + w.WriteHeader(http.StatusUnauthorized) + return + } + + c, err := u.Upgrade(w, r, nil) + if err != nil { + log.Print("upgrade:", err) + return + } + defer c.Close() + + c.SetPingHandler(func(appData string) error { + log.Printf("ping from: %s", appData) + time.Sleep(1 * time.Second) + c.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(1*time.Second)) + return nil + }) + + c.SetCloseHandler(func(code int, text string) error { + fmt.Printf("closed") + return nil + }) + + for { + // do nothing but trigger the loop to handle Ping/Pong message internally + _, _, err := c.ReadMessage() + if err != nil { + break + } + } +}