From 230a46763380ac654c8d47594be6456f403772f0 Mon Sep 17 00:00:00 2001 From: Jeff Barnes Date: Wed, 28 Jul 2021 04:25:17 -0800 Subject: [PATCH] Added avro validation --- Cargo.toml | 1 + sql/load-order.txt | 1 + src/avro.rs | 99 ++++++++++++++++++++++++++++++++++++++++++++ src/json_schema.rs | 4 +- src/json_type_def.rs | 20 ++++----- src/lib.rs | 1 + 6 files changed, 112 insertions(+), 14 deletions(-) create mode 100644 src/avro.rs diff --git a/Cargo.toml b/Cargo.toml index e05de32..4c30c49 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ pgx-macros = "0.1.20" serde_json = "1.0.59" jsonschema = "0.12.0" jtd = "0.3" +avro-rs = "0.13.0" [dev-dependencies] pgx-tests = "0.1.20" diff --git a/sql/load-order.txt b/sql/load-order.txt index 5c811b9..333644b 100644 --- a/sql/load-order.txt +++ b/sql/load-order.txt @@ -1,2 +1,3 @@ json_schema.generated.sql json_type_def.generated.sql +avro.generated.sql diff --git a/src/avro.rs b/src/avro.rs new file mode 100644 index 0000000..9ebb853 --- /dev/null +++ b/src/avro.rs @@ -0,0 +1,99 @@ +use avro_rs::types::Record; +use avro_rs::{Schema, Writer}; +use pgx::*; + +#[pg_extern] +fn avro_is_valid(schema: JsonB, instance: JsonB) -> bool { + let parsed_schema = + Schema::parse(&schema.0).unwrap_or_else(|e| panic!("Error parsing schema: {:?}", e)); + let mut writer = Writer::new(&parsed_schema, Vec::new()); + + let val; + if schema.0["type"] == "record" { + let mut record = Record::new(&parsed_schema).unwrap(); + for (k, v) in instance.0.as_object().unwrap() { + record.put(k, avro_rs::types::Value::from(v.clone())) + } + val = avro_rs::types::Value::from(record) + } else { + val = avro_rs::types::Value::from(instance.0); + } + + writer.append(val).is_ok() +} + +#[cfg(any(test, feature = "pg_test"))] +mod tests { + use pgx::*; + use std::panic; + + #[pg_test] + fn test_avro_is_valid_bad_schema() { + let result = panic::catch_unwind(|| { + let _valid = Spi::get_one::( + r#" + select avro_is_valid('{ + "type": "record", + "name": "test", + "fields": [ + {"name": "a", "type": "number", "default": 42}, + {"name": "b", "type": "string"} + ] + }'::jsonb, '{ + "a": 42, + "b": "foo" + }'::jsonb)"#, + ); + }); + assert!(result.is_err()); + } + + #[pg_test] + fn test_avro_is_valid_true() { + let valid = Spi::get_one::( + r#" + select avro_is_valid('{ + "type": "record", + "name": "test", + "fields": [ + {"name": "a", "type": "long", "default": 42}, + {"name": "b", "type": "string"} + ] + }'::jsonb, '{ + "a": 27, + "b": "foo" + }'::jsonb)"#, + ); + assert_eq!(valid, Some(true)) + } + + #[pg_test] + fn test_avro_is_valid_false() { + let valid = Spi::get_one::( + r#" + select avro_is_valid('{ + "type": "record", + "name": "test", + "fields": [ + {"name": "a", "type": "long", "default": 42}, + {"name": "b", "type": "string"} + ] + }'::jsonb, '{ + "a": "27", + "b": "foo" + }'::jsonb)"#, + ); + assert_eq!(valid, Some(false)) + } + + // #[pg_test] + // fn test_avro_get_errors() { + // let (_value, description) = Spi::get_two::( + // "select * from avro_get_errors('{\"maxLength\": 5}', '\"foobar\"'::jsonb)", + // ); + // assert_eq!( + // description, + // Some("\"foobar\" is longer than 5 characters".to_string()) + // ) + // } +} diff --git a/src/json_schema.rs b/src/json_schema.rs index 4027297..f2e10fb 100644 --- a/src/json_schema.rs +++ b/src/json_schema.rs @@ -19,10 +19,10 @@ fn json_schema_get_errors( name!(schema_path, String), ), > { - let compiled = JSONSchema::compile(&schema.0) + let parsed_schema = JSONSchema::compile(&schema.0) .unwrap_or_else(|err| panic!("Error compiling schema: {:#?}", err)); - let result = compiled + let result = parsed_schema .validate(&instance.0) .err() .into_iter() diff --git a/src/json_type_def.rs b/src/json_type_def.rs index 932b8ed..3e73af0 100644 --- a/src/json_type_def.rs +++ b/src/json_type_def.rs @@ -1,11 +1,11 @@ use jtd::Schema; use pgx::*; -use serde_json::json; #[pg_extern] fn jtd_is_valid(schema: JsonB, instance: JsonB) -> bool { - let schema = Schema::from_serde_schema(serde_json::from_value(schema.0).unwrap()).unwrap(); - let result = jtd::validate(&schema, &instance.0, Default::default()).unwrap(); + let parsed_schema = + Schema::from_serde_schema(serde_json::from_value(schema.0).unwrap()).unwrap(); + let result = jtd::validate(&parsed_schema, &instance.0, Default::default()).unwrap(); result.is_empty() } @@ -14,16 +14,12 @@ fn jtd_is_valid(schema: JsonB, instance: JsonB) -> bool { fn jtd_get_errors( schema: JsonB, instance: JsonB, -) -> impl std::iter::Iterator< - Item = ( - name!(instance_path, String), - name!(schema_path, String), - ), -> { - let schema = Schema::from_serde_schema(serde_json::from_value(schema.0).unwrap()).unwrap(); - let result = jtd::validate(&schema, &instance.0, Default::default()).unwrap(); +) -> impl std::iter::Iterator { + let parsed_schema = + Schema::from_serde_schema(serde_json::from_value(schema.0).unwrap()).unwrap(); + let result = jtd::validate(&parsed_schema, &instance.0, Default::default()).unwrap(); - let new: Vec<(JsonB, String, String, String, String)> = result + let new: Vec<(String, String)> = result .into_iter() .map(|e| { let (instance_path, schema_path) = e.into_owned_paths(); diff --git a/src/lib.rs b/src/lib.rs index 023f694..deb8838 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ mod json_schema; mod json_type_def; +mod avro; use pgx::*;