Skip to content

Commit 18db82e

Browse files
committedJun 13, 2017
Use bigger buffer size when recv()
1 parent 7e130fe commit 18db82e

8 files changed

+37
-34
lines changed
 

‎env/environment.py

+13-4
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import os
2+
from os import path
23
import sys
34
import signal
4-
import project_root
5-
from os import path
6-
from sender import Sender
75
from subprocess import Popen
6+
from sender import Sender
7+
import project_root
88
from helpers.helpers import get_open_udp_port
99

1010

@@ -44,6 +44,7 @@ def setup(self):
4444
def set_sample_action(self, sample_action):
4545
"""Set the sender's policy. Must be called before run()."""
4646

47+
self.sample_action = sample_action
4748
self.sender.set_sample_action(sample_action)
4849

4950
def rollout(self):
@@ -60,11 +61,19 @@ def rollout(self):
6061
def cleanup(self):
6162
if self.sender:
6263
self.sender.cleanup()
64+
self.sender = None
6365

6466
if self.receiver:
6567
try:
6668
os.killpg(os.getpgid(self.receiver.pid), signal.SIGTERM)
6769
except OSError as e:
6870
sys.stderr.write('%s\n' % e)
71+
finally:
72+
self.receiver = None
6973

70-
sys.stderr.write('\nEnvironment cleaned up.\n')
74+
def reset(self):
75+
self.cleanup()
76+
self.setup()
77+
78+
if self.sender:
79+
self.sender.set_sample_action(self.sample_action)

‎env/receiver.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def handshake(self):
6868
sys.exit('Channel closed or error occurred')
6969

7070
if flag & READ_FLAGS:
71-
msg, addr = self.sock.recvfrom(1500)
71+
msg, addr = self.sock.recvfrom(1600)
7272

7373
if addr == self.peer_addr:
7474
if msg != 'Hello from sender':
@@ -83,7 +83,7 @@ def run(self):
8383
self.sock.setblocking(1) # blocking UDP socket
8484

8585
while True:
86-
serialized_data, addr = self.sock.recvfrom(1500)
86+
serialized_data, addr = self.sock.recvfrom(1600)
8787

8888
if addr == self.peer_addr:
8989
ack = self.construct_ack_from_data(serialized_data)

‎env/sender.py

+17-23
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def __init__(self, port=0, train=False, debug=False):
2727

2828
# UDP datagram template
2929
self.data = {}
30-
self.data['payload'] = 'x' * 1400
30+
self.data['payload'] = 'x' * 1350
3131

3232
# dimension of state space and action space
3333
self.state_dim = 4
@@ -45,8 +45,7 @@ def __init__(self, port=0, train=False, debug=False):
4545
self.prev_recv_ts = None
4646

4747
if self.train:
48-
self.max_step_cnt = 2000
49-
self.max_runtime = 5000
48+
self.max_runtime = 10000
5049

5150
# statistics variables to compute rewards
5251
self.sent_bytes = 0
@@ -62,7 +61,7 @@ def handshake(self):
6261
"""Handshake with peer receiver. Must be called before run()."""
6362

6463
while True:
65-
msg, addr = self.sock.recvfrom(1500)
64+
msg, addr = self.sock.recvfrom(1600)
6665

6766
if msg == 'Hello from receiver' and self.peer_addr is None:
6867
self.peer_addr = addr
@@ -79,7 +78,7 @@ def set_sample_action(self, sample_action):
7978
self.sample_action = sample_action
8079

8180
def reset(self):
82-
"""Reset the sender. Must be called in every training iteration."""
81+
"""Reset the sender. Must be called after every training iteration."""
8382

8483
self.seq_num = 0
8584
self.next_ack = 0
@@ -89,13 +88,14 @@ def reset(self):
8988
self.prev_send_ts = None
9089
self.prev_recv_ts = None
9190

92-
self.sent_bytes = 0
93-
self.acked_bytes = 0
94-
self.first_recv_ts = float('inf')
95-
self.last_recv_ts = 0
96-
self.total_delays = []
91+
if self.train:
92+
self.sent_bytes = 0
93+
self.acked_bytes = 0
94+
self.first_recv_ts = float('inf')
95+
self.last_recv_ts = 0
96+
self.total_delays = []
9797

