Skip to content

Conversation

@welf
Copy link
Contributor

@welf welf commented Mar 25, 2025

JSON-RPC Server Implementation Plan

Closes #3353

Stage 0: Preparation

  • Fix dead code warnings in existing tests.

Stage 1: Core Infrastructure & HTTP Server

This stage focuses on setting up the core JSON-RPC components, configuration, error handling, shared state, basic infrastructure (DB adapter, metrics, manual rate limiting), and implementing the HTTP server with initial RPC method handling.

1. Project Setup & Configuration

1.1. Project Setup

  • Create base directory structure and initial files/subdirectories for rusk::jsonrpc.

1.2. Core Configuration (jsonrpc::config)

  • Define core configuration structs (JsonRpcConfig, HttpServerConfig, WebSocketServerConfig, RateLimitConfig, CorsConfig, MethodRateLimit, RateLimit, FeatureToggles, SanitizationConfig) with necessary fields and default values.
  • Implement JsonRpcConfigBuilder with fluent methods.
  • Implement Default, Serialize, Deserialize traits for config structs.
  • Implement custom serde helpers for SocketAddr and Duration.
  • Define ConfigError enum using thiserror.

1.3. Configuration Loading (jsonrpc::config)

  • Implement JsonRpcConfig::load to handle defaults, TOML file parsing (from env var or argument), environment variable overrides, and validation.
  • Implement apply_env_overrides for environment variable parsing and application.
  • Implement helpers for locating the default config file (project_root, default_config_path).
  • Implement to_toml_string and to_file for config export.
  • Document configuration options externally (docs/jsonrpc_configuration.md).
  • Implement load_from_file_only for testing.
  • Add comprehensive tests for loading logic, precedence, overrides, formats, and errors.

1.4. Configuration Enhancements & Validation

  • Add structured tracing logs throughout configuration handling.
  • Implement JsonRpcConfig::validate with sanity checks, TLS file validation, and security checks (public binding, CORS, limits, sanitization).
  • Implement security helper methods (is_binding_publicly, etc.).
  • Implement JsonRpcConfig::test_config() helper.
  • Implement JsonRpcConfig::build_cors_layer() helper for tower_http integration.
  • Add comprehensive tests for validation and security checks.

2. Error Handling Adaptation (jsonrpc::error)

  • Define central jsonrpc::error::Error enum using thiserror with appropriate variants and From implementations for underlying infrastructure, service, config, and serialization errors.
  • Implement Error::into_error_object for conversion to jsonrpsee::types::ErrorObjectOwned, mapping internal errors to JSON-RPC codes.
  • Implement sanitize_message function with logic for term redaction, path sanitization, control character filtering, and truncation based on SanitizationConfig.
  • Add comprehensive tests for error conversion, sanitization logic, and Display implementations.

3. Core Infrastructure (jsonrpc::infrastructure)

3.0 Data Models (jsonrpc::model)

This phase focuses on defining the data structures used for JSON-RPC request parameters and response payloads, ensuring they align with the API specifications and have proper serialization/deserialization and conversions from/to internal node data types.

  • Model Files:
    • archive.rs:
      • ArchivedEvent
      • ContractEvent
      • MoonlightGroup
      • Order
    • block.rs:
      • BlockHeader
      • Block
      • BlockStatus
      • CandidateBlock
      • BlockLabel
      • ChainTip
      • BlockFaults
      • Fault
      • FaultType
      • FaultItem
    • chain.rs:
      • ChainStats
    • consensus.rs:
      • ValidationResult
      • QuorumType
      • VoteType
    • gas.rs:
      • GasPriceStats
    • key.rs:
      • AccountPublicKey
    • mempool.rs:
      • MempoolInfo
    • network.rs:
      • PeersMetrics
    • provisioner.rs:
      • ProvisionerInfo
      • StakeOwnerInfo
      • StakeInfo (alias for ProvisionerInfo)
    • transaction.rs:
      • BaseTransaction
      • TransactionType
      • TransactionStatusType
      • TransactionStatus
      • EstimateTransactionFeeResponse
      • MoonlightTransactionData
      • PhoenixTransactionData
      • TransactionDataType
      • TransactionResponse
      • MempoolTransaction
      • TransactionInfo
      • SimulationResult
      • SpendingIdentifier
      • SpendingIdType
    • serde_helper.rs: Provides serialization helpers.
    • contract.rs: Placeholder file.
    • prover.rs: Placeholder file.
    • subscription.rs: Placeholder file (Actual types in infrastructure/subscription/types.rs).
  • Documentation:
    • Ensure all implemented models and the parent model module have comprehensive documentation adhering to project standards (module, struct, field level comments, examples, doc tests with fully qualified paths).

3.1. Service & Infrastructure Errors (jsonrpc::service::error, jsonrpc::infrastructure::error)

  • Define base service::error::Error and infrastructure::error::Error enums using thiserror.
  • Define specific error enums (DbError, StateError, RateLimitError) and implement From for wrapping within infrastructure::error::Error.
  • Ensure jsonrpc::error::Error wraps these via From implementations.

3.2. Application State (jsonrpc::infrastructure::state)

3.2.1. Define AppState Struct (jsonrpc::infrastructure::state)
  • Defined AppState holding shared config and adapters (Arc<dyn Trait> for adapters, Arc<RwLock<...>> for SubscriptionManager).
3.2.2. Implement AppState Functionality
  • Implemented AppState::new constructor.
  • Implemented public delegating methods for adapter functionality.
  • Implemented public accessors for config, metrics, etc, but not for adapters.
  • Derived Clone, ensured Send + Sync.
3.2.3. Implement Tests for Application State (tests/jsonrpc/state.rs)
  • Added tests verifying creation, cloning, Send+Sync, accessors, adapter delegation, and Debug implementation using mock adapters and helpers.

3.3. Metrics Collection (jsonrpc::infrastructure::metrics)

  • Define core metric constants using the metrics crate (JSONRPC_REQUESTS_TOTAL, _DURATION_SECONDS, _ACTIVE_CONNECTIONS, _ACTIVE_SUBSCRIPTIONS).
  • Implement init_metrics_recorder using metrics-exporter-prometheus and once_cell for idempotent initialization.
  • Implement register_jsonrpc_metrics to describe and initialize metrics.
  • Implement basic tests for initialization and metric usage.

3.4. Manual Rate Limiting (jsonrpc::infrastructure::manual_limiter)

3.4.1. Structs and Types
  • Defined ManualRateLimiters struct, BaseLimiter type alias, and nested DashMap type aliases for storing per-client/per-pattern limiters.
3.4.2. Methods
  • Implemented ManualRateLimiters::new constructor with config validation, pattern compilation, and initialization.
  • Implemented check_method_limit for enforcing limits based on method patterns, including lazy creation of limiters.
  • Implemented rate_limit_to_quota helper for config validation.
3.4.3. Documentation
  • Added module and struct/method documentation with usage examples.
  • Integrated Arc<ManualRateLimiters> into AppState.
  • Added ManualMethodLimitExceeded variant to RateLimitError.
3.4.4. Tests
  • Implemented tests for constructor logic (success, disabled, invalid config).
  • Implemented tests for check_method_limit (enforcement, pattern matching, reset, disabled state, per-client/pattern isolation).

