Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): add new pebble functions #6888 #7191

Merged
merged 16 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public class Extension extends AbstractExtension {
@Inject
private FileSizeFunction fileSizeFunction;

@Inject
private FileEmptyFunction fileEmptyFunction;

@Inject
private FileExistsFunction fileExistsFunction;

@Inject
@Nullable
private ErrorLogsFunction errorLogsFunction;
Expand Down Expand Up @@ -152,6 +158,8 @@ public Map<String, Function> getFunctions() {
}
functions.put("randomInt", new RandomIntFunction());
functions.put("randomPort", new RandomPortFunction());
functions.put("fileExists", fileExistsFunction);
functions.put("fileEmpty", fileEmptyFunction);
return functions;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package io.kestra.core.runners.pebble.functions;

import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Slugify;
import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.extension.Function;
import io.pebbletemplates.pebble.template.EvaluationContext;
import io.pebbletemplates.pebble.template.PebbleTemplate;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.List;
import java.util.Map;

@Singleton
public class FileEmptyFunction implements Function {
private static final String ERROR_MESSAGE = "The 'fileEmpty' function expects an argument 'path' that is a path to a namespace file or an internal storage URI.";
private static final String KESTRA_SCHEME = "kestra:///";
private static final String TRIGGER = "trigger";
private static final String NAMESPACE = "namespace";
private static final String ID = "id";

@Inject
private StorageInterface storageInterface;

@Override
public List<String> getArgumentNames() {
return List.of("path");
}

@Override
public Object execute(Map<String, Object> args, PebbleTemplate self, EvaluationContext context, int lineNumber) {

if (!args.containsKey("path")) {
throw new PebbleException(null, ERROR_MESSAGE, lineNumber, self.getName());
}

Object path = args.get("path");
URI uri = getUriFromThePath(path, lineNumber, self);

try {
return readAndCheckEmptyFileFromInternalStorage(context, uri);
} catch (IOException e) {
throw new PebbleException(e, e.getMessage(), lineNumber, self.getName());
}

}

private URI getUriFromThePath(Object path, int lineNumber, PebbleTemplate self) {
if (path instanceof URI u) {
return u;
} else if (path instanceof String str && str.startsWith(KESTRA_SCHEME)) {
return URI.create(str);
} else {
throw new PebbleException(null, "Unable to create the URI from the path " + path, lineNumber, self.getName());
}
}

@SuppressWarnings("unchecked")
private boolean readAndCheckEmptyFileFromInternalStorage(EvaluationContext context, URI path) throws IOException {
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
Map<String, String> execution = (Map<String, String>) context.getVariable("execution");

boolean isFileFromCurrentExecution = isFileUriValid(flow.get(NAMESPACE), flow.get(ID), execution.get(ID), path);

if (!isFileFromCurrentExecution) {
checkIfFileFromParentExecution(context, path);
}
try (InputStream inputStream = storageInterface.get(flow.get("tenantId"), flow.get("namespace"), path)) {
byte[] buffer = new byte[1];
return inputStream.read(buffer, 0, 1) <= 0;
}
}

private void checkIfFileFromParentExecution(EvaluationContext context, URI path) {
if (context.getVariable(TRIGGER) != null) {
// if there is a trigger of type execution, we also allow accessing a file from the parent execution
Map<String, String> trigger = (Map<String, String>) context.getVariable(TRIGGER);

if (!isFileUriValid(trigger.get(NAMESPACE), trigger.get("flowId"), trigger.get("executionId"), path)) {
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it didn't belong to the current execution");
}
}
else {
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it didn't belong to the current execution");
}
}

private boolean isFileUriValid(String namespace, String flowId, String executionId, URI path) {
// Internal storage URI should be: kestra:///$namespace/$flowId/executions/$executionId/tasks/$taskName/$taskRunId/$random.ion or kestra:///$namespace/$flowId/executions/$executionId/trigger/$triggerName/$random.ion
// We check that the file is for the given flow execution
if (namespace == null || flowId == null || executionId == null) {
return false;
}

String authorizedBasePath = KESTRA_SCHEME + namespace.replace(".", "/") + "/" + Slugify.of(flowId) + "/executions/" + executionId + "/";
return path.toString().startsWith(authorizedBasePath);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package io.kestra.core.runners.pebble.functions;

import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Slugify;
import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.extension.Function;
import io.pebbletemplates.pebble.template.EvaluationContext;
import io.pebbletemplates.pebble.template.PebbleTemplate;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;

@Singleton
public class FileExistsFunction implements Function {
private static final String ERROR_MESSAGE = "The 'fileExists' function expects an argument 'path' that is a path to the internal storage URI.";
private static final String KESTRA_SCHEME = "kestra:///";
private static final String TRIGGER = "trigger";
private static final String NAMESPACE = "namespace";
private static final String ID = "id";

@Inject
private StorageInterface storageInterface;

@Override
public List<String> getArgumentNames() {
return List.of("path");
}

@Override
public Object execute(Map<String, Object> args, PebbleTemplate self, EvaluationContext context, int lineNumber) {

if (!args.containsKey("path")) {
throw new PebbleException(null, ERROR_MESSAGE, lineNumber, self.getName());
}

Object path = args.get("path");
URI uri = getUriFromThePath(path, lineNumber, self);

try {
return checkFileExistsFromInternalStorage(context, uri);
} catch (IOException e) {
throw new PebbleException(e, e.getMessage(), lineNumber, self.getName());
}

}

private URI getUriFromThePath(Object path, int lineNumber, PebbleTemplate self) {
if (path instanceof URI u) {
return u;
} else if (path instanceof String str && str.startsWith(KESTRA_SCHEME)) {
return URI.create(str);
} else {
throw new PebbleException(null, "Unable to create the URI from the path " + path, lineNumber, self.getName());
}
}

@SuppressWarnings("unchecked")
private boolean checkFileExistsFromInternalStorage(EvaluationContext context, URI path) throws IOException {
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
Map<String, String> execution = (Map<String, String>) context.getVariable("execution");

boolean isFileFromCurrentExecution = isFileUriValid(flow.get(NAMESPACE), flow.get(ID), execution.get(ID), path);

if (!isFileFromCurrentExecution) {
checkIfFileFromParentExecution(context, path);
}

return storageInterface.exists(flow.get("tenantId"), flow.get("namespace"), path);
}

private void checkIfFileFromParentExecution(EvaluationContext context, URI path) {
if (context.getVariable(TRIGGER) != null) {
// if there is a trigger of type execution, we also allow accessing a file from the parent execution
Map<String, String> trigger = (Map<String, String>) context.getVariable(TRIGGER);

if (!isFileUriValid(trigger.get(NAMESPACE), trigger.get("flowId"), trigger.get("executionId"), path)) {
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it didn't belong to the current execution");
}
}
else {
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it didn't belong to the current execution");
}
}

private boolean isFileUriValid(String namespace, String flowId, String executionId, URI path) {
// Internal storage URI should be: kestra:///$namespace/$flowId/executions/$executionId/tasks/$taskName/$taskRunId/$random.ion or kestra:///$namespace/$flowId/executions/$executionId/trigger/$triggerName/$random.ion
// We check that the file is for the given flow execution
if (namespace == null || flowId == null || executionId == null) {
return false;
}

String authorizedBasePath = KESTRA_SCHEME + namespace.replace(".", "/") + "/" + Slugify.of(flowId) + "/executions/" + executionId + "/";
return path.toString().startsWith(authorizedBasePath);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.kestra.core.runners.pebble.functions;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.runners.VariableRenderer;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.*;

@KestraTest
class FileEmptyFunctionTest {

private static final String NAMESPACE = "my.namespace";
private static final String FLOW = "flow";

@Inject
VariableRenderer variableRenderer;

@Inject
StorageInterface storageInterface;

private URI getInternalStorageURI(String executionId) {
return URI.create("/" + NAMESPACE.replace(".", "/") + "/" + FLOW + "/executions/" + executionId + "/tasks/task/" + IdUtils.create() + "/123456.ion");
}

private URI getInternalStorageFile(URI internalStorageURI, String text) throws IOException {
return storageInterface.put(null, NAMESPACE, internalStorageURI, new ByteArrayInputStream(text.getBytes()));
}

@Test
void shouldReturnFalseForFileWithText() throws IOException, IllegalVariableEvaluationException {
String executionId = IdUtils.create();
URI internalStorageURI = getInternalStorageURI(executionId);
URI internalStorageFile = getInternalStorageFile(internalStorageURI, "NOT AN EMPTY FILE");

// test for an authorized execution
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", FLOW,
"namespace", NAMESPACE),
"execution", Map.of("id", executionId)
);
boolean render = Boolean.parseBoolean(variableRenderer.render("{{ fileEmpty('" + internalStorageFile + "') }}", variables));
assertFalse(render);
}

@Test
void shouldReturnTrueForEmpty() throws IOException, IllegalVariableEvaluationException {
String executionId = IdUtils.create();
URI internalStorageURI = getInternalStorageURI(executionId);
URI internalStorageFile = getInternalStorageFile(internalStorageURI, "");

// test for an authorized execution
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", FLOW,
"namespace", NAMESPACE),
"execution", Map.of("id", executionId)
);
boolean render = Boolean.parseBoolean(variableRenderer.render("{{ fileEmpty('" + internalStorageFile + "') }}", variables));
assertTrue(render);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.kestra.core.runners.pebble.functions;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.runners.VariableRenderer;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.*;

@KestraTest
class FileExistsFunctionTest {

private static final String NAMESPACE = "my.namespace";
private static final String FLOW = "flow";

@Inject
VariableRenderer variableRenderer;

@Inject
StorageInterface storageInterface;

private URI getInternalStorageURI(String executionId) {
return URI.create("/" + NAMESPACE.replace(".", "/") + "/" + FLOW + "/executions/" + executionId + "/tasks/task/" + IdUtils.create() + "/123456.ion");
}

private URI getInternalStorageFile(URI internalStorageURI, String text) throws IOException {
return storageInterface.put(null, NAMESPACE, internalStorageURI, new ByteArrayInputStream(text.getBytes()));
}

@Test
void shouldReturnTrueForExistingFile() throws IOException, IllegalVariableEvaluationException {
String executionId = IdUtils.create();
URI internalStorageURI = getInternalStorageURI(executionId);
URI internalStorageFile = getInternalStorageFile(internalStorageURI, "EXISTING FILE");

// test for an authorized execution
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", FLOW,
"namespace", NAMESPACE),
"execution", Map.of("id", executionId)
);
boolean render = Boolean.parseBoolean(variableRenderer.render("{{ fileExists('" + internalStorageFile + "') }}", variables));
assertTrue(render);
}

@Test
void shouldReturnFalseForNonExistentFile() throws IOException, IllegalVariableEvaluationException {
String executionId = IdUtils.create();
URI internalStorageURI = getInternalStorageURI(executionId);
URI internalStorageFile = URI.create("kestra://" + internalStorageURI.getRawPath()); // Don't create file just pass the URI.

// test for an authorized execution
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", FLOW,
"namespace", NAMESPACE),
"execution", Map.of("id", executionId)
);
boolean render = Boolean.parseBoolean(variableRenderer.render("{{ fileExists('" + internalStorageFile + "') }}", variables));
assertFalse(render);
}
}
Loading
Loading