Skip to content

Commit

Permalink
feat: merge user (#960)
Browse files Browse the repository at this point in the history
  • Loading branch information
sdjdd authored Jan 23, 2024
1 parent 099fa37 commit 0b0b2c2
Show file tree
Hide file tree
Showing 12 changed files with 538 additions and 7 deletions.
7 changes: 2 additions & 5 deletions next/api/src/controller/customer-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { User } from '@/model/User';
import { CustomerServiceResponse } from '@/response/customer-service';
import { GroupResponse } from '@/response/group';
import { roleService } from '@/service/role';
import { userService } from '@/user/services/user';

class FindCustomerServicePipe {
static async transform(id: string, ctx: Context): Promise<User> {
Expand Down Expand Up @@ -123,11 +124,7 @@ export class CustomerServiceController {
if (data.active) {
processQueue.push(user.update({ inactive: null }, { useMasterKey: true }));
} else {
processQueue.push(
user
.update({ inactive: true }, { useMasterKey: true })
.then(() => user.refreshSessionToken())
);
processQueue.push(userService.inactiveUser(user));
}
}

Expand Down
55 changes: 55 additions & 0 deletions next/api/src/controller/merge-user-task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { Context } from 'koa';
import { z } from 'zod';

import {
Body,
Controller,
Ctx,
Get,
Post,
Query,
ResponseBody,
UseMiddlewares,
} from '@/common/http';
import { ParseIntPipe, ZodValidationPipe } from '@/common/pipe';
import { adminOnly, auth } from '@/middleware';
import { userService } from '@/user/services/user';
import { MergeUserTaskResponse } from '@/response/merge-user-task';
import { MergeUserTask } from '@/model/MergeUserTask';

const MergeUserSchema = z.object({
sourceUserId: z.string(),
targetUserId: z.string(),
});

@Controller('merge-user-tasks')
@UseMiddlewares(auth, adminOnly)
export class MergeUserTaskController {
@Post()
@ResponseBody(MergeUserTaskResponse)
async createTask(
@Body(new ZodValidationPipe(MergeUserSchema))
data: z.infer<typeof MergeUserSchema>
) {
return userService.mergeUser(data.sourceUserId, data.targetUserId);
}

@Get()
@ResponseBody(MergeUserTaskResponse)
async getTasks(
@Ctx()
ctx: Context,
@Query('page', new ParseIntPipe({ min: 1 }))
page: number,
@Query('pageSize', new ParseIntPipe({ min: 1, max: 100 }))
pageSize: number
) {
const [tasks, totalCount] = await MergeUserTask.queryBuilder()
.preload('sourceUser')
.preload('targetUser')
.paginate(page, pageSize)
.findAndCount({ useMasterKey: true });
ctx.set('X-Total-Count', totalCount.toString());
return tasks;
}
}
28 changes: 28 additions & 0 deletions next/api/src/model/MergeUserTask.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { field, Model, pointerId, pointTo } from '@/orm';
import { User } from './User';

export class MergeUserTask extends Model {
@pointerId(() => User)
sourceUserId!: string;

@pointTo(() => User)
sourceUser?: User;

@pointerId(() => User)
targetUserId!: string;

@pointTo(() => User)
targetUser?: User;

@field()
mergingData!: {
authData?: Record<string, any>;
email?: string;
};

@field()
status!: string;

@field()
completedAt?: Date;
}
7 changes: 6 additions & 1 deletion next/api/src/queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ export function createQueue<T>(name: string, options?: Bull.QueueOptions): Bull.
}
return new Bull<T>(name, {
...options,
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
...options?.defaultJobOptions,
},
// https://github.com/OptimalBits/bull/blob/develop/PATTERNS.md#reusing-redis-connections
createClient: (type, redisOptions) => {
switch (type) {
Expand All @@ -34,4 +39,4 @@ export function createQueue<T>(name: string, options?: Bull.QueueOptions): Bull.
});
}

export type { Queue } from 'bull';
export type { Queue, Job } from 'bull';
19 changes: 19 additions & 0 deletions next/api/src/response/merge-user-task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { MergeUserTask } from '@/model/MergeUserTask';
import { UserResponse } from './user';

export class MergeUserTaskResponse {
constructor(private task: MergeUserTask) {}

toJSON() {
return {
id: this.task.id,
sourceUserId: this.task.sourceUserId,
sourceUser: this.task.sourceUser && new UserResponse(this.task.sourceUser).toJSON(),
targetUser: this.task.targetUser && new UserResponse(this.task.targetUser).toJSON(),
targetUserId: this.task.targetUserId,
status: this.task.status,
completedAt: this.task.completedAt?.toISOString(),
createdAt: this.task.createdAt.toISOString(),
};
}
}
55 changes: 54 additions & 1 deletion next/api/src/service/ticket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ import { differenceInMilliseconds } from 'date-fns';
import { simpleToTradition } from 'chinese-simple2traditional';

import { Config } from '@/config';
import { ACLBuilder } from '@/orm';
import { User } from '@/model/User';
import { Ticket } from '@/model/Ticket';
import { Reply } from '@/model/Reply';
import { createQueue, Queue } from '@/queue';
import { userService } from '@/user/services/user';
import { createQueue, Job, Queue } from '@/queue';
import { DetectTicketLanguageJobData } from '@/interfaces/ticket';
import { allowedTicketLanguages } from '@/utils/locale';
import { translateService } from './translate';
Expand All @@ -21,9 +24,17 @@ interface GetRepliesOptions {
count?: boolean;
}

interface TransferTicketJobData {
sourceUserId: string;
targetUserId: string;
mergeUserTaskId?: string;
}

export class TicketService {
private detectLangQueue?: Queue<DetectTicketLanguageJobData>;

private transferTicketQueue: Queue<TransferTicketJobData>;

constructor() {
if (process.env.ENABLE_TICKET_LANGUAGE_DETECT) {
console.log(`[TicketService] Ticket language detect enabled`);
Expand All @@ -37,6 +48,11 @@ export class TicketService {
return this.detectTicketLanguage(job.data.ticketId);
});
}

this.transferTicketQueue = createQueue('ticket_transfer');
this.transferTicketQueue.process((job) => {
return this.processTransferTicketJob(job);
});
}

async getReplies(
Expand Down Expand Up @@ -128,6 +144,43 @@ export class TicketService {
await this.detectLangQueue.add({ ticketId });
}
}

async addTransferTicketJob(data: TransferTicketJobData) {
await this.transferTicketQueue.add(data);
}

private async processTransferTicketJob(job: Job<TransferTicketJobData>) {
const { sourceUserId, targetUserId } = job.data;

const tickets = await Ticket.queryBuilder()
.where('author', '==', User.ptr(sourceUserId))
.includeACL(true)
.limit(50)
.find({ useMasterKey: true });

if (tickets.length === 0) {
if (job.data.mergeUserTaskId) {
userService.transferTicketsCallback(job.data.mergeUserTaskId);
}
return;
}

await Ticket.updateSome(
tickets.map((ticket) => {
const ACL = new ACLBuilder(ticket.getRawACL() || {});
ACL.disallow(sourceUserId, 'read', 'write');
ACL.allow(targetUserId, 'read', 'write');
return [ticket, { ACL: ACL, authorId: targetUserId }];
}),
{
useMasterKey: true,
}
);

await this.transferTicketQueue.add(job.data, {
delay: 2000,
});
}
}

export const ticketService = new TicketService();
100 changes: 100 additions & 0 deletions next/api/src/user/services/user.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import crypto from 'node:crypto';

import { BadRequestError } from '@/common/http';
import { User } from '@/model/User';
import { MergeUserTask } from '@/model/MergeUserTask';
import { ticketService } from '@/service/ticket';
import { CreateUserData } from '../types';

export class UserService {
Expand Down Expand Up @@ -59,6 +63,102 @@ export class UserService {

return password;
}

async inactiveUser(user: User) {
const newUser = await user.update({ inactive: true }, { useMasterKey: true });
await user.refreshSessionToken();
return newUser;
}

async mergeUser(sourceId: string, targetId: string) {
if (sourceId === targetId) {
throw new BadRequestError('源用户和目标用户不能相同');
}

const sourceUser = await User.find(sourceId, { useMasterKey: true });
if (!sourceUser) {
throw new BadRequestError('源用户不存在');
}
const sourceUserTask = await MergeUserTask.queryBuilder()
.where('sourceUser', '==', sourceUser.toPointer())
.first({ useMasterKey: true });
if (sourceUserTask) {
throw new BadRequestError('源用户已被合并');
}

const targetUser = await User.find(targetId, { useMasterKey: true });
if (!targetUser) {
throw new BadRequestError('目标用户不存在');
}

if (sourceUser.authData && targetUser.authData) {
throw new BadRequestError('authData conflict');
}
if (sourceUser.email && targetUser.email) {
throw new BadRequestError('email confilct');
}

const task = await MergeUserTask.create(
{
sourceUserId: sourceUser.id,
targetUserId: targetUser.id,
mergingData: {
authData: sourceUser.authData,
email: sourceUser.email,
},
status: 'pending',
},
{ useMasterKey: true }
);

(async () => {
await this.inactiveUser(sourceUser);
if (sourceUser.email || sourceUser.authData) {
await sourceUser.update(
{
authData: null,
email: null,
},
{ useMasterKey: true }
);
}

if (task.mergingData.email || task.mergingData.authData) {
await targetUser.update(
{
authData: task.mergingData.authData,
email: task.mergingData.email,
},
{ useMasterKey: true }
);
}

await ticketService.addTransferTicketJob({
sourceUserId: sourceUser.id,
targetUserId: targetUser.id,
mergeUserTaskId: task.id,
});

await task.update({ status: 'transfer_tickets' }, { useMasterKey: true });
})();

return task;
}

async transferTicketsCallback(mergeUserTaskId: string) {
const task = await MergeUserTask.find(mergeUserTaskId, { useMasterKey: true });
if (task) {
await task.update(
{
status: 'complete',
completedAt: new Date(),
},
{
useMasterKey: true,
}
);
}
}
}

export const userService = new UserService();
Loading

0 comments on commit 0b0b2c2

Please sign in to comment.