-
Notifications
You must be signed in to change notification settings - Fork 52
/
Copy pathsync_protocol.go
131 lines (109 loc) · 3.02 KB
/
sync_protocol.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package main
import (
	"errors"
	"fmt"
	"io"
	"strings"
	"time"

	"github.com/reconquest/prefixwriter-go"
)
// Keywords that make up the textual SYNC protocol messages built in this file.
var (
	// syncProtocolPrefix is the fixed part of the per-instance message
	// prefix; newSyncProtocol appends a nanosecond timestamp to it.
	syncProtocolPrefix = "ORGALORG"
	// syncProtocolHello is the HELLO message written by Init.
	syncProtocolHello = "HELLO"
	// syncProtocolNode is the keyword that starts a NODE message.
	syncProtocolNode = "NODE"
	// syncProtocolNodeCurrent is appended to a NODE message when the
	// described node is the same node the message is written to.
	syncProtocolNodeCurrent = "CURRENT"
	// syncProtocolStart is the START message written by SendStart.
	syncProtocolStart = "START"
	// syncProtocolSync is the keyword of SYNC messages, both when matching
	// incoming lines and when writing outgoing ones.
	syncProtocolSync = "SYNC"
)
// syncProtocol handles the SYNC protocol described in main.go.
//
// It handles the protocol over all connected nodes: messages are written
// to output, and incoming SYNC lines are recognized by prefix.
type syncProtocol struct {
	// output is the writer that should be connected to the stdins of all
	// connected nodes. It is unset until Init is called, which stores a
	// prefixwriter wrapping the writer passed to it.
	output io.WriteCloser
	// prefix is a unique string which prefixes every protocol message.
	// It is assigned once by newSyncProtocol.
	prefix string
}
// newSyncProtocol creates a syncProtocol with a fresh message prefix.
//
// The prefix has the form "<syncProtocolPrefix>:<nanoseconds>", where the
// numeric part is the current Unix time in nanoseconds. The output writer
// is left unset; call Init to attach one.
func newSyncProtocol() *syncProtocol {
	protocol := new(syncProtocol)
	protocol.prefix = fmt.Sprintf("%s:%d", syncProtocolPrefix, time.Now().UnixNano())

	return protocol
}
// Init starts the protocol by writing the HELLO message.
//
// The given writer is wrapped with prefixwriter, configured with the
// protocol prefix followed by a space, and the wrapper is stored as
// protocol.output for all further writes made through it.
//
// A write error is passed through protocolSuspendEOF, so an EOF is reported
// as success; any other error is returned.
func (protocol *syncProtocol) Init(output io.WriteCloser) error {
	protocol.output = prefixwriter.New(output, protocol.prefix+" ")

	hello := syncProtocolHello + "\n"
	if _, err := io.WriteString(protocol.output, hello); err != nil {
		return protocolSuspendEOF(err)
	}

	return nil
}
// SendNode writes a NODE message describing neighbor to the stdin of
// current.
//
// The message is "NODE <neighbor.String()>", with " CURRENT" appended when
// neighbor and current are the same node, followed by a newline.
//
// A write error is passed through protocolSuspendEOF, so an EOF is reported
// as success; any other error is returned.
//
// NOTE(review): this writes to current.stdin directly rather than to
// protocol.output, so the protocol prefix is present only if current.stdin
// already adds it — confirm against the caller.
func (protocol *syncProtocol) SendNode(
	current *remoteExecutionNode,
	neighbor *remoteExecutionNode,
) error {
	message := syncProtocolNode + " " + neighbor.String()
	if neighbor == current {
		message += " " + syncProtocolNodeCurrent
	}
	message += "\n"

	if _, err := io.WriteString(current.stdin, message); err != nil {
		return protocolSuspendEOF(err)
	}

	return nil
}
// SendStart writes the START message, terminated by a newline, to
// protocol.output.
//
// A write error is passed through protocolSuspendEOF, so an EOF is reported
// as success; any other error is returned.
func (protocol *syncProtocol) SendStart() error {
	start := syncProtocolStart + "\n"
	if _, err := io.WriteString(protocol.output, start); err != nil {
		return protocolSuspendEOF(err)
	}

	return nil
}
// IsSyncCommand reports whether line looks like an incoming SYNC message
// from a remote node, i.e. whether it begins with the protocol prefix, a
// space and the SYNC keyword.
func (protocol *syncProtocol) IsSyncCommand(line string) bool {
	marker := protocol.prefix + " " + syncProtocolSync

	return strings.HasPrefix(line, marker)
}
// SendSync writes a SYNC message to protocol.output, tagged as sent by the
// node that source describes.
//
// sync is the original SYNC line received from that node. Its leading
// "<prefix> SYNC" part is removed and surrounding whitespace is trimmed;
// whatever remains is the optional description of the SYNC phase. The
// written message is "SYNC <source.String()> <description>" followed by a
// newline.
//
// A write error is passed through protocolSuspendEOF, so an EOF is reported
// as success; any other error is returned.
func (protocol *syncProtocol) SendSync(
	source fmt.Stringer,
	sync string,
) error {
	header := protocol.prefix + " " + syncProtocolSync
	description := strings.TrimSpace(strings.TrimPrefix(sync, header))

	message := syncProtocolSync + " " + source.String() + " " + description + "\n"
	if _, err := io.WriteString(protocol.output, message); err != nil {
		return protocolSuspendEOF(err)
	}

	return nil
}
// protocolSuspendEOF suppresses end-of-file errors from protocol writes.
//
// It returns nil when err is io.EOF, either directly or wrapped, and
// returns err unchanged otherwise (including when err is nil).
//
// EOF is tolerated for compatibility with simple commands that do not
// support the protocol and may therefore exit before the protocol is
// initiated.
func protocolSuspendEOF(err error) error {
	// errors.Is also matches an io.EOF that was wrapped with extra context,
	// which a plain == comparison would miss.
	if errors.Is(err, io.EOF) {
		return nil
	}

	return err
}