feat: multipart import uploads #134
base: main
Conversation
Reviewer's Guide

Refactors the import file upload flow to support both presigned and multipart uploads by introducing dynamic upload routing and a full chunked upload implementation.

Sequence diagram for multipart import file upload process

```mermaid
sequenceDiagram
    actor User
    participant AFClientService
    participant APIService
    participant AppFlowyCloud
    User->>AFClientService: importFile(file, onProgress)
    AFClientService->>APIService: createImportTask(file)
    APIService->>AppFlowyCloud: POST /api/import/create
    AppFlowyCloud-->>APIService: { task_id, upload_type, presigned_url?, workspace_id? }
    APIService-->>AFClientService: { taskId, uploadType, presignedUrl, workspaceId }
    AFClientService->>APIService: uploadImportFileAuto(file, uploadInfo, onProgress)
    alt uploadType == 'presigned_url'
        APIService->>APIService: uploadImportFile(presignedUrl, file, onProgress)
    else uploadType == 'multipart'
        APIService->>APIService: uploadImportFileMultipart(file, workspaceId, taskId, onProgress)
        APIService->>AppFlowyCloud: POST /api/file_storage/{workspaceId}/create_upload
        AppFlowyCloud-->>APIService: { upload_id, file_id }
        loop For each chunk
            APIService->>AppFlowyCloud: PUT /api/file_storage/{workspaceId}/upload_part/import/{fileId}/{uploadId}/{partNumber}
            AppFlowyCloud-->>APIService: { e_tag, part_num }
        end
        APIService->>AppFlowyCloud: PUT /api/file_storage/{workspaceId}/complete_upload
        AppFlowyCloud-->>APIService: { code }
    end
    APIService-->>AFClientService: (upload complete)
    AFClientService-->>User: (import complete)
```
Class diagram for updated import upload services

```mermaid
classDiagram
    class AFClientService {
        +importFile(file: File, onProgress: fn)
    }
    class APIService {
        +createImportTask(file: File)
        +uploadImportFileAuto(file: File, uploadInfo, onProgress: fn)
        +uploadImportFile(presignedUrl: string, file: File, onProgress: fn)
        +uploadImportFileMultipart(file: File, workspaceId: string, taskId: string, onProgress: fn)
    }
    AFClientService --> APIService : uses
    APIService <|-- uploadImportFileAuto
    APIService <|-- uploadImportFileMultipart
    APIService <|-- uploadImportFile
    APIService <|-- createImportTask
```
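The dynamic routing step is the small piece that ties the two strategies together. Below is a minimal sketch of what `uploadImportFileAuto` could look like, assuming the response fields shown in the sequence diagram; the `ImportUploadInfo` type is illustrative and the exact shape in the PR may differ:

```ts
// Shape assumed from the sequence diagram above (illustrative, not the PR's actual type).
interface ImportUploadInfo {
  taskId: string;
  uploadType: 'presigned_url' | 'multipart';
  presignedUrl?: string;
  workspaceId?: string;
}

// The two strategy functions from this PR, declared so the sketch is self-contained.
declare function uploadImportFile(
  presignedUrl: string, file: File, onProgress: (p: number) => void
): Promise<void>;
declare function uploadImportFileMultipart(
  file: File, workspaceId: string, taskId: string, onProgress: (p: number) => void
): Promise<void>;

async function uploadImportFileAuto(
  file: File,
  uploadInfo: ImportUploadInfo,
  onProgress: (progress: number) => void
): Promise<void> {
  // Route to the strategy the server selected when the import task was created.
  if (uploadInfo.uploadType === 'multipart' && uploadInfo.workspaceId) {
    return uploadImportFileMultipart(file, uploadInfo.workspaceId, uploadInfo.taskId, onProgress);
  }

  if (uploadInfo.uploadType === 'presigned_url' && uploadInfo.presignedUrl) {
    return uploadImportFile(uploadInfo.presignedUrl, file, onProgress);
  }

  return Promise.reject({ code: -1, message: `Unsupported upload type: ${uploadInfo.uploadType}` });
}
```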
Hey @strophy - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `src/application/services/js-services/http/http_api.ts:1521` </location>
<code_context>
+ for (let offset = 0; offset < file.size; offset += CHUNK_SIZE) {
+ const chunk = file.slice(offset, offset + CHUNK_SIZE);
+
+ const partResponse = await axiosInstance?.put<{
+ code: number;
+ data: { e_tag: string; part_num: number };
+ }>(
+ `/api/file_storage/${workspaceId}/upload_part/import/${fileId}/${uploadId}/${partNumber}`,
+ chunk, // Send the chunk directly as body
+ {
+ headers: {
+ 'Content-Type': 'application/octet-stream',
+ },
+ onUploadProgress: (progressEvent) => {
+ uploadedBytes = offset + (progressEvent.loaded || 0);
+ const progress = Math.min(uploadedBytes / file.size, 1);
+ onProgress(progress);
+ }
+ }
+ );
</code_context>
<issue_to_address>
Potential for unhandled partial progress reporting.
For the last chunk, progressEvent.loaded may not match the actual bytes uploaded, causing progress to be inaccurate. Please adjust the calculation to ensure progress reaches exactly 1.0 at completion.
</issue_to_address>
<suggested_fix>
<<<<<<< SEARCH
for (let offset = 0; offset < file.size; offset += CHUNK_SIZE) {
const chunk = file.slice(offset, offset + CHUNK_SIZE);
const partResponse = await axiosInstance?.put<{
code: number;
data: { e_tag: string; part_num: number };
}>(
`/api/file_storage/${workspaceId}/upload_part/import/${fileId}/${uploadId}/${partNumber}`,
chunk, // Send the chunk directly as body
{
headers: {
'Content-Type': 'application/octet-stream',
},
onUploadProgress: (progressEvent) => {
uploadedBytes = offset + (progressEvent.loaded || 0);
const progress = Math.min(uploadedBytes / file.size, 1);
onProgress(progress);
}
}
);
=======
for (let offset = 0; offset < file.size; offset += CHUNK_SIZE) {
const chunk = file.slice(offset, offset + CHUNK_SIZE);
const isLastChunk = offset + CHUNK_SIZE >= file.size;
const partResponse = await axiosInstance?.put<{
code: number;
data: { e_tag: string; part_num: number };
}>(
`/api/file_storage/${workspaceId}/upload_part/import/${fileId}/${uploadId}/${partNumber}`,
chunk, // Send the chunk directly as body
{
headers: {
'Content-Type': 'application/octet-stream',
},
onUploadProgress: (progressEvent) => {
if (isLastChunk && progressEvent.loaded === chunk.size) {
// Ensure progress is exactly 1.0 at completion of last chunk
uploadedBytes = file.size;
onProgress(1.0);
} else {
uploadedBytes = offset + (progressEvent.loaded || 0);
const progress = Math.min(uploadedBytes / file.size, 1);
onProgress(progress);
}
}
}
);
>>>>>>> REPLACE
</suggested_fix>
### Comment 2
<location> `src/application/services/js-services/http/http_api.ts:1491` </location>
<code_context>
});
}
+async function uploadImportFileMultipart(file: File, workspaceId: string, taskId: string, onProgress: (progress: number) => void) {
+ const CHUNK_SIZE = 100 * 1024 * 1024; // 100MB chunks
+ const fileId = `import_presigned_url_${taskId}`;
</code_context>
<issue_to_address>
Consider refactoring 'uploadImportFileMultipart' by extracting each major step into separate helper functions to improve readability and maintainability.
You can dramatically flatten `uploadImportFileMultipart` by extracting each step into a small helper. For example:
```ts
// helpers/fileMultipart.ts
async function initMultipart(
workspaceId: string,
fileId: string,
fileSize: number
): Promise<{ uploadId: string }> {
const resp = await axiosInstance.post<{
code: number
data: { upload_id: string }
}>(
`/api/file_storage/${workspaceId}/create_upload`,
{ parent_dir: 'import', file_id: fileId, content_type: 'application/zip', file_size: fileSize }
);
if (resp.data.code !== 0) throw new Error('create upload failed');
return { uploadId: resp.data.data.upload_id };
}
async function uploadPart(
workspaceId: string,
fileId: string,
uploadId: string,
partNumber: number,
chunk: Blob,
onProgress: (p: number) => void
): Promise<{ e_tag: string; part_number: number }> {
const resp = await axiosInstance.put<{
code: number
data: { e_tag: string; part_num: number }
}>(
`/api/file_storage/${workspaceId}/upload_part/import/${fileId}/${uploadId}/${partNumber}`,
chunk,
{
headers: { 'Content-Type': 'application/octet-stream' },
onUploadProgress(evt) {
onProgress(evt.loaded / chunk.size);
}
}
);
if (resp.data.code !== 0) throw new Error(`part ${partNumber} failed`);
return { e_tag: resp.data.data.e_tag, part_number: resp.data.data.part_num };
}
async function finalizeMultipart(
workspaceId: string,
fileId: string,
uploadId: string,
parts: { e_tag: string; part_number: number }[]
) {
const resp = await axiosInstance.put<{ code: number }>(
`/api/file_storage/${workspaceId}/complete_upload`,
{ parent_dir: 'import', file_id: fileId, upload_id: uploadId, parts }
);
if (resp.data.code !== 0) throw new Error('complete upload failed');
}
```
Then your main function reduces to:
```ts
async function uploadImportFileMultipart(
file: File,
workspaceId: string,
taskId: string,
onProgress: (p: number) => void
) {
const CHUNK = 100 * 1024 * 1024;
const fileId = `import_presigned_url_${taskId}`;
try {
const { uploadId } = await initMultipart(workspaceId, fileId, file.size);
const parts: {e_tag: string; part_number: number}[] = [];
for (let i = 0, part=1; i < file.size; i += CHUNK, part++) {
const chunk = file.slice(i, i + CHUNK);
const partData = await uploadPart(workspaceId, fileId, uploadId, part, chunk, totalProg => {
const prog = (i + totalProg * chunk.size) / file.size;
onProgress(Math.min(prog, 1));
});
parts.push(partData);
}
await finalizeMultipart(workspaceId, fileId, uploadId, parts);
onProgress(1);
} catch (e) {
return Promise.reject({ code: -1, message: `Multipart upload failed: ${e instanceof Error ? e.message : e}` });
}
}
```
This splits the three main concerns (init, upload part, finalize) into focused helpers, flattens error‐handling, and keeps every existing feature intact.
</issue_to_address>
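With or without that refactor, the call site stays the same. A hypothetical usage sketch (the workspace and task IDs are made-up values):

```ts
// Hypothetical usage: upload a Notion export zip and report progress as a percentage.
declare const exportZip: File; // e.g. obtained from an <input type="file"> element

await uploadImportFileMultipart(exportZip, 'workspace-abc', 'task-123', (progress) => {
  console.log(`Import upload: ${(progress * 100).toFixed(1)}%`);
});
```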
As discussed on another PR, this implementation, while suitable for the self-hosted use case, might cause issues for the managed service. We will need to investigate this further before merging.
Description
To be merged together with AppFlowy-IO/AppFlowy-Cloud#1521
This PR changes the Notion import file upload logic to support multipart uploads for large files. The `uploadImportFileAuto` and `uploadImportFileMultipart` functions now require a `taskId` parameter, and the multipart upload process uses this for chunked uploads. It also refactors the multipart upload implementation to ensure correct file ID usage and progress tracking.
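For illustration, this is how the `taskId` and the 100 MB chunk size from the reviewed code determine the upload layout (the task ID and file size below are made-up values; the `import_presigned_url_` prefix comes from the reviewed implementation):

```ts
const CHUNK_SIZE = 100 * 1024 * 1024; // 100 MB, as in the reviewed implementation
const taskId = 'task-123';            // hypothetical value returned by createImportTask
const fileId = `import_presigned_url_${taskId}`;

// A 350 MB export splits into four parts: three full 100 MB chunks plus a 50 MB remainder.
const fileSize = 350 * 1024 * 1024;
const partCount = Math.ceil(fileSize / CHUNK_SIZE); // 4
```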
Summary by Sourcery
Add support for multipart file uploads in the Notion import flow by introducing task-based import tasks and chunked upload logic.