Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: merge user #960

Merged
merged 3 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions next/api/src/controller/customer-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { User } from '@/model/User';
import { CustomerServiceResponse } from '@/response/customer-service';
import { GroupResponse } from '@/response/group';
import { roleService } from '@/service/role';
import { userService } from '@/user/services/user';

class FindCustomerServicePipe {
static async transform(id: string, ctx: Context): Promise<User> {
Expand Down Expand Up @@ -123,11 +124,7 @@ export class CustomerServiceController {
if (data.active) {
processQueue.push(user.update({ inactive: null }, { useMasterKey: true }));
} else {
processQueue.push(
user
.update({ inactive: true }, { useMasterKey: true })
.then(() => user.refreshSessionToken())
);
processQueue.push(userService.inactiveUser(user));
}
}

Expand Down
55 changes: 55 additions & 0 deletions next/api/src/controller/merge-user-task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { Context } from 'koa';
import { z } from 'zod';

import {
Body,
Controller,
Ctx,
Get,
Post,
Query,
ResponseBody,
UseMiddlewares,
} from '@/common/http';
import { ParseIntPipe, ZodValidationPipe } from '@/common/pipe';
import { adminOnly, auth } from '@/middleware';
import { userService } from '@/user/services/user';
import { MergeUserTaskResponse } from '@/response/merge-user-task';
import { MergeUserTask } from '@/model/MergeUserTask';

const MergeUserSchema = z.object({
sourceUserId: z.string(),
targetUserId: z.string(),
});

@Controller('merge-user-tasks')
@UseMiddlewares(auth, adminOnly)
export class MergeUserTaskController {
@Post()
@ResponseBody(MergeUserTaskResponse)
async createTask(
@Body(new ZodValidationPipe(MergeUserSchema))
data: z.infer<typeof MergeUserSchema>
) {
return userService.mergeUser(data.sourceUserId, data.targetUserId);
}

@Get()
@ResponseBody(MergeUserTaskResponse)
async getTasks(
@Ctx()
ctx: Context,
@Query('page', new ParseIntPipe({ min: 1 }))
page: number,
@Query('pageSize', new ParseIntPipe({ min: 1, max: 100 }))
pageSize: number
) {
const [tasks, totalCount] = await MergeUserTask.queryBuilder()
.preload('sourceUser')
.preload('targetUser')
.paginate(page, pageSize)
.findAndCount({ useMasterKey: true });
ctx.set('X-Total-Count', totalCount.toString());
return tasks;
}
}
28 changes: 28 additions & 0 deletions next/api/src/model/MergeUserTask.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { field, Model, pointerId, pointTo } from '@/orm';
import { User } from './User';

export class MergeUserTask extends Model {
@pointerId(() => User)
sourceUserId!: string;

@pointTo(() => User)
sourceUser?: User;

@pointerId(() => User)
targetUserId!: string;

@pointTo(() => User)
targetUser?: User;

@field()
mergingData!: {
authData?: Record<string, any>;
email?: string;
};

@field()
status!: string;

@field()
completedAt?: Date;
}
7 changes: 6 additions & 1 deletion next/api/src/queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ export function createQueue<T>(name: string, options?: Bull.QueueOptions): Bull.
}
return new Bull<T>(name, {
...options,
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
...options?.defaultJobOptions,
},
// https://github.com/OptimalBits/bull/blob/develop/PATTERNS.md#reusing-redis-connections
createClient: (type, redisOptions) => {
switch (type) {
Expand All @@ -34,4 +39,4 @@ export function createQueue<T>(name: string, options?: Bull.QueueOptions): Bull.
});
}

export type { Queue } from 'bull';
export type { Queue, Job } from 'bull';
19 changes: 19 additions & 0 deletions next/api/src/response/merge-user-task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { MergeUserTask } from '@/model/MergeUserTask';
import { UserResponse } from './user';

export class MergeUserTaskResponse {
constructor(private task: MergeUserTask) {}

toJSON() {
return {
id: this.task.id,
sourceUserId: this.task.sourceUserId,
sourceUser: this.task.sourceUser && new UserResponse(this.task.sourceUser).toJSON(),
targetUser: this.task.targetUser && new UserResponse(this.task.targetUser).toJSON(),
targetUserId: this.task.targetUserId,
status: this.task.status,
completedAt: this.task.completedAt?.toISOString(),
createdAt: this.task.createdAt.toISOString(),
};
}
}
55 changes: 54 additions & 1 deletion next/api/src/service/ticket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ import { differenceInMilliseconds } from 'date-fns';
import { simpleToTradition } from 'chinese-simple2traditional';

import { Config } from '@/config';
import { ACLBuilder } from '@/orm';
import { User } from '@/model/User';
import { Ticket } from '@/model/Ticket';
import { Reply } from '@/model/Reply';
import { createQueue, Queue } from '@/queue';
import { userService } from '@/user/services/user';
import { createQueue, Job, Queue } from '@/queue';
import { DetectTicketLanguageJobData } from '@/interfaces/ticket';
import { allowedTicketLanguages } from '@/utils/locale';
import { translateService } from './translate';
Expand All @@ -21,9 +24,17 @@ interface GetRepliesOptions {
count?: boolean;
}

interface TransferTicketJobData {
sourceUserId: string;
targetUserId: string;
mergeUserTaskId?: string;
}

export class TicketService {
private detectLangQueue?: Queue<DetectTicketLanguageJobData>;

private transferTicketQueue: Queue<TransferTicketJobData>;

constructor() {
if (process.env.ENABLE_TICKET_LANGUAGE_DETECT) {
console.log(`[TicketService] Ticket language detect enabled`);
Expand All @@ -37,6 +48,11 @@ export class TicketService {
return this.detectTicketLanguage(job.data.ticketId);
});
}

this.transferTicketQueue = createQueue('ticket_transfer');
this.transferTicketQueue.process((job) => {
return this.processTransferTicketJob(job);
});
}

async getReplies(
Expand Down Expand Up @@ -128,6 +144,43 @@ export class TicketService {
await this.detectLangQueue.add({ ticketId });
}
}

async addTransferTicketJob(data: TransferTicketJobData) {
await this.transferTicketQueue.add(data);
}

private async processTransferTicketJob(job: Job<TransferTicketJobData>) {
const { sourceUserId, targetUserId } = job.data;

const tickets = await Ticket.queryBuilder()
.where('author', '==', User.ptr(sourceUserId))
.includeACL(true)
.limit(50)
.find({ useMasterKey: true });

if (tickets.length === 0) {
if (job.data.mergeUserTaskId) {
userService.transferTicketsCallback(job.data.mergeUserTaskId);
}
return;
}

await Ticket.updateSome(
tickets.map((ticket) => {
const ACL = new ACLBuilder(ticket.getRawACL() || {});
ACL.disallow(sourceUserId, 'read', 'write');
ACL.allow(targetUserId, 'read', 'write');
return [ticket, { ACL: ACL, authorId: targetUserId }];
}),
{
useMasterKey: true,
}
);

await this.transferTicketQueue.add(job.data, {
delay: 2000,
});
}
}