3.5. Database Adapter (jsonrpc::infrastructure::db)

  • Define DatabaseAdapter Trait:
    • Required Primitives:
      • get_block_by_hash
      • get_block_hash_by_height
      • get_block_header_by_hash
      • get_block_label_by_height
      • get_spent_transaction_by_hash
      • ledger_tx_exists
      • mempool_tx
      • mempool_tx_exists
      • mempool_txs_sorted_by_fee
      • mempool_txs_count
      • mempool_txs_ids_sorted_by_fee
      • mempool_txs_ids_sorted_by_low_fee
      • candidate
      • candidate_by_iteration
      • validation_result
      • metadata_op_read
      • metadata_op_write
      • get_block_status
      • validate_nullifiers
    • Default Methods:
      • get_block_height
      • get_block_transactions_by_hash
      • get_candidate_block_by_hash
      • get_block_faults_by_hash
      • get_latest_candidate_block
      • get_validation_result
      • get_latest_validation_result
      • get_transaction_status
      • get_mempool_transactions
      • get_mempool_transaction_by_hash
      • get_mempool_info
      • get_chain_stats
      • get_gas_price
      • suggest_transaction_fee
      • get_block_by_height
      • get_latest_block
      • get_blocks_range
      • get_blocks_by_hashes
      • get_latest_block_header
      • get_block_header_by_height
      • get_block_headers_range
      • get_block_headers_by_hashes
      • get_block_timestamp_by_hash
      • get_block_timestamp_by_height
      • get_block_transactions_by_height
      • get_block_faults_by_height
      • get_latest_block_label
      • get_transaction_by_hash
      • get_transactions_batch
      • get_mempool_transactions_count
      • get_latest_blocks
      • get_blocks_count
      • get_block_pair
      • get_block_transaction_range_by_hash
      • get_last_block_transactions_by_height
      • get_block_transaction_range_by_height
  • Define RuskDbAdapter Implementation: Defined struct holding Arc<RwLock<Backend>>.
  • Implement DatabaseAdapter Trait for RuskDbAdapter: Implement all required primitive methods using spawn_blocking and node::database components. Relies on default methods for higher-level API.

3.6 Archive Adapter (jsonrpc::infrastructure::archive)

  • Define ArchiveAdapter Trait:
    • Required Primitives:
      • get_moonlight_txs_by_memo
      • get_last_archived_block (Returns height and hash)
      • get_block_events_by_hash
      • get_block_events_by_height
      • get_latest_block_events
      • get_contract_finalized_events
      • get_next_block_with_phoenix_transaction
      • get_moonlight_transaction_history (Handles AccountPublicKey parsing from bs58 string)
    • Default Methods:
      • get_last_archived_block_height (Derived from get_last_archived_block)
      • get_contract_events (Alias for get_contract_finalized_events)
      • get_contract_events_by_topic (Filters get_contract_finalized_events)
      • get_contract_events_by_block_height (Filters get_block_events_by_height)
      • get_contract_events_by_block_hash (Filters get_block_events_by_hash)
      • get_contract_transactions (Alias for get_contract_events)
      • get_contract_transactions_by_block_height (Alias for get_contract_events_by_block_height)
      • get_contract_transactions_by_block_hash (Alias for get_contract_events_by_block_hash)
      • get_item_added_events (Filters get_contract_events_by_topic)
      • get_item_removed_events (Filters get_contract_events_by_topic)
      • get_item_modified_events (Filters get_contract_events_by_topic)
      • sync_contract (Method likely not needed on adapter, potentially AppState level)
      • get_stake_events (Filters get_contract_events_by_topic)
      • get_unstake_events (Filters get_contract_events_by_topic)
      • get_slash_events (Filters get_contract_events_by_topic)
      • get_hard_slash_events (Filters get_contract_events_by_topic)
      • get_provisioner_changes (Filters get_contract_events_by_topic)
      • get_transfer_events (Filters get_contract_events_by_topic)
      • get_deposit_events (Filters get_contract_events_by_topic)
      • get_withdraw_events (Filters get_contract_events_by_topic)
      • get_convert_events (Filters get_contract_events_by_topic)
  • Define RuskArchiveAdapter Implementation: Defined struct holding Arc<node::archive::Archive>.
  • Implement ArchiveAdapter Trait for RuskArchiveAdapter:
    • Implement all required primitive methods using spawn_blocking (if needed) and node::archive::Archive methods.
    • Ensure correct data type conversions (e.g., using From/TryFrom).
    • Handle AccountPublicKey parsing for get_moonlight_transaction_history.
    • Implement necessary error mapping to infrastructure::error::Error.

3.7 Network Adapter (jsonrpc::infrastructure::network)

  • Define NetworkAdapter Trait
    • Required Primitives:
      • broadcast_transaction
      • get_network_info
      • get_public_address
      • get_alive_peers
      • get_alive_peers_count
      • flood_request
      • get_network_peers_location (Note: Method not yet defined in trait)
    • Default Methods:
      • get_peers_metrics
  • Define RuskNetworkAdapter Implementation: Defined struct holding Arc<RwLock<N: Network>>.
  • Implement NetworkAdapter Trait for RuskNetworkAdapter:
    • broadcast_transaction
    • get_network_info
    • get_public_address
    • get_alive_peers
    • get_alive_peers_count
    • flood_request
    • get_network_peers_location

3.8 VM Adapter (jsonrpc::infrastructure::vm)

  • Define VmAdapter Trait:
    • Required Primitives:
      • simulate_transaction
      • preverify_transaction
      • get_chain_id
      • get_account_data
      • get_state_root
      • get_block_gas_limit
      • get_provisioners
      • get_stake_info_by_pk
      • query_contract_raw
      • get_vm_config
    • Default Methods:
      • get_account_balance
      • get_account_nonce
      • get_stake_data_by_pk
      • get_all_stake_data
      • get_provisioner_info_by_pk
  • Define RuskVmAdapter Implementation: Defined struct holding Arc<RwLock<NodeRusk>>.
  • Implement VmAdapter Trait for RuskVmAdapter:
    • simulate_transaction
    • preverify_transaction
    • get_chain_id
    • get_account_data
    • get_state_root
    • get_block_gas_limit
    • get_provisioners
    • get_stake_info_by_pk
    • query_contract_raw
    • get_vm_config

3.9 AppState Refactoring

  • Expose Adapter Methods: Expose individual adapter methods directly on AppState (e.g., app_state.get_block_by_height(...) instead of app_state.db_adapter().get_block_by_height(...)) for a more user-friendly API.
  • Implement Methods That Combine Calls to Multiple Adapters: Implement methods on AppState that combine calls to multiple underlying adapters:
    • deployContract (Uses VmAdapter, NetworkAdapter)
    • getTransactionReceiptByHash (Uses DatabaseAdapter, ArchiveAdapter)
    • getNetworkStatus (Uses AppState::get_alive_peers_count, AppState::get_block_height, and AppState::get_chain_id)
    • getNodeInfo (Uses AppState::get_public_address, AppState::get_bootstrapping_nodes, AppState::get_vm_config and AppState::get_chain_id)
    • estimateTransactionFee (Uses VmAdapter, DatabaseAdapter)
    • getTransactionsByAddress (Uses DatabaseAdapter, ArchiveAdapter)

4. HTTP Server Setup & Execution (jsonrpc::server)

