Skip to content

Commit

Permalink
Merge pull request #46 from trocco-io/feature/ensure-cursor-delete
Browse files Browse the repository at this point in the history
Make KintoneClient AutoCloseable and ensure cursor is deleted
  • Loading branch information
NamedPython authored Jul 26, 2024
2 parents 5c73787 + c187825 commit 2e3f7ea
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 46 deletions.
43 changes: 28 additions & 15 deletions src/main/java/org/embulk/input/kintone/KintoneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import java.util.List;
import java.util.Map;

public class KintoneClient
public class KintoneClient implements AutoCloseable
{
private final Logger logger = LoggerFactory.getLogger(KintoneClient.class);
private static final int FETCH_SIZE = 500;
private static final String CURSOR_ALREADY_EXISTS_ERROR = "Cursor already exists: KintoneClient can only generate one cursor per instance.";
private RecordClient recordClient;
private AppClient appClient;
private String cursorId;

public KintoneClient() throws ConfigException
{
Expand Down Expand Up @@ -78,31 +80,32 @@ else if (task.getToken().isPresent()) {

public GetRecordsByCursorResponseBody getResponse(final PluginTask task, final Schema schema)
{
CreateCursorResponseBody cursor = this.createCursor(task, schema);
this.createCursor(task, schema);
try {
return this.recordClient.getRecordsByCursor(cursor.getId());
return this.recordClient.getRecordsByCursor(this.cursorId);
}
catch (KintoneApiRuntimeException e) {
this.logger.error(e.toString());
this.deleteCursor(cursor.getId());
throw new RuntimeException(e);
}
}

public GetRecordsByCursorResponseBody getRecordsByCursor(String cursor)
public GetRecordsByCursorResponseBody getRecordsByCursor()
{
try {
return this.recordClient.getRecordsByCursor(cursor);
return this.recordClient.getRecordsByCursor(this.cursorId);
}
catch (KintoneApiRuntimeException e) {
this.logger.error(e.toString());
this.deleteCursor(cursor);
throw new RuntimeException(e);
}
}

public CreateCursorResponseBody createCursor(final PluginTask task, final Schema schema)
public void createCursor(final PluginTask task, final Schema schema)
{
if (this.cursorId != null) {
throw new RuntimeException(CURSOR_ALREADY_EXISTS_ERROR);
}
ArrayList<String> fields = new ArrayList<>();
for (Column c : schema.getColumns()) {
fields.add(c.getName());
Expand All @@ -117,21 +120,25 @@ public CreateCursorResponseBody createCursor(final PluginTask task, final Schema
request.setQuery(task.getQuery().orElse(""));
request.setSize((long) FETCH_SIZE);
try {
return this.recordClient.createCursor(request);
CreateCursorResponseBody cursorResponse = recordClient.createCursor(request);
this.cursorId = cursorResponse.getId();
}
catch (KintoneApiRuntimeException e) {
this.logger.error(e.toString());
throw new RuntimeException(e);
}
}

public void deleteCursor(String cursor)
private void deleteCursor()
{
try {
this.recordClient.deleteCursor(cursor);
}
catch (KintoneApiRuntimeException e) {
this.logger.error(e.toString());
if (this.cursorId != null) {
try {
this.recordClient.deleteCursor(this.cursorId);
this.cursorId = null;
}
catch (KintoneApiRuntimeException e) {
this.logger.error(e.toString());
}
}
}

Expand Down Expand Up @@ -167,4 +174,10 @@ public List<String> getFieldCodes(final PluginTask task, FieldType fieldType)
}
return fieldCodes;
}

@Override
public void close()
{
this.deleteCursor();
}
}
58 changes: 27 additions & 31 deletions src/main/java/org/embulk/input/kintone/KintoneInputPlugin.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.embulk.input.kintone;

import com.google.common.annotations.VisibleForTesting;
import com.kintone.client.api.record.CreateCursorResponseBody;
import com.kintone.client.api.record.GetRecordsByCursorResponseBody;
import com.kintone.client.model.app.field.FieldProperty;
import com.kintone.client.model.record.FieldType;
Expand Down Expand Up @@ -82,42 +81,39 @@ public TaskReport run(TaskSource taskSource,
final TaskMapper taskMapper = CONFIG_MAPPER_FACTORY.createTaskMapper();
final PluginTask task = taskMapper.map(taskSource, PluginTask.class);

try {
try (PageBuilder pageBuilder = getPageBuilder(schema, output)) {
KintoneClient client = getKintoneClient();
client.validateAuth(task);
client.connect(task);
try (PageBuilder pageBuilder = getPageBuilder(schema, output); KintoneClient client = getKintoneClient()) {
client.validateAuth(task);
client.connect(task);
client.createCursor(task, schema);

CreateCursorResponseBody cursor = client.createCursor(task, schema);
GetRecordsByCursorResponseBody cursorResponse = new GetRecordsByCursorResponseBody(true, null);
GetRecordsByCursorResponseBody cursorResponse = new GetRecordsByCursorResponseBody(true, null);

List<String> subTableFieldCodes = null;
if (task.getExpandSubtable()) {
subTableFieldCodes = client.getFieldCodes(task, FieldType.SUBTABLE);
}
List<String> subTableFieldCodes = null;
if (task.getExpandSubtable()) {
subTableFieldCodes = client.getFieldCodes(task, FieldType.SUBTABLE);
}

while (cursorResponse.isNext()) {
cursorResponse = client.getRecordsByCursor(cursor.getId());
for (Record record : cursorResponse.getRecords()) {
List<Record> records;
if (task.getExpandSubtable()) {
records = expandSubtable(record, subTableFieldCodes);
}
else {
records = new ArrayList<>();
records.add(record);
}

for (Record expandedRecord : records) {
schema.visitColumns(new KintoneInputColumnVisitor(new KintoneAccessor(expandedRecord), pageBuilder, task));
pageBuilder.addRecord();
}
while (cursorResponse.isNext()) {
cursorResponse = client.getRecordsByCursor();
for (Record record : cursorResponse.getRecords()) {
List<Record> records;
if (task.getExpandSubtable()) {
records = expandSubtable(record, subTableFieldCodes);
}
else {
records = new ArrayList<>();
records.add(record);
}
pageBuilder.flush();
}

pageBuilder.finish();
for (Record expandedRecord : records) {
schema.visitColumns(new KintoneInputColumnVisitor(new KintoneAccessor(expandedRecord), pageBuilder, task));
pageBuilder.addRecord();
}
}
pageBuilder.flush();
}

pageBuilder.finish();
}
catch (Exception e) {
logger.error(e.getMessage());
Expand Down
41 changes: 41 additions & 0 deletions src/test/java/org/embulk/input/kintone/TestKintoneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
import org.embulk.spi.InputPlugin;
import org.embulk.spi.Schema;
import org.embulk.test.TestingEmbulk;
import org.junit.Rule;
import org.junit.Test;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -166,4 +170,41 @@ public void checkGetFieldCodes()
assertEquals(1, client.getFieldCodes(task, FieldType.SUBTABLE).size());
assertTrue(client.getFieldCodes(task, FieldType.SUBTABLE).contains("subtable2"));
}

@Test
public void testCreateCursorThrowsExceptionWhenCursorAlreadyExists()
{
config = loadYamlResource(embulk);
PluginTask task = configMapper.map(config, PluginTask.class);
Schema schema = mock(Schema.class);

KintoneClient client = new KintoneClient(appClient);

try {
Field cursorIdField = KintoneClient.class.getDeclaredField("cursorId");
cursorIdField.setAccessible(true);
cursorIdField.set(client, "existingCursorId");

Field cursorAlreadyExistsErrorField = KintoneClient.class.getDeclaredField("CURSOR_ALREADY_EXISTS_ERROR");
cursorAlreadyExistsErrorField.setAccessible(true);
String cursorAlreadyExistsErrorMessage = (String) cursorAlreadyExistsErrorField.get(null);

Method createCursorMethod = KintoneClient.class.getDeclaredMethod("createCursor", PluginTask.class, Schema.class);
createCursorMethod.setAccessible(true);

Exception exception = assertThrows(RuntimeException.class, () -> {
try {
createCursorMethod.invoke(client, task, schema);
}
catch (InvocationTargetException ex) {
throw ex.getTargetException();
}
});

assertEquals(cursorAlreadyExistsErrorMessage, exception.getMessage());
}
catch (Exception e) {
throw new RuntimeException("Reflection error", e);
}
}
}

0 comments on commit 2e3f7ea

Please sign in to comment.