Skip to content

Commit

Permalink
Websocket (#411)
Browse files Browse the repository at this point in the history
* add websocket prober

* fix, 1) remove websocket from the notify module. 2) remove the pointer

* fix doc

* fix accordking to issue review
  • Loading branch information
qdongxu authored Sep 14, 2023
1 parent 9fc5efc commit 27d8368
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 13 deletions.
24 changes: 13 additions & 11 deletions conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/megaease/easeprobe/probe/ssh"
"github.com/megaease/easeprobe/probe/tcp"
"github.com/megaease/easeprobe/probe/tls"
"github.com/megaease/easeprobe/probe/websocket"

"github.com/invopop/jsonschema"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -138,17 +139,18 @@ type Settings struct {

// Conf is Probe configuration
type Conf struct {
Version string `yaml:"version" json:"version,omitempty" jsonschema:"title=Version,description=Version of the EaseProbe configuration"`
HTTP []http.HTTP `yaml:"http" json:"http,omitempty" jsonschema:"title=HTTP Probe,description=HTTP Probe Configuration"`
TCP []tcp.TCP `yaml:"tcp" json:"tcp,omitempty" jsonschema:"title=TCP Probe,description=TCP Probe Configuration"`
Shell []shell.Shell `yaml:"shell" json:"shell,omitempty" jsonschema:"title=Shell Probe,description=Shell Probe Configuration"`
Client []client.Client `yaml:"client" json:"client,omitempty" jsonschema:"title=Native Client Probe,description=Native Client Probe Configuration"`
SSH ssh.SSH `yaml:"ssh" json:"ssh,omitempty" jsonschema:"title=SSH Probe,description=SSH Probe Configuration"`
TLS []tls.TLS `yaml:"tls" json:"tls,omitempty" jsonschema:"title=TLS Probe,description=TLS Probe Configuration"`
Host host.Host `yaml:"host" json:"host,omitempty" jsonschema:"title=Host Probe,description=Host Probe Configuration"`
Ping []ping.Ping `yaml:"ping" json:"ping,omitempty" jsonschema:"title=Ping Probe,description=Ping Probe Configuration"`
Notify notify.Config `yaml:"notify" json:"notify,omitempty" jsonschema:"title=Notification,description=Notification Configuration"`
Settings Settings `yaml:"settings" json:"settings,omitempty" jsonschema:"title=Global Settings,description=EaseProbe Global configuration"`
Version string `yaml:"version" json:"version,omitempty" jsonschema:"title=Version,description=Version of the EaseProbe configuration"`
HTTP []http.HTTP `yaml:"http" json:"http,omitempty" jsonschema:"title=HTTP Probe,description=HTTP Probe Configuration"`
TCP []tcp.TCP `yaml:"tcp" json:"tcp,omitempty" jsonschema:"title=TCP Probe,description=TCP Probe Configuration"`
Shell []shell.Shell `yaml:"shell" json:"shell,omitempty" jsonschema:"title=Shell Probe,description=Shell Probe Configuration"`
Client []client.Client `yaml:"client" json:"client,omitempty" jsonschema:"title=Native Client Probe,description=Native Client Probe Configuration"`
SSH ssh.SSH `yaml:"ssh" json:"ssh,omitempty" jsonschema:"title=SSH Probe,description=SSH Probe Configuration"`
TLS []tls.TLS `yaml:"tls" json:"tls,omitempty" jsonschema:"title=TLS Probe,description=TLS Probe Configuration"`
Host host.Host `yaml:"host" json:"host,omitempty" jsonschema:"title=Host Probe,description=Host Probe Configuration"`
Ping []ping.Ping `yaml:"ping" json:"ping,omitempty" jsonschema:"title=Ping Probe,description=Ping Probe Configuration"`
WebSocket []websocket.WebSocket `yaml:"websocket" json:"websocket,omitempty" jsonschema:"title=WebSocket Probe,description=WebSocket Probe Configuration"`
Notify notify.Config `yaml:"notify" json:"notify,omitempty" jsonschema:"title=Notification,description=Notification Configuration"`
Settings Settings `yaml:"settings" json:"settings,omitempty" jsonschema:"title=Global Settings,description=EaseProbe Global configuration"`
}

// JSONSchema return the json schema of the configuration
Expand Down
24 changes: 24 additions & 0 deletions docs/Manual.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ EaseProbe has the following major modules:
- [1.9.5 Kafka](#195-kafka)
- [1.9.6 PostgreSQL](#196-postgresql)
- [1.9.7 Zookeeper](#197-zookeeper)
- [1.10 WebSocket](#110-websocket)
- [2. Notification](#2-notification)
- [2.1 Slack](#21-slack)
- [2.2 Discord](#22-discord)
Expand Down Expand Up @@ -850,6 +851,29 @@ client:
ca: /path/to/file.ca
cert: /path/to/file.crt
key: /path/to/file.key
```
## 1.10 WebSocket
The websocket probe uses `websocket` identifier, it pings a websocket server with Ping/Pong message type of the WebSocket Protocol.

```yaml
websocket:
- name: asr-server
url: wss://example.com/asr/
- name: tts-server
url: wss://example.com/tts/
timeout: 5s
interval: 30s
headers:
Authorization: Bearer 2322f5d2-52d7-11ee-be56-0242ac120002
proxy: http://192.168.18.7
labels:
service: tts
idc: idc-a
```


Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/go-redis/redis/v8 v8.11.5
github.com/go-sql-driver/mysql v1.7.1
github.com/go-zookeeper/zk v1.0.3
github.com/gorilla/websocket v1.5.0
github.com/prometheus-community/pro-bing v0.3.0
github.com/prometheus/client_golang v1.16.0
github.com/segmentio/kafka-go v0.4.42
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0 h1:i462o439ZjprVSFSZLZxcsoAe592sZB1rci2Z8j4wdk=
github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0/go.mod h1:N0Wam8K1arqPXNWjMo21EXnBPOPp36vB07FNRdD2geA=
Expand Down
6 changes: 4 additions & 2 deletions metric/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ func mergeLabels(labels []string, constLabels prometheus.Labels) []string {
l := make([]string, 0, len(labels)+len(constLabels))
l = append(l, labels...)

for k, _ := range constLabels {
l = append(l, k)
for labelName := range constLabels {
l = append(l, labelName)
}

return l
Expand Down Expand Up @@ -208,6 +208,8 @@ func RemoveInvalidChars(name string) string {
return string(result)
}

// AddConstLabels append user defined labels in the configuration file to the
// predefined label set.
func AddConstLabels(labels prometheus.Labels, constLabels prometheus.Labels) prometheus.Labels {
for k, v := range constLabels {
labels[k] = v
Expand Down
139 changes: 139 additions & 0 deletions probe/websocket/ws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright (c) 2022, MegaEase
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package websocket

import (
"crypto/tls"
"fmt"
"net/http"
"net/url"
"time"

"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"

"github.com/megaease/easeprobe/global"
"github.com/megaease/easeprobe/probe/base"
)

// WebSocket implements a Config for a websocket prober.
type WebSocket struct {
base.DefaultProbe `yaml:",inline"`
URL string `yaml:"url" json:"url" jsonschema:"format=uri,title=WebSocket URL,description=WebSocket URL to probe"`
Proxy string `yaml:"proxy" json:"proxy,omitempty" jsonschema:"format=url,title=Proxy Server,description=proxy to use for the HTTP request"`
Headers map[string]string `yaml:"headers,omitempty" json:"headers,omitempty" jsonschema:"title=HTTP Headers,description=HTTP headers for the initial HTTP request"`

proxy *url.URL
}

// Config Websocket config Object
func (h *WebSocket) Config(gConf global.ProbeSettings) error {
kind := "websocket"
tag := ""
name := h.ProbeName
h.DefaultProbe.Config(gConf, kind, tag, name, h.URL, h.DoProbe)

url, err := url.Parse(h.URL)
if err != nil {
return err
}

if url.Scheme != "ws" && url.Scheme != "wss" {
return fmt.Errorf(`the scheme should be "ws" or "wss", but got: %s`, url.Scheme)
}

if h.Proxy != "" {
h.proxy, err = url.Parse(h.Proxy)
if err != nil {
return err
}
}

return nil
}

// DoProbe return the checking result
func (h *WebSocket) DoProbe() (bool, string) {
wsHeader := make(http.Header)
for k, v := range h.Headers {
wsHeader.Set(k, v)
}

begin := time.Now()
remaining := h.ProbeTimeout

var dial = websocket.DefaultDialer
dial.TLSClientConfig = &tls.Config{
InsecureSkipVerify: true,
}
dial.HandshakeTimeout = remaining
if h.proxy != nil {
dial.Proxy = func(request *http.Request) (*url.URL, error) {
return h.proxy, nil
}
}

ws, _, err := dial.Dial(h.URL, wsHeader)
if err != nil {
return false, err.Error()
}

defer ws.Close()

pingPongChan := make(chan struct{})
ws.SetPongHandler(func(appData string) error {
pingPongChan <- struct{}{}
return nil
})

// doing nothing but trigger the read message loop to receive the
// Pong and Close messages from the server. exit after ws.Close()
go func() {
for {
_, _, err := ws.ReadMessage()
if err != nil {
break
}
}
}()

remaining = h.ProbeTimeout - time.Since(begin)
err = ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(remaining))
if err != nil {
return false, err.Error()
}

remaining = h.ProbeTimeout - time.Since(begin)
t := time.NewTimer(remaining)
defer t.Stop()

select {
case <-t.C:
return false, "ping timeout"
case <-pingPongChan:
remaining = h.ProbeTimeout - time.Since(begin)
closeCode := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
// try to do a graceful close, but do not care the result
err := ws.WriteControl(websocket.CloseMessage, closeCode, time.Now().Add(remaining))
if err != nil {
log.Error(err)
}

return true, ""
}
}
118 changes: 118 additions & 0 deletions probe/websocket/ws_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright (c) 2022, MegaEase
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package websocket

import (
"fmt"
"log"
"net/http"
"testing"
"time"

"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"

"github.com/megaease/easeprobe/global"
"github.com/megaease/easeprobe/probe/base"
)

type TestCase struct {
URL string
Timeout time.Duration
Headers map[string]string
Want bool
}

var (
token = map[string]string{"Authorization": "token 123456"}
)

func TestWSPing(t *testing.T) {
go func() {
err := http.ListenAndServe(":18080", &Handler{})
if err != nil {
log.Fatal(err)
}
}()

testcases := []TestCase{
{URL: "ws://127.0.0.1:18080/right", Timeout: 500 * time.Millisecond, Headers: token, Want: false},
{URL: "ws://127.0.0.1:18080/right", Timeout: 2000 * time.Millisecond, Headers: token, Want: true},
{URL: "ws://127.0.0.1:18080/right", Timeout: 2000 * time.Millisecond, Headers: nil, Want: false},
{URL: "ws://127.0.0.1:18080/wrong", Timeout: 2000 * time.Millisecond, Headers: token, Want: false},
}

for i, test := range testcases {
ws := WebSocket{
DefaultProbe: base.DefaultProbe{
ProbeTimeout: test.Timeout,
},
URL: test.URL,
Headers: test.Headers,
}
ws.Config(global.ProbeSettings{})
ok, _ := ws.DoProbe()
assert.Equalf(t, test.Want, ok, fmt.Sprintf("case %d", i))

}
}

type Handler struct {
}

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var u = websocket.Upgrader{} // use default options
reqToken := r.Header.Get("Authorization")

if r.URL.Path != "/right" {
w.WriteHeader(http.StatusNotFound)
return
}

if reqToken != token["Authorization"] {
w.WriteHeader(http.StatusUnauthorized)
return
}

c, err := u.Upgrade(w, r, nil)
if err != nil {
log.Print("upgrade:", err)
return
}
defer c.Close()

c.SetPingHandler(func(appData string) error {
log.Printf("ping from: %s", appData)
time.Sleep(1 * time.Second)
c.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(1*time.Second))
return nil
})

c.SetCloseHandler(func(code int, text string) error {
fmt.Printf("closed")
return nil
})

for {
// do nothing but trigger the loop to handle Ping/Pong message internally
_, _, err := c.ReadMessage()
if err != nil {
break
}
}
}

0 comments on commit 27d8368

Please sign in to comment.