Description
Query engine
I'm using Spark as the processing engine.
Question
I have the following Docker Compose file:
version: "3"

services:
  spark-iceberg:
    image: tabulario/spark-iceberg
    container_name: spark-iceberg
    build: spark/
    networks:
      iceberg_net:
    depends_on:
      - rest
      - minio
    volumes:
      - ./warehouse:/home/iceberg/warehouse
      - ./notebooks:/home/iceberg/notebooks/notebooks
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    ports:
      - 8888:8888
      - 8080:8080
      - 10000:10000
      - 10001:10001

  zookeeper:
    image: wurstmeister/zookeeper:latest
    expose:
      - "2181"

  kafka:
    image: wurstmeister/kafka:2.13-2.8.1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE

  postgres:
    image: postgres:16
    container_name: pg-catalog-store-spark
    networks:
      iceberg_net:
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=iceberg_postgres_user
      - POSTGRES_PASSWORD=iceberg_postgres_password
      - POSTGRES_DB=iceberg
    volumes:
      - pg_catalog_volume_spark:/var/lib/postgresql/data

  rest:
    image: tabulario/iceberg-rest
    container_name: iceberg-rest-spark
    networks:
      iceberg_net:
    ports:
      - 8181:8181
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=eu-west-1
      - CATALOG_WAREHOUSE=s3://warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_CATALOG_IMPL=org.apache.iceberg.jdbc.JdbcCatalog
      - CATALOG_URI=jdbc:postgresql://postgres:5432/iceberg
      - CATALOG_JDBC_USER=iceberg_postgres_user
      - CATALOG_JDBC_PASSWORD=iceberg_postgres_password
      - CATALOG_S3_ENDPOINT=http://minio:9000
      - CATALOG_S3_PATH__STYLE_ACCESS=true
    depends_on:
      - postgres
      - minio

  minio:
    image: minio/minio
    container_name: minio-store-spark
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      iceberg_net:
        aliases:
          - warehouse.minio
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]

  mc:
    depends_on:
      - minio
    image: minio/mc
    container_name: mc-spark
    networks:
      iceberg_net:
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=eu-west-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc mb --ignore-existing minio/warehouse;
      /usr/bin/mc mb --ignore-existing minio/metadata;
      /usr/bin/mc policy set public minio/warehouse;
      /usr/bin/mc policy set public minio/metadata;
      tail -f /dev/null
      "

volumes:
  pg_catalog_volume_spark:

networks:
  iceberg_net:
What I'm trying to do is stream data from a Python script through Kafka, process it with Spark, and write the result to an Iceberg table (a rough sketch of that streaming job is at the end of this post). However, I'm having trouble with the basic step of appending data to an Iceberg table. Here is the code I'm testing with:
package src;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class Sandbox {

    public static void main(String[] args) {
        // Initialize Spark session with Iceberg REST catalog configuration
        SparkSession spark = SparkSession.builder()
                .appName("IcebergCatalogExample")
                .master("local[*]") // Use local mode for testing
                .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.rest.type", "rest")
                .config("spark.sql.catalog.rest.uri", "http://localhost:8181") // REST catalog endpoint
                .config("spark.sql.catalog.rest.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
                .config("spark.sql.catalog.rest.client-type", "rest")
                .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") // MinIO endpoint
                .config("spark.hadoop.fs.s3a.access.key", "admin")
                .config("spark.hadoop.fs.s3a.secret.key", "password")
                .config("spark.hadoop.fs.s3a.path.style.access", "true") // Required for MinIO
                .config("spark.hadoop.fs.s3a.endpoint.region", "eu-west-1") // Region of the MinIO deployment
                .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") // S3A implementation
                .getOrCreate();

        // Define the namespace and table name
        String namespace = "example_namespace";
        String tableName = "example_table";

        // Define the schema
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.required(2, "name", Types.StringType.get()),
                Types.NestedField.optional(3, "email", Types.StringType.get()),
                Types.NestedField.optional(4, "age", Types.IntegerType.get())
        );

        // Create the table if it doesn't exist
        String fullTableName = String.format("rest.%s.%s", namespace, tableName);
        try {
            // Create the table
            spark.sql(
                    String.format(
                            "CREATE TABLE IF NOT EXISTS %s " +
                            "(id BIGINT, name STRING, email STRING, age INT) " +
                            "USING iceberg " +
                            "PARTITIONED BY (age)",
                            fullTableName
                    )
            );
            System.out.println("Table created: " + fullTableName);

            // Insert example data
            Dataset<Row> data = spark.sql(
                    "SELECT 1 AS id, 'John Doe' AS name, '[email protected]' AS email, 30 AS age");
            data.writeTo(fullTableName).append();

            // Read and display the data
            Dataset<Row> tableData = spark.read().format("iceberg").load(fullTableName);
            tableData.show();
        } catch (Exception e) {
            System.err.println("Error while creating or accessing the table: " + e.getMessage());
            e.printStackTrace();
        }

        // Stop the Spark session
        spark.stop();
    }
}
The minio endpoint is changed to localhost because this job runs on a standalone Spark cluster, outside of the network defined in the Docker Compose file.
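Related to that endpoint change: the fs.s3a.* options only configure Hadoop's S3AFileSystem, and I'm not sure whether S3FileIO (the io-impl configured on the catalog) reads them at all. Below is a sketch of how I understand the same settings could instead be passed as Iceberg catalog properties, mirroring the REST container's CATALOG_S3_ENDPOINT and CATALOG_S3_PATH__STYLE_ACCESS variables. The property names come from the Iceberg AWS module and the class name is just a placeholder; treat this as an untested variant, not a known fix.

package src;

import org.apache.spark.sql.SparkSession;

public class SandboxWithS3FileIOProps {
    public static void main(String[] args) {
        // Same catalog as in Sandbox, but with the S3 settings passed as Iceberg catalog
        // properties (s3.*) instead of the Hadoop fs.s3a.* properties.
        SparkSession spark = SparkSession.builder()
                .appName("IcebergCatalogExample")
                .master("local[*]")
                .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.rest.type", "rest")
                .config("spark.sql.catalog.rest.uri", "http://localhost:8181")
                .config("spark.sql.catalog.rest.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
                // S3FileIO endpoint and path-style access (assumption: S3FileIO does not
                // pick up the fs.s3a.* settings, so it needs its own endpoint configuration)
                .config("spark.sql.catalog.rest.s3.endpoint", "http://localhost:9000")
                .config("spark.sql.catalog.rest.s3.path-style-access", "true")
                // Credentials for S3FileIO (otherwise it falls back to the default
                // AWS credential chain, which is not set up on my host machine)
                .config("spark.sql.catalog.rest.s3.access-key-id", "admin")
                .config("spark.sql.catalog.rest.s3.secret-access-key", "password")
                .config("spark.sql.catalog.rest.client.region", "eu-west-1")
                .getOrCreate();

        spark.stop();
    }
}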
When I run this, I get the following error:
software.amazon.awssdk.services.s3.model.S3Exception: The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint. (Service: S3, Status Code: 301, Request ID: 332NMTVVN450E0A2, Extended Request ID: MvTvqnln5tD/O3N1B4iyBc+8bzR0BiNro2Frh5/NH09KiZAHUj/UbOMtYZUdBWvFjlkAX0RMSzOF+Ik/SRnltA5H5P1dciD8zzUq0O68dKU=)
at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleErrorResponse(AwsXmlPredicatedResponseHandler.java:156)
at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleResponse(AwsXmlPredicatedResponseHandler.java:108)
at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:85)
at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:43)
at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler$Crc32ValidationResponseHandler.handle(AwsSyncClientHandler.java:93)
at software.amazon.awssdk.core.internal.handler.BaseClientHandler.lambda$successTransformationResponseHandler$7(BaseClientHandler.java:279)
at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:50)
at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:38)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:74)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:43)
at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:79)
at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:41)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39)
at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.executeRequest(RetryableStage2.java:93)
at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.execute(RetryableStage2.java:56)
at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.execute(RetryableStage2.java:36)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:53)
at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:35)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:82)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:62)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:43)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:210)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:173)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:80)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:74)
at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:53)
at software.amazon.awssdk.services.s3.DefaultS3Client.putObject(DefaultS3Client.java:10628)
at org.apache.iceberg.aws.s3.S3OutputStream.completeUploads(S3OutputStream.java:444)
at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:270)
at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:256)
at org.apache.iceberg.shaded.org.apache.parquet.io.DelegatingPositionOutputStream.close(DelegatingPositionOutputStream.java:38)
at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1204)
at org.apache.iceberg.parquet.ParquetWriter.close(ParquetWriter.java:257)
at org.apache.iceberg.io.DataWriter.close(DataWriter.java:82)
at org.apache.iceberg.io.RollingFileWriter.closeCurrentWriter(RollingFileWriter.java:122)
at org.apache.iceberg.io.RollingFileWriter.close(RollingFileWriter.java:147)
at org.apache.iceberg.io.RollingDataWriter.close(RollingDataWriter.java:32)
at org.apache.iceberg.io.FanoutWriter.closeWriters(FanoutWriter.java:81)
at org.apache.iceberg.io.FanoutWriter.close(FanoutWriter.java:73)
at org.apache.iceberg.io.FanoutDataWriter.close(FanoutDataWriter.java:31)
at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.close(SparkWrite.java:804)
at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.commit(SparkWrite.java:786)
at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$5(WriteToDataSourceV2Exec.scala:475)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:491)
at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:430)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:496)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:393)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
I've found similar/identical problems here:
https://stackoverflow.com/questions/77099097/error-when-reading-s3-data-with-spark-using-iceberg-the-bucket-you-are-attempt
https://stackoverflow.com/questions/78846530/error-when-append-data-to-iceberg-using-spark
But I cannot figure out what I am doing wrong. Any help is appreciated.
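For completeness, here is a rough sketch of the streaming job mentioned at the top of the post, which I eventually want to run once the batch append works: read JSON records from Kafka and append them to the same Iceberg table. The topic name users, the JSON schema, and the checkpoint path are placeholders, and the session config is abbreviated to the catalog settings shown above; this is not code I have working yet.

package src;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class KafkaToIceberg {
    public static void main(String[] args) throws Exception {
        // Same catalog configuration as in the batch example above (S3 settings omitted here).
        SparkSession spark = SparkSession.builder()
                .appName("KafkaToIceberg")
                .master("local[*]")
                .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.rest.type", "rest")
                .config("spark.sql.catalog.rest.uri", "http://localhost:8181")
                .config("spark.sql.catalog.rest.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
                .getOrCreate();

        // Read JSON messages from Kafka ("users" is a placeholder topic name).
        // Requires the spark-sql-kafka-0-10 package on the classpath.
        Dataset<Row> events = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "users")
                .load()
                .selectExpr("from_json(CAST(value AS STRING), " +
                        "'id BIGINT, name STRING, email STRING, age INT') AS data")
                .select("data.*");

        // Append each micro-batch to the Iceberg table created by the batch example.
        // fanout-enabled is set because the table is partitioned by age and streaming
        // writes cannot pre-sort rows by partition.
        StreamingQuery query = events.writeStream()
                .format("iceberg")
                .outputMode("append")
                .option("fanout-enabled", "true")
                .option("checkpointLocation", "/tmp/checkpoints/example_table")
                .toTable("rest.example_namespace.example_table");

        query.awaitTermination();
    }
}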
Update: The issue only occurs when inserting data into an Iceberg table. I can create, delete, and update tables without issues, which makes me suspect that the metadata operations (handled by the REST catalog service, which has its own CATALOG_S3_ENDPOINT pointing at MinIO) work fine, while the data files written directly by my Spark job are sent to the wrong S3 endpoint.