diff --git a/config.go b/config.go
index 11a3401..d85e698 100644
--- a/config.go
+++ b/config.go
@@ -1,6 +1,7 @@
 package sqltocsvgzip
 
 import (
+    "bytes"
     "compress/flate"
     "database/sql"
     "os"
@@ -12,15 +13,26 @@ const (
     minFileSize = 5 * 1024 * 1024
 )
 
-type s3Obj struct {
+type obj struct {
     partNumber int64
     buf        []byte
 }
 
+type LogLevel int
+
+const (
+    Error   LogLevel = 1
+    Warn    LogLevel = 2
+    Info    LogLevel = 3
+    Debug   LogLevel = 4
+    Verbose LogLevel = 5
+)
+
 // Converter does the actual work of converting the rows to CSV.
 // There are a few settings you can override if you want to do
 // some fancy stuff to your CSV.
 type Converter struct {
+    LogLevel              LogLevel
     Headers               []string // Column headers to use (default is rows.Columns())
     WriteHeaders          bool     // Flag to output headers in your CSV (default is true)
     TimeFormat            string   // Format string for any time.Time values (default is time's default)
@@ -30,22 +42,23 @@ type Converter struct {
     GzipGoroutines        int
     GzipBatchPerGoroutine int
     SingleThreaded        bool
-    Debug                 bool
     S3Bucket              string
     S3Region              string
     S3Acl                 string
     S3Path                string
     S3Upload              bool
-    S3UploadThreads       int
-    S3UploadMaxPartSize   int64
+    UploadThreads         int
+    UploadPartSize        int
 
     s3Svc            *s3.S3
     s3Resp           *s3.CreateMultipartUploadOutput
-    s3Uploadable     chan *s3Obj
     s3CompletedParts []*s3.CompletedPart
     rows             *sql.Rows
     rowPreProcessor  CsvPreProcessorFunc
-    gzipBuf          []byte
+    gzipBuf          *bytes.Buffer
+    partNumber       int64
+    uploadQ          chan *obj
+    quit             chan bool
 }
 
 // CsvPreprocessorFunc is a function type for preprocessing your CSV.
@@ -70,40 +83,29 @@ func New(rows *sql.Rows) *Converter {
         WriteHeaders:          true,
         Delimiter:             ',',
         CompressionLevel:      flate.DefaultCompression,
-        GzipGoroutines:        6,
-        GzipBatchPerGoroutine: 180000,
+        GzipGoroutines:        10,
+        GzipBatchPerGoroutine: 100000,
+        UploadPartSize:        5 * 1024 * 1025, // Should be greater than 1 * 1024 * 1024 for pgzip
+        LogLevel:              Info,
     }
 }
 
-// DefaultConfig sets the following variables.
-//
-//  WriteHeaders: true,
-//  Delimiter: ',',
-//  CompressionLevel: flate.DefaultCompression,
-//  GzipGoroutines: 6,
-//  GzipBatchPerGoroutine: 180000,
-//  S3Upload: true,
-//  S3UploadThreads: 6,
-//  S3UploadMaxPartSize: 5 * 1024 * 1025, // Should be greater than 5 * 1024 * 1024
-//  S3Bucket: os.Getenv("S3_BUCKET"),
-//  S3Path: os.Getenv("S3_PATH"),
-//  S3Region: os.Getenv("S3_REGION"),
-//  S3Acl: os.Getenv("S3_ACL"), // If empty, defaults to bucket-owner-full-control
-//
+// DefaultConfig sets the default values for Converter struct.
 func DefaultConfig(rows *sql.Rows) *Converter {
     return &Converter{
         rows:                  rows,
         WriteHeaders:          true,
         Delimiter:             ',',
         CompressionLevel:      flate.DefaultCompression,
-        GzipGoroutines:        6,
-        GzipBatchPerGoroutine: 180000,
+        GzipGoroutines:        10,
+        GzipBatchPerGoroutine: 100000,
         S3Upload:              true,
-        S3UploadThreads:       6,
-        S3UploadMaxPartSize:   5 * 1024 * 1025, // Should be greater than 5 * 1024 * 1024
+        UploadThreads:         6,
+        UploadPartSize:        5 * 1024 * 1025, // Should be greater than 5 * 1024 * 1024 for s3 upload
         S3Bucket:              os.Getenv("S3_BUCKET"),
         S3Path:                os.Getenv("S3_PATH"),
         S3Region:              os.Getenv("S3_REGION"),
         S3Acl:                 os.Getenv("S3_ACL"),
+        LogLevel:              Info,
     }
 }
diff --git a/csv.go b/csv.go
index 991d0a6..e58d36c 100644
--- a/csv.go
+++ b/csv.go
@@ -1,10 +1,27 @@
 package sqltocsvgzip
 
 import (
+    "bytes"
+    "encoding/csv"
     "fmt"
     "time"
 )
 
+func (c *Converter) getCSVWriter() (*csv.Writer, *bytes.Buffer) {
+    // Same size as sqlRowBatch
+    var csvBuffer bytes.Buffer
+
+    // CSV writer to csvBuffer
+    csvWriter := csv.NewWriter(&csvBuffer)
+
+    // Set delimiter
+    if c.Delimiter != '\x00' {
+        csvWriter.Comma = c.Delimiter
+    }
+
+    return csvWriter, &csvBuffer
+}
+
 func (c *Converter) setCSVHeaders() ([]string, int, error) {
     var headers []string
     columnNames, err := c.rows.Columns()
diff --git a/getter.go b/getter.go
index 44551c8..de2f4f9 100644
--- a/getter.go
+++ b/getter.go
@@ -9,38 +9,17 @@ import (
 
 // getSqlBatchSize gets the size of rows to be retrieved.
 // This batch is worked upon entirely before flushing to disk.
-func (c *Converter) getSqlBatchSize(totalColumns int) int {
+func (c *Converter) getSqlBatchSize(totalColumns int) {
     // Use sqlBatchSize set by user
     if c.SqlBatchSize != 0 {
-        return c.SqlBatchSize
+        return
     }
 
     // Default to 4096
     c.SqlBatchSize = 4096
-
-    // Use Default value when Single thread.
-    if c.SingleThreaded {
-        return c.SqlBatchSize
-    }
-
-    // If Multi-threaded, then block size should be atleast 1Mb = 1048576 bytes
-    // See https://github.com/klauspost/pgzip
-
-    // (String X SqlBatchSize X TotalColumns) > 1048576
-    // String = 16 bytes
-    // (SqlBatchSize X TotalColumns) > 65536
-
-    for (c.SqlBatchSize * totalColumns) <= 65536 {
-        c.SqlBatchSize = c.SqlBatchSize * 2
-    }
-
-    // We aim for 1.5 MB - 2 MB to be on a safe side
-    c.SqlBatchSize = c.SqlBatchSize * 2
-
-    return c.SqlBatchSize
 }
 
-func (c *Converter) selectCompressionMethod(writer io.Writer) (io.WriteCloser, error) {
+func (c *Converter) getGzipWriter(writer io.Writer) (io.WriteCloser, error) {
     // Use gzip if single threaded
     if c.SingleThreaded {
         zw, err := gzip.NewWriterLevel(writer, c.CompressionLevel)
diff --git a/s3.go b/s3.go
index 54a1208..f9a4ea7 100644
--- a/s3.go
+++ b/s3.go
@@ -3,8 +3,8 @@ package sqltocsvgzip
 import (
     "bytes"
     "fmt"
-    "log"
-    "os"
+    "io"
+    "net/url"
     "sync"
 
     "github.com/aws/aws-sdk-go/aws"
@@ -35,7 +35,7 @@ func (c *Converter) createMultipartRequest() (err error) {
         return err
     }
 
-    log.Println("Created multipart upload request.")
+    c.writeLog(Info, "Created multipart upload request.")
     return nil
 }
 
@@ -59,7 +59,7 @@ func (c *Converter) createS3Session() error {
 }
 
 func (c *Converter) abortMultipartUpload() error {
-    log.Println("Aborting multipart upload for UploadId: " + *c.s3Resp.UploadId)
+    c.writeLog(Info, "Aborting multipart upload for UploadId: "+aws.StringValue(c.s3Resp.UploadId))
     abortInput := &s3.AbortMultipartUploadInput{
         Bucket:   c.s3Resp.Bucket,
         Key:      c.s3Resp.Key,
@@ -70,7 +70,7 @@ func (c *Converter) abortMultipartUpload() error {
 }
 
 func (c *Converter) completeMultipartUpload() (*s3.CompleteMultipartUploadOutput, error) {
-    log.Println("Completing multipart upload for UploadId: " + *c.s3Resp.UploadId)
+    c.writeLog(Info, "Completing multipart upload for UploadId: "+aws.StringValue(c.s3Resp.UploadId))
     completeInput := &s3.CompleteMultipartUploadInput{
         Bucket:   c.s3Resp.Bucket,
         Key:      c.s3Resp.Key,
@@ -95,17 +95,17 @@ func (c *Converter) uploadPart(partNumber int64, buf []byte, mu *sync.RWMutex) (
     for tryNum <= maxRetries {
         uploadResult, err := c.s3Svc.UploadPart(partInput)
         if err != nil {
-            log.Println(err)
+            c.writeLog(Error, err.Error())
             if tryNum == maxRetries {
                 if aerr, ok := err.(awserr.Error); ok {
                     return aerr
                 }
                 return err
             }
-            log.Println("Retrying to upload part: #", partNumber)
+            c.writeLog(Info, fmt.Sprintf("Retrying to upload part: #%v", partNumber))
             tryNum++
         } else {
-            log.Println("Uploaded part: #", partNumber)
+            c.writeLog(Info, fmt.Sprintf("Uploaded part: #%v", partNumber))
             mu.Lock()
             c.s3CompletedParts = append(c.s3CompletedParts, &s3.CompletedPart{
                 ETag:       uploadResult.ETag,
@@ -118,7 +118,12 @@ func (c *Converter) uploadPart(partNumber int64, buf []byte, mu *sync.RWMutex) (
     return nil
 }
 
-func (c *Converter) UploadObjectToS3(f *os.File) error {
+func (c *Converter) UploadObjectToS3(w io.Writer) error {
+    buf, ok := w.(*bytes.Buffer)
+    if !ok {
+        return fmt.Errorf("Expected buffer. Got %T", w)
+    }
+
     fileType := "application/x-gzip"
 
     // The session the S3 Uploader will use
@@ -135,12 +140,16 @@ func (c *Converter) UploadObjectToS3(w io.Writer) error {
         Key:         aws.String(c.S3Path),
         ACL:         aws.String(c.S3Acl),
         ContentType: aws.String(fileType),
-        Body:        f,
+        Body:        bytes.NewReader(buf.Bytes()),
     })
     if err != nil {
         return err
     }
 
-    log.Println(res.Location)
+    uploadPath, err := url.PathUnescape(res.Location)
+    if err != nil {
+        return err
+    }
+    c.writeLog(Info, "Successfully uploaded file: "+uploadPath)
     return nil
 }
diff --git a/sqltocsvgzip.go b/sqltocsvgzip.go
index f8f7703..44a32ff 100644
--- a/sqltocsvgzip.go
+++ b/sqltocsvgzip.go
@@ -7,9 +7,8 @@ package sqltocsvgzip
 import (
     "bytes"
     "database/sql"
-    "encoding/csv"
     "fmt"
-    "io/ioutil"
+    "io"
     "log"
     "net/url"
     "os"
@@ -23,123 +22,129 @@ func WriteFile(csvGzipFileName string, rows *sql.Rows) error {
     return New(rows).WriteFile(csvGzipFileName)
 }
 
-func UploadToS3(csvGzipFileName string, rows *sql.Rows) error {
-    return DefaultConfig(rows).WriteFile(csvGzipFileName)
+func UploadToS3(rows *sql.Rows) error {
+    return DefaultConfig(rows).Upload()
 }
 
 // WriteFile writes the csv.gzip to the filename specified, return an error if problem
-func (c *Converter) WriteFile(csvGzipFileName string) error {
-    f, err := os.Create(csvGzipFileName)
+func (c *Converter) Upload() error {
+    if c.UploadPartSize < minFileSize {
+        return fmt.Errorf("UploadPartSize should be greater than %v\n", minFileSize)
+    }
+
+    // Create MultiPart S3 Upload
+    err := c.createS3Session()
+    if err != nil {
+        return err
+    }
+
+    err = c.createMultipartRequest()
     if err != nil {
         return err
     }
-    defer f.Close()
 
     wg := sync.WaitGroup{}
-    quit := make(chan bool, 1)
+    buf := bytes.Buffer{}
+    c.uploadQ = make(chan *obj, c.UploadThreads)
+    c.quit = make(chan bool, 1)
+
+    // Upload Parts to S3
+    for i := 0; i < c.UploadThreads; i++ {
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            err = c.UploadAndDeletePart()
+            if err != nil {
+                c.writeLog(Error, err.Error())
+            }
+        }()
+    }
 
-    // Create MultiPart S3 Upload
-    if c.S3Upload {
-        err = c.createS3Session()
-        if err != nil {
-            return err
+    err = c.Write(&buf)
+    if err != nil {
+        // Abort S3 Upload
+        awserr := c.abortMultipartUpload()
+        if awserr != nil {
+            return awserr
         }
+        return err
+    }
 
-        err = c.createMultipartRequest()
+    close(c.uploadQ)
+    wg.Wait()
+
+    if c.partNumber == 0 {
+        // Upload one time
+        c.writeLog(Info, "Gzip file < 5 MB. Enable direct upload. Abort multipart upload.")
+        err = c.abortMultipartUpload()
         if err != nil {
             return err
         }
 
-        // Upload Parts to S3
-        c.s3Uploadable = make(chan *s3Obj, c.S3UploadThreads)
-
-        for i := 0; i < c.S3UploadThreads; i++ {
-            wg.Add(1)
-            go func() {
-                defer wg.Done()
-                err = c.UploadAndDeletePart(quit)
-                if err != nil {
-                    log.Println(err)
-                }
-            }()
+        err = c.UploadObjectToS3(&buf)
+        if err != nil {
+            return err
         }
+        return nil
     }
 
-    err = c.Write(f, quit)
+    // Sort completed parts
+    c.sortCompletedParts()
+    // Complete S3 upload
+    completeResponse, err := c.completeMultipartUpload()
     if err != nil {
         // Abort S3 Upload
-        if c.S3Upload {
-            awserr := c.abortMultipartUpload()
-            if awserr != nil {
-                return awserr
-            }
+        awserr := c.abortMultipartUpload()
+        if awserr != nil {
+            return awserr
         }
         return err
     }
 
-    if c.S3Upload {
-        wg.Wait()
-
-        // Sort completed parts
-        c.sortCompletedParts()
-        // Complete S3 upload
-        completeResponse, err := c.completeMultipartUpload()
-        if err != nil {
-            return err
-        }
-        uploadPath, err := url.PathUnescape(completeResponse.String())
-        if err != nil {
-            return err
-        }
-        log.Printf("Successfully uploaded file: %s\n", uploadPath)
+    uploadPath, err := url.PathUnescape(completeResponse.String())
+    if err != nil {
+        return err
     }
+    c.writeLog(Info, "Successfully uploaded file: "+uploadPath)
     return nil
 }
 
-// Write writes the csv.gzip to the Writer provided
-func (c *Converter) Write(f *os.File, quit chan bool) error {
-    if c.S3Upload && c.S3UploadMaxPartSize < minFileSize {
-        return fmt.Errorf("S3UploadMaxPartSize should be greater than %v\n", minFileSize)
+// WriteFile writes the csv.gzip to the filename specified, return an error if problem
+func (c *Converter) WriteFile(csvGzipFileName string) error {
+    f, err := os.Create(csvGzipFileName)
+    if err != nil {
+        return err
     }
-    var countRows, partNumber int64
-    writeRow := true
-    rows := c.rows
-
-    // Same size as sqlRowBatch
-    var csvBuffer bytes.Buffer
-
-    // CSV writer to csvBuffer
-    csvWriter := csv.NewWriter(&csvBuffer)
+    defer f.Close()
 
-    // Set delimiter
-    if c.Delimiter != '\x00' {
-        csvWriter.Comma = c.Delimiter
-    }
+    // Explicitly unset s3 upload
+    c.S3Upload = false
 
-    // Set headers
-    columnNames, totalColumns, err := c.setCSVHeaders()
+    err = c.Write(f)
     if err != nil {
         return err
     }
 
-    // GZIP writer to underline file.csv.gzip
-    zw, err := c.selectCompressionMethod(f)
+    return nil
+}
+
+// Write writes the csv.gzip to the Writer provided
+func (c *Converter) Write(w io.Writer) error {
+    var countRows int64
+    writeRow := true
+
+    csvWriter, csvBuffer := c.getCSVWriter()
+
+    // Set headers
+    columnNames, totalColumns, err := c.setCSVHeaders()
     if err != nil {
         return err
     }
-    defer zw.Close()
-
-    // Buffer size: string bytes x sqlBatchSize x No. of Columns
-    sqlBatchSize := c.getSqlBatchSize(totalColumns)
-    if c.Debug {
-        log.Println("SQL Batch size: ", sqlBatchSize)
-    }
-
-    // Create buffer
-    sqlRowBatch := make([][]string, 0, sqlBatchSize)
-    // Append headers
+    // Create slice and append headers
+    c.getSqlBatchSize(totalColumns)
+    sqlRowBatch := make([][]string, 0, c.SqlBatchSize)
     sqlRowBatch = append(sqlRowBatch, columnNames)
 
     // Buffers for each iteration
@@ -150,9 +155,23 @@ func (c *Converter) Write(f *os.File, quit chan bool) error {
         valuePtrs[i] = &values[i]
     }
 
+    // GZIP writer to underlying file.csv.gzip
+    zw, err := c.getGzipWriter(w)
+    if err != nil {
+        return err
+    }
+    defer zw.Close()
+
     // Iterate over sql rows
-    for rows.Next() {
-        if err = rows.Scan(valuePtrs...); err != nil {
+    for c.rows.Next() {
+        select {
+        case <-c.quit:
+            return fmt.Errorf("Received quit signal. Exiting.")
+        default:
+            // Do nothing
+        }
+
+        if err = c.rows.Scan(valuePtrs...); err != nil {
             return err
         }
 
@@ -164,59 +183,53 @@ func (c *Converter) Write(f *os.File, quit chan bool) error {
 
         if writeRow {
             sqlRowBatch = append(sqlRowBatch, row)
-            if len(sqlRowBatch) >= sqlBatchSize {
+
+            // Write to CSV Buffer
+            if len(sqlRowBatch) >= c.SqlBatchSize {
+                c.writeLog(Verbose, fmt.Sprintf("Batching at %v rows", len(sqlRowBatch)))
                 countRows = countRows + int64(len(sqlRowBatch))
-                // Convert from sql to csv
-                // Writes to buffer
                 err = csvWriter.WriteAll(sqlRowBatch)
                 if err != nil {
                     return err
                 }
+                // Reset Slice
+                sqlRowBatch = sqlRowBatch[:0]
+            }
 
-                // Convert from csv to gzip
-                // Writes from buffer to underlying file
-                _, err = zw.Write(csvBuffer.Bytes())
+            // Convert from csv to gzip
+            // Writes from buffer to underlying file
+            if csvBuffer.Len() >= c.UploadPartSize {
+                bytesWritten, err := zw.Write(csvBuffer.Bytes())
+                c.writeLog(Debug, fmt.Sprintf("Csv to gzip bytes written: %v", bytesWritten))
                 if err != nil {
                     return err
                 }
 
                 // Reset buffer
-                sqlRowBatch = sqlRowBatch[:0]
                 csvBuffer.Reset()
-            }
+            }
 
             // Upload partially created file to S3
-            // If UploadtoS3 is set to true &&
             // If size of the gzip file exceeds maxFileStorage
             if c.S3Upload {
-                select {
-                case <-quit:
-                    return fmt.Errorf("Received quit signal. Exiting.")
-                default:
-                    // Do nothing
-                }
-
-                fileInfo, err := f.Stat()
-                if err != nil {
-                    return err
+                gzipBuffer, ok := w.(*bytes.Buffer)
+                if !ok {
+                    return fmt.Errorf("Expected buffer. Got %T", w)
                 }
 
-                if fileInfo.Size() >= c.S3UploadMaxPartSize {
-                    if partNumber > 10000 {
-                        return fmt.Errorf("Number of parts cannot exceed 10000")
+                if gzipBuffer.Len() >= c.UploadPartSize {
+                    c.writeLog(Debug, fmt.Sprintf("gzipBuffer size: %v", gzipBuffer.Len()))
+                    if c.partNumber == 10000 {
+                        return fmt.Errorf("Number of parts cannot exceed 10000. Please increase UploadPartSize and try again.")
                     }
-                    // Increament PartNumber
-                    partNumber++
+
                     // Add to Queue
-                    partNumber, err = c.AddToQueue(f, partNumber)
-                    if err != nil {
-                        return err
-                    }
+                    c.AddToQueue(gzipBuffer)
                 }
             }
         }
     }
-    err = rows.Err()
+    err = c.rows.Err()
     if err != nil {
         return err
     }
@@ -227,111 +240,89 @@ func (c *Converter) Write(f *os.File, quit chan bool) error {
     if err != nil {
         return err
     }
+    //Wipe the buffer
+    sqlRowBatch = nil
+
     _, err = zw.Write(csvBuffer.Bytes())
     if err != nil {
         return err
     }
-    //Wipe the buffer
-    sqlRowBatch = nil
     csvBuffer.Reset()
 
+    // Log the total number of rows processed.
+    c.writeLog(Info, fmt.Sprintf("Total rows processed (sql rows + headers row): %v", countRows))
+
     // Upload last part of the file to S3
     if c.S3Upload {
-        // Increament PartNumber
-        partNumber++
-        if partNumber == 1 {
-            // Upload one time
-            if c.Debug {
-                log.Println("Gzip files < 5 MB are uploaded together without batching.")
-            }
-            err = c.UploadObjectToS3(f)
-            if err != nil {
-                return err
-            }
-            c.abortMultipartUpload()
-        } else {
-            // Add to Queue for multipart upload
-            partNumber, err = c.AddToQueue(f, partNumber)
-            if err != nil {
-                return err
-            }
+        if c.partNumber == 0 {
+            return nil
         }
-        close(c.s3Uploadable)
+        // Add to Queue for multipart upload
+        gzipBuffer, ok := w.(*bytes.Buffer)
+        if !ok {
+            return fmt.Errorf("Expected buffer. Got %T", w)
+        }
+        c.AddToQueue(gzipBuffer)
     }
 
-    // Log the total number of rows processed.
-    log.Println("Total number of sql rows processed: ", countRows)
-
     return nil
 }
 
-func (c *Converter) AddToQueue(f *os.File, partNumber int64) (newPartNumber int64, err error) {
-    newPartNumber = partNumber
-
-    buf, err := ioutil.ReadFile(f.Name())
-    if err != nil {
-        return 0, err
-    }
-
-    if c.Debug {
-        log.Printf("Part %v wrote bytes %v.\n", partNumber, len(buf))
-    }
-
-    if len(buf) >= minFileSize {
-        // Add previous part to queue
-        if partNumber > 1 {
-            if c.Debug {
-                log.Println("Add part to queue: #", partNumber-1)
-            }
-            c.s3Uploadable <- &s3Obj{
-                partNumber: partNumber - 1,
-                buf:        c.gzipBuf,
+func (c *Converter) AddToQueue(buf *bytes.Buffer) {
+    // Increment PartNumber
+    c.partNumber++
+
+    if buf.Len() >= c.UploadPartSize {
+        if c.partNumber > 1 {
+            // Add part to queue
+            c.writeLog(Debug, fmt.Sprintf("Add part to queue: #%v", c.partNumber-1))
+            tmpSlice := make([]byte, c.gzipBuf.Len())
+            copy(tmpSlice, c.gzipBuf.Bytes())
+            c.uploadQ <- &obj{
+                partNumber: c.partNumber - 1,
+                buf:        tmpSlice,
             }
         }
         c.gzipBuf = buf
     } else {
-        if c.Debug {
-            log.Printf("Part size is less than %v. Merging with previous part.\n", minFileSize)
-        }
-        // Write the bytes to previous partFile
-        c.gzipBuf = append(c.gzipBuf, buf...)
-        log.Println("Add part to queue: #", partNumber-1)
-        c.s3Uploadable <- &s3Obj{
-            partNumber: partNumber - 1,
-            buf:        c.gzipBuf,
+        c.writeLog(Debug, fmt.Sprintf("Buffer len %v should be greater than %v for upload.", buf.Len(), c.UploadPartSize))
+        c.gzipBuf.Write(buf.Bytes())
+
+        // Add part to queue
+        c.writeLog(Debug, fmt.Sprintf("Add part to queue: #%v", c.partNumber-1))
+        tmpSlice := make([]byte, c.gzipBuf.Len())
+        copy(tmpSlice, c.gzipBuf.Bytes())
+        c.uploadQ <- &obj{
+            partNumber: c.partNumber - 1,
+            buf:        tmpSlice,
        }
-        newPartNumber = partNumber - 1
-    }
-    // Reset file
-    err = f.Truncate(0)
-    if err != nil {
-        return 0, err
-    }
-    // Move the reader
-    _, err = f.Seek(0, 0)
-    if err != nil {
-        return 0, err
+        c.partNumber--
     }
 
-    return newPartNumber, nil
+    c.writeLog(Verbose, fmt.Sprintf("c.gzipBuf: %v at partNumber: %v", c.gzipBuf.Len(), c.partNumber))
+    buf.Reset()
 }
 
-func (c *Converter) UploadAndDeletePart(quit chan bool) (err error) {
+func (c *Converter) UploadAndDeletePart() (err error) {
     mu := &sync.RWMutex{}
-    for s3obj := range c.s3Uploadable {
+    for s3obj := range c.uploadQ {
         err = c.uploadPart(s3obj.partNumber, s3obj.buf, mu)
         if err != nil {
-            log.Println("Error occurred. Sending quit signal to writer.")
-            quit <- true
+            c.writeLog(Error, "Error occurred. Sending quit signal to writer.")
+            c.quit <- true
             c.abortMultipartUpload()
             return err
         }
     }
-    if c.Debug {
-        log.Println("Received closed signal")
-    }
+    c.writeLog(Debug, "Received closed signal")
     return
 }
+
+func (c *Converter) writeLog(logLevel LogLevel, logLine string) {
+    if logLevel <= c.LogLevel {
+        log.Println(logLine)
+    }
+}
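
Reviewer note: a minimal usage sketch of the public API as it stands after this diff. The import path, SQL driver, DSN, query, and output filename below are illustrative assumptions, not part of the change; only New, WriteFile, UploadToS3, LogLevel, and the log-level constants come from the code above. DefaultConfig/UploadToS3 additionally read S3_BUCKET, S3_PATH, S3_REGION, and S3_ACL from the environment.

package main

import (
    "database/sql"
    "log"

    _ "github.com/go-sql-driver/mysql" // assumed driver; any database/sql driver works
    "github.com/thatInfrastructureGuy/sqltocsvgzip" // assumed import path for this package
)

func main() {
    db, err := sql.Open("mysql", "user:pass@tcp(localhost:3306)/mydb") // placeholder DSN
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    rows, err := db.Query("SELECT * FROM some_table") // placeholder query
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()

    // Write a gzipped CSV to local disk; LogLevel is the new knob introduced in this change.
    converter := sqltocsvgzip.New(rows)
    converter.LogLevel = sqltocsvgzip.Verbose
    if err := converter.WriteFile("output.csv.gz"); err != nil {
        log.Fatal(err)
    }

    // Alternative: stream straight to S3 as a multipart upload with defaults
    // (requires the S3_* environment variables and a fresh *sql.Rows):
    // err = sqltocsvgzip.UploadToS3(rows)
}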