Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add @assign, add support for Broadcasted objects #64

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .buildkite/Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4"
ExplicitImports = "7d51a73a-1435-4ff3-83d9-f097790105c7"
JuliaFormatter = "98e50ef6-434e-11e9-1051-2b60c6c9e899"
LazyBroadcast = "9dccce8e-a116-406d-9fcc-a88ed4f510c8"
MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195"
NCDatasets = "85f8d34a-cbdd-5861-8df4-14fed0d494ab"
Profile = "9abbd945-dff8-562f-b5e8-e1ebf5ef1b79"
Expand Down
6 changes: 6 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# NEWS

v0.2.4
-------

- Add `@assign`.
- Add support for lazy compute functions that return `Base.Broadcast.Broadcasted` objects.

v0.2.3
-------

Expand Down
6 changes: 4 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "ClimaDiagnostics"
uuid = "1ecacbb8-0713-4841-9a07-eb5aa8a2d53f"
authors = ["Gabriele Bozzola <[email protected]>"]
version = "0.2.3"
version = "0.2.4"

[deps]
Accessors = "7d9f7c33-5ae7-4f3b-8dc6-eff91059b697"
Expand All @@ -21,6 +21,7 @@ ClimaTimeSteppers = "0.7.10"
Dates = "1"
Documenter = "1"
ExplicitImports = "1.6"
LazyBroadcast = "0.1.4"
JuliaFormatter = "1"
NCDatasets = "0.13.1, 0.14"
Profile = "1"
Expand All @@ -37,10 +38,11 @@ ClimaTimeSteppers = "595c0a79-7f3d-439a-bc5a-b232dc3bde79"
Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4"
ExplicitImports = "7d51a73a-1435-4ff3-83d9-f097790105c7"
JuliaFormatter = "98e50ef6-434e-11e9-1051-2b60c6c9e899"
LazyBroadcast = "9dccce8e-a116-406d-9fcc-a88ed4f510c8"
Profile = "9abbd945-dff8-562f-b5e8-e1ebf5ef1b79"
ProfileCanvas = "efd6af41-a80b-495e-886c-e51b0c7d77a3"
SafeTestsets = "1bc83da4-3b8d-516f-aca4-4fe02f6d838f"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[targets]
test = ["Aqua", "BenchmarkTools", "ClimaTimeSteppers", "Documenter", "ExplicitImports", "JuliaFormatter", "Profile", "ProfileCanvas", "SafeTestsets", "Test"]
test = ["Aqua", "BenchmarkTools", "ClimaTimeSteppers", "Documenter", "ExplicitImports", "JuliaFormatter", "LazyBroadcast", "Profile", "ProfileCanvas", "SafeTestsets", "Test"]
1 change: 1 addition & 0 deletions docs/src/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

```@docs
ClimaDiagnostics.IntegratorWithDiagnostics
ClimaDiagnostics.@assign
```

