|
| 1 | +/* Copyright (C) 2016 NooBaa */ |
| 2 | +'use strict'; |
| 3 | + |
| 4 | +const dbg = require('../../../util/debug_module')(__filename); |
| 5 | +const S3Error = require('../s3_errors').S3Error; |
| 6 | +const s3_utils = require('../s3_utils'); |
| 7 | +const { S3SelectStream } = require('../../../util/s3select'); |
| 8 | +const nb_native = require('../../../util/nb_native'); |
| 9 | +const stream_utils = require('../../../util/stream_utils'); |
| 10 | + |
| 11 | +/** |
| 12 | + * https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html |
| 13 | + */ |
| 14 | +async function post_object_select(req, res) { |
| 15 | + |
| 16 | + if (!nb_native().S3Select) { |
| 17 | + throw new S3Error(S3Error.S3SelectNotCompiled); |
| 18 | + } |
| 19 | + |
| 20 | + req.object_sdk.setup_abort_controller(req, res); |
| 21 | + const agent_header = req.headers['user-agent']; |
| 22 | + const noobaa_trigger_agent = agent_header && agent_header.includes('exec-env/NOOBAA_FUNCTION'); |
| 23 | + const encryption = s3_utils.parse_encryption(req); |
| 24 | + const http_req_select_params = req.body.SelectObjectContentRequest; |
| 25 | + |
| 26 | + const md_params = { |
| 27 | + bucket: req.params.bucket, |
| 28 | + key: req.params.key, |
| 29 | + version_id: req.query.versionId, |
| 30 | + encryption, |
| 31 | + }; |
| 32 | + const object_md = await req.object_sdk.read_object_md(md_params); |
| 33 | + |
| 34 | + const params = { |
| 35 | + object_md, |
| 36 | + obj_id: object_md.obj_id, |
| 37 | + bucket: req.params.bucket, |
| 38 | + key: req.params.key, |
| 39 | + content_type: object_md.content_type, |
| 40 | + noobaa_trigger_agent, |
| 41 | + encryption, |
| 42 | + }; |
| 43 | + |
| 44 | + //handle ScanRange |
| 45 | + if (Array.isArray(http_req_select_params.ScanRange)) { |
| 46 | + const scan_range = http_req_select_params.ScanRange[0]; |
| 47 | + if (scan_range.Start) { |
| 48 | + params.start = Number(scan_range.Start); |
| 49 | + } |
| 50 | + if (scan_range.End) { |
| 51 | + if (scan_range.Start) { |
| 52 | + params.end = Number(scan_range.End); |
| 53 | + } else { |
| 54 | + //if only End is specified, start from {End} bytes from the end. |
| 55 | + params.start = object_md.size - (Number(scan_range.End)); |
| 56 | + } |
| 57 | + } |
| 58 | + } |
| 59 | + |
| 60 | + //prepare s3select stream |
| 61 | + const input_serialization = http_req_select_params.InputSerialization[0]; |
| 62 | + let input_format = null; |
| 63 | + if (input_serialization.CSV) { |
| 64 | + input_format = 'CSV'; |
| 65 | + } else if (input_serialization.JSON) { |
| 66 | + input_format = 'JSON'; |
| 67 | + } else { |
| 68 | + throw new S3Error(S3Error.MissingInputSerialization); |
| 69 | + } |
| 70 | + |
| 71 | + //currently s3select can only output in the same format as input format |
| 72 | + if (Array.isArray(http_req_select_params.OutputSerialization)) { |
| 73 | + const output_serialization = http_req_select_params.OutputSerialization[0]; |
| 74 | + if ((output_serialization.CSV && input_format !== 'CSV') || |
| 75 | + (output_serialization.JSON && input_format !== 'JSON')) { |
| 76 | + throw new S3Error(S3Error.OutputInputFormatMismatch); |
| 77 | + } |
| 78 | + } |
| 79 | + |
| 80 | + const select_args = { |
| 81 | + query: http_req_select_params.Expression[0], |
| 82 | + input_format: input_format, |
| 83 | + input_serialization_format: http_req_select_params.InputSerialization[0][input_format][0], |
| 84 | + records_header_buf: S3SelectStream.records_message_headers |
| 85 | + }; |
| 86 | + const s3select = new S3SelectStream(select_args); |
| 87 | + dbg.log3("select_args = ", select_args); |
| 88 | + |
| 89 | + //pipe s3select result into http result |
| 90 | + stream_utils.pipeline([s3select, res], true /*res is a write stream, no need for resume*/); |
| 91 | + |
| 92 | + //send s3select pipe to read_object_stream. |
| 93 | + //in some cases (currently nsfs) it will pipe object stream into our pipe (s3select) |
| 94 | + const read_stream = await req.object_sdk.read_object_stream(params, s3select); |
| 95 | + if (read_stream) { |
| 96 | + // if read_stream supports closing, then we handle abort cases such as http disconnection |
| 97 | + // by calling the close method to stop it from buffering more data which will go to waste. |
| 98 | + if (read_stream.close) { |
| 99 | + req.object_sdk.add_abort_handler(() => read_stream.close()); |
| 100 | + } |
| 101 | + read_stream.on('error', err => { |
| 102 | + dbg.error('read stream error:', err, req.path); |
| 103 | + res.destroy(err); |
| 104 | + }); |
| 105 | + //in other cases, we need to pipe the read stream ourselves |
| 106 | + stream_utils.pipeline([read_stream, s3select], true /*no need to resume s3select*/); |
| 107 | + } |
| 108 | +} |
| 109 | + |
| 110 | +module.exports = { |
| 111 | + handler: post_object_select, |
| 112 | + body: { |
| 113 | + type: 'xml', |
| 114 | + }, |
| 115 | + reply: { |
| 116 | + type: 'raw', |
| 117 | + }, |
| 118 | +}; |
0 commit comments