Skip to content

Commit

Permalink
Now using shared import code
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasmartinelli committed Aug 19, 2015
1 parent 62909e1 commit 72608af
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 42 deletions.
76 changes: 36 additions & 40 deletions csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ import (
"encoding/csv"
"fmt"
"io"
"log"
"os"
"strings"

"github.com/cheggaaa/pb"
"github.com/codegangsta/cli"
"github.com/lib/pq"
)

func parseColumns(c *cli.Context, reader *csv.Reader) []string {
Expand Down Expand Up @@ -41,20 +40,17 @@ func importCsv(c *cli.Context) {
schema := c.GlobalString("schema")
tableName := parseTableName(c, filename)

db, err := connect(parseConnStr(c), schema)
failOnError(err, "Could not connect to db")
defer db.Close()

file, err := os.Open(filename)
failOnError(err, "Cannot open file")
defer file.Close()

fi, err := file.Stat()
failOnError(err, "Could not find out file size of file")
total := fi.Size()
bar := pb.New64(total)
bar.SetUnits(pb.U_BYTES)
bar.Start()
db, err := connect(parseConnStr(c), schema)
failOnError(err, "Could not connect to db")
defer db.Close()

success := 0
failed := 0
bar := NewProgressBar(file)

reader := csv.NewReader(io.TeeReader(file, bar))
reader.Comma = rune(c.String("delimiter")[0])
Expand All @@ -64,46 +60,48 @@ func importCsv(c *cli.Context) {
columns := parseColumns(c, reader)
reader.FieldsPerRecord = len(columns)

table, err := createTable(db, schema, tableName, columns)
failOnError(err, "Could not create table statement")

_, err = table.Exec()
failOnError(err, "Could not create table")

txn, err := db.Begin()
failOnError(err, "Could not start transaction")

stmt, err := txn.Prepare(pq.CopyInSchema(schema, tableName, columns...))
failOnError(err, "Could not prepare copy in statement")

successCount := 0
failCount := 0
i, err := NewCSVImport(db, schema, tableName, columns)
failOnError(err, "Could not prepare import")

for {
cols := make([]interface{}, len(columns))
record, err := reader.Read()

//Loop ensures we don't insert too many values and that
//values are properly converted into empty interfaces
for i, col := range record {
cols[i] = col
}

if err == io.EOF {
break
}
failOnError(err, "Could not read csv")
_, err = stmt.Exec(cols...)
failOnError(err, "Could add bulk insert")
successCount++
}

_, err = stmt.Exec()
failOnError(err, "Could not exec the bulk copy")
//Todo: better error handling
failOnError(err, "Could not read csv")

err = stmt.Close()
failOnError(err, "Could not close")
err = i.AddRow(cols...)
if err != nil {
failed++
line := strings.Join(record, c.GlobalString("delimiter"))

if c.GlobalBool("ignore-errors") {
os.Stderr.WriteString(line)
} else {
msg := fmt.Sprintf("Could not import %s: %s", err, line)
log.Fatalln(msg)
panic(msg)
}
} else {
success++
}
}

err = txn.Commit()
failOnError(err, "Could not commit transaction")
err = i.Commit()
failOnError(err, "Could not commit")
bar.Finish()

//refactore whole reporting stuff
//print report
fmt.Println(fmt.Sprintf("Successfully copied %d rows into %s"))

Expand All @@ -113,9 +111,7 @@ func importCsv(c *cli.Context) {
fmt.Println(fmt.Sprintf("%d rows could not be imported into %s."))
}

if failCount > 0 && c.GlobalBool("ignore-errors") {
if failed > 0 && c.GlobalBool("ignore-errors") {
fmt.Println("You can specify the --ignore-errors flag to write errors to stderr, instead of aborting the transcation.")
}

bar.Finish()
}
7 changes: 5 additions & 2 deletions json.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ func importJSON(c *cli.Context) {
bar := NewProgressBar(file)

i, err := NewJSONImport(db, schema, tableName, "data")
failOnError(err, "Could not prepare import")

reader := bufio.NewReader(io.TeeReader(file, bar))

for {
// We use ReadBytes because it can deal with very long lines
// which happens often with big JSON objects
Expand All @@ -74,7 +76,6 @@ func importJSON(c *cli.Context) {
//todo: Better error handling so that db can close
failOnError(err, "Could not read line")

//todo: not so happy with this part
handleError := func() {
failed++
if c.GlobalBool("ignore-errors") {
Expand All @@ -99,6 +100,8 @@ func importJSON(c *cli.Context) {

}

i.Commit()
// handle error
err = i.Commit()
failOnError(err, "Could not commit")
bar.Finish()
}

0 comments on commit 72608af

Please sign in to comment.