feat: direct s3 storage access for PGlite #77

Draft · wants to merge 2 commits into next

2 changes: 1 addition & 1 deletion apps/proxy/entrypoint.sh
@@ -21,7 +21,7 @@ trap 'cleanup' EXIT
mkdir -p $S3FS_MOUNT

# Mount the S3 bucket
s3fs $AWS_S3_BUCKET $S3FS_MOUNT -o use_path_request_style -o url=$AWS_ENDPOINT_URL_S3 -o endpoint=$AWS_REGION
s3fs $AWS_S3_BUCKET $S3FS_MOUNT -o use_path_request_style -o url=$AWS_ENDPOINT_URL_S3 -o endpoint=$AWS_REGION -o use_cache=/tmp

# Check if the mount was successful
if mountpoint -q $S3FS_MOUNT; then
69 changes: 14 additions & 55 deletions apps/proxy/src/index.ts
@@ -1,19 +1,14 @@
import { PGlite, PGliteInterface } from '@electric-sql/pglite'
import { vector } from '@electric-sql/pglite/vector'
import { mkdir, readFile, access, rm } from 'node:fs/promises'
import { mkdir, readFile } from 'node:fs/promises'
import net from 'node:net'
import { createReadStream } from 'node:fs'
import { pipeline } from 'node:stream/promises'
import { createGunzip } from 'node:zlib'
import { extract } from 'tar'
import { PostgresConnection, ScramSha256Data, TlsOptions } from 'pg-gateway'
import { createClient } from '@supabase/supabase-js'
import type { Database } from '@postgres-new/supabase'
import { findUp } from 'find-up'

const supabaseUrl = process.env.SUPABASE_URL ?? 'http://127.0.0.1:54321'
const supabaseKey = process.env.SUPABASE_SERVICE_ROLE_KEY ?? ''
const dataMount = process.env.DATA_MOUNT ?? './data'
const s3fsMount = process.env.S3FS_MOUNT ?? './s3'
const wildcardDomain = process.env.WILDCARD_DOMAIN ?? 'db.example.com'
const packageLockJsonPath = await findUp('package-lock.json')
@@ -29,11 +24,9 @@ const packageLockJson = JSON.parse(await readFile(packageLockJsonPath, 'utf8'))
}
const pgliteVersion = `(PGlite ${packageLockJson.packages['node_modules/@electric-sql/pglite'].version})`

const dumpDir = `${s3fsMount}/dbs`
const dbDir = `${s3fsMount}/dbs`
const tlsDir = `${s3fsMount}/tls`
const dbDir = `${dataMount}/dbs`

await mkdir(dumpDir, { recursive: true })
await mkdir(dbDir, { recursive: true })
await mkdir(tlsDir, { recursive: true })

@@ -63,15 +56,6 @@ function sendFatalError(connection: PostgresConnection, code: string, message: s
return new Error(message)
}

async function fileExists(path: string): Promise<boolean> {
try {
await access(path)
return true
} catch {
return false
}
}

const supabase = createClient<Database>(supabaseUrl, supabaseKey)

const server = net.createServer((socket) => {
@@ -159,51 +143,19 @@ const server = net.createServer((socket) => {
// at this point we know sniServerName is set
const databaseId = getIdFromServerName(tlsInfo!.sniServerName!)

console.log(`Serving database '${databaseId}'`)
log(`Serving database '${databaseId}'`)

const dbPath = `${dbDir}/${databaseId}`

if (!(await fileExists(dbPath))) {
console.log(`Database '${databaseId}' is not cached, downloading...`)

const dumpPath = `${dumpDir}/${databaseId}.tar.gz`

if (!(await fileExists(dumpPath))) {
connection.sendError({
severity: 'FATAL',
code: 'XX000',
message: `database ${databaseId} not found`,
})
connection.socket.end()
return
}

// Create a directory for the database
await mkdir(dbPath, { recursive: true })

try {
// Extract the .tar.gz file
await pipeline(createReadStream(dumpPath), createGunzip(), extract({ cwd: dbPath }))
} catch (error) {
console.error(error)
await rm(dbPath, { recursive: true, force: true }) // Clean up the partially created directory
connection.sendError({
severity: 'FATAL',
code: 'XX000',
message: `Error extracting database: ${(error as Error).message}`,
})
connection.socket.end()
return
}
}

log('opening database...')
db = new PGlite({
dataDir: dbPath,
extensions: {
vector,
},
})
await db.waitReady
log('database open and ready')
const { rows } = await db.query("SELECT 1 FROM pg_roles WHERE rolname = 'readonly_postgres';")
if (rows.length === 0) {
await db.exec(`
@@ -212,6 +164,7 @@ const server = net.createServer((socket) => {
`)
}
await db.close()
log('reopening database...')
db = new PGlite({
dataDir: dbPath,
username: 'readonly_postgres',
@@ -220,6 +173,7 @@ const server = net.createServer((socket) => {
},
})
await db.waitReady
log('database reopened and ready')
},
async onMessage(data, { isAuthenticated }) {
// Only forward messages to PGlite after authentication
@@ -239,11 +193,16 @@ const server = net.createServer((socket) => {
})

socket.on('close', async () => {
console.log('Client disconnected')
log('Client disconnected')
await db?.close()
})
})

server.listen(5432, async () => {
console.log('Server listening on port 5432')
log('Server listening on port 5432')
})

function log(message: string) {
const timestamp = new Date().toISOString()
console.log(`[${timestamp}] ${message}`)
}
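
With the tarball download/extract step removed, the proxy opens the PGlite data directory straight off the s3fs mount, so the old `fileExists` guard went away with it. Below is a minimal sketch of how a "database not found" error could still be raised against the new on-mount layout; this helper is illustrative only and is not part of the diff:

```ts
import { access } from 'node:fs/promises'
import type { PostgresConnection } from 'pg-gateway'

// Illustrative only (not part of this PR): report a missing data directory on
// the s3fs mount the same way the removed dump check reported a missing dump.
async function ensureDatabaseDir(
  connection: PostgresConnection,
  dbPath: string,
  databaseId: string
): Promise<boolean> {
  try {
    await access(dbPath)
    return true
  } catch {
    connection.sendError({
      severity: 'FATAL',
      code: 'XX000',
      message: `database ${databaseId} not found`,
    })
    connection.socket.end()
    return false
  }
}
```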
41 changes: 32 additions & 9 deletions apps/web/app/api/databases/[id]/route.ts
@@ -1,4 +1,4 @@
import { DeleteObjectCommand, S3Client } from '@aws-sdk/client-s3'
import { S3Client, ListObjectsV2Command, DeleteObjectsCommand } from '@aws-sdk/client-s3'
import { NextRequest, NextResponse } from 'next/server'
import { createClient } from '~/utils/supabase/server'

@@ -53,17 +53,40 @@ export async function DELETE(

await supabase.from('deployed_databases').delete().eq('database_id', databaseId)

const key = `dbs/${databaseId}.tar.gz`
try {
await s3Client.send(
new DeleteObjectCommand({
async function recursiveDelete(token?: string) {
// get the files
const listCommand = new ListObjectsV2Command({
Bucket: process.env.AWS_S3_BUCKET,
Prefix: `dbs/${databaseId}`,
ContinuationToken: token,
})
let list = await s3Client.send(listCommand)
if (list.KeyCount) {
// if items to delete
// delete the files
const deleteCommand = new DeleteObjectsCommand({
Bucket: process.env.AWS_S3_BUCKET,
Key: key,
Delete: {
Objects: list.Contents!.map((item) => ({ Key: item.Key })),
Quiet: false,
},
})
)
} catch (error) {
console.error(`Error deleting S3 object ${key}:`, error)
let deleted = await s3Client.send(deleteCommand)

// log any errors deleting files
if (deleted.Errors) {
deleted.Errors.map((error) =>
console.log(`${error.Key} could not be deleted - ${error.Code}`)
)
}
}
// repeat if more files to delete
if (list.NextContinuationToken) {
await recursiveDelete(list.NextContinuationToken)
}
}
// start the recursive function
await recursiveDelete()

return NextResponse.json({
success: true,
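
Because a deployed database is now stored as a tree of objects under `dbs/<databaseId>` rather than a single `dbs/<databaseId>.tar.gz` object, deletion has to list the prefix and batch-delete each page. For reference, the same pagination can be written iteratively; this is a standalone sketch with the bucket and prefix passed in as parameters rather than taken from the route's environment:

```ts
import { S3Client, ListObjectsV2Command, DeleteObjectsCommand } from '@aws-sdk/client-s3'

// Standalone sketch of the route's paginated prefix delete, written as a loop.
async function deletePrefix(s3Client: S3Client, bucket: string, prefix: string) {
  let token: string | undefined
  do {
    // ListObjectsV2 returns at most 1,000 keys per page, which stays within
    // DeleteObjects' per-request limit of 1,000 objects.
    const list = await s3Client.send(
      new ListObjectsV2Command({ Bucket: bucket, Prefix: prefix, ContinuationToken: token })
    )
    if (list.KeyCount && list.Contents) {
      const deleted = await s3Client.send(
        new DeleteObjectsCommand({
          Bucket: bucket,
          Delete: { Objects: list.Contents.map((item) => ({ Key: item.Key })) },
        })
      )
      // Surface per-object failures the same way the route does.
      deleted.Errors?.forEach((error) =>
        console.log(`${error.Key} could not be deleted - ${error.Code}`)
      )
    }
    token = list.NextContinuationToken
  } while (token)
}
```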
75 changes: 46 additions & 29 deletions apps/web/app/api/databases/[id]/upload/route.ts
@@ -1,11 +1,10 @@
import { S3Client } from '@aws-sdk/client-s3'
import { S3Client, CompleteMultipartUploadCommandOutput } from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'
import { NextRequest, NextResponse } from 'next/server'
import { createGzip } from 'zlib'
import { Readable } from 'stream'
import { createClient } from '~/utils/supabase/server'
import { createScramSha256Data } from 'pg-gateway'
import { generateDatabasePassword } from '~/utils/generate-database-password'
import { entries } from 'streaming-tar'

const wildcardDomain = process.env.NEXT_PUBLIC_WILDCARD_DOMAIN ?? 'db.example.com'
const s3Client = new S3Client({ forcePathStyle: true })
@@ -62,6 +61,7 @@ export async function POST(
)
}

// TODO: we should check the size of the uncompressed tarball
const dumpSizeInMB = dump.size / (1024 * 1024)
if (dumpSizeInMB > 100) {
return NextResponse.json(
@@ -74,21 +74,51 @@
}

const databaseId = params.id
const key = `dbs/${databaseId}.tar.gz`

const gzip = createGzip()
const body = Readable.from(streamToAsyncIterable(dump.stream()))
const directoryPrefix = `dbs/${databaseId}`
const tarEntryStream =
dump.type === 'application/x-gzip'
? dump.stream().pipeThrough(new DecompressionStream('gzip'))
: dump.stream()
const uploads: Promise<CompleteMultipartUploadCommandOutput>[] = []

for await (const entry of entries(tarEntryStream)) {
let upload: Upload

switch (entry.type) {
case 'file': {
const buffer = new Uint8Array(await entry.arrayBuffer())
upload = new Upload({
client: s3Client,
params: {
Bucket: process.env.AWS_S3_BUCKET,
Key: `${directoryPrefix}${entry.name}`,
Body: buffer,
},
})
break
}
case 'directory': {
// Directories end in '/' and have an empty body
upload = new Upload({
client: s3Client,
params: {
Bucket: process.env.AWS_S3_BUCKET,
Key: `${directoryPrefix}${entry.name}/`,
Body: new Uint8Array(),
},
})
await entry.skip()
break
}
default: {
continue
}
}

const upload = new Upload({
client: s3Client,
params: {
Bucket: process.env.AWS_S3_BUCKET,
Key: key,
Body: body.pipe(gzip),
},
})
uploads.push(upload.done())
}

await upload.done()
await Promise.all(uploads)

const { data: existingDeployedDatabase } = await supabase
.from('deployed_databases')
@@ -127,16 +157,3 @@ export async function POST(
},
})
}

async function* streamToAsyncIterable(stream: ReadableStream) {
const reader = stream.getReader()
try {
while (true) {
const { done, value } = await reader.read()
if (done) return
yield value
}
} finally {
reader.releaseLock()
}
}
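
The upload route now expands the client's tarball server-side and writes each entry as its own S3 object under `dbs/<databaseId>`, instead of storing one gzipped archive. A small standalone sketch of the same `streaming-tar` iteration against a local file; the path is a placeholder and the console output stands in for the S3 uploads performed by the route:

```ts
import { createReadStream } from 'node:fs'
import { Readable } from 'node:stream'
import { entries } from 'streaming-tar'

// './example-db.tar.gz' is a placeholder path used only for this sketch.
// DecompressionStream is the same Node 18+ global the route relies on.
const gzipped = Readable.toWeb(createReadStream('./example-db.tar.gz'))
const tarStream = gzipped.pipeThrough(new DecompressionStream('gzip'))

for await (const entry of entries(tarStream)) {
  switch (entry.type) {
    case 'file': {
      const body = new Uint8Array(await entry.arrayBuffer())
      // In the route this buffer becomes the object body for `dbs/<id>${entry.name}`.
      console.log(`file ${entry.name} (${body.byteLength} bytes)`)
      break
    }
    case 'directory': {
      // Directory entries carry no body; skip() advances past them.
      console.log(`dir  ${entry.name}/`)
      await entry.skip()
      break
    }
    default:
      // Other entry types are ignored, matching the route's behavior.
      break
  }
}
```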