Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resource handler #73

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions config/resource_handler/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Resource handler

This module handles resource allocations including granular per-process allocations, base allocation modifications, and retry setup. It makes use of the [retry module](../retry) module for retry setup.

The resource handler requires a JSON file containing allocation information, in the following format:

```JSON
{
"allocation_group": {
"process1": {
"cpus": {
"min": 1,
"fraction": 0.5,
"max": 1
},
"memory": {
"min": "150 MB",
"fraction": 0.23,
"max": "1 GB"
},
"retry_strategy": {
"memory": {
"strategy": "exponential",
"operand": 2
}
}
},
"process2": {
"cpus": {
"min": 2,
"fraction": 0.5,
"max": 2
},
"memory": {
"min": "1500 MB",
"fraction": 0.23,
"max": "10 GB"
}
},
},
"default": {
"process1": {
"cpus": {
"min": 1,
"fraction": 0.5,
"max": 1
},
"memory": {
"min": "150 MB",
"fraction": 0.23,
"max": "1 GB"
}
},
"process2": {
"cpus": {
"min": 2,
"fraction": 0.5,
"max": 2
},
"memory": {
"min": "1500 MB",
"fraction": 0.23,
"max": "10 GB"
}
},
}
}
```

A resource group with the label `default` is required in the JSON. Additionally, groups labeled as F-series and M-series allocations can be provided and will be automatically detected. For example, an F16 node will be recognized and the corresponding group label `f16` will automatically be loaded if defined in the resources JSON. A specific group can also be selected through configuration with the parameter `resource_allocation_profile_tag`.

## Example
An example of integrating the handler and setting up allocations:
```Nextflow
methods {
...
setup = {
...
resource_handler.handle_resources("${projectDir}/config/resources.json")
}
}
```
279 changes: 279 additions & 0 deletions config/resource_handler/resource_handler.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
import nextflow.util.SysHelper
import groovy.json.JsonSlurper

includeConfig "../retry/retry.config"

class SystemResources {
private Map resource_limits = [
'cpus': [
'type': java.lang.Integer,
'min': 1,
'max': SysHelper.getAvailCpus()
],
'memory': [
'type': nextflow.util.MemoryUnit,
'min': 1.MB,
'max': SysHelper.getAvailMemory()
],
'time': [
'type': nextflow.util.Duration,
'min': 1.s,
'max': 1000.d
]
]
private String resource_profile = null

SystemResources(Map params) {
// Search for config-defined resource limits
this.resource_limits.each { resource, resource_info ->
['min', 'max'].each { limit_end ->
try {
if (params.containsKey("${limit_end}_${resource}" as String)) {
this.resource_limits[resource][limit_end] = params["${limit_end}_${resource}" as String].asType(this.resource_limits[resource]['type'])
}
} catch (all) {
// Do nothing, let default value defined above take effect
}
}
}

this.identify_resource_profile()
}

Map get_limits(String type) {
return [
("min_${type}" as String): this.resource_limits[type]['min'],
("max_${type}" as String): this.resource_limits[type]['max']
]
}

Object check_limits(Object obj, String type) {
return SystemResources.check_limits(obj, type, this.resource_limits[type]['min'], this.resource_limits[type]['max'])
}

static Object check_limits(Object obj, String type, Object min, Object max) {
if (obj.compareTo(max) == 1) {
return max
} else if (obj.compareTo(min) == -1) {
return min
} else {
return obj
}
}

private void identify_resource_profile() {
// Identify if available resources match F- or M-series nodes
def cpus = this.resource_limits.cpus.max
def memory = this.resource_limits.memory.max.toGiga()

if (memory >= (cpus * 2 * 0.9 - 1) && (memory <= (cpus * 2))) {
this.resource_profile = "f${cpus}"
}

if (memory >= (cpus * 16 * 0.9 - 1) && (memory <= (cpus * 16))) {
this.resource_profile = "m${cpus}"
}
}

Object resolve_resource_allocation(Map allocation, String type) {
def min_raw = allocation['min'].asType(this.resource_limits[type]['type'])
def max_raw = allocation['max'].asType(this.resource_limits[type]['type'])

def min_allocation = this.check_limits(min_raw, type)
def max_allocation = this.check_limits(max_raw, type)

def requested_allocation = allocation.fraction * (this.resource_limits[type]['max'])

return SystemResources.check_limits(requested_allocation, type, min_allocation, max_allocation)
}

String get_resource_profile_tag() {
return this.resource_profile
}
}

