Skip to content

Commit

Permalink
Merge pull request #3 from thatInfrastructureGuy/feature/addToQueue
Browse files Browse the repository at this point in the history
Feature/add to queue
  • Loading branch information
thatInfrastructureGuy authored Jul 16, 2020
2 parents 1d7c44a + e216b54 commit e0bfc75
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 245 deletions.
56 changes: 29 additions & 27 deletions config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sqltocsvgzip

import (
"bytes"
"compress/flate"
"database/sql"
"os"
Expand All @@ -12,15 +13,26 @@ const (
minFileSize = 5 * 1024 * 1024
)

type s3Obj struct {
type obj struct {
partNumber int64
buf []byte
}

// LogLevel selects the verbosity of the converter's logging output.
type LogLevel int

// Logging levels in increasing verbosity. Presumably a message is emitted
// when its level is <= the Converter's configured LogLevel (Info by default)
// — TODO confirm against the writeLog implementation, which is not visible here.
const (
Error LogLevel = 1
Warn LogLevel = 2
Info LogLevel = 3
Debug LogLevel = 4
Verbose LogLevel = 5
)

// Converter does the actual work of converting the rows to CSV.
// There are a few settings you can override if you want to do
// some fancy stuff to your CSV.
type Converter struct {
LogLevel LogLevel
Headers []string // Column headers to use (default is rows.Columns())
WriteHeaders bool // Flag to output headers in your CSV (default is true)
TimeFormat string // Format string for any time.Time values (default is time's default)
Expand All @@ -30,22 +42,23 @@ type Converter struct {
GzipGoroutines int
GzipBatchPerGoroutine int
SingleThreaded bool
Debug bool
S3Bucket string
S3Region string
S3Acl string
S3Path string
S3Upload bool
S3UploadThreads int
S3UploadMaxPartSize int64
UploadThreads int
UploadPartSize int

s3Svc *s3.S3
s3Resp *s3.CreateMultipartUploadOutput
s3Uploadable chan *s3Obj
s3CompletedParts []*s3.CompletedPart
rows *sql.Rows
rowPreProcessor CsvPreProcessorFunc
gzipBuf []byte
gzipBuf *bytes.Buffer
partNumber int64
uploadQ chan *obj
quit chan bool
}

// CsvPreProcessorFunc is a function type for preprocessing your CSV.
Expand All @@ -70,40 +83,29 @@ func New(rows *sql.Rows) *Converter {
WriteHeaders: true,
Delimiter: ',',
CompressionLevel: flate.DefaultCompression,
GzipGoroutines: 6,
GzipBatchPerGoroutine: 180000,
GzipGoroutines: 10,
GzipBatchPerGoroutine: 100000,
UploadPartSize: 5 * 1024 * 1025, // Should be greater than 1 * 1024 * 1024 for pgzip
LogLevel: Info,
}
}

// DefaultConfig sets the following variables.
//
// WriteHeaders: true,
// Delimiter: ',',
// CompressionLevel: flate.DefaultCompression,
// GzipGoroutines: 6,
// GzipBatchPerGoroutine: 180000,
// S3Upload: true,
// S3UploadThreads: 6,
// S3UploadMaxPartSize: 5 * 1024 * 1025, // Should be greater than 5 * 1024 * 1024
// S3Bucket: os.Getenv("S3_BUCKET"),
// S3Path: os.Getenv("S3_PATH"),
// S3Region: os.Getenv("S3_REGION"),
// S3Acl: os.Getenv("S3_ACL"), // If empty, defaults to bucket-owner-full-control
//
// DefaultConfig sets the default values for Converter struct.
func DefaultConfig(rows *sql.Rows) *Converter {
return &Converter{
rows: rows,
WriteHeaders: true,
Delimiter: ',',
CompressionLevel: flate.DefaultCompression,
GzipGoroutines: 6,
GzipBatchPerGoroutine: 180000,
GzipGoroutines: 10,
GzipBatchPerGoroutine: 100000,
S3Upload: true,
S3UploadThreads: 6,
S3UploadMaxPartSize: 5 * 1024 * 1025, // Should be greater than 5 * 1024 * 1024
UploadThreads: 6,
UploadPartSize: 5 * 1024 * 1025, // Should be greater than 5 * 1024 * 1024 for s3 upload
S3Bucket: os.Getenv("S3_BUCKET"),
S3Path: os.Getenv("S3_PATH"),
S3Region: os.Getenv("S3_REGION"),
S3Acl: os.Getenv("S3_ACL"),
LogLevel: Info,
}
}
17 changes: 17 additions & 0 deletions csv.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,27 @@
package sqltocsvgzip

import (
"bytes"
"encoding/csv"
"fmt"
"time"
)

// getCSVWriter constructs a csv.Writer backed by a fresh in-memory buffer
// and returns both, so the caller can flush the writer and then read the
// accumulated CSV bytes out of the buffer.
// If c.Delimiter is the zero rune it is treated as "not configured" and the
// csv package's default comma separator is kept.
func (c *Converter) getCSVWriter() (*csv.Writer, *bytes.Buffer) {
	// Backing store for CSV output (same size class as sqlRowBatch).
	csvBuffer := new(bytes.Buffer)

	w := csv.NewWriter(csvBuffer)

	// Honor a user-supplied delimiter; zero rune means "use the default ','".
	if c.Delimiter != '\x00' {
		w.Comma = c.Delimiter
	}

	return w, csvBuffer
}

func (c *Converter) setCSVHeaders() ([]string, int, error) {
var headers []string
columnNames, err := c.rows.Columns()
Expand Down
27 changes: 3 additions & 24 deletions getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,17 @@ import (

// getSqlBatchSize gets the size of rows to be retrieved.
// This batch is worked upon entirely before flushing to disk.
func (c *Converter) getSqlBatchSize(totalColumns int) int {
func (c *Converter) getSqlBatchSize(totalColumns int) {
// Use sqlBatchSize set by user
if c.SqlBatchSize != 0 {
return c.SqlBatchSize
return
}

// Default to 4096
c.SqlBatchSize = 4096

// Use Default value when Single thread.
if c.SingleThreaded {
return c.SqlBatchSize
}

// If Multi-threaded, then block size should be at least 1 MB = 1048576 bytes
// See https://github.com/klauspost/pgzip

// (String X SqlBatchSize X TotalColumns) > 1048576
// String = 16 bytes
// (SqlBatchSize X TotalColumns) > 65536

for (c.SqlBatchSize * totalColumns) <= 65536 {
c.SqlBatchSize = c.SqlBatchSize * 2
}

// We aim for 1.5 MB - 2 MB to be on a safe side
c.SqlBatchSize = c.SqlBatchSize * 2

return c.SqlBatchSize
}

func (c *Converter) selectCompressionMethod(writer io.Writer) (io.WriteCloser, error) {
func (c *Converter) getGzipWriter(writer io.Writer) (io.WriteCloser, error) {
// Use gzip if single threaded
if c.SingleThreaded {
zw, err := gzip.NewWriterLevel(writer, c.CompressionLevel)
Expand Down
31 changes: 20 additions & 11 deletions s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package sqltocsvgzip
import (
"bytes"
"fmt"
"log"
"os"
"io"
"net/url"
"sync"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -35,7 +35,7 @@ func (c *Converter) createMultipartRequest() (err error) {
return err
}

log.Println("Created multipart upload request.")
c.writeLog(Info, "Created multipart upload request.")
return nil
}

Expand All @@ -59,7 +59,7 @@ func (c *Converter) createS3Session() error {
}

func (c *Converter) abortMultipartUpload() error {
log.Println("Aborting multipart upload for UploadId: " + *c.s3Resp.UploadId)
c.writeLog(Info, "Aborting multipart upload for UploadId: "+aws.StringValue(c.s3Resp.UploadId))
abortInput := &s3.AbortMultipartUploadInput{
Bucket: c.s3Resp.Bucket,
Key: c.s3Resp.Key,
Expand All @@ -70,7 +70,7 @@ func (c *Converter) abortMultipartUpload() error {
}

func (c *Converter) completeMultipartUpload() (*s3.CompleteMultipartUploadOutput, error) {
log.Println("Completing multipart upload for UploadId: " + *c.s3Resp.UploadId)
c.writeLog(Info, "Completing multipart upload for UploadId: "+aws.StringValue(c.s3Resp.UploadId))
completeInput := &s3.CompleteMultipartUploadInput{
Bucket: c.s3Resp.Bucket,
Key: c.s3Resp.Key,
Expand All @@ -95,17 +95,17 @@ func (c *Converter) uploadPart(partNumber int64, buf []byte, mu *sync.RWMutex) (
for tryNum <= maxRetries {
uploadResult, err := c.s3Svc.UploadPart(partInput)
if err != nil {
log.Println(err)
c.writeLog(Error, err.Error())
if tryNum == maxRetries {
if aerr, ok := err.(awserr.Error); ok {
return aerr
}
return err
}
log.Println("Retrying to upload part: #", partNumber)
c.writeLog(Info, fmt.Sprintf("Retrying to upload part: #%v", partNumber))
tryNum++
} else {
log.Println("Uploaded part: #", partNumber)
c.writeLog(Info, fmt.Sprintf("Uploaded part: #%v", partNumber))
mu.Lock()
c.s3CompletedParts = append(c.s3CompletedParts, &s3.CompletedPart{
ETag: uploadResult.ETag,
Expand All @@ -118,7 +118,12 @@ func (c *Converter) uploadPart(partNumber int64, buf []byte, mu *sync.RWMutex) (
return nil
}

func (c *Converter) UploadObjectToS3(f *os.File) error {
func (c *Converter) UploadObjectToS3(w io.Writer) error {
buf, ok := w.(*bytes.Buffer)
if !ok {
return fmt.Errorf("Expected buffer. Got %T", w)
}

fileType := "application/x-gzip"

// The session the S3 Uploader will use
Expand All @@ -135,12 +140,16 @@ func (c *Converter) UploadObjectToS3(f *os.File) error {
Key: aws.String(c.S3Path),
ACL: aws.String(c.S3Acl),
ContentType: aws.String(fileType),
Body: f,
Body: bytes.NewReader(buf.Bytes()),
})
if err != nil {
return err
}

log.Println(res.Location)
uploadPath, err := url.PathUnescape(res.Location)
if err != nil {
return err
}
c.writeLog(Info, "Successfully uploaded file: "+uploadPath)
return nil
}
Loading

0 comments on commit e0bfc75

Please sign in to comment.