Skip to content

Commit

Permalink
adjust to check and use the ordered lright list of columns to the copy
Browse files Browse the repository at this point in the history
  • Loading branch information
willyrgf committed Nov 7, 2020
1 parent 0d30731 commit cbb2ade
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 2 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go 1.14

require (
github.com/go-co-op/gocron v0.3.1
github.com/google/uuid v1.1.2
github.com/jackc/pgconn v1.6.4
github.com/jackc/pgx v3.6.2+incompatible
github.com/jackc/pgx/v4 v4.8.1
github.com/jasonlvhit/gocron v0.0.1
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXi
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
Expand Down Expand Up @@ -316,6 +318,7 @@ golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7 h1:fHDIZ2oxGnUZRN6WgWFCbYBjH9uqVPRCUVUDhs0wnbA=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b h1:0mm1VjtFUOIlE1SbDlwjYaDxZVDP2S5ou6y0gSgXHu8=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down
37 changes: 35 additions & 2 deletions syncer/repository_pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,24 @@ import (
"context"
"fmt"

"github.com/cryp-com-br/pg-syncer/helpers"
"github.com/cryp-com-br/pg-syncer/repository"
"github.com/google/uuid"
"github.com/jackc/pgconn"
pgx "github.com/jackc/pgx/v4"
log "github.com/sirupsen/logrus"
)

// getPreparedColumns
func (s *Service) getPreparedColumns(ctx context.Context, prepare *pgconn.StatementDescription) (columns []string) {

for _, f := range prepare.Fields {
columns = append(columns, string(f.Name))
}

return
}

// getTableColumns
func (s *Service) getTableColumns(ctx context.Context, sourceConn *repository.PostgresConn, destinationConn *repository.PostgresConn, table string) (columns []string, err error) {
rows, err := destinationConn.Conn.Query(ctx, "select column_name from information_schema.columns where table_name = $1", table)
Expand Down Expand Up @@ -43,13 +56,33 @@ func (s *Service) truncateTable(ctx context.Context, conn *repository.PostgresCo

// copyFromSelect
func (s *Service) copyFromSelect(ctx context.Context, sourceConn *repository.PostgresConn, destinationConn *repository.PostgresConn) (err error) {
// get columns
// TODO: use the prepared statement to execute query
// get source columns
prepare, err := sourceConn.Conn.Conn().Prepare(ctx, uuid.New().String(), s.Access.SourceQuery)
if err != nil {
log.Errorf("service.copyFromSelect(): sourceConn.Conn().Prepare(ctx, pre, s.Access.SourceQuery) error=%w", err)
return
}

sourceColumns := s.getPreparedColumns(ctx, prepare)

// get destination columns
destinationColumns, err := s.getTableColumns(ctx, sourceConn, destinationConn, s.Access.DestinationTable)
if err != nil {
log.Errorf("service.copyFromSelect(): s.getTableColumns() error=%w", err)
return
}

log.Debugf("service.copyFromSelect(): helpers.ArraysIsEqual(): is=", helpers.ArraysIsEqual(sourceColumns, destinationColumns))
if !helpers.ArraysIsEqual(sourceColumns, destinationColumns) {
err = fmt.Errorf("the list of columns of the prepared query and the destination table don't match")
log.Errorf("service.copyFromSelect(): helpers.ArraysIsEqual(): err=%w", err)
return
}

log.Debugf("service.copyFromSelect() sourceColumns=%+v", sourceColumns)
log.Debugf("service.copyFromSelect() destinationColumns=%+v", destinationColumns)

// get the data to copy then
rows, err := sourceConn.Conn.Query(ctx, s.Access.SourceQuery)
if err != nil {
Expand All @@ -59,7 +92,7 @@ func (s *Service) copyFromSelect(ctx context.Context, sourceConn *repository.Pos

destinationIdentifier := pgx.Identifier{s.Access.DestinationSchema, s.Access.DestinationTable}

copyCount, err := destinationConn.Conn.CopyFrom(ctx, destinationIdentifier, destinationColumns, rows)
copyCount, err := destinationConn.Conn.CopyFrom(ctx, destinationIdentifier, sourceColumns, rows)
if err != nil {
log.Errorf("service.copyFromSelect(): sourceConn.Conn.CopyFrom() error=%w", err)
return
Expand Down
2 changes: 2 additions & 0 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ func Start(ctx context.Context, s Syncers, c *Config) (err error) {
}

}

scheduler.RunAll()
scheduler.StartBlocking()

return nil
Expand Down

0 comments on commit cbb2ade

Please sign in to comment.