## `Schedules`
Expand Down
27 changes: 22 additions & 5 deletions docs/src/developer_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,27 @@ add_diagnostic_variable!(
)
```

When writing compute functions, consider using [`@assign`](@ref) to simplify
your code and, when possible, making your expression lazy with
[LazyBroadcast.jl](https://github.com/CliMA/LazyBroadcast.jl) to further improve
clarity and performance. To do that, add `LazyBroadcast` to your dependencies
and import `@lazy`. The previous example would look like:

```julia
###
# Density (3d)
###
add_diagnostic_variable!(
short_name = "rhoa",
long_name = "Air Density",
standard_name = "air_density",
units = "kg m^-3",
compute! = (out, state, cache, time) -> begin
return @lazy @. state.c.ρ
end,
)
```

It is a good idea to put safeguards in place to ensure that your users will not
be allowed to call diagnostics that do not make sense for the simulation they
are running. If your package has a notion of `Model` that is stored in `p`, you
Expand All @@ -224,11 +245,7 @@ function compute_hus!(
time,
moisture_model::T,
) where {T <: Union{EquilMoistModel, NonEquilMoistModel}}
if isnothing(out)
return state.c.ρq_tot ./ state.c.ρ
else
out .= state.c.ρq_tot ./ state.c.ρ
end
@assign out state.c.ρq_tot ./ state.c.ρ
end

add_diagnostic_variable!(
Expand Down
3 changes: 2 additions & 1 deletion docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ the Developer guide page.
- Allow users to define arbitrary new diagnostics;
- Trigger diagnostics on arbitrary conditions;
- Save output to HDF5 or NetCDF files, or a dictionary in memory;

- Work with lazy expressions (such as the ones produced by
[LazyBroadcast.jl](https://github.com/CliMA/LazyBroadcast.jl)).

40 changes: 31 additions & 9 deletions docs/src/user_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,40 @@ var = DiagnosticVariable(;
`compute_ta!` is the key function here. It determines how the variable should be
computed from the `state`, `cache`, and `time` of the simulation. Typically,
these are packaged within an `integrator` object (e.g., `state = integrator.u`
or `integrator.Y`).
or `integrator.Y`). `copy` is needed because we want to return a new area of
memory (without `copy`, `ClimaDiagnostics` might end up modifying `state.ta`).

`compute_ta!` takes another argument, `out`. `out` is an area of memory managed
by `ClimaDiagnostics` that is used to reduce the number of allocations needed
when working with diagnostics. The first time the diagnostic is called, an area
of memory is allocated and filled with the value (this is when `out` is
`nothing`). All the subsequent times, the same space is overwritten, leading to
much better performance. You should follow this pattern in all your diagnostics.

> Note, in the future, we hope to improve this rather clumsy way to write
> diagnostics. Hopefully, at some point you will just have to write something like
> `state.ta` and not worry about the `out` at all.
when working with diagnostics and improve performance. The first time the
diagnostic is called, an area of memory is allocated and filled with the value
(this is when `out` is `nothing`). All the subsequent times, the same space is
overwritten, leading to much better performance. You should follow this pattern
in all your diagnostics. To help you with that, `ClimaDiagnostics` provides a
macro, `@assign`, that expands to the if switch, so that `compute_ta!` can be
written as
```julia
import ClimaDiagnostics: @assign

function compute_ta!(out, state, cache, time)
@assign out copy(state.ta) state.ta
end
```
Note that when the second and third arguments of `@assign` are the same, one of
the two can be omitted.

`ClimaDiagnostics` supports working with unevaluated expressions represented by
`Base.Broadcast.Broadcasted` objects, such as the ones produced with
[LazyBroadcast.jl](https://github.com/CliMA/LazyBroadcast.jl). Using
`LazyBroadcast.jl`, the previous snippet can be rewritten as
```julia
import LazyBroadcast: @lazy

function compute_ta!(out, state, cache, time)
@lazy @. out = state.ta
end
```
Using lazy expressions can lead to improved performance and clearer code.

A `DiagnosticVariable` defines what a variable is and how to compute it, but
does not specify when to compute/output it. For that, we need
Expand Down
2 changes: 1 addition & 1 deletion docs/src/writers.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ linearly. For the vertical dimension, the behavior can be customized by passing
the `z_sampling_method` variable. When `z_sampling_method =
ClimaDiagnostics.Writers.LevelMethod()`, points evaluated on the grid levels
(and the provided number of points ignored), when `z_sampling_method =
ClimaDiagnostics.Writers.FakePressureLevelMethod()`, points are sampled
ClimaDiagnostics.Writers.FakePressureLevelsMethod()`, points are sampled
uniformly in simplified hydrostatic atmospheric model.

