
Add SamplingContiguousDataSource for probabilistic source selection in retrieval pipeline #590

@djwhitt

Description

Problem

We need a way to gradually test alternative data retrieval strategies (e.g., offset-based + chunk sources) against production traffic. The goal is to:

  1. Route a configurable percentage of requests through an experimental source
  2. Collect metrics on success/failure rates and latency
  3. Allow the experimental source to participate in the normal fallback chain

This is not shadow traffic - the sampled source runs as part of the normal pipeline. If sampled and successful, it serves the request. If not sampled or if it fails, the pipeline continues to the next source.

Requirements

Must Haves

  • Configurable sampling rate (e.g., 10% of requests try the sampled source first)
  • Two sampling strategies:
    • Random: Math.random() < rate - uniform distribution, different result each request
    • Deterministic: Hash-based on ID - same ID always gets same sampling decision (reproducible for debugging, supports rates as low as 0.0001%)
  • Integrates into normal SequentialDataSource fallback chain
  • Metrics collection:
    • Requests sampled vs skipped
    • Success/error rates for sampled source
    • Latency for sampled source
  • Follow existing wrapper patterns (FilteredContiguousDataSource)

Should Haves

  • OpenTelemetry tracing integration with child spans

Nice to Haves

  • Multiple sampled sources with independent rates

Proposed Implementation

New Class: SamplingContiguousDataSource

Location: src/data/sampling-contiguous-data-source.ts

This wraps a single data source and probabilistically either:

  • Sampled: Tries the wrapped source, throws on failure (letting pipeline continue)
  • Not sampled: Throws immediately (letting pipeline continue to next source)

```typescript
import winston from 'winston';
import { Span } from '@opentelemetry/api';

// Import paths below are illustrative and would follow the repo's layout.
import * as metrics from '../metrics.js';
import {
  ContiguousData,
  ContiguousDataSource,
  Region,
  RequestAttributes,
} from '../types.js';

export class SamplingContiguousDataSource implements ContiguousDataSource {
  private log: winston.Logger;
  private dataSource: ContiguousDataSource;
  private samplingRate: number;
  private samplingStrategy: 'random' | 'deterministic';

  constructor({
    log,
    dataSource,
    samplingRate = 0.1,
    samplingStrategy = 'random',
  }: {
    log: winston.Logger;
    dataSource: ContiguousDataSource;
    samplingRate?: number;
    samplingStrategy?: 'random' | 'deterministic';
  }) {
    this.log = log.child({ class: this.constructor.name });
    this.dataSource = dataSource;
    this.samplingRate = samplingRate;
    this.samplingStrategy = samplingStrategy;
  }

  async getData({
    id,
    requestAttributes,
    region,
    parentSpan,
    signal,
  }: {
    id: string;
    requestAttributes?: RequestAttributes;
    region?: Region;
    parentSpan?: Span;
    signal?: AbortSignal;
  }): Promise<ContiguousData> {
    const sampled = this.shouldSample(id);

    if (!sampled) {
      // Not sampled - skip this source, let the pipeline continue
      metrics.samplingSkipped.inc();
      throw new Error('Request not sampled');
    }

    // Sampled - try the wrapped source
    metrics.samplingSampled.inc();
    const startTime = Date.now();

    try {
      const data = await this.dataSource.getData({
        id,
        requestAttributes,
        region,
        parentSpan,
        signal,
      });

      const latencyMs = Date.now() - startTime;
      metrics.samplingSuccess.inc();
      metrics.samplingLatency.observe(latencyMs / 1000); // histogram is in seconds

      this.log.debug('Sampled source succeeded', { id, latencyMs });
      return data;
    } catch (error) {
      metrics.samplingErrors.inc();
      this.log.debug('Sampled source failed', {
        id,
        error: (error as Error).message,
      });
      throw error; // Let the pipeline continue to the next source
    }
  }

  private shouldSample(id: string): boolean {
    if (this.samplingStrategy === 'deterministic') {
      // Hash-based: the same ID always samples (or not), so decisions are
      // reproducible. The 1,000,000 modulus supports rates as low as
      // 0.0001% (1 in 1,000,000).
      const hash = this.simpleHash(id);
      return hash % 1_000_000 < this.samplingRate * 1_000_000;
    }
    return Math.random() < this.samplingRate;
  }

  private simpleHash(str: string): number {
    // djb2 (xor variant) - simple and fast with good distribution
    let hash = 5381;
    for (let i = 0; i < str.length; i++) {
      hash = ((hash << 5) + hash) ^ str.charCodeAt(i);
      hash |= 0; // Convert to a 32-bit integer
    }
    return Math.abs(hash);
  }
}
```

Integration in system.ts

The sampled source is prepended to the existing pipeline. The existing ON_DEMAND_RETRIEVAL_ORDER stays unchanged.

```typescript
const onDemandDataSources: ContiguousDataSource[] = [];

// 1. Add the sampled source at the FRONT (if any sampled sources are configured)
if (config.SAMPLED_SOURCES.length > 0) {
  const experimentalSource = new OffsetChunkDataSource({ ... }); // or whatever we're testing
  const sampledSource = new SamplingContiguousDataSource({
    log,
    dataSource: experimentalSource,
    samplingRate: config.SAMPLING_RATE,
    samplingStrategy: config.SAMPLING_STRATEGY,
  });
  onDemandDataSources.push(sampledSource);
}

// 2. Add all existing sources (unchanged)
for (const sourceName of config.ON_DEMAND_RETRIEVAL_ORDER) {
  const dataSource = getDataSource(sourceName);
  if (dataSource) {
    onDemandDataSources.push(dataSource);
  }
}

// SequentialDataSource tries each source in order, moving to the next on failure
const sequentialSource = new SequentialDataSource({
  log,
  dataSources: onDemandDataSources,
});
```

Key point: The existing pipeline is untouched. We're just adding a new source at the front that either:

  • Gets sampled → tries the experimental source → returns data or fails through
  • Not sampled → immediately fails through to the existing pipeline

Data Flow

```
Request arrives
    ↓
SequentialDataSource tries sources in order:
    ↓
[SamplingContiguousDataSource wrapping ExperimentalSource]  ← NEW (prepended)
    ├─ Not sampled (90%) → throws → existing pipeline runs
    └─ Sampled (10%) → tries ExperimentalSource
        ├─ Success → returns data (existing pipeline never runs)
        └─ Failure → throws → existing pipeline runs
    ↓
[Existing pipeline unchanged: S3 → Gateways → TxChunks → ...]
```

Important: When not sampled or on failure, the entire existing pipeline runs exactly as before. No sources are removed or modified.
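The "throws → pipeline continues" behavior above relies on SequentialDataSource catching each source's failure and advancing to the next one. A minimal sketch of that loop (the shape is assumed here; the real class may differ in detail):

```typescript
// Minimal sketch of sequential fallback. The real SequentialDataSource
// may differ; this only illustrates why a throw advances the chain.
interface DataSource<T> {
  getData(id: string): Promise<T>;
}

class Sequential<T> implements DataSource<T> {
  constructor(private sources: DataSource<T>[]) {}

  async getData(id: string): Promise<T> {
    let lastError: unknown;
    for (const source of this.sources) {
      try {
        // The first source to succeed serves the request.
        return await source.getData(id);
      } catch (error) {
        // Any throw (including "Request not sampled") just moves on.
        lastError = error;
      }
    }
    throw lastError ?? new Error('no data sources available');
  }
}
```

This is why the wrapper can safely throw both when not sampled and when the experimental source fails: either way the rest of the chain runs untouched.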

Configuration

New environment variables:

```sh
# Comma-separated list of sources to wrap with sampling
SAMPLED_SOURCES=offset-chunks

# Sampling rate (0.0 to 1.0)
SAMPLING_RATE=0.1

# Sampling strategy: 'random' or 'deterministic'
SAMPLING_STRATEGY=random
```
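Parsing these in src/config.ts could look roughly like this (a sketch; the helper shapes and defaults are assumptions, not the repo's existing conventions):

```typescript
// Sketch of env var parsing for src/config.ts (defaults assumed).
const env = process.env;

// Empty list means sampling is disabled.
export const SAMPLED_SOURCES: string[] = (env.SAMPLED_SOURCES ?? '')
  .split(',')
  .map((s) => s.trim())
  .filter((s) => s.length > 0);

export const SAMPLING_RATE: number = Number(env.SAMPLING_RATE ?? '0.1');
if (Number.isNaN(SAMPLING_RATE) || SAMPLING_RATE < 0 || SAMPLING_RATE > 1) {
  throw new Error('SAMPLING_RATE must be a number between 0.0 and 1.0');
}

export const SAMPLING_STRATEGY: 'random' | 'deterministic' =
  env.SAMPLING_STRATEGY === 'deterministic' ? 'deterministic' : 'random';
```

Validating the rate at startup keeps a typo like `SAMPLING_RATE=10` from silently sampling every request.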

Metrics

New Prometheus metrics:

  • arweave_sampling_sampled_total - Counter of requests that were sampled
  • arweave_sampling_skipped_total - Counter of requests that were skipped
  • arweave_sampling_success_total - Counter of successful sampled requests
  • arweave_sampling_errors_total - Counter of failed sampled requests
  • arweave_sampling_latency_seconds - Histogram of sampled request latency

Files to Create/Modify

| File | Action |
| --- | --- |
| src/data/sampling-contiguous-data-source.ts | Create |
| src/data/sampling-contiguous-data-source.test.ts | Create |
| src/config.ts | Add sampling config vars |
| src/system.ts | Wire up sampling wrapper |
| src/metrics.ts | Add sampling metrics |
| docs/envs.md | Document new env vars |

Testing Strategy

  1. Unit tests: Mock the inner source and verify:
    • Sampled requests call the inner source
    • Non-sampled requests throw immediately
    • Metrics are incremented correctly
    • Deterministic sampling is reproducible
  2. Integration test:
    • Configure a 50% sampling rate
    • Make 100 requests
    • Verify ~50 went through the sampled source
    • Verify fallback works when the sampled source fails
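The unit-test checks above can be sketched as standalone property tests (the decision function is inlined for illustration; the real tests would exercise SamplingContiguousDataSource through its public getData API):

```typescript
import { ok, strictEqual } from 'node:assert';

// Deterministic decision logic, inlined to mirror the proposed
// shouldSample/simpleHash implementation.
function djb2(str: string): number {
  let hash = 5381;
  for (let i = 0; i < str.length; i++) {
    hash = ((hash << 5) + hash) ^ str.charCodeAt(i);
    hash |= 0;
  }
  return Math.abs(hash);
}

const deterministic = (id: string, rate: number): boolean =>
  djb2(id) % 1_000_000 < rate * 1_000_000;

// Deterministic sampling is reproducible: same ID, same decision.
for (const id of ['tx-1', 'tx-2', 'tx-3']) {
  strictEqual(deterministic(id, 0.5), deterministic(id, 0.5));
}

// Boundary rates: 0 never samples, 1 always samples.
strictEqual(deterministic('any-id', 0), false);
strictEqual(deterministic('any-id', 1), true);

// Random sampling at 50% over 10,000 trials lands near 5,000.
let sampled = 0;
for (let i = 0; i < 10_000; i++) {
  if (Math.random() < 0.5) sampled++;
}
ok(sampled > 4_000 && sampled < 6_000);
```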

Verification

After implementation:

  1. Run unit tests: yarn test:file src/data/sampling-contiguous-data-source.test.ts
  2. Start with sampling enabled: SAMPLED_SOURCES=offset-chunks SAMPLING_RATE=0.5 yarn start
  3. Make requests and verify metrics at /metrics endpoint show expected sampling ratio
  4. Check logs for sampling decisions
