Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/analytics/src/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ impl AnalyticsDataSource for ClickhouseClient {
| AnalyticsCollection::Dispute => {
TableEngine::CollapsingMergeTree { sign: "sign_flag" }
}
AnalyticsCollection::DisputeSessionized => {
TableEngine::CollapsingMergeTree { sign: "sign_flag" }
}
AnalyticsCollection::SdkEvents
| AnalyticsCollection::SdkEventsAnalytics
| AnalyticsCollection::ApiEvents
Expand Down Expand Up @@ -439,6 +442,7 @@ impl ToSql<ClickhouseClient> for AnalyticsCollection {
Self::ConnectorEvents => Ok("connector_events_audit".to_string()),
Self::OutgoingWebhookEvent => Ok("outgoing_webhook_events_audit".to_string()),
Self::Dispute => Ok("dispute".to_string()),
Self::DisputeSessionized => Ok("sessionizer_dispute".to_string()),
Self::ActivePaymentsAnalytics => Ok("active_payments".to_string()),
}
}
Expand Down
12 changes: 6 additions & 6 deletions crates/analytics/src/disputes/accumulators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use super::metrics::DisputeMetricRow;
#[derive(Debug, Default)]
pub struct DisputeMetricsAccumulator {
pub disputes_status_rate: RateAccumulator,
pub total_amount_disputed: SumAccumulator,
pub total_dispute_lost_amount: SumAccumulator,
pub disputed_amount: PaymentProcessedAmountAccumulator,
pub dispute_lost_amount: PaymentProcessedAmountAccumulator,
}
#[derive(Debug, Default)]
pub struct RateAccumulator {
Expand All @@ -17,7 +17,7 @@ pub struct RateAccumulator {
}
#[derive(Debug, Default)]
#[repr(transparent)]
pub struct SumAccumulator {
pub struct PaymentProcessedAmountAccumulator {
pub total: Option<i64>,
}

Expand All @@ -29,7 +29,7 @@ pub trait DisputeMetricAccumulator {
fn collect(self) -> Self::MetricOutput;
}

impl DisputeMetricAccumulator for SumAccumulator {
impl DisputeMetricAccumulator for PaymentProcessedAmountAccumulator {
type MetricOutput = Option<u64>;
#[inline]
fn add_metrics_bucket(&mut self, metrics: &DisputeMetricRow) {
Expand Down Expand Up @@ -92,8 +92,8 @@ impl DisputeMetricsAccumulator {
disputes_challenged: challenge_rate,
disputes_won: won_rate,
disputes_lost: lost_rate,
total_amount_disputed: self.total_amount_disputed.collect(),
total_dispute_lost_amount: self.total_dispute_lost_amount.collect(),
disputed_amount: self.disputed_amount.collect(),
dispute_lost_amount: self.dispute_lost_amount.collect(),
total_dispute,
}
}
Expand Down
46 changes: 31 additions & 15 deletions crates/analytics/src/disputes/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use api_models::analytics::{
DisputeDimensions, DisputeMetrics, DisputeMetricsBucketIdentifier,
DisputeMetricsBucketResponse,
},
AnalyticsMetadata, DisputeFilterValue, DisputeFiltersResponse, GetDisputeFilterRequest,
GetDisputeMetricRequest, MetricsResponse,
DisputeFilterValue, DisputeFiltersResponse, DisputesAnalyticsMetadata, DisputesMetricsResponse,
GetDisputeFilterRequest, GetDisputeMetricRequest,
};
use error_stack::ResultExt;
use router_env::{
Expand All @@ -30,7 +30,7 @@ pub async fn get_metrics(
pool: &AnalyticsProvider,
auth: &AuthInfo,
req: GetDisputeMetricRequest,
) -> AnalyticsResult<MetricsResponse<DisputeMetricsBucketResponse>> {
) -> AnalyticsResult<DisputesMetricsResponse<DisputeMetricsBucketResponse>> {
let mut metrics_accumulator: HashMap<
DisputeMetricsBucketIdentifier,
DisputeMetricsAccumulator,
Expand Down Expand Up @@ -87,14 +87,17 @@ pub async fn get_metrics(
logger::debug!(bucket_id=?id, bucket_value=?value, "Bucket row for metric {metric}");
let metrics_builder = metrics_accumulator.entry(id).or_default();
match metric {
DisputeMetrics::DisputeStatusMetric => metrics_builder
DisputeMetrics::DisputeStatusMetric
| DisputeMetrics::SessionizedDisputeStatusMetric => metrics_builder
.disputes_status_rate
.add_metrics_bucket(&value),
DisputeMetrics::TotalAmountDisputed => metrics_builder
.total_amount_disputed
.add_metrics_bucket(&value),
DisputeMetrics::TotalDisputeLostAmount => metrics_builder
.total_dispute_lost_amount
DisputeMetrics::TotalAmountDisputed
| DisputeMetrics::SessionizedTotalAmountDisputed => {
metrics_builder.disputed_amount.add_metrics_bucket(&value)
}
DisputeMetrics::TotalDisputeLostAmount
| DisputeMetrics::SessionizedTotalDisputeLostAmount => metrics_builder
.dispute_lost_amount
.add_metrics_bucket(&value),
}
}
Expand All @@ -105,18 +108,31 @@ pub async fn get_metrics(
metrics_accumulator
);
}
let mut total_disputed_amount = 0;
let mut total_dispute_lost_amount = 0;
let query_data: Vec<DisputeMetricsBucketResponse> = metrics_accumulator
.into_iter()
.map(|(id, val)| DisputeMetricsBucketResponse {
values: val.collect(),
dimensions: id,
.map(|(id, val)| {
let collected_values = val.collect();
if let Some(amount) = collected_values.disputed_amount {
total_disputed_amount += amount;
}
if let Some(amount) = collected_values.dispute_lost_amount {
total_dispute_lost_amount += amount;
}

DisputeMetricsBucketResponse {
values: collected_values,
dimensions: id,
}
})
.collect();

Ok(MetricsResponse {
Ok(DisputesMetricsResponse {
query_data,
meta_data: [AnalyticsMetadata {
current_time_range: req.time_range,
meta_data: [DisputesAnalyticsMetadata {
total_disputed_amount: Some(total_disputed_amount),
total_dispute_lost_amount: Some(total_dispute_lost_amount),
}],
})
}
Expand Down
16 changes: 16 additions & 0 deletions crates/analytics/src/disputes/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod dispute_status_metric;
mod sessionized_metrics;
mod total_amount_disputed;
mod total_dispute_lost_amount;

Expand Down Expand Up @@ -92,6 +93,21 @@ where
.load_metrics(dimensions, auth, filters, granularity, time_range, pool)
.await
}
Self::SessionizedTotalAmountDisputed => {
sessionized_metrics::TotalAmountDisputed::default()
.load_metrics(dimensions, auth, filters, granularity, time_range, pool)
.await
}
Self::SessionizedDisputeStatusMetric => {
sessionized_metrics::DisputeStatusMetric::default()
.load_metrics(dimensions, auth, filters, granularity, time_range, pool)
.await
}
Self::SessionizedTotalDisputeLostAmount => {
sessionized_metrics::TotalDisputeLostAmount::default()
.load_metrics(dimensions, auth, filters, granularity, time_range, pool)
.await
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use std::collections::HashSet;

use api_models::analytics::{
disputes::{DisputeDimensions, DisputeFilters, DisputeMetricsBucketIdentifier},
Granularity, TimeRange,
};
use common_utils::errors::ReportSwitchExt;
use error_stack::ResultExt;
use time::PrimitiveDateTime;

use super::DisputeMetricRow;
use crate::{
enums::AuthInfo,
query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult},
};
#[derive(Default)]
pub(crate) struct DisputeStatusMetric {}

#[async_trait::async_trait]
impl<T> super::DisputeMetric<T> for DisputeStatusMetric
where
T: AnalyticsDataSource + super::DisputeMetricAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_metrics(
&self,
dimensions: &[DisputeDimensions],
auth: &AuthInfo,
filters: &DisputeFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<HashSet<(DisputeMetricsBucketIdentifier, DisputeMetricRow)>>
where
T: AnalyticsDataSource + super::DisputeMetricAnalytics,
{
let mut query_builder = QueryBuilder::new(AnalyticsCollection::DisputeSessionized);

for dim in dimensions {
query_builder.add_select_column(dim).switch()?;
}

query_builder.add_select_column("dispute_status").switch()?;

query_builder
.add_select_column(Aggregate::Count {
field: None,
alias: Some("count"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Min {
field: "created_at",
alias: Some("start_bucket"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Max {
field: "created_at",
alias: Some("end_bucket"),
})
.switch()?;

filters.set_filter_clause(&mut query_builder).switch()?;

auth.set_filter_clause(&mut query_builder).switch()?;

time_range.set_filter_clause(&mut query_builder).switch()?;

for dim in dimensions {
query_builder.add_group_by_clause(dim).switch()?;
}

query_builder
.add_group_by_clause("dispute_status")
.switch()?;

if let Some(granularity) = granularity.as_ref() {
granularity
.set_group_by_clause(&mut query_builder)
.switch()?;
}

query_builder
.execute_query::<DisputeMetricRow, _>(pool)
.await
.change_context(MetricsError::QueryBuildingError)?
.change_context(MetricsError::QueryExecutionFailure)?
.into_iter()
.map(|i| {
Ok((
DisputeMetricsBucketIdentifier::new(
i.dispute_stage.as_ref().map(|i| i.0),
i.connector.clone(),
TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,
_ => time_range.start_time,
},
end_time: granularity.as_ref().map_or_else(
|| Ok(time_range.end_time),
|g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(),
)?,
},
),
i,
))
})
.collect::<error_stack::Result<
HashSet<(DisputeMetricsBucketIdentifier, DisputeMetricRow)>,
crate::query::PostProcessingError,
>>()
.change_context(MetricsError::PostProcessingFailure)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
mod dispute_status_metric;
mod total_amount_disputed;
mod total_dispute_lost_amount;
pub(super) use dispute_status_metric::DisputeStatusMetric;
pub(super) use total_amount_disputed::TotalAmountDisputed;
pub(super) use total_dispute_lost_amount::TotalDisputeLostAmount;

pub use super::{DisputeMetric, DisputeMetricAnalytics, DisputeMetricRow};
Loading
Loading