4.1. HTTP Server Initialization

  • Implement run_server function using axum and axum-server.
  • Instantiate necessary adapters (RuskDbAdapter, etc.) and AppState during node startup (rusk::Builder::build_and_run), passing AppState to run_server.
  • Configure HTTPS listener based on JsonRpcConfig.http.
  • Load TLS certificates/keys using rustls and rustls-pemfile (handled by load_tls_config helper).
  • Set up basic /health route.

4.2. Integrate tower-governor Layer for HTTP

  • Integrate Tower Middleware Layers into axum Router using .layer().
  • Integrate tower-governor: Create GovernorLayer from JsonRpcConfig.rate_limit.default_limit and add to router.
  • Integrate CORS: Use JsonRpcConfig::build_cors_layer() helper and add CorsLayer to router.
  • Ensure router uses into_make_service_with_connect_info::<SocketAddr>. // Added check

4.3. Implement Rate Limit Error Handling for Layer

  • Note: Specific middleware for catching tower_governor::GovernorError was not added; the layer handles sending 429 responses directly.
  • Set up graceful shutdown using axum_server::Handle and tokio::signal::ctrl_c.
  • Update Rusk binary to initialize AppState and call run_server.

4.4. Define RPC Method Traits (Server-side)

  • Defined initial RuskInfoRpcServer trait using #[rpc(server)].
  • Added module documentation and example method signature.

4.5. Implement RPC Method Dispatching for HTTP

  • Implemented RuskInfoRpcImpl holding Arc<AppState>.
  • Merged implementation into RpcModule using merge().
  • Implemented manual dispatch via rpc_handler using RpcModule::raw_json_request.
  • Integrated handler into axum router using .route("/rpc", post(rpc_handler)).with_state(rpc_module).
  • Updated documentation for run_server and rpc_methods.

5. Implement HTTP Services

Implement the actual logic for the HTTP-based RPC methods defined in Stage 1.4.4.

5.1. Implement Block Service (jsonrpc::service::block)

  • Block Query Methods:
    • getBlockByHash (Uses AppState::get_block_by_hash, potentially AppState::get_block_transactions_by_hash)
    • getBlockByHeight (Uses AppState::get_block_by_height, potentially AppState::get_block_transactions_by_hash)
    • getLatestBlock (Uses AppState::get_latest_block, potentially AppState::get_block_transactions_by_hash)
  • Block Range/Count Methods:
    • getBlockRange (Uses AppState::get_blocks_range, potentially transaction methods)
    • getLatestBlocks (Uses AppState::get_latest_blocks, potentially transaction methods)
    • getBlocksCount (Uses AppState::get_blocks_count)
    • getBlockPair (Uses AppState::get_block_pair, potentially transaction methods)
  • Block Status Methods:
    • getBlockStatus (Uses AppState::get_block_label_by_height or similar)
  • Block Event Methods (Archive):
    • getBlockEventsByHash (Uses AppState::get_block_events_by_hash)
    • getBlockEventsByHeight (Uses AppState::get_block_events_by_height)
    • getLatestBlockEvents (Uses AppState::get_latest_block_events)
  • Block Transaction Methods:
    • getBlockTransactionsByHash (Uses AppState::get_block_transactions_by_hash)
    • getBlockTransactionRangeByHash (Uses AppState::get_block_transaction_range_by_hash)
    • getLastBlockTransactionsByHash (Uses AppState::get_last_block_transactions_by_hash - adapter method TBD, currently block_txs only)
    • getBlockTransactionsByHeight (Uses AppState::get_block_transactions_by_height)
    • getBlockTransactionRangeByHeight (Uses AppState::get_block_transaction_range_by_height)
    • getLastBlockTransactionsByHeight (Uses AppState::get_last_block_transactions_by_height)
  • Specialized Methods:
    • getNextBlockWithPhoenixTransaction (Uses AppState::get_next_block_with_phoenix_transaction)
  • Gas Price Methods:
    • getGasPrice (Uses AppState::get_gas_price)

5.2. Implement Transaction Service (jsonrpc::service::transaction)

  • Transaction Query Methods:
    • getTransactionByHash (Uses AppState::get_transaction_by_hash)
    • getTransactionStatus (Uses AppState::get_transaction_status)
    • getTransactionsByAddress (Uses AppState::get_transactions_by_address - adapter/AppState method TBD)
    • getTransactionsBatch (Uses AppState::get_transactions_batch)
  • Transaction Receipt Methods:
    • getTransactionReceiptByHash (Uses AppState::get_transaction_receipt_by_hash - adapter/AppState method TBD)
  • Memo-based Query Methods (Archive):
    • findTransactionsByMemo (Uses AppState::get_moonlight_txs_by_memo)
    • getMoonlightTransactionsByMemo (Uses AppState::get_moonlight_txs_by_memo)
  • Mempool Query Methods:
    • getMempoolTransactions (Uses AppState::get_mempool_transactions)
    • getMempoolTransactionByHash (Uses AppState::get_mempool_transaction_by_hash)
    • getMempoolInfo (Uses AppState::get_mempool_info)
    • getMempoolTransactionsCount (Uses AppState::get_mempool_transactions_count)
  • Transaction Processing Methods:
    • simulateTransaction (Uses AppState::simulate_transaction)
    • sendTransaction (Uses AppState::broadcast_transaction, potentially AppState::preverify_transaction first)
    • estimateTransactionFee (Uses AppState::estimate_transaction_fee - adapter/AppState method TBD)
    • decodeRawTransaction (Pure deserialization, no adapter needed)
  • Gas Price Methods:
    • suggestTransactionFee (Uses AppState::suggest_transaction_fee)

5.3. Implement Contract Service (jsonrpc::service::contract)

  • Contract Execution Methods:
    • callContract (Uses AppState::query_contract_raw - intended for read-only, might need dedicated write method on AppState/VMAdapter)
    • queryContract (Uses AppState::query_contract_raw)
    • deployContract (Uses AppState::deploy_contract - adapter/AppState method TBD)
  • Transfer Contract Methods:
    • getAccountBalance (Uses AppState::get_account_balance)
    • getAccountNonce (Uses AppState::get_account_nonce)
    • validateNullifiers (Uses AppState::validate_nullifiers - adapter method TBD)
    • getMoonlightTransactionHistory (Uses AppState::get_moonlight_transaction_history)
    • getMoonlightTransactionsByMemo (Uses AppState::get_moonlight_txs_by_memo - already covered in Transaction Service)
    • getTransferEvents (Uses AppState::get_transfer_events)
    • getDepositEvents (Uses AppState::get_deposit_events)
    • getWithdrawEvents (Uses AppState::get_withdraw_events)
    • getConvertEvents (Uses AppState::get_convert_events)
  • Stake Contract Methods:
    • getProvisioners (Uses AppState::get_provisioners)
    • getProvisionerInfo (Uses AppState::get_provisioner_info_by_pk)
    • getStakeEvents (Uses AppState::get_stake_events)
    • getUnstakeEvents (Uses AppState::get_unstake_events)
    • getSlashEvents (Uses AppState::get_slash_events)
    • getHardSlashEvents (Uses AppState::get_hard_slash_events)
    • getProvisionerChanges (Uses AppState::get_provisioner_changes)
  • Generic Contract Event/Transaction Methods (Archive):
    • getContractEvents (Uses AppState::get_contract_events)
    • getContractEventsByTopic (Uses AppState::get_contract_events_by_topic)
    • getContractEventsByBlockHeight (Uses AppState::get_contract_events_by_block_height)
    • getContractEventsByBlockHash (Uses AppState::get_contract_events_by_block_hash)
    • getContractFinalizedEvents (Uses AppState::get_contract_finalized_events)
    • getContractTransactions (Uses AppState::get_contract_transactions)
    • getContractTransactionsByBlockHeight (Uses AppState::get_contract_transactions_by_block_height)
    • getContractTransactionsByBlockHash (Uses AppState::get_contract_transactions_by_block_hash)
  • Item Event Methods (Archive):
    • getItemAddedEvents (Uses AppState::get_item_added_events)
    • getItemRemovedEvents (Uses AppState::get_item_removed_events)
    • getItemModifiedEvents (Uses AppState::get_item_modified_events)
  • Other Contract Methods:
    • syncContract (Uses AppState::sync_contract - adapter method TBD)
    • getNextBlockWithPhoenixTransaction (Uses AppState::get_next_block_with_phoenix_transaction)

