Skip to content
This repository has been archived by the owner on Apr 19, 2023. It is now read-only.

Support WebSocket connection #1

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 95 additions & 2 deletions connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package stompngo

import (
"bufio"
"github.com/gorilla/websocket"
"log"
"os"

Expand All @@ -35,6 +36,10 @@ func NewConnector(n net.Conn, h Headers) (STOMPConnector, error) {
return Connect(n, h)
}

func NewConnectorOverWS(n *websocket.Conn, h Headers) (STOMPConnector, error) {
return ConnectOverWS(n, h)
}

/*
Primary STOMP Connect.

Expand Down Expand Up @@ -110,9 +115,16 @@ func Connect(n net.Conn, h Headers) (*Connection, error) {
log.Ldate|log.Lmicroseconds|log.Lshortfile))
}

// Initialize elapsed time tracking data if needed
c.eltd = nil
if os.Getenv("STOMP_TRACKELT") != "" {
c.eltd = &eltmets{}
}

// OK, put a CONNECT on the wire
c.wtr = bufio.NewWriter(n) // Create the writer
go c.writer() // Start it
c.wtr = bufio.NewWriterSize(n, senv.WriteBufsz()) // Create the writer
// fmt.Println("TCDBG", c.wtr.Size())
go c.writer() // Start it
var f Frame
if senv.UseStomp() {
if ch.Value("accept-version") == SPL_11 || ch.Value("accept-version") == SPL_12 {
Expand Down Expand Up @@ -148,3 +160,84 @@ func Connect(n net.Conn, h Headers) (*Connection, error) {
//
return c, e
}

func ConnectOverWS(n *websocket.Conn, h Headers) (STOMPConnector, error) {
if h == nil {
return nil, EHDRNIL
}
if e := h.Validate(); e != nil {
return nil, e
}
if _, ok := h.Contains(HK_RECEIPT); ok {
return nil, ENORECPT
}
ch := h.Clone()

//fmt.Printf("CONDB01\n")
c := &Connection{wsConn: n,
input: make(chan MessageData, 1),
output: make(chan wiredata),
connected: false,
session: "",
protocol: SPL_10,
subs: make(map[string]*subscription),
DisconnectReceipt: MessageData{},
ssdc: make(chan struct{}),
wtrsdc: make(chan struct{}),
scc: 1,
dld: &deadlineData{}}

// Basic metric data
c.mets = &metrics{st: time.Now()}

// Assumed for now
c.MessageData = c.input

// Validate that the client wants a version we support
if e := c.checkClientVersions(h); e != nil {
return c, e
}
// Optional logging from connection start
ln := senv.WantLogger()
if ln != "" {
c.SetLogger(log.New(os.Stdout, ln+" ",
log.Ldate|log.Lmicroseconds|log.Lshortfile))
}

// OK, put a CONNECT on the wire
go c.writerOverWS()
var f Frame
if senv.UseStomp() {
if ch.Value("accept-version") == SPL_11 || ch.Value("accept-version") == SPL_12 {
f = Frame{STOMP, ch, NULLBUFF} // Create actual STOMP frame
} else {
f = Frame{CONNECT, ch, NULLBUFF} // Create actual STOMP frame
}
// fmt.Printf("Frame: %q\n", f)
} else {
f = Frame{CONNECT, ch, NULLBUFF} // Create actual CONNECT frame
// fmt.Printf("Frame: %q\n", f)
}
r := make(chan error) // Make the error channel for a write
if e := c.writeWireData(wiredata{f, r}); e != nil { // Send the CONNECT frame
return c, e
}
e := <-r // Retrieve any error
//
if e != nil {
c.sysAbort() // Shutdown, we are done with errors
return c, e
}
//fmt.Printf("CONDB03\n")
//
e = c.connectHandlerOverWS(ch)
if e != nil {
c.sysAbort() // Shutdown , we are done with errors
return c, e
}

// We are connected
go c.readerOverWS()
//
return c, e
}
59 changes: 58 additions & 1 deletion connect_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package stompngo
import (
"bufio"
"bytes"
"io"

// "fmt"
"github.com/gmallard/stompngo/senv"
"strings"
)

Expand All @@ -41,7 +43,7 @@ func (e *CONNERROR) Error() string {
*/
func (c *Connection) connectHandler(h Headers) (e error) {
//fmt.Printf("CHDB01\n")
c.rdr = bufio.NewReader(c.netconn)
c.rdr = bufio.NewReaderSize(c.netconn, senv.ReadBufsz())
b, e := c.rdr.ReadBytes(0)
if e != nil {
return e
Expand Down Expand Up @@ -85,6 +87,61 @@ func (c *Connection) connectHandler(h Headers) (e error) {
return nil
}

func (c *Connection) connectHandlerOverWS(h Headers) (e error) {
//fmt.Printf("CHDB01\n")
var b []byte
// affected by heartbeat may receive empty frame
for strings.TrimSpace(string(b)) == "" {
_, r, e := c.wsConn.NextReader()
if e != nil {
return e
}
c.rdr = bufio.NewReader(r)
b, e = c.rdr.ReadBytes(0)
if e != nil && e != io.EOF {
return e
}
}

//fmt.Printf("CHDB02\n")
f, e := connectResponse(string(b))
if e != nil {
return e
}
//fmt.Printf("CHDB03\n")
//
c.ConnectResponse = &Message{f.Command, f.Headers, f.Body}
if c.ConnectResponse.Command == ERROR {
return &CONNERROR{ECONERR, string(f.Body)}
}
//fmt.Printf("CHDB04\n")
//
e = c.setProtocolLevel(h, c.ConnectResponse.Headers)
if e != nil {
return e
}
//fmt.Printf("CHDB05\n")
//
if s, ok := c.ConnectResponse.Headers.Contains(HK_SESSION); ok {
c.sessLock.Lock()
c.session = s
c.sessLock.Unlock()
}

if c.Protocol() >= SPL_11 {
e = c.initializeHeartBeats(h)
if e != nil {
return e
}
}
//fmt.Printf("CHDB06\n")

c.setConnected(true)
c.mets.tfr += 1
c.mets.tbr += c.ConnectResponse.Size(false)
return nil
}

/*
Handle data from the wire after CONNECT is sent. Attempt to create a Frame
from the wire data.
Expand Down
14 changes: 9 additions & 5 deletions data.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"net"
"sync"
"time"

"github.com/gorilla/websocket"
)

const (
Expand Down Expand Up @@ -132,7 +134,7 @@ type StatsReader interface {
}

/*
HBDataReader is an interface that modela a reader for the heart beat
HBDataReader is an interface that models a reader for the heart beat
data maintained by the stompngo package.
*/
type HBDataReader interface {
Expand Down Expand Up @@ -218,10 +220,12 @@ type Connection struct {
Hbrf bool // Indicates a heart beat read/receive failure, which is possibly transient. Valid for 1.1+ only.
Hbsf bool // Indicates a heart beat send failure, which is possibly transient. Valid for 1.1+ only.
logger *log.Logger
mets *metrics // Client metrics
scc int // Subscribe channel capacity
discLock sync.Mutex // DISCONNECT lock
dld *deadlineData // Deadline data
mets *metrics // Client metrics
scc int // Subscribe channel capacity
discLock sync.Mutex // DISCONNECT lock
dld *deadlineData // Deadline data
eltd *eltmets // Elapsed time data
wsConn *websocket.Conn // WebSocket connection
}

type subscription struct {
Expand Down
5 changes: 1 addition & 4 deletions disconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,10 @@ func (c *Connection) getMessageData() (MessageData, error) {
md = <-c.input
} else {
c.log("DISCGETMD DUR -> ", d)
ticker := time.NewTicker(d)
select {
case _ = <-ticker.C:
case <-time.After(d):
me = EDISCTO
ticker.Stop()
case md = <-c.input:
ticker.Stop()
}
}
} else {
Expand Down
129 changes: 129 additions & 0 deletions elttime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
//
// Copyright © 2019-2020 Guy M. Allard
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed, an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package stompngo

import (
"fmt"
"log"
)

type eltd struct {
ens int64 // elapsed nanoseconds
ec int64 // call count
}

type eltmets struct {

// Reader overall
rov eltd
// Reader command
rcmd eltd
// Reader individual headers
rivh eltd
// Reader - until null
run eltd
// Reader - Body
rbdy eltd

// Writer overall
wov eltd
// Writer command
wcmd eltd
// Writer individual headers
wivh eltd
// Writer - Body
wbdy eltd
}

func (c *Connection) ShowEltd(ll *log.Logger) {
if c.eltd == nil {
return
}
//
ll.Println("Reader Elapsed Time Information")
//
ll.Printf("Overall - ns %d count %d\n",
c.eltd.rov.ens, c.eltd.rov.ec)
//
ll.Printf("Command - ns %d count %d\n",
c.eltd.rcmd.ens, c.eltd.rcmd.ec)
//
ll.Printf("Individual Headers - ns %d count %d\n",
c.eltd.rivh.ens, c.eltd.rivh.ec)
//
ll.Printf("Until Null - ns %d count %d\n",
c.eltd.run.ens, c.eltd.run.ec)
//
ll.Printf("Body - ns %d count %d\n",
c.eltd.rbdy.ens, c.eltd.rbdy.ec)

//
ll.Println("Writer Elapsed Time Information")
//
ll.Printf("Overall - ns %d count %d\n",
c.eltd.wov.ens, c.eltd.wov.ec)
//
ll.Printf("Command - ns %d count %d\n",
c.eltd.wcmd.ens, c.eltd.wcmd.ec)
//
ll.Printf("Individual Headers - ns %d count %d\n",
c.eltd.wivh.ens, c.eltd.wivh.ec)
//
ll.Printf("Body - ns %d count %d\n",
c.eltd.wbdy.ens, c.eltd.wbdy.ec)
}

func (c *Connection) ShowEltdCsv() {
if c.eltd == nil {
return
}
//
fmt.Println("SECTION,ELTNS,COUNT,PCT")
//
fmt.Printf("ROV,%d,%d,%s\n",
c.eltd.rov.ens, c.eltd.rov.ec, "100.00")
//
fmt.Printf("RCMD,%d,%d,%s\n",
c.eltd.rcmd.ens, c.eltd.rcmd.ec, getpct(c.eltd.rcmd.ens, c.eltd.rov.ens))
//
fmt.Printf("RIVH,%d,%d,%s\n",
c.eltd.rivh.ens, c.eltd.rivh.ec, getpct(c.eltd.rivh.ens, c.eltd.rov.ens))
//
fmt.Printf("RUN,%d,%d,%s\n",
c.eltd.run.ens, c.eltd.run.ec, getpct(c.eltd.run.ens, c.eltd.rov.ens))
//
fmt.Printf("RBDY,%d,%d,%s\n",
c.eltd.rbdy.ens, c.eltd.rbdy.ec, getpct(c.eltd.rbdy.ens, c.eltd.rov.ens))

//
fmt.Printf("WOV,%d,%d,%s\n",
c.eltd.wov.ens, c.eltd.wov.ec, "100.00")
//
fmt.Printf("WCMD,%d,%d,%s\n",
c.eltd.wcmd.ens, c.eltd.wcmd.ec, getpct(c.eltd.wcmd.ens, c.eltd.wov.ens))
//
fmt.Printf("WIVH,%d,%d,%s\n",
c.eltd.wivh.ens, c.eltd.wivh.ec, getpct(c.eltd.wivh.ens, c.eltd.wov.ens))
//
fmt.Printf("WBDY,%d,%d,%s\n",
c.eltd.wbdy.ens, c.eltd.wbdy.ec, getpct(c.eltd.wbdy.ens, c.eltd.wov.ens))
}

func getpct(num, den int64) string {
fv := float64(num) / float64(den)
return fmt.Sprintf("%f", 100.0*fv)
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
module github.com/gmallard/stompngo

go 1.13

require github.com/gorilla/websocket v1.4.1
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
Loading