Skip to content

Commit 3ca3da9

Browse files
committed
support flink_value_extractor_factory jni #220
1 parent fa75757 commit 3ca3da9

File tree

6 files changed

+124
-3
lines changed

6 files changed

+124
-3
lines changed

java/rocksjni/flink_compactionfilterjni.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,15 @@ void Java_org_rocksdb_FlinkCompactionFilter_disposeFlinkCompactionFilterConfigHo
183183
delete config_holder;
184184
}
185185

186+
jlong Java_org_rocksdb_FlinkCompactionFilter_createFlinkValueExtractorFactory(
187+
JNIEnv* env, jclass /* jcls */, jlong config_holder_handle) {
188+
using namespace TERARKDB_NAMESPACE::flink;
189+
auto config_holder =
190+
*(reinterpret_cast<std::shared_ptr<FlinkCompactionFilter::ConfigHolder>*>(
191+
config_holder_handle));
192+
return reinterpret_cast<jlong>(new FlinkValueExtractorFactory(config_holder));
193+
}
194+
186195
/*
187196
* Class: org_rocksdb_FlinkCompactionFilter
188197
* Method: createNewFlinkCompactionFilter0

java/rocksjni/options.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3305,6 +3305,20 @@ void Java_org_rocksdb_ColumnFamilyOptions_setMergeOperator(
33053305
mergeOperatorHandle));
33063306
}
33073307

3308+
/*
3309+
* Class: org_rocksdb_ColumnFamilyOptions
3310+
* Method: setCompactionFilterHandle
3311+
* Signature: (JJ)V
3312+
*/
3313+
void Java_org_rocksdb_ColumnFamilyOptions_setValueExtractorFactoryHandle(
3314+
JNIEnv* /*env*/, jobject /*jobj*/, jlong jopt_handle,
3315+
jlong jvaluemeta_extractor_factory_handle) {
3316+
reinterpret_cast<TERARKDB_NAMESPACE::ColumnFamilyOptions*>(jopt_handle)
3317+
->value_meta_extractor_factory =
3318+
reinterpret_cast<TERARKDB_NAMESPACE::ValueExtractorFactory*>(
3319+
jvaluemeta_extractor_factory_handle);
3320+
}
3321+
33083322
/*
33093323
* Class: org_rocksdb_ColumnFamilyOptions
33103324
* Method: setCompactionFilterHandle

java/rocksjni/value_extractor.cc

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2+
// This source code is licensed under both the GPLv2 (found in the
3+
// COPYING file in the root directory) and Apache 2.0 License
4+
// (found in the LICENSE.Apache file in the root directory).
5+
//
6+
// This file implements the "bridge" between Java and C++ for
7+
// TERARKDB_NAMESPACE::CompactionFilter.
8+
9+
#include "rocksdb/value_extractor.h"
10+
11+
#include <jni.h>
12+
13+
#include "include/org_rocksdb_AbstractValueExtractorFactory.h"
14+
15+
// <editor-fold desc="org.rocksdb.AbstractCompactionFilter">
16+
17+
/*
18+
* Class: org_rocksdb_AbstractCompactionFilter
19+
* Method: disposeInternal
20+
* Signature: (J)V
21+
*/
22+
void Java_org_rocksdb_AbstractValueExtractorFactory_disposeInternal(
23+
JNIEnv* /*env*/, jobject /*jobj*/, jlong handle) {
24+
auto* vef =
25+
reinterpret_cast<TERARKDB_NAMESPACE::ValueExtractorFactory*>(handle);
26+
assert(vef != nullptr);
27+
delete vef;
28+
}
29+
// </editor-fold>
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2+
// This source code is licensed under both the GPLv2 (found in the
3+
// COPYING file in the root directory) and Apache 2.0 License
4+
// (found in the LICENSE.Apache file in the root directory).
5+
6+
package org.rocksdb;
7+
8+
/**
9+
* Each compaction will create a new {@link AbstractCompactionFilter}
10+
* allowing the application to know about different compactions
11+
*
12+
* @param <T> The concrete type of the compaction filter
13+
*/
14+
public abstract class AbstractValueExtractorFactory<T extends AbstractSlice<?>>
15+
extends RocksObject {
16+
17+
public AbstractValueExtractorFactory(final long nativeHandle) {
18+
super(nativeHandle);
19+
}
20+
21+
22+
/**
23+
* A name which identifies this compaction filter
24+
*
25+
* The name will be printed to the LOG file on start up for diagnosis
26+
*/
27+
public abstract String name();
28+
29+
/**
30+
* We override {@link RocksCallbackObject#disposeInternal()}
31+
* as disposing of a TERARKDB_NAMESPACE::AbstractCompactionFilterFactory requires
32+
* a slightly different approach as it is a std::shared_ptr
33+
*/
34+
@Override
35+
protected void disposeInternal() {
36+
disposeInternal(nativeHandle_);
37+
}
38+
39+
private native void disposeInternal(final long handle);
40+
}