5.4. Implement Network Service (jsonrpc::service::network)

  • Node Information Methods:
    • getNodeInfo (Uses AppState::get_vm_config, AppState::get_chain_id, AppState::get_public_address - needs combined AppState method)
    • getNodeVersion (Reads constants, no adapter needed)
    • getChainId (Uses AppState::get_chain_id)
  • Network Status Methods:
    • getNetworkStatus (Uses AppState::get_alive_peers_count, AppState::get_block_height, AppState::get_chain_id - needs combined AppState method)
  • Network Peer Methods:
    • getNetworkPeers (Uses AppState::get_alive_peers)
    • getNetworkPeersLocation (Uses AppState::get_network_peers_location - adapter method TBD)

5.5. Implement Prover Service (jsonrpc::service::prover)

  • Proof Generation Methods:
    • prove (Uses external prover interaction - adapter TBD)
  • Proof Verification Methods:
    • verifyProof (Uses external prover interaction - adapter TBD)

6. Testing Infrastructure & Stage 1 Tests (tests/jsonrpc)

6.1. Create Testing Infrastructure

  • Set up test utilities (tests/jsonrpc/utils.rs) including mock adapters (MockDbAdapter, etc.) and AppState creation helpers (create_test_app_state, create_custom_app_state).
  • Added helper for getting ephemeral ports (get_ephemeral_port).
  • Used reqwest for sending HTTP requests in integration tests.

6.2. Implement Core Infrastructure Tests

  • Tested config loading, validation, errors, and overrides.
  • Tested error mapping and sanitization.
  • Tested basic server startup (/health), CORS, tower-governor rate limiting, and basic RPC call (rusk_getNodeInfo) via HTTP integration tests.

6.3. Implement RuskDbAdapter Integration Tests

  • Added tests/jsonrpc/db_adapter.rs (feature-gated chain).
  • Added utility for setting up temporary RocksDB instance.
  • Added tests verifying get_block_by_height success and NotFound cases.

6.4. Implement HTTP Service Integration Tests

  • Added integration test for rusk_getNodeInfo using reqwest (test_rpc_method_via_server).

7. Additional Tasks / Refinements for Stage 1

  • Logging: Add structured logging (e.g., tracing::error!) in RuskDbAdapter error handling paths.
  • Linter Errors: Manually investigate and resolve persistent linter errors, particularly in rusk/src/lib/jsonrpc/infrastructure/db.rs if DbError::InternalError was replaced.



Stage 2: WebSocket and Subscription Implementation

This stage focuses on adding WebSocket support, implementing the chosen subscription management strategy, defining and implementing subscription-related RPC methods, and adding WebSocket-specific testing.

1. Subscription Management Infrastructure (jsonrpc::infrastructure::subscription)

This section details the setup required for handling subscriptions, including types, errors, filters, and the core manager logic (presented with alternatives).

1.1. Core Subscription Types (types.rs, error.rs)

1.1.1. Define subscription error types
  • Create error.rs with SubscriptionError enum with variants:
    • InvalidTopic - When topic is not supported/valid
    • InvalidSubscription - When subscription ID doesn't exist/is invalid
    • InvalidSubscriptionIdFormat - When subscription ID is not a valid UUID
    • InvalidFilter - When filter configuration is invalid
    • SessionNotFound - When session ID doesn't exist
    • PublishFailed - When event delivery fails
    • ChannelClosed - When event channel is closed
    • TopicClosed - When a topic has been closed
    • TooManySubscriptions - When subscription limits are exceeded
  • Document each error variant and its purpose with examples and doc tests
  • Add comprehensive documentation to the module with examples and doc tests
1.1.2. Create Topic enum
  • Define in types.rs with variants matching JSON-RPC methods from JSON‐RPC Websocket API subscription methods:
    • BlockAcceptance - For subscribeBlockAcceptance
    • BlockFinalization - For subscribeBlockFinalization
    • ChainReorganization - For subscribeChainReorganization
    • ContractEvents - For subscribeContractEvents
    • ContractTransferEvents - For subscribeContractTransferEvents
    • MempoolAcceptance - For subscribeMempoolAcceptance
    • MempoolEvents - For subscribeMempoolEvents
  • Implement Serialize/Deserialize, Debug, Clone, FromStr, Display
  • Document each topic type and its purpose with examples and doc tests
  • Add comprehensive documentation to the module with examples and doc tests
  • Add tests for the Topic enum, serialize/deserialize, debug, Display, FromStr, and Topic::as_str method
1.1.3. Create SubscriptionId type
  • Create UUID-based newtype in types.rs.
  • Implement generation, validation, and formatting methods
  • Add conversion from/to strings for JSON-RPC compatibility
  • Add tests for SubscriptionId generation, Display, FromStr, Serde
1.1.4. Create SessionId type
  • Define newtype wrapper for client session IDs in types.rs
  • Add validation methods
  • Document relationship with WebSocket connections
  • Implement necessary traits (Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Display, FromStr)
  • Add tests for creation, validation, Display, FromStr, Serde, Debug, Hash
1.1.5. Create subscription parameters types
  • BlockSubscriptionParams with include_txs: Option<bool>
  • ContractSubscriptionParams with fields:
    • contract_id: String
    • event_names: Option<Vec<String>>
    • include_metadata: Option<bool>
    • min_amount: Option<String> (for transfer events)
  • MempoolSubscriptionParams with fields:
    • contract_id: Option<String>
    • include_details: Option<bool>
  • Add comprehensive documentation to all types and their fields with examples and doc tests
  • Add state-builder pattern to ContractSubscriptionParams and the regular builder pattern to the rest of the types
  • Add tests for the builder pattern, serialize/deserialize, Debug, and all methods
1.1.6. Document types module
  • Document all types, traits, and their fields and methods with examples and doc tests
  • Add comprehensive documentation to the module with examples and doc tests

1.2. Filters Implementation (filters/)

1.2.1. Design Filter trait
  • Create base Filter trait in filter.rs
  • Define matches(&self, event: &dyn Any) -> bool method
  • Add trait bounds: Send + Sync + 'static
  • Document extension patterns with examples and doc tests
1.2.2. Implement BlockFilter
  • Create BlockFilter struct in block_filter.rs
  • Add field for include_txs: bool
  • Implement Filter trait for block events
  • Add builder pattern for construction
  • Document the filter implementation and all its methods and fields with examples and doc tests
  • Document the builder pattern and all its methods and fields with examples and doc tests
  • Add tests for the filter implementation and all its methods and fields
