const { NIL: SYSTEM_USER } = require('uuid');

const errorToProblem = require('../components/errorToProblem');
-const { addDashesToUuid, getCurrentIdentity } = require('../components/utils');
+const { addDashesToUuid, getCurrentIdentity, formatS3KeyForCompare, isPrefixOfPath } = require('../components/utils');
const utils = require('../db/models/utils');

-const { bucketService, objectService, storageService, objectQueueService, userService } = require('../services');
+const log = require('../components/log')(module.filename);
+
+const {
+  bucketPermissionService,
+  bucketService,
+  objectService,
+  storageService,
+  objectQueueService,
+  userService
+} = require('../services');

const SERVICE = 'ObjectQueueService';

/**
 * The Sync Controller
 */
const controller = {
+
+  /**
+   * @function syncBucketRecursive
+   * Synchronizes all objects and subfolders found at and below the given parent folder's (bucket's) key
+   * NOTE: OIDC users require MANAGE permission to do a recursive sync on a folder.
+   * All of their permissions will be copied to any NEW sub-folders created.
+   * @param {object} req Express request object
+   * @param {object} res Express response object
+   * @param {function} next The next callback function
+   * @returns {function} Express middleware function
+   */
+  async syncBucketRecursive(req, res, next) {
+    try {
+      // wrap all SQL operations in a single transaction
+      const response = await utils.trxWrapper(async (trx) => {
+
+        // current userId
+        const userId = await userService.getCurrentUserId(
+          getCurrentIdentity(req.currentUser, SYSTEM_USER),
+          SYSTEM_USER
+        );
+        // parent bucket
+        const bucketId = addDashesToUuid(req.params.bucketId);
+        const parentBucket = await bucketService.read(bucketId);
+
+        // current user's permissions on the parent bucket (folder)
+        const currentUserParentBucketPerms = userId !== SYSTEM_USER ? (await bucketPermissionService.searchPermissions({
+          bucketId: parentBucket.bucketId,
+          userId: userId
+        })).map(p => p.permCode) : [];
+
+        /**
+         * sync (i.e. create or delete) bucket records in the COMS db to match 'folders' (S3 key prefixes) that exist in S3
+         */
+        // parent + child bucket records already in the COMS db
+        const dbChildBuckets = await bucketService.searchChildBuckets(parentBucket);
+        let dbBuckets = [parentBucket].concat(dbChildBuckets);
+        // 'folders' that exist below (and including) the parent 'folder' in S3
+        const s3Response = await storageService.listAllObjectVersions({ bucketId: bucketId, precisePath: false });
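+        // derive the set of unique 'folder' key prefixes from every object key found in S3 (including delete markers)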
+        const s3Keys = [...new Set([
+          ...s3Response.DeleteMarkers.map(object => formatS3KeyForCompare(object.Key)),
+          ...s3Response.Versions.map(object => formatS3KeyForCompare(object.Key)),
+        ])];
+
+        const syncedBuckets = await this.syncBucketRecords(
+          dbBuckets,
+          s3Keys,
+          parentBucket,
+          // assign current user's permissions on the parent bucket to new sub-folders (buckets)
+          currentUserParentBucketPerms,
+          trx
+        );
+
+        /**
+         * Queue objects in all the folders for syncing
+         */
+        return await this.queueObjectRecords(syncedBuckets, s3Response, userId, trx);
+      });
+
+      // return number of jobs inserted
+      res.status(202).json(response);
+    } catch (e) {
+      next(errorToProblem(SERVICE, e));
+    }
+  },
+
  /**
-   * @function syncBucket
-   * Synchronizes a bucket
+   * @function syncBucketSingle
+   * Synchronizes objects found at the given bucket's key, ignoring sub-folders and any files beyond the next delimiter
   * @param {object} req Express request object
   * @param {object} res Express response object
   * @param {function} next The next callback function
   * @returns {function} Express middleware function
   */
-  async syncBucket(req, res, next) {
+  async syncBucketSingle(req, res, next) {
    try {
-      // TODO: Consider adding an "all" mode for checking through all known objects and buckets for job enumeration
-      // const allMode = isTruthy(req.query.all);
      const bucketId = addDashesToUuid(req.params.bucketId);
+      const bucket = await bucketService.read(bucketId);
      const userId = await userService.getCurrentUserId(getCurrentIdentity(req.currentUser, SYSTEM_USER), SYSTEM_USER);

-      const [dbResponse, s3Response] = await Promise.all([
-        objectService.searchObjects({ bucketId: bucketId }),
-        storageService.listAllObjectVersions({ bucketId: bucketId, filterLatest: true })
-      ]);
-
-      // Aggregate and dedupe all file paths to consider
-      const jobs = [...new Set([
-        ...dbResponse.data.map(object => object.path),
-        ...s3Response.DeleteMarkers.map(object => object.Key),
-        ...s3Response.Versions.map(object => object.Key)
-      ])].map(path => ({ path: path, bucketId: bucketId }));
+      const s3Objects = await storageService.listAllObjectVersions({ bucketId: bucketId, filterLatest: true });

      const response = await utils.trxWrapper(async (trx) => {
-        await bucketService.update({
-          bucketId: bucketId,
-          userId: userId,
-          lastSyncRequestedDate: new Date().toISOString()
-        }, trx);
-        return await objectQueueService.enqueue({ jobs: jobs }, trx);
+        return this.queueObjectRecords([bucket], s3Objects, userId, trx);
      });
+
      res.status(202).json(response);
    } catch (e) {
      next(errorToProblem(SERVICE, e));
    }
  },

+  /**
+   * @function syncBucketRecords
+   * Synchronizes (creates / prunes) COMS db bucket records for each 'directory' found in S3
+   * @param {object[]} dbBuckets Array of Bucket models - bucket records already in the COMS db before syncing
+   * @param {string[]} s3Keys Array of key prefixes from S3 representing 'directories'
+   * @param {object} parentBucket Bucket model for the COMS db record of the parent bucket
+   * @param {string[]} currentUserParentBucketPerms Array of PermCodes to add to NEW buckets
+   * @param {object} [trx] An Objection Transaction object
+   * @returns {object[]} An array of Bucket models for the bucket records now in the COMS db
+   */
+  async syncBucketRecords(dbBuckets, s3Keys, parentBucket, currentUserParentBucketPerms, trx) {
+    try {
+      // delete buckets not found in S3 from COMS db
+      const oldDbBuckets = dbBuckets.filter(b => !s3Keys.includes(b.key));
+      await Promise.all(
+        oldDbBuckets.map(dbBucket =>
+          bucketService.delete(dbBucket.bucketId, trx)
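+            // remove each pruned bucket from the in-memory working list as its delete resolves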
+            .then(() => {
+              dbBuckets = dbBuckets.filter(b => b.bucketId !== dbBucket.bucketId);
+            })
+        )
+      );
+
+      // create COMS db records for buckets ('folders') found only in S3
+      const newS3Keys = s3Keys.filter(k => !dbBuckets.map(b => b.key).includes(k));
+      await Promise.all(
+        newS3Keys.map(s3Key => {
+          const data = {
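+            // name the new folder record after the last segment of its key prefix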
+            bucketName: s3Key.substring(s3Key.lastIndexOf('/') + 1),
+            accessKeyId: parentBucket.accessKeyId,
+            bucket: parentBucket.bucket,
+            endpoint: parentBucket.endpoint,
+            key: s3Key,
+            secretAccessKey: parentBucket.secretAccessKey,
+            region: parentBucket.region ?? undefined,
+            active: parentBucket.active,
+            userId: parentBucket.createdBy ?? SYSTEM_USER,
+            // current user has MANAGE perm on the parent folder (see route.hasPermission),
+            // so copy all of their perms to NEW sub-folders
+            permCodes: currentUserParentBucketPerms
+          };
+          return bucketService.create(data, trx)
+            .then((dbResponse) => {
+              dbBuckets.push(dbResponse);
+            });
+        })
+      );
+
+      return dbBuckets;
+    }
+    catch (err) {
+      log.error(err.message, { function: 'syncBucketRecords' });
+      throw err;
+    }
+  },
+
+  /**
+   * @function queueObjectRecords
+   * Synchronizes (creates / prunes) COMS db object records to match the state in S3
+   * @param {object[]} dbBuckets Array of Bucket models in the COMS db
+   * @param {object} s3Objects The response from storage.listAllObjectVersions - an
+   * object containing arrays of DeleteMarkers and Versions
+   * @param {string} userId The guid of the current user
+   * @param {object} [trx] An Objection Transaction object
+   * @returns {number} The number of sync jobs inserted into the object queue
+   */
+  async queueObjectRecords(dbBuckets, s3Objects, userId, trx) {
+    try {
+      // get all object records for the given buckets from the COMS db
+      const dbObjects = await objectService.searchObjects({
+        bucketId: dbBuckets.map(b => b.bucketId)
+      }, trx);
+
+      /**
+       * merge arrays of objects from the COMS db and S3 to form an array of jobs with format:
+       * [ { path: '/images/img3.jpg', bucketId: '123' }, { path: '/images/album1/img1.jpg', bucketId: '456' } ]
+       */
+      const objects = [...new Set([
+        // objects already in the database
+        ...dbObjects.data.map(object => {
+          return {
+            path: object.path,
+            bucketId: object.bucketId
+          };
+        }),
+        // DeleteMarkers found in S3
+        ...s3Objects.DeleteMarkers.map(object => {
+          return {
+            path: object.Key,
+            bucketId: dbBuckets.find(b => isPrefixOfPath(b.key, object.Key))?.bucketId
+          };
+        }),
+        // Versions found in S3
+        ...s3Objects.Versions
+          .filter(v => v.Size > 0) // is a file (not a 'directory')
+          .map(object => {
+            return {
+              path: object.Key,
+              bucketId: dbBuckets.find(b => isPrefixOfPath(b.key, object.Key))?.bucketId
+              // NOTE: adding the current userId here would grant them ALL perms on new objects
+              // and set createdBy on all downstream resources (versions, tags, meta)
+              // userId: userId
+            };
+          }),
+      ])];
+
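+      // NOTE: the Set spread above does not dedupe plain objects (each literal is a distinct reference);
+      // duplicate paths are removed below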
+      // remove duplicate jobs by path (later entries overwrite earlier ones)
+      const jobs = [...new Map(objects.map(o => [o.path, o])).values()];
+
+      // create a job in the COMS db object_queue for each object
+      // and update the 'lastSyncRequestedDate' value in the COMS db for each bucket
+      for (const bucket of dbBuckets) {
+        await bucketService.update({
+          bucketId: bucket.bucketId,
+          userId: userId,
+          lastSyncRequestedDate: new Date().toISOString()
+        }, trx);
+      }
+      return await objectQueueService.enqueue({ jobs: jobs }, trx);
+    }
+    catch (err) {
+      log.error(err.message, { function: 'queueObjectRecords' });
+      throw err;
+    }
+  },
+
  /**
   * @function syncObject
   * Synchronizes an object