Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

incoming message e2e tests #852

Draft
wants to merge 14 commits into
base: pro-wh/feature/inmsgs
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions client-sdk/go/client/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ func (tb *TransactionBuilder) AppendSign(ctx context.Context, signer signature.S
return tb.ts.AppendSign(rtInfo.ChainContext, signer)
}

//GetUnverifiedTransaction returns the underlying signed transaction.
func (tb *TransactionBuilder) GetUnverifiedTransaction() *types.UnverifiedTransaction {
return tb.ts.UnverifiedTransaction()
}

// SubmitTx submits a transaction to the runtime transaction scheduler and waits for transaction
// execution results.
func (tb *TransactionBuilder) SubmitTx(ctx context.Context, rsp interface{}) error {
Expand Down
14 changes: 10 additions & 4 deletions client-sdk/go/testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"golang.org/x/crypto/sha3"

coreSignature "github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
memorySigner "github.com/oasisprotocol/oasis-core/go/common/crypto/signature/signers/memory"

"github.com/oasisprotocol/oasis-sdk/client-sdk/go/crypto/signature"
Expand All @@ -21,15 +22,20 @@ type TestKey struct {

// EthAddress is the corresponding Ethereum address if the key is secp256k1.
EthAddress [20]byte

// ConsensusSigner is the signer for the consensus API if the key is ed25519.
ConsensusSigner coreSignature.Signer
}

func newEd25519TestKey(seed string) TestKey {
signer := ed25519.WrapSigner(memorySigner.NewTestSigner(seed))
consensusSigner := memorySigner.NewTestSigner(seed)
signer := ed25519.WrapSigner(consensusSigner)
sigspec := types.NewSignatureAddressSpecEd25519(signer.Public().(ed25519.PublicKey))
return TestKey{
Signer: signer,
Address: types.NewAddress(sigspec),
SigSpec: sigspec,
Signer: signer,
Address: types.NewAddress(sigspec),
SigSpec: sigspec,
ConsensusSigner: consensusSigner,
}
}

Expand Down
32 changes: 32 additions & 0 deletions client-sdk/go/types/inmsg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package types

import (
"fmt"

"github.com/oasisprotocol/oasis-core/go/common/cbor"
)

// LatestIncomingMessageVersion is the latest incoming message format version.
const LatestIncomingMessageVersion = 1

type IncomingMessageData struct {
cbor.Versioned

// UnverifiedTransaction is an embedded transaction (UnverifiedTransaction).
// The transaction doesn't need to be from the same account that sent the message.
UnverifiedTransaction *[]byte `json:"ut"`
}

func (d *IncomingMessageData) ValidateBasic() error {
if d.V != LatestIncomingMessageVersion {
return fmt.Errorf("incoming message data: unsupported version")
}
return nil
}

func NoopIncomingMessageData() *IncomingMessageData {
return &IncomingMessageData{
Versioned: cbor.NewVersioned(LatestIncomingMessageVersion),
UnverifiedTransaction: nil,
}
}
1 change: 1 addition & 0 deletions runtime-sdk/modules/contracts/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,5 +715,6 @@ impl<Cfg: Config> module::MigrationHandler for Module<Cfg> {
}

impl<Cfg: Config> module::TransactionHandler for Module<Cfg> {}
impl<Cfg: Config> module::IncomingMessageHandler for Module<Cfg> {}
impl<Cfg: Config> module::BlockHandler for Module<Cfg> {}
impl<Cfg: Config> module::InvariantHandler for Module<Cfg> {}
2 changes: 2 additions & 0 deletions runtime-sdk/modules/evm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,8 @@ impl<Cfg: Config> module::TransactionHandler for Module<Cfg> {
}
}

impl<Cfg: Config> module::IncomingMessageHandler for Module<Cfg> {}