1.2.3. Implement ContractFilter
  • Create ContractFilter struct in contract_filter.rs
  • Add fields:
    • contract_id: String
    • event_names: Option<Vec<String>>
    • include_metadata: bool
  • Implement Filter trait for contract events
  • Add builder pattern for construction
  • Document the filter implementation and all its methods and fields with examples and doc tests
  • Document the builder pattern and all its methods and fields with examples and doc tests
  • Add tests for the filter implementation and all its methods and fields
1.2.4. Implement TransferFilter
  • Extend ContractFilter for transfer events in transfer_filter.rs
  • Add field for min_amount: Option<u64>
  • Implement specialized filtering for transfer events
  • Add builder pattern
  • Document the filter implementation and all its methods and fields with examples and doc tests
  • Document the builder pattern and all its methods and fields with examples and doc tests
  • Add tests for the filter implementation and all its methods and fields
1.2.5. Implement MempoolFilter
  • Create MempoolFilter struct in mempool_filter.rs
  • Add fields:
    • contract_id: Option<String>
    • include_details: bool
  • Implement Filter trait for mempool events
  • Add builder pattern
  • Document the filter implementation and all its methods and fields with examples and doc tests
  • Document the builder pattern and all its methods and fields with examples and doc tests
  • Add tests for the filter implementation and all its methods and fields
1.2.6. Document filters module and all its submodules
  • Document all types, traits, and their fields and methods with examples and doc tests
  • Add comprehensive module-level documentation to the filters module and all its submodules with examples and doc tests

1.3. Subscription Manager Implementation (manager.rs) - Alternative Variants

Choose one of the following variants to implement.

1.3.1. Variant A: Dual Server / Parse Legacy RuesEvent

Assumption: The new JSON-RPC server will run simultaneously with the legacy HTTP server for a transition period. Event producers (Rusk VM, ChainEventStreamer) will continue to emit events in the existing RuesEvent format, and the legacy server requires this format. The new JSON-RPC server must adapt to this existing format.

1.3.1.1. Relocate RuesEvent Definition (Prerequisite)
  • Task: Identify and copy the definitions of RuesEvent, RuesEventUri, DataType, and any other necessary dependent types from the legacy rusk::lib::http::event module.
  • Task: Place these copied definitions into a stable, shared location accessible by both event producers and the new jsonrpc module (e.g., node-data crate or a new rusk-events-common crate). Let's refer to the copied type as RelocatedRuesEvent.
  • Task: Crucially, update the use statements in all event producer modules (Rusk, ChainEventStreamer, etc.) to import RelocatedRuesEvent from its new location. This ensures producers compile and function correctly before the http module is removed.
  • Task: Ensure the broadcast channel created in RuskNodeBuilder uses the relocated type: broadcast::channel::<RelocatedRuesEvent>().
1.3.1.2. Create SubscriptionManager skeleton (Variant A)
  • Task: Define the basic SubscriptionManager struct in rusk::lib::jsonrpc::infrastructure::subscription::manager.rs.
  • Task: Include the dual registry fields using appropriate collections (e.g., DashMap or RwLock<HashMap> depending on concurrency needs):
    • topic_subscriptions: Map from internal Topic to a collection of SubscriptionId -> SubscriptionSink.
    • session_subscriptions: Map from SessionId to a collection of associated (Topic, SubscriptionId) tuples.
  • Task: Add filter storage: Map from SubscriptionId to Box<dyn Filter>.
  • Task: Add subscription statistics tracking: Map from SubscriptionId to SubscriptionStats.
  • Task: Implement the SubscriptionManager::new constructor:
    • Accepts broadcast::Receiver<RelocatedRuesEvent> (the receiver for the channel carrying the copied/relocated event type).
    • Spawns the background event processing task (detailed in 1.3.1.5).
1.3.1.3. Implement thread safety (Variant A)
  • Task: Wrap all shared internal collections (topic_subscriptions, session_subscriptions, filters, subscription_stats) in appropriate concurrent primitives (e.g., Arc<DashMap<...>> or Arc<RwLock<HashMap<...>>>) to allow safe access from multiple threads (RPC handler threads adding/removing subscriptions, background task reading subscriptions).
  • Task: Analyze and document the locking strategy to prevent deadlocks, especially between adding/removing subscriptions and the background task iterating over them.
  • Task: Ensure SubscriptionManager itself derives Clone and is Send + Sync.
  • Task: Document the thread-safety guarantees and expected concurrent usage patterns.
1.3.1.4. Define Internal SystemEvent Structures
  • Task: Define a structured SystemEvent enum within the rusk::lib::jsonrpc::infrastructure::subscription module (or a submodule).
  • Task: Define variants corresponding to the logical event types relevant to subscriptions (e.g., BlockFinalized, ContractEventEmitted, MempoolTxAccepted). Each variant should hold the corresponding structured Rust data (e.g., BlockHeader, ContractTxEvent, Transaction). Import these underlying data structures from appropriate crates (e.g., node-data).
  • Task: Define a try_parse_relocated_rues_event(event: RelocatedRuesEvent) -> Result<(Topic, SystemEvent), ParseError> function (or similar). This function is the core adaptation layer:
    • It inspects event.uri (based on RUES specification: /on/[target]/[topic]) to determine the event type and map it to the internal Topic enum.
    • It deserializes the event.data (likely DataType::Binary) into the appropriate structured Rust type (e.g., BlockHeader, ContractTxEvent) based on the URI.
    • It constructs and returns the corresponding SystemEvent variant containing the deserialized structured data.
  • Task: Define a suitable ParseError enum to handle errors during URI parsing or data deserialization within try_parse_relocated_rues_event.
1.3.1.5. Implement background task for event processing (Variant A)
  • Task: Create a dedicated Tokio task spawned within the SubscriptionManager::new constructor.
  • Task: Core Event Loop:
    • The task continuously loops, receiving RelocatedRuesEvent messages from the broadcast receiver passed during construction.
    • Handle potential broadcast::error::RecvError::Lagged errors gracefully (log a warning and continue).
  • Task: Event Processing Steps: Upon receiving a RelocatedRuesEvent:
    1. Parse & Deserialize: Call try_parse_relocated_rues_event to convert the raw event into Result<(Topic, SystemEvent), ParseError>. Log ParseError and skip the event if conversion fails.
    2. Identify Subscribers: On Ok((topic, system_event)), acquire necessary locks and look up all (SubscriptionId, SubscriptionSink) pairs associated with the topic in the topic_subscriptions registry.
    3. Apply Filters: For each potential subscriber (subscription_id, sink):
      • Look up the associated Filter in the filters registry using subscription_id.
      • If a filter exists, call its matches(&system_event_data) method, passing a reference to the structured data contained within the system_event variant (e.g., &block_header). Use &dyn Any for the filter to downcast.
      • Proceed to the next step only if there is no filter OR the filter's matches method returns true.
    4. Format JSON Payload: Using the structured data within the system_event, construct the JSON-RPC notification payload expected by the client for this subscription type. This involves serializing the structured data into the appropriate JSON format according to the API specification.
    5. Send to Sink: Create the jsonrpsee::SubscriptionMessage containing the formatted JSON payload and attempt to send it asynchronously using SubscriptionSink::send().
    6. Handle Sink Errors: Monitor the result of sink.send(). If an error occurs (e.g., Disconnected), log the error, potentially increment failure counters in SubscriptionStats, and implement logic for potential subscription cleanup (see Task 1.6).
    7. Update Statistics: Increment relevant counters in SubscriptionStats for the subscription_id (e.g., events_processed, events_dropped_buffer on sink errors).
  • Task: Implement graceful shutdown handling for the background task.
  • Task: Add detailed tracing logs for the task lifecycle, event reception, parsing success/failure, filtering decisions, sink errors, and shutdown.
