Skip to content

Commit 57c0ea8

Browse files
committed
Named pipe fixes and revert skip logic
1 parent b12b68e commit 57c0ea8

File tree

2 files changed

+40
-132
lines changed

2 files changed

+40
-132
lines changed

src/cliraop.c

Lines changed: 39 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
#include <fcntl.h>
1414
#include <stdlib.h>
1515
#include <string.h>
16-
#include <stdbool.h>
1716
#include "../crosstools/src/platform.h"
1817
#include <assert.h>
1918

@@ -57,7 +56,7 @@ bool startsWith(const char *pre, const char *str)
5756
static bool glMainRunning = true;
5857
static pthread_t glCmdPipeReaderThread;
5958
char cmdPipeName[32];
60-
int cmdPipeFd = -1;
59+
int cmdPipeFd;
6160
char cmdPipeBuf[512];
6261
int latency = MS2TS(1000, 44100);
6362
struct raopcl_s *raopcl;
@@ -67,7 +66,6 @@ enum
6766
PAUSED,
6867
PLAYING
6968
} status;
70-
double skip_seconds = 0.0; // seconds to skip for late joiners
7169

7270
// debug level from tools & other elements
7371
log_level util_loglevel;
@@ -109,11 +107,9 @@ static int print_usage(char *argv[])
109107
"\t[-latency <latency> (frames]\n"
110108
"\t[-wait <wait>] (start after <wait> milliseconds)\n"
111109
"\t[-ntpstart <start>] (start at NTP <start> + <wait>)\n"
112-
"\t[-skip <seconds>] (skip N seconds of audio for late joiners)\n"
113110
"\t[-encrypt] audio payload encryption\n"
114111
"\t[-dacp <dacp_id>] (DACP id)\n"
115112
"\t[-activeremote <activeremote_id>] (Active Remote id)\n"
116-
"\t[-cmdpipe <path>] (optional named pipe for metadata/commands)\n"
117113
"\t[-alac] send ALAC compressed audio\n"
118114

119115
"\t[-et <value>] (et in mDNS: 4 for airport-express and used to detect MFi)\n"
@@ -127,8 +123,9 @@ static int print_usage(char *argv[])
127123
"\t[-udn <UDN>] (UDN name in mdns, required for password)\n"
128124

129125
"\t[-if <ipaddress>] (IP of the interface to bind to)\n"
126+
"\t[-cmdpipe <path>] (optional named pipe for metadata/commands)\n"
130127