impl<Cfg: Config> module::BlockHandler for Module<Cfg> {
fn end_block<C: Context>(ctx: &mut C) {
// Update the list of historic block hashes.
Expand Down
20 changes: 20 additions & 0 deletions runtime-sdk/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ pub struct RuntimeBatchContext<'a, R: runtime::Runtime, S: NestedStore> {
max_messages: u32,
/// Emitted messages.
messages: Vec<(roothash::Message, MessageEventHookInvocation)>,
/// Number of processed incoming messages.
in_msgs_processed: usize,

/// Per-context values.
values: BTreeMap<&'static str, Box<dyn Any>>,
Expand Down Expand Up @@ -306,6 +308,7 @@ impl<'a, R: runtime::Runtime, S: NestedStore> RuntimeBatchContext<'a, R, S> {
block_etags: EventTags::new(),
max_messages,
messages: Vec::new(),
in_msgs_processed: 0,
values: BTreeMap::new(),
_runtime: PhantomData,
}
Expand Down Expand Up @@ -337,10 +340,25 @@ impl<'a, R: runtime::Runtime, S: NestedStore> RuntimeBatchContext<'a, R, S> {
block_etags: EventTags::new(),
max_messages: ctx.max_messages,
messages: Vec::new(),
in_msgs_processed: 0,
values: BTreeMap::new(),
_runtime: PhantomData,
}
}

// Load how many roothash messages that this runtime handled in this round. This becomes valid
// after the dispatcher finishes executing incoming blocks, and it doesn't get updated during
// the execution of those incoming messages. The dispatcher calls this in order to report back
// to the node how many messages it got through.
pub fn get_in_msgs_processed(&self) -> usize {
self.in_msgs_processed
}

// Save how many roothash incoming messages that this runtime handled in this round. This is
// for the dispatcher to call, and modules shouldn't need to use this.
pub fn set_in_msgs_processed(&mut self, count: usize) {
self.in_msgs_processed = count;
}
}

impl<'a, R: runtime::Runtime, S: NestedStore> Context for RuntimeBatchContext<'a, R, S> {
Expand Down Expand Up @@ -465,6 +483,7 @@ impl<'a, R: runtime::Runtime, S: NestedStore> Context for RuntimeBatchContext<'a
_ => remaining_messages,
},
messages: Vec::new(),
in_msgs_processed: self.in_msgs_processed,
values: BTreeMap::new(),
_runtime: PhantomData,
};
Expand Down Expand Up @@ -694,6 +713,7 @@ impl<'round, 'store, R: runtime::Runtime, S: Store> Context
_ => remaining_messages,
},
messages: Vec::new(),
in_msgs_processed: 0,
values: BTreeMap::new(),
_runtime: PhantomData,
};
Expand Down
155 changes: 148 additions & 7 deletions runtime-sdk/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
};

use anyhow::anyhow;
use slog::error;
use slog::{error, warn};
use thiserror::Error;

use oasis_core_runtime::{
Expand All @@ -32,15 +32,18 @@ use crate::{
error::{Error as _, RuntimeError},
event::IntoTags,
keymanager::{KeyManagerClient, KeyManagerError},
module::{self, BlockHandler, MethodHandler, TransactionHandler},
module::{self, BlockHandler, IncomingMessageHandler, MethodHandler, TransactionHandler},
modules,
modules::core::API as _,
runtime::Runtime,
schedule_control::ScheduleControlHost,
storage,
storage::Prefix,
types,
types::transaction::{AuthProof, Transaction, TransactionWeight},
types::{
in_msg::IncomingMessageData,
transaction::{AuthProof, Transaction, TransactionWeight},
},
};

/// Unique module name.
Expand Down Expand Up @@ -162,6 +165,16 @@ impl<R: Runtime> Dispatcher<R> {
}
}

/// Decode a roothash incoming message's data field.
pub fn decode_in_msg(
in_msg: &roothash::IncomingMessage,
) -> Result<types::in_msg::IncomingMessageData, modules::core::Error> {
let data: types::in_msg::IncomingMessageData = cbor::from_slice(&in_msg.data)
.map_err(|e| modules::core::Error::MalformedIncomingMessageData(in_msg.id, e.into()))?;
data.validate_basic()?;
Ok(data)
}

/// Run the dispatch steps inside a transaction context. This includes the before call hooks,
/// the call itself and after call hooks. The after call hooks are called regardless if the call
/// succeeds or not.
Expand Down Expand Up @@ -321,6 +334,58 @@ impl<R: Runtime> Dispatcher<R> {
}
}

