Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
23 changes: 22 additions & 1 deletion buf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,52 @@ lint:
- FIELD_LOWER_SNAKE_CASE
ignore_only:
ENUM_VALUE_PREFIX:
- flyteidl2/common/configuration.proto
- flyteidl2/common/list.proto
- flyteidl2/common/runtime_version.proto
- flyteidl2/core/artifact_id.proto
- flyteidl2/core/catalog.proto
- flyteidl2/core/errors.proto
- flyteidl2/event/event.proto
- flyteidl2/core/execution.proto
- flyteidl2/core/identifier.proto
- flyteidl2/core/security.proto
- flyteidl2/core/tasks.proto
- flyteidl2/core/types.proto
- flyteidl2/datacatalog/datacatalog.proto
- flyteidl2/logs/dataplane/payload.proto
- flyteidl2/secret/definition.proto
- flyteidl2/plugins/spark.proto
- flyteidl2/task/common.proto
- flyteidl2/plugins/kubeflow/common.proto
ENUM_ZERO_VALUE_SUFFIX:
- flyteidl2/common/authorization.proto
- flyteidl2/plugins/common.proto
- flyteidl2/common/list.proto
- flyteidl2/common/role.proto
- flyteidl2/common/runtime_version.proto
- flyteidl2/core/artifact_id.proto
- flyteidl2/core/catalog.proto
- flyteidl2/core/errors.proto
- flyteidl2/event/event.proto
- flyteidl2/core/execution.proto
- flyteidl2/core/identifier.proto
- flyteidl2/core/security.proto
- flyteidl2/core/tasks.proto
- flyteidl2/core/types.proto
- flyteidl2/datacatalog/datacatalog.proto
- flyteidl2/logs/dataplane/payload.proto
- flyteidl2/secret/definition.proto
- flyteidl2/plugins/spark.proto
- flyteidl2/plugins/spark.proto
- flyteidl2/plugins/kubeflow/common.proto
RPC_REQUEST_RESPONSE_UNIQUE:
- flyteidl2/cacheservice/cacheservice.proto
- flyteidl2/cacheservice/v2/cacheservice.proto
RPC_REQUEST_STANDARD_NAME:
- flyteidl2/cacheservice/cacheservice.proto
- flyteidl2/cacheservice/v2/cacheservice.proto
RPC_RESPONSE_STANDARD_NAME:
- flyteidl2/cacheservice/cacheservice.proto
- flyteidl2/cacheservice/v2/cacheservice.proto
SERVICE_SUFFIX:
- flyteidl2/datacatalog/datacatalog.proto
147 changes: 147 additions & 0 deletions flyteidl2/cacheservice/cacheservice.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
syntax = "proto3";

package flyteidl2.cacheservice;

import "flyteidl2/core/identifier.proto";
import "flyteidl2/core/literals.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";

option go_package = "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/cacheservice";

/*
* CacheService defines operations for cache management including retrieval, storage, and deletion of cached task/workflow outputs.
*/
service CacheService {
// Retrieves cached data by key.
rpc Get(GetCacheRequest) returns (GetCacheResponse);

// Stores or updates cached data by key.
rpc Put(PutCacheRequest) returns (PutCacheResponse);

// Deletes cached data by key.
rpc Delete(DeleteCacheRequest) returns (DeleteCacheResponse);

// Get or extend a reservation for a cache key
rpc GetOrExtendReservation(GetOrExtendReservationRequest) returns (GetOrExtendReservationResponse);

// Release the reservation for a cache key
rpc ReleaseReservation(ReleaseReservationRequest) returns (ReleaseReservationResponse);
}

/*
* Additional metadata as key-value pairs
*/
message KeyMapMetadata {
map<string, string> values = 1; // Additional metadata as key-value pairs
}

/*
* Metadata for cached outputs, including the source identifier and timestamps.
*/
message Metadata {
core.Identifier source_identifier = 1; // Source task or workflow identifier
KeyMapMetadata key_map = 2; // Additional metadata as key-value pairs
google.protobuf.Timestamp created_at = 3; // Creation timestamp
google.protobuf.Timestamp last_updated_at = 4; // Last update timestamp
}

/*
* Represents cached output, either as literals or an URI, with associated metadata.
*/
message CachedOutput {
oneof output {
flyteidl2.core.LiteralMap output_literals = 1; // Output literals
string output_uri = 2; // URI to output data
}
Metadata metadata = 3; // Associated metadata
}

/*
* Request to retrieve cached data by key.
*/
message GetCacheRequest {
string key = 1; // Cache key
}

/*
* Response with cached data for a given key.
*/
message GetCacheResponse {
CachedOutput output = 1; // Cached output
}

