Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.linkedin.transport.examples;

import com.google.common.collect.ImmutableList;
import com.linkedin.transport.api.data.StdData;
import com.linkedin.transport.api.data.StdString;
import com.linkedin.transport.api.data.StdStruct;
import com.linkedin.transport.api.udf.StdUDF2;
import com.linkedin.transport.api.udf.TopLevelStdUDF;
import java.util.List;


public class DeleteFieldOfAStruct extends StdUDF2<StdData, StdString, StdData> implements TopLevelStdUDF {

/**
*
* @param input struct from which the field is to be removed
* @param fieldName name of field to be removed. If no such field, don't do anything.
* @return struct with field removed
*/
@Override
public StdData eval(StdData input, StdString fieldName) {
if (input instanceof StdStruct) {
StdStruct inputAsStruct = ((StdStruct) input);
StdData field = inputAsStruct.getField(fieldName.get());
if (field == null) {
// no field with the matching name, don't do anything
return input;
} else {
// Replaced with empty string.
inputAsStruct.setField(fieldName.get(), getStdFactory().createString(""));
}
} else {
throw new RuntimeException("Works only with a struct");
}
return input;
}


@Override
public List<String> getInputParameterSignatures() {
return ImmutableList.of(
"row(varchar, varchar)",
"varchar"
);
}

@Override
public String getOutputParameterSignature() {
return "row(varchar, varchar)";
}

@Override
public String getFunctionName() {
return "deleteFieldOfAStruct";
}

@Override
public String getFunctionDescription() {
return "changes the content of the field called 'fieldName' to empty string";
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.linkedin.transport.examples;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.linkedin.transport.api.udf.StdUDF;
import com.linkedin.transport.api.udf.TopLevelStdUDF;
import com.linkedin.transport.test.AbstractStdUDFTest;
import com.linkedin.transport.test.spi.Row;
import com.linkedin.transport.test.spi.StdTester;
import java.util.List;
import java.util.Map;
import org.testng.annotations.Test;


public class TestDeleteFieldOfAStruct extends AbstractStdUDFTest {

@Override
protected Map<Class<? extends TopLevelStdUDF>, List<Class<? extends StdUDF>>> getTopLevelStdUDFClassesAndImplementations() {
return ImmutableMap.of(DeleteFieldOfAStruct.class, ImmutableList.of(DeleteFieldOfAStruct.class));
}

@Test
public void testDeleteFieldOfAStructFunction() {
StdTester tester = getTester();
Row data = rowWithFieldNames(
ImmutableList.of("firstName","lastName"),
ImmutableList.of("foo","bar"));
Row dataAfterDeleteFirstName = rowWithFieldNames(
ImmutableList.of("firstName","lastName"),
ImmutableList.of("","bar")); // <- value corresponding to field name "firstName" has been removed
Row dataAfterDeleteLastName = rowWithFieldNames(
ImmutableList.of("firstName","lastName"),
ImmutableList.of("foo","")); // <- value corresponding to field name "lastName" has been removed
tester.check(
functionCall("deleteFieldOfAStruct", data, "firstName"),
dataAfterDeleteFirstName,
"row(varchar, varchar)");
tester.check(
functionCall("deleteFieldOfAStruct", data, "lastName"),
dataAfterDeleteLastName,
"row(varchar, varchar)");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ protected static Row row(Object... args) {
return new Row(Arrays.asList(args));
}

/**
* Creates a row from the provided elements with the corresponding field names
* to pass to the test framework
*/
protected static Row rowWithFieldNames(List<String> fieldNames, List<Object> args) {
return new Row(fieldNames, args);
}

/**
* Creates a string containing the absolute path from the provided relative path of the resource to pass to the
* test framework
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,19 @@ private Pair<TestType, Object> resolveMap(Map<Object, Object> map, TestType keyT

private Pair<TestType, Object> resolveStruct(Row struct, List<TestType> fieldTypes) {
List<TestType> resolvedFieldTypes = new ArrayList<>();
List<String> resolvedFieldNames = new ArrayList<>();
List<Object> resolvedFields = new ArrayList<>();
IntStream.range(0, fieldTypes.size()).forEach(idx -> {
Pair<TestType, Object> resolvedField = resolveParameter(struct.getFields().get(idx), fieldTypes.get(idx));
if (struct.getFieldNames() != null) {
resolvedFieldNames.add(struct.getFieldNames().get(idx));
}
resolvedFieldTypes.add(resolvedField.getLeft());
resolvedFields.add(resolvedField.getRight());
});
return Pair.of(TestTypeFactory.struct(resolvedFieldTypes), new Row(resolvedFields));
if (resolvedFieldNames.isEmpty()) {
return Pair.of(TestTypeFactory.struct(resolvedFieldTypes), new Row(resolvedFields));
}
return Pair.of(TestTypeFactory.struct(resolvedFieldNames, resolvedFieldTypes), new Row(resolvedFieldNames, resolvedFields));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,25 @@
public class Row {

private final List<Object> _fields;
private List<String> _fieldNames;

public Row(List<Object> fields) {
_fields = fields;
}

public Row(List<String> fieldNames, List<Object> fields) {
_fieldNames = fieldNames;
_fields = fields;
}

public List<Object> getFields() {
return _fields;
}

public List<String> getFieldNames() {
return _fieldNames;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,14 @@ public static TestType inferTypeFromData(Object data) {
return TestTypeFactory.map(inferCollectionTypeFromData(map.keySet(), "map keys"),
inferCollectionTypeFromData(map.values(), "map values"));
} else if (data instanceof Row) {
Row row = (Row) data;
if (row.getFieldNames() == null) {
return TestTypeFactory.struct(
row.getFields().stream().map(TestTypeUtils::inferTypeFromData).collect(Collectors.toList()));
}
return TestTypeFactory.struct(
((Row) data).getFields().stream().map(TestTypeUtils::inferTypeFromData).collect(Collectors.toList()));
row.getFieldNames(),
row.getFields().stream().map(TestTypeUtils::inferTypeFromData).collect(Collectors.toList()));
} else if (data instanceof FunctionCall) {
return TestTypeFactory.UNKNOWN_TEST_TYPE;
} else {
Expand Down