The output in the `NetCDFWriter` roughly follows the CF conventions.
Expand Down
87 changes: 83 additions & 4 deletions src/clima_diagnostics.jl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ struct DiagnosticsHandler{
STORAGE <: Dict,
ACC <: Dict,
COUNT <: Dict,
BROAD <: Dict,
}
"""An iterable with the `ScheduledDiagnostic`s that are scheduled."""
scheduled_diagnostics::SD
Expand All @@ -34,6 +35,11 @@ struct DiagnosticsHandler{
many times the given diagnostics was computed from the last time it was output to
disk."""
counters::COUNT

"""Dictionary that maps a given `ScheduledDiagnostic` to a Base.Broadcast.Broadcasted
object. This is used to allow lazy evaluation of expressions, which can lead to reduced
code verbosity and improved performance."""
broadcasted_expressions::BROAD
end

"""
Expand All @@ -57,9 +63,11 @@ function DiagnosticsHandler(scheduled_diagnostics, Y, p, t; dt = nothing)

# For diagnostics that perform reductions, the storage is used for the values computed
# at each call. Reductions also save the accumulated value in accumulators.
# broadcasted_expressions maps diagnostics with LazyBroadcast objects.
storage = Dict()
accumulators = Dict()
counters = Dict()
broadcasted_expressions = Dict()

unique_scheduled_diagnostics = Tuple(unique(scheduled_diagnostics))
if length(unique_scheduled_diagnostics) != length(scheduled_diagnostics)
Expand Down Expand Up @@ -90,8 +98,18 @@ function DiagnosticsHandler(scheduled_diagnostics, Y, p, t; dt = nothing)
isa_time_reduction = !isnothing(diag.reduction_time_func)

# The first time we call compute! we use its return value. All the subsequent times
# (in the callbacks), we will write the result in place
storage[diag] = variable.compute!(nothing, Y, p, t)
# (in the callbacks), we will write the result in place. ClimaDiagnostics supports
# LazyBroadcast.jl. In this case, the return value of `compute!` is a
# `Base.Broadcast.Broadcasted` and we have to manually materialize the result.
out_or_broadcasted_expr = variable.compute!(nothing, Y, p, t)
if out_or_broadcasted_expr isa Base.Broadcast.Broadcasted
broadcasted_expressions[diag] = out_or_broadcasted_expr
storage[diag] =
Base.Broadcast.materialize(broadcasted_expressions[diag])
Comment on lines +107 to +108
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
storage[diag] =
Base.Broadcast.materialize(broadcasted_expressions[diag])
Base.Broadcast.materialize!(storage[diag], out_or_broadcasted_expr)

This is where we can use the mutating version of materialize

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would fail because storage[diag] does not exist at this point.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sorry, I wrote a message earlier but must not have hit comment. Can we then check if isnothing(storage[diag]) and conditionally call materialize!? materialize will always allocate a new field

else
storage[diag] = out_or_broadcasted_expr
end

counters[diag] = 1

# If it is not a reduction, call the output writer as well
Expand All @@ -115,6 +133,7 @@ function DiagnosticsHandler(scheduled_diagnostics, Y, p, t; dt = nothing)
storage,
accumulators,
counters,
broadcasted_expressions,
)
end

