Skip to content

Commit

Permalink
Add customizations for distributed mode
Browse files Browse the repository at this point in the history
  • Loading branch information
svpcom committed Sep 16, 2024
1 parent fa26cc8 commit 3608b88
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 36 deletions.
70 changes: 48 additions & 22 deletions wfb_ng/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,34 @@
import itertools
from jinja2 import Environment, StrictUndefined

from .common import search_attr
from .services import parse_services, hash_link_domain, bandwidth_map
from .conf import settings

def parse_cluster_services(profiles):
if not settings.cluster.nodes:
raise Exception('Cluster is empty!')

if not settings.cluster.server_address:
raise Exception('Server IP address is not set!')

udp_port_allocator = itertools.count(settings.cluster.base_port_server)
services = list((profile, parse_services(profile, udp_port_allocator)) for profile in profiles)
port_allocators = {}
cluster_nodes = {}

def update_node(node, profile, service_name, link_id, tx_port_base, wlans, srv_cfg):
server_address = search_attr('server_address',
settings.cluster.nodes[node],
settings.cluster.__dict__)

if not server_address:
raise Exception('Server IP address is not set!')

d = dict(wlans = wlans,
link_id = link_id,
bandwidth = srv_cfg.bandwidth,
stream_tx = srv_cfg.stream_tx,
stream_rx = srv_cfg.stream_rx,
tx_port_base = tx_port_base,
rx_fwd = (settings.cluster.server_address, srv_cfg.udp_port_auto))
rx_fwd = (server_address, srv_cfg.udp_port_auto))

if node not in cluster_nodes:
cluster_nodes[node] = {}
Expand Down Expand Up @@ -86,7 +91,7 @@ def get_allocator(node):

script_template = '''\
#!/bin/bash
set -em
set -emb
export LC_ALL=C
Expand All @@ -103,22 +108,22 @@ def get_allocator(node):
trap _cleanup EXIT
iw reg set {{ settings.common.wifi_region }}
for wlan in {{ wlans|join(' ') }}
do
if which nmcli > /dev/null && ! nmcli device show $wlan | grep -q '(unmanaged)'
then
nmcli device set $wlan managed no
fi
ip link set $wlan down
iw dev $wlan set monitor otherbss
ip link set $wlan up
iw dev $wlan set channel {{ settings.common.wifi_channel }} {{ ht_mode }}
{% if settings.common.wifi_txpower != None %}
iw dev $wlan set txpower fixed {{ settings.common.wifi_txpower }}
{% for wlan in wlans %}
# init {{ wlan }}
if which nmcli > /dev/null && ! nmcli device show {{ wlan }} | grep -q '(unmanaged)'
then
nmcli device set {{ wlan }} managed no
fi
ip link set {{ wlan }} down
iw dev {{ wlan }} set monitor otherbss
ip link set {{ wlan }} up
iw dev {{ wlan }} set channel {{ channel[wlan] }} {{ ht_mode }}
{% if txpower[wlan] != None %}
iw dev {{ wlan }} set txpower fixed {{ txpower[wlan] }}
{% endif %}
done
{% endfor %}
{% for service, attrs in services.items() %}
# {{ service }}
Expand All @@ -130,10 +135,12 @@ def get_allocator(node):
{% endif %}
{% endfor %}
# Fail in case of connection loss
# Will fail in case of connection loss
(sleep 1; exec cat > /dev/null) &
echo "WFB-ng init done"
wait -n
'''

script_template = env.from_string(script_template)
Expand All @@ -144,6 +151,25 @@ def gen_cluster_scripts(cluster_nodes):
for node, node_attrs in cluster_nodes.items():
wlans = sorted(set().union(*[srv_attrs['wlans'] for srv_attrs in node_attrs.values()]))
max_bw = max(srv_attrs['bandwidth'] for srv_attrs in node_attrs.values())
res[node] = script_template.render(wlans=wlans, ht_mode=bandwidth_map[max_bw], services=node_attrs)

channel = search_attr('wifi_channel',
settings.cluster.nodes[node],
settings.common.__dict__)

if not isinstance(channel, dict):
channel = dict((wlan, channel) for wlan in wlans)

txpower = search_attr('wifi_txpower',
settings.cluster.nodes[node],
settings.common.__dict__)

if not isinstance(txpower, dict):
txpower = dict((wlan, txpower) for wlan in wlans)

res[node] = script_template.render(wlans=wlans,
ht_mode=bandwidth_map[max_bw],
services=node_attrs,
txpower=txpower,
channel=channel)

return res
10 changes: 8 additions & 2 deletions wfb_ng/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,19 @@ def abort_on_crash(f, stop_reactor=True, warn_cancel=True):
f = f.value.subFailure

if settings.common.debug:
log.err(f, 'Stopping reactor due to fatal error')
log.err(f, 'Stopping reactor due to fatal error', isError=1)
else:
log.msg('Stopping reactor due to fatal error: %s' % (f.value,))
log.msg('Stopping reactor due to fatal error: %s' % (f.value,), isError=1)

fatal_error(stop_reactor)


def df_sleep(timeout, res=None):
return task.deferLater(reactor, timeout, lambda: res)


