This repository was archived by the owner on May 10, 2022. It is now read-only.

refactor and simplify the API #122

@foreverneverer


TODO(EN)

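Related issue: #121

Guiding principle: simple and extensible

  • Keep the number of interfaces small: a sprawling API makes it hard for users to pick the right call and hard for developers to maintain.
  • Strong extensibility: adding or changing a feature should not change the API, so that users do not hit compatibility problems after upgrading the client.
  • Consistent style: all API methods should pass parameters and return results in a uniform way.

Current problems

The main problem with the current interface is its sprawl, and the root cause is poor extensibility. Specifically:

  • Too many overloads: to support different behaviors for the same type of request, the client currently relies on parameter overloading. Every new parameter or feature requires another overloaded method, so the number of interfaces keeps growing. A typical case is that every write interface is overloaded in two versions: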

    public void set(byte[] hashKey, byte[] sortKey, byte[] value, int ttlSeconds, int timeout /*ms*/)
        throws PException;
    public void set(byte[] hashKey, byte[] sortKey, byte[] value, int timeout /*ms*/)
        throws PException;
  • Batch reads and writes: almost every basic interface (set/get/delete...) is wrapped by two batch methods, batchXXX and batchXXX2, whose code is largely duplicated. For example:

     public void batchSet(List<SetItem> items, int timeout /*ms*/) throws PException;
     public int batchSet2(List<SetItem> items, List<PException> results, int timeout /*ms*/)
        throws PException;
    
     public void batchMultiDel(List<Pair<byte[], List<byte[]>>> keys, int timeout /*ms*/)
         throws PException;
     public int batchMultiDel2(
         List<Pair<byte[], List<byte[]>>> keys, List<PException> results, int timeout /*ms*/)
         throws PException;
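  • Inconsistent return types: query results currently come back as value, pair<hashKey, value>, or pair<sortKey, value>. Users have to work out what each returned shape means, and a result cannot be extended with extra information such as a timestamp.

Solution

  • Encapsulation: wrap both the request parameters and the return results in objects. Changing or adding a parameter then only touches the wrapper object, and no new overload is needed. For example: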
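To make the last point concrete, here is a minimal sketch of what a single, extensible result type could look like. The class name `ReadResult` and the field `timestampMillis` are hypothetical and are not part of the existing client API; the sketch only illustrates that extra metadata can be added to a wrapper object without changing any method signature.

```java
import java.nio.charset.StandardCharsets;
import java.util.OptionalLong;

// Hypothetical unified result type; not part of the existing client API.
final class ReadResult {
    final byte[] hashKey;
    final byte[] sortKey;
    final byte[] value;
    // Extra metadata lives in the wrapper, so adding it later does not change method signatures.
    final OptionalLong timestampMillis;

    ReadResult(byte[] hashKey, byte[] sortKey, byte[] value) {
        this(hashKey, sortKey, value, OptionalLong.empty());
    }

    ReadResult(byte[] hashKey, byte[] sortKey, byte[] value, OptionalLong timestampMillis) {
        this.hashKey = hashKey;
        this.sortKey = sortKey;
        this.value = value;
        this.timestampMillis = timestampMillis;
    }
}

class ReadResultDemo {
    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ReadResult plain = new ReadResult(bytes("h"), bytes("s"), bytes("v"));
        ReadResult stamped =
            new ReadResult(bytes("h"), bytes("s"), bytes("v"), OptionalLong.of(1_600_000_000_000L));
        System.out.println(plain.timestampMillis.isPresent());   // prints false
        System.out.println(stamped.timestampMillis.getAsLong()); // prints 1600000000000
    }
}
```

With a type like this, every query could return the same shape, and callers would read only the fields they care about.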

    public class Set implements Serializable {
        public byte[] hashKey;
        public byte[] sortKey;
        public byte[] value;
        public int ttlSeconds; // 0 means no ttl
    }
    
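    // Only a single interface is kept
    public void set(Set set, int timeout /*ms*/) throws PException;

    However, encapsulating requests and results would deprecate the existing interfaces, and many interfaces would be affected, so this approach is shelved for now. See "refactor: simplify api using encapsulating parameters and results" #124.

  • Batch interface refactoring: every basic interface currently has its own batch wrapper, which greatly increases the number of API methods. Batch operations should instead be built on an extensible Batch abstract class, so that any single operation can be extended into a batch operation and the redundant batch interfaces can be removed: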

    // Every batch operation can be implemented with Batch. For batchSet, for example,
    // use Request=Set and Response=SetResult. In principle any RPC can be batched through the Batch class.
    class Batch<Request, Response> {
      void commit(List<Request> requests, List<Response> responses){}
    }
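The idea that any single operation can be lifted into a batch operation can be tried out without a Pegasus cluster. The sketch below is a self-contained illustration, not the proposed implementation: `SimpleBatch`, `SetRequest`, and the in-memory `STORE` are hypothetical stand-ins, and `java.util.concurrent.CompletableFuture` replaces the client's own future type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical request object: one class replaces the overloaded set(...) variants.
final class SetRequest {
    final String key;
    final String value;
    final int ttlSeconds; // 0 means no TTL

    SetRequest(String key, String value) {
        this(key, value, 0);
    }

    SetRequest(String key, String value, int ttlSeconds) {
        this.key = key;
        this.value = value;
        this.ttlSeconds = ttlSeconds;
    }
}

// Hypothetical generic batch: wraps any single async operation.
final class SimpleBatch<Request, Response> {
    private final Function<Request, CompletableFuture<Response>> singleOp;

    SimpleBatch(Function<Request, CompletableFuture<Response>> singleOp) {
        this.singleOp = singleOp;
    }

    // Issues every request first, then waits in request order; join() rethrows a failure.
    List<Response> commit(List<Request> requests) {
        List<CompletableFuture<Response>> futures = new ArrayList<>();
        for (Request request : requests) {
            futures.add(singleOp.apply(request));
        }
        List<Response> responses = new ArrayList<>();
        for (CompletableFuture<Response> future : futures) {
            responses.add(future.join());
        }
        return responses;
    }
}

class SimpleBatchDemo {
    // In-memory stand-in for a table, so the sketch runs without a server (TTL is ignored here).
    static final Map<String, String> STORE = new ConcurrentHashMap<>();

    static CompletableFuture<Void> asyncSet(SetRequest r) {
        return CompletableFuture.runAsync(() -> STORE.put(r.key, r.value));
    }

    static CompletableFuture<String> asyncGet(String key) {
        return CompletableFuture.supplyAsync(() -> STORE.get(key));
    }

    public static void main(String[] args) {
        SimpleBatch<SetRequest, Void> batchSet = new SimpleBatch<>(SimpleBatchDemo::asyncSet);
        batchSet.commit(List.of(new SetRequest("k1", "v1"), new SetRequest("k2", "v2", 60)));

        SimpleBatch<String, String> batchGet = new SimpleBatch<>(SimpleBatchDemo::asyncGet);
        System.out.println(batchGet.commit(List.of("k1", "k2"))); // prints [v1, v2]
    }
}
```

Both batch operations reuse the same `commit` loop; only the single-request function differs, which is the property the Batch abstraction is meant to provide.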

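Detailed design

Batch interface refactoring

The following operations support batching. Each of them currently has a corresponding batchXXX interface in PegasusClientInterface/PegasusTableInterface:

  1. get, multiGet
  2. del, multiDel, set, multiSet

The refactoring is as follows: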

public abstract class Batch<Request, Response> {

    final PegasusTableInterface table;
    final int timeout;

    FutureGroup<Response> futureGroup;

    public Batch(PegasusTableInterface table, int timeout) {
        this.table = table;
        this.timeout = timeout;
    }

    // No data is returned. If any request fails, an exception is thrown. Suitable for writes.
    public void commit(List<Request> requests) throws PException {
        assert (!requests.isEmpty());
        asyncCommit(requests).waitAllCompleteOrOneFail(null, timeout);
    }

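    // Returns data when only the result values are needed. If any request fails, an exception is thrown;
    // the results of requests that already succeeded are stored in responses. Suitable for reads.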
    public void commit(List<Request> requests, List<Response> responses) throws PException {
        assert (!requests.isEmpty());
        asyncCommit(requests).waitAllCompleteOrOneFail(responses, timeout);
    }

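    // Returns data when both the result and the exception of every request are needed. For a failed
    // request, the exception and the result are stored in responses. Suitable for reads and writes.
    public void commitWaitAllComplete(List<Request> requests, List<Pair<PException, Response>> responses) throws PException {
        assert (!requests.isEmpty());
        asyncCommit(requests).waitAllComplete(responses, timeout);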
    }

    private FutureGroup<Response> asyncCommit(List<Request> requests){
        futureGroup = new FutureGroup<>(requests.size());
        for (Request request : requests) {
            futureGroup.add(asyncCommit(request));
        }
        return futureGroup;
    }

    // Abstract method. Implement it with any basic operation (set, get) and that operation can be used as a batch operation.
    abstract Future<Response> asyncCommit(Request request);
}
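The Batch class delegates its failure handling to FutureGroup, which is not shown in this issue. The following sketch illustrates the two waiting strategies the comments describe, under stated assumptions: `SimpleFutureGroup` and `Outcome` are hypothetical stand-ins built on `java.util.concurrent.CompletableFuture`, and the real FutureGroup may behave differently (for example, this sketch checks futures in request order rather than in completion order).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for FutureGroup; the real class may behave differently.
final class SimpleFutureGroup<T> {
    // Outcome of one request: error is null on success, value is null on failure.
    static final class Outcome<T> {
        final Exception error;
        final T value;

        Outcome(Exception error, T value) {
            this.error = error;
            this.value = value;
        }
    }

    private final List<CompletableFuture<T>> futures = new ArrayList<>();

    void add(CompletableFuture<T> future) {
        futures.add(future);
    }

    // Fail-fast: values collected before the first failure (in request order) stay in results.
    void waitAllCompleteOrOneFail(List<T> results, long timeoutMillis) throws Exception {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        for (CompletableFuture<T> future : futures) {
            T value = await(future, deadline);
            if (results != null) {
                results.add(value);
            }
        }
    }

    // Collect-all: never stops early; returns the number of failed requests.
    int waitAllComplete(List<Outcome<T>> results, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        int failures = 0;
        for (CompletableFuture<T> future : futures) {
            try {
                results.add(new Outcome<>(null, await(future, deadline)));
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                }
                failures++;
                results.add(new Outcome<>(e, null));
            }
        }
        return failures;
    }

    // Waits for one future until the shared deadline and unwraps the failure cause.
    private static <V> V await(CompletableFuture<V> future, long deadlineNanos) throws Exception {
        long remaining = Math.max(0, deadlineNanos - System.nanoTime());
        try {
            return future.get(remaining, TimeUnit.NANOSECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            throw cause instanceof Exception ? (Exception) cause : e;
        }
    }
}

class SimpleFutureGroupDemo {
    public static void main(String[] args) {
        SimpleFutureGroup<String> group = new SimpleFutureGroup<>();
        group.add(CompletableFuture.completedFuture("v1"));
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("request 2 failed"));
        group.add(failed);
        group.add(CompletableFuture.completedFuture("v3"));

        List<SimpleFutureGroup.Outcome<String>> outcomes = new ArrayList<>();
        int failures = group.waitAllComplete(outcomes, 1000);
        System.out.println(failures + " of " + outcomes.size() + " failed"); // prints 1 of 3 failed
    }
}
```

The fail-fast variant corresponds to the two commit methods, and the collect-all variant corresponds to commitWaitAllComplete, which is why the old batchXXX and batchXXX2 pairs can share one code path.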

Example

The Get operation refactored on top of the Batch class:

public class Get {
    public byte[] hashKey;
    public byte[] sortKey;

    public Get(byte[] hashKey) {
        this.hashKey = hashKey;
    }

    public Get(byte[] hashKey, byte[] sortKey) {
        this.hashKey = hashKey;
        this.sortKey = sortKey;
    }
}

public class BatchGet extends Batch<Get, byte[]> {
    public BatchGet(PegasusTableInterface table, int timeout) {
        super(table, timeout);
    }

    @Override
    Future<byte[]> asyncCommit(Get get) {
        return table.asyncGet(get.hashKey, get.sortKey, timeout);
    }
}

The Set operation refactored on top of the Batch class:

public class Set {
    public byte[] hashKey;
    public byte[] sortKey;
    public byte[] value;
    public int ttlSeconds; // 0 means no ttl

    public Set(byte[] hashKey, byte[] sortKey, byte[] value) {
        this(hashKey, sortKey, value, 0);
    }

    public Set(byte[] hashKey, byte[] sortKey, byte[] value, int ttlSeconds) {
        assert (value != null && ttlSeconds >= 0);
        this.hashKey = hashKey;
        this.sortKey = sortKey;
        this.value = value;
        this.ttlSeconds = ttlSeconds;
    }
}

public class BatchSet extends Batch<Set, Void> {
    public BatchSet(PegasusTableInterface table, int timeout) {
        super(table, timeout);
    }

    @Override
    public Future<Void> asyncCommit(Set set) {
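        // Pass ttlSeconds through so the TTL stored in the request is not silently dropped.
        return table.asyncSet(set.hashKey, set.sortKey, set.value, set.ttlSeconds, timeout);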
    }
}

Usage example of the new Batch interface:

public class PegasusTest {

    public static void main(String[] args) throws PException {
        PegasusTableInterface table = PegasusClientFactory.getSingletonClient().openTable("temp");
        Batch<Get, byte[]> batch = new BatchGet(table, 1000);
        List<Get> requests = new ArrayList<>();
        List<byte[]> responses = new ArrayList<>();
        requests.add(new Get("hashKey1".getBytes(), "sortKey1".getBytes()));
        requests.add(new Get("hashKey2".getBytes(), "sortKey2".getBytes()));
        batch.commit(requests, responses);
    }
}

Users can also define their own batch operations (only asyncCommit needs to be implemented, using an asyncXXX method):

public class PegasusTest {

    public static void main(String[] args) throws PException {
        PegasusTableInterface table = PegasusClientFactory.getSingletonClient().openTable("temp");
        
        // MultiGet is assumed here to be a request class analogous to Get, holding hashKey and sortKeys.
        Batch<MultiGet, MultiGetResult> multiGetBatch = new Batch<MultiGet, MultiGetResult>(table, 1000) {
            @Override
            public Future<MultiGetResult> asyncCommit(MultiGet multiGet) {
                return table.asyncMultiGet(multiGet.hashKey, multiGet.sortKeys, timeout);
            }
        };

        List<MultiGet> multiGetsRequests = new ArrayList<>();
        List<MultiGetResult> multiGetsResponses = new ArrayList<>();
        // Add MultiGet requests here before committing: commit() asserts that the list is not empty.
        multiGetBatch.commit(multiGetsRequests, multiGetsResponses);
    }
}

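With this approach, every async interface, including checkAndSet, can easily be extended into a batch operation. Related PR: #129

References

[1] Good RPC interface design: aspects that need attention
[2] Mainstream database client interfaces: Hbase-Client, AmazonDynamoDB-Client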