Expand Down Expand Up @@ -153,13 +172,42 @@ function orchestrate_diagnostics(
active_compute[diag_index] || continue
diag = scheduled_diagnostics[diag_index]

diag.variable.compute!(
diagnostic_handler.counters[diag] += 1

# ClimaDiagnostics supports LazyBroadcast.jl. When used, the return value of
# `compute!` is a `Base.Broadcast.Broadcasted`. We materialize the output to
# diagnostic_handler.storage[diag] in the next for loop. If the output is not a
# `Base.Broadcast.Broadcasted`, we don't have to do anything because compute! will
# already update diagnostic_handler.storage[diag].

out_or_broadcasted_expr = diag.variable.compute!(
diagnostic_handler.storage[diag],
integrator.u,
integrator.p,
integrator.t,
)
diagnostic_handler.counters[diag] += 1
if out_or_broadcasted_expr isa Base.Broadcast.Broadcasted
diagnostic_handler.broadcasted_expressions[diag] =
out_or_broadcasted_expr
end
end

# Evaluate the lazy compute (aka, materialize everything)
for diag_index in 1:length(scheduled_diagnostics)
active_compute[diag_index] || continue
diag = scheduled_diagnostics[diag_index]
haskey(diagnostic_handler.broadcasted_expressions, diag) || continue

Base.Broadcast.materialize!(
diagnostic_handler.storage[diag],
diagnostic_handler.broadcasted_expressions[diag],
)
end

# Process possible time reductions (now we have evaluated storage[diag])
for diag_index in 1:length(scheduled_diagnostics)
active_compute[diag_index] || continue
diag = scheduled_diagnostics[diag_index]

isa_time_reduction = !isnothing(diag.reduction_time_func)
if isa_time_reduction
Expand Down Expand Up @@ -300,3 +348,34 @@ function IntegratorWithDiagnostics(integrator, scheduled_diagnostics)

return integrator
end

"""
@assign out rhs1 rhs2
@assign out rhs

This macro expands to
```
if isnothing(out, rhs1, rhs2)
rhs1
else
out .= rhs2
end
```
When `rhs2` is not provided, it is assumed that `rhs2 = rhs1`.

This macro is used for `compute!` functions. The first time a `compute!` function
is called, `out` is `nothing` and a new output is allocated. The subsequent times,
`out` is used.

!!! compat "ClimaDiagnostics 0.2.4"
This macro was introduced in version `0.2.4`.
"""
macro assign(out, rhs1, rhs2 = rhs1)
quote
if isnothing($(esc(out)))
$(esc(rhs1))
else
$(esc(out)) .= $(esc(rhs2))
end
end
end
11 changes: 11 additions & 0 deletions test/diagnostics.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@ import SciMLBase

include("TestTools.jl")

@testset "Assign macro" begin
myvar = nothing
myrhs = 2
myrhs2 = 10
myvar_mut = [1]
@test ClimaDiagnostics.@assign(myvar, myrhs, myrhs2) == 2

ClimaDiagnostics.@assign(myvar_mut, myrhs, myrhs2)
@test myvar_mut == [10]
end

@testset "Diagnostics" begin
t0 = 0.0
tf = 1.0
Expand Down
18 changes: 17 additions & 1 deletion test/integration_test.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import ClimaComms
ClimaComms.@import_required_backends
end

import LazyBroadcast: @lazy

const context = ClimaComms.context()
ClimaComms.init(context)

Expand Down Expand Up @@ -45,16 +47,25 @@ function setup_integrator(output_dir; context, more_compute_diagnostics = 0)
return copy(u.my_var)
else
out .= u.my_var
return nothing
end
end

function compute_my_var_lazy!(out, u, p, t)
return @lazy @. out = u.my_var
end

simple_var = ClimaDiagnostics.DiagnosticVariable(;
compute! = compute_my_var!,
short_name = "YO",
long_name = "YO YO",
)

simple_var_lazy = ClimaDiagnostics.DiagnosticVariable(;
compute! = compute_my_var_lazy!,
short_name = "YO LAZY",
long_name = "YO YO LAZY",
)

average_diagnostic = ClimaDiagnostics.ScheduledDiagnostic(
variable = simple_var,
output_writer = nc_writer,
Expand All @@ -66,6 +77,10 @@ function setup_integrator(output_dir; context, more_compute_diagnostics = 0)
variable = simple_var,
output_writer = nc_writer,
)
inst_diagnostic_lazy = ClimaDiagnostics.ScheduledDiagnostic(
variable = simple_var_lazy,
output_writer = nc_writer,
)
inst_every3s_diagnostic = ClimaDiagnostics.ScheduledDiagnostic(
variable = simple_var,
output_writer = nc_writer,
Expand All @@ -81,6 +96,7 @@ function setup_integrator(output_dir; context, more_compute_diagnostics = 0)
scheduled_diagnostics = [
average_diagnostic,
inst_diagnostic,
inst_diagnostic_lazy,
inst_diagnostic_h5,
inst_every3s_diagnostic,
]
Expand Down
Loading