1.3.1.6. Document manager module (Variant A)
  • Task: Write comprehensive documentation for the SubscriptionManager struct, explaining its role, the dual-registry mechanism, and its interaction with the background task.
  • Task: Document the SystemEvent enum, the try_parse_relocated_rues_event function, and the ParseError type.
  • Task: Clearly document the thread-safety model and locking strategy.
  • Task: Provide usage examples relevant to the jsonrpsee context.
1.3.2. Variant B: Hard Cutover / Use NodeBroadcastEvent

Assumption: The legacy HTTP server will be completely removed when the new JSON-RPC server is deployed. Event producers can be modified to emit a new, structured event type directly, eliminating the need for the legacy RuesEvent format entirely.

1.3.2.1. Define NodeBroadcastEvent & Refactor Producers (Prerequisite)
  • Task: Define a new NodeBroadcastEvent enum in a stable, shared location (e.g., node-data or rusk-events-common). This enum should have variants corresponding to logical event types, holding the relevant structured Rust data directly (e.g., BlockHeader, ContractTxEvent). Ensure it derives Clone and Debug.
  • Task: Refactor Event Producers: Modify all event producer modules (Rusk, ChainEventStreamer, etc.):
    • Remove all dependencies on the legacy RuesEvent (from http::event or the relocated copy).
    • Implement simple impl From<RawNodeStructure> for NodeBroadcastEvent for relevant raw types (e.g., BlockHeader, ContractTxEvent). These implementations should directly wrap the data into the enum variant without any serialization. Remember to handle cloning if the raw structure is needed elsewhere.
    • Update the event sending logic to create and send NodeBroadcastEvent instead of RuesEvent.
  • Task: Update the broadcast channel creation in RuskNodeBuilder to use the new type: broadcast::channel::<NodeBroadcastEvent>(). Ensure the sender/receiver are passed correctly.
1.3.2.2. Create SubscriptionManager skeleton (Variant B)
  • Task: Define the basic SubscriptionManager struct in rusk::lib::jsonrpc::infrastructure::subscription::manager.rs.
  • Task: Include the dual registry fields using appropriate collections (e.g., DashMap or RwLock<HashMap>):
    • topic_subscriptions: Map from internal Topic to a collection of SubscriptionId -> SubscriptionSink.
    • session_subscriptions: Map from SessionId to a collection of associated (Topic, SubscriptionId) tuples.
  • Task: Add filter storage: Map from SubscriptionId to Box<dyn Filter>.
  • Task: Add subscription statistics tracking: Map from SubscriptionId to SubscriptionStats.
  • Task: Implement the SubscriptionManager::new constructor:
    • Accepts broadcast::Receiver<NodeBroadcastEvent> (the receiver for the channel carrying the new structured event type).
    • Spawns the background event processing task (detailed in 1.3.2.5).
1.3.2.3. Implement thread safety (Variant B)
  • Task: Wrap all shared internal collections (topic_subscriptions, session_subscriptions, filters, subscription_stats) in appropriate concurrent primitives (e.g., Arc<DashMap<...>> or Arc<RwLock<HashMap<...>>>) for safe multi-threaded access.
  • Task: Analyze and document the locking strategy to prevent deadlocks.
  • Task: Ensure SubscriptionManager itself derives Clone and is Send + Sync.
  • Task: Document the thread-safety guarantees and expected concurrent usage patterns.
1.3.2.4. Map NodeBroadcastEvent to Internal Topic
  • Task: Implement a mechanism (e.g., a function get_topic_for_event(event: &NodeBroadcastEvent) -> Option<Topic>) within the subscription module to map the received NodeBroadcastEvent variant to the corresponding internal Topic enum used for routing subscriptions. This mapping should be straightforward based on the enum variant.
