Skip to content

Commit

Permalink
int-preservation feature for mlr top
Browse files Browse the repository at this point in the history
  • Loading branch information
johnkerl committed Dec 5, 2015
1 parent 9b7c254 commit 91e77c6
Show file tree
Hide file tree
Showing 11 changed files with 303 additions and 123 deletions.
144 changes: 142 additions & 2 deletions c/containers/mlr_val.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
// For some Linux distros, in spite of including time.h:
char *strptime(const char *s, const char *format, struct tm *tm);

// Function type for the int-returning comparison predicates over a pair of
// mlrvals (eq_i_ii, gt_i_fi, etc.); it is the element type of the
// ieq/ine/igt/ige/ilt/ile disposition tables.
typedef int mv_i_nn_comparator_func_t(mv_t* pa, mv_t* pb);
// Same shape but with const-qualified operands; it is the element type of
// mv_comparator_dispositions.
typedef int mv_i_cncn_comparator_func_t(const mv_t* pa, const mv_t* pb);

// ----------------------------------------------------------------
mv_t MV_NULL = {
.type = MT_NULL,
Expand Down Expand Up @@ -1805,6 +1808,103 @@ mv_t ge_op_func(mv_t* pval1, mv_t* pval2) { return (ge_dispositions[pval1->type]
// Less-than operator: selects a handler from lt_dispositions by the types of
// both operands and returns whatever that handler returns.
mv_t lt_op_func(mv_t* pval1, mv_t* pval2) {
	return lt_dispositions[pval1->type][pval2->type](pval1, pval2);
}

// Less-than-or-equal operator: selects a handler from le_dispositions by the
// types of both operands and returns whatever that handler returns.
mv_t le_op_func(mv_t* pval1, mv_t* pval2) {
	return le_dispositions[pval1->type][pval2->type](pval1, pval2);
}

// ----------------------------------------------------------------
// Int-vs-int predicates: both operands are read through u.intv. Each returns
// the C truth value of "pa OP pb".
static int eq_i_ii(mv_t* pa, mv_t* pb) {
	return pa->u.intv == pb->u.intv;
}

static int ne_i_ii(mv_t* pa, mv_t* pb) {
	return pa->u.intv != pb->u.intv;
}

static int gt_i_ii(mv_t* pa, mv_t* pb) {
	return pa->u.intv > pb->u.intv;
}

static int ge_i_ii(mv_t* pa, mv_t* pb) {
	return pa->u.intv >= pb->u.intv;
}

static int lt_i_ii(mv_t* pa, mv_t* pb) {
	return pa->u.intv < pb->u.intv;
}

static int le_i_ii(mv_t* pa, mv_t* pb) {
	return pa->u.intv <= pb->u.intv;
}

// Float-vs-float predicates: both operands are read through u.fltv. Each
// returns the C truth value of "pa OP pb".
static int eq_i_ff(mv_t* pa, mv_t* pb) {
	return pa->u.fltv == pb->u.fltv;
}

static int ne_i_ff(mv_t* pa, mv_t* pb) {
	return pa->u.fltv != pb->u.fltv;
}

static int gt_i_ff(mv_t* pa, mv_t* pb) {
	return pa->u.fltv > pb->u.fltv;
}

static int ge_i_ff(mv_t* pa, mv_t* pb) {
	return pa->u.fltv >= pb->u.fltv;
}

static int lt_i_ff(mv_t* pa, mv_t* pb) {
	return pa->u.fltv < pb->u.fltv;
}

static int le_i_ff(mv_t* pa, mv_t* pb) {
	return pa->u.fltv <= pb->u.fltv;
}

// Float-vs-int predicates: the left operand is read through u.fltv and the
// right through u.intv. The mixed-type comparison relies on C's implicit
// operand conversion (presumably int to floating point -- confirm against the
// mv_t declaration).
static int eq_i_fi(mv_t* pa, mv_t* pb) {
	return pa->u.fltv == pb->u.intv;
}

static int ne_i_fi(mv_t* pa, mv_t* pb) {
	return pa->u.fltv != pb->u.intv;
}

static int gt_i_fi(mv_t* pa, mv_t* pb) {
	return pa->u.fltv > pb->u.intv;
}

static int ge_i_fi(mv_t* pa, mv_t* pb) {
	return pa->u.fltv >= pb->u.intv;
}

static int lt_i_fi(mv_t* pa, mv_t* pb) {
	return pa->u.fltv < pb->u.intv;
}

static int le_i_fi(mv_t* pa, mv_t* pb) {
	return pa->u.fltv <= pb->u.intv;
}

// Int-vs-float predicates: the left operand is read through u.intv and the
// right through u.fltv. The mixed-type comparison relies on C's implicit
// operand conversion (presumably int to floating point -- confirm against the
// mv_t declaration).
static int eq_i_if(mv_t* pa, mv_t* pb) {
	return pa->u.intv == pb->u.fltv;
}

static int ne_i_if(mv_t* pa, mv_t* pb) {
	return pa->u.intv != pb->u.fltv;
}

static int gt_i_if(mv_t* pa, mv_t* pb) {
	return pa->u.intv > pb->u.fltv;
}

static int ge_i_if(mv_t* pa, mv_t* pb) {
	return pa->u.intv >= pb->u.fltv;
}

static int lt_i_if(mv_t* pa, mv_t* pb) {
	return pa->u.intv < pb->u.fltv;
}

static int le_i_if(mv_t* pa, mv_t* pb) {
	return pa->u.intv <= pb->u.fltv;
}

// Dispatch table for mv_i_nn_eq, indexed as [left operand type][right operand
// type]. Only the FLOAT/INT combinations have handlers; every other cell is
// NULL, and mv_i_nn_eq calls the selected cell without a NULL check, so callers
// must pass only int or float mlrvals.
// NOTE(review): the row/column labels below presumably follow the MT_* enum
// order -- confirm against the enum declaration.
static mv_i_nn_comparator_func_t* ieq_dispositions[MT_MAX][MT_MAX] = {
// NULL ERROR BOOL FLOAT INT STRING
/*NULL*/ {NULL, NULL, NULL, NULL, NULL, NULL},
/*ERROR*/ {NULL, NULL, NULL, NULL, NULL, NULL},
/*BOOL*/ {NULL, NULL, NULL, NULL, NULL, NULL},
/*FLOAT*/ {NULL, NULL, NULL, eq_i_ff, eq_i_fi, NULL},
/*INT*/ {NULL, NULL, NULL, eq_i_if, eq_i_ii, NULL},
/*STRING*/ {NULL, NULL, NULL, NULL, NULL, NULL},
};

// Dispatch table for mv_i_nn_ne, indexed as [left operand type][right operand
// type]. Only the FLOAT/INT combinations have handlers; every other cell is
// NULL, and mv_i_nn_ne calls the selected cell without a NULL check, so callers
// must pass only int or float mlrvals.
// NOTE(review): the row/column labels below presumably follow the MT_* enum
// order -- confirm against the enum declaration.
static mv_i_nn_comparator_func_t* ine_dispositions[MT_MAX][MT_MAX] = {
// NULL ERROR BOOL FLOAT INT STRING
/*NULL*/ {NULL, NULL, NULL, NULL, NULL, NULL},
/*ERROR*/ {NULL, NULL, NULL, NULL, NULL, NULL},
/*BOOL*/ {NULL, NULL, NULL, NULL, NULL, NULL},
/*FLOAT*/ {NULL, NULL, NULL, ne_i_ff, ne_i_fi, NULL},
/*INT*/ {NULL, NULL, NULL, ne_i_if, ne_i_ii, NULL},
/*STRING*/ {NULL, NULL, NULL, NULL, NULL, NULL},
};

// Dispatch table for mv_i_nn_gt, indexed as [left operand type][right operand
// type]. Only the FLOAT/INT combinations have handlers; every other cell is
// NULL, and mv_i_nn_gt calls the selected cell without a NULL check, so callers
// must pass only int or float mlrvals.
// NOTE(review): the row/column labels below presumably follow the MT_* enum
// order -- confirm against the enum declaration.
static mv_i_nn_comparator_func_t* igt_dispositions[MT_MAX][MT_MAX] = {
// NULL ERROR BOOL FLOAT INT STRING
/*NULL*/ {NULL, NULL, NULL, NULL, NULL, NULL},
/*ERROR*/ {NULL, NULL, NULL, NULL, NULL, NULL},
/*BOOL*/ {NULL, NULL, NULL, NULL, NULL, NULL},
/*FLOAT*/ {NULL, NULL, NULL, gt_i_ff, gt_i_fi, NULL},
/*INT*/ {NULL, NULL, NULL, gt_i_if, gt_i_ii, NULL},
/*STRING*/ {NULL, NULL, NULL, NULL, NULL, NULL},
};

// Dispatch table for mv_i_nn_ge, indexed as [left operand type][right operand
// type]. Only the FLOAT/INT combinations have handlers; every other cell is
// NULL, and mv_i_nn_ge calls the selected cell without a NULL check, so callers
// must pass only int or float mlrvals.
// NOTE(review): the row/column labels below presumably follow the MT_* enum
// order -- confirm against the enum declaration.
static mv_i_nn_comparator_func_t* ige_dispositions[MT_MAX][MT_MAX] = {
// NULL ERROR BOOL FLOAT INT STRING
/*NULL*/ {NULL, NULL, NULL, NULL, NULL, NULL},
/*ERROR*/ {NULL, NULL, NULL, NULL, NULL, NULL},
/*BOOL*/ {NULL, NULL, NULL, NULL, NULL, NULL},
/*FLOAT*/ {NULL, NULL, NULL, ge_i_ff, ge_i_fi, NULL},
/*INT*/ {NULL, NULL, NULL, ge_i_if, ge_i_ii, NULL},
/*STRING*/ {NULL, NULL, NULL, NULL, NULL, NULL},
};

// Dispatch table for mv_i_nn_lt, indexed as [left operand type][right operand
// type]. Only the FLOAT/INT combinations have handlers; every other cell is
// NULL, and mv_i_nn_lt calls the selected cell without a NULL check, so callers
// must pass only int or float mlrvals.
// NOTE(review): the row/column labels below presumably follow the MT_* enum
// order -- confirm against the enum declaration.
static mv_i_nn_comparator_func_t* ilt_dispositions[MT_MAX][MT_MAX] = {
// NULL ERROR BOOL FLOAT INT STRING
/*NULL*/ {NULL, NULL, NULL, NULL, NULL, NULL},
/*ERROR*/ {NULL, NULL, NULL, NULL, NULL, NULL},
/*BOOL*/ {NULL, NULL, NULL, NULL, NULL, NULL},
/*FLOAT*/ {NULL, NULL, NULL, lt_i_ff, lt_i_fi, NULL},
/*INT*/ {NULL, NULL, NULL, lt_i_if, lt_i_ii, NULL},
/*STRING*/ {NULL, NULL, NULL, NULL, NULL, NULL},
};

// Dispatch table for mv_i_nn_le, indexed as [left operand type][right operand
// type]. Only the FLOAT/INT combinations have handlers; every other cell is
// NULL, and mv_i_nn_le calls the selected cell without a NULL check, so callers
// must pass only int or float mlrvals.
// NOTE(review): the row/column labels below presumably follow the MT_* enum
// order -- confirm against the enum declaration.
static mv_i_nn_comparator_func_t* ile_dispositions[MT_MAX][MT_MAX] = {
// NULL ERROR BOOL FLOAT INT STRING
/*NULL*/ {NULL, NULL, NULL, NULL, NULL, NULL},
/*ERROR*/ {NULL, NULL, NULL, NULL, NULL, NULL},
/*BOOL*/ {NULL, NULL, NULL, NULL, NULL, NULL},
/*FLOAT*/ {NULL, NULL, NULL, le_i_ff, le_i_fi, NULL},
/*INT*/ {NULL, NULL, NULL, le_i_if, le_i_ii, NULL},
/*STRING*/ {NULL, NULL, NULL, NULL, NULL, NULL},
};


// Int-returning comparisons of two mlrvals. Each looks up a handler in the
// matching disposition table by the types of both operands and returns the
// handler's result. The tables hold NULL for every non-int/float type pair and
// no NULL check is made here, so both operands must be int or float.
int mv_i_nn_eq(mv_t* pval1, mv_t* pval2) {
	mv_i_nn_comparator_func_t* phandler = ieq_dispositions[pval1->type][pval2->type];
	return phandler(pval1, pval2);
}

int mv_i_nn_ne(mv_t* pval1, mv_t* pval2) {
	mv_i_nn_comparator_func_t* phandler = ine_dispositions[pval1->type][pval2->type];
	return phandler(pval1, pval2);
}

int mv_i_nn_gt(mv_t* pval1, mv_t* pval2) {
	mv_i_nn_comparator_func_t* phandler = igt_dispositions[pval1->type][pval2->type];
	return phandler(pval1, pval2);
}

int mv_i_nn_ge(mv_t* pval1, mv_t* pval2) {
	mv_i_nn_comparator_func_t* phandler = ige_dispositions[pval1->type][pval2->type];
	return phandler(pval1, pval2);
}

int mv_i_nn_lt(mv_t* pval1, mv_t* pval2) {
	mv_i_nn_comparator_func_t* phandler = ilt_dispositions[pval1->type][pval2->type];
	return phandler(pval1, pval2);
}

int mv_i_nn_le(mv_t* pval1, mv_t* pval2) {
	mv_i_nn_comparator_func_t* phandler = ile_dispositions[pval1->type][pval2->type];
	return phandler(pval1, pval2);
}

// ----------------------------------------------------------------
// arg2 evaluates to string via compound expression; regexes compiled on each call.
mv_t matches_no_precomp_func(mv_t* pval1, mv_t* pval2) {
Expand Down Expand Up @@ -1849,7 +1949,6 @@ mv_t does_not_match_precomp_func(mv_t* pval1, regex_t* pregex, string_builder_t*
}

// ----------------------------------------------------------------
typedef int mv_comparator_func_t(const mv_t* pa, const mv_t* pb);
static int mv_ff_comparator(const mv_t* pa, const mv_t* pb) {
double d = pa->u.fltv - pb->u.fltv;
return (d < 0) ? -1 : (d > 0) ? 1 : 0;
Expand All @@ -1867,7 +1966,7 @@ static int mv_ii_comparator(const mv_t* pa, const mv_t* pb) {
return (d < 0) ? -1 : (d > 0) ? 1 : 0;
}
// We assume mv_t's coming into percentile keeper are int or double -- in particular, non-null.
static mv_comparator_func_t* mv_comparator_dispositions[MT_MAX][MT_MAX] = {
static mv_i_cncn_comparator_func_t* mv_comparator_dispositions[MT_MAX][MT_MAX] = {
// NULL ERROR BOOL FLOAT INT STRING
/*NULL*/ {NULL, NULL, NULL, NULL, NULL, NULL},
/*ERROR*/ {NULL, NULL, NULL, NULL, NULL, NULL},
Expand All @@ -1881,3 +1980,44 @@ int mv_nn_comparator(const void* pva, const void* pvb) {
const mv_t* pb = pvb;
return mv_comparator_dispositions[pa->type][pb->type](pa, pb);
}

// ----------------------------------------------------------------
// Binary search for the slot at which *pvalue belongs in array[0..size-1].
// The result is always in [0, size].
//
// The early-out checks send a value greater than array[0] to index 0 and a
// value less than array[size-1] to index size, i.e. the array is treated as
// ordered largest-first. All comparisons go through the mv_i_nn_* functions,
// so every element and *pvalue must be an int or float mlrval.
int mlr_bsearch_mv_n_for_insert(mv_t* array, int size, mv_t* pvalue) {
	if (size == 0)
		return 0;

	int last = size - 1;

	// Beyond either end of the array: no search needed.
	if (mv_i_nn_gt(pvalue, &array[0]))
		return 0;
	if (mv_i_nn_lt(pvalue, &array[last]))
		return size;

	int lo = 0;
	int hi = last;
	int mid = (hi + lo) / 2;

	while (lo < hi) {
		mv_t* pmid = &array[mid];

		// Exact hit: insert at the matching element's index.
		if (mv_i_nn_eq(pvalue, pmid))
			return mid;

		// Narrow the bracket toward the side that contains the value.
		if (mv_i_nn_gt(pvalue, pmid))
			hi = mid;
		else
			lo = mid;

		int next = (hi + lo) / 2;
		if (next == mid) {
			// The bracket can shrink no further; choose among lo, hi and
			// the slot just past hi.
			if (mv_i_nn_ge(pvalue, &array[lo]))
				return lo;
			if (mv_i_nn_ge(pvalue, &array[hi]))
				return hi;
			return hi + 1;
		}
		mid = next;
	}

	return lo;
}
11 changes: 11 additions & 0 deletions c/containers/mlr_val.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,15 +280,26 @@ mv_t does_not_match_no_precomp_func(mv_t* pval1, mv_t* pval2);
mv_t matches_precomp_func(mv_t* pval1, regex_t* pregex, string_builder_t* psb);
mv_t does_not_match_precomp_func(mv_t* pval1, regex_t* pregex, string_builder_t* psb);

// For filter/put DSLs: binary comparison operators which take two mlrvals and
// return an mlrval result.
mv_t eq_op_func(mv_t* pval1, mv_t* pval2);
mv_t ne_op_func(mv_t* pval1, mv_t* pval2);
mv_t gt_op_func(mv_t* pval1, mv_t* pval2);
mv_t ge_op_func(mv_t* pval1, mv_t* pval2);
mv_t lt_op_func(mv_t* pval1, mv_t* pval2);
mv_t le_op_func(mv_t* pval1, mv_t* pval2);

// For non-DSL comparison of mlrvals: these return a plain C int truth value
// rather than an mv_t. The implementations only have handlers for int and
// float operands and do not check for other types, so both arguments must be
// numeric mlrvals.
int mv_i_nn_eq(mv_t* pval1, mv_t* pval2);
int mv_i_nn_ne(mv_t* pval1, mv_t* pval2);
int mv_i_nn_gt(mv_t* pval1, mv_t* pval2);
int mv_i_nn_ge(mv_t* pval1, mv_t* pval2);
int mv_i_nn_lt(mv_t* pval1, mv_t* pval2);
int mv_i_nn_le(mv_t* pval1, mv_t* pval2);

// ----------------------------------------------------------------
// For qsort of numeric mlrvals.
int mv_nn_comparator(const void* pva, const void* pvb);

// Returns the index, in [0, size], at which *pvalue should be inserted into
// array[0..size-1]. Elements and *pvalue must be int or float mlrvals.
// NOTE(review): the implementation treats array[0] as the largest element, so
// the array is presumably expected to be sorted in descending order -- confirm
// against callers.
int mlr_bsearch_mv_n_for_insert(mv_t* array, int size, mv_t* pvalue);

#endif // MLR_VAL_H
15 changes: 10 additions & 5 deletions c/containers/top_keeper.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// ----------------------------------------------------------------
top_keeper_t* top_keeper_alloc(int capacity) {
top_keeper_t* ptop_keeper = mlr_malloc_or_die(sizeof(top_keeper_t));
ptop_keeper->top_values = mlr_malloc_or_die(capacity*sizeof(double));
ptop_keeper->top_values = mlr_malloc_or_die(capacity*sizeof(mv_t));
ptop_keeper->top_precords = mlr_malloc_or_die(capacity*sizeof(lrec_t*));
ptop_keeper->size = 0;
ptop_keeper->capacity = capacity;
Expand Down Expand Up @@ -56,8 +56,8 @@ void top_keeper_free(top_keeper_t* ptop_keeper) {
// [9 ] [9 ] [9 #] [9 #]

// Our caller, mapper_top, feeds us records. We keep them or free them.
void top_keeper_add(top_keeper_t* ptop_keeper, double value, lrec_t* prec) {
int destidx = mlr_bsearch_double_for_insert(ptop_keeper->top_values, ptop_keeper->size, value);
void top_keeper_add(top_keeper_t* ptop_keeper, mv_t value, lrec_t* prec) {
int destidx = mlr_bsearch_mv_n_for_insert(ptop_keeper->top_values, ptop_keeper->size, &value);
if (ptop_keeper->size < ptop_keeper->capacity) {
for (int i = ptop_keeper->size-1; i >= destidx; i--) {
ptop_keeper->top_values[i+1] = ptop_keeper->top_values[i];
Expand All @@ -84,8 +84,13 @@ void top_keeper_add(top_keeper_t* ptop_keeper, double value, lrec_t* prec) {
// ----------------------------------------------------------------
void top_keeper_print(top_keeper_t* ptop_keeper) {
printf("top_keeper dump:\n");
for (int i = 0; i < ptop_keeper->size; i++)
printf("[%02d] %.8lf\n", i, ptop_keeper->top_values[i]);
for (int i = 0; i < ptop_keeper->size; i++) {
mv_t* pvalue = &ptop_keeper->top_values[i];
if (pvalue->type == MT_FLOAT)
printf("[%02d] %.8lf\n", i, pvalue->u.fltv);
else
printf("[%02d] %lld\n", i, pvalue->u.intv);
}
for (int i = ptop_keeper->size; i < ptop_keeper->capacity; i++)
printf("[%02d] ---\n", i);
}
9 changes: 5 additions & 4 deletions c/containers/top_keeper.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@

#ifndef TOP_KEEPER_H
#define TOP_KEEPER_H
#include "containers/mlr_val.h"
#include "containers/lrec.h"

typedef struct _top_keeper_t {
double* top_values;
mv_t* top_values;
lrec_t** top_precords;
int size;
int capacity;
int size;
int capacity;
} top_keeper_t;

top_keeper_t* top_keeper_alloc(int capacity);
void top_keeper_free(top_keeper_t* ptop_keeper);
void top_keeper_add(top_keeper_t* ptop_keeper, double value, lrec_t* prec);
void top_keeper_add(top_keeper_t* ptop_keeper, mv_t value, lrec_t* prec);

// For debug/test
void top_keeper_print(top_keeper_t* ptop_keeper);
Expand Down
29 changes: 18 additions & 11 deletions c/mapping/mapper_top.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
typedef struct _mapper_top_state_t {
slls_t* pvalue_field_names;
slls_t* pgroup_by_field_names;
int show_full_records;
int top_count;
double sign; // for +1 for max; -1 for min
int show_full_records;
int allow_int_float;
mv_t sign; // for +1 for max; -1 for min
lhmslv_t* groups;
} mapper_top_state_t;

Expand All @@ -30,7 +31,7 @@ static void mapper_top_ingest(lrec_t* pinrec, mapper_top_state_t* pstate);
static sllv_t* mapper_top_emit(mapper_top_state_t* pstate, context_t* pctx);
static void mapper_top_free(void* pvstate);
static mapper_t* mapper_top_alloc(slls_t* pvalue_field_names, slls_t* pgroup_by_field_names,
int top_count, int do_max, int show_full_records);
int top_count, int do_max, int show_full_records, int allow_int_float);
static void mapper_top_usage(FILE* o, char* argv0, char* verb);
static mapper_t* mapper_top_parse_cli(int* pargi, int argc, char** argv);

Expand All @@ -51,6 +52,8 @@ static void mapper_top_usage(FILE* o, char* argv0, char* verb) {
fprintf(o, " to print only value and group-by fields. Requires a single\n");
fprintf(o, " value-field name only.\n");
fprintf(o, "--min Print top smallest values; default is top largest values.\n");
fprintf(o, "-F Keep top values as floats even if they look like integers.\n");

fprintf(o, "Prints the n records with smallest/largest values at specified fields,\n");
fprintf(o, "optionally by category.\n");
}
Expand All @@ -61,6 +64,7 @@ static mapper_t* mapper_top_parse_cli(int* pargi, int argc, char** argv) {
slls_t* pgroup_by_field_names = slls_alloc();
int show_full_records = FALSE;
int do_max = TRUE;
int allow_int_float = TRUE;

char* verb = argv[(*pargi)++];

Expand All @@ -71,6 +75,7 @@ static mapper_t* mapper_top_parse_cli(int* pargi, int argc, char** argv) {
ap_define_true_flag(pstate, "-a", &show_full_records);
ap_define_true_flag(pstate, "--max", &do_max);
ap_define_false_flag(pstate, "--min", &do_max);
ap_define_false_flag(pstate, "-F", &allow_int_float);

if (!ap_parse(pstate, verb, pargi, argc, argv)) {
mapper_top_usage(stderr, argv[0], verb);
Expand All @@ -86,12 +91,12 @@ static mapper_t* mapper_top_parse_cli(int* pargi, int argc, char** argv) {
}

return mapper_top_alloc(pvalue_field_names, pgroup_by_field_names,
top_count, do_max, show_full_records);
top_count, do_max, show_full_records, allow_int_float);
}

// ----------------------------------------------------------------
static mapper_t* mapper_top_alloc(slls_t* pvalue_field_names, slls_t* pgroup_by_field_names,
int top_count, int do_max, int show_full_records)
int top_count, int do_max, int show_full_records, int allow_int_float)
{
mapper_t* pmapper = mlr_malloc_or_die(sizeof(mapper_t));

Expand All @@ -100,8 +105,9 @@ static mapper_t* mapper_top_alloc(slls_t* pvalue_field_names, slls_t* pgroup_by_
pstate->pvalue_field_names = slls_copy(pvalue_field_names);
pstate->pgroup_by_field_names = slls_copy(pgroup_by_field_names);
pstate->show_full_records = show_full_records;
pstate->allow_int_float = allow_int_float;
pstate->top_count = top_count;
pstate->sign = do_max ? 1.0 : -1.0;
pstate->sign = mv_from_int(do_max ? 1 : -1);
pstate->groups = lhmslv_alloc();

pmapper->pvstate = pstate;
Expand Down Expand Up @@ -161,7 +167,9 @@ static void mapper_top_ingest(lrec_t* pinrec, mapper_top_state_t* pstate) {
for ( ; pa != NULL && pb != NULL; pa = pa->pnext, pb = pb->pnext) {
char* value_field_name = pa->value;
char* value_field_sval = pb->value;
double value_field_dval = mlr_double_from_string_or_die(value_field_sval);
mv_t value_field_nval = pstate->allow_int_float
? mv_scan_number_or_die(value_field_sval)
: mv_from_float(mlr_double_from_string_or_die(value_field_sval));

top_keeper_t* ptop_keeper_for_group = lhmsv_get(group_to_acc_field, value_field_name);
if (ptop_keeper_for_group == NULL) {
Expand All @@ -171,7 +179,7 @@ static void mapper_top_ingest(lrec_t* pinrec, mapper_top_state_t* pstate) {

// The top-keeper object will free the record if it isn't retained, or
// keep it if it is.
top_keeper_add(ptop_keeper_for_group, value_field_dval * pstate->sign,
top_keeper_add(ptop_keeper_for_group, n_nn_times_func(&value_field_nval, &pstate->sign),
pstate->show_full_records ? pinrec : NULL);
}
}
Expand Down Expand Up @@ -221,10 +229,9 @@ static sllv_t* mapper_top_emit(mapper_top_state_t* pstate, context_t* pctx) {

char* key = mlr_paste_2_strings(value_field_name, "_top");
if (i < ptop_keeper_for_group->size) {
double fltv = ptop_keeper_for_group->top_values[i] * pstate->sign;
char* strv = mlr_alloc_string_from_double(fltv, MLR_GLOBALS.ofmt);
mv_t numv = n_nn_times_func(&ptop_keeper_for_group->top_values[i], &pstate->sign);
char* strv = mv_format_val(&numv);
lrec_put(poutrec, key, strv, LREC_FREE_ENTRY_KEY|LREC_FREE_ENTRY_VALUE);
free(strv);
} else {
lrec_put(poutrec, key, "", LREC_FREE_ENTRY_KEY);
}
Expand Down
Loading

0 comments on commit 91e77c6

Please sign in to comment.