Skip to content

Commit

Permalink
Refactor meta and app server
Browse files Browse the repository at this point in the history
  • Loading branch information
christian-konrad committed Sep 19, 2022
1 parent 3c18611 commit fd00915
Show file tree
Hide file tree
Showing 65 changed files with 2,155 additions and 244 deletions.
51 changes: 51 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ PEERS=n1:localhost:6000,n2:localhost:6001,n3:localhost:6002
ID=n0
HTTP_PORT=8080
META_PORT=6000
REPLICATION_PORT=6050

./mvnw spring-boot:run -Dspring-boot.run.arguments="\
--node-id={ID} \
--server.address=localhost \
--server.port={HTTP_PORT} \
--metadata-port={META_PORT} \
--replication-port={REPLICATION_PORT} \
--storage=/tmp/raft-demo/{ID} \
--peers={PEERS}"
```
Expand All @@ -63,6 +65,7 @@ java -jar target/raft-log-replication-demo-0.0.1-SNAPSHOT.jar \
--server.address=localhost \
--server.port={HTTP_PORT} \
--metadata-port={META_PORT} \
--replication-port={REPLICATION_PORT} \
--storage=/tmp/raft-demo/{ID} \
--peers={PEERS}"
```
Expand All @@ -78,6 +81,7 @@ java -jar target/raft-log-replication-demo-0.0.1-SNAPSHOT.jar \
--server.address=localhost \
--server.port=8080 \
--metadata-port=6000 \
--replication-port=6050 \
--storage=/tmp/raft-demo/${ID} \
--peers=${PEERS}
Expand All @@ -87,6 +91,7 @@ java -jar target/raft-log-replication-demo-0.0.1-SNAPSHOT.jar \
--server.address=localhost \
--server.port=8081 \
--metadata-port=6001 \
--replication-port=6051 \
--storage=/tmp/raft-demo/${ID} \
--peers=${PEERS}
Expand All @@ -96,6 +101,7 @@ java -jar target/raft-log-replication-demo-0.0.1-SNAPSHOT.jar \
--server.address=localhost \
--server.port=8082 \
--metadata-port=6002 \
--replication-port=6052 \
--storage=/tmp/raft-demo/${ID} \
--peers=${PEERS}
```
Expand All @@ -104,6 +110,51 @@ java -jar target/raft-log-replication-demo-0.0.1-SNAPSHOT.jar \
See the docker-compose as well as the Dockerfile.
## Provisioning a partition
In general, a new partition of any state machine (any that implements the `StateMachine` interface) can be instantiated using the REST API:
```sh
POST http://localhost:8080/api/cluster-manager/partitions
Content-Type: application/json
{
"stateMachineClassName": "de.umr.raft.raftlogreplicationdemo.replication.impl.statemachines.EventStoreStateMachine",
"partitionName": "events",
"replicationFactor": 3
}
```
For event stores, there is a convenience method to instantiate a new stream on its own partition:
```sh
POST http://localhost:8080/api/event-store/streams
Content-Type: application/json
{
"streamName": "demo-event-store",
"schema": [
{
"name": "SYMBOL",
"type": "STRING",
"properties": {}
},
{
"name": "SECURITYTYPE",
"type": "INTEGER",
"properties": {}
},
{
"name": "LASTTRADEPRICE",
"type": "FLOAT",
"properties": {}
}
]
}
```
> Note that the schema is currently ignored, since it is hardcoded. The implementation of the dynamic schema invocation is currently pending.
## Roadmap
### Make a library out of the Apache Ratis on-top abstractions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ public class RaftConfig {
@Value("${storage}")
@Getter private String storagePath;

@Value("${server.port}:8000")
@Value("${server.port:8000}")
@Getter private String httpPort;

// TODO actually, THIS is replication-port. Need also seperate, explicit port for management group
@Value("${metadata-port}:6000")
@Getter private String metadataPort;
// or name managementPort and replicationPort ?
// TODO need some kind of port proxy
@Value("${metadata-port:6000}")
@Getter private String managementPort;

@Value("${replication-port:6050}")
@Getter private String replicationPort;

@Value("${server.address:#{null}}")
private Optional<String> host;
Expand Down Expand Up @@ -128,4 +128,8 @@ public String getHostAddress() {
public String getPublicHostAddress() {
return publicHost.orElseGet(this::getHostAddress);
}

public String getReplicationAddress() {
return getHostAddress() + ":" + getReplicationPort();
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package de.umr.raft.raftlogreplicationdemo.controllers.clustermanagement;

import de.umr.raft.raftlogreplicationdemo.models.clustermanagement.DetachPartitionRequest;
import de.umr.raft.raftlogreplicationdemo.models.clustermanagement.PartitionInfoResponse;
import de.umr.raft.raftlogreplicationdemo.models.clustermanagement.RaftPeerRequest;
import de.umr.raft.raftlogreplicationdemo.models.clustermanagement.RegisterPartitionRequest;
import de.umr.raft.raftlogreplicationdemo.replication.api.PartitionInfo;
import de.umr.raft.raftlogreplicationdemo.models.sysinfo.ClusterHealth;
import de.umr.raft.raftlogreplicationdemo.services.impl.clustermanagement.ClusterManagementService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;

import java.util.List;

@RestController
@RequestMapping("/api/cluster-manager/")
Expand All @@ -18,13 +19,33 @@ public class ClusterManagementController {
ClusterManagementService clusterManagementService;

@PostMapping(value = "/partitions")
public PartitionInfo registerPartition(@RequestBody RegisterPartitionRequest registerPartitionRequest) throws ClassNotFoundException {
public PartitionInfoResponse registerPartition(@RequestBody RegisterPartitionRequest registerPartitionRequest) throws ClassNotFoundException {
return clusterManagementService.registerPartition(registerPartitionRequest);
}

@DeleteMapping(value = "/partitions/{stateMachineClassname}/{partitionName}")
public PartitionInfoResponse detachPartition(@PathVariable String stateMachineClassname, @PathVariable String partitionName) throws ClassNotFoundException {
return clusterManagementService.detachPartition(new DetachPartitionRequest(stateMachineClassname, partitionName));
}

@GetMapping(value = "/partitions")
public List<PartitionInfoResponse> listPartitions() {
return clusterManagementService.listPartitions();
}

@GetMapping(value = "/partitions/{stateMachineClassname}")
public List<PartitionInfoResponse> listPartitions(@PathVariable String stateMachineClassname) throws ClassNotFoundException {
return clusterManagementService.listPartitions(stateMachineClassname);
}

// TODO remove after test
@PostMapping(value = "/heartbeat")
public void sendHeartbeat(@RequestBody RaftPeerRequest raftPeerRequest) throws ClassNotFoundException {
public void sendHeartbeat(@RequestBody RaftPeerRequest raftPeerRequest) {
clusterManagementService.sendHeartbeat(raftPeerRequest);
}

@GetMapping(value = "/health")
public ClusterHealth getClusterHealth() {
return clusterManagementService.getClusterHealth();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import de.umr.raft.raftlogreplicationdemo.models.counter.CreateCounterRequest;
import de.umr.raft.raftlogreplicationdemo.models.sysinfo.RaftGroupInfo;
import de.umr.raft.raftlogreplicationdemo.replication.api.PartitionInfo;
import de.umr.raft.raftlogreplicationdemo.services.impl.ReplicatedCounterService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
Expand All @@ -13,7 +14,7 @@
import java.util.List;
import java.util.concurrent.ExecutionException;

//@RestController
@RestController
@RequestMapping("/api/counter/replicated")
public class ReplicatedCounterController {

Expand All @@ -39,7 +40,7 @@ public String getCounter(@PathVariable String id) throws IOException, ExecutionE
// TODO POST /api/counter/replicated?partition-size=3 creates new raft group with 3 peers
@PostMapping("")
@ResponseStatus(value = HttpStatus.OK)
public RaftGroupInfo createNewCounter(@RequestBody CreateCounterRequest createCounterRequest) throws IOException, ExecutionException, InterruptedException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
public PartitionInfo createNewCounter(@RequestBody CreateCounterRequest createCounterRequest) throws IOException, ExecutionException, InterruptedException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
return counterService.createNewCounter(createCounterRequest).get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.Optional;
import java.util.concurrent.ExecutionException;

// TODO start again once cluster management works

@RestController
@RequestMapping("/api/event-store")
public class ReplicatedEventStoreController {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
import java.util.Optional;
import java.util.concurrent.ExecutionException;

//@RestController
@RestController
@RequestMapping("/api/sys-info/performance")
public class MeasurePerformanceController {

@Autowired MeasureEventStorePerformanceService measureEventStorePerformanceService;
@Autowired MeasureCounterPerformanceService measureCounterPerformanceService;
@Autowired MeasureMetadataPerformanceService measureMetadataPerformanceService;
//@Autowired MeasureCounterPerformanceService measureCounterPerformanceService;
//@Autowired MeasureMetadataPerformanceService measureMetadataPerformanceService;

// TODO remove crossOrigin later
@CrossOrigin
Expand Down Expand Up @@ -55,36 +55,36 @@ public String runGetEventStoreStreamInfoMeasurements() throws IOException, Execu
// TODO aggregate event

// TODO remove crossOrigin later
@CrossOrigin
@GetMapping("/measure/counter/replicated/increment/{count}")
public String runIncrementReplicatedCounterMeasurements(@PathVariable Integer count, @RequestParam Optional<Integer> batchSize) throws IOException, ExecutionException, InterruptedException {
return measureCounterPerformanceService.runIncrementReplicatedCounterMeasurements(count, batchSize).get();
}

@GetMapping("/measure/counter/replicated/increment/sync/{count}")
public String runIncrementReplicatedCounterSyncMeasurements(@PathVariable Integer count) throws IOException, ExecutionException, InterruptedException {
return measureCounterPerformanceService.runIncrementReplicatedCounterSyncMeasurements(count).get();
}

@GetMapping("/measure/counter/replicated/read")
public String runReadReplicatedCounterMeasurements() throws IOException, ExecutionException, InterruptedException {
return measureCounterPerformanceService.runReadReplicatedCounterMeasurements().get();
}

@GetMapping("/measure/counter/standalone/increment/{count}")
public String runIncrementStandaloneCounterMeasurements(@PathVariable Integer count) throws IOException, ExecutionException, InterruptedException {
return measureCounterPerformanceService.runIncrementStandaloneCounterMeasurements(count).get();
}

@GetMapping("/measure/counter/read")
public String runReadStandaloneCounterMeasurements() throws IOException, ExecutionException, InterruptedException {
return measureCounterPerformanceService.runReadStandaloneCounterMeasurements().get();
}
// @CrossOrigin
// @GetMapping("/measure/counter/replicated/increment/{count}")
// public String runIncrementReplicatedCounterMeasurements(@PathVariable Integer count, @RequestParam Optional<Integer> batchSize) throws IOException, ExecutionException, InterruptedException {
// return measureCounterPerformanceService.runIncrementReplicatedCounterMeasurements(count, batchSize).get();
// }
//
// @GetMapping("/measure/counter/replicated/increment/sync/{count}")
// public String runIncrementReplicatedCounterSyncMeasurements(@PathVariable Integer count) throws IOException, ExecutionException, InterruptedException {
// return measureCounterPerformanceService.runIncrementReplicatedCounterSyncMeasurements(count).get();
// }
//
// @GetMapping("/measure/counter/replicated/read")
// public String runReadReplicatedCounterMeasurements() throws IOException, ExecutionException, InterruptedException {
// return measureCounterPerformanceService.runReadReplicatedCounterMeasurements().get();
// }
//
// @GetMapping("/measure/counter/standalone/increment/{count}")
// public String runIncrementStandaloneCounterMeasurements(@PathVariable Integer count) throws IOException, ExecutionException, InterruptedException {
// return measureCounterPerformanceService.runIncrementStandaloneCounterMeasurements(count).get();
// }
//
// @GetMapping("/measure/counter/read")
// public String runReadStandaloneCounterMeasurements() throws IOException, ExecutionException, InterruptedException {
// return measureCounterPerformanceService.runReadStandaloneCounterMeasurements().get();
// }

@GetMapping("/measure/metadata/get")
public String runGetMetadataMeasurements() throws IOException, ExecutionException, InterruptedException {
return measureMetadataPerformanceService.runGetMetadataMeasurements().get();
}
// @GetMapping("/measure/metadata/get")
// public String runGetMetadataMeasurements() throws IOException, ExecutionException, InterruptedException {
// return measureMetadataPerformanceService.runGetMetadataMeasurements().get();
// }

// TODO also test non-replicated store

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import java.util.Map;


/**
*
*/

// TODO start again once cluster management works

@JsonComponent
public class InsertEventRequestDeserializer extends JsonDeserializer<InsertEventRequest> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
import java.io.IOException;


/**
*
*/
// TODO start again once cluster management works

@JsonComponent
public class InsertEventRequestSerializer extends JsonSerializer<InsertEventRequest> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
// TODO obsolete, use RegisterPartitionRequest
public interface CreatePartitionRequest {

abstract long getPartitionsCount();
abstract int getPartitionsCount();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package de.umr.raft.raftlogreplicationdemo.models.clustermanagement;

import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.ratis.statemachine.impl.BaseStateMachine;

@RequiredArgsConstructor
public class DetachPartitionRequest {
@Getter @NonNull final String stateMachineClassName;
@Getter @NonNull final String partitionName;

public Class<? extends BaseStateMachine> getStateMachineClass() throws ClassNotFoundException {
return Class.forName(stateMachineClassName).asSubclass(BaseStateMachine.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package de.umr.raft.raftlogreplicationdemo.models.clustermanagement;

import de.umr.raft.raftlogreplicationdemo.replication.api.PartitionInfo;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

import java.util.List;
import java.util.stream.Collectors;

@RequiredArgsConstructor
public class PartitionInfoResponse {
@Getter @NonNull final String partitionName;
@Getter @NonNull final String raftGroupId;
@Getter @NonNull final List<RaftPeerResponse> peers;
@Getter @NonNull final String stateMachineClassname;
@Getter @NonNull final PartitionInfo.PartitionState partitionState;

public static PartitionInfoResponse of(PartitionInfo partitionInfo) {
return new PartitionInfoResponse(
partitionInfo.getPartitionName().getName(),
partitionInfo.getRaftGroup().getGroupId().toString(),
partitionInfo.getRaftGroup().getPeers().stream().map(raftPeer -> new RaftPeerResponse(raftPeer.getId().toString(), raftPeer.getAddress())).collect(Collectors.toList()),
partitionInfo.getStateMachineClassname(),
partitionInfo.getPartitionState());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package de.umr.raft.raftlogreplicationdemo.models.clustermanagement;

import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class RaftPeerResponse {
@Getter @NonNull final String id;
@Getter @NonNull final String address;
}
Loading

0 comments on commit fd00915

Please sign in to comment.