message OverwriteOutput {
bool overwrite = 1; // Overwrite flag
bool delete_blob = 2; // Delete existing blob
google.protobuf.Duration max_age = 3; // Maximum age of the cached output since last update
}

/*
* Request to store/update cached data by key.
*/
message PutCacheRequest {
string key = 1; // Cache key
CachedOutput output = 2; // Output to cache
OverwriteOutput overwrite = 3; // Overwrite flag if exists
}

/*
* Response message of cache store/update operation.
*/
message PutCacheResponse {
// Empty, success indicated by no errors
}

/*
* Request to delete cached data by key.
*/
message DeleteCacheRequest {
string key = 1; // Cache key
}

/*
* Response message of cache deletion operation.
*/
message DeleteCacheResponse {
// Empty, success indicated by no errors
}

// A reservation including owner, heartbeat interval, expiration timestamp, and various metadata.
message Reservation {
string key = 1; // The unique ID for the reservation - same as the cache key
string owner_id = 2; // The unique ID of the owner for the reservation
google.protobuf.Duration heartbeat_interval = 3; // Requested reservation extension heartbeat interval
google.protobuf.Timestamp expires_at = 4; // Expiration timestamp of this reservation
}

/*
* Request to get or extend a reservation for a cache key
*/
message GetOrExtendReservationRequest {
string key = 1; // The unique ID for the reservation - same as the cache key
string owner_id = 2; // The unique ID of the owner for the reservation
google.protobuf.Duration heartbeat_interval = 3; // Requested reservation extension heartbeat interval
}

/*
* Request to get or extend a reservation for a cache key
*/
message GetOrExtendReservationResponse {
Reservation reservation = 1; // The reservation that was created or extended
}

/*
* Request to release the reservation for a cache key
*/
message ReleaseReservationRequest {
string key = 1; // The unique ID for the reservation - same as the cache key
string owner_id = 2; // The unique ID of the owner for the reservation
}

/*
* Response message of release reservation operation.
*/
message ReleaseReservationResponse {
// Empty, success indicated by no errors
}
78 changes: 78 additions & 0 deletions flyteidl2/cacheservice/v2/cacheservice.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
syntax = "proto3";

package flyteidl2.cacheservice.v2;

import "buf/validate/validate.proto";
import "flyteidl2/cacheservice/cacheservice.proto";

option go_package = "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/cacheservice/v2";

/*
* CacheService defines operations for cache management including retrieval, storage, and deletion of cached task/workflow outputs.
*/
service CacheService {
// Retrieves cached data by key.
rpc Get(GetCacheRequest) returns (flyteidl2.cacheservice.GetCacheResponse);

// Stores or updates cached data by key.
rpc Put(PutCacheRequest) returns (flyteidl2.cacheservice.PutCacheResponse);

// Deletes cached data by key.
rpc Delete(DeleteCacheRequest) returns (flyteidl2.cacheservice.DeleteCacheResponse);

// Get or extend a reservation for a cache key
rpc GetOrExtendReservation(GetOrExtendReservationRequest) returns (flyteidl2.cacheservice.GetOrExtendReservationResponse);

// Release the reservation for a cache key
rpc ReleaseReservation(ReleaseReservationRequest) returns (flyteidl2.cacheservice.ReleaseReservationResponse);
}

/*
* Identifier for cache operations, including org, project, and domain.
* This is used to scope cache operations to specific organizational contexts.
*/
message Identifier {
string org = 1 [(buf.validate.field).string.min_len = 1]; // Organization identifier
string project = 2 [(buf.validate.field).string.min_len = 1]; // Project identifier
string domain = 3 [(buf.validate.field).string.min_len = 1]; // Domain identifier
}

/*
* Request to retrieve cached data by key.
*/
message GetCacheRequest {
flyteidl2.cacheservice.GetCacheRequest base_request = 1;
Identifier identifier = 2 [(buf.validate.field).required = true]; // Identifier for the cache operation
}

/*
* Request to store/update cached data by key.
*/
message PutCacheRequest {
flyteidl2.cacheservice.PutCacheRequest base_request = 1;
Identifier identifier = 2 [(buf.validate.field).required = true]; // Identifier for the cache operation
}

/*
* Request to delete cached data by key.
*/
message DeleteCacheRequest {
flyteidl2.cacheservice.DeleteCacheRequest base_request = 1;
Identifier identifier = 2 [(buf.validate.field).required = true]; // Identifier for the cache operation
}