/// Execute the given roothash incoming message. This includes executing the embedded
/// transaction if there is one.
pub fn execute_in_msg<C: BatchContext>(
ctx: &mut C,
in_msg: &roothash::IncomingMessage,
data: &IncomingMessageData,
tx: &Option<Transaction>,
) -> Result<(), RuntimeError> {
warn!(ctx.get_logger("dispatcher"), "incoming message executing"; "id" => in_msg.id); // %%%
R::Modules::execute_in_msg(ctx, in_msg, data, tx)?;
warn!(ctx.get_logger("dispatcher"), "incoming message modules done"; "id" => in_msg.id); // %%%
if let Some(tx) = tx {
let tx_size = match data
.ut
.as_ref()
.unwrap_or_else(|| panic!("incoming message {} has tx but no ut", in_msg.id))
.len()
.try_into()
{
Ok(tx_size) => tx_size,
Err(err) => {
warn!(ctx.get_logger("dispatcher"), "incoming message transaction too large"; "id" => in_msg.id, "err" => ?err);
return Ok(());
}
};
// Use the ID as index.
let index = in_msg.id.try_into().unwrap();
// todo: put result tags in block
let result = Self::execute_tx(ctx, tx_size, tx.clone(), index)?;
let result_parsed = cbor::from_slice::<crate::types::transaction::CallResult>(&result.output).unwrap();
warn!(ctx.get_logger("dispatcher"), "incoming message transaction done"; "id" => in_msg.id, "result_parsed" => ?result_parsed); // %%%
} else {
warn!(ctx.get_logger("dispatcher"), "incoming message no transaction"; "id" => in_msg.id); // %%%
}
Ok(())
}

/// Prefetch prefixes for the given roothash incoming message. This includes prefetching the
/// prefixes for the embedded transaction if there is one.
pub fn prefetch_in_msg(
prefixes: &mut BTreeSet<Prefix>,
in_msg: &roothash::IncomingMessage,
data: &IncomingMessageData,
tx: &Option<Transaction>,
) -> Result<(), RuntimeError> {
R::Modules::prefetch_in_msg(prefixes, in_msg, data, tx)?;
if let Some(tx) = tx {
Self::prefetch_tx(prefixes, tx.clone())?;
}
Ok(())
}