java/src/main/java/org/rocksdb/FlinkCompactionFilter.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public FlinkCompactionFilter(ConfigHolder configHolder, TimeProvider timeProvide
2626
public FlinkCompactionFilter(ConfigHolder configHolder, TimeProvider timeProvider, Logger logger) {
2727
super(createNewFlinkCompactionFilter0(configHolder.nativeHandle_, timeProvider, logger == null ? 0 : logger.nativeHandle_));
2828
}
29-
29+
private native static long createFlinkValueExtractorFactory(long configHolderHandle);
3030
private native static long createNewFlinkCompactionFilter0(long configHolderHandle, TimeProvider timeProvider, long loggerHandle);
3131
private native static long createNewFlinkCompactionFilterConfigHolder();
3232
private native static void disposeFlinkCompactionFilterConfigHolder(long configHolderHandle);
@@ -118,6 +118,18 @@ public interface TimeProvider {
118118
long currentTimestamp();
119119
}
120120

121+
public static class FlinkValueExtractorFactory
122+
extends AbstractValueExtractorFactory<Slice> {
123+
124+
public FlinkValueExtractor(ConfigHolder configHolder) {
125+
super(createFlinkValueExtractorFactory(configHolder));
126+
}
127+
@Override
128+
public String name() {
129+
return "FlinkValueExtractorFactory";
130+
}
131+
}
132+
121133
public static class FlinkCompactionFilterFactory extends AbstractCompactionFilterFactory<FlinkCompactionFilter> {
122134
private final ConfigHolder configHolder;
123135
private final TimeProvider timeProvider;

utilities/flink/flink_compaction_filter.h

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ static const std::size_t JAVA_MAX_SIZE = static_cast<std::size_t>(0x7fffffff);
3333
* Note: this compaction filter is a special implementation, designed for usage
3434
* only in Apache Flink project.
3535
*/
36-
class FlinkCompactionFilter : public CompactionFilter, ValueExtractor {
36+
class FlinkCompactionFilter : public CompactionFilter, public ValueExtractor {
3737
public:
3838
enum StateType {
3939
// WARNING!!! Do not change the order of enum entries as it is important for
@@ -144,7 +144,7 @@ class FlinkCompactionFilter : public CompactionFilter, ValueExtractor {
144144
std::string* skip_until) const override;
145145

146146
Status Extract(const Slice& key, const Slice& value,
147-
std::string* output) const override;
147+
std::string* output) const override;
148148

149149
bool IgnoreSnapshots() const override { return true; }
150150

@@ -196,5 +196,22 @@ static const FlinkCompactionFilter::Config DISABLED_CONFIG =
196196
std::numeric_limits<int64_t>::max(),
197197
std::numeric_limits<int64_t>::max(), nullptr};
198198

199+
class FlinkValueExtractorFactory : public ValueExtractorFactory {
200+
public:
201+
const char* Name() const override {
202+
return "flink.ValueTimeStampExtractorFactory";
203+
}
204+
explicit FlinkValueExtractorFactory(
205+
std::shared_ptr<FlinkCompactionFilter::ConfigHolder> config_holder)
206+
: config_holder_(std::move(config_holder)) {}
207+
std::unique_ptr<ValueExtractor> CreateValueExtractor(
208+
const Context& context) const {
209+
return std::unique_ptr<ValueExtractor>(
210+
new FlinkCompactionFilter(config_holder_, nullptr, nullptr));
211+
};
212+
private:
213+
std::shared_ptr<FlinkCompactionFilter::ConfigHolder> config_holder_;
214+
};
215+
199216
} // namespace flink
200217
} // namespace TERARKDB_NAMESPACE

0 commit comments

Comments
 (0)