11import { z } from 'zod' ;
22import { assertAuthentication , TrpcInstance } from '@backend/api-front/server' ;
3- import { SchemaSite , SchemaSitePartitions , SitePartitions } from '@backend/lib/models/site' ;
3+ import { SchemaSite } from '@backend/lib/models/site' ;
44import { LambdaEnvironment } from '@backend/api-front/environment' ;
5- import { getAthenaClient , getS3Client } from '@backend/lib/utils/lazy_aws' ;
6- import { orderBy } from 'lodash' ;
7- import { DateUtils } from '@backend/lib/utils/date_utils' ;
8- import { AthenaBase } from '@backend/lib/utils/athena_base' ;
95
106export function sites ( trpcInstance : TrpcInstance ) {
117 return trpcInstance . procedure
@@ -17,96 +13,3 @@ export function sites(trpcInstance: TrpcInstance) {
1713 return LambdaEnvironment . SITES ;
1814 } ) ;
1915}
20-
21- export function sitesGetPartitions ( trpcInstance : TrpcInstance ) {
22- return trpcInstance . procedure
23- . input ( z . undefined ( ) )
24- . output ( SchemaSitePartitions )
25- . query ( async ( { ctx } ) => {
26- assertAuthentication ( ctx ) ;
27-
28- const athenaClient = getAthenaClient ( ) ;
29- const s3Client = getS3Client ( ) ;
30- const athenaWrapper = new AthenaBase (
31- athenaClient ,
32- s3Client ,
33- LambdaEnvironment . ANALYTICS_GLUE_DB_NAME ,
34- LambdaEnvironment . ANALYTICS_BUCKET_ATHENA_PATH
35- ) ;
36-
37- const res = await athenaWrapper . query ( 'SELECT * FROM "page_views$partitions" ORDER BY site, year, month' ) ;
38- return res . data as SitePartitions ;
39- } ) ;
40- }
41-
42- async function getPartitions ( athenaClient : AthenaBase ) {
43- return ( await athenaClient . query ( 'SELECT * FROM "page_views$partitions" ORDER BY site, year, month' ) )
44- . data as SitePartitions ;
45- }
46- export function sitesUpdatePartition ( trpcInstance : TrpcInstance ) {
47- return trpcInstance . procedure
48- . input ( z . object ( { forceRepair : z . boolean ( ) } ) )
49- . output ( SchemaSitePartitions )
50- . mutation ( async ( { input, ctx } ) => {
51- assertAuthentication ( ctx ) ;
52-
53- const athenaClient = getAthenaClient ( ) ;
54- const s3Client = getS3Client ( ) ;
55- const athenaWrapper = new AthenaBase (
56- athenaClient ,
57- s3Client ,
58- LambdaEnvironment . ANALYTICS_GLUE_DB_NAME ,
59- LambdaEnvironment . ANALYTICS_BUCKET_ATHENA_PATH
60- ) ;
61-
62- let partitions = await getPartitions ( athenaWrapper ) ;
63- if ( ! partitions . length || input . forceRepair ) {
64- /* Auto discover/repair all indexes aka partitions */
65- await athenaWrapper . query ( 'MSCK REPAIR TABLE page_views' ) ;
66- partitions = await getPartitions ( athenaWrapper ) ;
67- }
68-
69- if ( input . forceRepair ) {
70- return partitions ;
71- }
72- if ( ! partitions . length ) {
73- return [ ] ;
74- }
75-
76- const earliestPartition = orderBy ( partitions , [ 'year' , 'month' ] , [ 'asc' , 'asc' ] ) [ 0 ] ;
77-
78- /* Get a list of date partitions between the first partition and now */
79- const earliestPartitionDate = new Date ( Date . UTC ( earliestPartition . year , earliestPartition . month - 1 ) ) ;
80- const now = DateUtils . now ( ) ;
81- const months = DateUtils . getMonthsBetweenDates ( earliestPartitionDate , now ) ;
82-
83- let partitionsAdded = false ;
84- for ( const site of LambdaEnvironment . SITES ) {
85- const partitionsToAdd = [ ] ;
86- for ( const month of months ) {
87- const hasPartition = partitions . find (
88- ( row ) => row . site === site && row . year === month . getFullYear ( ) && row . month === month . getMonth ( ) + 1
89- ) ;
90- if ( hasPartition ) {
91- continue ;
92- }
93-
94- partitionsToAdd . push (
95- `PARTITION (site = '${ site } ', year = ${ month . getFullYear ( ) } , month = ${ month . getMonth ( ) + 1 } )`
96- ) ;
97- }
98-
99- if ( partitionsToAdd . length > 0 ) {
100- partitionsAdded = true ;
101- // console.log(`ALTER TABLE page_views ADD IF NOT EXISTS ${partitionsToAdd.join("\n")};`)
102- await athenaWrapper . query ( `ALTER TABLE page_views ADD IF NOT EXISTS ${ partitionsToAdd . join ( '\n' ) } ;` ) ;
103- }
104- }
105-
106- if ( partitionsAdded ) {
107- partitions = await getPartitions ( athenaWrapper ) ;
108- }
109-
110- return partitions ;
111- } ) ;
112- }
0 commit comments