Skip to content

Commit

Permalink
done websocket protocol &
Browse files Browse the repository at this point in the history
  • Loading branch information
brewlin committed Oct 14, 2019
1 parent 2797cac commit 0389800
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 79 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
`./cmd/*.md`
## @application 应用层
- [x] http [docs](./cmd/http.md)
- [ ] websocket [docs](./cmd/websocket.md)
- [x] websocket [docs](./cmd/websocket.md)

## @transport 传输层
- [x] tcp [docs](./cmd/tcp.md)
Expand Down
15 changes: 10 additions & 5 deletions cmd/application/websocket/websocketserver.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"fmt"
"log"

"github.com/brewlin/net-protocol/pkg/logging"
Expand All @@ -13,20 +14,23 @@ func init() {
}
func main() {
serv := http.NewHTTP("tap1", "192.168.1.0/24", "192.168.1.1", "9502")
serv.HandleFunc("/websocket", echo)
serv.HandleFunc("/ws", echo)

serv.HandleFunc("/", func(request *http.Request, response *http.Response) {
response.End("hello")
})
fmt.Println("@main: server is start ip:192.168.1.1 port:9502 ")
serv.ListenAndServ()
}

//websocket处理器
func echo(r *http.Request, w *http.Response) {
//协议升级
fmt.Println("got http request ; start to upgrade websocket protocol....")
//协议升级 c *websocket.Conn
c, err := websocket.Upgrade(r, w)
if err != nil {
log.Print("Upgrade error:", err)
//升级协议失败,直接return 交由http处理响应
fmt.Println("Upgrade error:", err)
return
}
defer c.Close()
Expand All @@ -37,7 +41,8 @@ func echo(r *http.Request, w *http.Response) {
log.Println("read:", err)
break
}
log.Printf("recv:%s", message)
c.SendData(message)
fmt.Println("recv client msg:", string(message))
// c.SendData(message )
c.SendData([]byte("hello"))
}
}
71 changes: 58 additions & 13 deletions protocol/application/http/connection.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package http

