Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport autocue improvements from 2.2.x #4184

Merged
merged 14 commits into from
Oct 23, 2024
30 changes: 28 additions & 2 deletions src/core/builtins/builtins_resolvers.ml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ let _ =
Lang.getter_t Lang.int_t,
Some (Lang.int 1),
Some "Resolver's priority." );
( "mime_types",
Lang.nullable_t (Lang.list_t Lang.string_t),
Some Lang.null,
Some
"Decode files that match the mime types in this list. Accept any \
file if `null`." );
( "file_extensions",
Lang.nullable_t (Lang.list_t Lang.string_t),
Some Lang.null,
Some
"Decode files that have the file extensions in this list. Accept any \
file if `null`." );
("", Lang.string_t, None, Some "Format/resolver's name.");
( "",
resolver_t,
Expand All @@ -47,11 +59,25 @@ let _ =
(fun p ->
let format = Lang.to_string (Lang.assoc "" 1 p) in
let f = Lang.assoc "" 2 p in
let mimes =
Lang.to_valued_option
(fun v -> List.map Lang.to_string (Lang.to_list v))
(List.assoc "mime_types" p)
in
let extensions =
Lang.to_valued_option
(fun v -> List.map Lang.to_string (Lang.to_list v))
(List.assoc "file_extensions" p)
in
let log = Log.make ["decoder"; "metadata"] in
let priority = Lang.to_int_getter (List.assoc "priority" p) in
let resolver ~metadata ~extension:_ ~mime:_ name =
let resolver ~metadata ~extension ~mime fname =
if
not (Decoder.test_file ~log ~extension ~mime ~mimes ~extensions fname)
then raise Metadata.Invalid;
let ret =
Lang.apply f
[("metadata", Lang.metadata metadata); ("", Lang.string name)]
[("metadata", Lang.metadata metadata); ("", Lang.string fname)]
in
let ret = Lang.to_list ret in
let ret = List.map Lang.to_product ret in
Expand Down
120 changes: 101 additions & 19 deletions src/core/builtins/builtins_source.ml
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,24 @@ let _ =
Some
"Time ratio. A value of `50` means process data at `50x` real rate, \
when possible." );
( "timeout",
Lang.float_t,
Some (Lang.float 1.),
Some
"Stop processing the source if it has not started after the given \
timeout." );
( "sleep_latency",
Lang.float_t,
Some (Lang.float 0.1),
Some
"How much time ahead, in seconds, should we should be before pausing \
the processing." );
]
Lang.unit_t
(fun p ->
let module Time = (val Clock.time_implementation () : Liq_time.T) in
let open Time in
let started = ref false in
let stopped = ref false in
let proto =
let p = Pipe_output.file_proto (Lang.univ_t ()) in
Expand All @@ -198,18 +211,44 @@ let _ =
p
in
let proto = ("fallible", Lang.bool true) :: proto in
let p = (("id", Lang.string "source_dumper") :: p) @ proto in
let clock = Clock.create ~id:"source_dumper" ~sync:`Passive () in
let _ = Pipe_output.new_file_output ~clock p in
let p = (("id", Lang.string "source.drop") :: p) @ proto in
let clock =
Clock.create ~id:"source.dump" ~sync:`Passive
~on_error:(fun exn bt ->
stopped := true;
Utils.log_exception ~log
~bt:(Printexc.raw_backtrace_to_string bt)
(Printf.sprintf "Error while dropping source: %s"
(Printexc.to_string exn)))
()
in
let s = Pipe_output.new_file_output ~clock p in
let ratio = Lang.to_float (List.assoc "ratio" p) in
let latency = Time.of_float (Lazy.force Frame.duration /. ratio) in
Clock.start clock;
let timeout = Time.of_float (Lang.to_float (List.assoc "timeout" p)) in
let sleep_latency =
Time.of_float (Lang.to_float (List.assoc "sleep_latency" p))
in
Clock.start ~force:true clock;
log#info "Start dumping source (ratio: %.02fx)" ratio;
while (not (Atomic.get should_stop)) && not !stopped do
let start_time = Time.time () in
Clock.tick clock;
sleep_until (start_time |+| latency)
done;
let start_time = Time.time () in
let timeout_time = Time.(start_time |+| timeout) in
let target_time () =
Time.(
start_time |+| sleep_latency |+| of_float (Clock.time clock /. ratio))
in
(try
while (not (Atomic.get should_stop)) && not !stopped do
if not !started then started := s#is_ready;
if (not !started) && Time.(timeout_time |<=| start_time) then (
log#important "Timeout while waiting for the source to start!";
stopped := true)
else (
Clock.tick clock;
let target_time = target_time () in
if Time.(time () |<| (target_time |+| sleep_latency)) then
sleep_until target_time)
done
with Clock.Has_stopped -> ());
log#info "Source dumped.";
Clock.stop clock;
Lang.unit)
Expand All @@ -227,14 +266,36 @@ let _ =
Some
"Time ratio. A value of `50` means process data at `50x` real rate, \
when possible." );
( "timeout",
Lang.float_t,
Some (Lang.float 1.),
Some
"Stop processing the source if it has not started after the given \
timeout." );
( "sleep_latency",
Lang.float_t,
Some (Lang.float 0.1),
Some
"How much time ahead, in seconds, should we should be before pausing \
the processing." );
]
Lang.unit_t
(fun p ->
let module Time = (val Clock.time_implementation () : Liq_time.T) in
let open Time in
let s = List.assoc "" p |> Lang.to_source in
let started = ref false in
let stopped = ref false in
let clock = Clock.create ~id:"source_dumper" ~sync:`Passive () in
let clock =
Clock.create ~id:"source.dump" ~sync:`Passive
~on_error:(fun exn bt ->
stopped := true;
Utils.log_exception ~log
~bt:(Printexc.raw_backtrace_to_string bt)
(Printf.sprintf "Error while dropping source: %s"
(Printexc.to_string exn)))
()
in
let _ =
new Output.dummy
~clock ~infallible:false
Expand All @@ -243,14 +304,35 @@ let _ =
~register_telnet:false ~autostart:true (Lang.source s)
in
let ratio = Lang.to_float (List.assoc "ratio" p) in
let latency = Time.of_float (Lazy.force Frame.duration /. ratio) in
Clock.start clock;
let timeout = Time.of_float (Lang.to_float (List.assoc "timeout" p)) in
let sleep_latency =
Time.of_float (Lang.to_float (List.assoc "sleep_latency" p))
in
Clock.start ~force:true clock;
log#info "Start dropping source (ratio: %.02fx)" ratio;
while (not (Atomic.get should_stop)) && not !stopped do
let start_time = Time.time () in
Clock.tick clock;
sleep_until (start_time |+| latency)
done;
log#info "Source dropped.";
let start_time = Time.time () in
let timeout_time = Time.(start_time |+| timeout) in
let target_time () =
Time.(start_time |+| of_float (Clock.time clock /. ratio))
in
(try
while (not (Atomic.get should_stop)) && not !stopped do
let start_time = Time.time () in
if not !started then started := s#is_ready;
if (not !started) && Time.(timeout_time |<=| start_time) then (
log#important "Timeout while waiting for the source to start!";
stopped := true)
else (
Clock.tick clock;
let target_time = target_time () in
if Time.(time () |<| (target_time |+| sleep_latency)) then
sleep_until target_time)
done
with Clock.Has_stopped -> ());
let processing_time = Time.(to_float (time () |-| start_time)) in
let effective_ratio = Clock.time clock /. processing_time in
log#info
"Source dropped. Total processing time: %.02fs, effective ratio: %.02fx"
processing_time effective_ratio;
Clock.stop clock;
Lang.unit)
21 changes: 15 additions & 6 deletions src/core/clock.ml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ let conf_clock_preferred =
let conf_clock_sleep_latency =
Dtools.Conf.int
~p:(conf_clock#plug "sleep_latency")
~d:1
~d:5
"How much time ahead (in frame duration) we should be until we let the \
streaming loop sleep."
~comments:
Expand Down Expand Up @@ -316,9 +316,13 @@ let ticks c =
| `Stopped _ -> 0
| `Stopping { ticks } | `Started { ticks } -> Atomic.get ticks

let _target_time { time_implementation; t0; frame_duration; ticks } =
let _time { time_implementation; frame_duration; ticks } =
let module Time = (val time_implementation : Liq_time.T) in
Time.(t0 |+| (frame_duration |*| of_float (float_of_int (Atomic.get ticks))))
Time.(frame_duration |*| of_float (float_of_int (Atomic.get ticks)))

let _target_time ({ time_implementation; t0 } as c) =
let module Time = (val time_implementation : Liq_time.T) in
Time.(t0 |+| _time c)

let _set_time { time_implementation; t0; frame_duration; ticks } t =
let module Time = (val time_implementation : Liq_time.T) in
Expand Down Expand Up @@ -464,7 +468,7 @@ and _can_start ?(force = false) clock =
`True sync
| _ -> `False

and _start ~sync clock =
and _start ?force ~sync clock =
Unifier.set clock.id (Lang_string.generate_id (Unifier.deref clock.id));
let id = _id clock in
log#important "Starting clock %s with %d source(s) and sync: %s" id
Expand Down Expand Up @@ -497,14 +501,14 @@ and _start ~sync clock =
ticks = Atomic.make 0;
}
in
Queue.iter clock.sub_clocks (fun c -> start c);
Queue.iter clock.sub_clocks (fun c -> start ?force c);
Atomic.set clock.state (`Started x);
if sync <> `Passive then _clock_thread ~clock x

and start ?force c =
let clock = Unifier.deref c in
match _can_start ?force clock with
| `True sync -> _start ~sync clock
| `True sync -> _start ?force ~sync clock
| `False -> ()

let create ?(stack = []) ?on_error ?(id = "generic") ?(sub_ids = [])
Expand All @@ -526,6 +530,11 @@ let create ?(stack = []) ?on_error ?(id = "generic") ?(sub_ids = [])
Queue.push clocks c;
c

let time c =
let ({ time_implementation } as c) = active_params c in
let module Time = (val time_implementation : Liq_time.T) in
Time.to_float (_time c)

let start_pending () =
let c = Queue.flush_elements clocks in
let c = List.map (fun c -> (c, Unifier.deref c)) c in
Expand Down
2 changes: 2 additions & 0 deletions src/core/clock.mli
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*****************************************************************************)

exception Invalid_state
exception Has_stopped

type t
type active_source = < reset : unit ; output : unit >
Expand Down Expand Up @@ -96,6 +97,7 @@ val started : t -> bool
val stop : t -> unit
val set_stack : t -> Liquidsoap_lang.Pos.t list -> unit
val self_sync : t -> bool
val time : t -> float
val unify : pos:Liquidsoap_lang.Pos.Option.t -> t -> t -> unit
val create_sub_clock : id:string -> t -> t
val attach : t -> source -> unit
Expand Down
Loading
Loading