Skip to content

Commit 856996b

Browse files
cataliniiCatalin Toda
andauthored
Refactor stream locking (#1320)
* Log when the stream is locked * Do not lock a disabled stream * Disable adapter for stream during setup * Refactor locking and unlocking streams --------- Co-authored-by: Catalin Toda <[email protected]>
1 parent e505a96 commit 856996b

File tree

2 files changed

+33
-39
lines changed

2 files changed

+33
-39
lines changed

src/stream.cpp

Lines changed: 33 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@
1717
* USA
1818
*
1919
*/
20+
#include "stream.h"
21+
#include "adapter.h"
22+
#include "api/symbols.h"
23+
#include "api/variables.h"
24+
#include "dvb.h"
25+
#include "minisatip.h"
26+
#include "pmt.h"
27+
#include "socketworks.h"
2028
#include <arpa/inet.h>
2129
#include <errno.h>
2230
#include <fcntl.h>
@@ -35,15 +43,7 @@
3543
#include <sys/types.h>
3644
#include <time.h>
3745
#include <unistd.h>
38-
39-
#include "adapter.h"
40-
#include "api/symbols.h"
41-
#include "api/variables.h"
42-
#include "dvb.h"
43-
#include "minisatip.h"
44-
#include "pmt.h"
45-
#include "socketworks.h"
46-
#include "stream.h"
46+
#include <vector>
4747

4848
#include "utils/ticks.h"
4949

@@ -224,6 +224,7 @@ streams *setup_stream(char *str, sockets *s) {
224224
int ad = sid->adapter;
225225
if (!strstr(tmp_str, "addpids") && !strstr(tmp_str, "delpids")) {
226226
close_adapter_for_stream(sid->sid, ad, 0);
227+
sid->adapter = -1;
227228
}
228229
}
229230

@@ -991,7 +992,6 @@ int process_packets_for_stream(streams *sid, adapter *ad) {
991992
flush_stream(sid, iov, iiov, rtime);
992993
return 0;
993994
}
994-
995995
int process_dmx(sockets *s) {
996996
int i;
997997
adapter *ad;
@@ -1039,13 +1039,33 @@ int process_dmx(sockets *s) {
10391039
return 0;
10401040
}
10411041

1042+
std::vector<SMutex *> lock_streams_for_adapter(int aid) {
1043+
streams *sid;
1044+
std::vector<SMutex *> locks;
1045+
for (int i = 0; i < MAX_STREAMS; i++)
1046+
if ((sid = get_sid_nw(i)) && sid->adapter == aid) {
1047+
mutex_lock(&sid->mutex);
1048+
if (!sid->enabled) {
1049+
mutex_unlock(&sid->mutex);
1050+
continue;
1051+
}
1052+
locks.push_back(&sid->mutex);
1053+
}
1054+
return locks;
1055+
}
1056+
void unlock_streams_for_adapter(std::vector<SMutex *> locks) {
1057+
int i = 0;
1058+
for (i = locks.size() - 1; i >= 0; i--)
1059+
mutex_unlock(locks[i]);
1060+
}
1061+
10421062
// lock order: socket -> stream -> adapter
10431063
// after stream or adapter, avoid locking socket
10441064

10451065
int read_dmx(sockets *s) {
10461066
static int cnt;
10471067
adapter *ad;
1048-
int send = 0, force_send = 0, ls, lse;
1068+
int send = 0, force_send = 0;
10491069
int threshold = opts.udp_threshold;
10501070
int64_t rtime = getTick();
10511071

@@ -1112,13 +1132,11 @@ int read_dmx(sockets *s) {
11121132
return 0;
11131133

11141134
ad->flush = 0;
1115-
ls = lock_streams_for_adapter(ad->id);
1135+
auto locks = lock_streams_for_adapter(ad->id);
11161136
mutex_lock(&ad->mutex);
11171137
process_dmx(s);
11181138
mutex_unlock(&ad->mutex);
1119-
lse = unlock_streams_for_adapter(ad->id);
1120-
if (ls != lse)
1121-
LOG("leak detected %d %d!!! ", ls, lse);
1139+
unlock_streams_for_adapter(locks);
11221140
return 0;
11231141
}
11241142
#undef DEFAULT_LOG
@@ -1220,28 +1238,6 @@ void dump_streams() {
12201238
get_stream_rport(sid->sid));
12211239
}
12221240

1223-
int lock_streams_for_adapter(int aid) {
1224-
streams *sid;
1225-
int i = 0, ls = 0;
1226-
for (i = 0; i < MAX_STREAMS; i++)
1227-
if ((sid = get_sid_nw(i)) && sid->adapter == aid) {
1228-
mutex_lock(&sid->mutex);
1229-
ls++;
1230-
}
1231-
return ls;
1232-
}
1233-
1234-
int unlock_streams_for_adapter(int aid) {
1235-
streams *sid;
1236-
int i = 0, ls = 0;
1237-
for (i = MAX_STREAMS - 1; i >= 0; i--)
1238-
if ((sid = get_sid_nw(i)) && sid->adapter == aid) {
1239-
mutex_unlock(&sid->mutex);
1240-
ls++;
1241-
}
1242-
return ls;
1243-
}
1244-
12451241
void free_all_streams() {
12461242
int i;
12471243
std::lock_guard<SMutex> lock(st_mutex);

src/stream.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,6 @@ int get_stream_rport(int s_id);
8484
int get_streams_for_adapter(int aid);
8585
int find_session_id(int id);
8686
int calculate_bw(sockets *s);
87-
int lock_streams_for_adapter(int aid);
88-
int unlock_streams_for_adapter(int aid);
8987

9088
#define get_sid(a) get_sid1(a, __FILE__, __LINE__)
9189
#define get_sid_nw(i) \

0 commit comments

Comments
 (0)