@@ -15,20 +15,19 @@ import cats.implicits._
15
15
import cats .data .{EitherT , NonEmptyList }
16
16
import cats .effect .Clock
17
17
import com .snowplowanalytics .iglu .client .Resolver
18
+ import com .snowplowanalytics .iglu .client .resolver .SchemaContentList
18
19
import com .snowplowanalytics .iglu .client .resolver .registries .RegistryLookup
19
- import com .snowplowanalytics .iglu .client .resolver .Resolver .{ ResolverResult , SchemaListKey }
20
- import com .snowplowanalytics .iglu .core .{SchemaCriterion , SchemaKey , SchemaList , SchemaMap , SelfDescribingSchema }
20
+ import com .snowplowanalytics .iglu .client .resolver .Resolver .ResolverResult
21
+ import com .snowplowanalytics .iglu .core .{SchemaKey , SchemaMap , SelfDescribingSchema }
21
22
import com .snowplowanalytics .iglu .schemaddl .jsonschema .Schema
22
23
import com .snowplowanalytics .iglu .schemaddl .parquet .{Field , FieldValue }
23
24
import com .snowplowanalytics .iglu .schemaddl .redshift ._
24
25
import com .snowplowanalytics .snowplow .analytics .scalasdk .Event
25
26
import com .snowplowanalytics .snowplow .badrows .{BadRow , FailureDetails , Processor }
26
27
import com .snowplowanalytics .snowplow .badrows .FailureDetails .LoaderIgluError
27
- import com .snowplowanalytics .snowplow .badrows .FailureDetails .LoaderIgluError .SchemaListNotFound
28
28
import com .snowplowanalytics .snowplow .rdbloader .common .Common .AtomicSchema
29
29
import com .snowplowanalytics .snowplow .rdbloader .common .LoaderMessage .TypesInfo .Shredded .ShreddedFormat
30
30
import com .snowplowanalytics .snowplow .rdbloader .common .SchemaProvider
31
- import com .snowplowanalytics .snowplow .rdbloader .common .SchemaProvider .SchemaWithKey
32
31
33
32
/** Represents transformed data in blob storage */
34
33
@@ -151,25 +150,13 @@ object Transformed {
151
150
152
151
def getShredModel [F [_]: Monad : Clock : RegistryLookup ](
153
152
schemaKey : SchemaKey ,
154
- schemaKeys : List [SchemaKey ],
155
- resolver : Resolver [F ]
153
+ schemaContentList : SchemaContentList
156
154
): EitherT [F , LoaderIgluError , ShredModel ] =
157
- schemaKeys
158
- .traverse { sk =>
159
- SchemaProvider
160
- .getSchema(resolver, sk)
161
- .map(schema => SchemaWithKey (sk, schema))
162
- }
163
- .flatMap { schemaWithKeyList =>
164
- EitherT
165
- .fromOption[F ][FailureDetails .LoaderIgluError , NonEmptyList [SchemaWithKey ]](
166
- NonEmptyList .fromList(schemaWithKeyList),
167
- FailureDetails .LoaderIgluError .InvalidSchema (schemaKey, s " Empty resolver response for $schemaKey" )
168
- )
169
- .map { nel =>
170
- val schemas = nel.map(swk => SelfDescribingSchema [Schema ](SchemaMap (swk.schemaKey), swk.schema))
171
- foldMapRedshiftSchemas(schemas)(schemaKey)
172
- }
155
+ EitherT
156
+ .fromEither[F ](SchemaProvider .parseSchemaJsons(schemaContentList))
157
+ .map { nel =>
158
+ val schemas = nel.map(swk => SelfDescribingSchema [Schema ](SchemaMap (swk.schemaKey), swk.schema))
159
+ foldMapRedshiftSchemas(schemas)(schemaKey)
173
160
}
174
161
175
162
/**
@@ -179,32 +166,27 @@ object Transformed {
179
166
schemaKey : SchemaKey ,
180
167
shredModelCache : ShredModelCache [F ],
181
168
resolver : => Resolver [F ]
182
- ): EitherT [F , LoaderIgluError , ShredModel ] = {
183
- val criterion = SchemaCriterion (schemaKey.vendor, schemaKey.name, schemaKey.format, Some (schemaKey.version.model), None , None )
184
-
185
- EitherT (resolver.listSchemasResult(schemaKey.vendor, schemaKey.name, schemaKey.version.model, Some (schemaKey)))
186
- .leftMap(error => SchemaListNotFound (criterion, error))
169
+ ): EitherT [F , LoaderIgluError , ShredModel ] =
170
+ EitherT (resolver.lookupSchemasUntilResult(schemaKey))
171
+ .leftMap(e => SchemaProvider .resolverBadRow(e.schemaKey)(e.error))
187
172
.flatMap {
188
- case cached : ResolverResult .Cached [SchemaListKey , SchemaList ] =>
189
- lookupInCache(schemaKey, resolver, shredModelCache, cached)
190
- case ResolverResult .NotCached (schemaList) =>
191
- val schemaKeys = schemaList.schemas
192
- getShredModel(schemaKey, schemaKeys, resolver)
173
+ case cached : ResolverResult .Cached [SchemaKey , SchemaContentList ] =>
174
+ lookupInCache(schemaKey, shredModelCache, cached)
175
+ case ResolverResult .NotCached (schemaContentList) =>
176
+ getShredModel(schemaKey, schemaContentList)
193
177
}
194
- }
195
178
196
179
def lookupInCache [F [_]: Monad : Clock : RegistryLookup ](
197
180
schemaKey : SchemaKey ,
198
- resolver : Resolver [F ],
199
181
shredModelCache : ShredModelCache [F ],
200
- cached : ResolverResult .Cached [SchemaListKey , SchemaList ]
182
+ cached : ResolverResult .Cached [SchemaKey , SchemaContentList ]
201
183
) = {
202
184
val key = (schemaKey, cached.timestamp)
203
185
EitherT .liftF(shredModelCache.get(key)).flatMap {
204
186
case Some (model) =>
205
187
EitherT .pure[F , FailureDetails .LoaderIgluError ](model)
206
188
case None =>
207
- getShredModel(schemaKey, cached.value.schemas, resolver )
189
+ getShredModel[ F ] (schemaKey, cached.value)
208
190
.semiflatTap(props => shredModelCache.put(key, props))
209
191
}
210
192
}
0 commit comments