/*
* Request to get or extend a reservation for a cache key
*/
message GetOrExtendReservationRequest {
flyteidl2.cacheservice.GetOrExtendReservationRequest base_request = 1;
Identifier identifier = 2 [(buf.validate.field).required = true]; // Identifier for the cache operation
}

/*
* Request to release the reservation for a cache key
*/
message ReleaseReservationRequest {
flyteidl2.cacheservice.ReleaseReservationRequest base_request = 1;
Identifier identifier = 2 [(buf.validate.field).required = true]; // Identifier for the cache operation
}
107 changes: 107 additions & 0 deletions flyteidl2/clients/go/coreutils/extract_literal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// extract_literal.go
// Utility methods to extract a native golang value from a given Literal.
// Usage:
// 1] string literal extraction
// lit, _ := MakeLiteral("test_string")
// val, _ := ExtractFromLiteral(lit)
// 2] integer literal extraction. integer would be extracted in type int64.
// lit, _ := MakeLiteral([]interface{}{1, 2, 3})
// val, _ := ExtractFromLiteral(lit)
// 3] float literal extraction. float would be extracted in type float64.
// lit, _ := MakeLiteral([]interface{}{1.0, 2.0, 3.0})
// val, _ := ExtractFromLiteral(lit)
// 4] map of boolean literal extraction.
// mapInstance := map[string]interface{}{
// "key1": []interface{}{1, 2, 3},
// "key2": []interface{}{5},
// }
// lit, _ := MakeLiteral(mapInstance)
// val, _ := ExtractFromLiteral(lit)
// For further examples check the test TestFetchLiteral in extract_literal_test.go

package coreutils

import (
"fmt"

"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core"
)

func ExtractFromLiteral(literal *core.Literal) (interface{}, error) {
switch literalValue := literal.Value.(type) {
case *core.Literal_Scalar:
switch scalarValue := literalValue.Scalar.Value.(type) {
case *core.Scalar_Primitive:
switch scalarPrimitive := scalarValue.Primitive.Value.(type) {
case *core.Primitive_Integer:
scalarPrimitiveInt := scalarPrimitive.Integer
return scalarPrimitiveInt, nil
case *core.Primitive_FloatValue:
scalarPrimitiveFloat := scalarPrimitive.FloatValue
return scalarPrimitiveFloat, nil
case *core.Primitive_StringValue:
scalarPrimitiveString := scalarPrimitive.StringValue
return scalarPrimitiveString, nil
case *core.Primitive_Boolean:
scalarPrimitiveBoolean := scalarPrimitive.Boolean
return scalarPrimitiveBoolean, nil
case *core.Primitive_Datetime:
scalarPrimitiveDateTime := scalarPrimitive.Datetime.AsTime()
return scalarPrimitiveDateTime, nil
case *core.Primitive_Duration:
scalarPrimitiveDuration := scalarPrimitive.Duration.AsDuration()
return scalarPrimitiveDuration, nil
default:
return nil, fmt.Errorf("unsupported literal scalar primitive type %T", scalarValue)
}
case *core.Scalar_Binary:
return scalarValue.Binary, nil
case *core.Scalar_Blob:
return scalarValue.Blob.Uri, nil
case *core.Scalar_Schema:
return scalarValue.Schema.Uri, nil
case *core.Scalar_Generic:
return scalarValue.Generic, nil
case *core.Scalar_StructuredDataset:
return scalarValue.StructuredDataset.Uri, nil
case *core.Scalar_Union:
// extract the value of the union but not the actual union object
extractedVal, err := ExtractFromLiteral(scalarValue.Union.Value)
if err != nil {
return nil, err
}
return extractedVal, nil
case *core.Scalar_NoneType:
return nil, nil
default:
return nil, fmt.Errorf("unsupported literal scalar type %T", scalarValue)
}
case *core.Literal_Collection:
collectionValue := literalValue.Collection.Literals
collection := make([]interface{}, len(collectionValue))
for index, val := range collectionValue {
if collectionElem, err := ExtractFromLiteral(val); err == nil {
collection[index] = collectionElem
} else {
return nil, err
}
}
return collection, nil
case *core.Literal_Map:
mapLiteralValue := literalValue.Map.Literals
mapResult := make(map[string]interface{}, len(mapLiteralValue))
for key, val := range mapLiteralValue {
if val, err := ExtractFromLiteral(val); err == nil {
mapResult[key] = val
} else {
return nil, err
}
}
return mapResult, nil
case *core.Literal_OffloadedMetadata:
// Return the URI of the offloaded metadata to be used when displaying in flytectl
return literalValue.OffloadedMetadata.Uri, nil

}
return nil, fmt.Errorf("unsupported literal type %T", literal)
}
Loading