Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/chain/store/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl<DB: Blockstore> ChainIndex<DB> {
}
if to > from.epoch() {
return Err(Error::Other(format!(
"Looking for tipset {to} with height greater than start point {from}",
"looking for tipset with height greater than start point, req: {to}, head: {from}",
from = from.epoch()
)));
}
Expand Down
32 changes: 9 additions & 23 deletions src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ use std::fmt::{self, Debug};
use std::sync::LazyLock;
use std::time::Duration;

use anyhow::{Context as _, bail};
use enumflags2::BitFlags;
use anyhow::bail;
use futures::future::Either;
use http::{HeaderMap, HeaderValue, header};
use jsonrpsee::core::ClientError;
Expand Down Expand Up @@ -93,15 +92,17 @@ impl Client {
&self,
req: Request<T>,
) -> Result<T, ClientError> {
let max_api_path = req
.api_path()
.map_err(|e| ClientError::Custom(e.to_string()))?;
let Request {
method_name,
params,
api_paths,
timeout,
..
} = req;
let method_name = method_name.as_ref();
let client = self.get_or_init_client(api_paths).await?;
let client = self.get_or_init_client(max_api_path).await?;
let span = tracing::debug_span!("request", method = %method_name, url = %client.url);
let work = async {
// jsonrpsee's clients have a global `timeout`, but not a per-request timeout, which
Expand Down Expand Up @@ -149,31 +150,16 @@ impl Client {
};
work.instrument(span.or_current()).await
}
async fn get_or_init_client(
&self,
version: BitFlags<ApiPaths>,
) -> Result<&UrlClient, ClientError> {
let path = version
.iter()
.max()
.context("No supported versions")
.map_err(|e| ClientError::Custom(e.to_string()))?;
async fn get_or_init_client(&self, path: ApiPaths) -> Result<&UrlClient, ClientError> {
match path {
ApiPaths::V0 => &self.v0,
ApiPaths::V1 => &self.v1,
ApiPaths::V2 => &self.v2,
}
.get_or_try_init(|| async {
let url = self
.base_url
.join(match path {
ApiPaths::V0 => "rpc/v0",
ApiPaths::V1 => "rpc/v1",
ApiPaths::V2 => "rpc/v2",
})
.map_err(|it| {
ClientError::Custom(format!("creating url for endpoint failed: {it}"))
})?;
let url = self.base_url.join(path.path()).map_err(|it| {
ClientError::Custom(format!("creating url for endpoint failed: {it}"))
})?;
UrlClient::new(url, self.token.clone()).await
})
.await
Expand Down
162 changes: 152 additions & 10 deletions src/rpc/methods/chain.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2019-2025 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

mod types;
pub mod types;
use enumflags2::{BitFlags, make_bitflags};
use types::*;

Expand Down Expand Up @@ -54,6 +54,21 @@ use tokio_util::sync::CancellationToken;

const HEAD_CHANNEL_CAPACITY: usize = 10;

// SafeHeightDistance is the distance from the latest tipset, i.e. heaviest, that
// is considered to be safe from re-orgs at an increasingly diminishing
// probability.
//
// This is used to determine the safe tipset when using the "safe" tag in
// TipSetSelector or via Eth JSON-RPC APIs. Note that "safe" doesn't guarantee
// finality, but rather a high probability of not being reverted. For guaranteed
// finality, use the "finalized" tag.
//
// This constant is experimental and may change in the future.
// Discussion on this current value and a tracking item to document the
// probabilistic impact of various values is in
// https://github.com/filecoin-project/go-f3/issues/944
const SAFE_HEIGHT_DISTANCE: ChainEpoch = 200;

static CHAIN_EXPORT_LOCK: LazyLock<Mutex<Option<CancellationToken>>> =
LazyLock::new(|| Mutex::new(None));

Expand Down Expand Up @@ -978,28 +993,155 @@ impl RpcMethod<1> for ChainGetTipSet {

async fn handle(
ctx: Ctx<impl Blockstore>,
(ApiTipsetKey(tipset_key),): Self::Params,
(ApiTipsetKey(tsk),): Self::Params,
) -> Result<Self::Ok, ServerError> {
let ts = ctx
.chain_store()
.load_required_tipset_or_heaviest(&tipset_key)?;
Ok(ts)
if let Some(tsk) = &tsk {
let ts = ctx.chain_index().load_required_tipset(tsk)?;
Ok(ts)
} else {
// Error message here matches lotus
Err(anyhow::anyhow!("NewTipSet called with zero length array of blocks").into())
}
}
}

pub enum ChainGetTipSetV2 {}

impl ChainGetTipSetV2 {
pub async fn get_tipset_by_anchor(
ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
anchor: &Option<TipsetAnchor>,
) -> anyhow::Result<Option<Arc<Tipset>>> {
if let Some(anchor) = anchor {
match (&anchor.key.0, &anchor.tag) {
// Anchor is zero-valued. Fall back to heaviest tipset.
(None, None) => Ok(Some(ctx.state_manager.heaviest_tipset())),
// Get tipset at the specified key.
(Some(tsk), None) => Ok(Some(ctx.chain_index().load_required_tipset(tsk)?)),
(None, Some(tag)) => Self::get_tipset_by_tag(ctx, *tag).await,
_ => {
anyhow::bail!("invalid anchor")
}
}
} else {
// No anchor specified. Fall back to finalized tipset.
Self::get_tipset_by_tag(ctx, TipsetTag::Finalized).await
}
}

pub async fn get_tipset_by_tag(
ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
tag: TipsetTag,
) -> anyhow::Result<Option<Arc<Tipset>>> {
match tag {
TipsetTag::Latest => Ok(Some(ctx.state_manager.heaviest_tipset())),
TipsetTag::Finalized => Self::get_latest_finalized_tipset(ctx).await,
TipsetTag::Safe => Some(Self::get_latest_safe_tipset(ctx).await).transpose(),
}
}

pub async fn get_latest_safe_tipset(
ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
) -> anyhow::Result<Arc<Tipset>> {
let finalized = Self::get_latest_finalized_tipset(ctx).await?;
let head = ctx.chain_store().heaviest_tipset();
let safe_height = (head.epoch() - SAFE_HEIGHT_DISTANCE).max(0);
if let Some(finalized) = finalized
&& finalized.epoch() >= safe_height
{
Ok(finalized)
} else {
Ok(ctx.chain_index().tipset_by_height(
safe_height,
head,
ResolveNullTipset::TakeOlder,
)?)
}
}

pub async fn get_latest_finalized_tipset(
ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
) -> anyhow::Result<Option<Arc<Tipset>>> {
let Ok(f3_finalized_cert) =
crate::rpc::f3::F3GetLatestCertificate::handle(ctx.clone(), ()).await
else {
return Self::get_ec_finalized_tipset(ctx);
};

let f3_finalized_head = f3_finalized_cert.chain_head();
let head = ctx.chain_store().heaviest_tipset();
// Latest F3 finalized tipset is older than EC finality, falling back to EC finality
if head.epoch() > f3_finalized_head.epoch + ctx.chain_config().policy.chain_finality {
return Self::get_ec_finalized_tipset(ctx);
}

let ts = ctx
.chain_index()
.load_required_tipset(&f3_finalized_head.key)
.map_err(|e| {
anyhow::anyhow!(
"Failed to load F3 finalized tipset at epoch {} with key {}: {e}",
f3_finalized_head.epoch,
f3_finalized_head.key,
)
})?;
Ok(Some(ts))
}

pub fn get_ec_finalized_tipset(
ctx: &Ctx<impl Blockstore>,
) -> anyhow::Result<Option<Arc<Tipset>>> {
let head = ctx.chain_store().heaviest_tipset();
let ec_finality_epoch = head.epoch() - ctx.chain_config().policy.chain_finality;
if ec_finality_epoch >= 0 {
let ts = ctx.chain_index().tipset_by_height(
ec_finality_epoch,
head,
ResolveNullTipset::TakeOlder,
)?;
Ok(Some(ts))
} else {
Ok(None)
}
}
}

impl RpcMethod<1> for ChainGetTipSetV2 {
const NAME: &'static str = "Filecoin.ChainGetTipSet";
const PARAM_NAMES: [&'static str; 1] = ["tipsetSelector"];
const API_PATHS: BitFlags<ApiPaths> = make_bitflags!(ApiPaths::{ V2 });
const PERMISSION: Permission = Permission::Read;
const DESCRIPTION: Option<&'static str> = Some("Returns the tipset with the specified CID.");

type Params = (ApiTipsetKey,);
type Ok = Arc<Tipset>;
type Params = (TipsetSelector,);
type Ok = Option<Arc<Tipset>>;

async fn handle(_: Ctx<impl Blockstore>, _: Self::Params) -> Result<Self::Ok, ServerError> {
Err(ServerError::unsupported_method())
async fn handle(
ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
(selector,): Self::Params,
) -> Result<Self::Ok, ServerError> {
selector.validate()?;
// Get tipset by key.
if let ApiTipsetKey(Some(tsk)) = &selector.key {
let ts = ctx.chain_index().load_required_tipset(tsk)?;
return Ok(Some(ts));
}
// Get tipset by height.
if let Some(height) = &selector.height {
let anchor = Self::get_tipset_by_anchor(&ctx, &height.anchor).await?;
let ts = ctx.chain_index().tipset_by_height(
height.at,
anchor.unwrap_or_else(|| ctx.chain_store().heaviest_tipset()),
height.resolve_null_tipset_policy(),
)?;
return Ok(Some(ts));
}
// Get tipset by tag, either latest or finalized.
if let Some(tag) = &selector.tag {
let ts = Self::get_tipset_by_tag(&ctx, *tag).await?;
return Ok(ts);
}
Err(anyhow::anyhow!("no tipset found for selector").into())
}
}

Expand Down
Loading
Loading