fn handle_last_round_messages<C: Context>(ctx: &mut C) -> Result<(), modules::core::Error> {
let message_events = ctx.runtime_round_results().messages.clone();

Expand Down Expand Up @@ -455,6 +520,8 @@ impl<R: Runtime> Dispatcher<R> {
// Query block weight limits for next round.
let block_weight_limits = R::Modules::get_block_weight_limits(&mut ctx);

let in_msgs_count = ctx.get_in_msgs_processed();

// Commit the context and retrieve the emitted messages.
let (block_tags, messages) = ctx.commit();
let (messages, handlers) = messages.into_iter().unzip();
Expand All @@ -468,7 +535,7 @@ impl<R: Runtime> Dispatcher<R> {
block_tags: block_tags.into_tags(),
batch_weight_limits: Some(block_weight_limits),
tx_reject_hashes: vec![],
in_msgs_count: 0, // TODO: Support processing incoming messages.
in_msgs_count,
})
}
}
Expand All @@ -478,16 +545,30 @@ impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatche
&self,
rt_ctx: transaction::Context<'_>,
batch: &TxnBatch,
_in_msgs: &[roothash::IncomingMessage],
in_msgs: &[roothash::IncomingMessage],
) -> Result<ExecuteBatchResult, RuntimeError> {
self.execute_batch_common(
rt_ctx,
|ctx| -> Result<Vec<ExecuteTxResult>, RuntimeError> {
// If prefetch limit is set enable prefetch.
let prefetch_enabled = R::PREFETCH_LIMIT > 0;

let mut txs = Vec::with_capacity(batch.len());
let mut prefixes: BTreeSet<Prefix> = BTreeSet::new();
let mut in_msgs_parsed = Vec::with_capacity(in_msgs.len());
for in_msg in in_msgs {
let data = Self::decode_in_msg(in_msg).unwrap_or_else(|err| {
warn!(ctx.get_logger("dispatcher"), "incoming message data malformed"; "id" => in_msg.id, "err" => ?err);
IncomingMessageData::noop()
});
let tx = data.ut.as_ref().and_then(|ut| Self::decode_tx(ctx, ut).map_err(|err| {
warn!(ctx.get_logger("dispatcher"), "incoming message transaction malformed"; "id" => in_msg.id, "err" => ?err);
}).ok());
if prefetch_enabled {
Self::prefetch_in_msg(&mut prefixes, in_msg, &data, &tx)?;
}
in_msgs_parsed.push((in_msg, data, tx));
}
let mut txs = Vec::with_capacity(batch.len());
for tx in batch.iter() {
let tx_size = tx.len().try_into().map_err(|_| {
Error::MalformedTransactionInBatch(anyhow!("transaction too large"))
Expand All @@ -510,6 +591,12 @@ impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatche
.prefetch_prefixes(prefixes.into_iter().collect(), R::PREFETCH_LIMIT);
}

// Execute incoming messages.
for (in_msg, data, tx) in in_msgs_parsed {
Self::execute_in_msg(ctx, in_msg, &data, &tx)?;
}
ctx.set_in_msgs_processed(in_msgs.len());

// Execute the batch.
let mut results = Vec::with_capacity(batch.len());
for (index, (tx_size, tx)) in txs.into_iter().enumerate() {
Expand All @@ -525,14 +612,68 @@ impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatche
&self,
rt_ctx: transaction::Context<'_>,
batch: &mut TxnBatch,
_in_msgs: &[roothash::IncomingMessage],
in_msgs: &[roothash::IncomingMessage],
) -> Result<ExecuteBatchResult, RuntimeError> {
let cfg = R::SCHEDULE_CONTROL.unwrap(); // Must succeed otherwise we wouldn't be here.
let mut tx_reject_hashes = Vec::new();

let mut result = self.execute_batch_common(
rt_ctx,
|ctx| -> Result<Vec<ExecuteTxResult>, RuntimeError> {
// Execute incoming messages.
let in_msgs_gas_limit = R::Core::remaining_in_msgs_gas(ctx);
let mut in_msgs_processed = 0usize;
for in_msg in in_msgs {
let data = Self::decode_in_msg(in_msg).unwrap_or_else(|err| {
warn!(ctx.get_logger("dispatcher"), "incoming message data malformed"; "id" => in_msg.id, "err" => ?err);
IncomingMessageData::noop()
});
let tx = match data.ut.as_ref() {
Some(ut) => {
match Self::decode_tx(ctx, ut) {
Ok(tx) => {
let remaining_gas = R::Core::remaining_in_msgs_gas(ctx);
if remaining_gas < cfg.min_remaining_gas {
// This next message has a transaction, but we won't have
// enough gas to execute it, so leave it for the next
// round and stop.
break;
} else if tx.auth_info.fee.gas > in_msgs_gas_limit {
// The transaction is too large to execute under our
// current parameters, so skip over it.
warn!(ctx.get_logger("dispatcher"), "incoming message transaction fee gas exceeds round gas limit";
"id" => in_msg.id,
"tx_gas" => tx.auth_info.fee.gas,
"in_msgs_gas_limit" => in_msgs_gas_limit,
);
// Actually don't skip the message entirely, just don't
// execute the transaction.
None
} else if tx.auth_info.fee.gas > remaining_gas {
// The transaction is too large to execute in this round,
// so leave it for the next round and stop.
break;
} else {
Some(tx)
}
}
Err(err) => {
warn!(ctx.get_logger("dispatcher"), "incoming message transaction malformed";
"id" => in_msg.id,
"err" => ?err,
);
None
}
}
}
None => None,
};

Self::execute_in_msg(ctx, in_msg, &data, &tx)?;
in_msgs_processed += 1;
}
ctx.set_in_msgs_processed(in_msgs_processed);

// Schedule and execute the batch.
//
// The idea is to keep scheduling transactions as long as we have some space
Expand Down
Loading