98-
self.drain_packets()
98+
self.drain_packets()
9999

100100
def drain_packets(self):
101101
"""Drain all the packets left in the channel."""
@@ -116,7 +116,7 @@ def drain_packets(self):
116116
sys.exit('Channel closed or error occurred')
117117

118118
if flag & READ_FLAGS:
119-
self.sock.recvfrom(1500)
119+
self.sock.recvfrom(1600)
120120

121121
def update_state(self, ack):
122122
send_ts = ack['ack_send_ts']
@@ -145,10 +145,6 @@ def update_state(self, ack):
145145
self.first_recv_ts = min(recv_ts, self.first_recv_ts)
146146
self.last_recv_ts = max(recv_ts, self.last_recv_ts)
147147

148-
self.step_cnt += 1
149-
if self.step_cnt >= self.max_step_cnt:
150-
self.running = False
151-
152148
if curr_ts_ms() - self.runtime_start > self.max_runtime:
153149
self.running = False
154150

@@ -157,9 +153,7 @@ def update_state(self, ack):
157153

158154
def take_action(self, action):
159155
self.cwnd += self.action_mapping[action]
160-
161-
if self.cwnd < 5.0:
162-
self.cwnd = 5.0
156+
self.cwnd = max(5.0, self.cwnd)
163157

164158
if self.debug:
165159
sys.stderr.write('cwnd %.2f\n' % self.cwnd)
@@ -175,8 +169,9 @@ def compute_reward(self):
175169
delay_percentile = float(np.percentile(self.total_delays, 95))
176170
loss_rate = 1.0 - float(self.acked_bytes) / self.sent_bytes
177171

178-
reward = np.log(max(1e-4, avg_throughput))
179-
reward -= np.log(max(1.0, delay_percentile / 10.0))
172+
reward = 2 * np.log(max(1e-3, avg_throughput))
173+
reward -= np.log(max(1.0, delay_percentile))
174+
reward += np.log(1.0 - loss_rate)
180175

181176
sys.stderr.write('Average throughput: %.2f Mbps\n' % avg_throughput)
182177
sys.stderr.write('95th percentile one-way delay: %d ms\n' %
@@ -204,7 +199,7 @@ def send(self):
204199
sys.stderr.write('Sent seq_num %d\n' % int(self.data['seq_num']))
205200

206201
def recv(self):
207-
serialized_ack, addr = self.sock.recvfrom(1500)
202+
serialized_ack, addr = self.sock.recvfrom(1600)
208203

209204
if addr != self.peer_addr:
210205
return
@@ -231,7 +226,6 @@ def run(self):
231226
curr_flags = ALL_FLAGS
232227

233228
self.running = True
234-
self.step_cnt = 0
235229
self.runtime_start = curr_ts_ms()
236230

237231
while not self.train or self.running:
File renamed without changes.
File renamed without changes.

‎tests/test_environment.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
#!/usr/bin/env python
22

3+
from os import path
34
import sys
4-
import project_root
55
import numpy as np
6-
import tensorflow as tf
7-
from os import path
6+
import project_root
87
from env.environment import Environment
98

109

@@ -39,6 +38,7 @@ def cleanup(self):
3938
def run(self):
4039
for episode_i in xrange(1, 3):
4140
sys.stderr.write('\nEpisode %d\n' % episode_i)
41+
self.env.reset()
4242

4343
# get an episode of experience
4444
final_reward = self.env.rollout()

‎tests/test_helpers.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/usr/bin/env python
22

3-
import project_root
43
import numpy as np
4+
import project_root
55
from helpers.helpers import RingBuffer, MeanVarHistory
66

77

‎tests/test_sender.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
import sys
44
import argparse
5-
import project_root
65
import numpy as np
6+
import project_root
77
from env.sender import Sender
88

99

0 commit comments

Comments
 (0)
Please sign in to comment.