[FSTORE-1183] Error downloading credentials for Kafka storage connector in Python mode #1213

Merged · 14 commits · Feb 26, 2024
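
In short: when downloading storage connector credential files (for example the Kafka trust and key stores), the client previously hardcoded the dataset type "HIVEDB", which fails for files stored outside the Hive warehouse. This PR derives the type from the file path instead. A minimal sketch of the new helper's behavior, using paths borrowed from the unit tests added below:

from hsfs import util

# Files under the Hive warehouse are still read with the HIVEDB type ...
assert util.get_dataset_type(
    "/apps/hive/warehouse/temp_featurestore.db/storage_connector_resources/kafka__tstore.jks"
) == "HIVEDB"

# ... while anything else, e.g. project datasets, is now read as DATASET.
assert util.get_dataset_type("/Projects/temp/Resources/kafka__tstore.jks") == "DATASET"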
BeamEngine.java
@@ -24,6 +24,7 @@
import com.logicalclocks.hsfs.beam.StreamFeatureGroup;
import com.logicalclocks.hsfs.metadata.DatasetApi;
import com.logicalclocks.hsfs.engine.EngineBase;
+import com.logicalclocks.hsfs.engine.FeatureGroupUtils;
import com.logicalclocks.hsfs.metadata.HopsworksInternalClient;
import org.apache.avro.Schema;

@@ -34,6 +35,7 @@

public class BeamEngine extends EngineBase {
  private static BeamEngine INSTANCE = null;
+  private FeatureGroupUtils featureGroupUtils = new FeatureGroupUtils();

  public static synchronized BeamEngine getInstance() throws FeatureStoreException {
    if (INSTANCE == null) {

@@ -71,7 +73,7 @@ public String addFile(String filePath) throws IOException, FeatureStoreException
    }
    String targetPath = System.getProperty("java.io.tmpdir") + filePath.substring(filePath.lastIndexOf("/"));
    try (FileOutputStream outputStream = new FileOutputStream(targetPath)) {
-      outputStream.write(DatasetApi.readContent(filePath, "HIVEDB"));
+      outputStream.write(DatasetApi.readContent(filePath, featureGroupUtils.getDatasetType(filePath)));
    }
    return targetPath;
  }
FeatureGroupUtils.java
@@ -240,4 +240,11 @@ private void checkListdiff(List<String> primaryPartitionKeyNames, List<String> f
  public Subject getSubject(FeatureGroupBase featureGroup) throws FeatureStoreException, IOException {
    return kafkaApi.getSubject(featureGroup.getFeatureStore(), getFgName(featureGroup));
  }
+
+  public String getDatasetType(String path) {
+    if (Pattern.compile("^(?:hdfs://|)/apps/hive/warehouse/*").matcher(path).find()) {
+      return "HIVEDB";
+    }
+    return "DATASET";
+  }
}
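
A note on the pattern (the Python helper in util.py below uses the identical regex): the (?:hdfs://|) group makes the hdfs:// scheme optional, and the trailing /* matches zero or more literal slashes, so the check is effectively a prefix test on /apps/hive/warehouse. A quick demonstration in Python, reusing the paths from the new tests:

import re

pattern = re.compile(r"^(?:hdfs://|)/apps/hive/warehouse/*")

# Both the bare path and the hdfs:// form match the warehouse prefix ...
assert pattern.match("/apps/hive/warehouse/temp_featurestore.db/storage_connector_resources/kafka__tstore.jks")
assert pattern.match("hdfs:///apps/hive/warehouse/temp_featurestore.db/storage_connector_resources/kafka__tstore.jks")

# ... while project dataset paths do not, and fall through to "DATASET".
assert pattern.match("/Projects/temp/Resources/kafka__tstore.jks") is None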
TestFeatureGroupUtils.java (new file)
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2024. Hopsworks AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* See the License for the specific language governing permissions and limitations under the License.
*
*/

package com.logicalclocks.hsfs.engine;

import org.junit.Assert;
import org.junit.jupiter.api.Test;

import com.fasterxml.jackson.core.JsonProcessingException;

public class TestFeatureGroupUtils {

  private FeatureGroupUtils featureGroupUtils = new FeatureGroupUtils();

  @Test
  void testGetDatasetTypeHIVEDB() throws JsonProcessingException {
    // Arrange
    String path = "/apps/hive/warehouse/temp_featurestore.db/storage_connector_resources/kafka__tstore.jks";

    // Act
    String databaseType = featureGroupUtils.getDatasetType(path);

    // Assert
    Assert.assertEquals("HIVEDB", databaseType);
  }

  @Test
  void testGetDatasetTypeHIVEDBWithDfs() throws JsonProcessingException {
    // Arrange
    String path = "hdfs:///apps/hive/warehouse/temp_featurestore.db/storage_connector_resources/kafka__tstore.jks";

    // Act
    String databaseType = featureGroupUtils.getDatasetType(path);

    // Assert
    Assert.assertEquals("HIVEDB", databaseType);
  }

  @Test
  void testGetDatasetTypeDATASET() throws JsonProcessingException {
    // Arrange
    String path = "/Projects/temp/Resources/kafka__tstore.jks";

    // Act
    String databaseType = featureGroupUtils.getDatasetType(path);

    // Assert
    Assert.assertEquals("DATASET", databaseType);
  }

  @Test
  void testGetDatasetTypeDATASETWithDfs() throws JsonProcessingException {
    // Arrange
    String path = "hdfs:///Projects/temp/Resources/kafka__tstore.jks";

    // Act
    String databaseType = featureGroupUtils.getDatasetType(path);

    // Assert
    Assert.assertEquals("DATASET", databaseType);
  }
}
SparkEngine.java
@@ -123,7 +123,6 @@
public class SparkEngine extends EngineBase {

  private final StorageConnectorUtils storageConnectorUtils = new StorageConnectorUtils();
-
  private FeatureGroupUtils featureGroupUtils = new FeatureGroupUtils();

  private static SparkEngine INSTANCE = null;
@@ -143,7 +142,6 @@ public static void setInstance(SparkEngine sparkEngine) {
  @Getter
  private SparkSession sparkSession;

-  private FeatureGroupUtils utils = new FeatureGroupUtils();
  private HudiEngine hudiEngine = new HudiEngine();

  private SparkEngine() {
@@ -657,7 +655,7 @@ private Dataset<Row> onlineFeatureGroupToAvro(FeatureGroupBase featureGroupBase,

  public void writeEmptyDataframe(FeatureGroupBase featureGroup)
      throws IOException, FeatureStoreException, ParseException {
-    String fgTableName = utils.getTableName(featureGroup);
+    String fgTableName = featureGroupUtils.getTableName(featureGroup);
    Dataset emptyDf = sparkSession.table(fgTableName).limit(0);
    writeOfflineDataframe(featureGroup, emptyDf, HudiOperationType.UPSERT, new HashMap<>(), null);
  }
@@ -681,8 +679,8 @@ private void writeSparkDataset(FeatureGroupBase featureGroup, Dataset<Row> datas
        .mode(SaveMode.Append)
        // write options cannot be null
        .options(writeOptions == null ? new HashMap<>() : writeOptions)
-        .partitionBy(utils.getPartitionColumns(featureGroup))
-        .saveAsTable(utils.getTableName(featureGroup));
+        .partitionBy(featureGroupUtils.getPartitionColumns(featureGroup))
+        .saveAsTable(featureGroupUtils.getTableName(featureGroup));
  }

  public String profile(Dataset<Row> df, List<String> restrictToColumns, Boolean correlation,

@@ -935,7 +933,7 @@ public String addFile(String filePath) throws FeatureStoreException {
    java.nio.file.Path targetPath = Paths.get(SparkFiles.getRootDirectory(), fileName);

    try (FileOutputStream outputStream = new FileOutputStream(targetPath.toString())) {
-      outputStream.write(DatasetApi.readContent(filePath, "HIVEDB"));
+      outputStream.write(DatasetApi.readContent(filePath, featureGroupUtils.getDatasetType(filePath)));
    } catch (IOException e) {
      throw new FeatureStoreException("Error setting up file: " + filePath, e);
    }
4 changes: 3 additions & 1 deletion python/hsfs/engine/python.py
@@ -906,7 +906,9 @@ def add_file(self, file):

        local_file = os.path.join("/tmp", os.path.basename(file))
        if not os.path.exists(local_file):
-            content_stream = self._dataset_api.read_content(file, "HIVEDB")
+            content_stream = self._dataset_api.read_content(
+                file, util.get_dataset_type(file)
+            )
            bytesio_object = BytesIO(content_stream.content)
            # Write the stuff
            with open(local_file, "wb") as f:
2 changes: 1 addition & 1 deletion python/hsfs/engine/spark.py
@@ -792,7 +792,7 @@ def add_file(self, file):
        if isinstance(client.get_instance(), client.external.Client):
            tmp_file = os.path.join(SparkFiles.getRootDirectory(), file_name)
            print("Reading key file from storage connector.")
-            response = self._dataset_api.read_content(tmp_file, "HIVEDB")
+            response = self._dataset_api.read_content(file, util.get_dataset_type(file))

            with open(tmp_file, "wb") as f:
                f.write(response.content)
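Worth noting: besides the hardcoded "HIVEDB" type, the removed Spark line read from tmp_file (the local target path that has not been written yet) rather than from the source file, so this one-line change fixes two issues at once.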
7 changes: 7 additions & 0 deletions python/hsfs/util.py
@@ -133,6 +133,13 @@ def get_host_name():
    return host


+def get_dataset_type(path: str):
+    if re.match(r"^(?:hdfs://|)/apps/hive/warehouse/*", path):
+        return "HIVEDB"
+    else:
+        return "DATASET"


async def create_async_engine(
    online_conn, external: bool, default_min_size: int, options: dict = None
):
22 changes: 22 additions & 0 deletions python/tests/test_util.py
@@ -87,6 +87,28 @@ def test_convert_hudi_commit_time_to_timestamp(self):
        timestamp = util.get_timestamp_from_date_string("20221118095233099")
        assert timestamp == 1668765153099

+    def test_get_dataset_type_HIVEDB(self):
+        db_type = util.get_dataset_type(
+            "/apps/hive/warehouse/temp_featurestore.db/storage_connector_resources/kafka__tstore.jks"
+        )
+        assert db_type == "HIVEDB"
+
+    def test_get_dataset_type_HIVEDB_with_dfs(self):
+        db_type = util.get_dataset_type(
+            "hdfs:///apps/hive/warehouse/temp_featurestore.db/storage_connector_resources/kafka__tstore.jks"
+        )
+        assert db_type == "HIVEDB"
+
+    def test_get_dataset_type_DATASET(self):
+        db_type = util.get_dataset_type("/Projects/temp/Resources/kafka__tstore.jks")
+        assert db_type == "DATASET"
+
+    def test_get_dataset_type_DATASET_with_dfs(self):
+        db_type = util.get_dataset_type(
+            "hdfs:///Projects/temp/Resources/kafka__tstore.jks"
+        )
+        assert db_type == "DATASET"

    def test_get_job_url(self, mocker):
        # Arrange
        mock_client_get_instance = mocker.patch("hsfs.client.get_instance")