
Support a feature to specify indexed columns. (#11)
yu-iskw authored Feb 13, 2018
1 parent fd6e940 commit ed128e2
Showing 6 changed files with 116 additions and 44 deletions.
23 changes: 5 additions & 18 deletions README.md
@@ -36,43 +36,30 @@ Apache Beam's `DatastoreIO` doesn't allow us to write the same key at once.
- e.g. `DataflowRunner`
- `--parentPaths`: Output Google Datastore parent path(s)
- e.g. `Parent1:p1,Parent2:p2` ==> `KEY('Parent1', 'p1', 'Parent2', 'p2')`
- `--indexedColumns`: Indexed columns on Google Datastore.
- e.g. `col1,col2,col3` ==> `col1`, `col2` and `col3` are indexed on Google Datastore. Columns not listed here are stored with `excludeFromIndexes=true`, so Datastore queries cannot filter or sort on them.
- `--numWorkers`: The number of workers when you run it on top of Google Dataflow.
- `--workerMachineType`: Google Dataflow worker instance type
- e.g. `n1-standard-1`, `n1-standard-4`

#### Example to run on Google Dataflow

```
./bigquery-to-datastore.sh \
--project=${GCP_PROJECT_ID} \
--runner=DataflowRunner \
--inputBigQueryDataset=test_dataset \
--inputBigQueryTable=test_table \
--outputDatastoreNamespace=test_namespace \
--outputDatastoreKind=TestKind \
--parentPaths=Parent1:p1,Parent2:p2 \
--keyColumn=uuid \
--tempLocation=gs://test_yu/test-log/ \
--gcpTempLocation=gs://test_yu/test-log/
```

Alternatively, the command below runs the package from the compiled JAR file.

```
# compile
mvn clean package
# Run bigquery-to-datastore via the compiled JAR file
- java -cp /path/to/bigquery-to-datastore-bundled-{version}.jar \
+ java -cp $(pwd)/target/bigquery-to-datastore-bundled-0.3.jar \
com.github.yuiskw.beam.BigQuery2Datastore \
- --project=sage-shard-740 \
+ --project=your-gcp-project \
--runner=DataflowRunner \
--inputBigQueryDataset=test_dataset \
--inputBigQueryTable=test_table \
--outputDatastoreNamespace=test_namespace \
--outputDatastoreKind=TestKind \
--parentPaths=Parent1:p1,Parent2:p2 \
--keyColumn=id \
--indexedColumns=col1,col2,col3 \
--tempLocation=gs://test_bucket/test-log/ \
--gcpTempLocation=gs://test_bucket/test-log/
```
2 changes: 1 addition & 1 deletion pom.xml
@@ -22,7 +22,7 @@

<groupId>com.github.yuiskw</groupId>
<artifactId>bigquery-to-datastore</artifactId>
- <version>0.2</version>
+ <version>0.3</version>

<packaging>jar</packaging>

31 changes: 29 additions & 2 deletions src/main/java/com/github/yuiskw/beam/BigQuery2Datastore.java
@@ -3,9 +3,12 @@
*/
package com.github.yuiskw.beam;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

import com.google.api.services.bigquery.model.TableReference;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
@@ -14,7 +17,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;

/**
* This class is used for a Dataflow job which writes BigQuery table rows to Google Datastore.
@@ -51,6 +53,10 @@ public interface Optoins extends DataflowPipelineOptions {
@Description("Datastore parent path(s) (format: 'Parent1:p1,Parent2:p2')")
String getParentPaths();
void setparentPaths(String parentPaths);

@Description("Indexed columns (format: 'column1,column2,column3')")
String getIndexedColumns();
void setIndexedColumns(String indexedColumns);
}

public static void main(String[] args) {
@@ -63,6 +69,7 @@ public static void main(String[] args) {
String kind = options.getOutputDatastoreKind();
String keyColumn = options.getKeyColumn();
LinkedHashMap<String, String> parents = parseParentPaths(options.getParentPaths());
List<String> indexedColumns = parseIndexedColumns(options.getIndexedColumns());

// Input
TableReference tableRef = new TableReference().setDatasetId(datasetId).setTableId(tableId);
@@ -72,10 +79,12 @@
DatastoreV1.Write writer = DatastoreIO.v1().write().withProjectId(projectId);

// Build and run pipeline
TableRow2EntityFn fn =
new TableRow2EntityFn(projectId, namespace, parents, kind, keyColumn, indexedColumns);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(reader)
- .apply(ParDo.of(new TableRow2EntityFn(projectId, namespace, parents, kind, keyColumn)))
+ .apply(ParDo.of(fn))
.apply(writer);
pipeline.run();
}
@@ -112,4 +121,22 @@ public static LinkedHashMap<String, String> parseParentPaths(String parentPaths)
}
return pathMap;
}

/**
* Get indexed column names
*
* @param indexedColumns a comma-separated string of column names (e.g. "column1,column2,column3").
* @return a list of the indexed column names.
*/
public static List<String> parseIndexedColumns(String indexedColumns) {
ArrayList<String> columns = new ArrayList<String>();
if (indexedColumns != null) {
for (String path : indexedColumns.split(",")) {
// trim
String column = path.replaceAll("(^\\s+|\\s+$)", "");
columns.add(column);
}
}
return columns;
}
}
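
The two option parsers above are simple to exercise on their own. Here is a minimal usage sketch (the wrapper class `ParserSketch` is illustrative, not part of this commit; it assumes the project classes are on the classpath):

```
import java.util.LinkedHashMap;
import java.util.List;

import com.github.yuiskw.beam.BigQuery2Datastore;

public class ParserSketch {
  public static void main(String[] args) {
    // Whitespace around each column name is trimmed; a null argument yields an empty list.
    List<String> indexed = BigQuery2Datastore.parseIndexedColumns(" col1, col2 , col3 ");
    System.out.println(indexed);  // [col1, col2, col3]

    // Parent paths keep insertion order, which matters when the ancestor key is built.
    LinkedHashMap<String, String> parents =
        BigQuery2Datastore.parseParentPaths("Parent1:p1,Parent2:p2");
    System.out.println(parents);  // {Parent1=p1, Parent2=p2}
  }
}
```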
55 changes: 39 additions & 16 deletions src/main/java/com/github/yuiskw/beam/TableRow2EntityFn.java
@@ -31,18 +31,22 @@ public class TableRow2EntityFn extends DoFn<TableRow, Entity> {
private String kind;
/** BigQuery column for Google Datastore key */
private String keyColumn;
/** Indexed columns in Google Datastore */
private List<String> indexedColumns;

public TableRow2EntityFn(
String projectId,
String namespace,
LinkedHashMap<String, String> parents,
String kind,
- String keyColumn) {
+ String keyColumn,
+ List<String> indexedColumns) {
this.projectId = projectId;
this.namespace = namespace;
this.parents = parents;
this.kind = kind;
this.keyColumn = keyColumn;
this.indexedColumns = indexedColumns;
}

/**
@@ -75,59 +79,62 @@ public void processElement(ProcessContext c) {
/**
* Convert an object to Datastore value
*/
- public Value convertToDatastoreValue(Object value) {
+ public Value convertToDatastoreValue(String columnName, Object value) {
Value v = null;

if (value == null) {
return v;
}

// Whether this property should be excluded from Datastore indexes.
boolean isExcluded = isExcludedFromIndex(columnName, this.indexedColumns);

if (value instanceof java.lang.Boolean) {
v = Value.newBuilder().setBooleanValue(((Boolean) value).booleanValue())
- .setExcludeFromIndexes(true).build();
+ .setExcludeFromIndexes(isExcluded).build();
}
// INTEGER
else if (value instanceof java.lang.Integer) {
v = Value.newBuilder().setIntegerValue(((Integer) value).intValue())
- .setExcludeFromIndexes(true).build();
+ .setExcludeFromIndexes(isExcluded).build();
}
else if (value instanceof String && parseInteger((String) value) != null) {
Integer integer = parseInteger((String) value);
v = Value.newBuilder().setIntegerValue(integer.intValue())
- .setExcludeFromIndexes(true).build();
+ .setExcludeFromIndexes(isExcluded).build();
}
// LONG
else if (value instanceof java.lang.Long) {
v = Value.newBuilder().setIntegerValue(((Long) value).longValue())
- .setExcludeFromIndexes(true).build();
+ .setExcludeFromIndexes(isExcluded).build();
}
// DOUBLE
else if (value instanceof java.lang.Double) {
v = Value.newBuilder().setDoubleValue(((Double) value).doubleValue())
- .setExcludeFromIndexes(true).build();
+ .setExcludeFromIndexes(isExcluded).build();
}
// TIMESTAMP
else if (value instanceof org.joda.time.LocalDateTime) {
Timestamp timestamp = toTimestamp(((LocalDateTime) value).toLocalDate().toDate());
v = Value.newBuilder().setTimestampValue(timestamp)
- .setExcludeFromIndexes(true).build();
+ .setExcludeFromIndexes(isExcluded).build();
}
else if (value instanceof String && parseTimestamp((String) value) != null) {
Date date = parseTimestamp((String) value);
Timestamp timestamp = toTimestamp(date);
v = Value.newBuilder().setTimestampValue(timestamp)
- .setExcludeFromIndexes(true).build();
+ .setExcludeFromIndexes(isExcluded).build();
}
// DATE
else if (value instanceof org.joda.time.LocalDate) {
Timestamp timestamp = toTimestamp(((LocalDate) value).toDate());
v = Value.newBuilder().setTimestampValue(timestamp)
- .setExcludeFromIndexes(true).build();
+ .setExcludeFromIndexes(isExcluded).build();
} else if (value instanceof String && parseDate((String) value) != null) {
Date date = parseDate((String) value);
Timestamp timestamp = toTimestamp(date);
v = Value.newBuilder().setTimestampValue(timestamp)
- .setExcludeFromIndexes(true).build();
+ .setExcludeFromIndexes(isExcluded).build();
}
// TIME
// NOTE: Datastore doesn't have a data type for time-of-day values.
@@ -139,14 +146,14 @@ else if (value instanceof org.joda.time.LocalTime) {
// STRING
else if (value instanceof String) {
v = Value.newBuilder().setStringValue((String) value)
- .setExcludeFromIndexes(true).build();
+ .setExcludeFromIndexes(isExcluded).build();
}
// RECORD
else if (value instanceof List) {
ArrayValue.Builder arrayValueBuilder = ArrayValue.newBuilder();
List<Object> records = (List<Object>) value;
for (Object record : records) {
- Value subV = convertToDatastoreValue(record);
+ Value subV = convertToDatastoreValue(columnName, record);
if (subV != null) {
arrayValueBuilder.addValues(subV);
}
@@ -158,7 +165,7 @@ else if (value instanceof Map) {
Entity.Builder subEntityBuilder = Entity.newBuilder();
Map<String, Object> struct = (Map<String, Object>) value;
for (String subKey : struct.keySet()) {
- Value subV = convertToDatastoreValue(struct.get(subKey));
+ Value subV = convertToDatastoreValue(columnName, struct.get(subKey));
if (subV != null) {
subEntityBuilder.putProperties(subKey, subV);
}
@@ -188,10 +195,11 @@ public Entity convertTableRowToEntity(TableRow row) throws ParseException {
}

// Put a value in the builder
String propertyName = entry.getKey();
Object value = entry.getValue();
- Value v = convertToDatastoreValue(value);
+ Value v = convertToDatastoreValue(propertyName, value);
if (v != null) {
- builder.putProperties(entry.getKey(), v);
+ builder.putProperties(propertyName, v);
}
}
return builder.build();
@@ -304,4 +312,19 @@ public static Date parseTimestamp(String value) {
}
return date;
}

/**
* Check whether a column should be excluded from Datastore indexes.
*
* @param columnName column name
* @param indexedColumns list of indexed column names
* @return true if the column is not in indexedColumns (excluded from indexes); false otherwise.
*/
public static boolean isExcludedFromIndex(String columnName, List<String> indexedColumns) {
return !indexedColumns.contains(columnName);
}
}
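
To make the indexing change concrete, here is a hedged sketch of how `isExcludedFromIndex` feeds `setExcludeFromIndexes` (the wrapper class `IndexSketch` and the sample values are illustrative, not part of this commit; the `Entity` and `Value` builders come from `com.google.datastore.v1`, which this file already uses):

```
import java.util.Arrays;
import java.util.List;

import com.github.yuiskw.beam.TableRow2EntityFn;
import com.google.datastore.v1.Entity;
import com.google.datastore.v1.Value;

public class IndexSketch {
  public static void main(String[] args) {
    List<String> indexedColumns = Arrays.asList("col1", "col2");

    // "col1" is listed in --indexedColumns, so it stays in Datastore's built-in indexes.
    boolean excludeCol1 = TableRow2EntityFn.isExcludedFromIndex("col1", indexedColumns); // false
    // "col9" is not listed, so it is excluded from the indexes.
    boolean excludeCol9 = TableRow2EntityFn.isExcludedFromIndex("col9", indexedColumns); // true

    Entity entity = Entity.newBuilder()
        .putProperties("col1", Value.newBuilder()
            .setStringValue("indexed value")
            .setExcludeFromIndexes(excludeCol1)
            .build())
        .putProperties("col9", Value.newBuilder()
            .setStringValue("unindexed value")
            .setExcludeFromIndexes(excludeCol9)
            .build())
        .build();
    System.out.println(entity);
  }
}
```

Since Datastore can only filter and sort on indexed properties, columns left out of `--indexedColumns` become cheaper to write but cannot be queried against.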
14 changes: 13 additions & 1 deletion src/test/java/com/github/yuiskw/beam/BigQuery2DatastoreTest.java
@@ -4,6 +4,7 @@
package com.github.yuiskw.beam;

import java.util.LinkedHashMap;
import java.util.List;

import org.junit.Ignore;
import org.junit.Rule;
@@ -24,7 +25,8 @@ public void testGetOptions() {
"--inputBigQueryTable=test_table",
"--outputDatastoreNamespace=test_namespace",
"--outputDatastoreKind=TestKind",
"--keyColumn=key_column"
"--keyColumn=key_column",
"--indexedColumns=col1,col2,col3"
};
BigQuery2Datastore.Optoins options = BigQuery2Datastore.getOptions(args);
assertEquals("test-project-id", options.getProject());
@@ -33,6 +35,7 @@
assertEquals("test_namespace", options.getOutputDatastoreNamespace());
assertEquals("TestKind", options.getOutputDatastoreKind());
assertEquals("key_column", options.getKeyColumn());
assertEquals("col1,col2,col3", options.getIndexedColumns());
}

/**
@@ -126,4 +129,13 @@ public void testParseParentPathsWithSpaces() {
assertEquals("p1", parents.get("Parent1"));
assertEquals("p2", parents.get("Parent2"));
}

@Test
public void testParseIndexedColumns() {
String indexedColumns = " col1, col2 , col3 ";
List<String> columns = BigQuery2Datastore.parseIndexedColumns(indexedColumns);
assertEquals("col1", columns.get(0));
assertEquals("col2", columns.get(1));
assertEquals("col3", columns.get(2));
}
}
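
A unit test for `isExcludedFromIndex` itself would round out this suite. The following is a hypothetical sketch, not part of this commit (it assumes `java.util.Arrays`, `java.util.List`, and JUnit's static `assertTrue`/`assertFalse` are imported):

```
@Test
public void testIsExcludedFromIndex() {
  List<String> indexedColumns = Arrays.asList("col1", "col2", "col3");
  // Listed columns stay indexed.
  assertFalse(TableRow2EntityFn.isExcludedFromIndex("col2", indexedColumns));
  // Unlisted columns are excluded from the indexes.
  assertTrue(TableRow2EntityFn.isExcludedFromIndex("other", indexedColumns));
}
```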