From a41cd378e7e23dd094d6c50f26c4faa8e64e5954 Mon Sep 17 00:00:00 2001 From: Vasily Evseenko Date: Fri, 13 Sep 2024 22:32:09 +0300 Subject: [PATCH] Add distributed mode for control plane --- Makefile | 2 +- scripts/install_gs.sh | 2 +- scripts/wifibroadcast@.service | 2 +- src/rx.cpp | 18 +- src/rx.hpp | 2 +- src/tx.cpp | 666 ++++++++++++++++---- src/tx.hpp | 191 +++++- src/wifibroadcast.cpp | 4 +- src/wifibroadcast.hpp | 5 +- stdeb.cfg | 2 +- wfb_ng/cli.py | 45 +- wfb_ng/cluster.py | 149 +++++ wfb_ng/conf/master.cfg | 23 + wfb_ng/protocols.py | 577 ++++++++++++++++++ wfb_ng/server.py | 1042 +++----------------------------- wfb_ng/services.py | 512 ++++++++++++++++ wfb_ng/tests/test_txrx.py | 4 +- 17 files changed, 2139 insertions(+), 1107 deletions(-) create mode 100644 wfb_ng/cluster.py create mode 100644 wfb_ng/protocols.py create mode 100644 wfb_ng/services.py diff --git a/Makefile b/Makefile index b1aa078d..e42b834c 100644 --- a/Makefile +++ b/Makefile @@ -23,7 +23,7 @@ _CFLAGS := $(CFLAGS) -Wall -O2 -fno-strict-aliasing -DWFB_VERSION='"$(VERSION)-$ all: all_bin gs.key test $(ENV): - virtualenv --python=$(PYTHON) $(ENV) + $(PYTHON) -m virtualenv $(ENV) $$(PATH=$(ENV)/bin:$(ENV)/local/bin:$(PATH) which pip3) install --upgrade pip setuptools $(STDEB) all_bin: wfb_rx wfb_tx wfb_keygen wfb_tx_cmd diff --git a/scripts/install_gs.sh b/scripts/install_gs.sh index cb9886cf..2320a148 100755 --- a/scripts/install_gs.sh +++ b/scripts/install_gs.sh @@ -15,7 +15,7 @@ apt update apt upgrade apt install python3-all python3-all-dev libpcap-dev libsodium-dev python3-pip python3-pyroute2 python3-msgpack \ - python3-future python3-twisted python3-serial iw virtualenv debhelper dh-python fakeroot build-essential -y + python3-future python3-twisted python3-serial python3-jinja2 iw virtualenv debhelper dh-python fakeroot build-essential -y # Build make deb diff --git a/scripts/wifibroadcast@.service b/scripts/wifibroadcast@.service index 985b40e1..75a67fce 100644 --- a/scripts/wifibroadcast@.service +++ b/scripts/wifibroadcast@.service @@ -9,7 +9,7 @@ Type=simple EnvironmentFile=/etc/default/wifibroadcast # per-profile environment EnvironmentFile=-/etc/default/wifibroadcast.%i -ExecStart=/usr/bin/wfb-server %i ${WFB_NICS} +ExecStart=/bin/bash -c "exec /usr/bin/wfb-server --profiles $(echo %i | tr : ' ') --wlans ${WFB_NICS}" TimeoutStopSec=5s Restart=on-failure RestartSec=5s diff --git a/src/rx.cpp b/src/rx.cpp index d90b69e6..76cb429b 100644 --- a/src/rx.cpp +++ b/src/rx.cpp @@ -50,7 +50,7 @@ extern "C" using namespace std; -Receiver::Receiver(const char *wlan, int wlan_idx, uint32_t channel_id, BaseAggregator *agg) : wlan_idx(wlan_idx), agg(agg) +Receiver::Receiver(const char *wlan, int wlan_idx, uint32_t channel_id, BaseAggregator *agg, int rcv_buf_size) : wlan_idx(wlan_idx), agg(agg) { char errbuf[PCAP_ERRBUF_SIZE]; @@ -60,9 +60,10 @@ Receiver::Receiver(const char *wlan, int wlan_idx, uint32_t channel_id, BaseAggr throw runtime_error(string_format("Unable to open interface %s in pcap: %s", wlan, errbuf)); } - if (pcap_set_snaplen(ppcap, 4096) !=0) throw runtime_error("set_snaplen failed"); + if (rcv_buf_size > 0 && pcap_set_buffer_size(ppcap, rcv_buf_size) != 0) throw runtime_error("set_buffer_size failed"); + if (pcap_set_snaplen(ppcap, MAX_PCAP_PACKET_SIZE) != 0) throw runtime_error("set_snaplen failed"); if (pcap_set_promisc(ppcap, 1) != 0) throw runtime_error("set_promisc failed"); - if (pcap_set_timeout(ppcap, -1) !=0) throw runtime_error("set_timeout failed"); + if (pcap_set_timeout(ppcap, -1) != 0) throw runtime_error("set_timeout failed"); if (pcap_set_immediate_mode(ppcap, 1) != 0) throw runtime_error(string_format("pcap_set_immediate_mode failed: %s", pcap_geterr(ppcap))); if (pcap_activate(ppcap) !=0) throw runtime_error(string_format("pcap_activate failed: %s", pcap_geterr(ppcap))); if (pcap_setnonblock(ppcap, 1, errbuf) != 0) throw runtime_error(string_format("set_nonblock failed: %s", errbuf)); @@ -850,7 +851,7 @@ void Aggregator::apply_fec(int ring_idx) fec_decode(fec_p, (const uint8_t**)in_blocks, out_blocks, index, MAX_FEC_PAYLOAD); } -void radio_loop(int argc, char* const *argv, int optind, uint32_t channel_id, shared_ptr agg, int log_interval) +void radio_loop(int argc, char* const *argv, int optind, uint32_t channel_id, shared_ptr agg, int log_interval, int rcv_buf_size) { int nfds = argc - optind; uint64_t log_send_ts = 0; @@ -866,7 +867,7 @@ void radio_loop(int argc, char* const *argv, int optind, uint32_t channel_id, sh for(int i = 0; i < nfds; i++) { - rx[i] = new Receiver(argv[optind + i], i, channel_id, agg.get()); + rx[i] = new Receiver(argv[optind + i], i, channel_id, agg.get(), rcv_buf_size); fds[i].fd = rx[i]->getfd(); fds[i].events = POLLIN; } @@ -970,7 +971,6 @@ void network_loop(int srv_port, Aggregator &agg, int log_interval, int rcv_buf_s if (rsize < (ssize_t)sizeof(wrxfwd_t)) { - fprintf(stderr, "Short packet (rx fwd header)\n"); continue; } agg.process_packet(buf, rsize - sizeof(wrxfwd_t), @@ -1035,8 +1035,8 @@ int main(int argc, char* const *argv) break; default: /* '?' */ show_usage: - fprintf(stderr, "Local receiver: %s [-K rx_key] [-c client_addr] [-u client_port] [-p radio_port] [-l log_interval] [-e epoch] [-i link_id] interface1 [interface2] ...\n", argv[0]); - fprintf(stderr, "Remote (forwarder): %s -f [-c client_addr] [-u client_port] [-p radio_port] [-i link_id] interface1 [interface2] ...\n", argv[0]); + fprintf(stderr, "Local receiver: %s [-K rx_key] [-c client_addr] [-u client_port] [-p radio_port] [-R rcv_buf] [-l log_interval] [-e epoch] [-i link_id] interface1 [interface2] ...\n", argv[0]); + fprintf(stderr, "Remote (forwarder): %s -f [-c client_addr] [-u client_port] [-p radio_port] [-R rcv_buf] [-i link_id] interface1 [interface2] ...\n", argv[0]); fprintf(stderr, "Remote (aggregator): %s -a server_port [-K rx_key] [-c client_addr] [-R rcv_buf] [-u client_port] [-l log_interval] [-p radio_port] [-e epoch] [-i link_id]\n", argv[0]); fprintf(stderr, "Default: K='%s', connect=%s:%d, link_id=0x%06x, radio_port=%u, epoch=%" PRIu64 ", log_interval=%d, rcv_buf=system_default\n", keypair.c_str(), client_addr.c_str(), client_port, link_id, radio_port, epoch, log_interval); fprintf(stderr, "WFB-ng version %s\n", WFB_VERSION); @@ -1080,7 +1080,7 @@ int main(int argc, char* const *argv) agg = shared_ptr(new Forwarder(client_addr, client_port)); } - radio_loop(argc, argv, optind, channel_id, agg, log_interval); + radio_loop(argc, argv, optind, channel_id, agg, log_interval, rcv_buf); }else if(rx_mode == AGGREGATOR) { if (optind > argc) goto show_usage; diff --git a/src/rx.hpp b/src/rx.hpp index 129fa81b..d8adebc2 100644 --- a/src/rx.hpp +++ b/src/rx.hpp @@ -246,7 +246,7 @@ class Aggregator : public BaseAggregator class Receiver { public: - Receiver(const char* wlan, int wlan_idx, uint32_t channel_id, BaseAggregator* agg); + Receiver(const char* wlan, int wlan_idx, uint32_t channel_id, BaseAggregator* agg, int rcv_buf_size); ~Receiver(); void loop_iter(void); int getfd(void){ return fd; } diff --git a/src/tx.cpp b/src/tx.cpp index 3d6008df..cb23be1e 100644 --- a/src/tx.cpp +++ b/src/tx.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -37,6 +38,7 @@ #include #include #include +#include extern "C" { @@ -187,44 +189,10 @@ void Transmitter::init_session(int k, int n) assert(session_packet_size <= MAX_SESSION_PACKET_SIZE); } -void RawSocketTransmitter::set_mark(uint32_t idx) -{ - if (!use_qdisc) - { - return; - } - - if (current_output >= 0) - { - int fd = sockfds[current_output]; - uint32_t sockopt = fwmark + idx; - - if(setsockopt(fd, SOL_SOCKET, SO_MARK, (const void *)&sockopt , sizeof(sockopt)) !=0) - { - throw runtime_error(string_format("Unable to set SO_MARK fd(%d)=%u: %s", fd, sockopt, strerror(errno))); - } - - return; - } - - // Handle mirror mode - for(auto it = sockfds.begin(); it != sockfds.end(); it++) - { - int fd = *it; - uint32_t sockopt = fwmark + idx; - - if(setsockopt(fd, SOL_SOCKET, SO_MARK, (const void *)&sockopt , sizeof(sockopt)) !=0) - { - throw runtime_error(string_format("Unable to set SO_MARK fd(%d)=%u: %s", fd, sockopt, strerror(errno))); - } - } -} - - RawSocketTransmitter::RawSocketTransmitter(int k, int n, const string &keypair, uint64_t epoch, uint32_t channel_id, uint32_t fec_delay, vector &tags, const vector &wlans, radiotap_header_t &radiotap_header, - uint8_t frame_type, bool use_qdisc, uint32_t fwmark) : \ + uint8_t frame_type, bool use_qdisc, uint32_t fwmark_base) : \ Transmitter(k, n, keypair, epoch, channel_id, fec_delay, tags), channel_id(channel_id), current_output(0), @@ -232,7 +200,8 @@ RawSocketTransmitter::RawSocketTransmitter(int k, int n, const string &keypair, radiotap_header(radiotap_header), frame_type(frame_type), use_qdisc(use_qdisc), - fwmark(fwmark) + fwmark_base(fwmark_base), + fwmark(fwmark_base) { for(auto it=wlans.begin(); it!=wlans.end(); it++) { @@ -275,6 +244,7 @@ RawSocketTransmitter::RawSocketTransmitter(int k, int n, const string &keypair, } sockfds.push_back(fd); + fd_fwmarks[fd] = 0; } } @@ -328,7 +298,21 @@ void RawSocketTransmitter::inject_packet(const uint8_t *buf, size_t size) { // Normal mode - only one card do packet transmission in a time uint64_t start_us = get_time_us(); - int rc = sendmsg(sockfds[current_output], &msghdr, 0); + int fd = sockfds[current_output]; + + if (use_qdisc && fd_fwmarks[fd] != fwmark) + { + uint32_t sockopt = fwmark; + + if(setsockopt(fd, SOL_SOCKET, SO_MARK, (const void *)&sockopt , sizeof(sockopt)) !=0) + { + throw runtime_error(string_format("Unable to set SO_MARK fd(%d)=%u: %s", fd, sockopt, strerror(errno))); + } + + fd_fwmarks[fd] = fwmark; + } + + int rc = sendmsg(fd, &msghdr, 0); if (rc < 0 && errno != ENOBUFS) { @@ -346,7 +330,21 @@ void RawSocketTransmitter::inject_packet(const uint8_t *buf, size_t size) for(auto it=sockfds.begin(); it != sockfds.end(); it++, i++) { uint64_t start_us = get_time_us(); - int rc = sendmsg(*it, &msghdr, 0); + int fd = *it; + + if (use_qdisc && fd_fwmarks[fd] != fwmark) + { + uint32_t sockopt = fwmark; + + if(setsockopt(fd, SOL_SOCKET, SO_MARK, (const void *)&sockopt , sizeof(sockopt)) !=0) + { + throw runtime_error(string_format("Unable to set SO_MARK fd(%d)=%u: %s", fd, sockopt, strerror(errno))); + } + + fd_fwmarks[fd] = fwmark; + } + + int rc = sendmsg(fd, &msghdr, 0); if (rc < 0 && errno != ENOBUFS) { @@ -387,6 +385,161 @@ RawSocketTransmitter::~RawSocketTransmitter() } +RemoteTransmitter::RemoteTransmitter(int k, int n, const string &keypair, uint64_t epoch, uint32_t channel_id, uint32_t fec_delay, + vector &tags, const vector>> &remote_hosts, radiotap_header_t &radiotap_header, + uint8_t frame_type, bool use_qdisc, uint32_t fwmark_base) : \ + Transmitter(k, n, keypair, epoch, channel_id, fec_delay, tags), + channel_id(channel_id), + current_output(0), + ieee80211_seq(0), + radiotap_header(radiotap_header), + frame_type(frame_type), + use_qdisc(use_qdisc), + fwmark_base(fwmark_base), + fwmark(fwmark_base) +{ + + sockfd = socket(AF_INET, SOCK_DGRAM, 0); + if (sockfd < 0) throw std::runtime_error(string_format("Error opening socket: %s", strerror(errno))); + + int output = 0; + for(auto h_it=remote_hosts.begin(); h_it!=remote_hosts.end(); h_it++) + { + uint8_t wlan_id = 0; + for(auto p_it=h_it->second.begin(); p_it != h_it->second.end(); p_it++, output++, wlan_id++) + { + struct sockaddr_in saddr; + memset(&saddr, '\0', sizeof(saddr)); + saddr.sin_family = AF_INET; + saddr.sin_addr.s_addr = inet_addr(h_it->first.c_str()); + saddr.sin_port = htons((unsigned short)*p_it); + sockaddrs.push_back(saddr); + output_to_ant_id[output] = ((uint64_t)ntohl(saddr.sin_addr.s_addr) << 32) | (uint64_t)(wlan_id) << 8 | (uint64_t)0xff; + } + } +} + +void RemoteTransmitter::inject_packet(const uint8_t *buf, size_t size) +{ + assert(size <= MAX_FORWARDER_PACKET_SIZE); + uint8_t ieee_hdr[sizeof(ieee80211_header)]; + + // fill default values + memcpy(ieee_hdr, ieee80211_header, sizeof(ieee80211_header)); + + // frame_type + ieee_hdr[0] = frame_type; + + // channel_id + uint32_t channel_id_be = htobe32(channel_id); + memcpy(ieee_hdr + SRC_MAC_THIRD_BYTE, &channel_id_be, sizeof(uint32_t)); + memcpy(ieee_hdr + DST_MAC_THIRD_BYTE, &channel_id_be, sizeof(uint32_t)); + + // sequence number + ieee_hdr[FRAME_SEQ_LB] = ieee80211_seq & 0xff; + ieee_hdr[FRAME_SEQ_HB] = (ieee80211_seq >> 8) & 0xff; + ieee80211_seq += 16; + + uint32_t _fwmark = use_qdisc ? htonl(this->fwmark) : 0; + + struct iovec iov[4] = \ + { + // fwmark + { + .iov_base = (void*)&_fwmark, + .iov_len = sizeof(_fwmark), + }, + // radiotap header + { .iov_base = (void*)&radiotap_header.header[0], + .iov_len = radiotap_header.header.size() + }, + // ieee80211 header + { .iov_base = (void*)ieee_hdr, + .iov_len = sizeof(ieee_hdr) + }, + // packet payload + { .iov_base = (void*)buf, + .iov_len = size + } + }; + + struct msghdr msghdr = \ + { .msg_name = NULL, + .msg_namelen = 0, + .msg_iov = iov, + .msg_iovlen = 4, + .msg_control = NULL, + .msg_controllen = 0, + .msg_flags = 0}; + + struct sockaddr_in saddr; + + if (current_output >= 0) + { + // Normal mode - only one card do packet transmission in a time + uint64_t start_us = get_time_us(); + + saddr = sockaddrs[current_output]; + msghdr.msg_name = &saddr; + msghdr.msg_namelen = sizeof(saddr); + + int rc = sendmsg(sockfd, &msghdr, 0); + + if (rc < 0 && errno != ENOBUFS) + { + throw runtime_error(string_format("Unable to inject packet: %s", strerror(errno))); + } + + uint64_t key = output_to_ant_id[current_output]; + antenna_stat[key].log_latency(get_time_us() - start_us, rc >= 0, size); + } + else + { + // Mirror mode - transmit packet via all cards + // Use only for different frequency channels + int i = 0; + for(auto it=sockaddrs.begin(); it != sockaddrs.end(); it++, i++) + { + uint64_t start_us = get_time_us(); + + saddr = *it; + msghdr.msg_name = &saddr; + msghdr.msg_namelen = sizeof(saddr); + + int rc = sendmsg(sockfd, &msghdr, 0); + + if (rc < 0 && errno != ENOBUFS) + { + throw runtime_error(string_format("Unable to inject packet: %s", strerror(errno))); + } + + uint64_t key = output_to_ant_id[i]; + antenna_stat[key].log_latency(get_time_us() - start_us, rc >= 0, size); + } + } + +} + +void RemoteTransmitter::dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes) +{ + for(auto it = antenna_stat.begin(); it != antenna_stat.end(); it++) + { + fprintf(fp, "%" PRIu64 "\tTX_ANT\t%" PRIx64 "\t%u:%u:%" PRIu64 ":%" PRIu64 ":%" PRIu64 "\n", + ts, it->first, + it->second.count_p_injected, it->second.count_p_dropped, + it->second.latency_min, + it->second.latency_sum / (it->second.count_p_injected + it->second.count_p_dropped), + it->second.latency_max); + + injected_packets += it->second.count_p_injected; + dropped_packets += it->second.count_p_dropped; + injected_bytes += it->second.count_b_injected; + } + antenna_stat.clear(); +} + + + void Transmitter::send_block_fragment(size_t packet_size) { uint8_t ciphertext[MAX_FORWARDER_PACKET_SIZE]; @@ -529,7 +682,7 @@ void data_source(shared_ptr &t, vector &rx_fd, int control_fd, uint64_t fec_close_ts = fec_timeout > 0 ? get_time_ms() + fec_timeout : 0; uint32_t count_p_fec_timeouts = 0; // empty packets sent to close fec block due to timeout uint32_t count_p_incoming = 0; // incoming udp packets (received + dropped due to rxq overflow) - uint32_t count_b_incoming = 0; // incoming udp bytes (received + dropped due to rxq overflow) + uint32_t count_b_incoming = 0; // incoming udp bytes (received only) uint32_t count_p_injected = 0; // successfully injected packets (include additional fec packets) uint32_t count_b_injected = 0; // successfully injected bytes (include additional fec packets) uint32_t count_p_dropped = 0; // dropped due to rxq overflows or injection timeout @@ -878,8 +1031,6 @@ radiotap_header_t init_radiotap_header(uint8_t stbc, switch(bandwidth) { case 10: - flags |= IEEE80211_RADIOTAP_MCS_BW_20; - break; case 20: flags |= IEEE80211_RADIOTAP_MCS_BW_20; break; @@ -942,8 +1093,6 @@ radiotap_header_t init_radiotap_header(uint8_t stbc, switch(bandwidth) { case 10: - res.header[VHT_BW_OFF] = IEEE80211_RADIOTAP_VHT_BW_20M; - break; case 20: res.header[VHT_BW_OFF] = IEEE80211_RADIOTAP_VHT_BW_20M; break; @@ -974,6 +1123,340 @@ radiotap_header_t init_radiotap_header(uint8_t stbc, } +void packet_injector(RawSocketInjector &t, vector &rx_fd, int log_interval) +{ + int nfds = rx_fd.size(); + assert(nfds > 0); + + struct pollfd fds[nfds]; + memset(fds, '\0', sizeof(fds)); + + for(size_t i=0; i < rx_fd.size(); i++) + { + fds[i].fd = rx_fd[i]; + fds[i].events = POLLIN; + } + + uint32_t rxq_overflow = 0; + uint64_t log_send_ts = 0; + + uint32_t count_p_incoming = 0; // incoming udp packets (received + dropped due to rxq overflow) + uint32_t count_b_incoming = 0; // incoming udp bytes (received only) + uint32_t count_p_dropped = 0; // dropped due to rxq overflows or injection timeout + uint32_t count_p_bad = 0; // injected large packets that were bad + + int start_fd_idx = 0; + + for(;;) + { + uint64_t cur_ts = get_time_ms(); + int poll_timeout = log_send_ts > cur_ts ? log_send_ts - cur_ts : 0; + int rc = poll(fds, nfds, poll_timeout); + + if (rc < 0) + { + if (errno == EINTR || errno == EAGAIN) continue; + throw runtime_error(string_format("poll error: %s", strerror(errno))); + } + + cur_ts = get_time_ms(); + + if (cur_ts >= log_send_ts) // log timeout expired + { + if(count_p_dropped) + { + fprintf(stderr, "%u packets dropped\n", count_p_dropped); + } + + if(count_p_bad) + { + fprintf(stderr, "%u packets bad\n", count_p_bad); + } + + count_p_incoming = 0; + count_b_incoming = 0; + count_p_dropped = 0; + count_p_bad = 0; + + log_send_ts = cur_ts + log_interval; + } + + if (rc == 0) // poll timeout + { + continue; + } + + // rc > 0: events detected + // start from last fd index and reset it to zero + int i = start_fd_idx; + for(start_fd_idx = 0; rc > 0; i++) + { + if (fds[i % nfds].revents & (POLLERR | POLLNVAL)) + { + throw runtime_error(string_format("socket error: %s", strerror(errno))); + } + + if (fds[i % nfds].revents & POLLIN) + { + uint8_t buf[MAX_DISTRIBUTION_PACKET_SIZE - sizeof(uint32_t) + 1]; + uint8_t cmsgbuf[CMSG_SPACE(sizeof(uint32_t))]; + rc -= 1; + + for(;;) + { + ssize_t rsize; + uint32_t _fwmark; + int fd = fds[i % nfds].fd; + + struct iovec iov[2] = { + // fwmark + { + .iov_base = (void*)&_fwmark, + .iov_len = sizeof(_fwmark), + }, + // packet with radiotap header + { + .iov_base = (void*)buf, + .iov_len = sizeof(buf), + } + }; + + struct msghdr msghdr = { .msg_name = NULL, + .msg_namelen = 0, + .msg_iov = iov, + .msg_iovlen = 2, + .msg_control = &cmsgbuf, + .msg_controllen = sizeof(cmsgbuf), + .msg_flags = 0 }; + + memset(cmsgbuf, '\0', sizeof(cmsgbuf)); + + if ((rsize = recvmsg(fd, &msghdr, MSG_DONTWAIT)) < 0) + { + if (errno != EWOULDBLOCK) throw runtime_error(string_format("Error receiving packet: %s", strerror(errno))); + break; + } + + if (rsize < (ssize_t)MIN_DISTRIBUTION_PACKET_SIZE || rsize > (ssize_t)MAX_DISTRIBUTION_PACKET_SIZE) + { + count_p_bad += 1; + continue; + } + + rsize -= sizeof(uint32_t); + count_p_incoming += 1; + count_b_incoming += rsize; + + uint32_t cur_rxq_overflow = extract_rxq_overflow(&msghdr); + if (cur_rxq_overflow != rxq_overflow) + { + // Count dropped packets as possible incoming + count_p_dropped += (cur_rxq_overflow - rxq_overflow); + count_p_incoming += (cur_rxq_overflow - rxq_overflow); + rxq_overflow = cur_rxq_overflow; + } + + cur_ts = get_time_ms(); + + t.inject_packet(i % nfds, buf, rsize, ntohl(_fwmark)); + + if (cur_ts >= log_send_ts) // log timeout expired + { + // We need to transmit all packets from the queue before tx card switch + start_fd_idx = i % nfds; + break; + } + } + } + } + } +} + +void injector_loop(int argc, char* const* argv, int optind, int srv_port, int rcv_buf, bool use_qdisc, int log_interval) +{ + vector rx_fd; + vector wlans; + for(int i = 0; optind + i < argc; i++) + { + int bind_port = srv_port != 0 ? srv_port + i : 0; + int fd = open_udp_socket_for_rx(bind_port, rcv_buf); + + if (srv_port == 0) + { + struct sockaddr_in saddr; + socklen_t saddr_size = sizeof(saddr); + + if (getsockname(fd, (struct sockaddr *)&saddr, &saddr_size) != 0) + { + throw runtime_error(string_format("Unable to get socket info: %s", strerror(errno))); + } + bind_port = ntohs(saddr.sin_port); + printf("%" PRIu64 "\tLISTEN_UDP\t%d:%x\n", get_time_ms(), bind_port, i); + } + fprintf(stderr, "Listen on %d for %s\n", bind_port, argv[optind + i]); + rx_fd.push_back(fd); + wlans.push_back(string(argv[optind + i])); + } + + if (srv_port == 0) + { + printf("%" PRIu64 "\tLISTEN_UDP_END\n", get_time_ms()); + fflush(stdout); + } + + auto t = RawSocketInjector(wlans, use_qdisc); + packet_injector(t, rx_fd, log_interval); +} + + +int open_control_fd(int control_port) +{ + int control_fd = open_udp_socket_for_rx(control_port, 0, 0x7f000001); // bind to 127.0.0.1 for security reasons + + if (control_port == 0) + { + struct sockaddr_in saddr; + socklen_t saddr_size = sizeof(saddr); + + if (getsockname(control_fd, (struct sockaddr *)&saddr, &saddr_size) != 0) + { + throw runtime_error(string_format("Unable to get socket info: %s", strerror(errno))); + } + control_port = ntohs(saddr.sin_port); + printf("%" PRIu64 "\tLISTEN_UDP_CONTROL\t%d\n", get_time_ms(), control_port); + } + + fprintf(stderr, "Listen on %d for management commands\n", control_port); + return control_fd; +} + +void local_loop(int argc, char* const* argv, int optind, int srv_port, int rcv_buf, int log_interval, + int udp_port, int debug_port, int k, int n, const string &keypair, int fec_timeout, + uint64_t epoch, uint32_t channel_id, uint32_t fec_delay, bool use_qdisc, uint32_t fwmark, + radiotap_header_t &radiotap_header, uint8_t frame_type, int control_port, bool mirror) +{ + vector rx_fd; + vector wlans; + vector tags; + shared_ptr t; + + for(int i = 0; optind + i < argc; i++) + { + int bind_port = udp_port != 0 ? udp_port + i : 0; + int fd = open_udp_socket_for_rx(bind_port, rcv_buf); + + if (udp_port == 0) + { + struct sockaddr_in saddr; + socklen_t saddr_size = sizeof(saddr); + + if (getsockname(fd, (struct sockaddr *)&saddr, &saddr_size) != 0) + { + throw runtime_error(string_format("Unable to get socket info: %s", strerror(errno))); + } + bind_port = ntohs(saddr.sin_port); + printf("%" PRIu64 "\tLISTEN_UDP\t%d:%x\n", get_time_ms(), bind_port, i); + } + fprintf(stderr, "Listen on %d for %s\n", bind_port, argv[optind + i]); + rx_fd.push_back(fd); + wlans.push_back(string(argv[optind + i])); + } + + if (udp_port == 0) + { + printf("%" PRIu64 "\tLISTEN_UDP_END\n", get_time_ms()); + fflush(stdout); + } + + if (debug_port) + { + fprintf(stderr, "Using %zu ports from %d for wlan emulation\n", wlans.size(), debug_port); + t = shared_ptr(new UdpTransmitter(k, n, keypair, "127.0.0.1", debug_port, epoch, channel_id, + fec_delay, tags, use_qdisc, fwmark)); + } + else + { + t = shared_ptr(new RawSocketTransmitter(k, n, keypair, epoch, channel_id, fec_delay, tags, + wlans, radiotap_header, frame_type, use_qdisc, fwmark)); + } + + int control_fd = open_control_fd(control_port); + data_source(t, rx_fd, control_fd, fec_timeout, mirror, log_interval); +} + +void distributor_loop(int argc, char* const* argv, int optind, int srv_port, int rcv_buf, int log_interval, + int udp_port, int k, int n, const string &keypair, int fec_timeout, + uint64_t epoch, uint32_t channel_id, uint32_t fec_delay, bool use_qdisc, uint32_t fwmark, + radiotap_header_t &radiotap_header, uint8_t frame_type, int control_port, bool mirror) +{ + vector rx_fd; + vector>> remote_hosts; + int port_idx = 0; + + set hosts; + + for(int i = optind; i < argc; i++) + { + vector remote_ports; + char *p = argv[i]; + char *t = NULL; + + t = strsep(&p, ":"); + if (t == NULL) continue; + + string remote_host = string(t); + + if(hosts.count(remote_host)) + { + throw runtime_error(string_format("Duplicate host %s", remote_host.c_str())); + } + + hosts.insert(remote_host); + + for(int j=0; (t=strsep(&p, ",")) != NULL; j++) + { + uint16_t remote_port = atoi(t); + int bind_port = (udp_port != 0) ? (udp_port + port_idx++) : 0; + int fd = open_udp_socket_for_rx(bind_port, rcv_buf); + + if (udp_port == 0) + { + struct sockaddr_in saddr; + socklen_t saddr_size = sizeof(saddr); + + if (getsockname(fd, (struct sockaddr *)&saddr, &saddr_size) != 0) + { + throw runtime_error(string_format("Unable to get socket info: %s", strerror(errno))); + } + bind_port = ntohs(saddr.sin_port); + + uint64_t wlan_id = (uint64_t)ntohl(inet_addr(remote_host.c_str())) << 24 | j; + printf("%" PRIu64 "\tLISTEN_UDP\t%d:%" PRIx64 "\n", get_time_ms(), bind_port, wlan_id); + } + + fprintf(stderr, "Listen on %d for %s:%d\n", bind_port, remote_host.c_str(), remote_port); + + rx_fd.push_back(fd); + remote_ports.push_back(remote_port); + } + + remote_hosts.push_back(pair>(remote_host, remote_ports)); + } + + if (udp_port == 0) + { + printf("%" PRIu64 "\tLISTEN_UDP_END\n", get_time_ms()); + fflush(stdout); + } + + vector tags; + shared_ptr t = shared_ptr(new RemoteTransmitter(k, n, keypair, epoch, channel_id, fec_delay, tags, + remote_hosts, radiotap_header, frame_type, use_qdisc, fwmark)); + + int control_fd = open_control_fd(control_port); + data_source(t, rx_fd, control_fd, fec_timeout, mirror, log_interval); +} + int main(int argc, char * const *argv) { int opt; @@ -981,6 +1464,7 @@ int main(int argc, char * const *argv) uint32_t fec_delay = 0; uint32_t link_id = 0x0; uint64_t epoch = 0; + int srv_port = 10000; int udp_port=5600; int control_port=0; int log_interval = 1000; @@ -1000,9 +1484,17 @@ int main(int argc, char * const *argv) uint8_t frame_type = FRAME_TYPE_DATA; bool use_qdisc = false; uint32_t fwmark = 0; + tx_mode_t tx_mode = LOCAL; - while ((opt = getopt(argc, argv, "K:k:n:u:p:F:l:B:G:S:L:M:N:D:T:i:e:R:f:mVQP:C:")) != -1) { + while ((opt = getopt(argc, argv, "dI:K:k:n:u:p:F:l:B:G:S:L:M:N:D:T:i:e:R:f:mVQP:C:")) != -1) { switch (opt) { + case 'I': + tx_mode = INJECTOR; + srv_port = atoi(optarg); + break; + case 'd': + tx_mode = DISTRIBUTOR; + break; case 'K': keypair = optarg; break; @@ -1095,7 +1587,13 @@ int main(int argc, char * const *argv) break; default: /* '?' */ show_usage: - fprintf(stderr, "Usage: %s [-K tx_key] [-k RS_K] [-n RS_N] [-u udp_port] [-R rcv_buf] [-p radio_port] [-F fec_delay] [-B bandwidth] [-G guard_interval] [-S stbc] [-L ldpc] [-M mcs_index] [-N VHT_NSS] [-T fec_timeout] [-l log_interval] [-e epoch] [-i link_id] [-f { data | rts }] [-m] [-V] [-Q] [-P fwmark] [-C control_port] interface1 [interface2] ...\n", + fprintf(stderr, "Local TX: %s [-K tx_key] [-k RS_K] [-n RS_N] [-u udp_port] [-R rcv_buf] [-p radio_port] [-F fec_delay] [-B bandwidth] [-G guard_interval] [-S stbc] [-L ldpc] [-M mcs_index] [-N VHT_NSS]\n" + " [-T fec_timeout] [-l log_interval] [-e epoch] [-i link_id] [-f { data | rts }] [-m] [-V] [-Q] [-P fwmark] [-C control_port] interface1 [interface2] ...\n", + argv[0]); + fprintf(stderr, "TX distributor: %s -d [-K tx_key] [-k RS_K] [-n RS_N] [-u udp_port] [-R rcv_buf] [-p radio_port] [-F fec_delay] [-B bandwidth] [-G guard_interval] [-S stbc] [-L ldpc] [-M mcs_index] [-N VHT_NSS]\n" + " [-T fec_timeout] [-l log_interval] [-e epoch] [-i link_id] [-f { data | rts }] [-m] [-V] [-Q] [-P fwmark] [-C control_port] host1:port1,port2,... [host2:port1,port2,...] ...\n", + argv[0]); + fprintf(stderr, "TX injector: %s -I port [-Q] [-R rcv_buf] [-l log_interval] interface1 [interface2] ...\n", argv[0]); fprintf(stderr, "Default: K='%s', k=%d, n=%d, fec_delay=%u [us], udp_port=%d, link_id=0x%06x, radio_port=%u, epoch=%" PRIu64 ", bandwidth=%d guard_interval=%s stbc=%d ldpc=%d mcs_index=%d vht_nss=%d, vht_mode=%d, fec_timeout=%d, log_interval=%d, rcv_buf=system_default, frame_type=data, mirror=false, use_qdisc=false, fwmark=%u, control_port=%d\n", keypair.c_str(), k, n, fec_delay, udp_port, link_id, radio_port, epoch, bandwidth, short_gi ? "short" : "long", stbc, ldpc, mcs_index, vht_nss, vht_mode, fec_timeout, log_interval, fwmark, control_port); @@ -1134,70 +1632,34 @@ int main(int argc, char * const *argv) try { auto radiotap_header = init_radiotap_header(stbc, ldpc, short_gi, bandwidth, mcs_index, vht_mode, vht_nss); + uint32_t channel_id = (link_id << 8) + radio_port; - vector rx_fd; - vector wlans; - int control_fd = open_udp_socket_for_rx(control_port, 0, 0x7f000001); // bind to 127.0.0.1 for security reasons - - if (control_port == 0) - { - struct sockaddr_in saddr; - socklen_t saddr_size = sizeof(saddr); - - if (getsockname(control_fd, (struct sockaddr *)&saddr, &saddr_size) != 0) - { - throw runtime_error(string_format("Unable to get socket info: %s", strerror(errno))); - } - control_port = ntohs(saddr.sin_port); - printf("%" PRIu64 "\tLISTEN_UDP_CONTROL\t%d\n", get_time_ms(), control_port); - } - fprintf(stderr, "Listen on %d for management commands\n", control_port); - - for(int i = 0; optind + i < argc; i++) + switch(tx_mode) { - int bind_port = udp_port != 0 ? udp_port + i : 0; - int fd = open_udp_socket_for_rx(bind_port, rcv_buf); - - if (udp_port == 0) - { - struct sockaddr_in saddr; - socklen_t saddr_size = sizeof(saddr); - - if (getsockname(fd, (struct sockaddr *)&saddr, &saddr_size) != 0) - { - throw runtime_error(string_format("Unable to get socket info: %s", strerror(errno))); - } - bind_port = ntohs(saddr.sin_port); - printf("%" PRIu64 "\tLISTEN_UDP\t%d:%s\n", get_time_ms(), bind_port, argv[optind + i]); - } - fprintf(stderr, "Listen on %d for %s\n", bind_port, argv[optind + i]); - rx_fd.push_back(fd); - wlans.push_back(string(argv[optind + i])); - } + case INJECTOR: + injector_loop(argc, argv, optind, srv_port, rcv_buf, use_qdisc, log_interval); + break; - if (udp_port == 0) - { - printf("%" PRIu64 "\tLISTEN_UDP_END\n", get_time_ms()); - fflush(stdout); - } + case LOCAL: + local_loop(argc, argv, optind, srv_port, rcv_buf, log_interval, + udp_port, debug_port, k, n, keypair, fec_timeout, + epoch, channel_id, fec_delay, use_qdisc, fwmark, + radiotap_header, frame_type, control_port, mirror); + break; - vector tags; - shared_ptr t; - uint32_t channel_id = (link_id << 8) + radio_port; + case DISTRIBUTOR: + distributor_loop(argc, argv, optind, srv_port, rcv_buf, log_interval, + udp_port, k, n, keypair, fec_timeout, + epoch, channel_id, fec_delay, use_qdisc, fwmark, + radiotap_header, frame_type, control_port, mirror); + break; - if (debug_port) - { - fprintf(stderr, "Using %zu ports from %d for wlan emulation\n", wlans.size(), debug_port); - t = shared_ptr(new UdpTransmitter(k, n, keypair, "127.0.0.1", debug_port, epoch, channel_id, - fec_delay, tags, use_qdisc, fwmark)); - } else { - t = shared_ptr(new RawSocketTransmitter(k, n, keypair, epoch, channel_id, fec_delay, tags, - wlans, radiotap_header, frame_type, use_qdisc, fwmark)); + default: + assert(0); } - - data_source(t, rx_fd, control_fd, fec_timeout, mirror, log_interval); - }catch(runtime_error &e) + } + catch(runtime_error &e) { fprintf(stderr, "Error: %s\n", e.what()); exit(1); diff --git a/src/tx.hpp b/src/tx.hpp index ab7211c4..3213fcfc 100644 --- a/src/tx.hpp +++ b/src/tx.hpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -52,6 +53,21 @@ typedef struct { uint8_t vht_nss; } radiotap_header_t; + +radiotap_header_t init_radiotap_header(uint8_t stbc, + bool ldpc, + bool short_gi, + uint8_t bandwidth, + uint8_t mcs_index, + bool vht_mode, + uint8_t vht_nss); + +typedef enum { + LOCAL, + INJECTOR, + DISTRIBUTOR +} tx_mode_t; + class Transmitter { public: @@ -95,6 +111,7 @@ class Transmitter std::vector tags; }; + class txAntennaItem { public: @@ -140,19 +157,14 @@ class RawSocketTransmitter : public Transmitter public: RawSocketTransmitter(int k, int n, const std::string &keypair, uint64_t epoch, uint32_t channel_id, uint32_t fec_delay, std::vector &tags, const std::vector &wlans, radiotap_header_t &radiotap_header, - uint8_t frame_type, bool use_qdisc, uint32_t fwmark); + uint8_t frame_type, bool use_qdisc, uint32_t fwmark_base); virtual ~RawSocketTransmitter(); + virtual void select_output(int idx) { - bool sw = current_output != idx; current_output = idx; - if (sw) - { - // select_output call should happend only between data packets - // All FEC packets issued after last data packet in block and will have set_mark(1) - set_mark(0); - } } + virtual void dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes); virtual void update_radiotap_header(radiotap_header_t &radiotap_header) { @@ -166,16 +178,23 @@ class RawSocketTransmitter : public Transmitter private: virtual void inject_packet(const uint8_t *buf, size_t size); - virtual void set_mark(uint32_t idx); + + virtual void set_mark(uint32_t idx) + { + fwmark = fwmark_base + idx; + } + const uint32_t channel_id; int current_output; uint16_t ieee80211_seq; std::vector sockfds; + std::map fd_fwmarks; tx_antenna_stat_t antenna_stat; radiotap_header_t radiotap_header; const uint8_t frame_type; const bool use_qdisc; - const uint32_t fwmark; + const uint32_t fwmark_base; + uint32_t fwmark; }; @@ -183,8 +202,8 @@ class UdpTransmitter : public Transmitter { public: UdpTransmitter(int k, int n, const std::string &keypair, const std::string &client_addr, int base_port, uint64_t epoch, uint32_t channel_id, - uint32_t fec_delay, std::vector &tags, bool use_qdisc, uint32_t fwmark): \ - Transmitter(k, n, keypair, epoch, channel_id, fec_delay, tags), radiotap_header({}), base_port(base_port), use_qdisc(use_qdisc), fwmark(fwmark) + uint32_t fec_delay, std::vector &tags, bool use_qdisc, uint32_t fwmark_base): \ + Transmitter(k, n, keypair, epoch, channel_id, fec_delay, tags), radiotap_header({}), base_port(base_port), use_qdisc(use_qdisc), fwmark_base(fwmark_base) { sockfd = socket(AF_INET, SOCK_DGRAM, 0); if (sockfd < 0) throw std::runtime_error(string_format("Error opening socket: %s", strerror(errno))); @@ -215,7 +234,7 @@ class UdpTransmitter : public Transmitter return; } - uint32_t sockopt = this->fwmark + idx; + uint32_t sockopt = this->fwmark_base + idx; if(setsockopt(sockfd, SOL_SOCKET, SO_MARK, (const void *)&sockopt , sizeof(sockopt)) !=0) { throw runtime_error(string_format("Unable to set SO_MARK fd(%d)=%u: %s", sockfd, sockopt, strerror(errno))); @@ -271,13 +290,143 @@ class UdpTransmitter : public Transmitter int base_port; struct sockaddr_in saddr; const bool use_qdisc; - const uint32_t fwmark; + const uint32_t fwmark_base; }; -radiotap_header_t init_radiotap_header(uint8_t stbc, - bool ldpc, - bool short_gi, - uint8_t bandwidth, - uint8_t mcs_index, - bool vht_mode, - uint8_t vht_nss); + +class RemoteTransmitter : public Transmitter +{ +public: + RemoteTransmitter(int k, int n, const std::string &keypair, uint64_t epoch, uint32_t channel_id, uint32_t fec_delay, std::vector &tags, + const std::vector>> &remote_hosts, radiotap_header_t &radiotap_header, + uint8_t frame_type, bool use_qdisc, uint32_t fwmark_base); + virtual ~RemoteTransmitter() + { + close(sockfd); + } + + virtual void select_output(int idx) + { + current_output = idx; + } + + virtual void dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes); + virtual void update_radiotap_header(radiotap_header_t &radiotap_header) + { + this->radiotap_header = radiotap_header; + } + + virtual radiotap_header_t get_radiotap_header(void) + { + return radiotap_header; + } + +private: + virtual void inject_packet(const uint8_t *buf, size_t size); + + virtual void set_mark(uint32_t idx) + { + fwmark = fwmark_base + idx; + } + + const uint32_t channel_id; + int current_output; + uint16_t ieee80211_seq; + int sockfd; + std::vector sockaddrs; + std::map output_to_ant_id; + tx_antenna_stat_t antenna_stat; + radiotap_header_t radiotap_header; + const uint8_t frame_type; + const bool use_qdisc; + const uint32_t fwmark_base; + uint32_t fwmark; +}; + + +class RawSocketInjector +{ +public: + + RawSocketInjector(const vector &wlans, bool use_qdisc) : use_qdisc(use_qdisc) + { + for(auto it=wlans.begin(); it!=wlans.end(); it++) + { + int fd = socket(PF_PACKET, SOCK_RAW, 0); + if (fd < 0) + { + throw runtime_error(string_format("Unable to open PF_PACKET socket: %s", strerror(errno))); + } + + if(!use_qdisc) + { + const int optval = 1; + if(setsockopt(fd, SOL_PACKET, PACKET_QDISC_BYPASS, (const void *)&optval , sizeof(optval)) !=0) + { + close(fd); + throw runtime_error(string_format("Unable to set PACKET_QDISC_BYPASS: %s", strerror(errno))); + } + } + + struct ifreq ifr; + memset(&ifr, '\0', sizeof(ifr)); + strncpy(ifr.ifr_name, it->c_str(), sizeof(ifr.ifr_name) - 1); + + if (ioctl(fd, SIOCGIFINDEX, &ifr) < 0) + { + close(fd); + throw runtime_error(string_format("Unable to get interface index for %s: %s", it->c_str(), strerror(errno))); + } + + struct sockaddr_ll sll; + memset(&sll, '\0', sizeof(sll)); + sll.sll_family = AF_PACKET; + sll.sll_ifindex = ifr.ifr_ifindex; + sll.sll_protocol = 0; + + if (::bind(fd, (struct sockaddr *) &sll, sizeof(sll)) < 0) + { + close(fd); + throw runtime_error(string_format("Unable to bind to %s: %s", it->c_str(), strerror(errno))); + } + + sockfds.push_back(fd); + fd_fwmarks[fd] = 0; + } + } + + void inject_packet(int wlan_idx, const uint8_t *buf, size_t size, uint32_t fwmark) + { + int fd = sockfds[wlan_idx]; + + if (use_qdisc && fd_fwmarks[fd] != fwmark) + { + uint32_t sockopt = fwmark; + + if(setsockopt(fd, SOL_SOCKET, SO_MARK, (const void *)&sockopt , sizeof(sockopt)) !=0) + { + throw runtime_error(string_format("Unable to set SO_MARK fd(%d)=%u: %s", fd, sockopt, strerror(errno))); + } + + fd_fwmarks[fd] = fwmark; + } + + if (send(fd, buf, size, 0) < 0 && errno != ENOBUFS) + { + throw runtime_error(string_format("Unable to inject packet: %s", strerror(errno))); + } + } + + ~RawSocketInjector() + { + for(auto it=sockfds.begin(); it != sockfds.end(); it++) + { + close(*it); + } + } + +private: + std::vector sockfds; + std::map fd_fwmarks; + const bool use_qdisc; +}; diff --git a/src/wifibroadcast.cpp b/src/wifibroadcast.cpp index 469538a5..416693c3 100644 --- a/src/wifibroadcast.cpp +++ b/src/wifibroadcast.cpp @@ -62,10 +62,10 @@ uint64_t get_time_us(void) // in microseconds return ts.tv_sec * 1000000LL + ts.tv_nsec / 1000; } -int open_udp_socket_for_rx(int port, int rcv_buf_size, uint32_t bind_addr) +int open_udp_socket_for_rx(int port, int rcv_buf_size, uint32_t bind_addr, int socket_type, int socket_protocol) { struct sockaddr_in saddr; - int fd = socket(AF_INET, SOCK_DGRAM, 0); + int fd = socket(AF_INET, socket_type, socket_protocol); if (fd < 0) throw runtime_error(string_format("Error opening socket: %s", strerror(errno))); const int optval = 1; diff --git a/src/wifibroadcast.hpp b/src/wifibroadcast.hpp index c726d00f..8651ec9c 100644 --- a/src/wifibroadcast.hpp +++ b/src/wifibroadcast.hpp @@ -253,8 +253,11 @@ typedef struct { #define MAX_FEC_PAYLOAD (WIFI_MTU - sizeof(ieee80211_header) - sizeof(wblock_hdr_t) - crypto_aead_chacha20poly1305_ABYTES) #define MAX_FORWARDER_PACKET_SIZE (WIFI_MTU - sizeof(ieee80211_header)) #define MAX_SESSION_PACKET_SIZE (WIFI_MTU - sizeof(ieee80211_header)) +#define MIN_DISTRIBUTION_PACKET_SIZE (sizeof(uint32_t) + sizeof(radiotap_header_ht) + sizeof(ieee80211_header)) // ht hdr < vht hdr +#define MAX_DISTRIBUTION_PACKET_SIZE (sizeof(uint32_t) + sizeof(radiotap_header_vht) + WIFI_MTU) +#define MAX_PCAP_PACKET_SIZE (WIFI_MTU + 256) // radiotap header is variable but 8812au/eu has max rtap buffer size 256 -int open_udp_socket_for_rx(int port, int rcv_buf_size, uint32_t bind_addr = INADDR_ANY); +int open_udp_socket_for_rx(int port, int rcv_buf_size, uint32_t bind_addr = INADDR_ANY, int socket_type = SOCK_DGRAM, int socket_protocol = 0); uint64_t get_time_ms(void); uint64_t get_time_us(void); diff --git a/stdeb.cfg b/stdeb.cfg index 279676c4..da56d099 100644 --- a/stdeb.cfg +++ b/stdeb.cfg @@ -1,5 +1,5 @@ [DEFAULT] -Depends3: python3-twisted, libpcap-dev, libsodium-dev, python3-pyroute2, python3-future, python3-serial, python3-msgpack +Depends3: python3-twisted, libpcap-dev, libsodium-dev, python3-pyroute2, python3-future, python3-serial, python3-msgpack, python3-jinja2 Package3: wfb-ng Replaces3: wifibroadcast Maintainer: Vasily Evseenko diff --git a/wfb_ng/cli.py b/wfb_ng/cli.py index 7f74103c..1a637fb1 100644 --- a/wfb_ng/cli.py +++ b/wfb_ng/cli.py @@ -125,8 +125,22 @@ def human_rate(r): return '%3d %s' % (rate, mod) +def format_ant(ant_id): + if ant_id < (1<<32): + if ant_id & 0xff == 0xff: + return '%2X:X ' % (ant_id >> 8) + else: + return '%2X:%X ' % (ant_id >> 8, ant_id & 0xff) + + if ant_id & 0xff == 0xff: + return '%08X:%X:X' % (ant_id >> 32, (ant_id >> 8) & 0xff) + else: + return '%08X:%X:%X' % (ant_id >> 32, (ant_id >> 8) & 0xff, ant_id & 0xff) + + class AntennaStat(Int32StringReceiver): MAX_LENGTH = 1024 * 1024 + is_cluster = False def stringReceived(self, string): attrs = msgpack.unpackb(string, strict_map_key=False, use_list=False) @@ -136,13 +150,14 @@ def stringReceived(self, string): elif attrs['type'] == 'tx': self.draw_tx(attrs) elif attrs['type'] == 'cli_title': + self.is_cluster = attrs['is_cluster'] set_window_title(attrs['cli_title']) def draw_rx(self, attrs): p = attrs['packets'] session_d = attrs['session'] stats_d = attrs['rx_ant_stats'] - tx_ant = attrs.get('tx_ant') + tx_wlan = attrs.get('tx_wlan') rx_id = attrs['id'] window = self.factory.windows.get(rx_id) @@ -175,14 +190,21 @@ def draw_rx(self, attrs): addstr_markup(window, 0, 20, flow_str) if stats_d: - addstr_markup(window, 2, 20, '{Freq MCS BW [ANT] pkt/s} {RSSI} [dBm] {SNR} [dB]') + if self.is_cluster: + lpad = ' ' * 4 + rpad = ' ' * 3 + else: + lpad = '' + rpad = '' + + addstr_markup(window, 2, 20, '{Freq MCS BW %s[ANT]%s pkt/s} {RSSI} [dBm] {SNR} [dB]' % (lpad, rpad)) for y, (((freq, mcs_index, bandwith), ant_id), v) in enumerate(sorted(stats_d.items()), 3): pkt_s, rssi_min, rssi_avg, rssi_max, snr_min, snr_avg, snr_max = v if y < ymax: - active_tx = (ant_id >> 8) == tx_ant - addstr_markup(window, y, 20, '%04d %3d %2d %s%04x%s %4d %3d < {%3d} < %3d %3d < {%3d} < %3d' % \ + active_tx = ((ant_id >> 8) == tx_wlan) + addstr_markup(window, y, 20, '%04d %3d %2d %s%s%s %4d %3d < {%3d} < %3d %3d < {%3d} < %3d' % \ (freq, mcs_index, bandwith, - '{' if active_tx else '', ant_id, '}' if active_tx else '', + '{' if active_tx else '', format_ant(ant_id), '}' if active_tx else '', pkt_s, rssi_min, rssi_avg, rssi_max, snr_min, snr_avg, snr_max), 0 if active_tx else curses.A_DIM) @@ -221,7 +243,14 @@ def draw_tx(self, attrs): human_rate(p['injected_bytes'][0]))) if latency_d: - addstr_markup(window, 2, 20, '{[ANT] pkt/s} {\u00b0C} {Injection} [us]') + if self.is_cluster: + lpad = ' ' * 4 + rpad = ' ' * 3 + else: + lpad = '' + rpad = '' + + addstr_markup(window, 2, 20, '{%s[ANT]%s pkt/s} {\u00b0C} {Injection} [us]' % (lpad, rpad)) for y, (k, v) in enumerate(sorted(latency_d.items()), 3): k = int(k) # json doesn't support int keys injected, dropped, lat_min, lat_avg, lat_max = v @@ -237,7 +266,7 @@ def draw_tx(self, attrs): temp = ' (--)' if y < ymax: - addstr_markup(window, y, 21, '{%02x}(XX) %4d %3s %4d < {%4d} < %4d' % (k >> 8, injected, temp, lat_min, lat_avg, lat_max)) + addstr_markup(window, y, 20, '{%s} %4d %3s %4d < {%4d} < %4d' % (format_ant(k), injected, temp, lat_min, lat_avg, lat_max)) else: addstr_noerr(window, 2, 20, '[No data]', curses.A_REVERSE) @@ -262,7 +291,7 @@ def init_windows(self): curses.resize_term(height, width) self.stdscr.clear() - service_list = list((s_name, cfg.stream_rx is not None, cfg.stream_tx is not None) for s_name, _, cfg in parse_services(self.profile)) + service_list = list((s_name, cfg.stream_rx is not None, cfg.stream_tx is not None) for s_name, _, cfg in parse_services(self.profile, None)) if not service_list: rectangle(self.stdscr, 0, 0, height - 1, width - 1) diff --git a/wfb_ng/cluster.py b/wfb_ng/cluster.py new file mode 100644 index 00000000..d0c09f13 --- /dev/null +++ b/wfb_ng/cluster.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2018-2024 Vasily Evseenko + +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +import itertools +from jinja2 import Environment, StrictUndefined + +from .services import parse_services, hash_link_domain, bandwidth_map +from .conf import settings + +def parse_cluster_services(profiles): + if not settings.cluster.nodes: + raise Exception('Cluster is empty!') + + if not settings.cluster.server_address: + raise Exception('Server IP address is not set!') + + udp_port_allocator = itertools.count(settings.cluster.base_port_server) + services = list((profile, parse_services(profile, udp_port_allocator)) for profile in profiles) + port_allocators = {} + cluster_nodes = {} + + def update_node(node, profile, service_name, link_id, tx_port_base, wlans, srv_cfg): + d = dict(wlans = wlans, + link_id = link_id, + bandwidth = srv_cfg.bandwidth, + stream_tx = srv_cfg.stream_tx, + stream_rx = srv_cfg.stream_rx, + tx_port_base = tx_port_base, + rx_fwd = (settings.cluster.server_address, srv_cfg.udp_port_auto)) + + if node not in cluster_nodes: + cluster_nodes[node] = {} + + cluster_nodes[node]['%s_%s' % (profile, service_name)] = d + + + def get_allocator(node): + alloc = port_allocators.get(node) + if alloc is not None: + return alloc + + alloc = itertools.count(settings.cluster.base_port_node) + port_allocators[node] = alloc + + return alloc + + for profile, service_list in services: + link_id = hash_link_domain(getattr(settings, profile).link_domain) + + for service_name, service_type, srv_cfg in service_list: + auto_peers = [] + # Sort cluster nodes for stable result + for node, attrs in sorted(settings.cluster.nodes.items(), key=lambda x: x[0]): + ports = list(next(get_allocator(node)) for wlan in attrs['wlans']) + if not ports: + raise Exception('WiFi interface list is empty for node %s!' % (node,)) + + auto_peers.append('%s:%s' % (node, ','.join(map(str, ports)))) + update_node(node, profile, service_name, link_id, min(ports), attrs['wlans'], srv_cfg) + + srv_cfg.udp_peers_auto = auto_peers + + return services, cluster_nodes + + + +env = Environment(autoescape=False, undefined=StrictUndefined, trim_blocks=True, lstrip_blocks=True) +env.globals.update({'sorted': sorted, 'repr': repr, 'max': max, + 'min': min, 'None': None, 'settings': settings}) + +script_template = '''\ +#!/bin/bash +set -em + +export LC_ALL=C + +_cleanup() +{ + plist=$(jobs -p) + if [ -n "$plist" ] + then + kill -TERM $plist || true + fi + exit 1 +} + +trap _cleanup EXIT + +iw reg set {{ settings.common.wifi_region }} + +for wlan in {{ wlans|join(' ') }} +do + if which nmcli > /dev/null && ! nmcli device show $wlan | grep -q '(unmanaged)' + then + nmcli device set $wlan managed no + fi + + ip link set $wlan down + iw dev $wlan set monitor otherbss + ip link set $wlan up + iw dev $wlan set channel {{ settings.common.wifi_channel }} {{ ht_mode }} +{% if settings.common.wifi_txpower != None %} + iw dev $wlan set txpower fixed {{ settings.common.wifi_txpower }} +{% endif %} +done +{% for service, attrs in services.items() %} + +# {{ service }} +{% if attrs['stream_rx'] != None %} +wfb_rx -f -c {{ attrs['rx_fwd'][0] }} -u {{ attrs['rx_fwd'][1] }} -p {{ attrs['stream_rx'] }} -i {{ attrs['link_id'] }} -R {{ settings.common.tx_rcv_buf_size }} {{ attrs['wlans']|join(' ') }} & +{% endif %} +{% if attrs['stream_tx'] != None %} +wfb_tx -I {{ attrs['tx_port_base'] }} -R {{ settings.common.tx_rcv_buf_size }} {{ attrs['wlans']|join(' ') }} & +{% endif %} +{% endfor %} + +# Fail in case of connection loss +(set +x; sleep 1; exec cat > /dev/null) & + +wait -n +''' + +script_template = env.from_string(script_template) + +def gen_cluster_scripts(cluster_nodes): + res = {} + + for node, node_attrs in cluster_nodes.items(): + wlans = sorted(set().union(*[srv_attrs['wlans'] for srv_attrs in node_attrs.values()])) + max_bw = max(srv_attrs['bandwidth'] for srv_attrs in node_attrs.values()) + res[node] = script_template.render(wlans=wlans, ht_mode=bandwidth_map[max_bw], services=node_attrs) + + return res diff --git a/wfb_ng/conf/master.cfg b/wfb_ng/conf/master.cfg index 9c9443fd..37e561be 100644 --- a/wfb_ng/conf/master.cfg +++ b/wfb_ng/conf/master.cfg @@ -43,6 +43,29 @@ temp_measurement_interval = 10 # [s] (8812eu only) Internal RF path temp measur temp_overheat_warning = 60 # [*C] (8812eu only) Overheat warning threshold. +# Cluster definition for distributed mode + +[cluster] + +nodes = { + #'127.0.0.1': { 'wlans': ['wlan1', 'wlan2'] }, + #'10.5.1.1' : { 'wlans': ['wlan0', 'wlan1'] }, + } + +# Cluster init can be auto (--cluster ssh) or manual (--cluster manual) +# In second case you need to generate node setup scripts via (--gen-init) +# and run them on cluster nodes yourself + +ssh_user = 'root' +ssh_port = 22 +ssh_key = None # Path to ssh private key. If None then it will try to use ssh-agent + +server_address = None # Set to IP address which is reachable from all cluster nodes +base_port_server = 10000 # UDP ports allocated on server +base_port_node = 11000 # UDP ports allocated on nodes + + + ## Stream allocation scheme: # Down streams (vehicle to GS): 0 - 127 diff --git a/wfb_ng/protocols.py b/wfb_ng/protocols.py new file mode 100644 index 00000000..6820b5c6 --- /dev/null +++ b/wfb_ng/protocols.py @@ -0,0 +1,577 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2018-2024 Vasily Evseenko + +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +import sys +import msgpack +import os +import time + +from itertools import groupby +from twisted.python import log, failure +from twisted.internet import reactor, defer, threads, task +from twisted.internet.protocol import ProcessProtocol, Factory +from twisted.protocols.basic import LineReceiver, Int32StringReceiver + +from .conf import settings + + +class BadTelemetry(Exception): + pass + + +class WFBFlags(object): + LINK_LOST = 1 + LINK_JAMMED = 2 + + +fec_types = {1: 'VDM_RS'} + +class StatisticsProtocol(Int32StringReceiver): + MAX_LENGTH = 1024 * 1024 + + def connectionMade(self): + self.sendString(msgpack.packb(dict(type='cli_title', + cli_title=self.factory.cli_title or "", + is_cluster=self.factory.is_cluster))) + self.factory.ui_sessions.append(self) + + def stringReceived(self, string): + pass + + def connectionLost(self, reason): + self.factory.ui_sessions.remove(self) + + def send_stats(self, data): + self.sendString(msgpack.packb(data)) + + +class RFTempMeter(object): + def __init__(self, wlans, measurement_interval): + # RF module temperature by rf_path + self.wlans = tuple(wlans) + self.rf_temperature = {} + + self.lc = task.LoopingCall(self.read_temperature) + self.lc.start(measurement_interval, now=True) + + def _cleanup(self): + self.lc.stop() + + def read_temperature(self): + def _read_temperature(): + res = {} + for idx, wlan in enumerate(self.wlans): + fname = '/proc/net/rtl88x2eu/%s/thermal_state' % (wlan,) + try: + with open(fname) as fd: + for line in fd: + line = line.strip() + if not line: + continue + + d = {} + for f in line.split(','): + k, v = f.split(':', 1) + d[k.strip()] = int(v.strip()) + + ant_id = (idx << 8) + d['rf_path'] + res[ant_id] = d['temperature'] + except FileNotFoundError: + pass + except Exception as v: + reactor.callFromThread(log.err, v, 'Unable to parse %s:' % (fname,)) + return res + + def _got_temp(temp_d): + self.rf_temperature = temp_d + + return threads.deferToThread(_read_temperature).addCallback(_got_temp) + + +class StatsAndSelectorFactory(Factory): + noisy = False + protocol = StatisticsProtocol + + """ + Aggregate RX stats and select TX antenna + """ + + def __init__(self, logger, cli_title=None, rf_temp_meter=None, is_cluster=False): + self.is_cluster = is_cluster + self.ant_sel_cb_list = [] + self.rssi_cb_l = [] + + self.tx_sel = None + self.tx_sel_rssi_delta = settings.common.tx_sel_rssi_delta + self.tx_sel_counter_rel_delta = settings.common.tx_sel_counter_rel_delta + self.tx_sel_counter_abs_delta = settings.common.tx_sel_counter_abs_delta + + # tcp sockets for UI + self.ui_sessions = [] + + # machine-readable logger + self.logger = logger + + if logger is not None: + self.ui_sessions.append(logger) + + self.cli_title = cli_title + self.rf_temp_meter = rf_temp_meter + + def _cleanup(self): + pass + + def add_ant_sel_cb(self, ant_sel_cb): + self.ant_sel_cb_list.append(ant_sel_cb) + ant_sel_cb(self.tx_sel) + + def add_rssi_cb(self, rssi_cb): + self.rssi_cb_l.append(rssi_cb) + + def _stats_agg_by_freq(self, ant_stats): + stats_agg = {} + + for (((freq, mcs_index, bandwidth), ant_id), + (pkt_s, + rssi_min, rssi_avg, rssi_max, + snr_min, snr_avg, snr_max)) in ant_stats.items(): + + if ant_id not in stats_agg: + stats_agg[ant_id] = (pkt_s, + rssi_min, rssi_avg * pkt_s, rssi_max, + snr_min, snr_avg * pkt_s, snr_max) + else: + tmp = stats_agg[ant_id] + stats_agg[ant_id] = (pkt_s + tmp[0], + min(rssi_min, tmp[1]), + rssi_avg * pkt_s + tmp[2], + max(rssi_max, tmp[3]), + min(snr_min, tmp[4]), + snr_avg * pkt_s + tmp[5], + max(snr_max, tmp[6])) + + return dict((ant_id, (pkt_s, + rssi_min, rssi_avg // pkt_s, rssi_max, + snr_min, snr_avg // pkt_s, snr_max)) \ + for ant_id, (pkt_s, + rssi_min, rssi_avg, rssi_max, + snr_min, snr_avg, snr_max) in stats_agg.items()) + + def select_tx_antenna(self, stats_agg): + wlan_rssi_and_pkts = {} + max_pkts = 0 + + for wlan_id, grp in groupby(sorted(((ant_id >> 8), pkt_s, rssi_avg) \ + for ant_id, (pkt_s, + rssi_min, rssi_avg, rssi_max, + snr_min, snr_avg, snr_max) in stats_agg.items()), + lambda x: x[0]): + + grp = list(grp) + # Use max average rssi [dBm] from all wlan's antennas + # Use max packet counter per antenna from all wlan's antennas + rssi = max(rssi for _, pkt_s, rssi in grp) + pkts = max(pkt_s for _, pkt_s, rssi in grp) + max_pkts = max(pkts, max_pkts) + wlan_rssi_and_pkts[wlan_id] = (rssi, pkts) + + if not wlan_rssi_and_pkts: + return + + # Select antennas with near-maximum RX packet counters only + tx_sel_counter_thr = max_pkts - max(self.tx_sel_counter_abs_delta, max_pkts * self.tx_sel_counter_rel_delta) + wlans_with_max_pkts = set(wlan_id for wlan_id, (rssi, pkt_s) in wlan_rssi_and_pkts.items() if pkt_s >= tx_sel_counter_thr) + + if not wlans_with_max_pkts: + return + + new_max_rssi, new_tx_wlan = max((rssi, wlan_id) for wlan_id, (rssi, pkt_s) in wlan_rssi_and_pkts.items() if wlan_id in wlans_with_max_pkts) + cur_max_rssi = wlan_rssi_and_pkts.get(self.tx_sel, (-1000, 0))[0] + + if new_tx_wlan == self.tx_sel: + return + + if self.tx_sel in wlans_with_max_pkts and new_max_rssi - cur_max_rssi < self.tx_sel_rssi_delta: + # Already selected antenna with near-maximum RX packets counter + # and other antennas doesn't have significally large RSSI + return + + log.msg('Switch TX wlan %x -> %x, RSSI %d -> %d[dB]' % (self.tx_sel if self.tx_sel is not None else -1, + new_tx_wlan, cur_max_rssi, new_max_rssi)) + + for ant_sel_cb in self.ant_sel_cb_list: + try: + ant_sel_cb(new_tx_wlan) + except Exception: + log.err() + + self.tx_sel = new_tx_wlan + + def process_new_session(self, rx_id, session): + if self.logger is not None: + self.logger.send_stats(dict(type='new_session', + timestamp = time.time(), + id=rx_id, + **session)) + + def update_rx_stats(self, rx_id, packet_stats, ant_stats, session): + stats_agg = self._stats_agg_by_freq(ant_stats) + card_rssi_l = list(rssi_avg + for pkt_s, + rssi_min, rssi_avg, rssi_max, + snr_min, snr_avg, snr_max + in stats_agg.values()) + + if stats_agg and self.ant_sel_cb_list: + self.select_tx_antenna(stats_agg) + + if self.rssi_cb_l: + _idx = 0 if settings.common.mavlink_err_rate else 1 + flags = 0 + + if not card_rssi_l: + flags |= WFBFlags.LINK_LOST + + elif packet_stats['dec_err'][0] + packet_stats['bad'][0] > 0: + flags |= WFBFlags.LINK_JAMMED + + rx_errors = min(packet_stats['dec_err'][_idx] + packet_stats['bad'][_idx] + packet_stats['lost'][_idx], 65535) + rx_fec = min(packet_stats['fec_rec'][_idx], 65535) + mav_rssi = (max(card_rssi_l) if card_rssi_l else -128) % 256 + + for rssi_cb in self.rssi_cb_l: + try: + rssi_cb(rx_id, mav_rssi, rx_errors, rx_fec, flags) + except Exception: + log.err() + + if settings.common.debug: + log.msg('%s rssi %s TX %x %s %s' % (rx_id, max(card_rssi_l) if card_rssi_l else 'N/A', + self.tx_sel if self.tx_sel is not None else -1, packet_stats, ant_stats)) + + # Send stats to CLI sessions and logger + for s in self.ui_sessions: + s.send_stats(dict(type='rx', + timestamp = time.time(), + id=rx_id, tx_wlan=self.tx_sel, + packets=packet_stats, rx_ant_stats=ant_stats, + session=session)) + + def update_tx_stats(self, tx_id, packet_stats, ant_latency): + if settings.common.debug: + log.msg("%s %r %r" % (tx_id, packet_stats, ant_latency)) + + # Send stats to CLI sessions and logger + for s in self.ui_sessions: + rf_temperature = dict(self.rf_temp_meter.rf_temperature) if self.rf_temp_meter is not None else {} + s.send_stats(dict(type='tx', + timestamp = time.time(), + id=tx_id, + packets=packet_stats, + latency=ant_latency, + rf_temperature=rf_temperature)) + + + +class RXAntennaProtocol(LineReceiver): + delimiter = b'\n' + + """ + wfb_rx log parser + """ + + def __init__(self, ant_stat_cb, rx_id): + self.ant_stat_cb = ant_stat_cb + self.rx_id = rx_id + self.ant = {} + self.count_all = None + self.session = None + + def lineReceived(self, line): + line = line.decode('utf-8').strip() + cols = line.split('\t') + + try: + if len(cols) < 2: + raise BadTelemetry() + + #ts = int(cols[0]) + cmd = cols[1] + + if cmd == 'RX_ANT': + if len(cols) != 5: + raise BadTelemetry() + self.ant[(tuple(int(i) for i in cols[2].split(':')), int(cols[3], 16))] = tuple(int(i) for i in cols[4].split(':')) + + elif cmd == 'PKT': + if len(cols) != 3: + raise BadTelemetry() + + p_all, b_all, p_dec_err, p_dec_ok, p_fec_rec, p_lost, p_bad, p_outgoing, b_outgoing = list(int(i) for i in cols[2].split(':')) + + if not self.count_all: + self.count_all = (p_all, b_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing, b_outgoing) + else: + self.count_all = tuple((a + b) for a, b in zip((p_all, b_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing, b_outgoing), + self.count_all)) + + stats = dict(zip(('all', 'all_bytes', 'dec_ok', 'fec_rec', 'lost', 'dec_err', 'bad', 'out', 'out_bytes'), + zip((p_all, b_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing, b_outgoing), + self.count_all))) + + # Send stats to aggregators + if self.ant_stat_cb is not None: + self.ant_stat_cb.update_rx_stats(self.rx_id, stats, dict(self.ant), self.session) + + self.ant.clear() + + elif cmd == 'SESSION': + if len(cols) != 3: + raise BadTelemetry() + + epoch, fec_type, fec_k, fec_n = list(int(i) for i in cols[2].split(':')) + self.session = dict(fec_type=fec_types.get(fec_type, 'Unknown'), fec_k=fec_k, fec_n=fec_n, epoch=epoch) + log.msg('New session detected [%s]: FEC=%s K=%d, N=%d, epoch=%d' % (self.rx_id, fec_types.get(fec_type, 'Unknown'), fec_k, fec_n, epoch)) + + if self.ant_stat_cb is not None: + self.ant_stat_cb.process_new_session(self.rx_id, self.session) + else: + raise BadTelemetry() + except BadTelemetry: + log.msg('Bad telemetry [%s]: %s' % (self.rx_id, line), isError=1) + + +class DbgProtocol(LineReceiver): + delimiter = b'\n' + + """ + stderr parser + """ + + def __init__(self, rx_id): + self.rx_id = rx_id + + def lineReceived(self, line): + log.msg('%s: %s' % (self.rx_id, line.decode('utf-8'))) + + + +class TXAntennaProtocol(LineReceiver): + delimiter = b'\n' + + def __init__(self, ant_stat_cb, tx_id, ports_df, control_port_df): + self.ant_stat_cb = ant_stat_cb + self.tx_id = tx_id + self.ports_df = ports_df + self.control_port_df = control_port_df + self.ports = {} + self.control_port = None + self.ant = {} + self.count_all = None + + def lineReceived(self, line): + cols = line.decode('utf-8').strip().split('\t') + if len(cols) < 2: + return + + #ts = int(cols[0]) + cmd = cols[1] + + if cmd == 'LISTEN_UDP' and len(cols) == 3: + port, wlan_id = cols[2].split(':', 1) + self.ports[int(wlan_id, 16)] = int(port) + + elif cmd == 'LISTEN_UDP_END' and self.ports_df is not None: + self.ports_df.callback(self.ports) + + elif cmd == 'LISTEN_UDP_CONTROL' and len(cols) == 3 and self.control_port_df is not None: + port = cols[2] + self.control_port = int(port) + self.control_port_df.callback(self.control_port) + + elif cmd == 'TX_ANT': + if len(cols) != 4: + raise BadTelemetry() + self.ant[int(cols[2], 16)] = tuple(int(i) for i in cols[3].split(':')) + + elif cmd == 'PKT': + if len(cols) != 3: + raise BadTelemetry() + + p_fec_timeouts, p_incoming, b_incoming, p_injected, b_injected, p_dropped, p_truncated = list(int(i) for i in cols[2].split(':')) + + if not self.count_all: + self.count_all = (p_fec_timeouts, p_incoming, b_incoming, p_injected, b_injected, p_dropped, p_truncated) + else: + self.count_all = tuple((a + b) for a, b in zip((p_fec_timeouts, p_incoming, b_incoming, p_injected, b_injected, p_dropped, p_truncated), + self.count_all)) + + stats = dict(zip(('fec_timeouts', 'incoming', 'incoming_bytes', 'injected', 'injected_bytes', 'dropped', 'truncated'), + zip((p_fec_timeouts, p_incoming, b_incoming, p_injected, b_injected, p_dropped, p_truncated), + self.count_all))) + + # Send stats to aggregators + if self.ant_stat_cb is not None: + self.ant_stat_cb.update_tx_stats(self.tx_id, stats, dict(self.ant)) + + self.ant.clear() + + +class RXProtocol(ProcessProtocol): + """ + manager for wfb_rx process + """ + + def __init__(self, ant_stat_cb, cmd, rx_id): + self.cmd = cmd + self.rx_id = rx_id + self.ant = RXAntennaProtocol(ant_stat_cb, rx_id) if ant_stat_cb else None + self.dbg = DbgProtocol(rx_id) + self.df = defer.Deferred() + + def connectionMade(self): + log.msg('Started %s' % (self.rx_id,)) + + def outReceived(self, data): + if self.ant is not None: + self.ant.dataReceived(data) + + def errReceived(self, data): + self.dbg.dataReceived(data) + + def processEnded(self, status): + rc = status.value.exitCode + log.msg('Stopped RX %s with code %s' % (self.rx_id, rc)) + + if rc == 0: + self.df.callback(str(status.value)) + else: + self.df.errback(status) + + def start(self): + df = defer.maybeDeferred(reactor.spawnProcess, self, self.cmd[0], self.cmd, env=os.environ, childFDs={0: "w", 1: "r", 2: "r"}) + return df.addCallback(lambda _: self.df) + + +class TXProtocol(ProcessProtocol): + """ + manager for wfb_tx process + """ + + def __init__(self, ant_stat_cb, cmd, tx_id, ports_df=None, control_port_df=None): + self.cmd = cmd + self.tx_id = tx_id + self.dbg = DbgProtocol(tx_id) + self.ports_df = ports_df + self.control_port_df = control_port_df + self.port_parser = TXAntennaProtocol(ant_stat_cb, tx_id, ports_df, control_port_df) + self.df = defer.Deferred() + + def connectionMade(self): + log.msg('Started %s' % (self.tx_id,)) + + def outReceived(self, data): + self.port_parser.dataReceived(data) + + def errReceived(self, data): + self.dbg.dataReceived(data) + + def processEnded(self, status): + rc = status.value.exitCode + log.msg('Stopped TX %s with code %s' % (self.tx_id, rc)) + + if self.ports_df is not None: + self.ports_df.cancel() + + if self.control_port_df is not None: + self.control_port_df.cancel() + + if rc == 0: + self.df.callback(str(status.value)) + else: + self.df.errback(status) + + def start(self): + df = defer.maybeDeferred(reactor.spawnProcess, self, self.cmd[0], self.cmd, env=os.environ, + childFDs={0: "w", 1: "r", 2: "r"}) + return df.addCallback(lambda _: self.df) + + +class SSHClientProtocol(ProcessProtocol): + """ + manager for wfb_tx process + """ + + def __init__(self, host, username, cmd, *cmd_args, stdin=None, key=None, port=22, use_agent=True): + self.host = host + self.username = username + self.cmd = cmd + self.cmd_args = cmd_args + self.stdin = stdin + self.key = key + self.port = 22 + self.use_agent = use_agent + self.dbg = DbgProtocol('ssh %s' % (host,)) + self.df = defer.Deferred() + + def connectionMade(self): + log.msg('Started ssh %s' % (self.host,)) + if self.stdin is not None: + self.transport.write(self.stdin.encode('utf-8')) + + def outReceived(self, data): + self.dbg.dataReceived(data) + + def errReceived(self, data): + self.dbg.dataReceived(data) + + def processEnded(self, status): + rc = status.value.exitCode + log.msg('Stopped ssh %s with code %s' % (self.host, rc)) + + if rc == 0: + self.df.callback(str(status.value)) + else: + self.df.errback(status) + + def start(self): + args = ['ssh', + '-o', 'StrictHostKeyChecking=no', + '-o', 'KbdInteractiveAuthentication=no', + '-o', 'PasswordAuthentication=no'] + + if self.stdin is None: + args += ['-n'] + + if self.key: + args += ['-i', self.key, + '-o', 'IdentitiesOnly=yes'] + + args += ['%s@%s' % (self.username, self.host), self.cmd] + list(self.cmd_args) + + env = dict(os.environ) + + if not self.use_agent: + env.pop('SSH_AUTH_SOCK', None) + + df = defer.maybeDeferred(reactor.spawnProcess, self, + args[0], args, env=env, + childFDs={0: "w", 1: "r", 2: "r"}) + + return df.addCallback(lambda _: self.df) diff --git a/wfb_ng/server.py b/wfb_ng/server.py index df805221..e401c123 100644 --- a/wfb_ng/server.py +++ b/wfb_ng/server.py @@ -21,43 +21,21 @@ import sys import msgpack import os -import re -import hashlib import time import struct import gzip +import argparse -from itertools import groupby from twisted.python import log, failure -from twisted.internet import reactor, defer, main as ti_main, threads, task -from twisted.internet.protocol import ProcessProtocol, Factory -from twisted.protocols.basic import LineReceiver, Int32StringReceiver -from twisted.internet.serialport import SerialPort +from twisted.internet import reactor, defer -from . import _log_msg, ConsoleObserver, ErrorSafeLogFile, call_and_check_rc, ExecError +from . import _log_msg, ConsoleObserver, ErrorSafeLogFile, call_and_check_rc, ExecError, version_msg from .common import abort_on_crash, exit_status, df_sleep -from .config_parser import Section -from .proxy import UDPProxyProtocol, MavlinkSerialProxyProtocol, MavlinkUDPProxyProtocol -from .mavlink_protocol import MavlinkARMProtocol, MavlinkTCPFactory, MavlinkLoggerProtocol -from .tuntap import TUNTAPProtocol, TUNTAPTransport +from .protocols import StatsAndSelectorFactory, RFTempMeter, SSHClientProtocol +from .services import parse_services, init_udp_direct_tx, init_udp_direct_rx, init_mavlink, init_tunnel, init_udp_proxy, hash_link_domain, bandwidth_map +from .cluster import parse_cluster_services, gen_cluster_scripts from .conf import settings, cfg_files -connect_re = re.compile(r'^connect://(?P[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):(?P[0-9]+)$', re.IGNORECASE) -listen_re = re.compile(r'^listen://(?P[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):(?P[0-9]+)$', re.IGNORECASE) -serial_re = re.compile(r'^serial:(?P[a-z0-9\-\_/]+):(?P[0-9]+)$', re.IGNORECASE) - - -class BadTelemetry(Exception): - pass - - -class WFBFlags(object): - LINK_LOST = 1 - LINK_JAMMED = 2 - - -fec_types = {1: 'VDM_RS'} - # Log format is gzipped sequence of int32 strings # For every run new file will be open to avoid framing errors @@ -80,488 +58,9 @@ def send_stats(self, data): self.write(b''.join((struct.pack('!I', len(data)), data))) -class StatisticsProtocol(Int32StringReceiver): - MAX_LENGTH = 1024 * 1024 - - def connectionMade(self): - self.sendString(msgpack.packb(dict(type='cli_title', cli_title=self.factory.cli_title))) - self.factory.ui_sessions.append(self) - - def stringReceived(self, string): - pass - - def connectionLost(self, reason): - self.factory.ui_sessions.remove(self) - - def send_stats(self, data): - self.sendString(msgpack.packb(data)) - - -class StatsAndSelectorFactory(Factory): - noisy = False - protocol = StatisticsProtocol - - """ - Aggregate RX stats and select TX antenna - """ - - def __init__(self, profile, wlans, link_domain, logger): - self.wlans = tuple(wlans) - self.ant_sel_cb_list = [] - self.rssi_cb_l = [] - - # Select antenna #0 by default - self.tx_sel = 0 - self.tx_sel_rssi_delta = settings.common.tx_sel_rssi_delta - self.tx_sel_counter_rel_delta = settings.common.tx_sel_counter_rel_delta - self.tx_sel_counter_abs_delta = settings.common.tx_sel_counter_abs_delta - - # tcp sockets for UI - self.ui_sessions = [] - - # machine-readable logger - self.logger = logger - - if logger is not None: - logger.send_stats(dict(type='init', - timestamp = time.time(), - version=settings.common.version, - profile=profile, - wlans=wlans, - link_domain=link_domain)) - - self.ui_sessions.append(logger) - - # CLI title - self.cli_title = 'WFB-ng_%s @%s %s [%s]' % (settings.common.version, profile, ', '.join(wlans), link_domain) - - # RF module temperature by rf_path - self.rf_temperature = {} - - self.lc = task.LoopingCall(self.read_temperature) - self.lc.start(settings.common.temp_measurement_interval, now=True) - - def _cleanup(self): - self.lc.stop() - - def read_temperature(self): - def _read_temperature(): - res = {} - for idx, wlan in enumerate(self.wlans): - fname = '/proc/net/rtl88x2eu/%s/thermal_state' % (wlan,) - try: - with open(fname) as fd: - for line in fd: - line = line.strip() - if not line: - continue - - d = {} - for f in line.split(','): - k, v = f.split(':', 1) - d[k.strip()] = int(v.strip()) - - ant_id = (idx << 8) + d['rf_path'] - res[ant_id] = d['temperature'] - except FileNotFoundError: - pass - except Exception as v: - reactor.callFromThread(log.err, v, 'Unable to parse %s:' % (fname,)) - return res - - def _got_temp(temp_d): - self.rf_temperature = temp_d - - return threads.deferToThread(_read_temperature).addCallback(_got_temp) - - - def add_ant_sel_cb(self, ant_sel_cb): - self.ant_sel_cb_list.append(ant_sel_cb) - ant_sel_cb(self.tx_sel) - - def add_rssi_cb(self, rssi_cb): - self.rssi_cb_l.append(rssi_cb) - - def _stats_agg_by_freq(self, ant_stats): - stats_agg = {} - - for (((freq, mcs_index, bandwidth), ant_id), - (pkt_s, - rssi_min, rssi_avg, rssi_max, - snr_min, snr_avg, snr_max)) in ant_stats.items(): - - if ant_id not in stats_agg: - stats_agg[ant_id] = (pkt_s, - rssi_min, rssi_avg * pkt_s, rssi_max, - snr_min, snr_avg * pkt_s, snr_max) - else: - tmp = stats_agg[ant_id] - stats_agg[ant_id] = (pkt_s + tmp[0], - min(rssi_min, tmp[1]), - rssi_avg * pkt_s + tmp[2], - max(rssi_max, tmp[3]), - min(snr_min, tmp[4]), - snr_avg * pkt_s + tmp[5], - max(snr_max, tmp[6])) - - return dict((ant_id, (pkt_s, - rssi_min, rssi_avg // pkt_s, rssi_max, - snr_min, snr_avg // pkt_s, snr_max)) \ - for ant_id, (pkt_s, - rssi_min, rssi_avg, rssi_max, - snr_min, snr_avg, snr_max) in stats_agg.items()) - - def select_tx_antenna(self, stats_agg): - wlan_rssi_and_pkts = {} - max_pkts = 0 - - for k, grp in groupby(sorted(((ant_id >> 8) & 0xff, pkt_s, rssi_avg) \ - for ant_id, (pkt_s, - rssi_min, rssi_avg, rssi_max, - snr_min, snr_avg, snr_max) in stats_agg.items()), - lambda x: x[0]): - - grp = list(grp) - # Use max average rssi [dBm] from all wlan's antennas - # Use max packet counter per antenna from all wlan's antennas - rssi = max(rssi for _, pkt_s, rssi in grp) - pkts = max(pkt_s for _, pkt_s, rssi in grp) - max_pkts = max(pkts, max_pkts) - wlan_rssi_and_pkts[k] = (rssi, pkts) - - if not wlan_rssi_and_pkts: - return - - # Select antennas with near-maximum RX packet counters only - tx_sel_counter_thr = max_pkts - max(self.tx_sel_counter_abs_delta, max_pkts * self.tx_sel_counter_rel_delta) - ants_with_max_pkts = set(idx for idx, (rssi, pkt_s) in wlan_rssi_and_pkts.items() if pkt_s >= tx_sel_counter_thr) - - if not ants_with_max_pkts: - return - - new_max_rssi, new_tx_ant = max((rssi, idx) for idx, (rssi, pkt_s) in wlan_rssi_and_pkts.items() if idx in ants_with_max_pkts) - cur_max_rssi = wlan_rssi_and_pkts.get(self.tx_sel, (-1000, 0))[0] - - if new_tx_ant == self.tx_sel: - return - - if self.tx_sel in ants_with_max_pkts and new_max_rssi - cur_max_rssi < self.tx_sel_rssi_delta: - # Already selected antenna with near-maximum RX packets counter - # and other antennas doesn't have significally large RSSI - return - - log.msg('Switch TX antenna #%d -> #%d, RSSI %d -> %d[dB]' % (self.tx_sel, new_tx_ant, cur_max_rssi, new_max_rssi)) - - for ant_sel_cb in self.ant_sel_cb_list: - try: - ant_sel_cb(new_tx_ant) - except Exception: - log.err() - - self.tx_sel = new_tx_ant - - def process_new_session(self, rx_id, session): - if self.logger is not None: - self.logger.send_stats(dict(type='new_session', - timestamp = time.time(), - id=rx_id, - **session)) - - def update_rx_stats(self, rx_id, packet_stats, ant_stats, session): - stats_agg = self._stats_agg_by_freq(ant_stats) - card_rssi_l = list(rssi_avg - for pkt_s, - rssi_min, rssi_avg, rssi_max, - snr_min, snr_avg, snr_max - in stats_agg.values()) - - if stats_agg and self.ant_sel_cb_list: - self.select_tx_antenna(stats_agg) - - if self.rssi_cb_l: - _idx = 0 if settings.common.mavlink_err_rate else 1 - flags = 0 - - if not card_rssi_l: - flags |= WFBFlags.LINK_LOST - - elif packet_stats['dec_err'][0] + packet_stats['bad'][0] > 0: - flags |= WFBFlags.LINK_JAMMED - - rx_errors = min(packet_stats['dec_err'][_idx] + packet_stats['bad'][_idx] + packet_stats['lost'][_idx], 65535) - rx_fec = min(packet_stats['fec_rec'][_idx], 65535) - mav_rssi = (max(card_rssi_l) if card_rssi_l else -128) % 256 - - for rssi_cb in self.rssi_cb_l: - try: - rssi_cb(rx_id, mav_rssi, rx_errors, rx_fec, flags) - except Exception: - log.err() - - if settings.common.debug: - log.msg('%s rssi %s tx#%d %s %s' % (rx_id, max(card_rssi_l) if card_rssi_l else 'N/A', self.tx_sel, packet_stats, ant_stats)) - - # Send stats to CLI sessions and logger - for s in self.ui_sessions: - s.send_stats(dict(type='rx', - timestamp = time.time(), - id=rx_id, tx_ant=self.tx_sel, - packets=packet_stats, rx_ant_stats=ant_stats, - session=session)) - - def update_tx_stats(self, tx_id, packet_stats, ant_latency): - if settings.common.debug: - log.msg("%s %r %r" % (tx_id, packet_stats, ant_latency)) - - # Send stats to CLI sessions and logger - for s in self.ui_sessions: - s.send_stats(dict(type='tx', - timestamp = time.time(), - id=tx_id, - packets=packet_stats, - latency=ant_latency, - rf_temperature=self.rf_temperature)) - - - -class RXAntennaProtocol(LineReceiver): - delimiter = b'\n' - - """ - wfb_rx log parser - """ - - def __init__(self, ant_stat_cb, rx_id): - self.ant_stat_cb = ant_stat_cb - self.rx_id = rx_id - self.ant = {} - self.count_all = None - self.session = None - - def lineReceived(self, line): - line = line.decode('utf-8').strip() - cols = line.split('\t') - - try: - if len(cols) < 2: - raise BadTelemetry() - - #ts = int(cols[0]) - cmd = cols[1] - - if cmd == 'RX_ANT': - if len(cols) != 5: - raise BadTelemetry() - self.ant[(tuple(int(i) for i in cols[2].split(':')), int(cols[3], 16))] = tuple(int(i) for i in cols[4].split(':')) - - elif cmd == 'PKT': - if len(cols) != 3: - raise BadTelemetry() - - p_all, b_all, p_dec_err, p_dec_ok, p_fec_rec, p_lost, p_bad, p_outgoing, b_outgoing = list(int(i) for i in cols[2].split(':')) - - if not self.count_all: - self.count_all = (p_all, b_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing, b_outgoing) - else: - self.count_all = tuple((a + b) for a, b in zip((p_all, b_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing, b_outgoing), - self.count_all)) - - stats = dict(zip(('all', 'all_bytes', 'dec_ok', 'fec_rec', 'lost', 'dec_err', 'bad', 'out', 'out_bytes'), - zip((p_all, b_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing, b_outgoing), - self.count_all))) - - # Send stats to aggregators - if self.ant_stat_cb is not None: - self.ant_stat_cb.update_rx_stats(self.rx_id, stats, dict(self.ant), self.session) - - self.ant.clear() - - elif cmd == 'SESSION': - if len(cols) != 3: - raise BadTelemetry() - - epoch, fec_type, fec_k, fec_n = list(int(i) for i in cols[2].split(':')) - self.session = dict(fec_type=fec_types.get(fec_type, 'Unknown'), fec_k=fec_k, fec_n=fec_n, epoch=epoch) - log.msg('New session detected [%s]: FEC=%s K=%d, N=%d, epoch=%d' % (self.rx_id, fec_types.get(fec_type, 'Unknown'), fec_k, fec_n, epoch)) - - if self.ant_stat_cb is not None: - self.ant_stat_cb.process_new_session(self.rx_id, self.session) - else: - raise BadTelemetry() - except BadTelemetry: - log.msg('Bad telemetry [%s]: %s' % (self.rx_id, line), isError=1) - - -class DbgProtocol(LineReceiver): - delimiter = b'\n' - - """ - stderr parser - """ - - def __init__(self, rx_id): - self.rx_id = rx_id - - def lineReceived(self, line): - log.msg('%s: %s' % (self.rx_id, line.decode('utf-8'))) - - - -class TXAntennaProtocol(LineReceiver): - delimiter = b'\n' - - def __init__(self, ant_stat_cb, tx_id, ports_df, control_port_df): - self.ant_stat_cb = ant_stat_cb - self.tx_id = tx_id - self.ports_df = ports_df - self.control_port_df = control_port_df - self.ports = {} - self.control_port = None - self.ant = {} - self.count_all = None - - def lineReceived(self, line): - cols = line.decode('utf-8').strip().split('\t') - if len(cols) < 2: - return - - #ts = int(cols[0]) - cmd = cols[1] - - if cmd == 'LISTEN_UDP' and len(cols) == 3: - port, wlan = cols[2].split(':', 1) - self.ports[wlan] = int(port) - - elif cmd == 'LISTEN_UDP_END' and self.ports_df is not None: - self.ports_df.callback(self.ports) - - elif cmd == 'LISTEN_UDP_CONTROL' and len(cols) == 3 and self.control_port_df is not None: - port = cols[2] - self.control_port = int(port) - self.control_port_df.callback(self.control_port) - - elif cmd == 'TX_ANT': - if len(cols) != 4: - raise BadTelemetry() - self.ant[int(cols[2], 16)] = tuple(int(i) for i in cols[3].split(':')) - - elif cmd == 'PKT': - if len(cols) != 3: - raise BadTelemetry() - - p_fec_timeouts, p_incoming, b_incoming, p_injected, b_injected, p_dropped, p_truncated = list(int(i) for i in cols[2].split(':')) - - if not self.count_all: - self.count_all = (p_fec_timeouts, p_incoming, b_incoming, p_injected, b_injected, p_dropped, p_truncated) - else: - self.count_all = tuple((a + b) for a, b in zip((p_fec_timeouts, p_incoming, b_incoming, p_injected, b_injected, p_dropped, p_truncated), - self.count_all)) - - stats = dict(zip(('fec_timeouts', 'incoming', 'incoming_bytes', 'injected', 'injected_bytes', 'dropped', 'truncated'), - zip((p_fec_timeouts, p_incoming, b_incoming, p_injected, b_injected, p_dropped, p_truncated), - self.count_all))) - - # Send stats to aggregators - if self.ant_stat_cb is not None: - self.ant_stat_cb.update_tx_stats(self.tx_id, stats, dict(self.ant)) - - self.ant.clear() - - -class RXProtocol(ProcessProtocol): - """ - manager for wfb_rx process - """ - - def __init__(self, ant_stat_cb, cmd, rx_id): - self.cmd = cmd - self.rx_id = rx_id - self.ant = RXAntennaProtocol(ant_stat_cb, rx_id) if ant_stat_cb else None - self.dbg = DbgProtocol(rx_id) - self.df = defer.Deferred() - - def connectionMade(self): - log.msg('Started %s' % (self.rx_id,)) - - def outReceived(self, data): - if self.ant is not None: - self.ant.dataReceived(data) - - def errReceived(self, data): - self.dbg.dataReceived(data) - - def processEnded(self, status): - rc = status.value.exitCode - log.msg('Stopped RX %s with code %s' % (self.rx_id, rc)) - - if rc == 0: - self.df.callback(str(status.value)) - else: - self.df.errback(status) - - def start(self): - df = defer.maybeDeferred(reactor.spawnProcess, self, self.cmd[0], self.cmd, env=os.environ, childFDs={0: "w", 1: "r", 2: "r"}) - return df.addCallback(lambda _: self.df) - - -class TXProtocol(ProcessProtocol): - """ - manager for wfb_tx process - """ - - def __init__(self, ant_stat_cb, cmd, tx_id, ports_df=None, control_port_df=None): - self.cmd = cmd - self.tx_id = tx_id - self.dbg = DbgProtocol(tx_id) - self.ports_df = ports_df - self.control_port_df = control_port_df - self.port_parser = TXAntennaProtocol(ant_stat_cb, tx_id, ports_df, control_port_df) - self.df = defer.Deferred() - - def connectionMade(self): - log.msg('Started %s' % (self.tx_id,)) - - def outReceived(self, data): - self.port_parser.dataReceived(data) - - def errReceived(self, data): - self.dbg.dataReceived(data) - - def processEnded(self, status): - rc = status.value.exitCode - log.msg('Stopped TX %s with code %s' % (self.tx_id, rc)) - - if self.ports_df is not None: - self.ports_df.cancel() - - if self.control_port_df is not None: - self.control_port_df.cancel() - - if rc == 0: - self.df.callback(str(status.value)) - else: - self.df.errback(status) - - def start(self): - df = defer.maybeDeferred(reactor.spawnProcess, self, self.cmd[0], self.cmd, env=os.environ, - childFDs={0: "w", 1: "r", 2: "r"}) - return df.addCallback(lambda _: self.df) - - @defer.inlineCallbacks def init_wlans(max_bw, wlans): - if max_bw == 10: - ht_mode = '10MHz' - elif max_bw == 20: - ht_mode = 'HT20' - elif max_bw == 40: - ht_mode = 'HT40+' - elif max_bw == 80: - ht_mode = '80MHz' - elif max_bw == 160: - ht_mode = '160MHz' - else: - raise Exception('Unsupported bandwidth %d MHz' % (max_bw,)) + ht_mode = bandwidth_map[max_bw] if not settings.common.primary: log.msg('Skip card init due to secondary role') @@ -599,50 +98,54 @@ def init_wlans(max_bw, wlans): raise -def parse_services(profile_name): - res = [] - for stream in getattr(settings, profile_name).streams: - cfg = Section() - stream = dict(stream) - name = stream.pop('name') - service_type = stream.pop('service_type') - - for profile in stream.pop('profiles'): - cfg.__dict__.update(getattr(settings, profile).__dict__) - - cfg.__dict__.update(stream) - res.append((name, service_type, cfg)) - - return res - - @defer.inlineCallbacks -def init(profiles, wlans): +def init(profiles, wlans, cluster_mode): type_map = dict(udp_direct_rx=init_udp_direct_rx, udp_direct_tx=init_udp_direct_tx, mavlink=init_mavlink, tunnel=init_tunnel, udp_proxy=init_udp_proxy) - services = list((profile, parse_services(profile)) for profile in profiles) - max_bw = max(cfg.bandwidth for _, tmp in services for _, _, cfg in tmp) + dl = [] + is_cluster = bool(cluster_mode) + + if is_cluster: + services, cluster_nodes = parse_cluster_services(profiles) + if cluster_mode == 'ssh': + for node, setup_script in gen_cluster_scripts(cluster_nodes).items(): + dl.append(SSHClientProtocol(node, settings.cluster.ssh_user, + '/bin/bash', '-x', + key=settings.cluster.ssh_key, + port=settings.cluster.ssh_port, + use_agent=settings.cluster.ssh_key is None, + stdin=setup_script).start()) + else: + services = list((profile, parse_services(profile, None)) for profile in profiles) + # Do cards init + if not wlans: + raise Exception('WiFi interface list is empty!') - # Do cards init - yield init_wlans(max_bw, wlans) + max_bw = max(cfg.bandwidth for _, tmp in services for _, _, cfg in tmp) + yield init_wlans(max_bw, wlans) - dl = [] sockets = [] - ant_sel_l = [] + cleanup_l = [] def _cleanup(x): for s in sockets: s.stopListening() - for f in ant_sel_l: + for f in cleanup_l: f._cleanup() return x + if not is_cluster: + rf_temp_meter = RFTempMeter(wlans, settings.common.temp_measurement_interval) + cleanup_l.append(rf_temp_meter) + else: + rf_temp_meter = None + for profile, service_list in services: # Domain wide antenna selector profile_cfg = getattr(settings, profile) @@ -650,445 +153,60 @@ def _cleanup(x): if settings.common.binary_log_file is not None: logger = BinLogger(settings.common.binary_log_file % (profile,), settings.path.log_dir) + + logger.send_stats(dict(type='init', + timestamp = time.time(), + version=settings.common.version, + profile=profile, + wlans=None if is_cluster else wlans, + link_domain=profile_cfg.link_domain)) else: logger = None - ant_sel_f = StatsAndSelectorFactory(profile, wlans, profile_cfg.link_domain, logger) - ant_sel_l.append(ant_sel_f) - link_id = int.from_bytes(hashlib.sha1(profile_cfg.link_domain.encode('utf-8')).digest()[:3], 'big') + cli_title = 'WFB-ng_%s @%s %s [%s]' % (settings.common.version, profile, + 'cluster' if is_cluster else ', '.join(wlans), + profile_cfg.link_domain) + + ant_sel_f = StatsAndSelectorFactory(logger, cli_title, rf_temp_meter, is_cluster) + cleanup_l.append(ant_sel_f) + + link_id = hash_link_domain(profile_cfg.link_domain) if profile_cfg.stats_port: sockets.append(reactor.listenTCP(profile_cfg.stats_port, ant_sel_f)) for service_name, service_type, srv_cfg in service_list: - log.msg('Starting %s/%s@%s on %s' % (profile, service_name, profile_cfg.link_domain, ', '.join(wlans))) - dl.append(defer.maybeDeferred(type_map[service_type], service_name, srv_cfg, wlans, link_id, ant_sel_f)) + log.msg('Starting %s/%s@%s' % (profile, service_name, profile_cfg.link_domain)) + dl.append(defer.maybeDeferred(type_map[service_type], service_name, srv_cfg, + srv_cfg.udp_peers_auto if is_cluster else wlans, + link_id, ant_sel_f, is_cluster)) yield defer.gatherResults(dl, consumeErrors=True).addBoth(_cleanup).addErrback(lambda f: f.trap(defer.FirstError) and f.value.subFailure) -@defer.inlineCallbacks -def init_udp_direct_tx(service_name, cfg, wlans, link_id, ant_sel_f): - if not listen_re.match(cfg.peer): - raise Exception('%s: unsupported peer address: %s' % (service_name, cfg.peer)) - - m = listen_re.match(cfg.peer) - listen = m.group('addr'), int(m.group('port')) - log.msg('Listen for %s stream %d on %s:%d' % (service_name, cfg.stream_tx, listen[0], listen[1])) - - cmd = ('%(cmd)s -f %(frame_type)s -p %(stream)d -u %(port)d -K %(key)s '\ - '-B %(bw)d -G %(gi)s -S %(stbc)d -L %(ldpc)d -M %(mcs)d'\ - '%(mirror)s%(force_vht)s%(qdisc)s '\ - '-k %(fec_k)d -n %(fec_n)d -T %(fec_timeout)d -F %(fec_delay)d -i %(link_id)d -R %(rcv_buf_size)d -C %(control_port)d' % \ - dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_tx'), - frame_type=cfg.frame_type, - stream=cfg.stream_tx, - port=listen[1], - control_port = cfg.control_port, - key=os.path.join(settings.path.conf_dir, cfg.keypair), - bw=cfg.bandwidth, - force_vht=' -V' if cfg.force_vht else '', - qdisc=' -Q -P %d' % (cfg.fwmark,) if cfg.use_qdisc else '', - gi="short" if cfg.short_gi else "long", - stbc=cfg.stbc, - ldpc=cfg.ldpc, - mcs=cfg.mcs_index, - mirror=' -m' if cfg.mirror else '', - fec_k=cfg.fec_k, - fec_n=cfg.fec_n, - fec_timeout=cfg.fec_timeout, - fec_delay=cfg.fec_delay, - link_id=link_id, - rcv_buf_size=settings.common.tx_rcv_buf_size) - ).split() + wlans[0:(None if cfg.mirror else 1)] - - # Direct udp doesn't support TX diversity - only first card will be used. - # But if mirror mode is enabled it will use all cards. - - if not cfg.mirror and len(wlans) > 1: - raise Exception("udp_direct_tx doesn't supports diversity but multiple cards selected. Use udp_proxy for such case.") - - control_port_df = defer.Deferred() if cfg.control_port == 0 else None - df = TXProtocol(ant_sel_f, cmd, 'video tx', control_port_df=control_port_df).start() - log.msg('%s: %s' % (service_name, ' '.join(cmd),)) - - control_port = cfg.control_port - - if control_port == 0: - control_port = yield control_port_df - - log.msg('%s use wfb_tx control_port %d' % (service_name, control_port)) - - yield df - - -def init_udp_direct_rx(service_name, cfg, wlans, link_id, ant_sel_f): - if not connect_re.match(cfg.peer): - raise Exception('%s: unsupported peer address: %s' % (service_name, cfg.peer)) - - m = connect_re.match(cfg.peer) - connect = m.group('addr'), int(m.group('port')) - log.msg('Send %s stream %d to %s:%d' % (service_name, cfg.stream_rx, connect[0], connect[1])) - - cmd = ('%(cmd)s -p %(stream)d -c %(ip_addr)s -u %(port)d -K %(key)s -i %(link_id)d' % \ - dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_rx'), - stream=cfg.stream_rx, - ip_addr=connect[0], - port=connect[1], - key=os.path.join(settings.path.conf_dir, cfg.keypair), - link_id=link_id)).split() + wlans - - df = RXProtocol(ant_sel_f, cmd, '%s rx' % (service_name,)).start() - - log.msg('%s: %s' % (service_name, ' '.join(cmd),)) - return df - - -@defer.inlineCallbacks -def init_mavlink(service_name, cfg, wlans, link_id, ant_sel_f): - listen = None - connect = None - serial = None - osd_peer = None - - if connect_re.match(cfg.peer): - m = connect_re.match(cfg.peer) - connect = m.group('addr'), int(m.group('port')) - log.msg('Connect %s stream %d(RX), %d(TX) to %s:%d' % (service_name, cfg.stream_rx, cfg.stream_tx, connect[0], connect[1])) - - elif listen_re.match(cfg.peer): - m = listen_re.match(cfg.peer) - listen = m.group('addr'), int(m.group('port')) - log.msg('Listen for %s stream %d(RX), %d(TX) on %s:%d' % (service_name, cfg.stream_rx, cfg.stream_tx, listen[0], listen[1])) - - elif serial_re.match(cfg.peer): - m = serial_re.match(cfg.peer) - serial = m.group('dev'), int(m.group('baud')) - log.msg('Open serial port %s on speed %d' % (serial[0], serial[1])) - - else: - raise Exception('Unsupported peer address: %s' % (cfg.peer,)) - - if cfg.osd is not None and connect_re.match(cfg.osd): - m = connect_re.match(cfg.osd) - osd_peer = m.group('addr'), int(m.group('port')) - log.msg('Mirror %s stream to OSD at %s:%d' % (service_name, osd_peer[0], osd_peer[1])) - - rx_hooks = [] - tx_hooks = [] - - if cfg.call_on_arm or cfg.call_on_disarm: - arm_proto = MavlinkARMProtocol(cfg.call_on_arm, cfg.call_on_disarm) - rx_hooks.append(arm_proto.dataReceived) - tx_hooks.append(arm_proto.dataReceived) - - if cfg.log_messages and ant_sel_f.logger is not None: - mav_log_proto = MavlinkLoggerProtocol(ant_sel_f.logger) - rx_hooks.append(mav_log_proto.dataReceived) - tx_hooks.append(mav_log_proto.dataReceived) - - if serial: - p_in = MavlinkSerialProxyProtocol(agg_max_size=settings.common.radio_mtu, - agg_timeout=settings.common.mavlink_agg_timeout, - inject_rssi=cfg.inject_rssi, - mavlink_sys_id=cfg.mavlink_sys_id, - mavlink_comp_id=cfg.mavlink_comp_id, - rx_hooks=rx_hooks, tx_hooks=tx_hooks) - else: - # The first argument is not None only if we initiate mavlink connection - p_in = MavlinkUDPProxyProtocol(connect, agg_max_size=settings.common.radio_mtu, - agg_timeout=settings.common.mavlink_agg_timeout, - inject_rssi=cfg.inject_rssi, - mirror=osd_peer, - mavlink_sys_id=cfg.mavlink_sys_id, - mavlink_comp_id=cfg.mavlink_comp_id, - rx_hooks=rx_hooks, tx_hooks=tx_hooks) - - p_rx = UDPProxyProtocol() - p_rx.peer = p_in - - rx_socket = reactor.listenUDP(0, p_rx) - sockets = [rx_socket] - - cmd_rx = ('%(cmd)s -p %(stream)d -u %(port)d -K %(key)s -i %(link_id)d' % \ - dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_rx'), - stream=cfg.stream_rx, - port=rx_socket.getHost().port, - key=os.path.join(settings.path.conf_dir, cfg.keypair), - link_id=link_id)).split() + wlans - - cmd_tx = ('%(cmd)s -f %(frame_type)s -p %(stream)d -u %(port)d -K %(key)s -B %(bw)d '\ - '-G %(gi)s -S %(stbc)d -L %(ldpc)d -M %(mcs)d'\ - '%(mirror)s%(force_vht)s%(qdisc)s '\ - '-k %(fec_k)d -n %(fec_n)d -T %(fec_timeout)d -F %(fec_delay)d -i %(link_id)d -R %(rcv_buf_size)d -C %(control_port)d' % \ - dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_tx'), - frame_type=cfg.frame_type, - stream=cfg.stream_tx, - port=0, - control_port=cfg.control_port, - key=os.path.join(settings.path.conf_dir, cfg.keypair), - bw=cfg.bandwidth, - force_vht=' -V' if cfg.force_vht else '', - qdisc=' -Q -P %d' % (cfg.fwmark,) if cfg.use_qdisc else '', - gi="short" if cfg.short_gi else "long", - stbc=cfg.stbc, - ldpc=cfg.ldpc, - mcs=cfg.mcs_index, - mirror=' -m' if cfg.mirror else '', - fec_k=cfg.fec_k, - fec_n=cfg.fec_n, - fec_timeout=cfg.fec_timeout, - fec_delay=cfg.fec_delay, - link_id=link_id, - rcv_buf_size=settings.common.tx_rcv_buf_size)).split() + wlans - - log.msg('%s RX: %s' % (service_name, ' '.join(cmd_rx))) - log.msg('%s TX: %s' % (service_name, ' '.join(cmd_tx))) - - # Setup mavlink TCP proxy - if cfg.mavlink_tcp_port: - mav_tcp_f = MavlinkTCPFactory(p_in) - p_in.rx_hooks.append(mav_tcp_f.write) - reactor.listenTCP(cfg.mavlink_tcp_port, mav_tcp_f) - - tx_ports_df = defer.Deferred() - control_port_df = defer.Deferred() if cfg.control_port == 0 else None - - dl = [TXProtocol(ant_sel_f, cmd_tx, '%s tx' % (service_name,), tx_ports_df, control_port_df).start()] - - # Wait while wfb_tx allocates ephemeral udp ports and reports them back - tx_ports = yield tx_ports_df - control_port = cfg.control_port - - if control_port == 0: - control_port = yield control_port_df - - log.msg('%s use wfb_tx ports %s, control_port %d' % (service_name, tx_ports, control_port)) - - p_tx_l = [UDPProxyProtocol(('127.0.0.1', tx_ports[wlan])) for wlan in wlans] - - if serial: - serial_port = SerialPort(p_in, os.path.join('/dev', serial[0]), reactor, baudrate=serial[1]) - serial_port._serial.exclusive = True - - else: - serial_port = None - sockets += [ reactor.listenUDP(listen[1] if listen else 0, p_in) ] - - sockets += [ reactor.listenUDP(0, p_tx) for p_tx in p_tx_l ] - - def ant_sel_cb(ant_idx): - p_in.peer = p_tx_l[ant_idx] - ant_sel_f.add_ant_sel_cb(ant_sel_cb) - - # Report RSSI to OSD - ant_sel_f.add_rssi_cb(p_in.send_rssi) - - dl.append(RXProtocol(ant_sel_f, cmd_rx, '%s rx' % (service_name,)).start()) - - def _cleanup(x): - if serial_port is not None: - serial_port.loseConnection() - serial_port.connectionLost(failure.Failure(ti_main.CONNECTION_DONE)) - - for s in sockets: - s.stopListening() - - return x - - yield defer.gatherResults(dl, consumeErrors=True).addBoth(_cleanup)\ - .addErrback(lambda f: f.trap(defer.FirstError) and f.value.subFailure) - - -@defer.inlineCallbacks -def init_tunnel(service_name, cfg, wlans, link_id, ant_sel_f): - p_in = TUNTAPProtocol(mtu=settings.common.radio_mtu, - agg_timeout=settings.common.tunnel_agg_timeout) - - p_rx = UDPProxyProtocol() - p_rx.peer = p_in - - rx_socket = reactor.listenUDP(0, p_rx) - sockets = [rx_socket] - - cmd_rx = ('%(cmd)s -p %(stream)d -u %(port)d -K %(key)s -i %(link_id)d' % \ - dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_rx'), - stream=cfg.stream_rx, - port=rx_socket.getHost().port, - key=os.path.join(settings.path.conf_dir, cfg.keypair), - link_id=link_id)).split() + wlans - - cmd_tx = ('%(cmd)s -f %(frame_type)s -p %(stream)d -u %(port)d -K %(key)s -B %(bw)d -G %(gi)s '\ - '-S %(stbc)d -L %(ldpc)d -M %(mcs)d'\ - '%(mirror)s%(force_vht)s%(qdisc)s '\ - '-k %(fec_k)d -n %(fec_n)d -T %(fec_timeout)d -F %(fec_delay)d -i %(link_id)d -R %(rcv_buf_size)d -C %(control_port)d' % \ - dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_tx'), - frame_type=cfg.frame_type, - stream=cfg.stream_tx, - port=0, - control_port=cfg.control_port, - key=os.path.join(settings.path.conf_dir, cfg.keypair), - bw=cfg.bandwidth, - force_vht=' -V' if cfg.force_vht else '', - qdisc=' -Q -P %d' % (cfg.fwmark,) if cfg.use_qdisc else '', - gi="short" if cfg.short_gi else "long", - stbc=cfg.stbc, - ldpc=cfg.ldpc, - mcs=cfg.mcs_index, - mirror=' -m' if cfg.mirror else '', - fec_k=cfg.fec_k, - fec_n=cfg.fec_n, - fec_timeout=cfg.fec_timeout, - fec_delay=cfg.fec_delay, - link_id=link_id, - rcv_buf_size=settings.common.tx_rcv_buf_size)).split() + wlans - - log.msg('%s RX: %s' % (service_name, ' '.join(cmd_rx))) - log.msg('%s TX: %s' % (service_name, ' '.join(cmd_tx),)) - - tx_ports_df = defer.Deferred() - control_port_df = defer.Deferred() if cfg.control_port == 0 else None - - dl = [TXProtocol(ant_sel_f, cmd_tx, '%s tx' % (service_name,), tx_ports_df, control_port_df).start()] - - # Wait while wfb_tx allocates ephemeral udp ports and reports them back - tx_ports = yield tx_ports_df - control_port = cfg.control_port - - if control_port == 0: - control_port = yield control_port_df - - log.msg('%s use wfb_tx ports %s, control_port %d' % (service_name, tx_ports, control_port)) - - p_tx_l = [UDPProxyProtocol(('127.0.0.1', tx_ports[wlan])) for wlan in wlans] - - tun_ep = TUNTAPTransport(reactor, p_in, cfg.ifname, cfg.ifaddr, mtu=settings.common.radio_mtu, default_route=cfg.default_route) - - sockets += [ reactor.listenUDP(0, p_tx) for p_tx in p_tx_l ] - - def ant_sel_cb(ant_idx): - p_in.peer = p_tx_l[ant_idx] - - # Broadcast keepalive message to all cards, not to active one - # This allow to use direct antennas on both ends and/or differenct frequencies. - # But when mirroring enabled it will be done by wfb_tx itself - - if cfg.mirror: - p_in.all_peers = [p_tx_l[0]] - else: - p_in.all_peers = p_tx_l - - ant_sel_f.add_ant_sel_cb(ant_sel_cb) - - dl.append(RXProtocol(ant_sel_f, cmd_rx, '%s rx' % (service_name,)).start()) - - def _cleanup(x): - tun_ep.loseConnection() - for s in sockets: - s.stopListening() - return x - - yield defer.gatherResults(dl, consumeErrors=True).addBoth(_cleanup)\ - .addErrback(lambda f: f.trap(defer.FirstError) and f.value.subFailure) - - -@defer.inlineCallbacks -def init_udp_proxy(service_name, cfg, wlans, link_id, ant_sel_f): - listen = None - connect = None - - if connect_re.match(cfg.peer): - m = connect_re.match(cfg.peer) - connect = m.group('addr'), int(m.group('port')) - log.msg('Connect %s stream %s(RX), %s(TX) to %s:%d' % (service_name, cfg.stream_rx, cfg.stream_tx, connect[0], connect[1])) - - elif listen_re.match(cfg.peer): - m = listen_re.match(cfg.peer) - listen = m.group('addr'), int(m.group('port')) - log.msg('Listen for %s stream %s(RX), %s(TX) on %s:%d' % (service_name, cfg.stream_rx, cfg.stream_tx, listen[0], listen[1])) - - else: - raise Exception('Unsupported peer address: %s' % (cfg.peer,)) - - # The first argument is not None only if we initiate mavlink connection - p_in = UDPProxyProtocol(connect) - sockets = [reactor.listenUDP(listen[1] if listen else 0, p_in)] - dl = [] - - if cfg.stream_rx is not None: - p_rx = UDPProxyProtocol() - p_rx.peer = p_in - rx_socket = reactor.listenUDP(0, p_rx) - sockets = [rx_socket] - cmd_rx = ('%(cmd)s -p %(stream)d -u %(port)d -K %(key)s -i %(link_id)d' % \ - dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_rx'), - stream=cfg.stream_rx, - port=rx_socket.getHost().port, - key=os.path.join(settings.path.conf_dir, cfg.keypair), - link_id=link_id)).split() + wlans - log.msg('%s RX: %s' % (service_name, ' '.join(cmd_rx))) - dl.append(RXProtocol(ant_sel_f, cmd_rx, '%s rx' % (service_name,)).start()) - - if cfg.stream_tx is not None: - cmd_tx = ('%(cmd)s -f %(frame_type)s -p %(stream)d -u %(port)d -K %(key)s -B %(bw)d '\ - '-G %(gi)s -S %(stbc)d -L %(ldpc)d -M %(mcs)d'\ - '%(mirror)s%(force_vht)s%(qdisc)s '\ - '-k %(fec_k)d -n %(fec_n)d -T %(fec_timeout)d -F %(fec_delay)d -i %(link_id)d -R %(rcv_buf_size)d -C %(control_port)d' % \ - dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_tx'), - frame_type=cfg.frame_type, - stream=cfg.stream_tx, - port=0, - control_port=cfg.control_port, - key=os.path.join(settings.path.conf_dir, cfg.keypair), - bw=cfg.bandwidth, - force_vht=' -V' if cfg.force_vht else '', - qdisc=' -Q -P %d' % (cfg.fwmark,) if cfg.use_qdisc else '', - gi="short" if cfg.short_gi else "long", - stbc=cfg.stbc, - ldpc=cfg.ldpc, - mcs=cfg.mcs_index, - mirror=' -m' if cfg.mirror else '', - fec_k=cfg.fec_k, - fec_n=cfg.fec_n, - fec_timeout=cfg.fec_timeout, - fec_delay=cfg.fec_delay, - link_id=link_id, - rcv_buf_size=settings.common.tx_rcv_buf_size)).split() + wlans - log.msg('%s TX: %s' % (service_name, ' '.join(cmd_tx))) - - tx_ports_df = defer.Deferred() - control_port_df = defer.Deferred() if cfg.control_port == 0 else None - - dl += [TXProtocol(ant_sel_f, cmd_tx, '%s tx' % (service_name,), tx_ports_df, control_port_df).start()] - - # Wait while wfb_tx allocates ephemeral udp ports and reports them back - tx_ports = yield tx_ports_df - control_port = cfg.control_port - - if control_port == 0: - control_port = yield control_port_df - - log.msg('%s use wfb_tx ports %s, control_port %d' % (service_name, tx_ports, control_port)) - - p_tx_l = [UDPProxyProtocol(('127.0.0.1', tx_ports[wlan])) for wlan in wlans] - sockets += [reactor.listenUDP(0, p_tx) for p_tx in p_tx_l ] +def main(): + description = 'WFB-ng version %s-%s' % (settings.common.version, settings.common.commit[:8]) - def ant_sel_cb(ant_idx): - p_in.peer = p_tx_l[ant_idx] + parser = argparse.ArgumentParser(description=description, + formatter_class=argparse.RawTextHelpFormatter) - ant_sel_f.add_ant_sel_cb(ant_sel_cb) + parser.add_argument('--version', action='version', version=version_msg % settings) + parser.add_argument('--profiles', type=str, required=True, nargs='+', metavar='profile', help='Use service profile(s)') - def _cleanup(x): - for s in sockets: - s.stopListening() + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument('--cluster', type=str, choices=('ssh', 'manual'), help='Distributed mode') + group.add_argument('--gen-init', type=str, metavar='node', help='Generate init script for cluster node') + group.add_argument('--wlans', type=str, nargs='+', metavar='wlan', help='WiFi interfaces for local mode') - return x + args = parser.parse_args() + profiles = sorted(args.profiles) - yield defer.gatherResults(dl, consumeErrors=True).addBoth(_cleanup)\ - .addErrback(lambda f: f.trap(defer.FirstError) and f.value.subFailure) + if args.gen_init: + _, cluster_nodes = parse_cluster_services(profiles) + print(gen_cluster_scripts(cluster_nodes)[args.gen_init]) + return -def main(): log.msg = _log_msg if settings.common.log_file: @@ -1104,14 +222,22 @@ def main(): obs = ConsoleObserver() log.theLogPublisher._startLogging(obs.emit, False) + log.msg(description) - log.msg('WFB-ng version %s-%s' % (settings.common.version, settings.common.commit[:8])) - profiles, wlans = sys.argv[1], list(wlan for arg in sys.argv[2:] for wlan in arg.split()) uname = os.uname() - log.msg('Run on %s/%s @%s, profile(s) %s using %s' % (uname[4], uname[2], uname[1], profiles, ', '.join(wlans))) - log.msg('Using cfg files:\n%s' % ('\n'.join(cfg_files),)) - reactor.callWhenRunning(lambda: defer.maybeDeferred(init, profiles.split(':'), wlans)\ + if args.cluster: + cluster_mode = args.cluster + wlans = None + log.msg('Run on %s/%s @%s, profile(s): %s, cluster mode' % (uname[4], uname[2], uname[1], profiles)) + else: + cluster_mode = None + wlans = args.wlans + log.msg('Run on %s/%s @%s, profile(s): %s, using: %s' % (uname[4], uname[2], uname[1], profiles, wlans)) + + log.msg('Using config files:\n%s' % ('\n'.join(cfg_files),)) + + reactor.callWhenRunning(lambda: defer.maybeDeferred(init, profiles, wlans, cluster_mode)\ .addErrback(abort_on_crash)) reactor.run() diff --git a/wfb_ng/services.py b/wfb_ng/services.py new file mode 100644 index 00000000..b2d79e22 --- /dev/null +++ b/wfb_ng/services.py @@ -0,0 +1,512 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2018-2024 Vasily Evseenko + +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +import sys +import os +import re +import hashlib + +from twisted.python import log, failure +from twisted.internet import reactor, defer, main as ti_main, threads +from twisted.internet.serialport import SerialPort + +from .protocols import RXProtocol, TXProtocol +from .proxy import UDPProxyProtocol, MavlinkSerialProxyProtocol, MavlinkUDPProxyProtocol +from .mavlink_protocol import MavlinkARMProtocol, MavlinkTCPFactory, MavlinkLoggerProtocol +from .tuntap import TUNTAPProtocol, TUNTAPTransport +from .config_parser import Section +from .conf import settings + +connect_re = re.compile(r'^connect://(?P[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):(?P[0-9]+)$', re.IGNORECASE) +listen_re = re.compile(r'^listen://(?P[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):(?P[0-9]+)$', re.IGNORECASE) +serial_re = re.compile(r'^serial:(?P[a-z0-9\-\_/]+):(?P[0-9]+)$', re.IGNORECASE) + + +bandwidth_map = { + 5: '5MHz', + 10: '10MHz', + 20: 'HT20', + 40: 'HT40+', + 80: '80MHz', + 160: '160MHz', +} + +def hash_link_domain(link_domain): + return int.from_bytes(hashlib.sha1(link_domain.encode('utf-8')).digest()[:3], 'big') + + +def parse_services(profile_name, udp_port_allocator): + res = [] + for stream in getattr(settings, profile_name).streams: + cfg = Section() + stream = dict(stream) + name = stream.pop('name') + service_type = stream.pop('service_type') + + for profile in stream.pop('profiles'): + cfg.__dict__.update(getattr(settings, profile).__dict__) + + cfg.__dict__.update(stream) + + # Allocate udp port for cluster aggregator and services + if udp_port_allocator is not None: + cfg.udp_port_auto = next(udp_port_allocator) + + res.append((name, service_type, cfg)) + + return res + + +@defer.inlineCallbacks +def init_udp_direct_tx(service_name, cfg, wlans, link_id, ant_sel_f, is_cluster): + # Direct udp doesn't support TX diversity - only first card will be used. + # But if mirror mode is enabled it will use all cards. + + if not cfg.mirror and (len(wlans) > 1 or ',' in wlans[0]): + raise Exception("udp_direct_tx doesn't supports diversity but multiple cards selected. Use udp_proxy for such case.") + + if not listen_re.match(cfg.peer): + raise Exception('%s: unsupported peer address: %s' % (service_name, cfg.peer)) + + m = listen_re.match(cfg.peer) + listen = m.group('addr'), int(m.group('port')) + log.msg('Listen for %s stream %d on %s:%d' % (service_name, cfg.stream_tx, listen[0], listen[1])) + + cmd = ('%(cmd)s%(cluster)s -f %(frame_type)s -p %(stream)d -u %(port)d -K %(key)s '\ + '-B %(bw)d -G %(gi)s -S %(stbc)d -L %(ldpc)d -M %(mcs)d'\ + '%(mirror)s%(force_vht)s%(qdisc)s '\ + '-k %(fec_k)d -n %(fec_n)d -T %(fec_timeout)d -F %(fec_delay)d -i %(link_id)d -R %(rcv_buf_size)d -C %(control_port)d' % \ + dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_tx'), + cluster=' -d' if is_cluster else '', + frame_type=cfg.frame_type, + stream=cfg.stream_tx, + port=listen[1], + control_port = cfg.control_port, + key=os.path.join(settings.path.conf_dir, cfg.keypair), + bw=cfg.bandwidth, + force_vht=' -V' if cfg.force_vht else '', + qdisc=' -Q -P %d' % (cfg.fwmark,) if cfg.use_qdisc else '', + gi="short" if cfg.short_gi else "long", + stbc=cfg.stbc, + ldpc=cfg.ldpc, + mcs=cfg.mcs_index, + mirror=' -m' if cfg.mirror else '', + fec_k=cfg.fec_k, + fec_n=cfg.fec_n, + fec_timeout=cfg.fec_timeout, + fec_delay=cfg.fec_delay, + link_id=link_id, + rcv_buf_size=settings.common.tx_rcv_buf_size) + ).split() + wlans + + control_port_df = defer.Deferred() if cfg.control_port == 0 else None + df = TXProtocol(ant_sel_f, cmd, 'video tx', control_port_df=control_port_df).start() + log.msg('%s: %s' % (service_name, ' '.join(cmd),)) + + control_port = cfg.control_port + + if control_port == 0: + control_port = yield control_port_df + + log.msg('%s use wfb_tx control_port %d' % (service_name, control_port)) + + yield df + + +def init_udp_direct_rx(service_name, cfg, wlans, link_id, ant_sel_f, is_cluster): + if not connect_re.match(cfg.peer): + raise Exception('%s: unsupported peer address: %s' % (service_name, cfg.peer)) + + m = connect_re.match(cfg.peer) + connect = m.group('addr'), int(m.group('port')) + log.msg('Send %s stream %d to %s:%d' % (service_name, cfg.stream_rx, connect[0], connect[1])) + + cmd = ('%(cmd)s%(cluster)s -p %(stream)d -c %(ip_addr)s -u %(port)d -K %(key)s -R %(rcv_buf_size)d -i %(link_id)d' % \ + dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_rx'), + cluster=' -a %d' % (cfg.udp_port_auto,) if is_cluster else '', + stream=cfg.stream_rx, + ip_addr=connect[0], + port=connect[1], + key=os.path.join(settings.path.conf_dir, cfg.keypair), + rcv_buf_size=settings.common.tx_rcv_buf_size, + link_id=link_id)).split() + (wlans if not is_cluster else []) + + df = RXProtocol(ant_sel_f, cmd, '%s rx' % (service_name,)).start() + + log.msg('%s: %s' % (service_name, ' '.join(cmd),)) + return df + + +@defer.inlineCallbacks +def init_mavlink(service_name, cfg, wlans, link_id, ant_sel_f, is_cluster): + listen = None + connect = None + serial = None + osd_peer = None + + if connect_re.match(cfg.peer): + m = connect_re.match(cfg.peer) + connect = m.group('addr'), int(m.group('port')) + log.msg('Connect %s stream %d(RX), %d(TX) to %s:%d' % (service_name, cfg.stream_rx, cfg.stream_tx, connect[0], connect[1])) + + elif listen_re.match(cfg.peer): + m = listen_re.match(cfg.peer) + listen = m.group('addr'), int(m.group('port')) + log.msg('Listen for %s stream %d(RX), %d(TX) on %s:%d' % (service_name, cfg.stream_rx, cfg.stream_tx, listen[0], listen[1])) + + elif serial_re.match(cfg.peer): + m = serial_re.match(cfg.peer) + serial = m.group('dev'), int(m.group('baud')) + log.msg('Open serial port %s on speed %d' % (serial[0], serial[1])) + + else: + raise Exception('Unsupported peer address: %s' % (cfg.peer,)) + + if cfg.osd is not None and connect_re.match(cfg.osd): + m = connect_re.match(cfg.osd) + osd_peer = m.group('addr'), int(m.group('port')) + log.msg('Mirror %s stream to OSD at %s:%d' % (service_name, osd_peer[0], osd_peer[1])) + + rx_hooks = [] + tx_hooks = [] + + if cfg.call_on_arm or cfg.call_on_disarm: + arm_proto = MavlinkARMProtocol(cfg.call_on_arm, cfg.call_on_disarm) + rx_hooks.append(arm_proto.dataReceived) + tx_hooks.append(arm_proto.dataReceived) + + if cfg.log_messages and ant_sel_f.logger is not None: + mav_log_proto = MavlinkLoggerProtocol(ant_sel_f.logger) + rx_hooks.append(mav_log_proto.dataReceived) + tx_hooks.append(mav_log_proto.dataReceived) + + if serial: + p_in = MavlinkSerialProxyProtocol(agg_max_size=settings.common.radio_mtu, + agg_timeout=settings.common.mavlink_agg_timeout, + inject_rssi=cfg.inject_rssi, + mavlink_sys_id=cfg.mavlink_sys_id, + mavlink_comp_id=cfg.mavlink_comp_id, + rx_hooks=rx_hooks, tx_hooks=tx_hooks) + else: + # The first argument is not None only if we initiate mavlink connection + p_in = MavlinkUDPProxyProtocol(connect, agg_max_size=settings.common.radio_mtu, + agg_timeout=settings.common.mavlink_agg_timeout, + inject_rssi=cfg.inject_rssi, + mirror=osd_peer, + mavlink_sys_id=cfg.mavlink_sys_id, + mavlink_comp_id=cfg.mavlink_comp_id, + rx_hooks=rx_hooks, tx_hooks=tx_hooks) + + p_rx = UDPProxyProtocol() + p_rx.peer = p_in + + rx_socket = reactor.listenUDP(0, p_rx) + sockets = [rx_socket] + + cmd_rx = ('%(cmd)s%(cluster)s -p %(stream)d -u %(port)d -K %(key)s -R %(rcv_buf_size)d -i %(link_id)d' % \ + dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_rx'), + cluster=' -a %d' % (cfg.udp_port_auto,) if is_cluster else '', + stream=cfg.stream_rx, + port=rx_socket.getHost().port, + key=os.path.join(settings.path.conf_dir, cfg.keypair), + rcv_buf_size=settings.common.tx_rcv_buf_size, + link_id=link_id)).split() + (wlans if not is_cluster else []) + + cmd_tx = ('%(cmd)s%(cluster)s -f %(frame_type)s -p %(stream)d -u %(port)d -K %(key)s -B %(bw)d '\ + '-G %(gi)s -S %(stbc)d -L %(ldpc)d -M %(mcs)d'\ + '%(mirror)s%(force_vht)s%(qdisc)s '\ + '-k %(fec_k)d -n %(fec_n)d -T %(fec_timeout)d -F %(fec_delay)d -i %(link_id)d -R %(rcv_buf_size)d -C %(control_port)d' % \ + dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_tx'), + cluster=' -d' if is_cluster else '', + frame_type=cfg.frame_type, + stream=cfg.stream_tx, + port=0, + control_port=cfg.control_port, + key=os.path.join(settings.path.conf_dir, cfg.keypair), + bw=cfg.bandwidth, + force_vht=' -V' if cfg.force_vht else '', + qdisc=' -Q -P %d' % (cfg.fwmark,) if cfg.use_qdisc else '', + gi="short" if cfg.short_gi else "long", + stbc=cfg.stbc, + ldpc=cfg.ldpc, + mcs=cfg.mcs_index, + mirror=' -m' if cfg.mirror else '', + fec_k=cfg.fec_k, + fec_n=cfg.fec_n, + fec_timeout=cfg.fec_timeout, + fec_delay=cfg.fec_delay, + link_id=link_id, + rcv_buf_size=settings.common.tx_rcv_buf_size)).split() + wlans + + log.msg('%s RX: %s' % (service_name, ' '.join(cmd_rx))) + log.msg('%s TX: %s' % (service_name, ' '.join(cmd_tx))) + + # Setup mavlink TCP proxy + if cfg.mavlink_tcp_port: + mav_tcp_f = MavlinkTCPFactory(p_in) + p_in.rx_hooks.append(mav_tcp_f.write) + reactor.listenTCP(cfg.mavlink_tcp_port, mav_tcp_f) + + tx_ports_df = defer.Deferred() + control_port_df = defer.Deferred() if cfg.control_port == 0 else None + + dl = [TXProtocol(ant_sel_f, cmd_tx, '%s tx' % (service_name,), tx_ports_df, control_port_df).start()] + + # Wait while wfb_tx allocates ephemeral udp ports and reports them back + tx_ports = yield tx_ports_df + control_port = cfg.control_port + + if control_port == 0: + control_port = yield control_port_df + + log.msg('%s use wfb_tx ports %s, control_port %d' % (service_name, tx_ports, control_port)) + + p_tx_map = dict((wlan_id, UDPProxyProtocol(('127.0.0.1', port))) for wlan_id, port in tx_ports.items()) + + if serial: + serial_port = SerialPort(p_in, os.path.join('/dev', serial[0]), reactor, baudrate=serial[1]) + serial_port._serial.exclusive = True + + else: + serial_port = None + sockets += [ reactor.listenUDP(listen[1] if listen else 0, p_in) ] + + sockets += [ reactor.listenUDP(0, p_tx) for p_tx in p_tx_map.values() ] + + def ant_sel_cb(wlan_id): + p_in.peer = p_tx_map[wlan_id] \ + if wlan_id is not None \ + else list(p_tx_map.values())[0] + + ant_sel_f.add_ant_sel_cb(ant_sel_cb) + + # Report RSSI to OSD + ant_sel_f.add_rssi_cb(p_in.send_rssi) + + dl.append(RXProtocol(ant_sel_f, cmd_rx, '%s rx' % (service_name,)).start()) + + def _cleanup(x): + if serial_port is not None: + serial_port.loseConnection() + serial_port.connectionLost(failure.Failure(ti_main.CONNECTION_DONE)) + + for s in sockets: + s.stopListening() + + return x + + yield defer.gatherResults(dl, consumeErrors=True).addBoth(_cleanup)\ + .addErrback(lambda f: f.trap(defer.FirstError) and f.value.subFailure) + + +@defer.inlineCallbacks +def init_tunnel(service_name, cfg, wlans, link_id, ant_sel_f, is_cluster): + p_in = TUNTAPProtocol(mtu=settings.common.radio_mtu, + agg_timeout=settings.common.tunnel_agg_timeout) + + p_rx = UDPProxyProtocol() + p_rx.peer = p_in + + rx_socket = reactor.listenUDP(0, p_rx) + sockets = [rx_socket] + + cmd_rx = ('%(cmd)s%(cluster)s -p %(stream)d -u %(port)d -K %(key)s -R %(rcv_buf_size)d -i %(link_id)d' % \ + dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_rx'), + cluster=' -a %d' % (cfg.udp_port_auto,) if is_cluster else '', + stream=cfg.stream_rx, + port=rx_socket.getHost().port, + key=os.path.join(settings.path.conf_dir, cfg.keypair), + rcv_buf_size=settings.common.tx_rcv_buf_size, + link_id=link_id)).split() + (wlans if not is_cluster else []) + + cmd_tx = ('%(cmd)s%(cluster)s -f %(frame_type)s -p %(stream)d -u %(port)d -K %(key)s -B %(bw)d -G %(gi)s '\ + '-S %(stbc)d -L %(ldpc)d -M %(mcs)d'\ + '%(mirror)s%(force_vht)s%(qdisc)s '\ + '-k %(fec_k)d -n %(fec_n)d -T %(fec_timeout)d -F %(fec_delay)d -i %(link_id)d -R %(rcv_buf_size)d -C %(control_port)d' % \ + dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_tx'), + cluster=' -d' if is_cluster else '', + frame_type=cfg.frame_type, + stream=cfg.stream_tx, + port=0, + control_port=cfg.control_port, + key=os.path.join(settings.path.conf_dir, cfg.keypair), + bw=cfg.bandwidth, + force_vht=' -V' if cfg.force_vht else '', + qdisc=' -Q -P %d' % (cfg.fwmark,) if cfg.use_qdisc else '', + gi="short" if cfg.short_gi else "long", + stbc=cfg.stbc, + ldpc=cfg.ldpc, + mcs=cfg.mcs_index, + mirror=' -m' if cfg.mirror else '', + fec_k=cfg.fec_k, + fec_n=cfg.fec_n, + fec_timeout=cfg.fec_timeout, + fec_delay=cfg.fec_delay, + link_id=link_id, + rcv_buf_size=settings.common.tx_rcv_buf_size)).split() + wlans + + log.msg('%s RX: %s' % (service_name, ' '.join(cmd_rx))) + log.msg('%s TX: %s' % (service_name, ' '.join(cmd_tx),)) + + tx_ports_df = defer.Deferred() + control_port_df = defer.Deferred() if cfg.control_port == 0 else None + + dl = [TXProtocol(ant_sel_f, cmd_tx, '%s tx' % (service_name,), tx_ports_df, control_port_df).start()] + + # Wait while wfb_tx allocates ephemeral udp ports and reports them back + tx_ports = yield tx_ports_df + control_port = cfg.control_port + + if control_port == 0: + control_port = yield control_port_df + + log.msg('%s use wfb_tx ports %s, control_port %d' % (service_name, tx_ports, control_port)) + p_tx_map = dict((wlan_id, UDPProxyProtocol(('127.0.0.1', port))) for wlan_id, port in tx_ports.items()) + + tun_ep = TUNTAPTransport(reactor, p_in, cfg.ifname, cfg.ifaddr, mtu=settings.common.radio_mtu, default_route=cfg.default_route) + sockets += [ reactor.listenUDP(0, p_tx) for p_tx in p_tx_map.values() ] + + def ant_sel_cb(wlan_id): + p_in.peer = p_tx_map[wlan_id] \ + if wlan_id is not None \ + else list(p_tx_map.values())[0] + + # Broadcast keepalive message to all cards, not to active one + # This allow to use direct antennas on both ends and/or differenct frequencies. + # But when mirroring enabled it will be done by wfb_tx itself + + if cfg.mirror: + p_in.all_peers = list(p_tx_map.values())[0:1] + else: + p_in.all_peers = list(p_tx_map.values()) + + ant_sel_f.add_ant_sel_cb(ant_sel_cb) + + dl.append(RXProtocol(ant_sel_f, cmd_rx, '%s rx' % (service_name,)).start()) + + def _cleanup(x): + tun_ep.loseConnection() + for s in sockets: + s.stopListening() + return x + + yield defer.gatherResults(dl, consumeErrors=True).addBoth(_cleanup)\ + .addErrback(lambda f: f.trap(defer.FirstError) and f.value.subFailure) + + +@defer.inlineCallbacks +def init_udp_proxy(service_name, cfg, wlans, link_id, ant_sel_f, is_cluster): + listen = None + connect = None + + if connect_re.match(cfg.peer): + m = connect_re.match(cfg.peer) + connect = m.group('addr'), int(m.group('port')) + log.msg('Connect %s stream %s(RX), %s(TX) to %s:%d' % (service_name, cfg.stream_rx, cfg.stream_tx, connect[0], connect[1])) + + elif listen_re.match(cfg.peer): + m = listen_re.match(cfg.peer) + listen = m.group('addr'), int(m.group('port')) + log.msg('Listen for %s stream %s(RX), %s(TX) on %s:%d' % (service_name, cfg.stream_rx, cfg.stream_tx, listen[0], listen[1])) + + else: + raise Exception('Unsupported peer address: %s' % (cfg.peer,)) + + # The first argument is not None only if we initiate mavlink connection + p_in = UDPProxyProtocol(connect) + sockets = [reactor.listenUDP(listen[1] if listen else 0, p_in)] + dl = [] + + if cfg.stream_rx is not None: + p_rx = UDPProxyProtocol() + p_rx.peer = p_in + rx_socket = reactor.listenUDP(0, p_rx) + sockets = [rx_socket] + cmd_rx = ('%(cmd)s%(cluster)s -p %(stream)d -u %(port)d -K %(key)s -R %(rcv_buf_size)d -i %(link_id)d' % \ + dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_rx'), + cluster=' -a %d' % (cfg.udp_port_auto,) if is_cluster else '', + stream=cfg.stream_rx, + port=rx_socket.getHost().port, + key=os.path.join(settings.path.conf_dir, cfg.keypair), + rcv_buf_size=settings.common.tx_rcv_buf_size, + link_id=link_id)).split() + (wlans if not is_cluster else []) + + log.msg('%s RX: %s' % (service_name, ' '.join(cmd_rx))) + dl.append(RXProtocol(ant_sel_f, cmd_rx, '%s rx' % (service_name,)).start()) + + if cfg.stream_tx is not None: + cmd_tx = ('%(cmd)s%(cluster)s -f %(frame_type)s -p %(stream)d -u %(port)d -K %(key)s -B %(bw)d '\ + '-G %(gi)s -S %(stbc)d -L %(ldpc)d -M %(mcs)d'\ + '%(mirror)s%(force_vht)s%(qdisc)s '\ + '-k %(fec_k)d -n %(fec_n)d -T %(fec_timeout)d -F %(fec_delay)d -i %(link_id)d -R %(rcv_buf_size)d -C %(control_port)d' % \ + dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_tx'), + cluster=' -d' if is_cluster else '', + frame_type=cfg.frame_type, + stream=cfg.stream_tx, + port=0, + control_port=cfg.control_port, + key=os.path.join(settings.path.conf_dir, cfg.keypair), + bw=cfg.bandwidth, + force_vht=' -V' if cfg.force_vht else '', + qdisc=' -Q -P %d' % (cfg.fwmark,) if cfg.use_qdisc else '', + gi="short" if cfg.short_gi else "long", + stbc=cfg.stbc, + ldpc=cfg.ldpc, + mcs=cfg.mcs_index, + mirror=' -m' if cfg.mirror else '', + fec_k=cfg.fec_k, + fec_n=cfg.fec_n, + fec_timeout=cfg.fec_timeout, + fec_delay=cfg.fec_delay, + link_id=link_id, + rcv_buf_size=settings.common.tx_rcv_buf_size)).split() + wlans + log.msg('%s TX: %s' % (service_name, ' '.join(cmd_tx))) + + tx_ports_df = defer.Deferred() + control_port_df = defer.Deferred() if cfg.control_port == 0 else None + + dl += [TXProtocol(ant_sel_f, cmd_tx, '%s tx' % (service_name,), tx_ports_df, control_port_df).start()] + + # Wait while wfb_tx allocates ephemeral udp ports and reports them back + tx_ports = yield tx_ports_df + control_port = cfg.control_port + + if control_port == 0: + control_port = yield control_port_df + + log.msg('%s use wfb_tx ports %s, control_port %d' % (service_name, tx_ports, control_port)) + + p_tx_map = dict((wlan_id, UDPProxyProtocol(('127.0.0.1', port))) for wlan_id, port in tx_ports.items()) + sockets += [ reactor.listenUDP(0, p_tx) for p_tx in p_tx_map.values() ] + + def ant_sel_cb(wlan_id): + p_in.peer = p_tx_map[wlan_id] \ + if wlan_id is not None \ + else list(p_tx_map.values())[0] + + ant_sel_f.add_ant_sel_cb(ant_sel_cb) + + def _cleanup(x): + for s in sockets: + s.stopListening() + + return x + + yield defer.gatherResults(dl, consumeErrors=True).addBoth(_cleanup)\ + .addErrback(lambda f: f.trap(defer.FirstError) and f.value.subFailure) diff --git a/wfb_ng/tests/test_txrx.py b/wfb_ng/tests/test_txrx.py index 315c70d8..b9f56690 100644 --- a/wfb_ng/tests/test_txrx.py +++ b/wfb_ng/tests/test_txrx.py @@ -12,7 +12,9 @@ from twisted.internet.protocol import DatagramProtocol from ..common import df_sleep -from ..server import RXProtocol, TXProtocol, call_and_check_rc +from ..protocols import RXProtocol, TXProtocol +from .. import call_and_check_rc + class UDP_TXRX(DatagramProtocol): def __init__(self, tx_addr):