-
Notifications
You must be signed in to change notification settings - Fork 222
Description
Description
When removing complex elements from a list bin in Aerospike using ListOperation.removeByValue(), the operation succeeds on records inserted programmatically via the Java client, but does not remove the elements from records inserted via AQL. No error is thrown for the AQL-inserted records, but the target elements remain unchanged.
ListOperation.removeByValue() works correctly for simple lists of strings but does not function as expected when used with lists of complex objects
Steps to reproduce:
Create a record with a list bin containing items via Java code (e.g., rec001).
Create a record with a list bin containing items via AQL (e.g., rec002).
Perform ListOperation.removeByValue() on both records (rec001 & rec002).
Verify whether the intended items were removed.
Expected behaviour:
Items should be deleted from the list bin for both records, regardless of insertion method.
Actual behavior:
For record added via Code (rec001) - The item is deleted
For record added via AQL (rec002) - The item is NOT deleted. Neither any error is thrown
Versions Tested
JDK - 21
Aerospike Client - 6.1.7
Aerospike Server - aerospike-server-enterprise:6.2.0.1 (Docker)
Sample Code
AQL for inserting record (rec002)
insert into test.list_demo (PK, 'storage_id', 'container') values('rec002', 'S1', JSON('{ "container_id": "C1", "item_holder_map": { "IH1": { "item_holder_id": "IH1", "items": [ { "item_id": "I1", "item_name": "Item 1", "item_type": "TYPE_A", "serial": 12345 }, { "item_id": "I2", "item_name": "Item 2", "item_type": "TYPE_B", "serial": 67890 }, { "item_id": "I3", "item_name": "Item 3", "item_type": "TYPE_A", "serial": 12378 } ] } } }'));Code for inserting record (rec001) & delete both records (rec001 & rec002)
import com.aerospike.client.Record;
import com.aerospike.client.*;
import com.aerospike.client.cdt.CTX;
import com.aerospike.client.cdt.ListOperation;
import com.aerospike.client.cdt.ListReturnType;
import com.aerospike.client.policy.WritePolicy;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.junit.jupiter.api.*;
import java.io.Serializable;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
/**
* @author rkyshz
* @Date 11/09/25
*/
@Log4j2
@SuppressWarnings({"unchecked"})
class AerospikeListValueDemo {
private static AerospikeClient aerospikeClient;
private static final String SET_NAME = "list_demo";
private static final String NAMESPACE = "test";
private static final String BIN_NAME = "container";
private static final String PK = "rec001";
private static final String PK_2 = "rec002";
private static final ObjectMapper _mapper = new ObjectMapper()
.registerModule(new JavaTimeModule())
.enable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE)
.setTimeZone(TimeZone.getTimeZone(ZoneId.systemDefault()))
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
@BeforeEach
void setup() {
aerospikeClient = new AerospikeClient("localhost", 3000);
aerospikeClient.delete(getDurableDeleteWritePolicy(), new Key(NAMESPACE, SET_NAME, PK));
writeData(NAMESPACE, SET_NAME, PK, getStorage());
}
private static Storage getStorage() {
return new Storage("S1", new Container("C1", Map.of(
"IH1", new ItemHolder("IH1", List.of(
new Item("I1", "Item 1", 12345L, ItemType.TYPE_A),
new Item("I2", "Item 2", 67890L, ItemType.TYPE_B),
new Item("I3", "Item 3", 12378L, ItemType.TYPE_A)
))
)));
}
@Test
void shouldRemoveItemsByValue() {
try {
removeFromListByValue(NAMESPACE, SET_NAME, PK, BIN_NAME, List.of(getStorage().getContainer().getItemHolderMap().get("IH1").getItems().get(0)));
removeFromListByValue(NAMESPACE, SET_NAME, PK_2, BIN_NAME, List.of(getStorage().getContainer().getItemHolderMap().get("IH1").getItems().get(0)));
Storage viaCode = _mapper.convertValue(getRecord(NAMESPACE, SET_NAME, PK).bins, Storage.class);
Storage viaAql = _mapper.convertValue(getRecord(NAMESPACE, SET_NAME, PK_2).bins, Storage.class);
Assertions.assertTrue(viaCode.getContainer().getItemHolderMap().get("IH1").getItems().size()==2,"Item not deleted for record added via Code");
Assertions.assertTrue(viaAql.getContainer().getItemHolderMap().get("IH1").getItems().size()==2,"Item not deleted for record added via AQL");
} catch (Exception e) {
log.error(e);
Assertions.fail(e);
}
}
public <T extends Serializable> void writeData(String namespace, String set, String pk,
Storage value) {
Bin[] binArray = convertObjectToBins(value);
aerospikeClient.put(getDurableDeleteWritePolicy(), new Key(namespace, set, pk), binArray);
}
public <T extends Serializable> void removeFromListByValue(String namespace, String set, String pk,
String bin, List<Item> values) {
aerospikeClient.operate(getDurableDeleteWritePolicy(), new Key(namespace, set, pk),
values.stream().map(item -> ListOperation.removeByValue(bin, Value.get(_mapper.convertValue(item, Map.class)), ListReturnType.VALUE,
CTX.mapKey(Value.get("item_holder_map")),
CTX.mapKey(Value.get("IH1")),
CTX.mapKey(Value.get("items")))
).toArray(Operation[]::new));
}
public <T extends Serializable> Record getRecord(String namespace, String set, String pk) {
return aerospikeClient.get(null, new Key(namespace, set, pk));
}
private Bin[] convertObjectToBins(Object object) {
Map<String, Object> map = _mapper.convertValue(object, Map.class);
Bin[] binList = new Bin[map.size()];
int i = 0;
for (Map.Entry<String, Object> entry : map.entrySet()) {
binList[i++] = new Bin(entry.getKey(), entry.getValue());
}
return binList;
}
private static WritePolicy getDurableDeleteWritePolicy() {
WritePolicy writePolicy = new WritePolicy();
writePolicy.durableDelete = true;
writePolicy.sendKey = true;
return writePolicy;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
static class Storage implements Serializable {
private String storageId;
private Container container;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
static class Container implements Serializable {
private String containerId;
private Map<String, ItemHolder> itemHolderMap;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
static class ItemHolder implements Serializable {
private String itemHolderId;
private List<Item> items;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
static class Item implements Serializable {
private String itemId;
private String itemName;
private Long serial;
private ItemType itemType;
}
enum ItemType{
TYPE_A,
TYPE_B;
}
}Output
org.opentest4j.AssertionFailedError: Item not deleted for record added via AQL ==>