Improve Export performance #3248

@olivbrau

Description

Hello,

I would like some advice on how to use the parquet-java library as efficiently as possible.
There are several possible approaches to writing a Parquet file.
I tried several of them, but each time the performance was significantly lower than that of the R function write_parquet(), which I used as a benchmark reference.
I ended up with the approach described below, and I would like to know what I could improve.
The idea is to avoid, if possible, intermediate structures (RecordConsumer, SimpleGroup, etc.), which duplicate the data and seem to slow down the export.
My goal is to use the lowest-level Parquet API so that the process is as fast as possible.

The data I want to export is stored in an in-memory structure (essentially plain Java arrays):

  • I create a Configuration object, notably specifying compression settings
  • I create a schema (MessageType) describing the table I want to export
  • I create a ParquetFileWriter with default options, assigned to a LocalOutputFile
  • I create a ParquetProperties, specifying version 2_0, various options (dictionaries,
    withPageSize = 8MB, withPageRowCountLimit = 1000)
  • I create a CompressionCodecFactory.BytesInputCompressor, using SNAPPY
  • Then I start writing the data (a condensed sketch of this sequence is shown right after this list):
    • I call fileWriter.start()
    • I manually compute the number of row groups and how many rows each row group will contain
    • For each row group:
      • I create a ColumnChunkPageWriteStore
      • I create a ColumnWriteStore from that ColumnChunkPageWriteStore
      • I create a MessageColumnIO
      • I create as many ColumnWriters as there are columns to export
      • For each column, I call write() once for each row of the current row group
      • At the end of the row group, I call endRecord() on my ColumnWriteStore once for each record
      • Then I call startBlock() on my fileWriter, flush() on the ColumnWriteStore,
        flushToFileWriter() on the ColumnChunkPageWriteStore, and finally endBlock()
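
Condensed, the per-row-group sequence is roughly the sketch below (simplified from the attached class: writeOneColumn() stands in for the per-column write loop, and nbRowGroup / rowFirst / rowLast for my own row bookkeeping):

    fileWriter.start();
    for (int rg = 0; rg < nbRowGroup; rg++) {
        // one page store / column store pair per row group
        ColumnChunkPageWriteStore pageStore = new ColumnChunkPageWriteStore(
                compressor, schema, props.getAllocator(),
                props.getColumnIndexTruncateLength(),
                props.getPageWriteChecksumEnabled(),
                fileWriter.getEncryptor(), rg);
        ColumnWriteStore columnStore = props.newColumnWriteStore(schema, pageStore, pageStore);
        MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);

        // one ColumnWriter per leaf column; write() / writeNull() for every row of the row group
        for (int c = 0; c < nbCol; c++) {
            ColumnWriter w = columnStore.getColumnWriter(
                    columnIO.getLeaves().get(c).getColumnDescriptor());
            writeOneColumn(w, c, rowFirst, rowLast);
        }
        for (int r = rowFirst; r < rowLast; r++)
            columnStore.endRecord();

        // flush: this is where the compressors do their (sequential) work
        fileWriter.startBlock(rowLast - rowFirst);
        columnStore.flush();
        pageStore.flushToFileWriter(fileWriter);
        fileWriter.endBlock();
    }
    fileWriter.end(new HashMap<>());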

All of this seems complex enough that I wonder whether I am doing things the right way.
I tried parallelizing the writing phase, but the time-consuming part is the flush phase
at the end of the row group: that is when the compressors do their work, sequentially,
and it is unfortunate that this step cannot be parallelized, as it would save a lot of time.
Moreover, the entire row group is kept in memory (duplicated from the original data) before flushing.
This consumes a lot of memory, which forces me to use many row groups. But the more row groups there are,
the larger the final file becomes, since each row group seems to take up a fair amount of space (headers, etc.) in the Parquet file,
and the export also becomes significantly slower.

I’m attaching my Java code that implements these steps.
Thank you in advance for any advice on how to improve this.

package ob.analyseurdata.core.io.tableexport.parquet;

import java.awt.Component;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.stream.IntStream;
import javax.swing.JOptionPane;
import jsat.utils.concurrent.ParallelUtils;
import ob.analyseurdata.core.data.Column;
import ob.common.data.UtilsData;
import static ob.common.data.UtilsData.DataTypePrimitif.DBL;
import static ob.common.data.UtilsData.DataTypePrimitif.EMPTY;
import static ob.common.data.UtilsData.DataTypePrimitif.GEOM;
import static ob.common.data.UtilsData.DataTypePrimitif.INT;
import static ob.common.data.UtilsData.DataTypePrimitif.STR;
import ob.common.errors.AbortedException;
import ob.common.errors.MyException;
import ob.common.errors.MyInternalException;
import ob.common.ihm.longtask.I_LongTaskManager;
import ob.common.ihm.longtask.LongTask;
import ob.common.utils.Logger;
import ob.common.utils.UtilsTime;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.codec.ZstandardCodec;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.LocalOutputFile;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.apache.parquet.util.AutoCloseables;

public class TableExporterPARQUET_lowLevelAPI extends LongTask {
	
	private final TableExportParamsPARQUET exportParams;
	
	private boolean hasError = false;
	private Throwable th;

	private ParquetFileWriter fileWriter;
	
	private boolean closed = false;
	private boolean aborted = false;
	private Configuration conf;
	private ParquetProperties props;
	
	// The fields below mirror those of the ParquetWriter + InternalParquetRecordWriter pair
	private CompressionCodecFactory.BytesInputCompressor compressor;
	private MessageType schema;
	private int rowGroupOrdinal = 0; // index of the current row group
	private int recordCount; // number of records in the current row group
	private ColumnWriteStore columnStore;
	private ColumnChunkPageWriteStore pageStore;
	private BloomFilterWriteStore bloomFilterWriteStore;
	private ColumnWriter[] columnWriters;


	public TableExporterPARQUET_lowLevelAPI(TableExportParamsPARQUET exportParams, I_LongTaskManager ltManager) {
		super(ltManager, "Export", 0);
		this.exportParams = exportParams;
	}
	
	public void export(boolean bShowConfirm, Component compParentConfirm) {
//		// attempt to load the native libraries ...
//		// downloaded here: https://github.com/cdarlint/winutils
//		// but it does not work: NativeCodeLoader complains that it cannot find what it is looking for
//		String old = System.getProperty("java.library.path");
//		System.setProperty("hadoop.home.dir",   old + ";D:\\Users\\braultoli\\Desktop\\hadoop-3.3.6\\bin");
//		System.setProperty("java.library.path", old + ";D:\\Users\\braultoli\\Desktop\\hadoop-3.3.6\\bin");
//		System.out.println(System.getProperty("java.library.path"));
//		// ==> the native library only loads when passing  -Djava.library.path=D:\Users\braultoli\Desktop\hadoop-3.3.6\bin
//		// however, it does not include the native compressors ...
		
		try {
						
			int nbColToWrite = exportParams.getNbColToWrite();
			int nbRowToWrite = exportParams.getNbLineToWrite();
			setNbStepTotal(nbRowToWrite);
			setName("Exporting " + exportParams.table.getFullName() + " --> " + exportParams.pathFile);
			
			
			//****************************************************
			Logger.log_INFO1("***********************************************************");
			Logger.log_INFO1("Export PARQUET  " + exportParams.table.getName());
			Logger.log_INFO2(" -> nb de lignes   : " + nbRowToWrite);
			Logger.log_INFO2(" -> nb de colonnes : " + nbColToWrite);


			// Does the file already exist?
			if (new File(exportParams.pathFile).exists()) {
				if (bShowConfirm) {
					int res = JOptionPane.showConfirmDialog(compParentConfirm, "Fichier déjà existant : \n" + exportParams.pathFile + "\n\nVoulez-vous le remplacer ?", "Ecraser le fichier ?", JOptionPane.OK_CANCEL_OPTION, JOptionPane.QUESTION_MESSAGE);
					if (res != JOptionPane.OK_OPTION) {
						Logger.log_INFO1("Export ANNULE");
						Logger.log_INFO1("***********************************************************\n");
						return;
					}
				}
				Files.deleteIfExists(Path.of(exportParams.pathFile));
			}

			if (nbColToWrite==0)
				// I do not know whether creating an empty parquet file is valid...
				// so we raise an error if there is nothing to export
				throw new MyException("Aucune colonne à exporter");
			
			
			//************* Export *************************************************
			UtilsTime.startTimer("export parquet");
			taskStarted();
			
			// Hadoop configuration
			conf = createConfiguration();
			
			// Schema of the table
			schema = createSchema();

			// Output file
			fileWriter = createFileWriter();
			
			// Parquet properties
			props = createEncodingProperties();
			
			// Codec creation
			compressor = createCompressor(conf, props);
			
			
			// Export
			writeData();
			
			
		} catch (AbortedException ex) {
			aborted = true;
			throw new MyException("Annulation de l'export par l'utilisateur");
			
		} catch (OutOfMemoryError ex) {
			hasError = true;
			throw ex; // re-throw
			
		} catch (IOException | MyException ex) {
			hasError = true;
			throw new MyException("Impossible d'exporter la table", ex);
			
		} catch (IllegalArgumentException ex) {
			hasError = true;
			throw new MyException("Impossible d'exporter la table à cause d'un paramètre invalide", ex);
			
		} catch (Throwable thex) {
			hasError = true;
			// any other unhandled error type is turned into an internal error
			throw new MyInternalException("Impossible d'exporter la table", thex);
		
		} finally {
			// Change the displayed name because this can take a while (and cannot be interrupted) when writing to a network drive
			Logger.log_INFO2(" -> Closing IO streams ...");
			if (fileWriter != null)
				try {
					fileWriter.close();
				} catch (IOException ex) {}
			taskFinished();
			
			if (aborted || hasError) {
				// if the export was cancelled, the file must be deleted, since it is most likely
				// in a corrupted state
				try {
					Files.deleteIfExists(Path.of(exportParams.pathFile));
				} catch (IOException ex) {
				}
			}
				
		}
		
		if (hasError) {
			throw new OutOfMemoryError("Impossible d'exporter la table, pas assez de mémoire.\n-> Essayer de réduire le paramètre RowGroupSize");
		}
		
		UtilsTime.stopTimer("export parquet");
		Logger.log_INFO2(" -> fin de l'export");
		Logger.log_INFO2(" -> durée de l'export : " + UtilsTime.getDuree_ms("export parquet") + " ms");
		Logger.log_INFO1("***********************************************************");
	}
	
	
	private void writeData() throws IOException {
		fileWriter.start();
		
		
		int nbLineWrite = exportParams.getNbLineToWrite();
		int nbRowByRowGroup = estimNbRowForRowGroup();
		int nbRowGroup = nbLineWrite / nbRowByRowGroup;
		if (nbRowGroup == 0)
			nbRowGroup = 1;
		
		Logger.log_INFO2(" - nb row group : " + nbRowGroup);
		int nbCol = exportParams.getNbColToWrite();

		// For each row group
		for (int numRowGroup = 0; numRowGroup < nbRowGroup; numRowGroup ++) {
			// Create the row group stores and writers
			initStore();
			
			int row1 = ParallelUtils.getStartBlock(nbLineWrite, numRowGroup, nbRowGroup);
			int row2 = ParallelUtils.getEndBlock  (nbLineWrite, numRowGroup, nbRowGroup);
		
			IntStream stream = ob.common.thread.ParallelUtils.intStream(0, nbCol, exportParams.multiThread);
			stream.forEach(c -> {
				if (!hasError) { 
					exportColCurrentRowGroup(c, row1, row2);
				}
			});

			if (hasError)
				break;
			
			recordCount += row2 - row1;
			
			// Declare the records that were added
			for (int i = 0; i <recordCount; i++) {
				columnStore.endRecord();
			}
			
			// Write the row group and start a new one
			flushRowGroupToStore();
			rowGroupOrdinal++;

			Logger.log_INFO2(" - fin row group n° " + numRowGroup);
		}
		close();
	}
	
	private synchronized void toto() {
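		// Debug helper: prints the total amount of memory currently buffered by the ColumnWriters of the current row group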
		
		long ram = 0;
		for (int c = 0; c <columnWriters.length; c++) {
			ColumnWriter w = columnWriters[c];
			ram += w.getBufferedSizeInMemory();
		}
		System.out.println("!! RAM utilisée : " + ram);
	}
	
	private void exportColCurrentRowGroup(int c, int row1, int row2) {
		Column col = exportParams.vColToExport.get(c);
		ColumnWriter w = columnWriters[c];

		try {
		switch (col.getDataType()) {
			case DBL -> {
				if (exportParams.doubleAsFloat) {
					for (int i = row1; i < row2; i++) {
						if (hasError) break;
						int rowToWrite = exportParams.rowsToExport.getRow_0b(i);
						double val = col.getValue_DBL_fast(rowToWrite);
						if (!UtilsData.isNull(val))
							w.write((float)val, 0, 1);
						else
							// To write null in parquet, you must write with a definitionLevel of 0
							w.writeNull(0, 0);
					}
				} else {
					for (int i = row1; i < row2; i++) {
						if (hasError) break;
						int rowToWrite = exportParams.rowsToExport.getRow_0b(i);
						double val = col.getValue_DBL_fast(rowToWrite);
						if (!UtilsData.isNull(val))
							w.write(val, 0, 1);
						else
							// To write null in parquet, you must write with a definitionLevel of 0
							w.writeNull(0, 0);
					}
				}
			}
			case INT -> {
				for (int i = row1; i < row2; i++) {
					if (hasError) break;
					int rowToWrite = exportParams.rowsToExport.getRow_0b(i);
					int val = col.getValue_INT_fast(rowToWrite);
					if (!UtilsData.isNull(val))
						w.write(val, 0, 1);
					else
						// To write null in parquet, you must write with a definitionLevel of 0
						w.writeNull(0, 0);
				}
			}
			case STR -> {
				String lastVal = null;
				Binary lastBinary = null;
				for (int i = row1; i < row2; i++) {
					if (hasError) break;
					int rowToWrite = exportParams.rowsToExport.getRow_0b(i);
					String val = col.getValue_STR_fast(rowToWrite);
					if (val == null)
						throw new MyException("String null non encore gérées pour l'export au format parquet");
					
					// small String cache when two successive values are equal
					if (val.equals(lastVal))
						w.write(lastBinary, 0, 1);
					else {
						lastVal = val;
						lastBinary = Binary.fromString(val);
						w.write(lastBinary, 0, 1);
					}
				}
			}
			case EMPTY -> {
				// write nulls as int (EMPTY columns were declared as INT32 in the schema)
				for (int i = row1; i < row2; i++) {
					if (hasError) break;
					// To write null in parquet, you can write anything, as long as the definitionLevel is 0
					w.writeNull(0, 0);
				}
			}
			default -> {
				throw new MyException("Type de colonne non pris en charge pour un export en parquet");
			}
		}
		} catch (OutOfMemoryError err) {
			hasError = true;
			w.close(); // free memory, allowing ZAZ to survive if possible
		}
	}
	
	
	private int estimNbRowForRowGroup() {
		int sizeByte1Row = exportParams.getSize1row();
		int nbRow = (int) (exportParams.sizeRowGroup_B / sizeByte1Row);
		if (nbRow<4) // pathological case
			nbRow = 4;
		return nbRow;
	}
	
	
	// To be called for each new row group
	// Somewhat slow, so avoid making the row groups too small
	private void initStore() {
		ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore(
				compressor,
				schema,
				props.getAllocator(),
				props.getColumnIndexTruncateLength(),
				props.getPageWriteChecksumEnabled(),
				fileWriter.getEncryptor(),
				rowGroupOrdinal);
		pageStore = columnChunkPageWriteStore;
		bloomFilterWriteStore = columnChunkPageWriteStore;

		columnStore = props.newColumnWriteStore(schema, pageStore, bloomFilterWriteStore);
		MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);

		// create the ColumnWriters
		int nbCol = exportParams.getNbColToWrite();
		this.columnWriters = new ColumnWriter[nbCol];
		for (int c = 0; c <nbCol; c++) {
			ColumnWriter w = columnStore.getColumnWriter(columnIO.getLeaves().get(c).getColumnDescriptor());
			columnWriters[c] = w;
		}
	}
	
	
	public void close() throws IOException {
		if (!closed) {
			try {
				if (aborted)
					return;
				//flushRowGroupToStore(); // already done, actually
				fileWriter.end(new HashMap<>(0));
			} finally {
				AutoCloseables.uncheckedClose(columnStore, pageStore, bloomFilterWriteStore, fileWriter);
				closed = true;
			}
		}
	}
	
	// Write the row group
	private void flushRowGroupToStore() throws IOException {
		try {
			if (recordCount > 0) {
				fileWriter.startBlock(recordCount);
				columnStore.flush();
				pageStore.flushToFileWriter(fileWriter);
				fileWriter.endBlock();
				recordCount = 0;
			}
		} finally {
			AutoCloseables.uncheckedClose(columnStore, pageStore, bloomFilterWriteStore);
			columnStore = null;
			pageStore = null;
			bloomFilterWriteStore = null;
		}
	}
	

	private MessageType createSchema() {
		Types.MessageTypeBuilder schemaBuilder = Types.buildMessage();
		for (int i = 0; i <exportParams.getNbColToWrite(); i++) {
			Column col = exportParams.vColToExport.get(i);
			switch (col.getDataType()) {
				// columns are declared optional so that null values can be written
				case DBL -> {
					if (exportParams.doubleAsFloat)
						schemaBuilder.optional(PrimitiveType.PrimitiveTypeName.FLOAT).named(col.getName());
					else
						schemaBuilder.optional(PrimitiveType.PrimitiveTypeName.DOUBLE).named(col.getName());
				}
				case INT -> {
					schemaBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(col.getName());
				}
				case STR -> {
					schemaBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named(col.getName());
				}
				case EMPTY -> {
					// default to INT32
					schemaBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(col.getName());
				}
				default -> {
					throw new MyException("Type de colonne non pris en charge pour un export en parquet");
				}
			}
		}
		return schemaBuilder.named("schema");
	}
	
	
	private Configuration createConfiguration() throws IOException {
        // Hadoop configuration
        Configuration config = new Configuration();
		
		// This object is where the compression levels of the various codecs can be specified
		// GZIP:
		if (exportParams.compressionLevel <= 9) // can be higher for other codecs (e.g. ZSTD)
			config.set("zlib.compress.level", ZlibCompressor.CompressionLevel.values()[exportParams.compressionLevel].name()); // see ZlibCompressor.CompressionLevel

		// ZSTD:
		config.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, exportParams.compressionLevel);
		//config.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, 8); what is this for??? setting 8 instead of 0 seems to slow things down
		
		return config;
	}
	
	
	private ParquetFileWriter createFileWriter() throws IOException {
		// LocalOutputFile must be used so that the hadoop binaries are not needed
		OutputFile out = new LocalOutputFile(new File(exportParams.pathFile).toPath());

		return new ParquetFileWriter(out, schema, ParquetFileWriter.Mode.OVERWRITE,
						ParquetWriter.DEFAULT_BLOCK_SIZE,ParquetWriter.MAX_PADDING_SIZE_DEFAULT,
						ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
						ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH,
						false//ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED
		);
	}
	
	
	private ParquetProperties createEncodingProperties() {
		ParquetProperties.Builder encodingPropsBuilder = ParquetProperties.builder();
		encodingPropsBuilder.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0);
		
		encodingPropsBuilder.withStatisticsEnabled(false); // does not seem to speed things up?? to re-check
		
		// usefulness to be evaluated ...
		// with our data (e.g. LF) this setting does not seem to reduce the size unless values are forced to FLOAT
		encodingPropsBuilder.withByteStreamSplitEncoding(exportParams.bByteSplitting);

		// Dictionary
		encodingPropsBuilder.withDictionaryEncoding(exportParams.bWithDictionnary);
		encodingPropsBuilder.withDictionaryPageSize(exportParams.dictionnaryMaxSize_B);
				
		// column-specific settings
		for (Column col : exportParams.vColWithDictionnary)
			encodingPropsBuilder.withDictionaryEncoding(col.getName(), true);
		for (Column col : exportParams.vColNoDictionnary)
			encodingPropsBuilder.withDictionaryEncoding(col.getName(), false);

		
		encodingPropsBuilder.withPageSize(exportParams.pageSize_B);
		encodingPropsBuilder.withPageRowCountLimit(exportParams.pageRowCountLimit);
		encodingPropsBuilder.withMinRowCountForPageSizeCheck(1000); // the default of 100 slows things down a bit?
		
		// The row group size is managed by ourselves in writeData()
		//  -> handled approximately but effectively at write time, so there is no need to configure it here
		//encodingPropsBuilder.withRowGroupSize(128l * 1024 * 1024); // 128MB = default
		
		
		// Complete with additional properties
		// which, however, require an already built properties object ... (not great, but acceptable)
		// this is the case for a custom ValuesWriterFactory, which needs an already built
		// properties object in order to create the ValueWriters
		ParquetProperties propsTemp = encodingPropsBuilder.build();
		
		encodingPropsBuilder = ParquetProperties.copy(propsTemp);
		return encodingPropsBuilder.build();
	}
	
	private CompressionCodecFactory.BytesInputCompressor createCompressor(Configuration conf, ParquetProperties props) {
		... SNAPPY
	}
	
	
}
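
The body of createCompressor() is elided above ("... SNAPPY"). As a rough sketch only, a SNAPPY BytesInputCompressor can be obtained from the Hadoop-based CodecFactory that is already imported; the exact body used may differ (CompressionCodecName comes from org.apache.parquet.hadoop.metadata):

	private CompressionCodecFactory.BytesInputCompressor createCompressor(Configuration conf, ParquetProperties props) {
		// sketch: build a SNAPPY compressor through the Hadoop CodecFactory,
		// sized with the page size threshold from the ParquetProperties
		CodecFactory codecFactory = new CodecFactory(conf, props.getPageSizeThreshold());
		return codecFactory.getCompressor(CompressionCodecName.SNAPPY);
	}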

Component(s)

Benchmark, Core