import (
"fmt"
"errors"
"log"
"sync"

"github.com/brewlin/net-protocol/pkg/buffer"
"github.com/brewlin/net-protocol/pkg/waiter"
Expand All @@ -29,6 +30,10 @@ type Connection struct {
// 请求文件的真实路径
real_path string

//接受队列缓存区
buf buffer.View
bufmu sync.RWMutex

q *waiter.Queue
waitEntry waiter.Entry
notifyC chan struct{}
Expand Down Expand Up @@ -64,7 +69,6 @@ func newCon(e tcpip.Endpoint, q *waiter.Queue) *Connection {
func (con *Connection) handler() {
<-con.notifyC
log.Println("@应用层 http: waiting new event trigger ...")
fmt.Println("@应用层 http: waiting new event trigger ...")
for {
v, _, err := con.socket.Read(con.addr)
if err != nil {
Expand All @@ -76,8 +80,8 @@ func (con *Connection) handler() {
}
con.recv_buf += string(v)
}
fmt.Println("http协议原始数据:")
fmt.Println(con.recv_buf)
log.Println("http协议原始数据:")
log.Println(con.recv_buf)
con.request.parse(con)
//dispatch the route request
defaultMux.dispatch(con)
Expand All @@ -92,22 +96,63 @@ func (c *Connection) set_status_code(code int) {
}

//Write write
func (c *Connection) Write(buf []byte) *tcpip.Error {
func (c *Connection) Write(buf []byte) error {
v := buffer.View(buf)
_, _, err := c.socket.Write(tcpip.SlicePayload(v),
c.socket.Write(tcpip.SlicePayload(v),
tcpip.WriteOptions{To: c.addr})
return err
return nil
}

//Read data
func (c *Connection) Read(p []byte) (int, error) {
buf, _, err := c.socket.Read(c.addr)
if err != nil {
return 0, err
func (c *Connection) Read() ([]byte, error) {

var buf []byte
var err error
for {
v, _, e := c.socket.Read(c.addr)
if e != nil {
err = e
break
}
buf = append(buf, v...)
}
n := copy(p, buf)
return n, nil
if buf == nil {
return nil, err
}
return buf, nil

}

//Readn 读取固定字节的数据
func (c *Connection) Readn(p []byte) (int, error) {
c.bufmu.Lock()
defer c.bufmu.Unlock()
//获取足够长度的字节
if len(p) > len(c.buf) {

for {
if len(p) <= len(c.buf) {
break
}
buf, _, err := c.socket.Read(c.addr)
if err != nil {
if err == tcpip.ErrWouldBlock {
//阻塞等待数据
<-c.notifyC
continue
}
return 0, err
}
c.buf = append(c.buf, buf...)
}
}
if len(p) > len(c.buf) {
return 0, errors.New("package len is smaller than p need")
}

n := copy(p, c.buf)
c.buf = c.buf[len(p):]
return n, nil
}

//关闭连接
Expand Down
14 changes: 7 additions & 7 deletions protocol/application/http/request.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package http

import (
"fmt"
"log"
"strings"
)

Expand Down Expand Up @@ -36,7 +36,7 @@ func (req *Request) parse(con *Connection) {
buf := con.recv_buf

req.method_raw, buf = match_until(buf, " ")
fmt.Println("@application http: header parse method_raw:", req.method_raw)
log.Println("@application http: header parse method_raw:", req.method_raw)

if req.method_raw == "" {
con.status_code = 400
Expand All @@ -45,7 +45,7 @@ func (req *Request) parse(con *Connection) {

// 获得HTTP方法
req.method = get_method(req.method_raw)
fmt.Println("@application http: header parse method:", req.method)
log.Println("@application http: header parse method:", req.method)

if req.method == HTTP_METHOD_NOT_SUPPORTED {
con.set_status_code(501)
Expand All @@ -56,7 +56,7 @@ func (req *Request) parse(con *Connection) {

// 获得URI
req.uri, buf = match_until(buf, " ")
fmt.Println("@application http: header parse uri:", req.uri)
log.Println("@application http: header parse uri:", req.uri)

if req.uri == "" {
con.status_code = 400
Expand All @@ -80,7 +80,7 @@ func (req *Request) parse(con *Connection) {

// 获得HTTP版本
req.version_raw, buf = match_until(buf, "\r\n")
fmt.Println("@application http: header parse version_raw:", req.version_raw)
log.Println("@application http: header parse version_raw:", req.version_raw)

if req.version_raw == "" {
con.status_code = 400
Expand All @@ -95,8 +95,8 @@ func (req *Request) parse(con *Connection) {
} else {
con.set_status_code(400)
}
fmt.Println("@application http: header parse version:", req.version)
fmt.Println("@application http: header parse status_code:", con.status_code)
log.Println("@application http: header parse version:", req.version)
log.Println("@application http: header parse status_code:", con.status_code)
if con.status_code > 0 {
return
}
Expand Down
6 changes: 3 additions & 3 deletions protocol/application/http/response.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package http

import (
"fmt"
"log"
"strconv"

"github.com/brewlin/net-protocol/pkg/buffer"
Expand Down Expand Up @@ -133,8 +133,8 @@ func (r *Response) build_and_send_response() {
}
buf += "\r\n"
buf += r.entity_body
fmt.Println("@application http:response send 构建http响应包体")
fmt.Println(buf)
log.Println("@application http:response send 构建http响应包体")
log.Println(buf)
// 将字符串缓存发送到客户端
r.send_all(buf)
}
Expand Down
8 changes: 3 additions & 5 deletions protocol/application/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package http

import (
"flag"
"fmt"
"log"
"net"
"strconv"
Expand Down Expand Up @@ -144,11 +143,10 @@ func (s *Server) ListenAndServ() {
if err != nil {
if err == tcpip.ErrWouldBlock {
log.Println("@application http:", " now waiting to new client connection ...")
fmt.Println("@application http:", " now waiting to new client connection ...")
<-notifyCh
continue
}
fmt.Println("@application http: Accept() failed: ", err)
log.Println("@application http: Accept() failed: ", err)
panic(err)
}

Expand All @@ -157,9 +155,9 @@ func (s *Server) ListenAndServ() {
}

func (s *Server) dispatch(e tcpip.Endpoint, wq *waiter.Queue) {
fmt.Println("@application http: dispatch got new request")
log.Println("@application http: dispatch got new request")
con := newCon(e, wq)
con.handler()
fmt.Println("@application http: dispatch close this request")
log.Println("@application http: dispatch close this request")
con.Close()
}
8 changes: 0 additions & 8 deletions protocol/application/http/server_patttern.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,6 @@ type muxEntry struct {
pattern string
}

//NewMuxEntry entry
func NewMuxEntry(pattern string, handler func(*Request, *Response)) muxEntry {
var entry muxEntry
entry.h = handler
entry.pattern = pattern
return entry
}

var defaultMux ServeMux

//handle
Expand Down
17 changes: 8 additions & 9 deletions protocol/application/websocket/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package websocket
import (
"encoding/binary"
"errors"
"fmt"
"log"

"github.com/brewlin/net-protocol/protocol/application/http"
Expand Down Expand Up @@ -67,7 +66,7 @@ func (c *Conn) SendData(data []byte) {
* => 1 0 0 0 0 0 0 1
*/
c.writeBuf[0] = byte(TextMessage) | finalBit
fmt.Printf("1 bit:%b\n", c.writeBuf[0])
log.Printf("1 bit:%b\n", c.writeBuf[0])

//数据帧第二个字节,服务器发送的数据不需要进行掩码处理
switch {
Expand All @@ -88,7 +87,7 @@ func (c *Conn) SendData(data []byte) {
//c.writeBuf[1] = byte(0x00) | byte(length)
c.writeBuf[1] = byte(length)
}
fmt.Printf("2 bit:%b\n", c.writeBuf[1])
log.Printf("2 bit:%b\n", c.writeBuf[1])

copy(c.writeBuf[payloadStart:], data[:])
c.conn.Write(c.writeBuf[:payloadStart+length])
Expand All @@ -98,12 +97,12 @@ func (c *Conn) SendData(data []byte) {
func (c *Conn) ReadData() (data []byte, err error) {
var b [8]byte
//读取数据帧的前两个字节
if _, err := c.conn.Read(b[:2]); err != nil {
if _, err := c.conn.Readn(b[:2]); err != nil {
return nil, err
}
//开始解析第一个字节 是否还有后续数据帧
final := b[0]&finalBit != 0
fmt.Printf("read data 1 bit :%b\n", b[0])
log.Printf("read data 1 bit :%b\n", b[0])
//不支持数据分片
if !final {
log.Println("Recived fragemented frame,not support")
Expand Down Expand Up @@ -138,13 +137,13 @@ func (c *Conn) ReadData() (data []byte, err error) {
//根据payload length 判断数据的真实长度
switch payloadLen {
case 126: //扩展2字节
if _, err := c.conn.Read(b[:2]); err != nil {
if _, err := c.conn.Readn(b[:2]); err != nil {
return nil, err
}
//获取扩展二字节的真实数据长度
dataLen = int64(binary.BigEndian.Uint16(b[:2]))
case 127:
if _, err := c.conn.Read(b[:8]); err != nil {
if _, err := c.conn.Readn(b[:8]); err != nil {
return nil, err
}
dataLen = int64(binary.BigEndian.Uint64(b[:8]))
Expand All @@ -154,13 +153,13 @@ func (c *Conn) ReadData() (data []byte, err error) {
//读取mask key
if mask { //如果需要掩码处理的话 需要取出key
//maskKey 是 4 字节 32位
if _, err := c.conn.Read(c.maskKey[:]); err != nil {
if _, err := c.conn.Readn(c.maskKey[:]); err != nil {
return nil, err
}
}
//读取数据内容
p := make([]byte, dataLen)
if _, err := c.conn.Read(p); err != nil {
if _, err := c.conn.Readn(p); err != nil {
return nil, err
}
if mask {
Expand Down
Loading

0 comments on commit 0389800

Please sign in to comment.