Skip to content

Commit 58136b5

Browse files
committed
protocol: add webrtc support
1 parent 4bed82c commit 58136b5

11 files changed

+395
-6
lines changed

client.go

+10
Original file line numberDiff line numberDiff line change
@@ -711,6 +711,16 @@ func (c *Client) Tell(method string, args ...interface{}) (result *dnode.Partial
711711
return c.TellWithTimeout(method, 0, args...)
712712
}
713713

714+
// SendWebRTCRequest sends requests to kontrol for signalling purposes.
715+
func (c *Client) SendWebRTCRequest(req *protocol.WebRTCSignalMessage) error {
716+
timeout := time.Duration(0)
717+
if c.Config != nil {
718+
timeout = c.Config.Timeout
719+
}
720+
_, err := c.TellWithTimeout(WebRTCHandlerName, timeout, req)
721+
return err
722+
}
723+
714724
// TellWithTimeout does the same thing with Tell() method except it takes an
715725
// extra argument that is the timeout for waiting reply from the remote Kite.
716726
// If timeout is given 0, the behavior is same as Tell().

config/config.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ import (
1919
"github.com/igm/sockjs-go/sockjs"
2020
)
2121

22-
// the implementation of New() doesn't have any error to be returned yet it
23-
// returns, so it's totally safe to neglect the error
22+
// CookieJar ignoring err: the implementation of New() doesn't have any error to
23+
// be returned yet it returns, so it's totally safe to neglect the error
2424
var CookieJar, _ = cookiejar.New(nil)
2525

2626
// Options is passed to kite.New when creating new instance.
@@ -110,6 +110,9 @@ type Config struct {
110110
KontrolURL string
111111
KontrolKey string
112112
KontrolUser string
113+
114+
// UseWebRTC is the flag for Kite's to communicate over WebRTC if possible.
115+
UseWebRTC bool
113116
}
114117

115118
// DefaultConfig contains the default settings.

handlers.go

+69
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,85 @@
11
package kite
22

33
import (
4+
"errors"
45
"fmt"
56
"net/http"
67
"net/url"
78
"os"
89
"os/exec"
910
"runtime"
11+
"time"
1012

1113
"github.com/gorilla/websocket"
14+
"github.com/koding/cache"
15+
"github.com/koding/kite/protocol"
1216
"github.com/koding/kite/sockjsclient"
1317
"github.com/koding/kite/systeminfo"
1418
"golang.org/x/crypto/ssh/terminal"
1519
)
1620