class PipelineAllocation {
private SystemResources system_resources
private File resource_json
private String allocation_profile
private Map processed_resources = [:]
private Map retry_configurations = [:]
private Map raw_process_resources = [:]

PipelineAllocation(Object resource_json, Object params) {
this.system_resources = new SystemResources(params)
this.resource_json = new File(resource_json)

def json_slurper = new JsonSlurper()

// TO-DO: Dump original for logging, keeping in class for now
this.raw_process_resources = json_slurper.parse(this.resource_json)

assert this.raw_process_resources instanceof Map
// TO-DO: Validate JSON is in expected format

this.select_allocation_profile(params)

// Separate the retry strategies from the base allocations
this.processed_resources.each { process, allocations ->
if (allocations.containsKey("retry_strategy")) {
this.retry_configurations[process] = allocations["retry_strategy"]
allocations.remove("retry_strategy")
}
}

// Convert string memory units to memory unit
this.processed_resources.each { process, allocations ->
for (resource_type in ["cpus", "memory", "time"]) {
if (allocations.containsKey(resource_type)) {
allocations[resource_type] = this.system_resources.resolve_resource_allocation(allocations[resource_type], resource_type)
}
}
}

// TO-DO: Singularize processes that may be separated with `|`
}

private void load_resource_profile(String profile_tag) {
this.processed_resources = this.raw_process_resources[profile_tag]

if (!this.processed_resources) {
throw new Exception(" ### ERROR ### Failed to find requested resource profile: `${profile_tag}`")
}
}

private void select_allocation_profile(Map params) {
String profile_tag = null

// Try for user-given profile
if (params.containsKey('resource_allocation_profile_tag') && params.resource_allocation_profile_tag) {
profile_tag = params.resource_allocation_profile_tag
this.load_resource_profile(profile_tag)
return
}

// Try loading detected tag based on system resources
profile_tag = this.system_resources.get_resource_profile_tag()

if (profile_tag) {
try {
this.load_resource_profile(profile_tag)
return
} catch (all) {
// Continue to try loading `default` profile
}
}

// Resort to loading `default` profile
this.load_resource_profile('default')
}

// TO-DO: functionality to dump original loaded JSON to file for logging

void update_base_allocation(String resource, String process, Object multiplier) {
if (this.processed_resources.containsKey(process) && this.processed_resources[process].containsKey(resource)) {
this.processed_resources[process][resource] = this.system_resources.check_limits(this.processed_resources[process][resource] * multiplier, resource)
} else {
System.out.println(" ### WARNING ### No base value found for resource `${resource}` for process `${process}`. Update will be skipped.")
}
}

// Apply base resource updates
void apply_base_updates(Map resource_updates) {
resource_updates.each { resource, updates ->
updates.each { processes, multiplier ->
List processes_to_update = (processes instanceof String || processes instanceof GString) ? [processes] : processes

if (processes_to_update == []) {
processes_to_update = this.processed_resources.keySet() as List
}

processes_to_update.each { process ->
this.update_base_allocation(resource, process, multiplier)
}
}
}
}

Map get_base_resource_allocations() {
return this.processed_resources
}

Map get_retry_configuration() {
return this.retry_configurations
}

void print_resources() {
System.out.println(this.processed_resources)
}

SystemResources get_system_resources() {
return this.system_resources
}
}

resource_handler {
set_retry = { String proc_key, String proc_name, String type ->
if (process[proc_key]?[type] && \
params.retry_information?[proc_name]?[type] && \
params.retry_information?[proc_name]?[type]?.operand && \
params.retry_information?[proc_name]?[type]?.strategy) {
process[proc_key][type] = { retry.retry_updater(params.base_allocations[task.process.split(':')[-1]][type], \
params.retry_information[task.process.split(':')[-1]][type].strategy, \
params.retry_information[task.process.split(':')[-1]][type].operand, \
task.attempt, \
type) }
}
}

set_resource_limit_params = { SystemResources system_resources ->
["cpus", "memory", "time"].each { resource_type ->
system_resources.get_limits(resource_type).each { resource_limit_key, resource_limit ->
params[resource_limit_key] = resource_limit
}
}
}

setup_retry = { Map resources_to_allocate, Map retry_configuration ->
params.retry_information = [:]

for (proc_allocation in resources_to_allocate) {
def proc_key = "withName:${proc_allocation.key}" as String
def allocation = proc_allocation.value

def retry_strategy = retry_configuration.getOrDefault(proc_allocation.key, null)
if (retry_strategy) {
params.retry_information[proc_allocation.key] = retry_strategy
}

for (resource_allocation in allocation) {
process[proc_key]["${resource_allocation.key}"] = resource_allocation.value
if (retry_strategy && retry_strategy.containsKey(resource_allocation.key)) {
resource_handler.set_retry(proc_key, proc_allocation.key, 'cpus')
resource_handler.set_retry(proc_key, proc_allocation.key, 'memory')
}
}
}
}

handle_resources = { Object resource_file, Map current_params=params ->
def allocation_handler = new PipelineAllocation(resource_file, current_params)
def system_resources = allocation_handler.get_system_resources()

// Set params for limits for each resources
resource_handler.set_resource_limit_params(system_resources)

// Apply base resource updates if given
if (params.containsKey('base_resource_update') && params.base_resource_update) {
allocation_handler.apply_base_updates(params.base_resource_update)
}

// Set base allocations and retry
Map resources_to_allocate = allocation_handler.get_base_resource_allocations()
Map retry_configuration = allocation_handler.get_retry_configuration()

params.base_allocations = resources_to_allocate

resource_handler.setup_retry(resources_to_allocate, retry_configuration)
}
}
Loading