def search_attr(key, *attrs):
for a in attrs:
if key in a:
return a[key]
raise KeyError('Attribute %r is not defined!' % (key,))
12 changes: 10 additions & 2 deletions wfb_ng/conf/master.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ wifi_region = 'BO' # Set CRDA region
wifi_txpower = None # Leave None to use default power settings from driver.
# For 8812au set to -dBm * 100. I.e for 30dBm set to -3000
# For 8812eu set to dBm * 100. I.e for 30dBm set to 3000
# Also you can set own txpower for each wifi card, for example:
# {'wlan0': -100, 'wlan1': 100}


temp_measurement_interval = 10 # [s] (8812eu only) Internal RF path temp measurement.
temp_overheat_warning = 60 # [*C] (8812eu only) Overheat warning threshold.
Expand All @@ -49,8 +52,13 @@ temp_overheat_warning = 60 # [*C] (8812eu only) Overheat warning threshold.
[cluster]

nodes = {
#'127.0.0.1': { 'wlans': ['wlan1', 'wlan2'] },
#'10.5.1.1' : { 'wlans': ['wlan0', 'wlan1'] },
# required host attrs: 'wlans'
# optional host attrs (will override defaults): 'ssh', 'server_address', 'wifi_txpower', 'wifi_channel', 'ssh_user', 'ssh_port', 'ssh_key'
# If ssh_user or ssh_port is set to None then node will not be automatically initialized in ssh mode.
# If ssh_key is None, then ssh_agent will be used.

#'127.0.0.1': { 'wlans': ['wlan1', 'wlan2'], 'wifi_txpower': None},
#'10.5.1.1' : { 'wlans': ['wlan0', 'wlan1']},
}

# Cluster init can be auto (--cluster ssh) or manual (--cluster manual)
Expand Down
2 changes: 1 addition & 1 deletion wfb_ng/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ def errReceived(self, data):

def processEnded(self, status):
rc = status.value.exitCode
log.msg('Stopped ssh %s with code %s' % (self.host, rc))
log.msg('Stopped ssh %s with code %s' % (self.host, rc), isError=(rc != 0))

if rc == 0:
self.df.callback(str(status.value))
Expand Down
43 changes: 34 additions & 9 deletions wfb_ng/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from twisted.internet import reactor, defer

from . import _log_msg, ConsoleObserver, ErrorSafeLogFile, call_and_check_rc, ExecError, version_msg
from .common import abort_on_crash, exit_status, df_sleep
from .common import abort_on_crash, exit_status, df_sleep, search_attr
from .protocols import StatsAndSelectorFactory, RFTempMeter, SSHClientProtocol
from .services import parse_services, init_udp_direct_tx, init_udp_direct_rx, init_mavlink, init_tunnel, init_udp_proxy, hash_link_domain, bandwidth_map
from .cluster import parse_cluster_services, gen_cluster_scripts
Expand Down Expand Up @@ -88,8 +88,15 @@ def init_wlans(max_bw, wlans):

yield call_and_check_rc('iw', 'dev', wlan, 'set', 'channel', str(channel), ht_mode)

if settings.common.wifi_txpower:
yield call_and_check_rc('iw', 'dev', wlan, 'set', 'txpower', 'fixed', str(settings.common.wifi_txpower))
# You can set own tx power for each card
if isinstance(settings.common.wifi_txpower, dict):
txpower = settings.common.wifi_txpower[wlan]
else:
txpower = settings.common.wifi_txpower

if txpower is not None:
yield call_and_check_rc('iw', 'dev', wlan, 'set', 'txpower', 'fixed', str(txpower))

except ExecError as v:
if v.stdout:
log.msg(v.stdout, isError=1)
Expand All @@ -109,16 +116,34 @@ def init(profiles, wlans, cluster_mode):
dl = []
is_cluster = bool(cluster_mode)

def _ssh_exited(x, node):
raise Exception('Connection to %s closed, aborting' % (node,))

if is_cluster:
services, cluster_nodes = parse_cluster_services(profiles)
if cluster_mode == 'ssh':
for node, setup_script in gen_cluster_scripts(cluster_nodes).items():
dl.append(SSHClientProtocol(node, settings.cluster.ssh_user,
'/bin/bash',
key=settings.cluster.ssh_key,
port=settings.cluster.ssh_port,
use_agent=settings.cluster.ssh_key is None,
stdin=setup_script).start())
ssh_user = search_attr('ssh_user',
settings.cluster.nodes[node],
settings.cluster.__dict__)

ssh_port = search_attr('ssh_port',
settings.cluster.nodes[node],
settings.cluster.__dict__)

ssh_key = search_attr('ssh_key',
settings.cluster.nodes[node],
settings.cluster.__dict__)

if ssh_user and ssh_port:
dl.append(SSHClientProtocol(node,
ssh_user,
'/bin/bash',
key=ssh_key,
port=ssh_port,
use_agent=ssh_key is None,
stdin=setup_script).start()\
.addBoth(_ssh_exited, node))
else:
services = list((profile, parse_services(profile, None)) for profile in profiles)
# Do cards init
Expand Down

0 comments on commit 3608b88

Please sign in to comment.