diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3b94ee9..cccc51e 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -13,8 +13,8 @@ jobs: - uses: actions/checkout@v3 - uses: erlef/setup-beam@v1 with: - otp-version: "26.2" - gleam-version: "1.4.0" + otp-version: "27" + gleam-version: "1.10.0" rebar3-version: "3" - run: gleam test - run: gleam format --check src test diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e47d27..0181c96 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,43 @@ # Changelog +## v1.0.0-rc1 - Unreleased + +- The `supervisor` module has been removed. +- The `intensity_tracker` module has been removed. +- The `task` module has been removed. +- The supervision module has been introduced. This module contains types and + functions usable by different supervisor implementations. +- In the `gleam/otp/static_supervisor` module: + - The `start_link` module has been removed. + - The `start` module has been added. + - The `supervised` module has been added. + - Types and functions for defining child specifications have been moved to the + `supervision` module. + - Child ids are now generated, removing possibility of a collision. + - The `Supervisor` type has been added. +- In the `gleam/otp/actor` module: + - The `to_erlang_start_result` function has been removed. + - The `InitResult` type has been removed. + - The `Spec` type has been replaced with the `Builder` type. + - The `new` function has been added. + - The `new_with_initialiser` function has been added. + - The `start` function has been removed. + - The `start_spec` function has been renamed to `start`. + - The `Initialised` type has been added, along with the `initialised`, + `selecting`, and `returning` functions to create and work with it. + - The `InitCrashed` variant of `StartError` has been replaced with the + `InitExited` variant. + - The `InitFailed` variant of `StartError` now contains a string rather than + an `StartError`. + - The `Next` type now has the state type as the first type parameter and the + message type as the second. + - The `Next` type is now opaque. + - The argument order of the actor message handler callback function has been + changed so the state is the first argument, and the message is the second. + - The argument order of `call` has changed. + - The `StartResult` alias has been removed. + - The `ErlangStartResult` alias has been removed. + ## Unreleased - Fixed a bug where using `actor.Stop()` with other reasons than `Normal` diff --git a/gleam.toml b/gleam.toml index 7736fce..2d88aa3 100644 --- a/gleam.toml +++ b/gleam.toml @@ -1,5 +1,5 @@ name = "gleam_otp" -version = "0.16.1" +version = "1.0.0-rc1" licences = ["Apache-2.0"] description = "Fault tolerant multicore Gleam programs with OTP" @@ -12,8 +12,8 @@ links = [ ] [dependencies] -gleam_stdlib = ">= 0.32.0 and < 1.0.0" -gleam_erlang = ">= 0.22.0 and < 1.0.0" +gleam_stdlib = ">= 0.59.0 and < 2.0.0" +gleam_erlang = ">= 1.0.0-rc1 and < 2.0.0" [dev-dependencies] gleeunit = ">= 1.0.0 and < 2.0.0" diff --git a/manifest.toml b/manifest.toml index 2afe8a3..bb464f8 100644 --- a/manifest.toml +++ b/manifest.toml @@ -2,14 +2,14 @@ # You typically do not need to edit this file packages = [ - { name = "gleam_erlang", version = "0.33.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "A1D26B80F01901B59AABEE3475DD4C18D27D58FA5C897D922FCB9B099749C064" }, - { name = "gleam_stdlib", version = "0.45.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "206FCE1A76974AECFC55AEBCD0217D59EDE4E408C016E2CFCCC8FF51278F186E" }, - { name = "gleeunit", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "F7A7228925D3EE7D0813C922E062BFD6D7E9310F0BEE585D3A42F3307E3CFD13" }, + { name = "gleam_erlang", version = "1.0.0-rc1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "6E0CF4E1F66E2C9226B7554589544F00F12CE14858440EB1BF7EFDACDE1BBC64" }, + { name = "gleam_stdlib", version = "0.59.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "F8FEE9B35797301994B81AF75508CF87C328FE1585558B0FFD188DC2B32EAA95" }, + { name = "gleeunit", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "0E6C83834BA65EDCAAF4FE4FB94AC697D9262D83E6F58A750D63C9F6C8A9D9FF" }, { name = "logging", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "logging", source = "hex", outer_checksum = "1098FBF10B54B44C2C7FDF0B01C1253CAFACDACABEFB4B0D027803246753E06D" }, ] [requirements] -gleam_erlang = { version = ">= 0.22.0 and < 1.0.0" } -gleam_stdlib = { version = ">= 0.32.0 and < 1.0.0" } +gleam_erlang = { version = ">= 1.0.0-rc1 and < 2.0.0" } +gleam_stdlib = { version = ">= 0.59.0 and < 2.0.0" } gleeunit = { version = ">= 1.0.0 and < 2.0.0" } logging = { version = ">= 1.3.0 and < 2.0.0" } diff --git a/src/gleam/otp/actor.gleam b/src/gleam/otp/actor.gleam index 858a097..6065167 100644 --- a/src/gleam/otp/actor.gleam +++ b/src/gleam/otp/actor.gleam @@ -1,6 +1,6 @@ //// This module provides the _Actor_ abstraction, one of the most common //// building blocks of Gleam OTP programs. -//// +//// //// An Actor is a process like any other BEAM process and can be used to hold //// state, execute code, and communicate with other processes by sending and //// receiving messages. The advantage of using the actor abstraction over a bare @@ -10,7 +10,8 @@ //// //// Gleam's Actor is similar to Erlang's `gen_server` and Elixir's `GenServer` //// but differs in that it offers a fully typed interface. This different API is -//// why Gleam uses the name Actor rather than some variation of generic-server. +//// why Gleam uses the name "Actor" rather than some variation of +//// "generic-server". //// //// [erlang-sys]: https://www.erlang.org/doc/man/sys.html //// @@ -26,17 +27,19 @@ //// // `handle_message` callback function (defined below). //// // We assert that it starts successfully. //// // -//// // In real-world Gleam OTP programs we would likely write wrapper functions +//// // In real-world Gleam OTP programs we would likely write a wrapper functions //// // called `start`, `push` `pop`, `shutdown` to start and interact with the //// // Actor. We are not doing that here for the sake of showing how the Actor //// // API works. -//// let assert Ok(my_actor) = actor.start([], handle_message) -//// +//// let assert Ok(actor) = +//// actor.new([]) |> actor.on_message(handle_message) |> actor.start +//// let subject = actor.data +//// //// // We can send a message to the actor to push elements onto the stack. -//// process.send(my_actor, Push("Joe")) -//// process.send(my_actor, Push("Mike")) -//// process.send(my_actor, Push("Robert")) -//// +//// process.send(subject, Push("Joe")) +//// process.send(subject, Push("Mike")) +//// process.send(subject, Push("Robert")) +//// //// // The `Push` message expects no response, these messages are sent purely for //// // the side effect of mutating the state held by the actor. //// // @@ -47,32 +50,32 @@ //// // In this instance we are giving the actor 10 milliseconds to reply, if the //// // `call` function doesn't get a reply within this time it will panic and //// // crash the client process. -//// let assert Ok("Robert") = process.call(my_actor, Pop, 10) -//// let assert Ok("Mike") = process.call(my_actor, Pop, 10) -//// let assert Ok("Joe") = process.call(my_actor, Pop, 10) -//// +//// let assert Ok("Robert") = process.call(subject, 10, Pop) +//// let assert Ok("Mike") = process.call(subject, 10, Pop) +//// let assert Ok("Joe") = process.call(subject, 10, Pop) +//// //// // The stack is now empty, so if we pop again the actor replies with an error. -//// let assert Error(Nil) = process.call(my_actor, Pop, 10) -//// +//// let assert Error(Nil) = process.call(subject, 10, Pop) +//// //// // Lastly, we can send a message to the actor asking it to shut down. -//// process.send(my_actor, Shutdown) +//// process.send(subject, Shutdown) //// } //// ``` //// //// Here is the code that is used to implement this actor: //// //// ```gleam -//// import gleam/erlang/process.{type Subject} -//// import gleam/otp/actor -//// //// // First step of implementing the stack Actor is to define the message type that //// // it can receive. //// // -//// // The type of the elements in the stack is not fixed so a type parameter is used -//// // for it instead of a concrete type such as `String` or `Int`. +//// // The type of the elements in the stack is not fixed so a type parameter +//// // is used for it instead of a concrete type such as `String` or `Int`. //// pub type Message(element) { //// // The `Shutdown` message is used to tell the actor to stop. //// // It is the simplest message type, it contains no data. +//// // +//// // Most the time we don't define an API to shut down an actor, but in this +//// // example we do to show how it can be done. //// Shutdown //// //// // The `Push` message is used to add a new element to the stack. @@ -88,20 +91,22 @@ //// //// // The last part is to implement the `handle_message` callback function. //// // -//// // This function is called by the Actor for each message it receives. -//// // Actor is single threaded and only does one thing at a time, so it handles +//// // This function is called by the Actor each for each message it receives. +//// // Actors are single threaded only does one thing at a time, so they handle //// // messages sequentially and one at a time, in the order they are received. //// // //// // The function takes the message and the current state, and returns a data //// // structure that indicates what to do next, along with the new state. //// fn handle_message( -//// message: Message(e), -//// stack: List(e), -//// ) -> actor.Next(Message(e), List(e)) { +//// stack: List(e), +//// message: Message(e), +//// ) -> actor.Next(List(e), Message(e)) { //// case message { -//// // For the `Shutdown` message we return the `actor.Stop` value, which causes +//// // For the `Shutdown` message we return the `actor.stop` value, which causes //// // the actor to discard any remaining messages and stop. -//// Shutdown -> actor.Stop(process.Normal) +//// // We may chose to do some clean-up work here, but this actor doesn't need +//// // to do this. +//// Shutdown -> actor.stop() //// //// // For the `Push` message we add the new element to the stack and return //// // `actor.continue` with this new stack, causing the actor to process any @@ -113,7 +118,7 @@ //// //// // For the `Pop` message we attempt to remove an element from the stack, //// // sending it or an error back to the caller, before continuing. -//// Pop(client) -> +//// Pop(client) -> { //// case stack { //// [] -> { //// // When the stack is empty we can't pop an element, so we send an @@ -129,12 +134,11 @@ //// actor.continue(rest) //// } //// } +//// } //// } //// } //// ``` -// - import gleam/dynamic.{type Dynamic} import gleam/erlang/atom import gleam/erlang/charlist.{type Charlist} @@ -146,6 +150,7 @@ import gleam/otp/system.{ type DebugState, type Mode, type StatusInfo, type SystemMessage, GetState, GetStatus, Resume, Running, StatusInfo, Suspend, Suspended, } +import gleam/result import gleam/string type Message(message) { @@ -161,7 +166,7 @@ type Message(message) { /// The type used to indicate what to do after handling a message. /// -pub type Next(message, state) { +pub opaque type Next(state, message) { /// Continue handling messages. /// /// An optional selector can be provided to changes the messages that the @@ -175,8 +180,18 @@ pub type Next(message, state) { Stop(ExitReason) } -pub fn continue(state: state) -> Next(message, state) { - Continue(state, None) +/// Indicate the actor should continue, processing any waiting or future messages. +/// +pub fn continue(state: state) -> Next(state, message) { + Continue(state:, selector: None) +} + +/// Indicate the actor should stop and shut-down, handling no futher messages. +/// +/// The reason for exiting is `Normal`. +/// +pub fn stop() -> Next(state, message) { + Stop(process.Normal) } /// Provide a selector to change the messages that the actor is handling @@ -184,30 +199,15 @@ pub fn continue(state: state) -> Next(message, state) { /// in the actor's `init` callback, or in any previous `Next` value. /// pub fn with_selector( - value: Next(message, state), + value: Next(state, message), selector: Selector(message), -) -> Next(message, state) { +) -> Next(state, message) { case value { Continue(state, _) -> Continue(state, Some(selector)) Stop(_) -> value } } -/// The type used to indicate whether an actor has started successfully or not. -/// -pub type InitResult(state, message) { - /// The actor has successfully initialised. The actor can start handling - /// messages and actor's channel sender can be returned to the parent - /// process. - /// - Ready(state: state, selector: Selector(message)) - - /// The actor has failed to initialise. The actor shuts down and an error is - /// returned to the parent process. - /// - Failed(String) -} - type Self(state, msg) { Self( /// The mode the actor is currently in, either active or suspended. @@ -216,26 +216,74 @@ type Self(state, msg) { parent: Pid, /// The state of this actor, provided by the programmer. state: state, - /// The subject that was created by this actor during initialisation. - subject: Subject(msg), /// The selector that actor is currently using to reveive messages. This /// can be changed by the `Next` value returned by the actor's `loop` callback. selector: Selector(Message(msg)), /// An opaque value used by the OTP system debug APIs. debug_state: DebugState, /// The message handling code provided by the programmer. - message_handler: fn(msg, state) -> Next(msg, state), + message_handler: fn(state, msg) -> Next(state, msg), + ) +} + +/// A value returned to the parent when their child actor successfully starts. +pub type Started(data) { + Started( + /// The process identifier of the started actor. This can be used to + /// monitor the actor, make it exit, or anything else you can do with a + /// pid. + pid: Pid, + /// Data returned by the actor after it initialised. Commonly this will be + /// a subject that it will receive messages from. + data: data, ) } -/// This data structure holds all the values required by the `start_spec` -/// function in order to create an actor. +/// A convenience for the type returned when an actor process is started. +pub type StartResult(data) = + Result(Started(data), StartError) + +/// A type returned from an actor's initialiser, containing the actor state, a +/// selector to receive messages using, and data to return to the parent. /// -/// If you do not need to configure the initialisation behaviour of your actor -/// consider using the `start` function. +/// Use the `initialised`, `selecting`, and `returning` functions to construct +/// this type. +/// +pub opaque type Initialised(state, message, data) { + Initialised(state: state, selector: Option(Selector(message)), return: data) +} + +/// Takes the post-initialisation state of the actor. This state will be passed +/// to the `on_message` callback each time a message is received. +/// +pub fn initialised(state: state) -> Initialised(state, message, Nil) { + Initialised(state, None, Nil) +} + +/// Add a selector for the actor to receive messages with. /// -pub type Spec(state, msg) { - Spec( +/// If a message is received by the actor but not selected for with the +/// selector then the actor will discard it and log a warning. +/// +pub fn selecting( + initialised: Initialised(state, old_message, return), + selector: Selector(message), +) -> Initialised(state, message, return) { + Initialised(..initialised, selector: Some(selector)) +} + +/// Add the data to return to the parent process. This might be a subject that +/// the actor will receive messages over. +/// +pub fn returning( + initialised: Initialised(state, message, old_return), + return: return, +) -> Initialised(state, message, return) { + Initialised(..initialised, return:) +} + +pub opaque type Builder(state, message, return) { + Builder( /// The initialisation functionality for the actor. This function is called /// just after the actor starts but before the channel sender is returned /// to the parent. @@ -244,18 +292,104 @@ pub type Spec(state, msg) { /// correct. If this function returns an error it means that the actor has /// failed to start and an error is returned to the parent. /// - init: fn() -> InitResult(state, msg), + initialise: fn(Subject(message)) -> + Result(Initialised(state, message, return), String), /// How many milliseconds the `init` function has to return before it is /// considered to have taken too long and failed. /// - init_timeout: Int, + initialisation_timeout: Int, /// This function is called to handle each message that the actor receives. /// - loop: fn(msg, state) -> Next(msg, state), + on_message: fn(state, message) -> Next(state, message), + /// The actor can be named for you at start. + /// + name: Option(process.Name(message)), ) } -// TODO: Check needed functionality here to be OTP compatible +/// Create a builder for an actor without a custom initialiser. The actor +/// returns a subject to the parent that can be used to send messages to the +/// actor. +/// +/// If the actor has been given a name with the `named` function then the +/// subject is a named subject. +/// +/// If you wish to create an actor with some other initialisation logic that +/// runs before it starts handling messages, see `new_with_initialiser`. +/// +pub fn new(state: state) -> Builder(state, message, Subject(message)) { + let initialise = fn(subject) { + initialised(state) |> returning(subject) |> Ok + } + Builder( + initialise:, + initialisation_timeout: 1000, + on_message: fn(state, _) { continue(state) }, + name: option.None, + ) +} + +/// Create a builder for an actor with a custom initialiser that runs before +/// the start function returns to the parent, and before the actor starts +/// handling messages. +/// +/// The first argument is a number of milliseconds that the initialiser +/// function is expected to return within. If it takes longer the initialiser +/// is considered to have failed and the actor will be killed, and an error +/// will be returned to the parent. +/// +/// The actor's default subject is passed to the initialiser function. You can +/// chose to return it to the parent with `returning`, use it in some other +/// way, or ignore it completely. +/// +/// If a custom selector is given using the `selecting` function then this +/// overwrites the default selector, which selects for the default subject, so +/// you will need to add the subject to the custom selector yourself. +/// +pub fn new_with_initialiser( + timeout: Int, + initialise: fn(Subject(message)) -> + Result(Initialised(state, message, return), String), +) -> Builder(state, message, return) { + Builder( + initialise:, + initialisation_timeout: timeout, + on_message: fn(state, _) { continue(state) }, + name: option.None, + ) +} + +/// Set the message handler for the actor. This callback function will be +/// called each time the actor receives a message. +/// +/// Actors handle messages sequentially, later messages being handled after the +/// previous one has been handled. +pub fn on_message( + builder: Builder(state, message, return), + handler: fn(state, message) -> Next(state, message), +) -> Builder(state, message, return) { + Builder(..builder, on_message: handler) +} + +/// Provide a name for the actor to be registered with when started, enabling +/// it to receive messages via a named subject. This is useful for making +/// processes that can take over from an older one that has exited due to a +/// failure, or to avoid passing subjects from receiver processes to sender +/// processes. +/// +/// If the name is already registered to another process then the actor will +/// fail to start. +/// +/// When this function is used the actor's default subject will be a named +/// subject using this name. +/// +pub fn named( + builder: Builder(state, message, return), + name: process.Name(message), +) -> Builder(state, message, return) { + Builder(..builder, name: option.Some(name)) +} + fn exit_process(reason: ExitReason) -> ExitReason { case reason { Abnormal(reason) -> process.send_abnormal_exit(process.self(), reason) @@ -271,7 +405,7 @@ fn receive_message(self: Self(state, msg)) -> Message(msg) { // When suspended we only respond to system messages Suspended -> process.new_selector() - |> selecting_system_messages + |> select_system_messages // When running we respond to all messages Running -> @@ -289,30 +423,27 @@ fn receive_message(self: Self(state, msg)) -> Message(msg) { // We add the handler for unexpected messages first so that the user // supplied selector can override it if desired. process.new_selector() - |> process.selecting_anything(Unexpected) + |> process.select_other(Unexpected) |> process.merge_selector(self.selector) - |> selecting_system_messages + |> select_system_messages } - process.select_forever(selector) + process.selector_receive_forever(selector) } -fn selecting_system_messages( +fn select_system_messages( selector: Selector(Message(msg)), ) -> Selector(Message(msg)) { selector - |> process.selecting_record3( - atom.create_from_string("system"), - convert_system_message, - ) + |> process.select_record(atom.create("system"), 2, convert_system_message) } @external(erlang, "gleam_otp_external", "convert_system_message") -fn convert_system_message(a: Dynamic, b: Dynamic) -> Message(msg) +fn convert_system_message(b: Dynamic) -> Message(msg) fn process_status_info(self: Self(state, msg)) -> StatusInfo { StatusInfo( - module: atom.create_from_string("gleam@otp@actor"), + module: atom.create("gleam@otp@actor"), parent: self.parent, mode: self.mode, debug_state: self.debug_state, @@ -357,14 +488,14 @@ fn loop(self: Self(state, msg)) -> ExitReason { // A regular message that the programmer is expecting, either over the // subject or some other messsage that the programmer's selector expects. Message(msg) -> - case self.message_handler(msg, self.state) { + case self.message_handler(self.state, msg) { Stop(reason) -> exit_process(reason) Continue(state: state, selector: new_selector) -> { - let selector = - new_selector - |> option.map(init_selector(self.subject, _)) - |> option.unwrap(self.selector) + let selector = case new_selector { + None -> self.selector + Some(s) -> process.map_selector(s, Message) + } loop(Self(..self, state: state, selector: selector)) } } @@ -377,32 +508,42 @@ fn log_warning(a: Charlist, b: List(Charlist)) -> Nil // Run automatically when the actor is first started. fn initialise_actor( - spec: Spec(state, msg), - ack: Subject(Result(Subject(msg), ExitReason)), + builder: Builder(state, msg, return), + parent: Pid, + ack: Subject(Result(return, String)), ) -> ExitReason { - // This is the main subject for the actor, the one that the actor.start - // functions return. - // Once the actor has been initialised this will be sent to the parent for - // the function to return. - let subject = process.new_subject() - - // Run the programmer supplied initialisation code. - let result = spec.init() + // Run the actor initialiser. + let result = { + use subject <- result.try(case builder.name { + None -> Ok(process.new_subject()) + Some(name) -> { + use _ <- result.try(try_register_self(name)) + Ok(process.named_subject(name)) + } + }) + use result <- result.try(builder.initialise(subject)) + Ok(#(subject, result)) + } case result { // Init was OK, send the subject to the parent and start handling messages. - Ready(state, selector) -> { - let selector = init_selector(subject, selector) + Ok(#(subject, Initialised(state:, selector:, return:))) -> { + // Add the default subject to the selector provided by the initialiser. + // The initialiser may have added additional handlers to the selector. + let selector = case selector { + Some(selector) -> selector + None -> process.new_selector() |> process.select(subject) + } + let selector = process.map_selector(selector, Message) // Signal to parent that the process has initialised successfully - process.send(ack, Ok(subject)) + process.send(ack, Ok(return)) // Start message receive loop let self = Self( state: state, - parent: process.subject_owner(ack), - subject: subject, + parent:, selector: selector, - message_handler: spec.loop, + message_handler: builder.on_message, debug_state: system.debug_state([]), mode: Running, ) @@ -410,57 +551,31 @@ fn initialise_actor( } // The init failed. Send the reason back to the parent, but exit normally. - Failed(reason) -> { - process.send(ack, Error(Abnormal(reason))) + Error(reason) -> { + process.send(ack, Error(reason)) exit_process(process.Normal) } } } -fn init_selector(subject, selector) { - process.new_selector() - |> process.selecting(subject, Message) - |> process.merge_selector(process.map_selector(selector, Message)) +fn try_register_self(name: process.Name(msg)) -> Result(Nil, String) { + case process.register(process.self(), name) { + Ok(Nil) -> Ok(Nil) + Error(_) -> Error("name already registered") + } } pub type StartError { InitTimeout - InitFailed(ExitReason) - InitCrashed(Dynamic) -} - -/// The result of starting a Gleam actor. -/// -/// This type is compatible with Gleam supervisors. If you wish to convert it -/// to a type compatible with Erlang supervisors see the `ErlangStartResult` -/// type and `erlang_start_result` function. -/// -pub type StartResult(msg) = - Result(Subject(msg), StartError) - -/// An Erlang supervisor compatible process start result. -/// -pub type ErlangStartResult = - Result(Pid, Dynamic) - -/// Convert a Gleam actor start result into an Erlang supervisor-compatible -/// process start result. -/// -pub fn to_erlang_start_result(res: StartResult(msg)) -> ErlangStartResult { - case res { - Ok(x) -> Ok(process.subject_owner(x)) - Error(x) -> Error(dynamic.from(x)) - } + InitFailed(String) + InitExited(ExitReason) } -type StartInitMessage(msg) { - Ack(Result(Subject(msg), ExitReason)) - Mon(process.ProcessDown) +type StartInitMessage(data) { + Ack(Result(data, String)) + Mon(process.Down) } -// TODO: test init_timeout. Currently if we test it eunit prints an error from -// the process death. How do we avoid this? -// /// Start an actor from a given specification. If the actor's `init` function /// returns an error or does not return within `init_timeout` then an error is /// returned. @@ -468,29 +583,31 @@ type StartInitMessage(msg) { /// If you do not need to specify the initialisation behaviour of your actor /// consider using the `start` function. /// -pub fn start_spec(spec: Spec(state, msg)) -> Result(Subject(msg), StartError) { +pub fn start( + builder: Builder(state, msg, return), +) -> Result(Started(return), StartError) { + let timeout = builder.initialisation_timeout let ack_subject = process.new_subject() + let self = process.self() let child = - process.start(linked: True, running: fn() { - initialise_actor(spec, ack_subject) - }) + process.spawn(fn() { initialise_actor(builder, self, ack_subject) }) - let monitor = process.monitor_process(child) + let monitor = process.monitor(child) let selector = process.new_selector() - |> process.selecting(ack_subject, Ack) - |> process.selecting_process_down(monitor, Mon) + |> process.select_map(ack_subject, Ack) + |> process.select_specific_monitor(monitor, Mon) - let result = case process.select(selector, spec.init_timeout) { + let result = case process.selector_receive(selector, timeout) { // Child started OK - Ok(Ack(Ok(channel))) -> Ok(channel) + Ok(Ack(Ok(subject))) -> Ok(subject) // Child initialiser returned an error Ok(Ack(Error(reason))) -> Error(InitFailed(reason)) // Child went down while initialising - Ok(Mon(down)) -> Error(InitCrashed(down.reason)) + Ok(Mon(down)) -> Error(InitExited(down.reason)) // Child did not finish initialising in time Error(Nil) -> { @@ -506,28 +623,10 @@ pub fn start_spec(spec: Spec(state, msg)) -> Result(Subject(msg), StartError) { // message arriving at the parent if the child dies later. process.demonitor_process(monitor) - result -} - -/// Start an actor with a given initial state and message handling loop -/// function. -/// -/// This function returns a `Result` but it will always be `Ok` so it is safe -/// to use with `assert` if you are not starting this actor as part of a -/// supervision tree. -/// -/// If you wish to configure the initialisation behaviour of a new actor see -/// the `Spec` record and the `start_spec` function. -/// -pub fn start( - state: state, - loop: fn(msg, state) -> Next(msg, state), -) -> Result(Subject(msg), StartError) { - start_spec(Spec( - init: fn() { Ready(state, process.new_selector()) }, - loop: loop, - init_timeout: 5000, - )) + case result { + Ok(data) -> Ok(Started(pid: child, data:)) + Error(error) -> Error(error) + } } /// Send a message over a given channel. @@ -549,8 +648,8 @@ pub fn send(subject: Subject(msg), msg: msg) -> Nil { /// pub fn call( subject: Subject(message), - make_message: fn(Subject(reply)) -> message, timeout: Int, + make_message: fn(Subject(reply)) -> message, ) -> reply { - process.call(subject, make_message, timeout) + process.call(subject, timeout, make_message) } diff --git a/src/gleam/otp/intensity_tracker.gleam b/src/gleam/otp/intensity_tracker.gleam deleted file mode 100644 index 7d9c4f2..0000000 --- a/src/gleam/otp/intensity_tracker.gleam +++ /dev/null @@ -1,45 +0,0 @@ -//// The intensity tracker is used to monitor how frequently an event happens, -//// erroring if it happens too many times within a period of time. - -import gleam/list - -pub opaque type IntensityTracker { - IntensityTracker(limit: Int, period: Int, events: List(Int)) -} - -pub type TooIntense { - TooIntense -} - -pub fn new(limit limit: Int, period period: Int) -> IntensityTracker { - IntensityTracker(limit: limit, period: period, events: []) -} - -@external(erlang, "erlang", "monotonic_time") -fn monotonic_time(a: Int) -> Int - -fn now_seconds() -> Int { - monotonic_time(1) -} - -pub fn trim_window(events: List(Int), now: Int, period: Int) -> List(Int) { - case events { - [] -> [] - [event, ..events] -> - case now < event + period { - True -> [event, ..trim_window(events, now, period)] - False -> [] - } - } -} - -pub fn add_event( - tracker: IntensityTracker, -) -> Result(IntensityTracker, TooIntense) { - let now = now_seconds() - let events = trim_window([now, ..tracker.events], now, tracker.period) - case list.length(events) > tracker.limit { - True -> Error(TooIntense) - False -> Ok(IntensityTracker(..tracker, events: events)) - } -} diff --git a/src/gleam/otp/static_supervisor.gleam b/src/gleam/otp/static_supervisor.gleam index 27d4fe1..e98f689 100644 --- a/src/gleam/otp/static_supervisor.gleam +++ b/src/gleam/otp/static_supervisor.gleam @@ -6,25 +6,39 @@ //// # Example //// //// ```gleam -//// import gleam/erlang/process.{type Pid} -//// import gleam/otp/static_supervisor as sup +//// import gleam/erlang/actor +//// import gleam/otp/static_supervisor.{type Supervisor} as supervisor +//// import app/database_pool +//// import app/http_server //// -//// pub fn start_supervisor() { -//// sup.new(sup.OneForOne) -//// |> sup.add(sup.worker_child("db", start_database_connection)) -//// |> sup.add(sup.worker_child("workers", start_workers)) -//// |> sup.add(sup.worker_child("web", start_http_server)) -//// |> sup.start_link +//// pub fn start_supervisor() -> { +//// supervisor.new(supervisor.OneForOne) +//// |> supervisor.add(database_pool.supervised()) +//// |> supervisor.add(http_server.supervised()) +//// |> supervisor.start //// } //// ``` -import gleam/dict.{type Dict} import gleam/dynamic.{type Dynamic} import gleam/erlang/atom.{type Atom} import gleam/erlang/process.{type Pid} import gleam/list -import gleam/result +import gleam/otp/actor +import gleam/otp/supervision.{type ChildSpecification} +/// A reference to the running supervisor. In future this could be used to send +/// commands to the supervisor to perform certain actions, but today no such +/// APIs have been exposed. +/// +/// This supervisor wrap Erlang/OTP's `supervisor` module, and as such it does +/// not use subjects for message sending. If it was implemented in Gleam a +/// subject might be used instead of this type. +/// +pub opaque type Supervisor { + Supervisor(pid: Pid) +} + +/// How the supervisor should react when one of its children terminates. pub type Strategy { /// If one child process terminates and is to be restarted, only that child /// process is affected. This is the default restart strategy. @@ -61,16 +75,37 @@ pub type AutoShutdown { AllSignificant } +/// A builder for configuring and starting a supervisor. See each of the +/// functions that take this type for details of the configuration possible. +/// +/// # Example +/// +/// ```gleam +/// import gleam/erlang/actor +/// import gleam/otp/static_supervisor.{type Supervisor} as supervisor +/// import app/database_pool +/// import app/http_server +/// +/// pub fn start_supervisor() -> { +/// supervisor.new(supervisor.OneForOne) +/// |> supervisor.add(database_pool.supervised()) +/// |> supervisor.add(http_server.supervised()) +/// |> supervisor.start +/// } +/// ``` +/// pub opaque type Builder { Builder( strategy: Strategy, intensity: Int, period: Int, auto_shutdown: AutoShutdown, - children: List(ChildBuilder), + children: List(ChildSpecification(Nil)), ) } +/// Create a new supervisor builder, ready for further configuration. +/// pub fn new(strategy strategy: Strategy) -> Builder { Builder( strategy: strategy, @@ -101,202 +136,128 @@ pub fn restart_tolerance( /// A supervisor can be configured to automatically shut itself down with /// exit reason shutdown when significant children terminate. +/// pub fn auto_shutdown(builder: Builder, value: AutoShutdown) -> Builder { Builder(..builder, auto_shutdown: value) } -/// Restart defines when a terminated child process must be restarted. -pub type Restart { - /// A permanent child process is always restarted. - Permanent - /// A transient child process is restarted only if it terminates abnormally, - /// that is, with another exit reason than `normal`, `shutdown`, or - /// `{shutdown,Term}`. - Transient - /// A temporary child process is never restarted (even when the supervisor's - /// restart strategy is `RestForOne` or `OneForAll` and a sibling's death - /// causes the temporary process to be terminated). - Temporary -} - -pub type ChildType { - Worker( - /// The number of milliseconds the child is given to shut down. The - /// supervisor tells the child process to terminate by calling - /// `exit(Child,shutdown)` and then wait for an exit signal with reason - /// shutdown back from the child process. If no exit signal is received - /// within the specified number of milliseconds, the child process is - /// unconditionally terminated using `exit(Child,kill)`. - shutdown_ms: Int, - ) - Supervisor -} - -pub opaque type ChildBuilder { - ChildBuilder( - /// id is used to identify the child specification internally by the - /// supervisor. - /// - /// Notice that this identifier on occations has been called "name". As far - /// as possible, the terms "identifier" or "id" are now used but to keep - /// backward compatibility, some occurences of "name" can still be found, for - /// example in error messages. - id: String, - /// A function to call to start the child process. - starter: fn() -> Result(Pid, Dynamic), - /// When the child is to be restarted. See the `Restart` documentation for - /// more. - /// - /// You most likely want the `Permanent` variant. - restart: Restart, - /// This defines if a child is considered significant for automatic - /// self-shutdown of the supervisor. - /// - /// You most likely do not want to consider any children significant. - /// - /// This will be ignored if the supervisor auto shutdown is set to `Never`, - /// which is the default. - significant: Bool, - /// Whether the child is a supervisor or not. - child_type: ChildType, - ) -} - -pub fn start_link(builder: Builder) -> Result(Pid, Dynamic) { - let flags = - dict.new() - |> property("strategy", builder.strategy) - |> property("intensity", builder.intensity) - |> property("period", builder.period) - |> property("auto_shutdown", builder.auto_shutdown) - - let children = builder.children |> list.reverse |> list.map(convert_child) - - erlang_start_link(#(flags, children)) -} - -@external(erlang, "gleam_otp_external", "static_supervisor_start_link") -fn erlang_start_link( - args: #(Dict(Atom, Dynamic), List(Dict(Atom, Dynamic))), -) -> Result(Pid, Dynamic) - -/// Add a child to the supervisor. -pub fn add(builder: Builder, child: ChildBuilder) -> Builder { - Builder(..builder, children: [child, ..builder.children]) -} - -/// A regular child that is not also a supervisor. +/// Start a new supervisor process with the configuration and children +/// specified within the builder. /// -/// id is used to identify the child specification internally by the -/// supervisor. -/// Notice that this identifier on occations has been called "name". As far -/// as possible, the terms "identifier" or "id" are now used but to keep -/// backward compatibility, some occurences of "name" can still be found, for -/// example in error messages. +/// Typically you would use the `supervised` function to add your supervisor to +/// a supervision tree instead of using this function directly. /// -pub fn worker_child( - id id: String, - run starter: fn() -> Result(Pid, whatever), -) -> ChildBuilder { - ChildBuilder( - id: id, - starter: fn() { starter() |> result.map_error(dynamic.from) }, - restart: Permanent, - significant: False, - child_type: Worker(5000), - ) -} - -/// A special child that is a supervisor itself. +/// The supervisor will be linked to the parent process that calls this +/// function. /// -/// id is used to identify the child specification internally by the -/// supervisor. -/// Notice that this identifier on occations has been called "name". As far -/// as possible, the terms "identifier" or "id" are now used but to keep -/// backward compatibility, some occurences of "name" can still be found, for -/// example in error messages. +/// If any child fails to start the supevisor first terminates all already +/// started child processes with reason shutdown and then terminate itself and +/// returns an error. /// -pub fn supervisor_child( - id id: String, - run starter: fn() -> Result(Pid, whatever), -) -> ChildBuilder { - ChildBuilder( - id: id, - starter: fn() { starter() |> result.map_error(dynamic.from) }, - restart: Permanent, - significant: False, - child_type: Supervisor, - ) -} +pub fn start( + builder: Builder, +) -> Result(actor.Started(Supervisor), actor.StartError) { + let flags = + make_erlang_start_flags([ + Strategy(builder.strategy), + Intensity(builder.intensity), + Period(builder.period), + AutoShutdown(builder.auto_shutdown), + ]) -/// This defines if a child is considered significant for automatic -/// self-shutdown of the supervisor. -/// -/// You most likely do not want to consider any children significant. -/// -/// This will be ignored if the supervisor auto shutdown is set to `Never`, -/// which is the default. -/// -/// The default value for significance is `False`. -pub fn significant(child: ChildBuilder, significant: Bool) -> ChildBuilder { - ChildBuilder(..child, significant: significant) + let module = atom.create("gleam@otp@static_supervisor") + let children = + builder.children |> list.reverse |> list.index_map(convert_child) + case erlang_start_link(module, #(flags, children)) { + Ok(pid) -> Ok(actor.Started(pid:, data: Supervisor(pid))) + Error(error) -> Error(convert_erlang_start_error(error)) + } } -/// This defines the amount of milliseconds a child has to shut down before -/// being brutal killed by the supervisor. +/// Create a `ChildSpecification` that adds this supervisor as the child of +/// another, making it fault tolerant and part of the application's supervision +/// tree. You should prefer to starting unsupervised supervisors with the +/// `start` function. /// -/// If not set the default for a child is 5000ms. +/// If any child fails to start the supevisor first terminates all already +/// started child processes with reason shutdown and then terminate itself and +/// returns an error. /// -/// This will be ignored if the child is a supervisor itself. -/// -pub fn timeout(child: ChildBuilder, ms ms: Int) -> ChildBuilder { - case child.child_type { - Worker(_) -> ChildBuilder(..child, child_type: Worker(ms)) - _ -> child - } +pub fn supervised(builder: Builder) -> ChildSpecification(Supervisor) { + supervision.supervisor(fn() { start(builder) }) } -/// When the child is to be restarted. See the `Restart` documentation for -/// more. -/// -/// The default value for restart is `Permanent`. -pub fn restart(child: ChildBuilder, restart: Restart) -> ChildBuilder { - ChildBuilder(..child, restart: restart) +@external(erlang, "gleam_otp_external", "convert_erlang_start_error") +fn convert_erlang_start_error(dynamic: Dynamic) -> actor.StartError + +@external(erlang, "supervisor", "start_link") +fn erlang_start_link( + module: Atom, + args: #(ErlangStartFlags, List(ErlangChildSpec)), +) -> Result(Pid, Dynamic) + +/// Add a child to the supervisor. +pub fn add(builder: Builder, child: ChildSpecification(data)) -> Builder { + Builder(..builder, children: [ + supervision.map_data(child, fn(_) { Nil }), + ..builder.children + ]) } -fn convert_child(child: ChildBuilder) -> Dict(Atom, Dynamic) { +fn convert_child(child: ChildSpecification(data), id: Int) -> ErlangChildSpec { let mfa = #( - atom.create_from_string("erlang"), - atom.create_from_string("apply"), - [dynamic.from(child.starter), dynamic.from([])], + atom.create("gleam@otp@static_supervisor"), + atom.create("start_child_callback"), + [dynamic.from(child.start)], ) let #(type_, shutdown) = case child.child_type { - Supervisor -> #( - atom.create_from_string("supervisor"), - dynamic.from(atom.create_from_string("infinity")), + supervision.Supervisor -> #( + atom.create("supervisor"), + dynamic.from(atom.create("infinity")), ) - Worker(timeout) -> #( - atom.create_from_string("worker"), + supervision.Worker(timeout) -> #( + atom.create("worker"), dynamic.from(timeout), ) } - dict.new() - |> property("id", child.id) - |> property("start", mfa) - |> property("restart", child.restart) - |> property("significant", child.significant) - |> property("type", type_) - |> property("shutdown", shutdown) + make_erlang_child_spec([ + Id(id), + Start(mfa), + Restart(child.restart), + Significant(child.significant), + Type(type_), + Shutdown(shutdown), + ]) +} + +type ErlangStartFlags + +@external(erlang, "maps", "from_list") +fn make_erlang_start_flags(flags: List(ErlangStartFlag)) -> ErlangStartFlags + +type ErlangStartFlag { + Strategy(Strategy) + Intensity(Int) + Period(Int) + AutoShutdown(AutoShutdown) } -fn property( - dict: Dict(Atom, Dynamic), - key: String, - value: anything, -) -> Dict(Atom, Dynamic) { - dict.insert(dict, atom.create_from_string(key), dynamic.from(value)) +type ErlangChildSpec + +@external(erlang, "maps", "from_list") +fn make_erlang_child_spec( + properties: List(ErlangChildSpecProperty), +) -> ErlangChildSpec + +type ErlangChildSpecProperty { + Id(Int) + Start(#(Atom, Atom, List(Dynamic))) + Restart(supervision.Restart) + Significant(Bool) + Type(Atom) + Shutdown(Dynamic) } // Callback used by the Erlang supervisor module. @@ -304,3 +265,14 @@ fn property( pub fn init(start_data: Dynamic) -> Result(Dynamic, never) { Ok(start_data) } + +// Callback used by the Erlang supervisor module. +@internal +pub fn start_child_callback( + start: fn() -> Result(actor.Started(anything), actor.StartError), +) -> Result(Pid, actor.StartError) { + case start() { + Ok(started) -> Ok(started.pid) + Error(error) -> Error(error) + } +} diff --git a/src/gleam/otp/supervision.gleam b/src/gleam/otp/supervision.gleam new file mode 100644 index 0000000..86e4e04 --- /dev/null +++ b/src/gleam/otp/supervision.gleam @@ -0,0 +1,138 @@ +import gleam/otp/actor + +/// Restart defines when a terminated child process must be restarted. +pub type Restart { + /// A permanent child process is always restarted. + Permanent + /// A transient child process is restarted only if it terminates abnormally, + /// that is, with another exit reason than `normal`, `shutdown`, or + /// `{shutdown,Term}`. + Transient + /// A temporary child process is never restarted (even when the supervisor's + /// restart strategy is `RestForOne` or `OneForAll` and a sibling's death + /// causes the temporary process to be terminated). + Temporary +} + +pub type ChildType { + /// A worker child has to shut-down within a given amount of time. + Worker( + /// The number of milliseconds the child is given to shut down. The + /// supervisor tells the child process to terminate by calling + /// `exit(Child,shutdown)` and then wait for an exit signal with reason + /// shutdown back from the child process. If no exit signal is received + /// within the specified number of milliseconds, the child process is + /// unconditionally terminated using `exit(Child,kill)`. + shutdown_ms: Int, + ) + Supervisor +} + +/// A description a how to start a new child process under an OTP supervisor. +pub type ChildSpecification(data) { + ChildSpecification( + /// A function to call to start the child process. + start: fn() -> Result(actor.Started(data), actor.StartError), + /// When the child is to be restarted. See the `Restart` documentation for + /// more. + /// + /// You most likely want the `Permanent` variant. + restart: Restart, + /// This defines if a child is considered significant for automatic + /// self-shutdown of the supervisor. + /// + /// You most likely do not want to consider any children significant. + /// + /// This will be ignored if the supervisor auto shutdown is set to `Never`, + /// which is the default. + significant: Bool, + /// Whether the child is a supervisor or not. + child_type: ChildType, + ) +} + +/// A regular child process. +/// +/// You should use this unless your process is also a supervisor. +/// +pub fn worker( + run start: fn() -> Result(actor.Started(data), actor.StartError), +) -> ChildSpecification(data) { + ChildSpecification( + start:, + restart: Permanent, + significant: False, + child_type: Worker(5000), + ) +} + +/// A special child that is a supervisor itself. +/// +pub fn supervisor( + run start: fn() -> Result(actor.Started(data), actor.StartError), +) -> ChildSpecification(data) { + ChildSpecification( + start:, + restart: Permanent, + significant: False, + child_type: Supervisor, + ) +} + +/// This defines if a child is considered significant for automatic +/// self-shutdown of the supervisor. +/// +/// You most likely do not want to consider any children significant. +/// +/// This will be ignored if the supervisor auto shutdown is set to `Never`, +/// which is the default. +/// +/// The default value for significance is `False`. +pub fn significant( + child: ChildSpecification(data), + significant: Bool, +) -> ChildSpecification(data) { + ChildSpecification(..child, significant: significant) +} + +/// This defines the amount of milliseconds a child has to shut down before +/// being brutal killed by the supervisor. +/// +/// If not set the default for a child is 5000ms. +/// +/// This will be ignored if the child is a supervisor itself. +/// +pub fn timeout( + child: ChildSpecification(data), + ms ms: Int, +) -> ChildSpecification(data) { + case child.child_type { + Worker(_) -> ChildSpecification(..child, child_type: Worker(ms)) + _ -> child + } +} + +/// When the child is to be restarted. See the `Restart` documentation for +/// more. +/// +/// The default value for restart is `Permanent`. +pub fn restart( + child: ChildSpecification(data), + restart: Restart, +) -> ChildSpecification(data) { + ChildSpecification(..child, restart: restart) +} + +/// Transform the data of the started child process. +/// +pub fn map_data( + child: ChildSpecification(a), + transform: fn(a) -> b, +) -> ChildSpecification(b) { + ChildSpecification(..child, start: fn() { + case child.start() { + Ok(started) -> Ok(actor.Started(..started, data: transform(started.data))) + Error(e) -> Error(e) + } + }) +} diff --git a/src/gleam/otp/supervisor.gleam b/src/gleam/otp/supervisor.gleam deleted file mode 100644 index c293a15..0000000 --- a/src/gleam/otp/supervisor.gleam +++ /dev/null @@ -1,459 +0,0 @@ -//// A supervisor that can pass state from older children to younger ones. -//// -//// If you don't need this consider using the `gleam/otp/static_supervisor` -//// module instead. - -// TODO: specify amount of time permitted for shut-down -import gleam/erlang/node.{type Node} -import gleam/erlang/process.{type Pid, type Subject} -import gleam/option.{type Option, None, Some} -import gleam/otp/actor.{type StartError} -import gleam/otp/intensity_tracker.{type IntensityTracker} -import gleam/result -import gleam/string - -/// This data structure holds all the values required by the `start_spec` -/// function in order to create an supervisor. -/// -/// If you do not need to configure the behaviour of your supervisor consider -/// using the `start` function. -/// -pub type Spec(argument, return) { - Spec( - argument: argument, - max_frequency: Int, - frequency_period: Int, - init: fn(Children(argument)) -> Children(return), - ) -} - -/// This type represents the starting children of a supervisor within the -/// `init` function. -/// -pub opaque type Children(argument) { - Ready(Starter(argument)) - Failed(ChildStartError) -} - -/// This type contains all the information required to start a new child and -/// add it to the `Children`. -/// -/// This is typically created with the `worker` function. -/// -pub opaque type ChildSpec(msg, argument, returning) { - ChildSpec( - // TODO: merge this into one field - start: fn(argument) -> Result(Subject(msg), StartError), - returning: fn(argument, Subject(msg)) -> returning, - ) -} - -type ChildStartError { - ChildStartError(previous_pid: Option(Pid), error: StartError) -} - -pub opaque type Message { - Exit(process.ExitMessage) - RetryRestart(Pid) -} - -type Instruction { - StartAll - StartFrom(Pid) -} - -type State(a) { - State( - restarts: IntensityTracker, - starter: Starter(a), - retry_restarts: Subject(Pid), - ) -} - -type Starter(argument) { - Starter( - argument: argument, - exec: Option( - fn(Instruction) -> - Result(#(Starter(argument), Instruction), ChildStartError), - ), - ) -} - -type Child(argument) { - Child(pid: Pid, argument: argument) -} - -fn start_child( - child_spec: ChildSpec(msg, argument_in, argument_out), - argument: argument_in, -) -> Result(Child(argument_out), ChildStartError) { - use subject <- result.then( - child_spec.start(argument) - |> result.map_error(ChildStartError(None, _)), - ) - - Ok(Child( - pid: process.subject_owner(subject), - // Merge the new child's pid into the argument to produce the new argument - // used to start any remaining children. - argument: child_spec.returning(argument, subject), - )) -} - -// TODO: more sophsiticated stopping of processes. i.e. give supervisors -// more time to shut down. -fn shutdown_child(pid: Pid, _spec: ChildSpec(msg, arg_1, arg_2)) -> Nil { - process.send_exit(pid) -} - -fn perform_instruction_for_child( - argument: argument_in, - instruction: Instruction, - child_spec: ChildSpec(msg, argument_in, argument_out), - child: Child(argument_out), -) -> Result(#(Child(argument_out), Instruction), ChildStartError) { - let current = child.pid - case instruction { - // This child is older than the StartFrom target, we don't need to - // restart it - StartFrom(target) if target != current -> Ok(#(child, instruction)) - - // This pid either is the cause of the problem, or we have the StartAll - // instruction. Either way it and its younger siblings need to be restarted. - _ -> { - shutdown_child(current, child_spec) - use child <- result.then(start_child(child_spec, argument)) - Ok(#(child, StartAll)) - } - } -} - -fn add_child_to_starter( - starter: Starter(argument_in), - child_spec: ChildSpec(msg, argument_in, argument_out), - child: Child(argument_out), -) -> Starter(argument_out) { - let starter = fn( - instruction, - // Restart the older children. We use `try` to return early if the older - // children failed to start - ) { - use #(starter, instruction) <- result.then(case starter.exec { - Some(start) -> start(instruction) - None -> Ok(#(starter, instruction)) - }) - - // Perform the instruction, restarting the child as required - use #(child, instruction) <- result.then(perform_instruction_for_child( - starter.argument, - instruction, - child_spec, - child, - )) - - // Create a new starter for the next time the supervisor needs to restart - let starter = add_child_to_starter(starter, child_spec, child) - - Ok(#(starter, instruction)) - } - - Starter(exec: Some(starter), argument: child.argument) -} - -fn start_and_add_child( - state: Starter(argument_0), - child_spec: ChildSpec(msg, argument_0, argument_1), -) -> Children(argument_1) { - case start_child(child_spec, state.argument) { - Ok(child) -> Ready(add_child_to_starter(state, child_spec, child)) - Error(reason) -> Failed(reason) - } -} - -/// Add a child to the collection of children of the supervisor -/// -/// This function starts the child from the child spec. -/// -pub fn add( - children: Children(argument), - child_spec: ChildSpec(msg, argument, new_argument), -) -> Children(new_argument) { - case children { - // If one of the previous children has failed then we cannot continue - Failed(fail) -> Failed(fail) - - // If everything is OK so far then we can add the child - Ready(state) -> start_and_add_child(state, child_spec) - } -} - -// TODO: test -// TODO: unlimitd shut down duration -/// Prepare a new supervisor type child. -/// -/// If you wish to prepare a new non-supervisor type child see the `worker` -/// function. -/// -/// If you wish to change the type of the argument for later children see the -/// `returning` function. -/// -/// Note: Gleam supervisors do not yet support different shutdown periods per -/// child so this function is currently identical in behaviour to `worker`. It is -/// recommended to use this function for supervisor children nevertheless so the -/// correct shut down behaviour is used in later releases of this library. -/// -pub fn supervisor( - start: fn(argument) -> Result(Subject(msg), StartError), -) -> ChildSpec(msg, argument, argument) { - ChildSpec(start: start, returning: fn(argument, _channel) { argument }) -} - -/// Prepare a new worker type child. -/// -/// If you wish to prepare a new supervisor type child see the `supervisor` -/// function. -/// -/// If you wish to change the type of the argument for later children see the -/// `returning` function. -/// -/// ## Examples -/// -/// ```gleam -/// worker(fn(argument) { -/// my_actor.start(argument) -/// }) -/// ``` -/// -pub fn worker( - start: fn(argument) -> Result(Subject(msg), StartError), -) -> ChildSpec(msg, argument, argument) { - ChildSpec(start: start, returning: fn(argument, _channel) { argument }) -} - -// TODO: test -/// As each child is added to a supervisors children a new argument is prepared -/// with which to start the next child. By default argument is the same as the -/// previous argument, but this function can be used to change it to something -/// else by passing a function that takes the previous argument and the sender -/// of the previous child. -/// -pub fn returning( - child: ChildSpec(msg, argument_a, argument_b), - updater: fn(argument_a, Subject(msg)) -> argument_c, -) -> ChildSpec(msg, argument_a, argument_c) { - ChildSpec(start: child.start, returning: updater) -} - -fn init( - spec: Spec(argument, return), -) -> actor.InitResult(State(return), Message) { - // Create a subject so that we can asynchronously retry restarting when we - // fail to bring an exited child - let retry = process.new_subject() - - // Trap exits so that we get a message when a child crashes - process.trap_exits(True) - - // Combine selectors - let selector = - process.new_selector() - |> process.selecting(retry, RetryRestart) - |> process.selecting_trapped_exits(Exit) - - // Start any children - let result = - Starter(argument: spec.argument, exec: None) - |> Ready - |> spec.init - - // Pass back up the result - case result { - Ready(starter) -> { - let restarts = - intensity_tracker.new( - limit: spec.max_frequency, - period: spec.frequency_period, - ) - let state = - State(starter: starter, restarts: restarts, retry_restarts: retry) - actor.Ready(state, selector) - } - - Failed(error) -> - actor.Failed(case error.error { - actor.InitTimeout -> "Child initialisation timed out" - actor.InitCrashed(reason) -> - string.append( - "Child crashed during initialisation: ", - string.inspect(reason), - ) - actor.InitFailed(reason) -> - string.append( - "Child failed to start during initialisation: ", - string.inspect(reason), - ) - }) - } -} - -type HandleExitError { - RestartFailed(pid: Pid, restarts: IntensityTracker) - TooManyRestarts -} - -fn handle_exit(pid: Pid, state: State(a)) -> actor.Next(Message, State(a)) { - let outcome = { - // If we are handling an exit then we must have some children - let assert Some(start) = state.starter.exec - - // Check to see if there has been too many restarts in this period - use restarts <- result.then( - state.restarts - |> intensity_tracker.add_event - |> result.map_error(fn(_) { TooManyRestarts }), - ) - - // Restart the exited child and any following children - use #(starter, _) <- result.then( - start(StartFrom(pid)) - |> result.map_error(fn(e: ChildStartError) { - RestartFailed(option.unwrap(e.previous_pid, pid), restarts) - }), - ) - - Ok(State(..state, starter: starter, restarts: restarts)) - } - - case outcome { - Ok(state) -> actor.continue(state) - Error(RestartFailed(failed_child, restarts)) -> { - // Asynchronously enqueue the restarting of this child again as we were - // unable to restart them this time. We do this asynchronously as we want - // to have a chance to handle any system messages that have come in. - process.send(state.retry_restarts, failed_child) - let state = State(..state, restarts: restarts) - actor.continue(state) - } - Error(TooManyRestarts) -> - actor.Stop(process.Abnormal( - "Child processes restarted too many times within allowed period", - )) - } -} - -fn loop( - message: Message, - state: State(argument), -) -> actor.Next(Message, State(argument)) { - case message { - Exit(exit_message) -> handle_exit(exit_message.pid, state) - RetryRestart(pid) -> handle_exit(pid, state) - } -} - -/// Start a supervisor from a given specification. -/// -/// -/// ## Examples -/// -/// ```gleam -/// let worker = worker(my_actor.start) -/// -/// let children = fn(children) { -/// children -/// |> add(worker) -/// |> add(worker) -/// } -/// -/// start_spec(Spec( -/// argument: initial_state, -/// frequency_period: 1, -/// max_frequency: 5, -/// init: children, -/// )) -/// ``` -/// -pub fn start_spec(spec: Spec(a, b)) -> Result(Subject(Message), StartError) { - actor.start_spec(actor.Spec( - init: fn() { init(spec) }, - loop: loop, - init_timeout: 60_000, - )) -} - -/// Start a supervisor from a given `init` function. -/// -/// The init argument passed to children will be `Nil` and the maximum restart -/// intensity will be 1 restart per 5 seconds (the same as the default for -/// [Erlang supervisors][erl-sup]). If you wish to specify these values, see -/// the `start_spec` function and the `Spec` type. -/// -/// [erl-sup]: https://www.erlang.org/doc/design_principles/sup_princ.html#maximum-restart-intensity -/// -/// ## Examples -/// -/// ```gleam -/// let worker = worker(my_actor.start) -/// -/// let children = fn(children) { -/// children -/// |> add(worker) -/// |> add(worker) -/// } -/// -/// start(children) -/// ``` -/// -pub fn start( - init: fn(Children(Nil)) -> Children(a), -) -> Result(Subject(Message), StartError) { - start_spec(Spec( - init: init, - argument: Nil, - max_frequency: 1, - frequency_period: 5, - )) -} - -/// A type used to describe the situation in which an Erlang based application -/// is starting. -/// -/// For more information see the [Erlang distributed application -/// documentation][1] and the Learn You Some Erlang chapter on [distributed -/// applications][2]. -/// -/// [1]: https://erlang.org/doc/design_principles/distributed_applications.html -/// [2]: https://learnyousomeerlang.com/distributed-otp-applications -/// -pub type ApplicationStartMode { - Normal - Takeover(Node) - Failover(Node) -} - -pub type ApplicationStop - -@external(erlang, "gleam_otp_external", "application_stopped") -pub fn application_stopped() -> ApplicationStop - -/// The result of starting a Gleam actor. -/// -/// This type is compatible with Gleam supervisors. If you wish to convert it -/// to a type compatible with Erlang supervisors see the `ErlangStartResult` -/// type and `erlang_start_result` function. -/// -pub type StartResult(msg) = - actor.StartResult(msg) - -/// An Erlang supervisor compatible process start result. -/// -pub type ErlangStartResult = - actor.ErlangStartResult - -/// Convert a Gleam actor start result into an Erlang supervisor compatible -/// process start result. -/// -pub fn to_erlang_start_result(res: StartResult(msg)) -> ErlangStartResult { - actor.to_erlang_start_result(res) -} diff --git a/src/gleam/otp/task.gleam b/src/gleam/otp/task.gleam deleted file mode 100644 index 70b7802..0000000 --- a/src/gleam/otp/task.gleam +++ /dev/null @@ -1,457 +0,0 @@ -//// A task is a kind of process that computes a value and then sends the result back -//// to its parent. Commonly multiple tasks are used to compute multiple things at -//// once. -//// -//// If you do not care to receive a result back at the end then you should not -//// use this module, `actor` or `process` are likely more suitable. -//// -//// ```gleam -//// let task = task.async(fn() { do_some_work() }) -//// let value = do_some_other_work() -//// value + task.await(task, 100) -//// ``` -//// -//// Tasks spawned with async can be awaited on by their caller process (and -//// only their caller) as shown in the example above. They are implemented by -//// spawning a process that sends a message to the caller once the given -//// computation is performed. -//// -//// There are some important things to consider when using tasks: -//// -//// 1. If you are using async tasks, you must await a reply as they are always -//// sent. -//// -//// 2. Tasks link the caller and the spawned process. This means that, -//// if the caller crashes, the task will crash too and vice-versa. This is -//// on purpose: if the process meant to receive the result no longer -//// exists, there is no purpose in completing the computation. -//// -//// 3. A task's callback function must complete by returning or panicking. -//// It must not `exit` with the reason "normal". -//// -//// This module is inspired by Elixir's [Task module][1]. -//// -//// [1]: https://hexdocs.pm/elixir/master/Task.html -//// - -import gleam/dict -import gleam/dynamic.{type Dynamic} -import gleam/erlang/process.{type Pid, type Selector, type Subject} -import gleam/function -import gleam/list -import gleam/option.{type Option, None, Some} - -pub opaque type Task(value) { - Task(owner: Pid, pid: Pid, subject: Subject(value)) -} - -// TODO: test -/// Spawn a task process that calls a given function in order to perform some -/// work. The result of this function is sent back to the parent and can be -/// received using the `await` function. -/// -/// See the top level module documentation for more information on async/await. -/// -pub fn async(work: fn() -> value) -> Task(value) { - let owner = process.self() - let subject = process.new_subject() - let pid = - process.start(linked: True, running: fn() { process.send(subject, work()) }) - Task(owner: owner, pid: pid, subject: subject) -} - -pub type AwaitError { - Timeout - Exit(reason: Dynamic) -} - -// We can only wait on a task if we are the owner of it so crash if we are -// waiting on a task we don't own. -fn assert_owner(task: Task(a)) -> Nil { - let self = process.self() - case task.owner == self { - True -> Nil - False -> - process.send_abnormal_exit( - self, - "awaited on a task that does not belong to this process", - ) - } -} - -// TODO: test -/// Wait for the value computed by a task. -/// -/// If the a value is not received before the timeout has elapsed then an error -/// is returned. -/// -pub fn try_await(task: Task(value), timeout: Int) -> Result(value, AwaitError) { - assert_owner(task) - let selector = - process.new_selector() - |> process.selecting(task.subject, function.identity) - case process.select(selector, timeout) { - // The task process has sent back a value - Ok(x) -> Ok(x) - - // The task process is alive but has not sent a value yet - Error(Nil) -> Error(Timeout) - } -} - -/// Wait for the value computed by a task. -/// -/// If the a value is not received before the timeout has elapsed or if the -/// task process crashes then this function crashes. -/// -pub fn await(task: Task(value), timeout: Int) -> value { - let assert Ok(value) = try_await(task, timeout) - value -} - -/// Get the `Pid` for a task. -/// -pub fn pid(task: Task(value)) -> Pid { - task.pid -} - -/// Wait endlessly for the value computed by a task. -/// -/// Be Careful! Like `try_await_forever`, this function does not return until -/// there is a value to receive. -/// -/// If the task process crashes then this function crashes. -/// -pub fn await_forever(task: Task(value)) -> value { - assert_owner(task) - let selector = - process.new_selector() - |> process.selecting(task.subject, function.identity) - process.select_forever(selector) -} - -type Message2(t1, t2) { - M2FromSubject1(t1) - M2FromSubject2(t2) - M2Timeout -} - -/// Wait for the values computed by multiple tasks. -/// -/// For each task, if the a value is not received before the timeout has -/// elapsed then an error is returned. -/// -pub fn try_await2( - task1: Task(t1), - task2: Task(t2), - timeout: Int, -) -> #(Result(t1, AwaitError), Result(t2, AwaitError)) { - assert_owner(task1) - assert_owner(task2) - - let timeout_subject = process.new_subject() - let timer = process.send_after(timeout_subject, timeout, M2Timeout) - - process.new_selector() - |> process.selecting(task1.subject, M2FromSubject1) - |> process.selecting(task2.subject, M2FromSubject2) - |> process.selecting(timeout_subject, function.identity) - |> try_await2_loop(None, None, timer) -} - -fn try_await2_loop( - selector: Selector(Message2(t1, t2)), - t1: Option(Result(t1, AwaitError)), - t2: Option(Result(t2, AwaitError)), - timer: process.Timer, -) -> #(Result(t1, AwaitError), Result(t2, AwaitError)) { - case t1, t2 { - Some(t1), Some(t2) -> { - process.cancel_timer(timer) - #(t1, t2) - } - - _, _ -> { - case process.select_forever(selector) { - // The task process has sent back a value - M2FromSubject1(x) -> { - let t1 = Some(Ok(x)) - try_await2_loop(selector, t1, t2, timer) - } - M2FromSubject2(x) -> { - let t2 = Some(Ok(x)) - try_await2_loop(selector, t1, t2, timer) - } - - M2Timeout -> { - #( - option.unwrap(t1, Error(Timeout)), - option.unwrap(t2, Error(Timeout)), - ) - } - } - } - } -} - -type Message3(t1, t2, t3) { - M3FromSubject1(t1) - M3FromSubject2(t2) - M3FromSubject3(t3) - M3Timeout -} - -/// Wait for the values computed by multiple tasks. -/// -/// For each task, if the a value is not received before the timeout has -/// elapsed then an error is returned. -/// -pub fn try_await3( - task1: Task(t1), - task2: Task(t2), - task3: Task(t3), - timeout: Int, -) -> #(Result(t1, AwaitError), Result(t2, AwaitError), Result(t3, AwaitError)) { - assert_owner(task1) - assert_owner(task2) - assert_owner(task3) - - let timeout_subject = process.new_subject() - let timer = process.send_after(timeout_subject, timeout, M3Timeout) - - process.new_selector() - |> process.selecting(task1.subject, M3FromSubject1) - |> process.selecting(task2.subject, M3FromSubject2) - |> process.selecting(task3.subject, M3FromSubject3) - |> process.selecting(timeout_subject, function.identity) - |> try_await3_loop(None, None, None, timer) -} - -fn try_await3_loop( - selector: Selector(Message3(t1, t2, t3)), - t1: Option(Result(t1, AwaitError)), - t2: Option(Result(t2, AwaitError)), - t3: Option(Result(t3, AwaitError)), - timer: process.Timer, -) -> #(Result(t1, AwaitError), Result(t2, AwaitError), Result(t3, AwaitError)) { - case t1, t2, t3 { - Some(t1), Some(t2), Some(t3) -> { - process.cancel_timer(timer) - #(t1, t2, t3) - } - - _, _, _ -> { - case process.select_forever(selector) { - // The task process has sent back a value - M3FromSubject1(x) -> { - let t1 = Some(Ok(x)) - try_await3_loop(selector, t1, t2, t3, timer) - } - M3FromSubject2(x) -> { - let t2 = Some(Ok(x)) - try_await3_loop(selector, t1, t2, t3, timer) - } - M3FromSubject3(x) -> { - let t3 = Some(Ok(x)) - try_await3_loop(selector, t1, t2, t3, timer) - } - - M3Timeout -> { - #( - option.unwrap(t1, Error(Timeout)), - option.unwrap(t2, Error(Timeout)), - option.unwrap(t3, Error(Timeout)), - ) - } - } - } - } -} - -type Message4(t1, t2, t3, t4) { - M4FromSubject1(t1) - M4FromSubject2(t2) - M4FromSubject3(t3) - M4FromSubject4(t4) - M4Timeout -} - -/// Wait for the values computed by multiple tasks. -/// -/// For each task, if the a value is not received before the timeout has -/// elapsed then an error is returned. -/// -pub fn try_await4( - task1: Task(t1), - task2: Task(t2), - task3: Task(t3), - task4: Task(t4), - timeout: Int, -) -> #( - Result(t1, AwaitError), - Result(t2, AwaitError), - Result(t3, AwaitError), - Result(t4, AwaitError), -) { - assert_owner(task1) - assert_owner(task2) - assert_owner(task3) - - let timeout_subject = process.new_subject() - let timer = process.send_after(timeout_subject, timeout, M4Timeout) - - process.new_selector() - |> process.selecting(task1.subject, M4FromSubject1) - |> process.selecting(task2.subject, M4FromSubject2) - |> process.selecting(task3.subject, M4FromSubject3) - |> process.selecting(task4.subject, M4FromSubject4) - |> process.selecting(timeout_subject, function.identity) - |> try_await4_loop(None, None, None, None, timer) -} - -fn try_await4_loop( - selector: Selector(Message4(t1, t2, t3, t4)), - t1: Option(Result(t1, AwaitError)), - t2: Option(Result(t2, AwaitError)), - t3: Option(Result(t3, AwaitError)), - t4: Option(Result(t4, AwaitError)), - timer: process.Timer, -) -> #( - Result(t1, AwaitError), - Result(t2, AwaitError), - Result(t3, AwaitError), - Result(t4, AwaitError), -) { - case t1, t2, t3, t4 { - Some(t1), Some(t2), Some(t3), Some(t4) -> { - process.cancel_timer(timer) - #(t1, t2, t3, t4) - } - - _, _, _, _ -> { - case process.select_forever(selector) { - // The task process has sent back a value - M4FromSubject1(x) -> { - let t1 = Some(Ok(x)) - try_await4_loop(selector, t1, t2, t3, t4, timer) - } - M4FromSubject2(x) -> { - let t2 = Some(Ok(x)) - try_await4_loop(selector, t1, t2, t3, t4, timer) - } - M4FromSubject3(x) -> { - let t3 = Some(Ok(x)) - try_await4_loop(selector, t1, t2, t3, t4, timer) - } - M4FromSubject4(x) -> { - let t4 = Some(Ok(x)) - try_await4_loop(selector, t1, t2, t3, t4, timer) - } - - M4Timeout -> { - #( - option.unwrap(t1, Error(Timeout)), - option.unwrap(t2, Error(Timeout)), - option.unwrap(t3, Error(Timeout)), - option.unwrap(t4, Error(Timeout)), - ) - } - } - } - } -} - -type Message(t) { - Message(from: Int, value: t) - MessageTimeout -} - -/// Wait for the values computed by multiple tasks. -/// -/// For each task, if the a value is not received before the timeout has -/// elapsed then an error is returned. -/// -pub fn try_await_all( - tasks: List(Task(t)), - timeout: Int, -) -> List(Result(t, AwaitError)) { - let #(selector, tasks_count) = { - let acc = #(process.new_selector(), 0) - // We need to do a couple of things before we start listening: - // - we have to check the owner of every task - // - we have to count the tasks to know when we can stop waiting for results - // - we have to map each task's message to keep track of its position in the - // original list - // - // Instead of iterating through the tasks list three times we do everything - // in a single fold that has to go through the list just once! - use #(selector, tasks_count), task, index <- list.index_fold(tasks, acc) - assert_owner(task) - let selector = process.selecting(selector, task.subject, Message(index, _)) - #(selector, tasks_count + 1) - } - - let timeout_subject = process.new_subject() - let timer = process.send_after(timeout_subject, timeout, MessageTimeout) - let selector = process.selecting(selector, timeout_subject, function.identity) - - try_await_all_loop(dict.new(), tasks_count, timer, selector) -} - -fn try_await_all_loop( - values: dict.Dict(Int, Result(b, AwaitError)), - tasks_count: Int, - timer: process.Timer, - selector: Selector(Message(b)), -) -> List(Result(b, AwaitError)) { - case dict.size(values) == tasks_count { - // If there's no more values to receive then we can return the list... - True -> { - process.cancel_timer(timer) - dict_to_list(values, tasks_count, Error(Timeout)) - } - // Otherwise we wait to receive another message (or the timeout) from one of - // the tasks. - False -> - case process.select_forever(selector) { - MessageTimeout -> dict_to_list(values, tasks_count, Error(Timeout)) - Message(from: index, value:) -> { - let values = dict.insert(values, index, Ok(value)) - try_await_all_loop(values, tasks_count, timer, selector) - } - } - } -} - -/// Given a dict returns a list with size `sized` where each item at index `i` -/// is `dict.get(dict, i) |> result.unwrap(default)`. -/// -/// ## Examples -/// -/// ```gleam -/// dict.from_list([#(1, "a"), #(4, "b")]) -/// |> dict_to_list(5, " ") -/// // -> [" ", "a", " ", " ", "b"] -/// ``` -/// -fn dict_to_list(dict: dict.Dict(Int, a), sized: Int, default: a) -> List(a) { - dict_to_list_loop(dict, default, sized - 1, []) -} - -fn dict_to_list_loop( - dict: dict.Dict(Int, a), - default: a, - index: Int, - list: List(a), -) -> List(a) { - case index < 0 { - True -> list - False -> { - let value = case dict.get(dict, index) { - Error(_) -> default - Ok(value) -> value - } - dict_to_list_loop(dict, default, index - 1, [value, ..list]) - } - } -} diff --git a/src/gleam_otp_external.erl b/src/gleam_otp_external.erl index 27c1073..13d0546 100644 --- a/src/gleam_otp_external.erl +++ b/src/gleam_otp_external.erl @@ -1,10 +1,12 @@ -module(gleam_otp_external). -export([ - application_stopped/0, convert_system_message/2, - static_supervisor_start_link/1 + application_stopped/0, convert_system_message/1, identity/1, + convert_erlang_start_error/1 ]). +identity(X) -> X. + % TODO: support other system messages % {replace_state, StateFn} % {change_code, Mod, Vsn, Extra} @@ -17,10 +19,16 @@ % {debug, {install, {Func, FuncState}}} % {debug, {install, {FuncId, Func, FuncState}}} % {debug, {remove, FuncOrId}} -% GetStatus(Subject(StatusInfo)) -convert_system_message({From, Ref}, Request) when is_pid(From) -> +convert_system_message({system, {From, Ref}, Request}) when is_pid(From) -> Reply = fun(Msg) -> - erlang:send(From, {Ref, Msg}), + case Ref of + [alias|Alias] = Tag when is_reference(Alias) -> + erlang:send(Alias, {Tag, Msg}); + [[alias|Alias] | _] = Tag when is_reference(Alias) -> + erlang:send(Alias, {Tag, Msg}); + _ -> + erlang:send(From, {Ref, Msg}) + end, nil end, System = fun(Callback) -> @@ -45,8 +53,9 @@ process_status({status_info, Module, Parent, Mode, DebugState, State}) -> application_stopped() -> ok. -static_supervisor_start_link(Arg) -> - case supervisor:start_link(gleam@otp@static_supervisor, Arg) of - {ok, P} -> {ok, P}; - {error, E} -> {error, {init_crashed, E}} - end. +convert_erlang_start_error({already_started, _}) -> + {init_failed, "already started"}; +convert_erlang_start_error({shutdown, _}) -> + {init_failed, "shutdown"}; +convert_erlang_start_error(Term) -> + {init_exited, {abnormal, Term}}. diff --git a/test/gleam/otp/actor_documentation_example_test.gleam b/test/gleam/otp/actor_documentation_example_test.gleam index 9de001a..b5e31d5 100644 --- a/test/gleam/otp/actor_documentation_example_test.gleam +++ b/test/gleam/otp/actor_documentation_example_test.gleam @@ -13,12 +13,14 @@ pub fn example_test() { // called `start`, `push` `pop`, `shutdown` to start and interact with the // Actor. We are not doing that here for the sake of showing how the Actor // API works. - let assert Ok(actor) = actor.start([], handle_message) + let assert Ok(actor) = + actor.new([]) |> actor.on_message(handle_message) |> actor.start + let subject = actor.data // We can send a message to the actor to push elements onto the stack. - process.send(actor, Push("Joe")) - process.send(actor, Push("Mike")) - process.send(actor, Push("Robert")) + process.send(subject, Push("Joe")) + process.send(subject, Push("Mike")) + process.send(subject, Push("Robert")) // The `Push` message expects no response, these messages are sent purely for // the side effect of mutating the state held by the actor. @@ -30,25 +32,28 @@ pub fn example_test() { // In this instance we are giving the actor 10 milliseconds to reply, if the // `call` function doesn't get a reply within this time it will panic and // crash the client process. - let assert Ok("Robert") = process.call(actor, Pop, 10) - let assert Ok("Mike") = process.call(actor, Pop, 10) - let assert Ok("Joe") = process.call(actor, Pop, 10) + let assert Ok("Robert") = process.call(subject, 10, Pop) + let assert Ok("Mike") = process.call(subject, 10, Pop) + let assert Ok("Joe") = process.call(subject, 10, Pop) // The stack is now empty, so if we pop again the actor replies with an error. - let assert Error(Nil) = process.call(actor, Pop, 10) + let assert Error(Nil) = process.call(subject, 10, Pop) // Lastly, we can send a message to the actor asking it to shut down. - process.send(actor, Shutdown) + process.send(subject, Shutdown) } // First step of implementing the stack Actor is to define the message type that // it can receive. // -// The type of the elements in the stack is no fixed so a type parameter is used +// The type of the elements in the stack is not fixed so a type parameter is used // for it instead of a concrete type such as `String` or `Int`. pub type Message(element) { // The `Shutdown` message is used to tell the actor to stop. // It is the simplest message type, it contains no data. + // + // Most the time we don't define an API to shut down an actor, but in this + // example we do to show how it can be done. Shutdown // The `Push` message is used to add a new element to the stack. @@ -64,23 +69,25 @@ pub type Message(element) { // The last part is to implement the `handle_message` callback function. // -// This function is called by the Actor each for each message it receives. -// Actor is single threaded only does one thing at a time, so it handles +// This function is called by the Actor for each message it receives. +// Actors are single threaded only does one thing at a time, so they handle // messages sequentially and one at a time, in the order they are received. // // The function takes the message and the current state, and returns a data // structure that indicates what to do next, along with the new state. fn handle_message( - message: Message(e), stack: List(e), -) -> actor.Next(Message(e), List(e)) { + message: Message(e), +) -> actor.Next(List(e), Message(e)) { case message { - // For the `Shutdown` message we return the `actor.Stop` value, which causes + // For the `Shutdown` message we return the `actor.stop` value, which causes // the actor to discard any remaining messages and stop. - Shutdown -> actor.Stop(process.Normal) + // We may chose to do some clean-up work here, but this actor doesn't need + // to do this. + Shutdown -> actor.stop() // For the `Push` message we add the new element to the stack and return - // `actor.Continue` with this new stack, causing the actor to process any + // `actor.continue` with this new stack, causing the actor to process any // queued messages or wait for more. Push(value) -> { let new_state = [value, ..stack] diff --git a/test/gleam/otp/actor_test.gleam b/test/gleam/otp/actor_test.gleam index b242d8e..030ebd2 100644 --- a/test/gleam/otp/actor_test.gleam +++ b/test/gleam/otp/actor_test.gleam @@ -1,3 +1,8 @@ +//// +//// NOTE: Don't forget to update the moduledoc of gleam/otp/actor if you +//// change this file. +//// + import gleam/dynamic.{type Dynamic} import gleam/erlang/atom.{type Atom} import gleam/erlang/process.{type Pid, type Subject} @@ -6,14 +11,16 @@ import gleam/int import gleam/otp/actor import gleam/otp/system import gleam/result +import gleam/string import gleeunit/should pub fn get_state_test() { - let assert Ok(subject) = - actor.start("Test state", fn(_msg, state) { actor.continue(state) }) + let assert Ok(actor) = + actor.new("Test state") + |> actor.on_message(fn(_msg, state) { actor.continue(state) }) + |> actor.start - subject - |> process.subject_owner + actor.pid |> system.get_state |> should.equal(dynamic.from("Test state")) } @@ -22,22 +29,20 @@ pub fn get_state_test() { fn get_status(a: Pid) -> Dynamic pub fn get_status_test() { - let assert Ok(subject) = - actor.start(Nil, fn(_msg, state) { actor.continue(state) }) + let assert Ok(actor) = + actor.new(Nil) + |> actor.on_message(fn(_msg, state) { actor.continue(state) }) + |> actor.start - subject - |> process.subject_owner + actor.pid |> get_status // TODO: assert something about the response } pub fn failed_init_test() { - actor.Spec( - init: fn() { actor.Failed("not enough wiggles") }, - loop: fn(_msg, state) { actor.continue(state) }, - init_timeout: 10, - ) - |> actor.start_spec + actor.new_with_initialiser(100, fn(_) { Error("not enough wiggles") }) + |> actor.on_message(fn(state, _msg) { actor.continue(state) }) + |> actor.start |> result.is_error |> should.be_true } @@ -46,22 +51,19 @@ pub fn timed_out_init_test() { process.trap_exits(True) let exit_selector = process.new_selector() - |> process.selecting_trapped_exits(function.identity) + |> process.select_trapped_exits(function.identity) let result = - actor.Spec( - init: fn() { - process.sleep(1000) - panic as "should not be reached" - }, - loop: fn(_msg, _state) { panic as "should not be reached" }, - init_timeout: 1, - ) - |> actor.start_spec + actor.new_with_initialiser(1, fn(_) { + process.sleep(100) + panic as "should not be reached" + }) + |> actor.on_message(fn(_state, _msg) { panic as "should not be reached" }) + |> actor.start - // Check that the exit isn't unhandled: it should be handled by start_spec. + // Check that the exit isn't unhandled: it should be handled by start. // Stop trapping exits before asserting, to avoid interfering with other tests. - let exit = process.select(exit_selector, 10) + let exit = process.selector_receive(exit_selector, 10) process.trap_exits(False) result |> should.equal(Error(actor.InitTimeout)) @@ -69,96 +71,89 @@ pub fn timed_out_init_test() { } pub fn suspend_resume_test() { - let assert Ok(subject) = - actor.start(0, fn(_msg, iter) { actor.continue(iter + 1) }) + let assert Ok(actor) = + actor.new(0) + |> actor.on_message(fn(iter, _msg) { actor.continue(iter + 1) }) + |> actor.start // Suspend process - subject - |> process.subject_owner + actor.pid |> system.suspend |> should.equal(Nil) // This normal message will not be handled yet so the state remains 0 - actor.send(subject, "hi") + actor.send(actor.data, "hi") // System messages are still handled - subject - |> process.subject_owner + actor.pid |> system.get_state |> should.equal(dynamic.from(0)) // Resume process - subject - |> process.subject_owner + actor.pid |> system.resume |> should.equal(Nil) // The queued regular message has been handled so the state has incremented - subject - |> process.subject_owner + actor.pid |> system.get_state |> should.equal(dynamic.from(1)) } pub fn subject_test() { - let assert Ok(subject) = - actor.start("state 1", fn(msg, _state) { actor.continue(msg) }) + let assert Ok(actor) = + actor.new("state 1") + |> actor.on_message(fn(_state, msg) { actor.continue(msg) }) + |> actor.start - subject - |> process.subject_owner + actor.pid |> system.get_state() |> should.equal(dynamic.from("state 1")) - actor.send(subject, "state 2") + actor.send(actor.data, "state 2") - subject - |> process.subject_owner + actor.pid |> system.get_state() |> should.equal(dynamic.from("state 2")) } pub fn unexpected_message_test() { // Quieten the logger - logger_set_primary_config( - atom.create_from_string("level"), - atom.create_from_string("error"), - ) + logger_set_primary_config(atom.create("level"), atom.create("error")) - let assert Ok(subject) = - actor.start("state 1", fn(msg, _state) { actor.continue(msg) }) + let assert Ok(actor) = + actor.new("state 1") + |> actor.on_message(fn(_state, msg) { actor.continue(msg) }) + |> actor.start - subject - |> process.subject_owner + actor.pid |> system.get_state() |> should.equal(dynamic.from("state 1")) - raw_send(process.subject_owner(subject), "Unexpected message 1") - actor.send(subject, "state 2") - raw_send(process.subject_owner(subject), "Unexpected message 2") + raw_send(actor.pid, "Unexpected message 1") + actor.send(actor.data, "state 2") + raw_send(actor.pid, "Unexpected message 2") - subject - |> process.subject_owner + actor.pid |> system.get_state() |> should.equal(dynamic.from("state 2")) } pub fn unexpected_message_handled_test() { - let assert Ok(subject) = - actor.start_spec(actor.Spec( - init: fn() { - let selector = - process.new_selector() - |> process.selecting_anything(function.identity) - actor.Ready(dynamic.from("init"), selector) - }, - loop: fn(msg, _state) { actor.continue(msg) }, - init_timeout: 10, - )) - - raw_send(process.subject_owner(subject), "Unexpected message 1") - - subject - |> process.subject_owner + let assert Ok(actor) = + actor.new_with_initialiser(10, fn(_) { + let selector = + process.new_selector() |> process.select_other(function.identity) + actor.initialised(dynamic.from("initial")) + |> actor.selecting(selector) + |> Ok + }) + |> actor.on_message(fn(_state, msg) { actor.continue(msg) }) + |> actor.start + + raw_send(actor.pid, "Unexpected message 1") + + actor.pid |> system.get_state() |> should.equal(dynamic.from("Unexpected message 1")) } @@ -171,78 +166,101 @@ type ActorMessage { mapper: fn(String) -> ActorMessage, ) SetIntSelector(reply: Subject(Subject(Int)), mapper: fn(Int) -> ActorMessage) + GetText(reply: Subject(String)) } pub fn replace_selector_test() { - let assert Ok(subject) = - actor.start("init", fn(msg: ActorMessage, state) { + let assert Ok(actor) = + actor.new_with_initialiser(50, fn(_) { + let subject = process.new_subject() + let selector = process.new_selector() |> process.select(subject) + actor.initialised(#(selector, "initial")) + |> actor.returning(subject) + |> actor.selecting(selector) + |> Ok + }) + |> actor.on_message(fn(state, msg) { case msg { - UserMessage(string) -> actor.continue("user message: " <> string) + UserMessage(string) -> + actor.continue(#(state.0, "user message: " <> string)) + Unknown(val) -> - actor.continue("unknown message: " <> dynamic.classify(val)) + actor.continue(#( + state.0, + "unknown message: " <> dynamic.classify(val), + )) + SetStringSelector(reply, mapper) -> { - let #(subject, selector) = mapped_selector(mapper) + let #(subject, selector) = mapped_selector(state.0, mapper) process.send(reply, subject) - actor.continue(state) |> actor.with_selector(selector) } + SetIntSelector(reply, mapper) -> { - let #(subject, selector) = mapped_selector(mapper) + let #(subject, selector) = mapped_selector(state.0, mapper) process.send(reply, subject) - actor.continue(state) |> actor.with_selector(selector) } + + GetText(reply) -> { + process.send(reply, state.1) + actor.continue(state) + } } }) + |> actor.start // Send initial user message to original subject - process.send(subject, UserMessage("test 1")) + process.send(actor.data, UserMessage("test 1")) // Check state - get_actor_state(subject) - |> should.equal(dynamic.from("user message: test 1")) + process.call(actor.data, 50, GetText) + |> should.equal("user message: test 1") // Get a new subject with string selector - let str_subj = process.call(subject, SetStringSelector(_, UserMessage), 1000) + let str_subj = + process.call(actor.data, 1000, SetStringSelector(_, UserMessage)) // Send to new string subject process.send(str_subj, "test 2") // Check state - get_actor_state(subject) - |> should.equal(dynamic.from("user message: test 2")) + process.call(actor.data, 50, GetText) + |> should.equal("user message: test 2") // Get a new subject with int selector let int_subj = process.call( - subject, - SetIntSelector(_, fn(n: Int) { UserMessage("test " <> int.to_string(n)) }), + actor.data, 1000, + SetIntSelector(_, fn(n: Int) { UserMessage("test " <> int.to_string(n)) }), ) // Send to new int subject process.send(int_subj, 3) // Check state - get_actor_state(subject) - |> should.equal(dynamic.from("user message: test 3")) + process.call(actor.data, 50, GetText) + |> should.equal("user message: test 3") // Try to send to old string subject process.send(str_subj, "test 4") // Check state - get_actor_state(subject) - |> should.equal(dynamic.from("unknown message: String")) + process.call(actor.data, 50, GetText) + |> should.equal("unknown message: Tuple of 2 elements") } pub fn abnormal_exit_can_be_trapped_test() { process.trap_exits(True) let exits = process.new_selector() - |> process.selecting_trapped_exits(function.identity) + |> process.select_trapped_exits(function.identity) // Make an actor exit with an abnormal reason - let assert Ok(subject) = - actor.start(Nil, fn(_, _) { actor.Stop(process.Abnormal("reason")) }) - process.send(subject, Nil) + let assert Ok(actor) = + actor.new(Nil) + |> actor.on_message(fn(_, _) { actor.continue(Nil) }) + |> actor.start + process.send_abnormal_exit(actor.pid, "boo!") - let trapped_reason = process.select(exits, 10) + let trapped_reason = process.selector_receive(exits, 10) // Stop trapping exits, as otherwise other tests fail process.trap_exits(False) @@ -250,10 +268,7 @@ pub fn abnormal_exit_can_be_trapped_test() { // The weird reason below is because of https://github.com/gleam-lang/erlang/issues/66 trapped_reason |> should.equal( - Ok(process.ExitMessage( - process.subject_owner(subject), - process.Abnormal("Abnormal(\"reason\")"), - )), + Ok(process.ExitMessage(actor.pid, process.Abnormal(dynamic.from("boo!")))), ) } @@ -261,49 +276,136 @@ pub fn killed_exit_can_be_trapped_test() { process.trap_exits(True) let exits = process.new_selector() - |> process.selecting_trapped_exits(function.identity) + |> process.select_trapped_exits(function.identity) // Make an actor exit with a killed reason - let assert Ok(subject) = - actor.start(Nil, fn(_, _) { actor.Stop(process.Killed) }) - process.send(subject, Nil) + let assert Ok(actor) = + actor.new(Nil) + |> actor.on_message(fn(_, _) { actor.continue(Nil) }) + |> actor.start + process.kill(actor.pid) - let trapped_reason = process.select(exits, 10) + let trapped_reason = process.selector_receive(exits, 10) // Stop trapping exits, as otherwise other tests fail process.trap_exits(False) trapped_reason - |> should.equal( - Ok(process.ExitMessage(process.subject_owner(subject), process.Killed)), - ) + |> should.equal(Ok(process.ExitMessage(actor.pid, process.Killed))) } -fn mapped_selector(mapper: fn(a) -> ActorMessage) { +fn mapped_selector( + selector: process.Selector(ActorMessage), + mapper: fn(a) -> ActorMessage, +) { let subject = process.new_subject() let selector = - process.new_selector() - |> process.selecting(subject, mapper) + selector + |> process.select_map(subject, mapper) // Always create a selector that catches unknown messages - |> process.selecting_anything(fn(data) { - data - |> dynamic.element(1, dynamic.dynamic) - |> result.unwrap(dynamic.from("unknown")) - |> Unknown - }) + |> process.select_other(Unknown) #(subject, selector) } -fn get_actor_state(subject: Subject(a)) { - subject - |> process.subject_owner - |> system.get_state -} - @external(erlang, "erlang", "send") fn raw_send(a: Pid, b: anything) -> anything @external(erlang, "logger", "set_primary_config") fn logger_set_primary_config(a: Atom, b: Atom) -> Nil + +pub fn named_new_test() { + let name = process.new_name("my_actor") + let assert Ok(actor) = + actor.new(0) + |> actor.named(name) + |> actor.on_message(fn(rename, message) { + actor.send(message, rename) + actor.continue(rename + 1) + }) + |> actor.start + + // It's alive! (and registered) + process.named(name) + |> should.be_ok + |> should.equal(actor.pid) + + // We can call the process using a new subject made with the name + let subject = process.named_subject(name) + actor.call(subject, 10, function.identity) + |> should.equal(0) + + // We can call the process using the returned subject + actor.call(actor.data, 10, function.identity) + |> should.equal(1) +} + +pub fn named_new_with_initialiser_test() { + let name = process.new_name("my_actor") + let assert Ok(actor) = + actor.new_with_initialiser(50, fn(subject) { + actor.initialised(0) + |> actor.returning(subject) + |> Ok + }) + |> actor.on_message(fn(rename, message) { + actor.send(message, rename) + actor.continue(rename + 1) + }) + |> actor.named(name) + |> actor.start + + // It's alive! (and registered) + process.named(name) + |> should.be_ok + |> should.equal(actor.pid) + + // We can call the process using a new subject made with the name + let subject = process.named_subject(name) + actor.call(subject, 10, function.identity) + |> should.equal(0) + + // We can call the process using the returned subject + actor.call(actor.data, 10, function.identity) + |> should.equal(1) +} + +pub fn new_with_initialiser_custom_selector_test() { + let parent_subject = process.new_subject() + + let assert Ok(actor) = + actor.new_with_initialiser(50, fn(subject1) { + let subject2 = process.new_subject() + let selector = + process.new_selector() + |> process.select_map(subject2, fn(message) { + "subject2: " <> string.inspect(message) + }) + |> process.select_other(fn(message) { + "not selected: " <> string.inspect(message) + }) + actor.initialised(Nil) + |> actor.returning(#(subject1, subject2)) + |> actor.selecting(selector) + |> Ok + }) + |> actor.on_message(fn(state, message) { + actor.send(parent_subject, message) + actor.continue(state) + }) + |> actor.start + + // There is a custom selector provided by the intialiser, and it didn't + // select for the default subject, so messages to that one do not get + // selected. Here that is shown by it going to the `select_other` catch-all + // handler. + actor.send(actor.data.0, "Hello!") + let assert Ok("not selected: " <> _) = process.receive(parent_subject, 10) + let assert Error(Nil) = process.receive(parent_subject, 10) + + // The subject that was added to the selector does get selected for. + actor.send(actor.data.1, "Hi!") + let assert Ok("subject2: \"Hi!\"" <> _) = process.receive(parent_subject, 10) + let assert Error(Nil) = process.receive(parent_subject, 10) +} diff --git a/test/gleam/otp/intensity_tracker_test.gleam b/test/gleam/otp/intensity_tracker_test.gleam deleted file mode 100644 index 76995b6..0000000 --- a/test/gleam/otp/intensity_tracker_test.gleam +++ /dev/null @@ -1,42 +0,0 @@ -import gleam/erlang/process -import gleam/otp/intensity_tracker.{type IntensityTracker, add_event, new} -import gleeunit -import gleeunit/should - -pub fn main() { - gleeunit.main() -} - -pub fn errors_when_too_intense_test() { - let count = 5 - new(limit: count, period: 1) - |> add_events(count, _) - |> add_event() - |> should.be_error() -} - -pub fn cools_down_after_period_test() { - let count = 1000 - let limiter = - new(limit: count, period: 1) - |> add_events(count, _) - - // Since the intensity_tracker is enforced in seconds, - // wait for that second to pass - process.sleep(1000) - limiter - |> add_events(count, _) -} - -fn add_events(count: Int, limiter: IntensityTracker) { - case count { - 0 -> limiter - i if i > 0 -> { - limiter - |> add_event() - |> should.be_ok() - |> add_events(count - 1, _) - } - _ -> panic - } -} diff --git a/test/gleam/otp/static_supervisor_test.gleam b/test/gleam/otp/static_supervisor_test.gleam index 97df232..efc8ddf 100644 --- a/test/gleam/otp/static_supervisor_test.gleam +++ b/test/gleam/otp/static_supervisor_test.gleam @@ -1,12 +1,13 @@ import gleam/erlang/process.{type Pid, type Subject} import gleam/otp/actor -import gleam/otp/static_supervisor as sup - -fn actor_child(name name, init init, loop loop) -> sup.ChildBuilder { - sup.worker_child(name, fn() { - let spec = actor.Spec(init: init, init_timeout: 10, loop: loop) - let assert Ok(subject) = actor.start_spec(spec) - Ok(process.subject_owner(subject)) +import gleam/otp/static_supervisor +import gleam/otp/supervision + +fn actor_child(init init, loop loop) -> supervision.ChildSpecification(Nil) { + supervision.worker(fn() { + actor.new_with_initialiser(50, init) + |> actor.on_message(loop) + |> actor.start }) } @@ -15,12 +16,11 @@ fn actor_child(name name, init init, loop loop) -> sup.ChildBuilder { fn init_notifier_child( subject: Subject(#(String, Pid)), name: String, -) -> sup.ChildBuilder { +) -> supervision.ChildSpecification(Nil) { actor_child( - name: name, - init: fn() { + init: fn(_) { process.send(subject, #(name, process.self())) - actor.Ready(name, process.new_selector()) + Ok(actor.initialised(name)) }, loop: fn(_msg, state) { actor.continue(state) }, ) @@ -30,11 +30,11 @@ pub fn one_for_one_test() { let subject = process.new_subject() let assert Ok(supervisor) = - sup.new(sup.OneForOne) - |> sup.add(init_notifier_child(subject, "1")) - |> sup.add(init_notifier_child(subject, "2")) - |> sup.add(init_notifier_child(subject, "3")) - |> sup.start_link + static_supervisor.new(static_supervisor.OneForOne) + |> static_supervisor.add(init_notifier_child(subject, "1")) + |> static_supervisor.add(init_notifier_child(subject, "2")) + |> static_supervisor.add(init_notifier_child(subject, "3")) + |> static_supervisor.start // Assert children have started let assert Ok(#("1", p1)) = process.receive(subject, 10) @@ -58,19 +58,19 @@ pub fn one_for_one_test() { let assert True = process.is_alive(p2) let assert True = process.is_alive(p3) - let assert True = process.is_alive(supervisor) - process.send_exit(supervisor) + let assert True = process.is_alive(supervisor.pid) + process.send_exit(supervisor.pid) } pub fn rest_for_one_test() { let subject = process.new_subject() let assert Ok(supervisor) = - sup.new(sup.RestForOne) - |> sup.add(init_notifier_child(subject, "1")) - |> sup.add(init_notifier_child(subject, "2")) - |> sup.add(init_notifier_child(subject, "3")) - |> sup.start_link + static_supervisor.new(static_supervisor.RestForOne) + |> static_supervisor.add(init_notifier_child(subject, "1")) + |> static_supervisor.add(init_notifier_child(subject, "2")) + |> static_supervisor.add(init_notifier_child(subject, "3")) + |> static_supervisor.start // Assert children have started let assert Ok(#("1", p1)) = process.receive(subject, 10) @@ -97,19 +97,19 @@ pub fn rest_for_one_test() { let assert True = process.is_alive(p2) let assert True = process.is_alive(p3) - let assert True = process.is_alive(supervisor) - process.send_exit(supervisor) + let assert True = process.is_alive(supervisor.pid) + process.send_exit(supervisor.pid) } pub fn one_for_all_test() { let subject = process.new_subject() let assert Ok(supervisor) = - sup.new(sup.OneForAll) - |> sup.add(init_notifier_child(subject, "1")) - |> sup.add(init_notifier_child(subject, "2")) - |> sup.add(init_notifier_child(subject, "3")) - |> sup.start_link + static_supervisor.new(static_supervisor.OneForAll) + |> static_supervisor.add(init_notifier_child(subject, "1")) + |> static_supervisor.add(init_notifier_child(subject, "2")) + |> static_supervisor.add(init_notifier_child(subject, "3")) + |> static_supervisor.start // Assert children have started let assert Ok(#("1", p1)) = process.receive(subject, 10) @@ -137,6 +137,23 @@ pub fn one_for_all_test() { let assert True = process.is_alive(p2) let assert True = process.is_alive(p3) - let assert True = process.is_alive(supervisor) - process.send_exit(supervisor) + let assert True = process.is_alive(supervisor.pid) + process.send_exit(supervisor.pid) +} + +pub fn duplicate_child_test() { + let subject = process.new_subject() + let spec = init_notifier_child(subject, "1") + + // Even though the spec is the same there are no problems starting the + // children. This demonstrates that the spec is not used as the supervisor + // child id at all, so it is not possible to have duplicate ids. + let assert Ok(supervisor) = + static_supervisor.new(static_supervisor.OneForAll) + |> static_supervisor.add(spec) + |> static_supervisor.add(spec) + |> static_supervisor.add(spec) + |> static_supervisor.start + + let assert True = process.is_alive(supervisor.pid) } diff --git a/test/gleam/otp/supervisor_test.gleam b/test/gleam/otp/supervisor_test.gleam deleted file mode 100644 index da65b67..0000000 --- a/test/gleam/otp/supervisor_test.gleam +++ /dev/null @@ -1,64 +0,0 @@ -import gleam/erlang/process -import gleam/otp/actor -import gleam/otp/supervisor.{add, returning, worker} -import gleeunit/should - -pub fn supervisor_test() { - let subject = process.new_subject() - - // Children send their name back to the test process during - // initialisation so that we can tell they (re)started - let child = - worker(fn(name) { - actor.start_spec( - actor.Spec( - init: fn() { - process.send(subject, #(name, process.self())) - actor.Ready(name, process.new_selector()) - }, - init_timeout: 10, - loop: fn(_msg, state) { actor.continue(state) }, - ), - ) - }) - - // Each child returns the next name, which is their name + 1 - let child = - child - |> returning(fn(name, _subject) { name + 1 }) - - supervisor.start_spec( - supervisor.Spec( - argument: 1, - frequency_period: 1, - max_frequency: 5, - init: fn(children) { - children - |> add(child) - |> add(child) - |> add(child) - }, - ), - ) - |> should.be_ok - - // Assert children have started - let assert Ok(#(1, p)) = process.receive(subject, 10) - let assert Ok(#(2, _)) = process.receive(subject, 10) - let assert Ok(#(3, _)) = process.receive(subject, 10) - let assert Error(Nil) = process.receive(subject, 10) - - // Kill first child an assert they all restart - process.kill(p) - let assert Ok(#(1, p1)) = process.receive(subject, 10) - let assert Ok(#(2, p2)) = process.receive(subject, 10) - let assert Ok(#(3, _)) = process.receive(subject, 10) - let assert Error(Nil) = process.receive(subject, 10) - - // Kill second child an assert the following children restart - process.kill(p2) - let assert Ok(#(2, _)) = process.receive(subject, 10) - let assert Ok(#(3, _)) = process.receive(subject, 10) - let assert Error(Nil) = process.receive(subject, 10) - let assert True = process.is_alive(p1) -} diff --git a/test/gleam/otp/task_test.gleam b/test/gleam/otp/task_test.gleam deleted file mode 100644 index ae80c8d..0000000 --- a/test/gleam/otp/task_test.gleam +++ /dev/null @@ -1,236 +0,0 @@ -import gleam/erlang/process.{type Pid} -import gleam/function -import gleam/otp/task.{Timeout} -import gleeunit/should - -@external(erlang, "gleam_otp_test_external", "flush") -fn flush() -> Nil - -@external(erlang, "gleam_otp_test_external", "get_message_queue_length") -fn get_message_queue_length(pid pid: Pid) -> Int - -@external(erlang, "timer", "sleep") -fn sleep(a: Int) -> Nil - -fn work(x) { - fn() { - sleep(15) - x - } -} - -pub fn async_await_test() { - // Spawn 3 tasks, performing 45ms work collectively - let t1 = task.async(work(1)) - let t2 = task.async(work(2)) - let t3 = task.async(work(3)) - - // Assert they run concurrently (not taking 45ms total) - task.try_await(t1, 35) - |> should.equal(Ok(1)) - task.try_await(t2, 5) - |> should.equal(Ok(2)) - task.try_await(t3, 5) - |> should.equal(Ok(3)) - - // Assert awaiting on previously retrieved tasks returns an error - // An already finished task will always time out! - let assert Error(Timeout) = task.try_await(t1, 35) - let assert Error(Timeout) = task.try_await(t2, 35) - let assert Error(Timeout) = task.try_await(t3, 35) -} - -pub fn async_await_unmonitor_test() { - // Start with an empty mailbox - flush() - - // Perform an asynchronous task - // and monitor it until it's done - let _result = - task.async(work(1)) - |> task.try_await(50) - - // Mailbox should be empty; - // no "DOWN" message should have been sent - process.self() - |> get_message_queue_length - |> should.equal(0) -} - -fn assert_no_leftover_messages() -> Nil { - let selector = - process.new_selector() - |> process.selecting_anything(function.identity) - - case process.select(selector, 20) { - Error(Nil) -> Nil - Ok(_) -> panic as "leftover message" - } -} - -pub fn try_await2_timeout_test() { - // Start with an empty mailbox - flush() - - let work = fn(x, y) { - fn() { - sleep(y) - x - } - } - - // 2 will not finish in time - let task1 = task.async(work(1, 0)) - let task2 = task.async(work(2, 10)) - - task.try_await2(task1, task2, 5) - |> should.equal(#(Ok(1), Error(Timeout))) - - // We don't want task messages to leak so we still have to wait for the ones - // that timed out earlier! - task.await_forever(task2) -} - -pub fn try_await3_test() { - // Start with an empty mailbox - flush() - - let work = fn(x) { - fn() { - sleep(5) - x - } - } - - let task1 = task.async(work(1)) - let task2 = task.async(work(2)) - let task3 = task.async(work(3)) - - task.try_await3(task1, task2, task3, 8) - |> should.equal(#(Ok(1), Ok(2), Ok(3))) - - // We want to make sure timers don't leak! - assert_no_leftover_messages() -} - -pub fn try_await3_timeout_test() { - // Start with an empty mailbox - flush() - - let work = fn(x, y) { - fn() { - sleep(y) - x - } - } - - // 1 will not finish in time - let task1 = task.async(work(1, 100)) - let task2 = task.async(work(2, 1)) - let task3 = task.async(work(3, 1)) - - task.try_await3(task1, task2, task3, 20) - |> should.equal(#(Error(Timeout), Ok(2), Ok(3))) - - // We don't want task messages to leak so we still have to wait for the ones - // that timed out earlier! - task.await_forever(task1) -} - -pub fn try_await4_test() { - // Start with an empty mailbox - flush() - - let work = fn(x) { - fn() { - sleep(5) - x - } - } - - let task1 = task.async(work(1)) - let task2 = task.async(work(2)) - let task3 = task.async(work(3)) - let task4 = task.async(work(4)) - - task.try_await4(task1, task2, task3, task4, 8) - |> should.equal(#(Ok(1), Ok(2), Ok(3), Ok(4))) - - // We want to make sure timers don't leak! - assert_no_leftover_messages() -} - -pub fn try_await4_timeout_test() { - // Start with an empty mailbox - flush() - - let work = fn(x, y) { - fn() { - sleep(y) - x - } - } - - // 1 will not finish in time - let task1 = task.async(work(1, 100)) - let task2 = task.async(work(2, 1)) - let task3 = task.async(work(3, 1)) - let task4 = task.async(work(4, 1)) - - task.try_await4(task1, task2, task3, task4, 20) - |> should.equal(#(Error(Timeout), Ok(2), Ok(3), Ok(4))) - - // We don't want task messages to leak so we still have to wait for the ones - // that timed out earlier! - task.await_forever(task1) -} - -pub fn try_await_all_test() { - // Start with an empty mailbox - flush() - - let work = fn(x) { - fn() { - sleep(5) - x - } - } - - let task1 = task.async(work(1)) - let task2 = task.async(work(2)) - let task3 = task.async(work(3)) - let task4 = task.async(work(4)) - - task.try_await_all([task1, task2, task3, task4], 8) - |> should.equal([Ok(1), Ok(2), Ok(3), Ok(4)]) - - // We want to make sure timers don't leak! - assert_no_leftover_messages() -} - -pub fn try_await_all_timeout_test() { - // Start with an empty mailbox - flush() - - let work = fn(x, y) { - fn() { - sleep(y) - x - } - } - - // 3 and 5 will not finish in time - let task1 = task.async(work(1, 1)) - let task2 = task.async(work(2, 1)) - let task3 = task.async(work(3, 50)) - let task4 = task.async(work(4, 1)) - let task5 = task.async(work(5, 100)) - - task.try_await_all([task1, task2, task3, task4, task5], 20) - |> should.equal([Ok(1), Ok(2), Error(Timeout), Ok(4), Error(Timeout)]) - - // We don't want task messages to leak so we still have to wait for the ones - // that timed out earlier! - task.await_forever(task3) - task.await_forever(task5) -} diff --git a/test/gleam/periodic_actor.gleam b/test/gleam/periodic_actor.gleam index 48d794f..2dec54c 100644 --- a/test/gleam/periodic_actor.gleam +++ b/test/gleam/periodic_actor.gleam @@ -1,32 +1,26 @@ //// An example of an actor that concurrently does something every X amount of //// time (or when it is sent a message by another process). -import gleam/erlang/process.{type Subject} +import gleam/erlang/process import gleam/io -import gleam/otp/actor.{type StartError, Ready, Spec} +import gleam/otp/actor.{type StartError} pub fn periodic_actor( every period_milliseconds: Int, run callback: fn() -> Nil, -) -> Result(Subject(Nil), StartError) { - let init = fn( - // Create a channel to periodically send a message to the actor on - ) { - let subject = process.new_subject() - let selector = - process.new_selector() - |> process.selecting(subject, fn(x) { x }) +) -> Result(actor.Started(Nil), StartError) { + let init = fn(subject) { + let selector = process.new_selector() |> process.select(subject) // Send the first message to trigger the looping process.send(subject, Nil) // We're ready to start receiving messages - Ready(subject, selector) + actor.initialised(subject) + |> actor.selecting(selector) + |> Ok } - let loop = fn( - _msg, - subject, + let loop = fn(subject, _msg) { // Send a message to itself in the future - ) { process.send_after(subject, period_milliseconds, Nil) // Run the callback as the timer has triggered again callback() @@ -35,7 +29,9 @@ pub fn periodic_actor( } // Start the actor - actor.start_spec(Spec(init: init, loop: loop, init_timeout: 50)) + actor.new_with_initialiser(50, init) + |> actor.on_message(loop) + |> actor.start } pub fn main_test() {