Skip to content

Commit 826bb3f

Browse files
authored
Merge branch 'master' into fix-leaked-socket
2 parents 7db6681 + 5d0491a commit 826bb3f

File tree

3 files changed

+56
-18
lines changed

3 files changed

+56
-18
lines changed

src/main/java/io/antmedia/streamsource/StreamFetcherManager.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -540,23 +540,20 @@ public void controlStreamFetchers(boolean restart) {
540540

541541
//get the updated broadcast object
542542
Broadcast broadcast = datastore.get(streamScheduler.getStreamId());
543-
544-
if (broadcast != null && AntMediaApplicationAdapter.PLAY_LIST.equals(broadcast.getType())) {
545-
//if it's playlist, continue
543+
544+
if (broadcast != null && AntMediaApplicationAdapter.PLAY_LIST.equals(broadcast.getType())) {
545+
// For playlists, only check auto-stop if autoStartStopEnabled is true
546+
if (broadcast.isAutoStartStopEnabled() && isToBeStoppedAutomatically(broadcast)) {
547+
logger.info("Auto-stopping playlist {} because no viewers are watching", streamScheduler.getStreamId());
548+
stopPlayList(streamScheduler.getStreamId());
549+
}
546550
continue;
547551
}
548-
552+
549553
boolean autoStop = false;
550-
if (restart || broadcast == null ||
551-
(autoStop = isToBeStoppedAutomatically(broadcast)))
554+
if (restart || broadcast == null ||
555+
(autoStop = isToBeStoppedAutomatically(broadcast)))
552556
{
553-
//logic of If condition is
554-
555-
// stop it if it's restart = true
556-
// or
557-
// brodcast == null because it means stream is deleted
558-
// or
559-
// autoStop
560557

561558
logger.info("Calling stop stream {} due to restart -> {}, broadcast is null -> {}, auto stop because no viewer -> {}",
562559
streamScheduler.getStreamId(), restart, broadcast == null, autoStop);

src/main/java/io/antmedia/websocket/WebSocketCommunityHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ else if (cmd.equals(WebSocketConstants.STOP_COMMAND)) {
153153
else if (cmd.equals(WebSocketConstants.PING_COMMAND)) {
154154
sendPongMessage(session);
155155
}
156-
else if (cmd.equals(WebSocketConstants.GET_STREAM_INFO_COMMAND) || cmd.equals(WebSocketConstants.PLAY_COMMAND))
156+
else if (cmd.equals(WebSocketConstants.GET_STREAM_INFO_COMMAND) || cmd.equals(WebSocketConstants.PLAY_COMMAND))
157157
{
158158
sendNotFoundJSON(streamId, session);
159159
}
@@ -166,7 +166,6 @@ else if (cmd.equals(WebSocketConstants.GET_STREAM_INFO_COMMAND) || cmd.equals(We
166166
}
167167

168168
}
169-
170169

171170
private void startRTMPAdaptor(Session session, final String streamId, boolean enableVideo) {
172171
int rtmpPort = appAdaptor.getServerSettings().getRtmpPort();

src/test/java/io/antmedia/test/StreamSchedularUnitTest.java

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -732,7 +732,7 @@ public void testIsStreamRunning()
732732

733733
@Test
734734
public void testControlStreamFetchersPlayListAndRestart() {
735-
DataStore dataStore = Mockito.mock(DataStore.class);
735+
DataStore dataStore = Mockito.mock(DataStore.class);
736736
StreamFetcherManager streamFetcherManager = Mockito.spy(new StreamFetcherManager(vertx, dataStore, appScope));
737737
Map<String, StreamFetcher> streamFetcherList = new ConcurrentHashMap<>();
738738

@@ -742,7 +742,7 @@ public void testControlStreamFetchersPlayListAndRestart() {
742742
streamFetcherList.put(streamId, fetcher);
743743
Mockito.when(fetcher.getStreamId()).thenReturn(streamId);
744744
Mockito.when(fetcher.getStreamUrl()).thenReturn(streamUrl);
745-
745+
746746
when(fetcher.isStreamAlive()).thenReturn(true);
747747
when(fetcher.isStreamBlocked()).thenReturn(false);
748748

@@ -751,11 +751,12 @@ public void testControlStreamFetchersPlayListAndRestart() {
751751
when(broadcast.getStreamId()).thenReturn(streamId);
752752
when(broadcast.getStreamUrl()).thenReturn("streamurl");
753753
when(broadcast.getType()).thenReturn(AntMediaApplicationAdapter.PLAY_LIST);
754+
when(broadcast.isAutoStartStopEnabled()).thenReturn(false);
754755

755756
streamFetcherManager.setStreamFetcherList(streamFetcherList);
756757

757758
streamFetcherManager.controlStreamFetchers(false);
758-
//it should not call anything because type is playlist
759+
//it should not call isToBeStoppedAutomatically because type is playlist and autoStartStopEnabled is false
759760
Mockito.verify(streamFetcherManager, Mockito.never()).isToBeStoppedAutomatically(Mockito.any());
760761

761762

@@ -773,6 +774,47 @@ public void testControlStreamFetchersPlayListAndRestart() {
773774
Mockito.verify(streamFetcherManager).startStreaming(broadcast);
774775

775776
}
777+
778+
@Test
779+
public void testControlStreamFetchersPlayListAutoStop() {
780+
DataStore dataStore = Mockito.mock(DataStore.class);
781+
StreamFetcherManager streamFetcherManager = Mockito.spy(new StreamFetcherManager(vertx, dataStore, appScope));
782+
Map<String, StreamFetcher> streamFetcherList = new ConcurrentHashMap<>();
783+
784+
StreamFetcher fetcher = Mockito.mock(StreamFetcher.class);
785+
String streamId = "playlistStream123";
786+
String streamUrl = "streamurl";
787+
streamFetcherList.put(streamId, fetcher);
788+
Mockito.when(fetcher.getStreamId()).thenReturn(streamId);
789+
Mockito.when(fetcher.getStreamUrl()).thenReturn(streamUrl);
790+
791+
when(fetcher.isStreamAlive()).thenReturn(true);
792+
when(fetcher.isStreamBlocked()).thenReturn(false);
793+
794+
Broadcast broadcast = mock(Broadcast.class);
795+
when(dataStore.get(Mockito.any())).thenReturn(broadcast);
796+
when(broadcast.getStreamId()).thenReturn(streamId);
797+
when(broadcast.getStreamUrl()).thenReturn("streamurl");
798+
when(broadcast.getType()).thenReturn(AntMediaApplicationAdapter.PLAY_LIST);
799+
when(broadcast.isAutoStartStopEnabled()).thenReturn(true);
800+
801+
streamFetcherManager.setStreamFetcherList(streamFetcherList);
802+
803+
// When autoStartStopEnabled is true, isToBeStoppedAutomatically should be called for playlists
804+
Mockito.doReturn(false).when(streamFetcherManager).isToBeStoppedAutomatically(Mockito.any());
805+
streamFetcherManager.controlStreamFetchers(false);
806+
Mockito.verify(streamFetcherManager, Mockito.times(1)).isToBeStoppedAutomatically(broadcast);
807+
808+
// Reset and test when isToBeStoppedAutomatically returns true - should call stopPlayList
809+
Mockito.reset(streamFetcherManager);
810+
streamFetcherManager.setStreamFetcherList(streamFetcherList);
811+
Mockito.doReturn(true).when(streamFetcherManager).isToBeStoppedAutomatically(Mockito.any());
812+
Mockito.doReturn(new Result(true)).when(streamFetcherManager).stopPlayList(Mockito.any());
813+
814+
streamFetcherManager.controlStreamFetchers(false);
815+
Mockito.verify(streamFetcherManager, Mockito.times(1)).isToBeStoppedAutomatically(broadcast);
816+
Mockito.verify(streamFetcherManager, Mockito.times(1)).stopPlayList(streamId);
817+
}
776818

777819

778820
@Test

0 commit comments

Comments
 (0)