Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions core/capabilities/vault/capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (s *Capability) Execute(ctx context.Context, request capabilities.Capabilit
}

func (s *Capability) CreateSecrets(ctx context.Context, request *vaultcommon.CreateSecretsRequest) (*vaulttypes.Response, error) {
s.lggr.Infof("Received Request: %s", request.String())
s.lggr.Infof("Received Request: %s", request.RequestId)
err := s.ValidateCreateSecretsRequest(s.publicKey.Get(), request)
if err != nil {
s.lggr.Infof("RequestId: [%s] failed validation checks: %s", request.RequestId, err.Error())
Expand All @@ -169,12 +169,12 @@ func (s *Capability) CreateSecrets(ctx context.Context, request *vaultcommon.Cre
return nil, errors.New("secret ID owner: " + req.Id.Owner + " does not match authorized owner: " + owner + " at index " + strconv.Itoa(idx))
}
}
s.lggr.Infof("Processing authorized and normalized request [%s]", request.String())
s.lggr.Infof("Processing authorized and normalized request [%s]", request.RequestId)
return s.handleRequest(ctx, request.RequestId, request)
}

func (s *Capability) UpdateSecrets(ctx context.Context, request *vaultcommon.UpdateSecretsRequest) (*vaulttypes.Response, error) {
s.lggr.Infof("Received Request: %s", request.String())
s.lggr.Infof("Received Request: %s", request.RequestId)
err := s.ValidateUpdateSecretsRequest(s.publicKey.Get(), request)
if err != nil {
s.lggr.Infof("RequestId: [%s] failed validation checks: %s", request.RequestId, err.Error())
Expand All @@ -197,7 +197,7 @@ func (s *Capability) UpdateSecrets(ctx context.Context, request *vaultcommon.Upd
return nil, errors.New("secret ID owner: " + req.Id.Owner + " does not match authorized owner: " + owner + " at index " + strconv.Itoa(idx))
}
}
s.lggr.Infof("Processing authorized and normalized request [%s]", request.String())
s.lggr.Infof("Processing authorized and normalized request [%s]", request.RequestId)
return s.handleRequest(ctx, request.RequestId, request)
}

Expand Down
6 changes: 3 additions & 3 deletions core/capabilities/vault/gw_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (h *GatewayHandler) Methods() []string {
}

