Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage/dataflux): add dataflux interface #10748

Merged
merged 46 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
0e7d8ad
dataflux initial commit
akansha1812 Aug 22, 2024
b21866c
interface for range splitter
akansha1812 Aug 22, 2024
a3e269e
add a valid license header
akansha1812 Aug 22, 2024
863fa26
add a valid license header to all dataflux files
akansha1812 Aug 22, 2024
10dd4a8
Merge branch 'main' into main
akansha1812 Aug 22, 2024
78090cd
Merge branch 'googleapis:main' into main
akansha1812 Aug 26, 2024
03aecaa
resolved comments
akansha1812 Aug 26, 2024
d3056e0
update sequential listing comment
akansha1812 Aug 26, 2024
b48d583
add end-to-end sequential listing
akansha1812 Aug 26, 2024
e246c3f
Merge branch 'main' into main
akansha1812 Aug 27, 2024
b1e7712
adding basic integration test
akansha1812 Aug 27, 2024
39de969
Merge branch 'main' into main
akansha1812 Aug 27, 2024
db3ad80
Merge branch 'googleapis:main' into main
akansha1812 Aug 28, 2024
98b632f
Merge branch 'googleapis:main' into main
akansha1812 Sep 3, 2024
c7787fb
Merge branch 'main' into main
akansha1812 Sep 4, 2024
8313e50
Merge branch 'main' into main
akansha1812 Sep 4, 2024
f2b5dc3
Merge branch 'main' into main
akansha1812 Sep 11, 2024
c90c6fc
chore: update gapic-generator-go to 0.47.0 (#10848)
gcf-owl-bot[bot] Sep 11, 2024
39ee892
chore(main): release auth 0.9.4 (#10846)
release-please[bot] Sep 11, 2024
bb69df7
feat(firestore): Adding distance threshold and result field (#10802)
bhshkh Sep 11, 2024
0feb258
remove pagination within sequential listing
akansha1812 Sep 11, 2024
ab44946
Merge branch 'main' into main
akansha1812 Sep 11, 2024
8b147b2
remove nextBatch_all example
akansha1812 Sep 11, 2024
8f8b7ac
remove close function from NewLister return value
akansha1812 Sep 11, 2024
18cffa6
remove close function from NewLister return value
akansha1812 Sep 11, 2024
67bb404
change listing as method of Lister
akansha1812 Sep 13, 2024
4691742
Merge branch 'main' into main
akansha1812 Sep 13, 2024
4dc54ab
Merge branch 'googleapis:main' into main
akansha1812 Sep 13, 2024
38abf10
Merge branch 'googleapis:main' into main
akansha1812 Sep 13, 2024
ae91a8a
Merge branch 'googleapis:main' into main
akansha1812 Sep 16, 2024
206f47e
chore: update gapic-generator-go to 0.47.0 (#10848)
gcf-owl-bot[bot] Sep 11, 2024
2d586ea
chore: update gapic-generator-go to 0.47.0 (#10848)
gcf-owl-bot[bot] Sep 11, 2024
71cb718
chore: update gapic-generator-go to 0.47.0 (#10848)
gcf-owl-bot[bot] Sep 11, 2024
31aa908
add integration test for next batch
akansha1812 Sep 16, 2024
39c8fd2
Reset extra file
akansha1812 Sep 17, 2024
2c8faf4
Merge branch 'googleapis:main' into main
akansha1812 Sep 18, 2024
64fbc61
sequential list to return object instead of pointer
akansha1812 Sep 18, 2024
d28cda2
fetch 5000 objects from gcs
akansha1812 Sep 18, 2024
081b8a7
fetch 5000 objects from gcs
akansha1812 Sep 18, 2024
b84b324
make worker status unexported, round up batchsize comment
akansha1812 Sep 19, 2024
7b3441e
Merge branch 'main' into main
akansha1812 Sep 19, 2024
e1b7da8
Merge branch 'main' into main
akansha1812 Sep 19, 2024
ed1e505
Merge branch 'googleapis:main' into main
akansha1812 Sep 19, 2024
367172c
add comment to use no acl
akansha1812 Sep 19, 2024
af913aa
update with go mod tidy
akansha1812 Sep 19, 2024
df9384a
Merge branch 'main' into main
akansha1812 Sep 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions storage/dataflux/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2024 Google LLC
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*
Package dataflux provides an easy way to parallelize listing in Google
Cloud Storage.

More information about Google Cloud Storage is available at
https://cloud.google.com/storage/docs.

See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts,
connection pooling and similar aspects of this package.

NOTE: This package is in preview. It is not stable, and is likely to change.
*/
package dataflux // import "cloud.google.com/go/storage/dataflux"
95 changes: 95 additions & 0 deletions storage/dataflux/fast_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dataflux

import (
"context"

"cloud.google.com/go/storage"
)

// listingMethod represents the method of listing.
type listingMethod int

const (
// open when any method can be used to list.
open listingMethod = iota
// sequential when the listing is done sequentially.
sequential
// worksteal when the listing is done using work stealing algorithm.
worksteal
)

// Input contains options for listing objects.
type Input struct {
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
// Client is the storage client to list objects from.
Client *storage.Client
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
// BucketName is the name of the bucket to list objects from.
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
BucketName string
// Parallelism is number of parallel workers to use for listing.
Parallelism int
// pageSize is the number of objects to list.
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
BatchSize int
// query is the query to filter objects for listing.
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
Query storage.Query
// SkipDirectoryObjects is to indicate whether to list directory objects.
SkipDirectoryObjects bool
}

// Lister is used for interacting with Dataflux fast-listing.
// The caller should initialize it with NewLister() instead of creating it directly.
type Lister struct {
// method indicates the listing method(open, sequential, worksteal) to be used for listing.
method listingMethod
// pageToken is the token to use for sequential listing.
pageToken string
// ranges is the channel to store the start and end ranges to be listed by the workers in worksteal listing.
ranges chan *listRange
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
// bucket is the bucket handle to list objects from.
bucket *storage.BucketHandle
// parallelism is number of parallel workers to use for listing.
parallelism int
// batchSize is the number of objects to list.
batchSize int
// query is the query to filter objects for listing.
query storage.Query
// skipDirectoryObjects is to indicate whether to list directory objects.
skipDirectoryObjects bool
}

// CloseFunc is the function to close the range channel of a Lister.
type CloseFunc func()
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved

// NewLister creates a new dataflux Lister to list objects in the give bucket.
func NewLister(opts *Input) (*Lister, CloseFunc) {

lister := &Lister{}
return lister, func() { lister.Close() }
}

// NextBatch runs worksteal algorithm and sequential listing in parallel to quickly
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
// return a list of objects in the bucket. For buckets with smaller dataset,
// sequential listing is expected to be faster. For buckets with larger dataset,
// worksteal listing is expected to be faster.
func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error) {
return nil, nil
}

// Close closes the range channel of the Lister.
func (c *Lister) Close() {
if c.ranges != nil {
close(c.ranges)
}
}
62 changes: 62 additions & 0 deletions storage/dataflux/object_lister.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dataflux

import (
"context"

"cloud.google.com/go/storage"
)

// newObjectListerOpts specifies options for instantiating the NewObjectLister.
type newObjectListerOpts struct {
// startRange is the start offset of the objects to be listed.
startRange string
// endRange is the end offset of the objects to be listed.
endRange string
// bucketHandle is the bucket handle of the bucket to be listed.
bucketHandle *storage.BucketHandle
// query is the storage.Query to filter objects for listing.
query storage.Query
// skipDirectoryObjects is to indicate whether to list directory objects.
skipDirectoryObjects bool
// generation is the generation number of the last object in the page.
generation int64
}

// nextPageResult holds the next page of object names and indicates whether the
// lister has completed listing (no more objects to retrieve).
type nextPageResult struct {
// items is the list of objects listed.
items []*storage.ObjectAttrs
// doneListing indicates whether the lister has completed listing.
doneListing bool
// nextStartRange is the start offset of the next page of objects to be listed.
nextStartRange string
// generation is the generation number of the last object in the page.
generation int64
}

// newObjectLister creates a new ObjectLister using the given lister options.
func newObjectLister(ctx context.Context, opts newObjectListerOpts) (*nextPageResult, error) {
return &nextPageResult{}, nil
}

func addPrefix(name, prefix string) string {
if name != "" {
return prefix + name
}
return name
}
43 changes: 43 additions & 0 deletions storage/dataflux/range_splitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dataflux

import (
"sync"
)

// rangeSplitter specifies the a list and a map of sorted alphabets.
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
type rangeSplitter struct {
mu sync.Mutex
sortedAlphabet *[]rune
alphabetMap map[rune]int
}

// listRange specifies the start and end range for the range splitter.
type listRange struct {
startRange string
endRange string
}

// newRangeSplitter creates a new RangeSplitter with the given alphabets.
func newRangeSplitter(alphabet string) (*rangeSplitter, error) {

return &rangeSplitter{}, nil
}

// splitRange creates a given number of splits based on a provided start and end range.
func (rs *rangeSplitter) splitRange(startRange, endRange string, numSplits int) ([]string, error) {
return nil, nil
}
28 changes: 28 additions & 0 deletions storage/dataflux/sequential.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dataflux

import (
"context"

"cloud.google.com/go/storage"
)

// Listing performs a sequential listing on the given bucket.
// It returns a list of objects and the next token to use to continue listing.
// If the next token is empty, then listing is complete.
func Listing(ctx context.Context, opts Lister) ([]*storage.ObjectAttrs, string, error) {
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
return nil, "", nil
}
54 changes: 54 additions & 0 deletions storage/dataflux/worksteal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dataflux

import (
"context"
"sync"

"cloud.google.com/go/storage"
)

// WorkerStatus indicates the status of a worker.
type WorkerStatus int
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved

const (
// Idle status shows that the worker is currently not listing.
Idle WorkerStatus = iota
// Active status shows that the worker is currently listing objects within assigned range.
Active
)

type listerResult struct {
mu sync.Mutex
objects []*storage.ObjectAttrs
}

type worker struct {
goroutineID int
startRange string
endRange string
status WorkerStatus
rangesplitter *rangeSplitter
idleChannel chan int
result *listerResult
generation int64
}

// workstealListing is the main entry point of the worksteal algorithm.
// It performs worksteal to achieve highly dynamic object listing.
func workstealListing(ctx context.Context, opts Lister) ([]*storage.ObjectAttrs, error) {
return nil, nil
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
}