Skip to content

Commit

Permalink
[FLINK-4602][State] Move RocksDB statebackend classes to o.a.f.state.…
Browse files Browse the repository at this point in the history
…rocksdb package. (#25543)
  • Loading branch information
AlexYinHan authored Oct 29, 2024
1 parent cddb14e commit 316daca
Show file tree
Hide file tree
Showing 150 changed files with 2,913 additions and 2,654 deletions.
2 changes: 1 addition & 1 deletion docs/content.zh/docs/dev/python/table/table_environment.md
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,7 @@ table_env.get_config().set("execution.checkpointing.interval", "3min")

# 设置 statebackend 类型为 "rocksdb",其他可选项有 "hashmap"
# 你也可以将这个属性设置为 StateBackendFactory 的完整类名
# e.g. org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory
# e.g. org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackendFactory
table_env.get_config().set("state.backend.type", "rocksdb")

# 设置 RocksDB statebackend 所需要的 checkpoint 目录
Expand Down
2 changes: 1 addition & 1 deletion docs/content.zh/docs/ops/state/state_backends.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ env = StreamExecutionEnvironment.get_execution_environment(config)

可选值包括 *jobmanager* (HashMapStateBackend), *rocksdb* (EmbeddedRocksDBStateBackend),
或使用实现了 state backend 工厂 {{< gh_link file="flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java" name="StateBackendFactory" >}} 的类的全限定类名,
例如: EmbeddedRocksDBStateBackend 对应为 `org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory`
例如: EmbeddedRocksDBStateBackend 对应为 `org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackendFactory`

`execution.checkpointing.dir` 选项指定了所有 State Backend 写 CheckPoint 数据和写元数据文件的目录。
你能在 [这里]({{< ref "docs/ops/state/checkpoints" >}}#directory-structure) 找到关于 CheckPoint 目录结构的详细信息。
Expand Down
2 changes: 1 addition & 1 deletion docs/content/docs/dev/python/table/table_environment.md
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ table_env.get_config().set("execution.checkpointing.interval", "3min")

# set the statebackend type to "rocksdb", other available options are "hashmap"
# you can also set the full qualified Java class name of the StateBackendFactory to this option
# e.g. org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory
# e.g. org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackendFactory
table_env.get_config().set("state.backend.type", "rocksdb")

# set the checkpoint directory, which is required by the RocksDB statebackend
Expand Down
2 changes: 1 addition & 1 deletion docs/content/docs/ops/state/state_backends.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ A default state backend can be configured in the [Flink configuration file]({{<

Possible values for the config entry are *hashmap* (HashMapStateBackend), *rocksdb* (EmbeddedRocksDBStateBackend), or the fully qualified class
name of the class that implements the state backend factory {{< gh_link file="flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java" name="StateBackendFactory" >}},
such as `org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory` for EmbeddedRocksDBStateBackend.
such as `org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackendFactory` for EmbeddedRocksDBStateBackend.

The `execution.checkpointing.dir` option defines the directory to which all backends write checkpoint data and meta data files.
You can find more details about the checkpoint directory structure [here]({{< ref "docs/ops/state/checkpoints" >}}#directory-structure).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,14 +456,14 @@ public static class RocksdbCompactFilterCleanupStrategy

/**
* @deprecated Use {@link
* org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions#COMPACT_FILTER_PERIODIC_COMPACTION_TIME}
* org.apache.flink.state.rocksdb.RocksDBConfigurableOptions#COMPACT_FILTER_PERIODIC_COMPACTION_TIME}
* instead.
*/
@Deprecated static final Duration DEFAULT_PERIODIC_COMPACTION_TIME = Duration.ofDays(30);

/**
* @deprecated Use {@link
* org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions#COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES}
* org.apache.flink.state.rocksdb.RocksDBConfigurableOptions#COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES}
* instead.
*/
@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class ConfigurationOptionLocator {
"flink-metrics/flink-metrics-otel", "org.apache.flink.metrics.otel"),
new OptionsClassLocation(
"flink-state-backends/flink-statebackend-rocksdb",
"org.apache.flink.contrib.streaming.state"),
"org.apache.flink.state.rocksdb"),
new OptionsClassLocation(
"flink-table/flink-table-api-java", "org.apache.flink.table.api.config"),
new OptionsClassLocation("flink-python", "org.apache.flink.python"),
Expand Down Expand Up @@ -95,7 +95,7 @@ public class ConfigurationOptionLocator {
"org.apache.flink.configuration.WritableConfig",
"org.apache.flink.configuration.ConfigOptions",
"org.apache.flink.streaming.api.environment.CheckpointConfig",
"org.apache.flink.contrib.streaming.state.PredefinedOptions",
"org.apache.flink.state.rocksdb.PredefinedOptions",
"org.apache.flink.python.PythonConfig",
"org.apache.flink.cep.configuration.SharedBufferCacheConfig",
"org.apache.flink.table.api.config.LookupJoinHintOptions"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.flink.streaming.tests;

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.PrintSinkFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public static void main(String[] args) throws Exception {
boolean incrementalCheckpoints = params.getBoolean("incremental-checkpoints", false);
configuration.set(
StateBackendOptions.STATE_BACKEND,
"org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory");
"org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackendFactory");
configuration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incrementalCheckpoints);
configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.cep.time.TimerService;
import org.apache.flink.cep.utils.CepOperatorTestUtilities;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.mock.Whitebox;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.cep.utils.CepOperatorTestUtilities;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;

/** IT Case for reading keyed state from a memory state backend. */
public class EmbeddedRocksDBStateBackendReaderKeyedStateITCase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;

/** IT Case for reading window state with the embedded rocksdb state backend. */
public class EmbeddedRocksDBStateBackendWindowITCase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.state.api.utils.JobResultRetriever;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
Expand All @@ -39,6 +38,7 @@
import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.StateBootstrapFunction;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.utils.MaxWatermarkSource;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
Expand Down
8 changes: 4 additions & 4 deletions flink-python/pyflink/datastream/state_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def _from_j_state_backend(j_state_backend):
gateway = get_gateway()
JStateBackend = gateway.jvm.org.apache.flink.runtime.state.StateBackend
JHashMapStateBackend = gateway.jvm.org.apache.flink.runtime.state.hashmap.HashMapStateBackend
JEmbeddedRocksDBStateBackend = gateway.jvm.org.apache.flink.contrib.streaming.state.\
JEmbeddedRocksDBStateBackend = gateway.jvm.org.apache.flink.state.rocksdb.\
EmbeddedRocksDBStateBackend
j_clz = j_state_backend.getClass()

Expand Down Expand Up @@ -191,7 +191,7 @@ def __init__(self,
if j_embedded_rocks_db_state_backend is None:
gateway = get_gateway()
JTernaryBoolean = gateway.jvm.org.apache.flink.util.TernaryBoolean
JEmbeddedRocksDBStateBackend = gateway.jvm.org.apache.flink.contrib.streaming.state \
JEmbeddedRocksDBStateBackend = gateway.jvm.org.apache.flink.state.rocksdb \
.EmbeddedRocksDBStateBackend

if enable_incremental_checkpointing not in (None, True, False):
Expand Down Expand Up @@ -316,7 +316,7 @@ def set_options(self, options_factory_class_name: str):
The options factory must have a default constructor.
"""
gateway = get_gateway()
JOptionsFactory = gateway.jvm.org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory
JOptionsFactory = gateway.jvm.org.apache.flink.state.rocksdb.RocksDBOptionsFactory
j_options_factory_clz = load_java_class(options_factory_class_name)
if not get_java_class(JOptionsFactory).isAssignableFrom(j_options_factory_clz):
raise ValueError("The input class does not implement RocksDBOptionsFactory.")
Expand Down Expand Up @@ -448,7 +448,7 @@ def _from_j_predefined_options(j_predefined_options) -> 'PredefinedOptions':

def _to_j_predefined_options(self):
gateway = get_gateway()
JPredefinedOptions = gateway.jvm.org.apache.flink.contrib.streaming.state.PredefinedOptions
JPredefinedOptions = gateway.jvm.org.apache.flink.state.rocksdb.PredefinedOptions
return getattr(JPredefinedOptions, self.name)


Expand Down
4 changes: 2 additions & 2 deletions flink-python/pyflink/datastream/tests/test_state_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ def test_get_set_options(self):
self.assertIsNone(state_backend.get_options())

state_backend.set_options(
"org.apache.flink.contrib.streaming.state."
"org.apache.flink.state.rocksdb."
"RocksDBStateBackendConfigTest$TestOptionsFactory")

self.assertEqual(state_backend.get_options(),
"org.apache.flink.contrib.streaming.state."
"org.apache.flink.state.rocksdb."
"RocksDBStateBackendConfigTest$TestOptionsFactory")

def test_get_set_number_of_transfer_threads(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

/**
* Several integration tests for queryable state using the {@link
* org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend}.
* org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend}.
*/
class HAQueryableStateRocksDBBackendITCase extends AbstractQueryableStateTestBase {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

/**
* Several integration tests for queryable state using the {@link
* org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend}.
* org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend}.
*/
public class NonHAQueryableStateRocksDBBackendITCase extends AbstractQueryableStateTestBase {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBTestUtils;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend;
import org.apache.flink.state.rocksdb.RocksDBTestUtils;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class StateBackendLoader {

/** Used for loading RocksDBStateBackend. */
private static final String ROCKSDB_STATE_BACKEND_FACTORY =
"org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory";
"org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackendFactory";

/** Used for loading ForStStateBackend. */
private static final String FORST_STATE_BACKEND_FACTORY =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateLatencyTrackOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendTest;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackendTest;
import org.apache.flink.testutils.junit.utils.TempDirUtils;

import org.junit.jupiter.api.Disabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.StateBackendTestBase;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.util.IOUtils;

import org.junit.After;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
Expand All @@ -41,6 +40,7 @@
import org.apache.flink.runtime.state.delegate.DelegatingStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
Expand Down
Loading

0 comments on commit 316daca

Please sign in to comment.