1.3.2.5. Implement background task for event processing (Variant B)
  • Task: Create a dedicated Tokio task spawned within the SubscriptionManager::new constructor.
  • Task: Core Event Loop:
    • The task continuously loops, receiving NodeBroadcastEvent messages from the broadcast receiver passed during construction.
    • Handle potential broadcast::error::RecvError::Lagged errors gracefully.
  • Task: Event Processing Steps: Upon receiving a NodeBroadcastEvent:
    1. Determine Topic: Call get_topic_for_event (from Task 1.3.2.4) to get the internal Topic. If no topic maps (shouldn't happen with exhaustive matching), log an error and skip.
    2. Identify Subscribers: Acquire necessary locks and look up all (SubscriptionId, SubscriptionSink) pairs associated with the Topic in the topic_subscriptions registry.
    3. Apply Filters: For each potential subscriber (subscription_id, sink):
      • Look up the associated Filter in the filters registry using subscription_id.
      • If a filter exists, call its matches(&node_broadcast_event_data) method, passing a reference to the structured data held directly within the received NodeBroadcastEvent variant (e.g., &block_header if the event is NodeBroadcastEvent::BlockFinalized(block_header)). Use &dyn Any for the filter to downcast.
      • Proceed to the next step only if there is no filter OR the filter's matches method returns true.
    4. Format JSON Payload: Using the structured data within the NodeBroadcastEvent, construct the JSON-RPC notification payload expected by the client for this subscription type. This involves serializing the structured data into the appropriate JSON format according to the API specification.
    5. Send to Sink: Create the jsonrpsee::SubscriptionMessage containing the formatted JSON payload and attempt to send it asynchronously using SubscriptionSink::send().
    6. Handle Sink Errors: Monitor the result of sink.send(). If an error occurs (e.g., Disconnected), log the error, potentially increment failure counters in SubscriptionStats, and implement logic for potential subscription cleanup (see Task 1.6).
    7. Update Statistics: Increment relevant counters in SubscriptionStats for the subscription_id.
  • Task: Implement graceful shutdown handling for the background task.
  • Task: Add detailed tracing logs for the task lifecycle, event reception, topic mapping, filtering decisions, sink errors, and shutdown.
1.3.2.6. Document manager module (Variant B)
  • Task: Write comprehensive documentation for the SubscriptionManager struct, explaining its role and the dual-registry mechanism.
  • Task: Document the expected NodeBroadcastEvent type (likely linking to its definition in the shared crate).
  • Task: Clearly document the thread-safety model and locking strategy.
  • Task: Provide usage examples relevant to the jsonrpsee context.
1.3.3. Comparison of Variants
Feature Variant A (Dual Server / Parse RuesEvent) Variant B (Hard Cutover / NodeBroadcastEvent)
Producer Changes Minimal: Only requires updating use statements for relocated RuesEvent. Significant: Requires removing RuesEvent logic, adding NodeBroadcastEvent logic.
jsonrpc Server Logic More Complex: Needs RelocatedRuesEvent definition, SystemEvent definition, and try_parse... function (deserialization). Simpler: Receives structured NodeBroadcastEvent directly, no parsing/deserialization needed for the event itself.
Efficiency Performs one deserialization per event in jsonrpc. Producers still perform serialization for legacy RuesEvent. Optimal: No event serialization/deserialization between producer and jsonrpc. Data flows as structs.
Transition Complexity Lower risk during transition, producers largely untouched. Allows simultaneous server operation. Higher risk during transition due to core producer refactoring. Does not easily support simultaneous server operation.
Final Architecture Leaves adaptation layer (try_parse...) in jsonrpc. Requires later refactoring to remove RelocatedRuesEvent fully. Cleaner: Results directly in the desired state with structured events and no legacy format.
Code Duplication/Debt Introduces RelocatedRuesEvent copy. Parsing logic depends on legacy format knowledge. Eliminates legacy format dependency entirely.
Scope Alignment Better fits a scope focused only on replacing the server interface while preserving producer behavior initially. Better fits a scope where removing the legacy server includes refactoring event production for efficiency.

Recommendation:

  • If simultaneous operation is required or minimizing disruption to producers during the initial server replacement is paramount: Choose Variant A. It isolates changes mostly to the new jsonrpc module and defers the larger producer refactoring. Accept the technical debt of copying/parsing RuesEvent temporarily.
  • If a hard cutover is acceptable and refactoring producers as part of the server replacement is feasible: Choose Variant B. It's more work upfront involving core components but leads directly to the cleaner, more efficient final architecture without intermediate steps or temporary code duplication/parsing logic.

Variant B is architecturally superior in the long run, but Variant A provides a more phased approach if required by project constraints or risk tolerance during the transition.

1.4. Subscription Lifecycle Methods

Implement add_subscription method

  • Add parameters: session_id: SessionId, topic: Topic, sink: SubscriptionSink, filter: Option<Box<dyn Filter>>
  • Generate subscription ID
  • Integrate ManualRateLimiters for Subscription Creation:
    • Define Creation Pattern: Define a pattern like "subscription:create" or "subscription:*" in the RateLimitConfig.
    • Check Limit: At the beginning of the add_subscription method, retrieve ClientInfo (likely needs to be passed in or obtained from context) and call AppState::manual_rate_limiters().check_method_limit(&client_info, "subscription:create").
    • Handle Limit Error: If the check fails, return Err(SubscriptionError::RateLimitExceeded).
  • Register in topic_subscriptions
  • Register in session_subscriptions
  • Store filter if provided
  • Initialize subscription statistics
  • Return subscription ID
  • Document with examples and doc tests

Implement remove_subscription method

  • Accept subscription ID
  • Remove from topic registry
  • Remove from session registry
  • Remove associated filter
  • Remove statistics
  • Return appropriate error if not found
  • Document with examples and doc tests

Implement remove_session_subscriptions method

  • Accept session ID
  • Find all subscriptions for session
  • Remove each from both registries
  • Clean up filters and statistics
  • Handle errors properly
  • Document with examples and doc tests

Implement synchronous publish method (Optional)
Note: Primarily for internal use or specific low-volume cases. Background task handles main event flow.

  • Accept topic and event
  • Find all subscriptions for topic
  • Apply filters to determine relevant subscribers
  • Integrate ManualRateLimiters for Event Delivery (Sync):
    • Retrieve ClientInfo associated with the SubscriptionSink.
    • Before sending, call check_method_limit(&client_info, "<topic_pattern>").
    • If rate limited, skip sending and increment events_dropped_rate_limit.
  • Send to each subscription sink (Format JSON payload first)
  • Update statistics (events processed/dropped)
  • Handle failures and track failed deliveries
  • Document blocking nature and use cases with examples and doc tests

Implement asynchronous publish_async method (Optional)
Note: Only if direct async publishing outside the main event loop is needed.

  • Accept topic and event
  • Send to channel consumed by background task (if Variant A/B structure uses an internal channel) or handle directly if needed.
  • Handle backpressure
  • Return quickly
  • Document non-blocking behavior and use cases with examples and doc tests

1.5. Subscription Status and Management

Implement SubscriptionStats struct

  • Add fields to track:
    • events_processed: u64
    • events_dropped_buffer: u64 (Events dropped due to client buffer full/slow)
    • events_dropped_rate_limit: u64 (Events dropped due to manual rate limiter)
    • last_event_time: Option<u64>
    • creation_time: u64
  • Implement update methods for these statistics.

Implement get_subscription_status method (within Subscription Service)
Note: Method implementation is in the service layer, but relies on stats from infrastructure.

  • Accept subscription ID
  • Retrieve the SubscriptionStats for that ID from the SubscriptionManager.
  • Format the output according to the getSubscriptionStatus WebSocket method specification (including active, events_processed, events_dropped (sum of dropped counts), last_event_time, and potentially calculating throttled based on recent events_dropped_rate_limit).

Integrate ManualRateLimiters for Event Delivery (Background Task)

  • Determine Rate Limit Patterns: Define specific method patterns in the RateLimitConfig for different subscription topics (e.g., "subscription:BlockAcceptance", "subscription:ContractEvents:*").
  • Store ClientInfo: Ensure ClientInfo (or relevant parts like IP) is stored alongside the SubscriptionSink when a subscription is added (likely needs modification to add_subscription and internal structures).
  • Check Before Sending: In the background task (Variant A or B), before sending an event via SubscriptionSink::send():
    • Retrieve the stored ClientInfo for the sink.
    • Determine the correct topic pattern string (e.g., "subscription:BlockAcceptance").
    • Call AppState::manual_rate_limiters().check_method_limit(&client_info, "<topic_pattern>").
  • Handle Rate Limit Errors: If check_method_limit returns Err(RateLimitError::ManualMethodLimitExceeded), do not send the event to that specific sink.
  • Update Statistics: Increment the events_dropped_rate_limit counter in SubscriptionStats for the specific subscription when an event is dropped due to the rate limit check.

1.6. Error Handling and Cleanup

1.6.1. Enhance background task with error handling
  • Add retry logic for temporary sink failures (optional, consider complexity)
  • Track failed sends (events_dropped_buffer in SubscriptionStats)
  • Remove subscriptions after repeated/fatal failures (e.g., Disconnected sink error)
  • Add metrics for failed sends (events_dropped_buffer, events_dropped_rate_limit)
1.6.2. Add subscription cleanup logic
  • Detect and remove stale subscriptions (e.g., associated with disconnected sessions). Logic for this might live in remove_session_subscriptions.
  • Implement periodic cleanup task (optional, if needed beyond session disconnects)
  • Add timeout configuration (WebSocketServerConfig.idle_timeout) - jsonrpsee likely handles basic idle disconnects. Ensure manager cleans up associated subscriptions.
  • Document cleanup strategy

1.7. AppState Integration

Remove placeholder from state.rs

  • Delete the placeholder SubscriptionManager struct if one was used in Stage 1.
  • Update imports to use the actual SubscriptionManager implementation.

Update AppState to use new implementation

  • Modify the type in AppState's subscription_manager field to Arc<RwLock<ActualSubscriptionManager>>.
  • Update the AppState::new constructor to initialize the actual SubscriptionManager (passing the event receiver).
  • Ensure all methods interacting with subscription_manager still work.
  • Fix the tests and doc tests
  • Add comprehensive documentation to the AppState fields related to subscriptions.

1.8. Documentation for Subscription Infrastructure

1.8.1. Add module-level documentation (infrastructure/subscription/mod.rs)
  • Explain overall subscription architecture (chosen variant: A or B).
  • Document dual-registry design rationale.
  • Explain thread-safety model.
  • Provide usage examples matching the WebSocket methods document.
1.8.2. Document synchronous vs asynchronous publishing (if applicable)
  • Explain differences in detail.
  • Provide examples for when to use each.
  • Discuss performance implications.
  • Cover error handling differences.
1.8.3. Add thread-safety documentation
  • Explain concurrent access patterns.
  • Document lock ordering if applicable.
  • Explain safe shutdown procedures.
  • Address potential deadlocks.

1.9. Testing for Subscription Infrastructure

These tests focus on the manager/infrastructure itself, before the RPC layer.

1.9.1. Create unit tests for SubscriptionManager (infrastructure/subscription/manager.rs tests)
  • Test subscription registration/removal (add_subscription, remove_subscription).
  • Test session cleanup (remove_session_subscriptions).
  • Test filter application logic (using mock events and filters).
  • Test statistics tracking updates (SubscriptionStats).
  • Test error handling scenarios (e.g., adding duplicate subscriptions, removing non-existent ones).

2. WebSocket Server Setup & Execution (jsonrpc::server)

This section focuses on enabling and configuring the WebSocket part of the jsonrpsee server.

2.1. WebSocket Server Initialization

  • Enable WebSocket Listener in jsonrpsee
    • Modify the JsonRpcServer setup (from Stage 1.4.1) to configure both HTTP and WebSocket listeners using ServerBuilder::build and ServerBuilder::ws_tokio::bind (or similar combined setup).
    • Use settings from JsonRpcConfig.websocket (bind address, port, max connections, max message size etc.).
    • Ensure TLS is configured for WebSocket if specified.

2.2. Manual WebSocket Rate Limiting Integration

  • Add WebSocket Connection Rate Limiting
    • Initialize the websocket_limiters map within ManualRateLimiters::new based on config.websocket_limit.
    • Use ServerBuilder::set_rpc_connect_validator (or similar mechanism in jsonrpsee) to hook into the WebSocket connection establishment phase.
    • Inside the validator closure:
      • Extract ClientInfo (IP address) from the incoming connection details.
      • Call AppState::manual_rate_limiters().check_websocket_limit(&client_info).
      • If it returns Err(RateLimitError::ManualWebSocketLimitExceeded), reject the connection (return false or appropriate error from the validator).
      • If Ok(()), allow the connection (return true).
    • Implement mapping from RateLimitError::ManualWebSocketLimitExceeded to a suitable log message or potentially an error response if the framework allows.

2.3. Define RPC Method Traits (Subscription-side)

  • Define Subscription RPC Method Traits (jsonrpc::service::subscription)
    • Define subscription traits (e.g., SubscriptionRpcServer) using #[rpc(server, subscription)] attribute.
    • Define subscription method signatures within these traits (e.g., subscribeBlockAcceptance, unsubscribe).
      • Subscription methods should accept &self, parameters, SubscriptionSink, and optionally context (subscription_context).
      • Unsubscribe methods typically accept &self, SubscriptionId, and optionally context.
      • Other management methods like getSubscriptionStatus use #[method(name = "...")].

2.4. Implement RPC Method Dispatching for WebSocket

  • Implement WebSocket RPC Method Dispatching
    • Create a concrete struct (e.g., SubscriptionRpcImpl) implementing the subscription RPC trait(s). This struct will hold the AppState.
    • Merge the subscription RPC module with the existing HTTP RPC module created in Stage 1 using RpcModule::merge().
    • Register the final merged RpcModule (containing both HTTP and WS methods) with the jsonrpsee::Server during startup.

3. Implement Subscription Service (jsonrpc::service::subscription)

Implement the logic for the subscription-related RPC methods.

3.1. Implement Block Event Subscriptions

  • subscribeBlockAcceptance: Call AppState::subscription_manager().add_subscription(...) with Topic::BlockAcceptance and appropriate BlockFilter.
  • subscribeBlockFinalization: Call add_subscription with Topic::BlockFinalization and BlockFilter.
  • subscribeChainReorganization: Call add_subscription with Topic::ChainReorganization.

3.2. Implement Contract Event Subscriptions

  • subscribeContractEvents: Call add_subscription with Topic::ContractEvents and ContractFilter.
  • subscribeContractTransferEvents: Call add_subscription with Topic::ContractTransferEvents and TransferFilter.

3.3. Implement Mempool Event Subscriptions

  • subscribeMempoolAcceptance: Call add_subscription with Topic::MempoolAcceptance and MempoolFilter.
  • subscribeMempoolEvents: Call add_subscription with Topic::MempoolEvents and MempoolFilter.

3.4. Implement Subscription Management Methods

  • unsubscribe: Call AppState::subscription_manager().remove_subscription(...) using the provided SubscriptionId.
  • getSubscriptionStatus: Retrieve stats using AppState::subscription_manager().get_subscription_status(...) (needs adding to manager) and format the response.

4. Stage 2 Testing (tests/jsonrpc)

4.1. Implement Subscription Infrastructure Unit Tests

Covered in Task 1.9.

4.2. Implement Subscription Service Integration Tests

  • Use jsonrpsee::ws_client::WsClient for testing.
  • Test WebSocket connection establishment and manual rate limiting (check_websocket_limit).
  • Test Each Subscription Type:
    • Connect client, call subscribe* method.
    • Verify successful subscription response (returns ID).
    • Simulate event production (triggering the manager's background task via mock receiver or direct call).
    • Test with different filters, verify client only receives matching events.
    • Test manual rate limiting on event delivery (check_method_limit for topic patterns).
  • Test Management Methods:
    • Test getSubscriptionStatus returns correct stats after events.
    • Test unsubscribe stops notifications for that client/ID.
    • Test session disconnect automatically removes associated subscriptions (verify via getSubscriptionStatus or internal state check).
    • Test manual rate limiting on subscription creation (check_method_limit for "subscription:create").

4.3. Test Background Task Behavior

  • Test event processing loop handles different event types correctly (Variant A/B specific).
  • Test cleanup of failed subscriptions (e.g., simulate sink errors).
  • Test shutdown behavior.
  • Test with simulated event source errors (e.g., RecvError::Lagged).
  • Test throttling behavior (manual rate limits, sink backpressure if applicable).

Integration Testing Schedule

  • After Stage 1: Test all HTTP RPC methods (Block, Tx, Contract, Network, Prover) and core infrastructure (config, errors, metrics, HTTP rate limits).
  • After Stage 2: Test WebSocket connectivity, all subscription methods (subscribe*, unsubscribe, getStatus), event delivery with filters, WebSocket rate limits, and subscription cleanup.

@welf welf force-pushed the json-rpc-server branch from 6d21a5a to 251628c Compare May 18, 2025 23:08
@welf
Copy link
Contributor Author

welf commented May 19, 2025

@HDauven All JSON-RPC block methods are implemented and their documentation in Wiki is updated. The same documentation is located in the rusk/docs/JSON-RPC-block-methods.md file.

Implementation Plan in the PR description is also up to date.

In a couple of days I'll implement JSON-RPC network methods.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Epic A large feature encompassing multiple sub-issues module:rusk Issues related to rusk module type:feature implementing a new feature

Projects

None yet

2 participants