Skip to content

Commit c07e237

Browse files
authored
feat(influx_tools): Add export to parquet files (#25297)
Adds a command to export data into per-shard parquet files. To do so, the command iterates over the shards, creates a cumulative schema over the series of a measurement (i.e. a super-set of tags and fields) and exports the data to a parquet file per measurement and shard.
1 parent 1fbe319 commit c07e237

File tree

10 files changed

+1425
-12
lines changed

10 files changed

+1425
-12
lines changed

cmd/influx_tools/help/help.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ Usage: influx-tools command [arguments]
3434
The commands are:
3535
3636
export reshapes existing shards to a new shard duration
37+
export-parquet exports existing shards to parquet files
3738
compact-shard fully compacts the specified shard
38-
gen-init creates database and retention policy metadata
39+
gen-init creates database and retention policy metadata
3940
gen-exec generates data
4041
help display this help message
4142

cmd/influx_tools/main.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33

44
import (
55
"errors"
6+
"flag"
67
"fmt"
78
"io"
89
"os"
@@ -15,6 +16,7 @@ import (
1516
geninit "github.com/influxdata/influxdb/cmd/influx_tools/generate/init"
1617
"github.com/influxdata/influxdb/cmd/influx_tools/help"
1718
"github.com/influxdata/influxdb/cmd/influx_tools/importer"
19+
"github.com/influxdata/influxdb/cmd/influx_tools/parquet"
1820
"github.com/influxdata/influxdb/cmd/influx_tools/server"
1921
"github.com/influxdata/influxdb/cmd/influxd/run"
2022
"github.com/influxdata/influxdb/services/meta"
@@ -55,36 +57,41 @@ func (m *Main) Run(args ...string) error {
5557
switch name {
5658
case "", "help":
5759
if err := help.NewCommand().Run(args...); err != nil {
58-
return fmt.Errorf("help failed: %s", err)
60+
return fmt.Errorf("help failed: %w", err)
5961
}
6062
case "compact-shard":
6163
c := compact.NewCommand()
6264
if err := c.Run(args); err != nil {
63-
return fmt.Errorf("compact-shard failed: %s", err)
65+
return fmt.Errorf("compact-shard failed: %w", err)
6466
}
6567
case "export":
6668
c := export.NewCommand(&ossServer{logger: zap.NewNop()})
6769
if err := c.Run(args); err != nil {
68-
return fmt.Errorf("export failed: %s", err)
70+
return fmt.Errorf("export failed: %w", err)
71+
}
72+
case "export-parquet":
73+
c := parquet.NewCommand(&ossServer{logger: zap.NewNop()})
74+
if err := c.Run(args); err != nil && !errors.Is(err, flag.ErrHelp) {
75+
return fmt.Errorf("export failed: %w", err)
6976
}
7077
case "import":
7178
c := importer.NewCommand(&ossServer{logger: zap.NewNop()})
7279
if err := c.Run(args); err != nil {
73-
return fmt.Errorf("import failed: %s", err)
80+
return fmt.Errorf("import failed: %w", err)
7481
}
7582
case "gen-init":
7683
c := geninit.NewCommand(&ossServer{logger: zap.NewNop()})
7784
if err := c.Run(args); err != nil {
78-
return fmt.Errorf("gen-init failed: %s", err)
85+
return fmt.Errorf("gen-init failed: %w", err)
7986
}
8087
case "gen-exec":
8188
deps := genexec.Dependencies{Server: &ossServer{logger: zap.NewNop()}}
8289
c := genexec.NewCommand(deps)
8390
if err := c.Run(args); err != nil {
84-
return fmt.Errorf("gen-exec failed: %s", err)
91+
return fmt.Errorf("gen-exec failed: %w", err)
8592
}
8693
default:
87-
return fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'influx-tools help' for usage`+"\n\n", name)
94+
return fmt.Errorf("unknown command %q\nRun 'influx-tools help' for usage", name)
8895
}
8996

9097
return nil
@@ -106,7 +113,7 @@ func (s *ossServer) Open(path string) (err error) {
106113

107114
// Validate the configuration.
108115
if err = s.config.Validate(); err != nil {
109-
return fmt.Errorf("validate config: %s", err)
116+
return fmt.Errorf("validate config: %w", err)
110117
}
111118

112119
if s.noClient {

cmd/influx_tools/parquet/batcher.go

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package parquet
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sort"
7+
8+
"go.uber.org/zap"
9+
10+
"github.com/influxdata/influxdb/models"
11+
"github.com/influxdata/influxdb/tsdb"
12+
"github.com/influxdata/influxql"
13+
)
14+
15+
// row is a single accumulated data point ready for export: one timestamp
// together with the tag set of its series and the field values observed at
// that time (after any type conversion and field renaming).
type row struct {
	timestamp int64                  // point time in nanoseconds since epoch
	tags      map[string]string      // tag key -> tag value of the owning series
	fields    map[string]interface{} // output field name -> converted value
}
20+
21+
// batcher reads one measurement's data out of a shard in time-ordered
// batches. It resolves field-type conflicts via typeResolutions and renames
// fields via nameResolutions while accumulating rows.
type batcher struct {
	measurement []byte      // measurement to read
	shard       *tsdb.Shard // shard the data is read from

	// Target types for fields whose type conflicts across series;
	// init() turns these into converter functions.
	typeResolutions map[string]influxql.DataType
	// Conversion function per conflicting field, built by init().
	converter map[string]func(interface{}) (interface{}, error)
	// Output name per field that must be renamed (e.g. to avoid clashes).
	nameResolutions map[string]string

	series []seriesEntry // series (tag sets + fields) of the measurement
	start  int64         // inclusive lower time bound of the next batch

	logger *zap.SugaredLogger
}
34+
35+
func (b *batcher) init() error {
36+
// Setup the type converters for the conflicting fields
37+
b.converter = make(map[string]func(interface{}) (interface{}, error), len(b.typeResolutions))
38+
for field, ftype := range b.typeResolutions {
39+
switch ftype {
40+
case influxql.Float:
41+
b.converter[field] = toFloat
42+
case influxql.Unsigned:
43+
b.converter[field] = toUint
44+
case influxql.Integer:
45+
b.converter[field] = toInt
46+
case influxql.Boolean:
47+
b.converter[field] = toBool
48+
case influxql.String:
49+
b.converter[field] = toString
50+
default:
51+
return fmt.Errorf("unknown converter %v for field %q", ftype, field)
52+
}
53+
}
54+
55+
b.start = models.MinNanoTime
56+
57+
return nil
58+
}
59+
60+
// reset rewinds the batch window to the earliest representable timestamp so
// the next call to next() starts over from the beginning of the shard's data.
func (b *batcher) reset() {
	b.start = models.MinNanoTime
}
63+
64+
// next returns the next batch of rows for the measurement, ordered by
// timestamp. It reads every (series, field) cursor from b.start onward,
// merges values with identical timestamps of the same series into one row,
// and advances b.start past the returned batch. Rows later than the
// earliest per-field end time are withheld so no incomplete or duplicate
// rows cross batch boundaries. A (nil, nil) return means no series exist.
func (b *batcher) next(ctx context.Context) ([]row, error) {
	// Iterate over the series and fields and accumulate the data row-wise
	iter, err := b.shard.CreateCursorIterator(ctx)
	if err != nil {
		return nil, fmt.Errorf("getting cursor iterator for %q failed: %w", string(b.measurement), err)
	}

	// data maps series key -> timestamp -> accumulated row.
	data := make(map[string]map[int64]row, len(b.series))
	// end tracks the minimum of the last timestamps seen per field; rows
	// beyond it may still receive values from other cursors in a later batch.
	end := models.MaxNanoTime
	var rowCount int
	for _, s := range b.series {
		data[s.key] = make(map[int64]row, tsdb.DefaultMaxPointsPerBlock)
		tags := make(map[string]string, len(s.tags))
		for _, t := range s.tags {
			tags[string(t.Key)] = string(t.Value)
		}
		for field := range s.fields {
			cursor, err := iter.Next(ctx,
				&tsdb.CursorRequest{
					Name:      b.measurement,
					Tags:      s.tags,
					Field:     field,
					Ascending: true,
					StartTime: b.start,
					EndTime:   models.MaxNanoTime,
				},
			)
			if err != nil {
				return nil, fmt.Errorf("getting cursor for %s-%s failed: %w", s.key, field, err)
			}
			// NOTE(review): a nil cursor presumably means the field has no
			// data in this series/time range — confirm against iter.Next.
			if cursor == nil {
				continue
			}

			// Prepare mappings: output name and value converter for this field.
			fname := field
			if n, found := b.nameResolutions[field]; found {
				fname = n
			}
			converter := identity
			if c, found := b.converter[field]; found {
				converter = c
			}
			fieldEnd := models.MaxNanoTime

			c, err := newValueCursor(cursor)
			if err != nil {
				return nil, fmt.Errorf("creating value cursor failed: %w", err)
			}

			for {
				// Check if we do still have data
				timestamp, ok := c.peek()
				if !ok {
					break
				}

				timestamp, value := c.next()
				v, err := converter(value)
				if err != nil {
					// Skip values that cannot be converted but keep exporting.
					b.logger.Errorf("converting %v of field %q failed: %v", value, field, err)
					continue
				}

				// First value at this (series, timestamp) creates the row.
				if _, found := data[s.key][timestamp]; !found {
					data[s.key][timestamp] = row{
						timestamp: timestamp,
						tags:      tags,
						fields:    make(map[string]interface{}),
					}
					rowCount++
				}

				data[s.key][timestamp].fields[fname] = v
				fieldEnd = timestamp
			}

			c.close()
			// Cursors are ascending, so fieldEnd is the last timestamp this
			// field produced; the batch may only safely extend to the
			// earliest such end across all fields.
			end = min(end, fieldEnd)
		}
	}
	if len(data) == 0 {
		return nil, nil
	}

	// Extract the rows ordered by timestamp
	rows := make([]row, 0, rowCount)
	for _, tmap := range data {
		for _, r := range tmap {
			rows = append(rows, r)
		}
	}
	sort.Slice(rows, func(i, j int) bool { return rows[i].timestamp < rows[j].timestamp })

	// Only include rows that are before the end-timestamp to avoid duplicate
	// or incomplete entries due to not iterating through all data
	n := sort.Search(len(rows), func(i int) bool { return rows[i].timestamp > end })

	// Remember the earliest datum to use this for the next batch excluding the entry itself
	b.start = end + 1

	return rows[:n], nil
}

cmd/influx_tools/parquet/command.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package parquet
2+
3+
import (
4+
"context"
5+
"errors"
6+
"flag"
7+
"fmt"
8+
"io"
9+
"os"
10+
11+
"go.uber.org/zap"
12+
13+
"github.com/influxdata/influxdb/cmd/influx_tools/server"
14+
internal_errors "github.com/influxdata/influxdb/pkg/errors"
15+
)
16+
17+
// Command represents the program execution for the "export-parquet" command,
// which exports shard data to per-measurement parquet files.
type Command struct {
	// Standard input/output, overridden for testing.
	Stderr io.Writer
	Logger *zap.Logger

	server server.Interface // server providing access to the shards
}
25+
26+
// NewCommand returns a new instance of the export Command.
27+
func NewCommand(server server.Interface) *Command {
28+
return &Command{
29+
Stderr: os.Stderr,
30+
server: server,
31+
}
32+
}
33+
34+
// Run executes the export command using the specified args. It parses the
// command-line flags, opens the configured server, builds an exporter for
// the requested database/retention policy and either prints the export plan
// (-dry-run) or performs the export. The named return err allows the
// deferred exporter close to surface its error.
func (cmd *Command) Run(args []string) (err error) {
	var (
		configPath      string
		database        string
		rp              string
		measurements    string
		typeResolutions string
		nameResolutions string
		output          string
		dryRun          bool
	)

	// Current working directory is the default output location.
	cwd, err := os.Getwd()
	if err != nil {
		return fmt.Errorf("getting current working directory failed: %w", err)
	}

	flags := flag.NewFlagSet("export-parquet", flag.ContinueOnError)
	flags.StringVar(&configPath, "config", "", "Config file of the InfluxDB v1 instance")
	flags.StringVar(&database, "database", "", "Database to export")
	flags.StringVar(&rp, "rp", "", "Retention policy in the database to export (default: default RP of the DB)")
	flags.StringVar(&measurements, "measurements", "*", "Comma-separated list of measurements to export")
	flags.StringVar(&typeResolutions, "resolve-types", "", "Comma-separated list of field type resolutions in the form <measurements>.<field>=<type>")
	flags.StringVar(&nameResolutions, "resolve-names", "", "Comma-separated list of field renamings in the form <measurements>.<field>=<new name>")
	flags.StringVar(&output, "output", cwd, "Output directory for exported parquet files")
	flags.BoolVar(&dryRun, "dry-run", false, "Print plan and exit")

	if err := flags.Parse(args); err != nil {
		// flag.ErrHelp is wrapped here and filtered by the caller via errors.Is.
		return fmt.Errorf("parsing flags failed: %w", err)
	}

	if database == "" {
		return errors.New("database is required")
	}

	// Use a human-readable development logger without caller/stacktrace noise.
	loggerCfg := zap.NewDevelopmentConfig()
	loggerCfg.DisableStacktrace = true
	loggerCfg.DisableCaller = true
	cmd.Logger, err = loggerCfg.Build()
	if err != nil {
		return fmt.Errorf("creating logger failed: %w", err)
	}

	if err := cmd.server.Open(configPath); err != nil {
		return fmt.Errorf("opening server failed: %w", err)
	}
	defer cmd.server.Close()

	cfg := &config{
		Database:        database,
		RP:              rp,
		Measurements:    measurements,
		TypeResolutions: typeResolutions,
		NameResolutions: nameResolutions,
		Output:          output,
	}
	exp, err := newExporter(cmd.server, cfg, cmd.Logger)
	if err != nil {
		return err
	}

	ctx := context.Background()
	if err := exp.open(ctx); err != nil {
		return fmt.Errorf("opening exporter failed: %w", err)
	}
	// Capture propagates exp.close()'s error into the named return err
	// unless an earlier error takes precedence.
	defer internal_errors.Capture(&err, exp.close)()

	// Always show the plan; with -dry-run this is the only output.
	exp.printPlan(cmd.Stderr)

	if dryRun {
		return nil
	}

	return exp.export(ctx)
}

0 commit comments

Comments
 (0)