21+
var (
22+
errDstNotSet = errors.New("dst not set")
23+
errDstNotRegistered = errors.New("dst not registered")
24+
)
25+
26+
// WebRTCHandlerName provides the naming scheme for the handler
27+
const WebRTCHandlerName = "kite.handleWebRTC"
28+
29+
type webRTCHandler struct {
30+
kitesColl cache.Cache
31+
}
32+
33+
// NewWebRCTHandler creates a new handler for web rtc signalling services.
34+
func NewWebRCTHandler() *webRTCHandler {
35+
return &webRTCHandler{
36+
kitesColl: cache.NewMemory(),
37+
}
38+
}
39+
40+
func (w *webRTCHandler) registerSrc(src *Client) {
41+
w.kitesColl.Set(src.ID, src)
42+
src.OnDisconnect(func() {
43+
time.Sleep(time.Second * 2)
44+
id := src.ID
45+
// delete from the collection
46+
w.kitesColl.Delete(id)
47+
})
48+
}
49+
50+
func (w *webRTCHandler) getDst(dst string) (*Client, error) {
51+
if dst == "" {
52+
return nil, errDstNotSet
53+
}
54+
55+
dstKite, err := w.kitesColl.Get(dst)
56+
if err != nil {
57+
return nil, errDstNotRegistered
58+
}
59+
60+
return dstKite.(*Client), nil
61+
}
62+
63+
// ServeKite implements Hander interface.
64+
func (w *webRTCHandler) ServeKite(r *Request) (interface{}, error) {
65+
var args protocol.WebRTCSignalMessage
66+
67+
if err := r.Args.One().Unmarshal(&args); err != nil {
68+
return nil, fmt.Errorf("invalid query: %s", err)
69+
}
70+
71+
args.Src = r.Client.ID
72+
73+
w.registerSrc(r.Client)
74+
75+
dst, err := w.getDst(args.Dst)
76+
if err != nil {
77+
return nil, err
78+
}
79+
80+
return nil, dst.SendWebRTCRequest(&args)
81+
}
82+
1783
func (k *Kite) addDefaultHandlers() {
1884
// Default RPC methods
1985
k.HandleFunc("kite.systemInfo", handleSystemInfo)
@@ -27,6 +93,9 @@ func (k *Kite) addDefaultHandlers() {
2793
if runtime.GOOS == "darwin" {
2894
k.HandleFunc("kite.notify", handleNotifyDarwin)
2995
}
96+
if k.WebRTCHandler != nil {
97+
k.Handle(WebRTCHandlerName, k.WebRTCHandler)
98+
}
3099
}
31100

32101
// handleSystemInfo returns info about the system (CPU, memory, disk...).

kite.go

+7
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ type Kite struct {
7070
// Deprecated: Set Config.XHR field instead.
7171
ClientFunc func(*sockjsclient.DialOptions) *http.Client
7272

73+
// WebRTCHandler handles the webrtc responses coming from a signalling server.
74+
WebRTCHandler Handler
75+
7376
// Handlers added with Kite.HandleFunc().
7477
handlers map[string]*Method // method map for exported methods
7578
preHandlers []Handler // a list of handlers that are executed before any handler
@@ -197,6 +200,10 @@ func NewWithConfig(name, version string, cfg *config.Config) *Kite {
197200
muxer: mux.NewRouter(),
198201
}
199202

203+
if cfg != nil && cfg.UseWebRTC {
204+
k.WebRTCHandler = NewWebRCTHandler()
205+
}
206+
200207
// All sockjs communication is done through this endpoint..
201208
k.muxer.PathPrefix("/kite").Handler(sockjs.NewHandler("/kite", *cfg.SockJS, k.sockjsHandler))
202209

kitetest/kitetest.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ import (
99
"os/user"
1010
"time"
1111

12-
"github.com/dgrijalva/jwt-go"
12+
jwt "github.com/dgrijalva/jwt-go"
1313
"github.com/koding/kite/protocol"
14-
"github.com/satori/go.uuid"
14+
uuid "github.com/satori/go.uuid"
1515
)
1616

1717
// KeyPair represents PEM encoded RSA key pair.

kontrol/handlers_test.go

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package kontrol
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
"testing"
7+
8+
"github.com/koding/kite"
9+
"github.com/koding/kite/protocol"
10+
"github.com/koding/kite/testkeys"
11+
)
12+
13+
// createTestKite creates a test kite, caller of this func should close the kite
14+
func createTestKite(name string, conf *Config, t *testing.T) *HelloKite {
15+
k, err := NewHelloKite(name, conf)
16+
if err != nil {
17+
t.Fatalf("error creating %s: %s", name, err)
18+
}
19+
20+
k.Kite.HandleFunc(kite.WebRTCHandlerName, func(req *kite.Request) (interface{}, error) {
21+
return nil, fmt.Errorf("%s is called", name)
22+
})
23+
24+
return k
25+
}
26+
27+
func TestKontrol_HandleWebRTC(t *testing.T) {
28+
kont, conf := startKontrol(testkeys.PrivateThird, testkeys.PublicThird, 5501)
29+
defer kont.Close()
30+
31+
hk1 := createTestKite("kite1", conf, t)
32+
defer hk1.Close()
33+
34+
hk2 := createTestKite("kite2", conf, t)
35+
defer hk2.Close()
36+
37+
err := hk1.Kite.SendWebRTCRequest(&protocol.WebRTCSignalMessage{Dst: hk2.Kite.Id})
38+
if err == nil || !strings.Contains(err.Error(), "not registered") {
39+
t.Fatalf("expected kite.errDstNotRegistered, got: %+v", err)
40+
}
41+
42+
err = hk2.Kite.SendWebRTCRequest(&protocol.WebRTCSignalMessage{Dst: hk1.Kite.Id})
43+
if !strings.Contains(err.Error(), fmt.Sprintf("%s is called", hk1.Kite.Kite().Name)) {
44+
t.Fatalf("expected hk1 error, got: %+v", err)
45+
}
46+
}

kontrol/helper_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func startKontrol(pem, pub string, port int) (*Kontrol, *Config) {
5151
conf.KontrolUser = "testuser"
5252
conf.KiteKey = testutil.NewToken("testuser", pem, pub).Raw
5353
conf.ReadEnvironmentVariables()
54-
54+
conf.UseWebRTC = true
5555
DefaultPort = port
5656
kon := New(conf.Copy(), "1.0.0")
5757
// kon.Kite.SetLogLevel(kite.DEBUG)

kontrol/kontrol_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"testing"
1515
"time"
1616

17-
"github.com/dgrijalva/jwt-go"
17+
jwt "github.com/dgrijalva/jwt-go"
1818
"github.com/koding/kite"
1919
"github.com/koding/kite/kitekey"
2020
"github.com/koding/kite/protocol"

kontrolclient.go

+12
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,18 @@ func (k *Kite) GetToken(kite *protocol.Kite) (string, error) {
205205
return tkn, nil
206206
}
207207

208+
// SendWebRTCRequest sends requests to kontrol for signalling purposes.
209+
func (k *Kite) SendWebRTCRequest(req *protocol.WebRTCSignalMessage) error {
210+
if err := k.SetupKontrolClient(); err != nil {
211+
return err
212+
}
213+
214+
<-k.kontrol.readyConnected
215+
216+
_, err := k.kontrol.TellWithTimeout(WebRTCHandlerName, k.Config.Timeout, req)
217+
return err
218+
}
219+
208220
// GetTokenForce is used to obtain a new token for the given kite.
209221
//
210222
// It always returns a new token and forces a Kontrol to

protocol/webrtc.go

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package protocol
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
"strings"
7+
"sync"
8+
"unicode/utf8"
9+
)
10+
11+
var (
12+
errInvalidChar = errors.New("message contains invalid chars")
13+
errInvalidOp = errors.New("invalid start operation")
14+
)
15+
16+
// WebRTCSignalMessage represents a signalling message between peers and the singalling server
17+
type WebRTCSignalMessage struct {
18+
Type string `json:"type,omitempty"`
19+
Src string `json:"src,omitempty"`
20+
Dst string `json:"dst,omitempty"`
21+
Payload json.RawMessage `json:"payload,omitempty"`
22+
23+
parsedPayload *Payload
24+
isParsed bool
25+
mu sync.Mutex
26+
}
27+
28+
// Payload is the content of `payload` in the json
29+
type Payload struct {
30+
Msg *string `json:"msg,omitempty"`
31+
Sdp *struct {
32+
Type *string `json:"type,omitempty"`
33+
Sdp *string `json:"sdp,omitempty"`
34+
} `json:"sdp,omitempty"`
35+
Type *string `json:"type,omitempty"`
36+
Label *string `json:"label,omitempty"`
37+
ConnectionID *string `json:"connectionId,omitempty"`
38+
Reliable *bool `json:"reliable,omitempty"`
39+
Serialization *string `json:"serialization,omitempty"`
40+
Browser *string `json:"browser,omitempty"`
41+
Candidate *struct {
42+
Candidate *string `json:"candidate,omitempty"`
43+
SdpMid *string `json:"sdpMid,omitempty"`
44+
SdpMLineIndex *int `json:"sdpMLineIndex,omitempty"`
45+
} `json:"candidate,omitempty"`
46+
}
47+
48+
// ParsePayload parses the payload if it is not parsed previously. This method
49+
// can be called concurrently.
50+
func (w *WebRTCSignalMessage) ParsePayload() (*Payload, error) {
51+
w.mu.Lock()
52+
defer w.mu.Unlock()
53+
54+
if w.isParsed {
55+
return w.parsedPayload, nil
56+
}
57+
58+
payload := &Payload{}
59+
if err := json.Unmarshal(w.Payload, payload); err != nil {
60+
return nil, err
61+
}
62+
63+
w.parsedPayload = payload
64+
return payload, nil
65+
}
66+
67+
// ParseWebRTCSignalMessage parses the web rtc command/message
68+
func ParseWebRTCSignalMessage(msg string) (*WebRTCSignalMessage, error) {
69+
// All messages are text (utf-8 encoded at present)
70+
if !utf8.Valid([]byte(msg)) {
71+
return nil, errInvalidChar
72+
}
73+
74+
w := &WebRTCSignalMessage{}
75+
if err := json.Unmarshal([]byte(msg), w); err != nil {
76+
return nil, err
77+
}
78+
79+
if err := validateOperation(w.Type); err != nil {
80+
return nil, err
81+
}
82+
83+
return w, nil
84+
}
85+
86+
func validateOperation(op string) error {
87+
switch strings.ToUpper(op) {
88+
case "ANSWER", "OFFER", "CANDIDATE", "LEAVE":
89+
return nil
90+
default:
91+
return errInvalidOp
92+
}
93+
}

0 commit comments

Comments
 (0)