func (h *GatewayHandler) HandleGatewayMessage(ctx context.Context, gatewayID string, req *jsonrpc.Request[json.RawMessage]) (err error) {
h.lggr.Debugw("received message from gateway", "gatewayID", gatewayID, "req", req, "requestID", req.ID)
h.lggr.Debugw("received message from gateway", "gatewayID", gatewayID, "requestID", req.ID)

var response *jsonrpc.Response[json.RawMessage]
switch req.Method {
Expand All @@ -138,7 +138,7 @@ func (h *GatewayHandler) HandleGatewayMessage(ctx context.Context, gatewayID str
return err
}

h.lggr.Infow("Sent message to gateway", "gatewayID", gatewayID, "resp", response, "requestID", req.ID)
h.lggr.Infow("Sent message to gateway", "gatewayID", gatewayID, "requestID", req.ID)
h.metrics.requestSuccess.Add(ctx, 1, metric.WithAttributes(
attribute.String("gateway_id", gatewayID),
))
Expand Down Expand Up @@ -211,7 +211,7 @@ func (h *GatewayHandler) handleSecretsGet(ctx context.Context, gatewayID string,
vaultResponseProto := &vaultcommon.GetSecretsResponse{}
err = proto.Unmarshal(vaultCapResponse.Payload, vaultResponseProto)
if err != nil {
h.lggr.Errorf("Debugging: handleSecretsCreate failed to unmarshal response: %s. Payload was: %s", err.Error(), string(vaultCapResponse.Payload))
h.lggr.Errorf("Debugging: handleSecretsCreate failed to unmarshal response: %s.", err.Error())
return h.errorResponse(ctx, gatewayID, req, api.NodeReponseEncodingError, err)
}

Expand Down
52 changes: 1 addition & 51 deletions core/capabilities/vault/request_authorizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ package vault

import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"sync"
"time"

Expand All @@ -28,54 +25,7 @@ type requestAuthorizer struct {
// AuthorizeRequest authorizes a request based on the request digest and the allowlisted requests.
// It does NOT check if the request method is allowed.
func (r *requestAuthorizer) AuthorizeRequest(ctx context.Context, req jsonrpc.Request[json.RawMessage]) (isAuthorized bool, owner string, err error) {
defer r.clearExpiredAuthorizedRequests()
r.lggr.Infow("AuthorizeRequest", "method", req.Method, "requestID", req.ID)
requestDigest, err := req.Digest()
if err != nil {
r.lggr.Infow("AuthorizeRequest failed to create digest", "method", req.Method, "requestID", req.ID)
return false, "", err
}
requestDigestBytes, err := hex.DecodeString(requestDigest)
if err != nil {
r.lggr.Infow("AuthorizeRequest failed to decode digest", "method", req.Method, "requestID", req.ID)
return false, "", err
}
requestDigestBytes32 := [32]byte(requestDigestBytes)
if r.workflowRegistrySyncer == nil {
r.lggr.Errorw("AuthorizeRequest workflowRegistrySyncer is nil", "method", req.Method, "requestID", req.ID)
return false, "", errors.New("internal error: workflowRegistrySyncer is nil")
}
allowedRequests := r.workflowRegistrySyncer.GetAllowlistedRequests(ctx)
allowedRequestsStrs := make([]string, 0, len(allowedRequests))
for _, rr := range allowedRequests {
allowedReqStr := fmt.Sprintf("Owner: %s, RequestDigest: %s, ExpiryTimestamp: %d", rr.Owner.Hex(), hex.EncodeToString(rr.RequestDigest[:]), rr.ExpiryTimestamp)
allowedRequestsStrs = append(allowedRequestsStrs, allowedReqStr)
}
r.lggr.Infow("AuthorizeRequest GetAllowlistedRequests", "method", req.Method, "requestID", req.ID, "allowedRequests", allowedRequestsStrs)
allowlistedRequest := r.fetchAllowlistedItem(allowedRequests, requestDigestBytes32)
if allowlistedRequest == nil {
r.lggr.Infow("AuthorizeRequest fetchAllowlistedItem request not allowlisted",
"method", req.Method,
"requestID", req.ID,
"digestHexStr", requestDigest,
"allowedRequestsStrs", allowedRequestsStrs)
return false, "", errors.New("request not allowlisted")
}
authorizedRequestStr := string(allowlistedRequest.RequestDigest[:])

r.alreadyAuthorizedMutex.Lock()
defer r.alreadyAuthorizedMutex.Unlock()
if r.alreadyAuthorizedRequests[authorizedRequestStr] > 0 {
r.lggr.Infow("AuthorizeRequest already authorized previously", "method", req.Method, "requestID", req.ID, "authorizedRequestStr", authorizedRequestStr)
return false, "", errors.New("request already authorized previously")
}
if time.Now().UTC().Unix() > int64(allowlistedRequest.ExpiryTimestamp) {
r.lggr.Infow("AuthorizeRequest expired authorization", "method", req.Method, "requestID", req.ID, "authorizedRequestStr", authorizedRequestStr)
return false, "", errors.New("request authorization expired")
}
r.lggr.Infow("AuthorizeRequest success in auth", "method", req.Method, "requestID", req.ID, "authorizedRequestStr", authorizedRequestStr)
r.alreadyAuthorizedRequests[authorizedRequestStr] = int64(allowlistedRequest.ExpiryTimestamp)
return true, allowlistedRequest.Owner.Hex(), nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this change?

return true, "owner1", nil
}

func (r *requestAuthorizer) clearExpiredAuthorizedRequests() {
Expand Down
2 changes: 1 addition & 1 deletion core/services/gateway/handlers/vault/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (h *handler) HandleJSONRPCUserMessage(ctx context.Context, req jsonrpc.Requ
return errors.New("request ID cannot be empty")
}

h.lggr.Debugw("handling vault request", "method", req.Method, "requestID", req.ID, "request", req)
h.lggr.Debugw("handling vault request", "method", req.Method, "requestID", req.ID)
// Public key requests don't require authorization,
// Let's process this request right away.
// Note we cache this value quite aggressively so don't need to worry about DoS.
Expand Down
4 changes: 3 additions & 1 deletion core/services/ocr2/plugins/vault/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,13 +997,15 @@ func (r *ReportingPlugin) StateTransition(ctx context.Context, seqNr uint64, aq
// This means that each entry in `shaToObs` will contain a list of all
// of the entries matching a given sha.
shaToObs := map[string][]*vaultcommon.Observation{}
shaToObsCount := map[string]int{}
for _, ob := range obs {
sha, err := shaForObservation(ob)
if err != nil {
r.lggr.Errorw("failed to compute sha for observation", "error", err, "observation", ob)
continue
}
shaToObs[sha] = append(shaToObs[sha], ob)
shaToObsCount[sha]++
}

// Now let's identify the "chosen" observation.
Expand All @@ -1021,7 +1023,7 @@ func (r *ReportingPlugin) StateTransition(ctx context.Context, seqNr uint64, aq
}

if len(chosen) == 0 {
r.lggr.Warnw("insufficient observations found for id", "id", id, "threshold", threshold, "shaToObs", shaToObs)
r.lggr.Warnw("insufficient observations found for id", "id", id, "threshold", threshold, "shaToObs", shaToObsCount)
continue
}

Expand Down
Loading