export const ticketService = new TicketService();
100 changes: 100 additions & 0 deletions next/api/src/user/services/user.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import crypto from 'node:crypto';

import { BadRequestError } from '@/common/http';
import { User } from '@/model/User';
import { MergeUserTask } from '@/model/MergeUserTask';
import { ticketService } from '@/service/ticket';
import { CreateUserData } from '../types';

export class UserService {
Expand Down Expand Up @@ -59,6 +63,102 @@ export class UserService {

return password;
}

async inactiveUser(user: User) {
const newUser = await user.update({ inactive: true }, { useMasterKey: true });
await user.refreshSessionToken();
return newUser;
}

async mergeUser(sourceId: string, targetId: string) {
if (sourceId === targetId) {
throw new BadRequestError('源用户和目标用户不能相同');
}

const sourceUser = await User.find(sourceId, { useMasterKey: true });
if (!sourceUser) {
throw new BadRequestError('源用户不存在');
}
const sourceUserTask = await MergeUserTask.queryBuilder()
.where('sourceUser', '==', sourceUser.toPointer())
.first({ useMasterKey: true });
if (sourceUserTask) {
throw new BadRequestError('源用户已被合并');
}

const targetUser = await User.find(targetId, { useMasterKey: true });
if (!targetUser) {
throw new BadRequestError('目标用户不存在');
}

if (sourceUser.authData && targetUser.authData) {
throw new BadRequestError('authData conflict');
}
if (sourceUser.email && targetUser.email) {
throw new BadRequestError('email confilct');
}

const task = await MergeUserTask.create(
{
sourceUserId: sourceUser.id,
targetUserId: targetUser.id,
mergingData: {
authData: sourceUser.authData,
email: sourceUser.email,
},
status: 'pending',
},
{ useMasterKey: true }
);

(async () => {
await this.inactiveUser(sourceUser);
if (sourceUser.email || sourceUser.authData) {
await sourceUser.update(
{
authData: null,
email: null,
},
{ useMasterKey: true }
);
}

if (task.mergingData.email || task.mergingData.authData) {
await targetUser.update(
{
authData: task.mergingData.authData,
email: task.mergingData.email,
},
{ useMasterKey: true }
);
}

await ticketService.addTransferTicketJob({
sourceUserId: sourceUser.id,
targetUserId: targetUser.id,
mergeUserTaskId: task.id,
});

await task.update({ status: 'transfer_tickets' }, { useMasterKey: true });
})();

return task;
}

async transferTicketsCallback(mergeUserTaskId: string) {
const task = await MergeUserTask.find(mergeUserTaskId, { useMasterKey: true });
if (task) {
await task.update(
{
status: 'complete',
completedAt: new Date(),
},
{
useMasterKey: true,
}
);
}
}
}

export const userService = new UserService();
Loading