Browse Source

Guarantee reading happens in order and add some comments

pull/1422/head
Alex Browne 5 years ago
committed by Steven Allen
parent
commit
4979f94d4e
  1. 138
      p2p/transport/websocket/conn_js.go

138
p2p/transport/websocket/conn_js.go

@ -18,7 +18,9 @@ const (
webSocketStateClosed = 3 webSocketStateClosed = 3
) )
var errIsClosed = errors.New("connection is closed") const incomingDataBufferSize = 100
var errConnectionClosed = errors.New("connection is closed")
// Conn implements net.Conn interface for WebSockets in js/wasm. // Conn implements net.Conn interface for WebSockets in js/wasm.
type Conn struct { type Conn struct {
@ -27,50 +29,78 @@ type Conn struct {
closeHandler *js.Func closeHandler *js.Func
mut sync.Mutex mut sync.Mutex
incomingData chan []byte incomingData chan []byte
currData *bytes.Buffer currDataMut sync.RWMutex
currData bytes.Buffer
closeSignal chan struct{} closeSignal chan struct{}
dataSignal chan struct{}
}
// NewConn creates a Conn given a regular js/wasm WebSocket Conn.
func NewConn(raw js.Value) *Conn {
conn := &Conn{
Value: raw,
incomingData: make(chan []byte, incomingDataBufferSize),
closeSignal: make(chan struct{}),
dataSignal: make(chan struct{}),
}
conn.setUpHandlers()
go func() {
// TODO(albrow): Handle error appropriately
err := conn.readLoop()
if err != nil {
panic(err)
}
}()
return conn
} }
func (c *Conn) Read(b []byte) (int, error) { func (c *Conn) Read(b []byte) (int, error) {
c.mut.Lock()
defer c.mut.Unlock()
if err := c.checkOpen(); err != nil { if err := c.checkOpen(); err != nil {
return 0, io.EOF return 0, io.EOF
} }
if c.currData == nil { for {
c.currDataMut.RLock()
n, err := c.currData.Read(b)
c.currDataMut.RUnlock()
if err != nil && err != io.EOF {
// Return any unexpected errors immediately.
return n, err
} else if n == 0 || err == io.EOF {
// There is no data ready to be read. Wait for more data or for the
// connection to be closed.
select { select {
case data := <-c.incomingData: case <-c.dataSignal:
c.currData = bytes.NewBuffer(data) continue
case <-c.closeSignal: case <-c.closeSignal:
return 0, io.EOF return 0, io.EOF
} }
} else {
return n, err
} }
n, err := c.currData.Read(b)
if err == io.EOF {
c.currData = nil
return n, nil
} }
return n, err
} }
// checkOpen returns an error if the connection is not open. Otherwise, it
// returns nil.
func (c *Conn) checkOpen() error { func (c *Conn) checkOpen() error {
state := c.Get("readyState").Int() state := c.Get("readyState").Int()
switch state { switch state {
case webSocketStateClosed, webSocketStateClosing: case webSocketStateClosed, webSocketStateClosing:
return errIsClosed return errConnectionClosed
} }
return nil return nil
} }
func (c *Conn) Write(b []byte) (n int, err error) { func (c *Conn) Write(b []byte) (n int, err error) {
// c.mut.Lock()
// defer c.mut.Unlock()
if err := c.checkOpen(); err != nil { if err := c.checkOpen(); err != nil {
return 0, err return 0, err
} }
c.Call("send", string(b)) uint8Array := js.Global().Get("Uint8Array").New(len(b))
for i := 0; i < len(b); i++ {
uint8Array.SetIndex(i, b[i])
}
c.Call("send", uint8Array.Get("buffer"))
return len(b), nil return len(b), nil
} }
@ -78,8 +108,8 @@ func (c *Conn) Write(b []byte) (n int, err error) {
// close error, subsequent and concurrent calls will return nil. // close error, subsequent and concurrent calls will return nil.
// This method is thread-safe. // This method is thread-safe.
func (c *Conn) Close() error { func (c *Conn) Close() error {
// c.mut.Lock() c.mut.Lock()
// defer c.mut.Unlock() defer c.mut.Unlock()
c.Call("close") c.Call("close")
if c.messageHandler != nil { if c.messageHandler != nil {
c.Call("removeEventListener", "message", *c.messageHandler) c.Call("removeEventListener", "message", *c.messageHandler)
@ -93,7 +123,6 @@ func (c *Conn) Close() error {
} }
func (c *Conn) LocalAddr() net.Addr { func (c *Conn) LocalAddr() net.Addr {
// TODO(albrow): is there some way to get the local address?
return NewAddr("0.0.0.0:0") return NewAddr("0.0.0.0:0")
} }
@ -119,20 +148,9 @@ func (c *Conn) SetWriteDeadline(t time.Time) error {
return nil return nil
} }
// NewConn creates a Conn given a regular js/wasm WebSocket Conn.
func NewConn(raw js.Value) *Conn {
conn := &Conn{
Value: raw,
incomingData: make(chan []byte),
closeSignal: make(chan struct{}),
}
conn.setUpHandlers()
return conn
}
func (c *Conn) setUpHandlers() { func (c *Conn) setUpHandlers() {
// c.mut.Lock() c.mut.Lock()
// defer c.mut.Unlock() defer c.mut.Unlock()
if c.messageHandler != nil { if c.messageHandler != nil {
// Message handlers already created. Nothing to do. // Message handlers already created. Nothing to do.
return return
@ -142,8 +160,8 @@ func (c *Conn) setUpHandlers() {
// TODO(albrow): Currently we assume data is of type Blob. Really we // TODO(albrow): Currently we assume data is of type Blob. Really we
// should check binaryType and then decode accordingly. // should check binaryType and then decode accordingly.
blob := args[0].Get("data") blob := args[0].Get("data")
text := readBlob(blob) data := readBlob(blob)
c.incomingData <- []byte(text) c.incomingData <- data
}() }()
return nil return nil
}) })
@ -158,39 +176,55 @@ func (c *Conn) setUpHandlers() {
c.Call("addEventListener", "close", closeHandler) c.Call("addEventListener", "close", closeHandler)
} }
// readLoop continuosly reads from the c.incoming data channel and writes to the
// current data buffer.
func (c *Conn) readLoop() error {
for data := range c.incomingData {
c.currDataMut.Lock()
_, err := c.currData.Write(data)
if err != nil {
c.currDataMut.Unlock()
return err
}
c.currDataMut.Unlock()
c.dataSignal <- struct{}{}
}
return nil
}
func (c *Conn) waitForOpen() error { func (c *Conn) waitForOpen() error {
openSignal := make(chan struct{}) openSignal := make(chan struct{})
handler := js.FuncOf(func(this js.Value, args []js.Value) interface{} { handler := js.FuncOf(func(this js.Value, args []js.Value) interface{} {
close(openSignal) close(openSignal)
return nil return nil
}) })
defer func() { defer c.Call("removeEventListener", "open", handler)
c.Call("removeEventListener", "open", handler)
}()
defer handler.Release() defer handler.Release()
c.Call("addEventListener", "open", handler) c.Call("addEventListener", "open", handler)
<-openSignal <-openSignal
return nil return nil
} }
func readBlob(blob js.Value) string { // readBlob converts a JavaScript Blob into a slice of bytes. It uses the
// const reader = new FileReader(); // FileReader API under the hood.
// reader.addEventListener('loadend', (e) => { func readBlob(blob js.Value) []byte {
// const text = e.srcElement.result;
// console.log(text);
// });
// reader.readAsText(blb);
reader := js.Global().Get("FileReader").New() reader := js.Global().Get("FileReader").New()
textChan := make(chan string) dataChan := make(chan []byte)
loadEndFunc := js.FuncOf(func(this js.Value, args []js.Value) interface{} { loadEndFunc := js.FuncOf(func(this js.Value, args []js.Value) interface{} {
go func() { // event.result is of type ArrayBuffer. We can convert it to a JavaScript
text := args[0].Get("srcElement").Get("result").String() // Uint8Array, which is directly translatable to the Go type []byte.
textChan <- text buffer := reader.Get("result")
}() view := js.Global().Get("Uint8Array").New(buffer)
dataLen := view.Get("length").Int()
data := make([]byte, dataLen)
for i := 0; i < dataLen; i++ {
data[i] = byte(view.Index(i).Int())
}
dataChan <- data
return nil return nil
}) })
defer loadEndFunc.Release() defer loadEndFunc.Release()
reader.Call("addEventListener", "loadend", loadEndFunc) reader.Call("addEventListener", "loadend", loadEndFunc)
reader.Call("readAsText", blob) reader.Call("readAsArrayBuffer", blob)
return <-textChan return <-dataChan
} }

Loading…
Cancel
Save