diff --git a/.github/workflows/fly.yml b/.github/workflows/fly.yml index 4ad6f49..2e6046d 100644 --- a/.github/workflows/fly.yml +++ b/.github/workflows/fly.yml @@ -7,7 +7,7 @@ jobs: deploy: name: Deploy app to Fly runs-on: ubuntu-latest - concurrency: deploy-group # optional: ensure only one action runs at a time + concurrency: deploy-group # optional: ensure only one action runs at a time steps: - uses: actions/checkout@v3 - uses: superfly/flyctl-actions/setup-flyctl@master diff --git a/README.md b/README.md index b71abc3..bb9b3b3 100644 --- a/README.md +++ b/README.md @@ -135,4 +135,5 @@ The hosted service is provided "as is" and on an "as available" basis, with no g The author respects the privacy of users and takes data security seriously. However, the author cannot guarantee the security of any data transmitted to the hosted service and is not responsible for any breach of security or for the actions of any third parties that may obtain any personal information. By using the hosted service, you agree to accept all risks associated with the use of the service and agree not to hold the author liable for any issues, losses, or damages that may arise from its use. + diff --git a/bun.lockb b/bun.lockb index 20860df..ffc1c61 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/components.json b/components.json index c8e3a67..057cd68 100644 --- a/components.json +++ b/components.json @@ -14,4 +14,4 @@ "components": "src/components", "utils": "src/lib/utils" } -} \ No newline at end of file +} diff --git a/package-lock.json b/package-lock.json index 1a0d501..2d3b61d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -27,7 +27,7 @@ "@sentry/nextjs": "^7.93.0", "@t3-oss/env-nextjs": "^0.7.1", "@types/spotify-api": "^0.0.25", - "@xyflow/react": "^12.0.0-next.5", + "@xyflow/react": "^12.0.0-next.8", "bullmq": "^5.1.1", "class-variance-authority": "^0.7.0", "clsx": "^2.1.0", @@ -41,6 +41,7 @@ "next-auth": "^4.24.5", "next-themes": "^0.2.1", "radash": "^11.0.0", + "random-words": "^2.0.1", "react": "18.2.0", "react-day-picker": "^8.10.0", "react-dom": "18.2.0", @@ -50,11 +51,13 @@ "react-wrap-balancer": "^1.1.0", "sonner": "^1.3.1", "spotify-web-api-node": "^5.0.2", + "swr": "^2.2.4", "tailwind-merge": "^2.2.0", "tailwindcss-animate": "^1.0.7", "uuid": "^9.0.1", + "validator": "^13.11.0", "zod": "^3.22.4", - "zustand": "^4.4.7" + "zustand": "^4.5.0" }, "devDependencies": { "@next/eslint-plugin-next": "^14.0.3", @@ -64,6 +67,7 @@ "@types/react-dom": "^18.2.15", "@types/spotify-web-api-node": "^5.0.11", "@types/uuid": "^9.0.7", + "@types/validator": "^13.11.8", "@typescript-eslint/eslint-plugin": "^6.11.0", "@typescript-eslint/parser": "^6.11.0", "autoprefixer": "^10.4.14", @@ -5663,9 +5667,9 @@ "license": "MIT" }, "node_modules/@types/geojson": { - "version": "7946.0.13", - "resolved": "https://registry.npmjs.org/@types/geojson/-/geojson-7946.0.13.tgz", - "integrity": "sha512-bmrNrgKMOhM3WsafmbGmC+6dsF2Z308vLFsQ3a/bT8X8Sv5clVYpPars/UPq+sAaJP+5OoLAYgwbkS5QEJdLUQ==" + "version": "7946.0.14", + "resolved": "https://registry.npmjs.org/@types/geojson/-/geojson-7946.0.14.tgz", + "integrity": "sha512-WCfD5Ht3ZesJUsONdhvm84dmzWOiOzOAqOncN0++w0lBw1o8OuDNJF2McvvCef/yBqb/HYRahp1BYtODFQ8bRg==" }, "node_modules/@types/istanbul-lib-coverage": { "version": "2.0.6", @@ -5750,6 +5754,12 @@ "integrity": "sha512-WUtIVRUZ9i5dYXefDEAI7sh9/O7jGvHg7Df/5O/gtH3Yabe5odI3UWopVR1qbPXQtvOxWu3mM4XxlYeZtMWF4g==", "dev": true }, + "node_modules/@types/validator": { + "version": "13.11.8", + "resolved": "https://registry.npmjs.org/@types/validator/-/validator-13.11.8.tgz", + "integrity": "sha512-c/hzNDBh7eRF+KbCf+OoZxKbnkpaK/cKp9iLQWqB7muXtM+MtL9SUUH8vCFcLn6dH1Qm05jiexK0ofWY7TfOhQ==", + "dev": true + }, "node_modules/@types/yargs": { "version": "15.0.19", "license": "MIT", @@ -5997,11 +6007,11 @@ } }, "node_modules/@xyflow/react": { - "version": "12.0.0-next.5", - "resolved": "https://registry.npmjs.org/@xyflow/react/-/react-12.0.0-next.5.tgz", - "integrity": "sha512-lwtt2jDe9Oyq7GCHQ/A4p/RbqrvqsBcJzZfhYsOq6TRPK7/e9LAyc7trR6Tw+f5U02H01jbYUcIsfH1VWgOzmg==", + "version": "12.0.0-next.8", + "resolved": "https://registry.npmjs.org/@xyflow/react/-/react-12.0.0-next.8.tgz", + "integrity": "sha512-Jdy/0wv10ycxwQm5rESWN8Psp+FTipbIv4RTIJwAH9zG5AqGp20ESi/k3oxJmAsbQHIcGgjB2RlOhxBs2qqJLA==", "dependencies": { - "@xyflow/system": "0.0.14", + "@xyflow/system": "0.0.16", "classcat": "^5.0.3", "zustand": "^4.4.0" }, @@ -6011,9 +6021,9 @@ } }, "node_modules/@xyflow/system": { - "version": "0.0.14", - "resolved": "https://registry.npmjs.org/@xyflow/system/-/system-0.0.14.tgz", - "integrity": "sha512-dmVQujAwZyoSZTr0sQHlgro9pS+RO9AgjT2BDlyB7t/+FYQBgW3FKdTQnVh8/azcdAncWruyiPt9K/h2kpTYrA==", + "version": "0.0.16", + "resolved": "https://registry.npmjs.org/@xyflow/system/-/system-0.0.16.tgz", + "integrity": "sha512-pVWaBHAE1Ew9acmNGEdsVLNh8Z+TEBbsdaIojV+SDbmolYlCQ1UJ6Sse4k+K3rr7kgX/1GRsiEcWxOFIx0wt+w==", "dependencies": { "@types/d3": "^7.4.0", "@types/d3-drag": "^3.0.1", @@ -11626,6 +11636,14 @@ "node": ">=14.18.0" } }, + "node_modules/random-words": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/random-words/-/random-words-2.0.1.tgz", + "integrity": "sha512-nZNJAmgcFmtJMTDDIUCm/iK4R6RydC6NvALvWhYItXQrgYGk1F7Gww416LpVROFQtfVd5TaLEf4WuSsko03N7w==", + "dependencies": { + "seedrandom": "^3.0.5" + } + }, "node_modules/range-parser": { "version": "1.2.1", "license": "MIT", @@ -12149,6 +12167,11 @@ "loose-envify": "^1.1.0" } }, + "node_modules/seedrandom": { + "version": "3.0.5", + "resolved": "https://registry.npmjs.org/seedrandom/-/seedrandom-3.0.5.tgz", + "integrity": "sha512-8OwmbklUNzwezjGInmZ+2clQmExQPvomqjL7LFqOYqtmuxRgQYqOD3mHaU+MvZn5FLUeVxVfQjwLZW/n/JFuqg==" + }, "node_modules/semver": { "version": "7.5.4", "license": "ISC", @@ -12720,6 +12743,18 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/swr": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/swr/-/swr-2.2.4.tgz", + "integrity": "sha512-njiZ/4RiIhoOlAaLYDqwz5qH/KZXVilRLvomrx83HjzCWTfa+InyfAjv05PSFxnmLzZkNO9ZfvgoqzAaEI4sGQ==", + "dependencies": { + "client-only": "^0.0.1", + "use-sync-external-store": "^1.2.0" + }, + "peerDependencies": { + "react": "^16.11.0 || ^17.0.0 || ^18.0.0" + } + }, "node_modules/tailwind-merge": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/tailwind-merge/-/tailwind-merge-2.2.0.tgz", @@ -13274,6 +13309,14 @@ "builtins": "^1.0.3" } }, + "node_modules/validator": { + "version": "13.11.0", + "resolved": "https://registry.npmjs.org/validator/-/validator-13.11.0.tgz", + "integrity": "sha512-Ii+sehpSfZy+At5nPdnyMhx78fEoPDkR2XW/zimHEL3MyGJQOCQ7WeP20jPYRz7ZCpcKLB21NxuXHF3bxjStBQ==", + "engines": { + "node": ">= 0.10" + } + }, "node_modules/wcwidth": { "version": "1.0.1", "license": "MIT", @@ -13480,9 +13523,9 @@ } }, "node_modules/zustand": { - "version": "4.4.7", - "resolved": "https://registry.npmjs.org/zustand/-/zustand-4.4.7.tgz", - "integrity": "sha512-QFJWJMdlETcI69paJwhSMJz7PPWjVP8Sjhclxmxmxv/RYI7ZOvR5BHX+ktH0we9gTWQMxcne8q1OY8xxz604gw==", + "version": "4.5.0", + "resolved": "https://registry.npmjs.org/zustand/-/zustand-4.5.0.tgz", + "integrity": "sha512-zlVFqS5TQ21nwijjhJlx4f9iGrXSL0o/+Dpy4txAP22miJ8Ti6c1Ol1RLNN98BMib83lmDH/2KmLwaNXpjrO1A==", "dependencies": { "use-sync-external-store": "1.2.0" }, @@ -13491,7 +13534,7 @@ }, "peerDependencies": { "@types/react": ">=16.8", - "immer": ">=9.0", + "immer": ">=9.0.6", "react": ">=16.8" }, "peerDependenciesMeta": { diff --git a/package.json b/package.json index 215a2b6..42f9e30 100644 --- a/package.json +++ b/package.json @@ -33,7 +33,7 @@ "@sentry/nextjs": "^7.93.0", "@t3-oss/env-nextjs": "^0.7.1", "@types/spotify-api": "^0.0.25", - "@xyflow/react": "^12.0.0-next.5", + "@xyflow/react": "^12.0.0-next.8", "bullmq": "^5.1.1", "class-variance-authority": "^0.7.0", "clsx": "^2.1.0", @@ -47,6 +47,7 @@ "next-auth": "^4.24.5", "next-themes": "^0.2.1", "radash": "^11.0.0", + "random-words": "^2.0.1", "react": "18.2.0", "react-day-picker": "^8.10.0", "react-dom": "18.2.0", @@ -56,11 +57,13 @@ "react-wrap-balancer": "^1.1.0", "sonner": "^1.3.1", "spotify-web-api-node": "^5.0.2", + "swr": "^2.2.4", "tailwind-merge": "^2.2.0", "tailwindcss-animate": "^1.0.7", "uuid": "^9.0.1", + "validator": "^13.11.0", "zod": "^3.22.4", - "zustand": "^4.4.7" + "zustand": "^4.5.0" }, "devDependencies": { "@next/eslint-plugin-next": "^14.0.3", @@ -70,6 +73,7 @@ "@types/react-dom": "^18.2.15", "@types/spotify-web-api-node": "^5.0.11", "@types/uuid": "^9.0.7", + "@types/validator": "^13.11.8", "@typescript-eslint/eslint-plugin": "^6.11.0", "@typescript-eslint/parser": "^6.11.0", "autoprefixer": "^10.4.14", diff --git a/src/app/api/nodes/route.ts b/src/app/api/nodes/route.ts deleted file mode 100644 index 8ab11a7..0000000 --- a/src/app/api/nodes/route.ts +++ /dev/null @@ -1,108 +0,0 @@ -import { NextResponse, NextRequest } from "next/server"; -import { getServerSession } from "next-auth"; -import { authOptions } from "@/server/auth"; -import { getToken } from "next-auth/jwt"; - -import { type Edge, type Node, type Position } from "@xyflow/react"; - -// const secret = process.env.NEXTAUTH_SECRET; - -export async function GET(request: NextRequest) { - // const session = await getServerSession({ req: request, ...authOptions }); - // const token = await getToken({ req: request, secret }); - - // if (session && token) { - // return NextResponse.next(); - // } else { - // return NextResponse.redirect("/api/auth/signin"); - // } - - const nodeSize = { - width: 100, - height: 40, - }; - - // this example uses some v12 features that are not released yet - const initialNodes: Node[] = [ - { - id: "1", - type: "input", - data: { label: "Node 1" }, - position: { x: 250, y: 5 }, - width: 100, - height: 40, - handles: [ - { - type: "source", - position: "bottom" as Position, - x: nodeSize.width * 0.5, - y: nodeSize.height, - width: 1, - height: 1, - }, - ], - }, - { - id: "2", - data: { label: "Node 2" }, - position: { x: 100, y: 100 }, - width: 100, - height: 40, - handles: [ - { - type: 'source', - position: 'bottom' as Position, - x: nodeSize.width * 0.5, - y: nodeSize.height, - width: 1, - height: 1, - }, - { - type: 'target', - position: 'top' as Position, - x: nodeSize.width * 0.5, - y: 0, - width: 1, - height: 1, - }, - ], - }, - { - id: "3", - data: { label: "Node 3" }, - position: { x: 400, y: 100 }, - width: 100, - height: 40, - handles: [ - { - type: 'source', - position: 'bottom' as Position, - x: nodeSize.width * 0.5, - y: nodeSize.height, - width: 1, - height: 1, - }, - { - type: 'target', - position: 'top' as Position, - x: nodeSize.width * 0.5, - y: 0, - width: 1, - height: 1, - }, - ], - }, - ]; - - const initialEdges: Edge[] = [ - { id: "e1-2", source: "1", target: "2", animated: true }, - { id: "e1-3", source: "1", target: "3", animated: true }, - ]; - - const data = { - nodes: initialNodes, - edges: initialEdges, - }; - - return NextResponse.json(data); -} diff --git a/src/app/api/user/[uid]/playlist/[playlistId]/route.ts b/src/app/api/user/[uid]/playlist/[playlistId]/route.ts index 1049a3c..180ea16 100644 --- a/src/app/api/user/[uid]/playlist/[playlistId]/route.ts +++ b/src/app/api/user/[uid]/playlist/[playlistId]/route.ts @@ -1,4 +1,3 @@ - import { NextResponse, type NextRequest } from "next/server"; import { getServerSession } from "next-auth"; import { authOptions } from "@/server/auth"; @@ -10,9 +9,9 @@ export async function GET( { params, }: { - params: { - uid: string - playlistId: string + params: { + uid: string; + playlistId: string; }; }, ) { @@ -36,7 +35,6 @@ export async function GET( ); } - const accessToken = await getAccessTokenFromProviderAccountId(params.uid); if (!accessToken) { return NextResponse.json("No access token found", { status: 500 }); @@ -70,4 +68,4 @@ export async function GET( }; return NextResponse.json(playlist); -} \ No newline at end of file +} diff --git a/src/app/api/user/[uid]/playlists/route.ts b/src/app/api/user/[uid]/playlists/route.ts index dcadb58..861d786 100644 --- a/src/app/api/user/[uid]/playlists/route.ts +++ b/src/app/api/user/[uid]/playlists/route.ts @@ -4,7 +4,6 @@ import { authOptions } from "@/server/auth"; import { getAccessTokenFromProviderAccountId } from "~/server/db/helper"; import SpotifyWebApi from "spotify-web-api-node"; import { env } from "~/env"; -import Redis from 'ioredis'; export async function GET( request: NextRequest, { diff --git a/src/app/api/workflow/[id]/route.ts b/src/app/api/workflow/[id]/route.ts index 155bf64..bd53ee5 100644 --- a/src/app/api/workflow/[id]/route.ts +++ b/src/app/api/workflow/[id]/route.ts @@ -1,8 +1,11 @@ - -import { getWorkflowJob } from "../workflowQueue"; import { NextResponse, type NextRequest } from "next/server"; import { getServerSession } from "next-auth"; import { authOptions } from "@/server/auth"; +import { db } from "@/server/db"; +import { isUUID } from "validator"; +import { Logger } from "@/lib/log"; + +const log = new Logger("/api/workflow/[id]"); export async function GET( request: NextRequest, { @@ -12,8 +15,9 @@ export async function GET( }, ) { const session = await getServerSession({ req: request, ...authOptions }); - const id = params.id; - if (!id) { + + if (!params.id || !isUUID(params.id)) { + log.error("No id provided"); return NextResponse.json( { error: "No id provided", @@ -23,6 +27,7 @@ export async function GET( } if (!session) { + log.error("Not authenticated"); return NextResponse.json( { error: "Not authenticated", @@ -31,12 +36,15 @@ export async function GET( ); } - const job = await getWorkflowJob(id); - if (!job) { - return NextResponse.json({ job: null }); + const workflow = await db.query.workflowJobs.findFirst({ + where: (workflowJobs, { eq }) => eq(workflowJobs.id, params.id), + }); + + if (!workflow) { + return NextResponse.json("Workflow not found", { status: 404 }); } - if (job.data.userId !== session.user.id) { + if (workflow.userId !== session.user.id) { return NextResponse.json( { error: "Unauthorized", @@ -45,5 +53,13 @@ export async function GET( ); } - return NextResponse.json({ job }); + const res = { + id: workflow.id, + cron: workflow.cron, + workflow: workflow.workflow ? JSON.parse(workflow.workflow) : null, + createdAt: workflow.createdAt, + }; + + log.info(`Returning workflow ${params.id} for user ${session.user.id}`); + return NextResponse.json(res); } diff --git a/src/app/api/workflow/[id]/run/route.ts b/src/app/api/workflow/[id]/run/route.ts new file mode 100644 index 0000000..56d1a74 --- /dev/null +++ b/src/app/api/workflow/[id]/run/route.ts @@ -0,0 +1,79 @@ +import { NextResponse, type NextRequest } from "next/server"; +import { getServerSession } from "next-auth"; +import { authOptions } from "@/server/auth"; +import { getAccessTokenFromUserId } from "@/server/db/helper"; +import { createWorkflowQueue } from "../../workflowQueue"; +import { Runner } from "@/lib/workflow/Workflow"; +import { db } from "@/server/db"; +import { Logger } from "@/lib/log"; + +const log = new Logger("/api/workflow/[id]/run"); +export async function POST( + request: NextRequest, + { + params, + }: { + params: { id: string }; + }, +) { + log.info("running workflow"); + const session = await getServerSession({ req: request, ...authOptions }); + if (!session) { + log.error("Not authenticated"); + return NextResponse.json({ error: "Not authenticated" }, { status: 401 }); + } + + const accessToken = await getAccessTokenFromUserId(session.user.id); + if (!accessToken) { + log.error("Unable to get access token"); + return NextResponse.json( + { error: "Unable to get access token" }, + { status: 500 }, + ); + } + const id = params.id; + const workflow = await db.query.workflowJobs.findFirst({ + where: (workflowJobs, { eq }) => eq(workflowJobs.id, id), + }); + + if (!workflow?.workflow || workflow.userId !== session.user.id) { + log.error("Unauthorized or Workflow not found"); + return NextResponse.json( + { error: "Unauthorized or Workflow not found" }, + { status: 404 }, + ); + } + + const runner = new Runner({ + slug: session.user.id, + access_token: accessToken, + }); + const workflowObj = JSON.parse(workflow.workflow) as WorkflowObject; + + workflowObj.operations = runner.sortOperations(workflowObj); + runner.validateWorkflow(workflowObj); + + try { + const job = await createWorkflowQueue( + workflowObj, + session.user.id, + workflow.id, + ); + log.info("Added job to queue", { + jobId: job.id, + workflowId: workflow.id, + userId: session.user.id, + } as any); + return NextResponse.json({ job }); + } catch (err) { + log.error("Error adding job to queue", { + error: err, + workflowId: workflow.id, + userId: session.user.id, + } as any); + return NextResponse.json( + { error: "Error adding job to queue" }, + { status: 500 }, + ); + } +} diff --git a/src/app/api/workflow/queue/[id]/route.ts b/src/app/api/workflow/queue/[id]/route.ts new file mode 100644 index 0000000..3f19f50 --- /dev/null +++ b/src/app/api/workflow/queue/[id]/route.ts @@ -0,0 +1,84 @@ +import { NextResponse, type NextRequest } from "next/server"; +import { getServerSession } from "next-auth"; +import { authOptions } from "@/server/auth"; +import { db } from "@/server/db"; +import { eq } from "drizzle-orm"; +import { workflowRuns } from "~/server/db/schema"; +export async function GET( + request: NextRequest, + { + params, + }: { + params: { id: string }; + }, +) { + const session = await getServerSession({ req: request, ...authOptions }); + const id = params.id; + if (!id) { + return NextResponse.json( + { + error: "No id provided", + }, + { status: 400 }, + ); + } + + if (!session) { + return NextResponse.json( + { + error: "Not authenticated", + }, + { status: 401 }, + ); + } + + const fields = request.nextUrl.searchParams.get("fields"); + + const columns = fields + ? fields.split(",").reduce((obj, key) => ({ ...obj, [key]: true }), {}) + : { + id: true, + workflow: true, + returnValues: true, + userId: true, + status: true, + startedAt: true, + completedAt: true, + error: true, + }; + + const workflowQuery = await db.query.workflowRuns.findFirst({ + where: eq(workflowRuns.id, id), + with: { + workflow: { + columns: { + ...columns, + id: true, + userId: true, + workflow: true, + }, + }, + }, + }); + + if (!workflowQuery) { + return NextResponse.json({ job: null }); + } + + if (workflowQuery.workflow.userId !== session.user.id) { + return NextResponse.json( + { + error: "Unauthorized", + }, + { status: 401 }, + ); + } + + if (workflowQuery.workflow.workflow) { + workflowQuery.workflow = JSON.parse(workflowQuery.workflow.workflow); + } + if (workflowQuery.returnValues) { + workflowQuery.returnValues = JSON.parse(workflowQuery.returnValues); + } + return NextResponse.json(workflowQuery); +} diff --git a/src/app/api/workflow/route.ts b/src/app/api/workflow/route.ts index 88601bc..247864b 100644 --- a/src/app/api/workflow/route.ts +++ b/src/app/api/workflow/route.ts @@ -2,30 +2,46 @@ import { NextResponse, type NextRequest } from "next/server"; import { getServerSession } from "next-auth"; import { authOptions } from "@/server/auth"; // import { Runner } from "~/lib/workflow/Workflow"; -import { operations } from '../../../lib/workflow/Workflow'; -import { type Workflow } from "~/lib/workflow/types"; +import { operations } from "../../../lib/workflow/Workflow"; import { getAccessTokenFromUserId } from "~/server/db/helper"; -import { createWorkflowQueue } from "./workflowQueue"; +import { + createWorkflowQueue, + storeWorkflowJob, + workflowExists, + updateWorkflowJob, +} from "./workflowQueue"; import { Runner } from "~/lib/workflow/Workflow"; +import { v4 as uuidv4 } from "uuid"; +import { Logger } from "@/lib/log"; + +const log = new Logger("/api/workflow"); + export async function POST(request: NextRequest) { const session = await getServerSession({ req: request, ...authOptions }); if (!session) { - return NextResponse.redirect("/api/auth/signin"); + return NextResponse.json( + { + error: "Not authenticated", + }, + { status: 401 }, + ); } - const accessToken = await getAccessTokenFromUserId( - session.user.id, - ); + const accessToken = await getAccessTokenFromUserId(session.user.id); if (!accessToken) { - return NextResponse.redirect("/api/auth/signin"); + return NextResponse.json( + { + error: "Something went wrong, unable to get access token", + }, + { status: 500 }, + ); } - console.log("Received workflow from user", session.user.id); - - let workflow: Workflow; + log.info("Received workflow from user", session.user.id); + let workflowRes: WorkflowResponse; try { - workflow = (await request.json()) as Workflow; + workflowRes = (await request.json()) as WorkflowResponse; } catch (err) { - console.error("Error parsing workflow", err); + log.error("Error parsing workflow", err); return NextResponse.json( { error: "Error parsing workflow: " + (err as Error).message }, { status: 400 }, @@ -36,36 +52,35 @@ export async function POST(request: NextRequest) { access_token: accessToken, }); + const workflow = workflowRes.workflow; + + if (!workflow) { + return NextResponse.json( + { error: "No workflow provided" }, + { status: 400 }, + ); + } + const operations = runner.sortOperations(workflow); workflow.operations = operations; runner.validateWorkflow(workflow); - let res: any; - try { - const job = await createWorkflowQueue(workflow, session.user.id); - res = { job }; - } catch (err) { - console.error("Error running workflow", err); - const errorMessage = (err as Error).message; - const errorLines = errorMessage.split("\n"); - const prettyErrors = errorLines.map((line) => { - const [errorType, operation] = line.split(" in operation: ") as [ - string, - string, - ]; - let operationObj = {}; - try { - operationObj = operation ? JSON.parse(operation) : undefined; - } catch (e) { - operationObj = operation; - } - return { - errorType: errorType.replace("Invalid ", ""), - operation: operationObj, - }; - }); - return NextResponse.json({ errors: prettyErrors }, { status: 500 }); - } + const job = { + id: workflow.id ?? uuidv4(), + data: { + workflow, + }, + status: "wait", + timestamp: Date.now(), + }; + + log.info("Storing workflow job", { + jobId: job.id, + userId: session.user.id, + } as any); - return NextResponse.json(res); + const response = (await workflowExists(job.id)) + ? await updateWorkflowJob(session.user.id, job) + : await storeWorkflowJob(session.user.id, job); + return NextResponse.json(response); } diff --git a/src/app/api/workflow/validate/route.ts b/src/app/api/workflow/validate/route.ts index 5504af1..34bce0a 100644 --- a/src/app/api/workflow/validate/route.ts +++ b/src/app/api/workflow/validate/route.ts @@ -1,30 +1,28 @@ import { NextResponse, type NextRequest } from "next/server"; import { getServerSession } from "next-auth"; import { authOptions } from "@/server/auth"; -// import { Runner } from "~/lib/workflow/Workflow"; -import { type Workflow } from "~/lib/workflow/types"; import { getAccessTokenFromUserId } from "~/server/db/helper"; import { Runner } from "~/lib/workflow/Workflow"; +import { Logger } from "@/lib/log"; + +const log = new Logger("/api/workflow/validate"); export async function POST(request: NextRequest) { const session = await getServerSession({ req: request, ...authOptions }); if (!session) { return NextResponse.redirect("/api/auth/signin"); } - const accessToken = await getAccessTokenFromUserId( - session.user.id, - ); + const accessToken = await getAccessTokenFromUserId(session.user.id); if (!accessToken) { return NextResponse.redirect("/api/auth/signin"); } - // console.log("session", session); - console.log("Received workflow from user", session.user.id); + log.info("Received workflow from user", session.user.id); - let workflow: Workflow; + let workflow: WorkflowObject; try { - workflow = (await request.json()) as Workflow; + workflow = (await request.json()) as WorkflowObject; } catch (err) { - console.error("Error parsing workflow", err); + log.error("Error parsing workflow", err); return NextResponse.json( { error: "Error parsing workflow: " + (err as Error).message }, { status: 400 }, @@ -37,8 +35,6 @@ export async function POST(request: NextRequest) { let res: any; try { - const operations = runner.sortOperations(workflow); - workflow.operations = operations; const [valid, errors] = await runner.validateWorkflow(workflow); res = { valid, errors }; } catch (err) { diff --git a/src/app/api/workflow/workflowQueue.ts b/src/app/api/workflow/workflowQueue.ts index cfb0bc2..f72d022 100644 --- a/src/app/api/workflow/workflowQueue.ts +++ b/src/app/api/workflow/workflowQueue.ts @@ -2,12 +2,13 @@ import { Queue } from "bullmq"; import Redis from "ioredis"; import { env } from "~/env"; -import { type Workflow } from "~/lib/workflow/types"; import { db } from "~/server/db"; -import { workflowJobs } from "~/server/db/schema"; +import { workflowJobs, workflowRuns } from "~/server/db/schema"; import { v4 as uuidv4 } from "uuid"; import { eq } from "drizzle-orm"; +import { Logger } from "@/lib/log"; +const log = new Logger("workflowQueue"); const connection = new Redis(env.REDIS_URL, { maxRetriesPerRequest: null, @@ -24,19 +25,60 @@ export const workflowQueue = new Queue("workflowQueue", { }, }); -async function storeWorkflowJob(userId: string, job: any) { - console.log("Storing workflow job", job.id); - const workflowJob = db.insert(workflowJobs).values({ +export async function storeWorkflowJob(userId: string, job: any) { + log.info("Storing workflow job", job.id); + await db.insert(workflowJobs).values({ id: job.id, workflow: JSON.stringify(job.data.workflow), userId, - status: job.status, + createdAt: new Date(job.timestamp as number), + }); + const res = await db.query.workflowJobs.findFirst({ + where: (workflowJobs, { eq }) => eq(workflowJobs.id, job.id as string), + }); + return res; +} + +export async function updateWorkflowJob(userId: string, job: any) { + log.info("Updating workflow job", job.id); + await db + .update(workflowJobs) + .set({ + workflow: JSON.stringify(job.data.workflow), + // cron: + }) + .where(eq(workflowJobs.id, job.id as string)); + const res = await db.query.workflowJobs.findFirst({ + where: (workflowJobs, { eq }) => eq(workflowJobs.id, job.id as string), + }); + return res; +} + +export async function workflowExists(id: string) { + const workflow = await db.query.workflowJobs.findFirst({ + where: (workflowJobs, { eq }) => eq(workflowJobs.id, id), + }); + return !!workflow; +} + +export async function storeWorkflowQueueRun(workflowId: string, job: any) { + log.info("Storing workflow run", workflowId); + await db.insert(workflowRuns).values({ + id: job.id, + workflowId, startedAt: new Date(job.timestamp as number), }); - return workflowJob; + const res = await db.query.workflowRuns.findFirst({ + where: (workflowRuns, { eq }) => eq(workflowRuns.id, job.id as string), + }); + return res; } -export async function createWorkflowQueue(workflow: Workflow, userId: string) { - try{ +export async function createWorkflowQueue( + workflow: WorkflowObject, + userId: string, + workflowId: string, +) { + try { const job = await workflowQueue.add( "workflowQueue", { workflow, userId }, @@ -51,10 +93,10 @@ export async function createWorkflowQueue(workflow: Workflow, userId: string) { }, }, ); - await storeWorkflowJob(userId, job); + await storeWorkflowQueueRun(workflowId, job); return job; } catch (err) { - console.error("Error adding job to queue", err); + log.error("Error adding job to queue", err); throw err; } } @@ -64,45 +106,82 @@ export async function getWorkflowJob(id: string) { return job; } -export async function updateWorkflowJob(jobId: string, status?: string, workerId?: string) { - try{ - console.log("Updating workflow job", jobId); - const job = await workflowQueue.getJob(jobId); - if (!job) { - throw new Error("Job not found"); - } +export async function updateWorkflowRun( + jobId: string, + status?: string, + workerId?: string, + returnValues?: any, +) { + try { + log.info("Updating workflow job", jobId); + const job = await workflowQueue.getJob(jobId); + if (!job) { + throw new Error("Job not found"); + } + + if (!status) { + if (job.finishedOn) { + status = "completed"; + } else if (job.stacktrace) { + status = "failed"; + } else if (job.processedOn) { + status = "active"; + } else if (job.delay) { + status = "delayed"; + } + } + const finished = ["completed", "failed", "cancelled"].includes(status!); - if (!status) { - if (job.finishedOn) { - status = "completed"; - } else if (job.stacktrace) { - status = "failed"; - } else if (job.processedOn) { - status = "active"; - } else if (job.delay) { - status = "delayed"; + let completedAt; + if (finished) { + completedAt = new Date(); } - } - const finished = ["completed", "failed", "cancelled"].includes(status!); + if (returnValues?.length > 0) { + returnValues = compressReturnValues(returnValues); + } - let completedAt; - if (finished) { - completedAt = new Date(); - } - - await db.update(workflowJobs).set({ - status: status, - error: job.failedReason, - completedAt: completedAt, - workerId: workerId, - }).where( - eq(workflowJobs.id, jobId), - ); - return "updated" + log.info("Updating workflow job", jobId); + + await db + .update(workflowRuns) + .set({ + status: status, + error: job.failedReason, + completedAt: completedAt, + workerId: workerId, + returnValues: JSON.stringify(returnValues), + }) + .where(eq(workflowRuns.id, jobId)); + return "updated"; } catch (err) { - console.error("Error updating job", err); + log.error("Error updating job", err); throw err; } -} \ No newline at end of file +} + +function compressReturnValues(returnValues: any) { + returnValues.forEach((obj: any) => { + delete obj.track.audio_features; + delete obj.track.available_markets; + delete obj.album.release_date_precision; + delete obj.added_by; + delete obj.video_thumbnail; + delete obj.track.preview_url; + delete obj.track.external_ids; + delete obj.track.external_urls; + + obj.track.album.artists.forEach((artist: any) => { + delete artist.external_urls; + delete artist.href; + delete artist.uri; + }); + obj.track.artists.forEach((artist: any) => { + delete artist.external_urls; + delete artist.href; + delete artist.uri; + }); + }); + return returnValues; +} diff --git a/src/app/auth/providerButtons.tsx b/src/app/auth/providerButtons.tsx index c5d80cd..140c329 100644 --- a/src/app/auth/providerButtons.tsx +++ b/src/app/auth/providerButtons.tsx @@ -83,13 +83,12 @@ const LoadingSVG = () => ( export default function ProviderButtons() { const [providers, setProviders] = useState>({}); - const isMobile = () => { if (typeof window !== "undefined") { return window.innerWidth < 768; } return false; - } + }; const { data: session, status } = useSession(); @@ -143,16 +142,14 @@ export default function ProviderButtons() { /> )} - { - status === "loading" ? ( - - - Authenticating... - - ) : ( - Sign in with {provider.name} - ) - } + {status === "loading" ? ( + + + Authenticating... + + ) : ( + Sign in with {provider.name} + )} ))} diff --git a/src/app/flow-test/page.tsx b/src/app/flow-test/page.tsx deleted file mode 100644 index 8abfc68..0000000 --- a/src/app/flow-test/page.tsx +++ /dev/null @@ -1,119 +0,0 @@ -/* eslint-disable @typescript-eslint/no-unsafe-argument */ -"use client"; -import React, { useState } from 'react'; - -const Page = () => { - const [jsonValue, setJsonValue] = useState(''); - const [response, setResponse] = useState(''); - const [jobId, setJobId] = useState(null); - - const handleTextareaChange = (event: React.ChangeEvent) => { - setJsonValue(event.target.value); - }; - - const workflow = { - name: "spotify-playlist", - sources: [ - { - id: "playlist1", - type: "playlist", - params: { - playlistId: "4rgpmMVnicF0U90eoc5sUG", - }, - }, - { - id: "playlist2", - type: "playlist", - params: { - playlistId: "1lRI38EWAozAe9ra141sPq", - }, - }, - ], - operations: [ - { - id: "filter", - type: "Filter.filter", - params: { - filterKey: "track.popularity", - filterValue: "> 20", - }, - sources: ["playlist1"], - }, - { - id: "filter2", - type: "Filter.dedupeArtists", - params: {}, - sources: ["filter"], - }, - ], - }; - - React.useEffect(() => { - let intervalId: NodeJS.Timeout; - console.log("jobId: ", jobId); - if (jobId) { - intervalId = setInterval(() => { - console.log("fetching job: ", jobId); - fetch(`/api/workflow/${jobId}`) - .then(response => response.json()) - .then(data => { - setResponse(data as string); - }) - .catch(error => { - console.error(error); - }); - }, 2500); - } - - // Clear interval on component unmount - return () => { - if (intervalId) { - clearInterval(intervalId); - } - }; - }, [jobId]); - - const handleSubmit = () => { - console.log("builtin workflow: ", JSON.stringify(workflow, null, 2)); - fetch('/api/workflow', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: jsonValue - }) - .then(response => response.json()) - .then(data => { - // Handle the response from the server - setResponse(data as string); - setJobId(data.job.id); - }) - .catch(error => { - // Handle any errors - console.error(error); - }); - }; - - return ( -
-