@@ -5,10 +5,12 @@ const _ = require('lodash');
5
5
const util = require ( 'util' ) ;
6
6
7
7
// const P = require('../util/promise');
8
+ const stream_utils = require ( '../util/stream_utils' ) ;
8
9
const dbg = require ( '../util/debug_module' ) ( __filename ) ;
10
+ const S3Error = require ( '../endpoint/s3/s3_errors' ) . S3Error ;
9
11
//TODO: why do we what to use the wrap and not directly @google -cloud/storage ?
10
12
const GoogleCloudStorage = require ( '../util/google_storage_wrap' ) ;
11
- const S3Error = require ( '../endpoint/s3/s3_errors' ) . S3Error ;
13
+ const endpoint_stats_collector = require ( './endpoint_stats_collector' ) ;
12
14
13
15
/**
14
16
* @implements {nb.Namespace}
@@ -32,6 +34,7 @@ class NamespaceGCP {
32
34
this . bucket = target_bucket ;
33
35
this . rpc_client = rpc_client ;
34
36
this . access_mode = access_mode ;
37
+ this . stats_collector = endpoint_stats_collector . instance ( this . rpc_client ) ;
35
38
}
36
39
37
40
get_write_resource ( ) {
@@ -104,7 +107,68 @@ class NamespaceGCP {
104
107
105
108
async upload_object ( params , object_sdk ) {
106
109
dbg . log0 ( 'NamespaceGCP.upload_object:' , this . bucket , inspect ( params ) ) ;
107
- throw new S3Error ( S3Error . NotImplemented ) ;
110
+
111
+ let metadata ;
112
+ if ( params . copy_source ) {
113
+ dbg . error ( 'NamespaceGCP.upload_object: Copy is not implemented yet' , this . bucket , inspect ( params . copy_source ) ) ;
114
+ throw new S3Error ( S3Error . NotImplemented ) ;
115
+ } else {
116
+ try {
117
+ let count = 1 ;
118
+ const count_stream = stream_utils . get_tap_stream ( data => {
119
+ this . stats_collector . update_namespace_write_stats ( {
120
+ namespace_resource_id : this . namespace_resource_id ,
121
+ bucket_name : params . bucket ,
122
+ size : data . length ,
123
+ count
124
+ } ) ;
125
+ // clear count for next updates
126
+ count = 0 ;
127
+ } ) ;
128
+ const file = this . gcs . bucket ( this . bucket ) . file ( params . key ) ;
129
+ // https://googleapis.dev/nodejs/storage/latest/File.html#createWriteStream
130
+ // for the options of createWriteStream:
131
+ // https://googleapis.dev/nodejs/storage/latest/global.html#CreateWriteStreamOptions
132
+ const options = {
133
+ metadata : {
134
+ contentType : params . content_type ,
135
+ md5Hash : params . md5_b64 ,
136
+ }
137
+ } ;
138
+ const writeStream = file . createWriteStream ( options ) ;
139
+ params . source_stream . pipe ( count_stream ) . pipe ( writeStream ) ;
140
+
141
+ await new Promise ( ( resolve , reject ) => {
142
+ //throw the error on error and reject the promise
143
+ writeStream . on ( 'error' , err => {
144
+ reject ( err ) ;
145
+ throw err ;
146
+ } ) ;
147
+ // upon finish get the metadata
148
+ writeStream . on ( 'finish' , async ( ) => {
149
+ try {
150
+ [ metadata ] = await file . getMetadata ( ) ;
151
+ dbg . log1 ( `NamespaceGCP.upload_object: ${ params . key } uploaded to ${ this . bucket } .` ) ;
152
+ resolve ( ) ;
153
+ } catch ( err ) {
154
+ reject ( err ) ;
155
+ throw err ;
156
+ }
157
+ } ) ;
158
+ } ) ;
159
+
160
+ } catch ( err ) {
161
+ this . _translate_error_code ( err ) ;
162
+ dbg . warn ( 'NamespaceGCP.upload_object:' , inspect ( err ) ) ;
163
+ object_sdk . rpc_client . pool . update_issues_report ( {
164
+ namespace_resource_id : this . namespace_resource_id ,
165
+ error_code : err . code || ( err . errors [ 0 ] && err . errors [ 0 ] . reason ) || 'InternalError' ,
166
+ time : Date . now ( ) ,
167
+ } ) ;
168
+ throw err ;
169
+ }
170
+ }
171
+ return this . _get_gcp_object_info ( metadata ) ;
108
172
}
109
173
110
174
/////////////////////////////
@@ -157,7 +221,14 @@ class NamespaceGCP {
157
221
async delete_object ( params , object_sdk ) {
158
222
// https://googleapis.dev/nodejs/storage/latest/File.html#delete
159
223
dbg . log0 ( 'NamespaceGCP.delete_object:' , this . bucket , inspect ( params ) ) ;
160
- throw new S3Error ( S3Error . NotImplemented ) ;
224
+ try {
225
+ const res = await this . gcs . bucket ( this . bucket ) . file ( params . key ) . delete ( ) ;
226
+ dbg . log1 ( 'NamespaceGCP.delete_object:' , this . bucket , inspect ( params ) , 'res' , inspect ( res ) ) ;
227
+ return { } ;
228
+ } catch ( err ) {
229
+ this . _translate_error_code ( err ) ;
230
+ throw err ;
231
+ }
161
232
}
162
233
163
234
async delete_multiple_objects ( params , object_sdk ) {
@@ -215,6 +286,25 @@ class NamespaceGCP {
215
286
216
287
//TODO: add here the internal functions
217
288
289
+
290
+ /**
291
+ * @returns {nb.ObjectInfo }
292
+ */
293
+ _get_gcp_object_info ( metadata ) {
294
+ dbg . log1 ( `_get_gcp_object_info: metadata: ${ inspect ( metadata ) } ` ) ;
295
+ return {
296
+ obj_id : metadata . id ,
297
+ bucket : metadata . bucket ,
298
+ key : metadata . name ,
299
+ size : metadata . size ,
300
+ etag : metadata . etag ,
301
+ create_time : metadata . timeCreated ,
302
+ last_modified_time : metadata . updated ,
303
+ content_type : metadata . contentType ,
304
+ md5_b64 : metadata . md5Hash ,
305
+ } ;
306
+ }
307
+
218
308
_translate_error_code ( err ) {
219
309
// https://cloud.google.com/storage/docs/json_api/v1/status-codes
220
310
if ( err . code === 404 ) err . rpc_code = 'NO_SUCH_OBJECT' ;
0 commit comments