Skip to content

Commit

Permalink
Feat: Streaming Tunnel supports Schema Evolution and provides related…
Browse files Browse the repository at this point in the history
… examples (#38)

* refactor: rename httpNotOk to httpError

* feat(tunnel): add param allow schema mismatch

refactor: rename httpNotOk to httpError

* fix: Modify based on review comments
  • Loading branch information
dingxin-tech authored Oct 30, 2024
1 parent 281e261 commit acf10f7
Show file tree
Hide file tree
Showing 14 changed files with 333 additions and 124 deletions.
2 changes: 1 addition & 1 deletion examples/sdk/tunnel/upload_data_use_stream_tunnel/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func main() {
}
}

func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
func makeRecord(schema *tableschema.TableSchema) (data.Record, error) {
varchar, _ := data.NewVarChar(500, "varchar")
char, _ := data.NewVarChar(254, "char")
s := data.String("hello world")
Expand Down
55 changes: 29 additions & 26 deletions odps/common/http_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,32 +36,35 @@ var HttpMethod = struct {
var GMT, _ = time.LoadLocation("GMT")

const (
HttpHeaderDate = "Date"
HttpHeaderContentType = "Content-Type"
HttpHeaderContentMD5 = "Content-MD5"
HttpHeaderContentLength = "Content-Length"
HttpHeaderLastModified = "Last-Modified"
HttpHeaderUserAgent = "User-Agent"
HttpHeaderXOdpsUserAgent = "x-odps-user-agent"
HttpHeaderOdpsOwner = "x-odps-owner"
HttpHeaderOdpsCreationTime = "x-odps-creation-time"
HttpHeaderOdpsRequestId = "x-odps-request-id"
HttpHeaderLocation = "Location"
HttpHeaderOdpsStartTime = "x-odps-start-time"
HttpHeaderOdpsEndTime = "x-odps-end-time"
HttpHeaderSqlTimezone = "x-odps-sql-timezone"
HttpHeaderOdpsSupervisionToken = "odps-x-supervision-token"
HttpHeaderAuthorization = "Authorization"
HttpHeaderAuthorizationSTSToken = "authorization-sts-token"
HttpHeaderAppAuthentication = "application-authentication"
HttpHeaderSTSAuthentication = "sts-authentication"
HttpHeaderSTSToken = "sts-token"
HttpHeaderODPSBearerToken = "x-odps-bearer-token"
HttpHeaderOdpsDateTransFrom = "odps-tunnel-date-transform"
HttpHeaderOdpsTunnelVersion = "x-odps-tunnel-version"
HttpHeaderOdpsSlotNum = "odps-tunnel-slot-num"
HttpHeaderRoutedServer = "odps-tunnel-routed-server"
HttpHeaderTransferEncoding = "Transfer-Encoding"
HttpHeaderDate = "Date"
HttpHeaderContentType = "Content-Type"
HttpHeaderContentMD5 = "Content-MD5"
HttpHeaderContentLength = "Content-Length"
HttpHeaderContentEncoding = "Content-Encoding"
HttpHeaderLastModified = "Last-Modified"
HttpHeaderUserAgent = "User-Agent"
HttpHeaderXOdpsUserAgent = "x-odps-user-agent"
HttpHeaderOdpsOwner = "x-odps-owner"
HttpHeaderOdpsCreationTime = "x-odps-creation-time"
HttpHeaderOdpsRequestId = "x-odps-request-id"
HttpHeaderLocation = "Location"
HttpHeaderOdpsStartTime = "x-odps-start-time"
HttpHeaderOdpsEndTime = "x-odps-end-time"
HttpHeaderSqlTimezone = "x-odps-sql-timezone"
HttpHeaderOdpsSupervisionToken = "odps-x-supervision-token"
HttpHeaderAuthorization = "Authorization"
HttpHeaderAuthorizationSTSToken = "authorization-sts-token"
HttpHeaderAppAuthentication = "application-authentication"
HttpHeaderSTSAuthentication = "sts-authentication"
HttpHeaderSTSToken = "sts-token"
HttpHeaderODPSBearerToken = "x-odps-bearer-token"
HttpHeaderOdpsDateTransFrom = "odps-tunnel-date-transform"
HttpHeaderOdpsTunnelVersion = "x-odps-tunnel-version"
HttpHeaderOdpsSlotNum = "odps-tunnel-slot-num"
HttpHeaderRoutedServer = "odps-tunnel-routed-server"
HttpHeaderTransferEncoding = "Transfer-Encoding"
HttpHeaderOdpsSdkSupportSchemaEvolution = "odps-tunnel-sdk-support-schema-evolution"
HttpHeaderOdpsTunnelLatestSchemaVersion = "odps-tunnel-latest-schema-version"

XMLContentType = "application/xml"
)
7 changes: 1 addition & 6 deletions odps/data/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,7 @@ import "strings"
type Record []Data

func NewRecord(columnNums int) Record {
return make([]Data, 0, columnNums)
}

func (r *Record) Append(d Data) {
s := []Data(*r)
*r = append(s, d)
return make([]Data, columnNums)
}

func (r *Record) Len() int {
Expand Down
2 changes: 1 addition & 1 deletion odps/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (p *Project) Load() error {
p.beLoaded = true

if err != nil {
if httpNoteOk, ok := err.(restclient.HttpNotOk); ok {
if httpNoteOk, ok := err.(restclient.HttpError); ok {
if httpNoteOk.StatusCode == 404 {
p.exists = false
}
Expand Down
58 changes: 46 additions & 12 deletions odps/restclient/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,72 @@
package restclient

import (
"encoding/json"
"encoding/xml"
"fmt"
"io/ioutil"
"net/http"
)

type HttpNotOk struct {
Status string
StatusCode int
RequestId string
Body []byte
// HttpError is the error value built from an HTTP response by NewHttpNotOk.
// It implements the error interface through its Error method.
type HttpError struct {
// Status is the response's status text, copied from http.Response.Status.
Status string
// StatusCode is the numeric status, copied from http.Response.StatusCode.
StatusCode int
// RequestId is the value of the "x-odps-request-id" response header.
RequestId string
// Body holds the raw bytes read from the response body.
Body []byte
// ErrorMessage is the result of NewErrorMessage(Body); it is nil when the
// body could not be decoded.
ErrorMessage *ErrorMessage
// Response is the originating response. Its Body has already been read and
// closed by the constructor, so use the Body field above for the payload.
Response *http.Response
}

func (e HttpNotOk) Error() string {
// ErrorMessage is the structured form of an error response body.
// Every field carries both a json and an xml tag with the same key, so the
// same struct can be filled from either encoding (see NewErrorMessage).
type ErrorMessage struct {
// ErrorCode is read from the "Code" key/element.
ErrorCode string `json:"Code" xml:"Code"`
// Message is read from the "Message" key/element.
Message string `json:"Message" xml:"Message"`
// RequestId is read from the "RequestId" key/element.
RequestId string `json:"RequestId" xml:"RequestId"`
// HostId is read from the "HostId" key/element.
HostId string `json:"HostId" xml:"HostId"`
}

// Error implements the error interface.
//
// When a request id is present the text is
// "requestId=<id>\nstatus=<status>\n<body>"; otherwise it is
// "<status>\n<body>".
func (e HttpError) Error() string {
	if e.RequestId != "" {
		return fmt.Sprintf("requestId=%s\nstatus=%s\n%s", e.RequestId, e.Status, e.Body)
	}

	return fmt.Sprintf("%s\n%s", e.Status, e.Body)
}

func NewHttpNotOk(res *http.Response) HttpNotOk {
func NewHttpNotOk(res *http.Response) HttpError {
var body []byte

if res.Body != nil {
body, _ = ioutil.ReadAll(res.Body)
_ = res.Body.Close()
}

return HttpNotOk{
Status: res.Status,
StatusCode: res.StatusCode,
RequestId: res.Header.Get("x-odps-request-id"),
Body: body,
return HttpError{
Status: res.Status,
StatusCode: res.StatusCode,
RequestId: res.Header.Get("x-odps-request-id"),
Body: body,
ErrorMessage: NewErrorMessage(body),
Response: res,
}
}

// NewErrorMessage decodes an error response body into an ErrorMessage.
//
// The body may be encoded as XML or as JSON. The first non-whitespace byte
// picks the decoder that is tried first: '{' means JSON, anything else means
// XML. The other decoder is kept as a fallback, so every body the previous
// XML-then-JSON order accepted is still accepted.
//
// It returns nil when body is empty or when neither decoder accepts it.
func NewErrorMessage(body []byte) *ErrorMessage {
	if len(body) == 0 {
		return nil
	}

	// xml.Unmarshal skips any leading text until it finds a start element, so
	// a JSON document whose strings contain markup would "succeed" as XML and
	// yield an empty message. Sniff the payload to avoid that.
	jsonFirst := false
	for _, b := range body {
		if b == ' ' || b == '\t' || b == '\r' || b == '\n' {
			continue
		}
		jsonFirst = b == '{'
		break
	}

	type decodeFunc func([]byte, interface{}) error
	decoders := [2]decodeFunc{xml.Unmarshal, json.Unmarshal}
	if jsonFirst {
		decoders[0], decoders[1] = decoders[1], decoders[0]
	}

	for _, decode := range decoders {
		// A fresh value per attempt keeps fields set by a failed, partial
		// decode from leaking into the result of the next decoder.
		var msg ErrorMessage
		if err := decode(body, &msg); err == nil {
			return &msg
		}
	}

	// Neither encoding matched.
	return nil
}
13 changes: 13 additions & 0 deletions odps/restclient/rest_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,19 @@ func (client *RestClient) NewRequestWithUrlQuery(method, resource string, body i
return req, nil
}

// NewRequestWithParamsAndHeaders builds a request with NewRequestWithUrlQuery
// and then sets each entry of headers on it with Header.Set, replacing any
// value already stored under the same name.
//
// A nil or empty headers map leaves the request's headers untouched. An error
// from NewRequestWithUrlQuery is returned as-is together with a nil request.
func (client *RestClient) NewRequestWithParamsAndHeaders(method, resource string, body io.Reader, params url.Values, headers map[string]string) (*http.Request, error) {
	request, err := client.NewRequestWithUrlQuery(method, resource, body, params)
	if err != nil {
		return nil, err
	}

	// Ranging over a nil map performs zero iterations, so no nil guard is needed.
	for key, val := range headers {
		request.Header.Set(key, val)
	}

	return request, nil
}

func (client *RestClient) Do(req *http.Request) (*http.Response, error) {
req.Header.Set(common.HttpHeaderUserAgent, common.UserAgentValue)
req.Header.Set(common.HttpHeaderXOdpsUserAgent, client.UserAgent())
Expand Down
2 changes: 1 addition & 1 deletion odps/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *Schema) Tables() *Tables {
func (s *Schema) Exists() (bool, error) {
err := s.Load()

var httpErr restclient.HttpNotOk
var httpErr restclient.HttpError
if errors.As(err, &httpErr) {
if httpErr.Status == "404 Not Found" {
return false, nil
Expand Down
2 changes: 1 addition & 1 deletion odps/security/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func (sm *Manager) Run(query string, jsonOutput bool, supervisionToken string) (
return errors.WithStack(decoder.Decode(&resModel))
})

if _, ok := err.(restclient.HttpNotOk); ok {
if _, ok := err.(restclient.HttpError); ok {
return nil, errors.WithStack(err)
}

Expand Down
16 changes: 11 additions & 5 deletions odps/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func calculateMaxLabel(labels []string) string {
func (t *Table) Exists() (bool, error) {
err := t.Load()

var httpErr restclient.HttpNotOk
var httpErr restclient.HttpError
if errors.As(err, &httpErr) {
if httpErr.Status == "404 Not Found" {
return false, nil
Expand Down Expand Up @@ -378,12 +378,18 @@ func (t *Table) Delete() error {
sqlBuilder.WriteString(" if exists")

sqlBuilder.WriteRune(' ')
sqlBuilder.WriteString(t.ProjectName())
sqlBuilder.WriteRune('.')
sqlBuilder.WriteString(t.Name())

hints := make(map[string]string)
if t.SchemaName() == "" {
hints["odps.namespace.schema"] = "false"
sqlBuilder.WriteString(fmt.Sprintf("%s.%s", t.ProjectName(), t.Name()))
} else {
hints["odps.namespace.schema"] = "true"
sqlBuilder.WriteString(fmt.Sprintf("%s.%s.%s", t.ProjectName(), t.SchemaName(), t.Name()))
}
sqlBuilder.WriteString(";")

sqlTask := NewSqlTask("SQLDropTableTask", sqlBuilder.String(), nil)
sqlTask := NewSqlTask("SQLDropTableTask", sqlBuilder.String(), hints)
instances := NewInstances(t.odpsIns, t.ProjectName())
i, err := instances.CreateTask(t.ProjectName(), &sqlTask)
if err != nil {
Expand Down
25 changes: 23 additions & 2 deletions odps/tunnel/record_pack_stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package tunnel
import (
"bytes"
"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"github.com/pkg/errors"
"time"
)
Expand All @@ -45,7 +46,12 @@ func (rsw *RecordPackStreamWriter) Append(record data.Record) error {
if rsw.flushing {
return errors.New("There's an unsuccessful flush called, you should call flush to retry or call reset to drop the data")
}

if !rsw.session.allowSchemaMismatch {
err := checkIfRecordSchemaMatchSessionSchema(&record, rsw.session.schema.Columns)
if err != nil {
return errors.WithStack(err)
}
}
err := rsw.protocWriter.Write(record)
if err == nil {
rsw.recordCount += 1
Expand All @@ -54,6 +60,21 @@ func (rsw *RecordPackStreamWriter) Append(record data.Record) error {
return errors.WithStack(err)
}

// checkIfRecordSchemaMatchSessionSchema verifies that record lines up with the
// session's column list.
//
// It returns an error when the record's length differs from len(schema), or
// when a non-nil value's type ID differs from the type ID of the column at the
// same index. Nil values are accepted for any column. It returns nil when
// every position matches.
func checkIfRecordSchemaMatchSessionSchema(record *data.Record, schema []tableschema.Column) error {
	if recordLen, schemaLen := record.Len(), len(schema); recordLen != schemaLen {
		return errors.Errorf("Record schema not match session schema, record len: %d, session schema len: %d",
			recordLen, schemaLen)
	}

	for i, value := range *record {
		// The column's type ID is read for every position, nil values included.
		expectedID := schema[i].Type.ID()
		if value == nil || value.Type().ID() == expectedID {
			continue
		}

		return errors.Errorf("Record schema not match session schema, index: %d, record type: %s, session schema type: %s",
			i, value.Type().Name(), schema[i].Type.Name())
	}

	return nil
}

// Flush send all buffered data to server. return (traceId, recordCount, recordBytes, error)
// `recordCount` and `recordBytes` is the count and bytes count of the records uploaded
func (rsw *RecordPackStreamWriter) Flush(timeout_ ...time.Duration) (string, int64, int64, error) {
Expand All @@ -78,7 +99,7 @@ func (rsw *RecordPackStreamWriter) Flush(timeout_ ...time.Duration) (string, int

reqId, bytesSend, err := rsw.session.flushStream(rsw, timeout)
if err != nil {
return "", 0, 0, errors.WithStack(err)
return "", 0, 0, err
}

recordCount := rsw.recordCount
Expand Down
15 changes: 13 additions & 2 deletions odps/tunnel/session_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,15 @@ type sessionConfig struct {
// Columns for stream upload session only
Columns []string
SchemaVersion int
// AllowSchemaMismatch for stream upload session only
AllowSchemaMismatch bool
}

func newSessionConfig(opts ...Option) *sessionConfig {
cfg := &sessionConfig{
Compressor: nil,
SchemaVersion: -1,
Compressor: nil,
SchemaVersion: -1,
AllowSchemaMismatch: true,
}

for _, opt := range opts {
Expand Down Expand Up @@ -127,6 +130,12 @@ func withSchemaVersion(schemaVersion int) Option {
}
}

// withAllowSchemaMismatch returns an Option that stores allowSchemaMismatch in
// the session config's AllowSchemaMismatch field when the option is applied.
func withAllowSchemaMismatch(allowSchemaMismatch bool) Option {
	apply := func(target *sessionConfig) {
		target.AllowSchemaMismatch = allowSchemaMismatch
	}

	return apply
}

var SessionCfg = struct {
WithPartitionKey func(string) Option
WithSchemaName func(string) Option
Expand All @@ -140,6 +149,7 @@ var SessionCfg = struct {
WithCreatePartition func() Option
WithColumns func([]string) Option
WithSchemaVersion func(int) Option
WithAllowSchemaMismatch func(bool) Option
}{
WithPartitionKey: withPartitionKey,
WithSchemaName: withSchemaName,
Expand All @@ -153,4 +163,5 @@ var SessionCfg = struct {
WithCreatePartition: withCreatePartition,
WithColumns: withColumns,
WithSchemaVersion: withSchemaVersion,
WithAllowSchemaMismatch: withAllowSchemaMismatch,
}
4 changes: 2 additions & 2 deletions odps/tunnel/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ func newSlotSelect(arr []slot) slotSelector {
}
}

func (s *slotSelector) NextSlot() slot {
func (s *slotSelector) NextSlot() *slot {
if s.index >= len(s.arr) {
s.index = 0
}

e := s.arr[s.index]
e := &s.arr[s.index]
s.index += 1

return e
Expand Down
Loading

0 comments on commit acf10f7

Please sign in to comment.