131-
"\t[-pair] enter pairing mode for AppleTV (use with <player_ip>, default port 7000)\n"
128+
"\t[-pair] enter pairing mode for AppleTV\n"
132129
"\t[-debug <debug level>] (0 = silent)\n",
133130
name);
134131
return -1;
@@ -151,19 +148,7 @@ static void close_platform()
151148
/*----------------------------------------------------------------------------*/
152149
static void *CmdPipeReaderThread(void *args)
153150
{
154-
// Open with O_NONBLOCK to avoid blocking when no writer is connected yet
155-
cmdPipeFd = open(cmdPipeName, O_RDONLY | O_NONBLOCK);
156-
if (cmdPipeFd < 0) {
157-
LOG_ERROR("Failed to open command pipe: %s", cmdPipeName);
158-
return NULL;
159-
}
160-
161-
// Set back to blocking mode after opening
162-
int file_flags = fcntl(cmdPipeFd, F_GETFL);
163-
fcntl(cmdPipeFd, F_SETFL, file_flags & ~O_NONBLOCK);
164-
165-
LOG_INFO("Command pipe opened: %s", cmdPipeName);
166-
151+
cmdPipeFd = open(cmdPipeName, O_RDONLY);
167152
struct
168153
{
169154
char *title;
@@ -179,12 +164,8 @@ static void *CmdPipeReaderThread(void *args)
179164
if (!glMainRunning)
180165
break;
181166

182-
int bytesRead = read(cmdPipeFd, cmdPipeBuf, sizeof(cmdPipeBuf) - 1);
183-
if (bytesRead > 0)
167+
if (read(cmdPipeFd, cmdPipeBuf, 512) > 0)
184168
{
185-
// Null-terminate the buffer
186-
cmdPipeBuf[bytesRead] = '\0';
187-
188169
// read lines
189170
char *save_ptr1, *save_ptr2;
190171
char *line = strtok_r(cmdPipeBuf, "\n", &save_ptr1);
@@ -315,30 +296,8 @@ static void *CmdPipeReaderThread(void *args)
315296
// clear cmdPipeBuf
316297
memset(cmdPipeBuf, 0, sizeof cmdPipeBuf);
317298
}
318-
else if (bytesRead == 0)
319-
{
320-
// EOF - writer has disconnected, need to re-open pipe for next writer
321-
LOG_INFO("Command pipe writer disconnected, re-opening pipe...");
322-
close(cmdPipeFd);
323-
324-
// Re-open with O_NONBLOCK to avoid blocking
325-
cmdPipeFd = open(cmdPipeName, O_RDONLY | O_NONBLOCK);
326-
if (cmdPipeFd < 0)
327-
{
328-
LOG_ERROR("Failed to re-open command pipe: %s", cmdPipeName);
329-
break;
330-
}
331-
332-
// Set back to blocking mode for reads
333-
int file_flags = fcntl(cmdPipeFd, F_GETFL);
334-
fcntl(cmdPipeFd, F_SETFL, file_flags & ~O_NONBLOCK);
335-
336-
// Small sleep to avoid tight loop if open/close is rapid
337-
usleep(10 * 1000); // 10ms
338-
}
339299
else
340300
{
341-
// Error reading - sleep and retry
342301
usleep(250 * 1000);
343302
}
344303
}
@@ -427,14 +386,6 @@ int main(int argc, char *argv[])
427386
{
428387
activeRemote = argv[++i];
429388
}
430-
else if (!strcmp(argv[i], "-cmdpipe"))
431-
{
432-
cmdpipe = argv[++i];
433-
}
434-
else if (!strcmp(argv[i], "-skip"))
435-
{
436-
skip_seconds = atof(argv[++i]);
437-
}
438389
else if (!strcmp(argv[i], "-alac"))
439390
{
440391
alac = true;
@@ -463,6 +414,10 @@ int main(int argc, char *argv[])
463414
{
464415
strcpy(glInterface, argv[++i]);
465416
}
417+
else if (!strcmp(argv[i], "-cmdpipe"))
418+
{
419+
cmdpipe = argv[++i];
420+
}
466421
else if (!strcmp(argv[i], "-secret"))
467422
{
468423
secret = argv[++i];
@@ -547,28 +502,27 @@ int main(int argc, char *argv[])
547502
if (!strcmp(fname, "-"))
548503
{
549504
infile = fileno(stdin);
505+
LOG_INFO("Reading audio from stdin");
550506
}
551507
else
552508
{
509+
// Check if this is a FIFO/named pipe
553510
struct stat st;
554511
bool is_fifo = false;
555512

556-
// Check if file exists and is a FIFO
557513
if (stat(fname, &st) == 0)
558514
{
559-
if (S_ISFIFO(st.st_mode))
560-
{
561-
is_fifo = true;
562-
LOG_INFO("Using existing audio pipe: %s", fname);
563-
}
515+
// File exists, check if it's a FIFO
516+
is_fifo = S_ISFIFO(st.st_mode);
517+
LOG_INFO("Using existing audio source: %s (%s)", fname, is_fifo ? "named pipe" : "file");
564518
}
565519
else
566520
{
567-
// File doesn't exist - try to create it as a FIFO
568-
LOG_INFO("Creating audio pipe: %s", fname);
521+
// File doesn't exist - try to create as FIFO
569522
if (mkfifo(fname, 0666) == 0)
570523
{
571524
is_fifo = true;
525+
LOG_INFO("Created audio named pipe: %s", fname);
572526
}
573527
else
574528
{
@@ -579,7 +533,8 @@ int main(int argc, char *argv[])
579533

580534
// Open with O_NONBLOCK for FIFOs to avoid blocking on open
581535
int flags = O_RDONLY;
582-
if (is_fifo) flags |= O_NONBLOCK;
536+
if (is_fifo)
537+
flags |= O_NONBLOCK;
583538

584539
if ((infile = open(fname, flags)) == -1)
585540
{
@@ -588,11 +543,12 @@ int main(int argc, char *argv[])
588543
exit(1);
589544
}
590545

591-
// If it was a FIFO, set it back to blocking mode for reads
546+
// For FIFOs, set back to blocking mode after opening
592547
if (is_fifo)
593548
{
594549
int file_flags = fcntl(infile, F_GETFL);
595550
fcntl(infile, F_SETFL, file_flags & ~O_NONBLOCK);
551+
LOG_INFO("Audio pipe opened (now in blocking mode for reads)");
596552
}
597553
}
598554

@@ -611,21 +567,23 @@ int main(int argc, char *argv[])
611567
exit(1);
612568
}
613569

614-
// setup named pipe for metadata/commands (only if -cmdpipe is specified)
570+
// setup named pipe for metadata/commands
615571
if (cmdpipe)
616572
{
617-
struct stat st;
618573
strncpy(cmdPipeName, cmdpipe, sizeof(cmdPipeName) - 1);
619574
cmdPipeName[sizeof(cmdPipeName) - 1] = '\0';
620575

621-
// Only create the FIFO if it doesn't exist
576+
// Create pipe only if it doesn't already exist (could be created by caller)
577+
struct stat st;
622578
if (stat(cmdPipeName, &st) != 0)
623579
{
624-
LOG_INFO("Creating command pipe: %s", cmdPipeName);
580+
// Pipe doesn't exist, create it
625581
if (mkfifo(cmdPipeName, 0666) != 0)
626582
{
627583
LOG_ERROR("Failed to create command pipe: %s", cmdPipeName);
584+
exit(1);
628585
}
586+
LOG_INFO("Created command pipe: %s", cmdPipeName);
629587
}
630588
else
631589
{
@@ -685,52 +643,28 @@ int main(int argc, char *argv[])
685643

686644
latency = raopcl_latency(raopcl);
687645

688-
// start the command/metadata reader thread BEFORE printing "connected to"
689-
// so that the pipe reader is ready when Python tries to open the write side
690-
if (cmdpipe)
691-
{
692-
pthread_create(&glCmdPipeReaderThread, NULL, CmdPipeReaderThread, NULL);
693-
// Give the thread a moment to open the pipe
694-
usleep(50 * 1000); // 50ms
695-
}
696-
697646
LOG_INFO("connected to %s on port %d, player latency is %d ms", inet_ntoa(player.addr),
698647
player.port, (int)TS2MS(latency, raopcl_sample_rate(raopcl)));
699648

700649
if (start || wait)
701650
{
702651
uint64_t now = raopcl_get_ntp(NULL);
703652

704-
// For late joiners, we need different logic:
705-
// - Start immediately (don't wait) to avoid blocking Python
706-
// - Offset RTP timestamp to match original players
707-
if (skip_seconds > 0)
708-
{
709-
// Late joiner calculation:
710-
// - Use the ORIGINAL start time (not 'now')
711-
// - Add the skip offset for RTP timestamp sync
712-
// - Subtract latency as normal
713-
// - Do NOT add 'wait' (start immediately)
714-
uint64_t skip_ntp = MS2NTP((uint32_t)(skip_seconds * 1000));
715-
start_at = start + skip_ntp - TS2NTP(latency, raopcl_sample_rate(raopcl));
653+
start_at = (start ? start : now) + MS2NTP(wait) -
654+
TS2NTP(latency, raopcl_sample_rate(raopcl));
716655

717-
LOG_INFO("Late joiner: original start %u.%u + skip %.2fs - latency = start_at %u.%u",
718-
RAOP_SECNTP(start), skip_seconds, RAOP_SECNTP(start_at));
719-
LOG_INFO("now %u.%u, will start immediately (start_at is in the past)", RAOP_SECNTP(now));
720-
}
721-
else
722-
{
723-
// Normal start: use original timing with wait parameter
724-
start_at = (start ? start : now) + MS2NTP(wait) -
725-
TS2NTP(latency, raopcl_sample_rate(raopcl));
726-
727-
LOG_INFO("now %u.%u, audio starts at NTP %u.%u (in %u ms)", RAOP_SECNTP(now), RAOP_SECNTP(start_at),
728-
(start_at + TS2NTP(latency, raopcl_sample_rate(raopcl)) > now) ? (uint32_t)NTP2MS(start_at - now + TS2NTP(latency, raopcl_sample_rate(raopcl))) : 0);
729-
}
656+
LOG_INFO("now %u.%u, audio starts at NTP %u.%u (in %u ms)", RAOP_SECNTP(now), RAOP_SECNTP(start_at),
657+
(start_at + TS2NTP(latency, raopcl_sample_rate(raopcl)) > now) ? (uint32_t)NTP2MS(start_at - now + TS2NTP(latency, raopcl_sample_rate(raopcl))) : 0);
730658

731659
raopcl_start_at(raopcl, start_at);
732660
}
733661

662+
// start the command/metadata reader thread if cmdpipe was specified
663+
if (cmdpipe)
664+
{
665+
pthread_create(&glCmdPipeReaderThread, NULL, CmdPipeReaderThread, NULL);
666+
}
667+
734668
start = raopcl_get_ntp(NULL);
735669
status = PLAYING;
736670

@@ -763,33 +697,14 @@ int main(int argc, char *argv[])
763697
}
764698

765699
// send chunk if needed
766-
bool can_accept = raopcl_accept_frames(raopcl);
767-
768-
// Debug logging: track accept/reject patterns
769-
static int consecutive_rejects = 0;
770-
771-
if (status == PLAYING && !can_accept)
772-
{
773-
consecutive_rejects++;
774-
}
775-
else if (status == PLAYING && can_accept)
776-
{
777-
if (consecutive_rejects > 500) // Log if buffer was full for >500ms
778-
{
779-
LOG_WARN("Buffer was full for %.1f seconds (rejected %d times)",
780-
consecutive_rejects / 1000.0, consecutive_rejects);
781-
}
782-
consecutive_rejects = 0;
783-
}
784-
785-
if (status == PLAYING && can_accept)
700+
if (status == PLAYING && raopcl_accept_frames(raopcl))
786701
{
787702
n = read(infile, buf, DEFAULT_FRAMES_PER_CHUNK * 4);
788703

789704
if (!n)
790705
{
791706
// EOF or no writer on FIFO yet - sleep to avoid busy-waiting
792-
usleep(10000); // 10ms
707+
usleep(10000); // 10ms
793708
continue;
794709
}
795710

@@ -807,8 +722,6 @@ int main(int argc, char *argv[])
807722
glMainRunning = false;
808723
free(buf);
809724
raopcl_disconnect(raopcl);
810-
811-
// Clean up command pipe thread if it was created
812725
if (cmdpipe)
813726
{
814727
pthread_join(glCmdPipeReaderThread, NULL);
@@ -817,14 +730,11 @@ int main(int argc, char *argv[])
817730

818731
exit:
819732
LOG_INFO("exiting...");
820-
821-
// Clean up command pipe if it was created
822-
if (cmdPipeFd >= 0)
733+
if (cmdpipe)
823734
{
824735
close(cmdPipeFd);
825736
unlink(cmdPipeName);
826737
}
827-
828738
raopcl_destroy(raopcl);
829739
close_platform();
830740
return 0;

src/raop_client.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -489,9 +489,7 @@ bool raopcl_accept_frames(struct raopcl_s *p)
489489
if (p->pause_ts) now_ts = p->pause_ts;
490490
else now_ts = NTP2TS(raopcl_get_ntp(NULL), p->sample_rate);
491491

492-
// Accept frames if current buffer level hasn't exceeded the target latency
493-
// This allows continuous reading to maintain buffer, not just one chunk at a time
494-
if (p->head_ts < now_ts + raopcl_latency(p)) accept = true;
492+
if (now_ts >= p->head_ts + p->chunk_len) accept = true;
495493

496494
pthread_mutex_unlock(&p->mutex);
497495

0 commit comments

Comments
 (0)