Skip to content

Commit 6633fab

Browse files
committed
feat: final temporal touches
1 parent da00454 commit 6633fab

File tree

14 files changed

+236
-87
lines changed

14 files changed

+236
-87
lines changed

apps/backend/src/app.module.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { FILTER } from '@gitroom/nestjs-libraries/sentry/sentry.exception';
1414
import { ChatModule } from '@gitroom/nestjs-libraries/chat/chat.module';
1515
import { getTemporalModule } from '@gitroom/nestjs-libraries/temporal/temporal.module';
1616
import { TemporalRegisterMissingSearchAttributesModule } from '@gitroom/nestjs-libraries/temporal/temporal.register';
17+
import { InfiniteWorkflowRegisterModule } from '@gitroom/nestjs-libraries/temporal/infinite.workflow.register';
1718

1819
@Global()
1920
@Module({
@@ -28,6 +29,7 @@ import { TemporalRegisterMissingSearchAttributesModule } from '@gitroom/nestjs-l
2829
ChatModule,
2930
getTemporalModule(false),
3031
TemporalRegisterMissingSearchAttributesModule,
32+
InfiniteWorkflowRegisterModule,
3133
ThrottlerModule.forRoot([
3234
{
3335
ttl: 3600000,

apps/orchestrator/src/activities/post.activity.ts

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
import { Injectable } from '@nestjs/common';
2-
import { Activity, ActivityMethod } from 'nestjs-temporal-core';
2+
import {
3+
Activity,
4+
ActivityMethod,
5+
TemporalService,
6+
} from 'nestjs-temporal-core';
37
import { PostsService } from '@gitroom/nestjs-libraries/database/prisma/posts/posts.service';
48
import {
59
NotificationService,
@@ -13,6 +17,12 @@ import { RefreshIntegrationService } from '@gitroom/nestjs-libraries/integration
1317
import { timer } from '@gitroom/helpers/utils/timer';
1418
import { IntegrationService } from '@gitroom/nestjs-libraries/database/prisma/integrations/integration.service';
1519
import { WebhooksService } from '@gitroom/nestjs-libraries/database/prisma/webhooks/webhooks.service';
20+
import { TypedSearchAttributes } from '@temporalio/common';
21+
import {
22+
organizationId,
23+
postId as postIdSearchParam,
24+
} from '@gitroom/nestjs-libraries/temporal/temporal.search.attribute';
25+
import { postWorkflow } from '@gitroom/orchestrator/workflows';
1626

1727
@Injectable()
1828
@Activity()
@@ -23,9 +33,44 @@ export class PostActivity {
2333
private _integrationManager: IntegrationManager,
2434
private _integrationService: IntegrationService,
2535
private _refreshIntegrationService: RefreshIntegrationService,
26-
private _webhookService: WebhooksService
36+
private _webhookService: WebhooksService,
37+
private _temporalService: TemporalService
2738
) {}
2839

40+
@ActivityMethod()
41+
async searchForMissingThreeHoursPosts() {
42+
const list = await this._postService.searchForMissingThreeHoursPosts();
43+
for (const post of list) {
44+
await this._temporalService.client
45+
.getRawClient()
46+
.workflow.signalWithStart('postWorkflow', {
47+
workflowId: `post_${post.id}`,
48+
taskQueue: 'main',
49+
signal: 'poke',
50+
signalArgs: [],
51+
args: [
52+
{
53+
taskQueue: post.integration.providerIdentifier
54+
.split('-')[0]
55+
.toLowerCase(),
56+
postId: post.id,
57+
organizationId: post.organizationId,
58+
},
59+
],
60+
typedSearchAttributes: new TypedSearchAttributes([
61+
{
62+
key: postIdSearchParam,
63+
value: post.id,
64+
},
65+
{
66+
key: organizationId,
67+
value: post.organizationId,
68+
},
69+
]),
70+
});
71+
}
72+
}
73+
2974
@ActivityMethod()
3075
async updatePost(id: string, postId: string, releaseURL: string) {
3176
return this._postService.updatePost(id, postId, releaseURL);
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
export * from './post.workflow';
22
export * from './autopost.workflow';
33
export * from './digest.email.workflow';
4+
export * from './missing.post.workflow';
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import { proxyActivities, sleep } from '@temporalio/workflow';
2+
import { PostActivity } from '@gitroom/orchestrator/activities/post.activity';
3+
4+
const { searchForMissingThreeHoursPosts } = proxyActivities<PostActivity>({
5+
startToCloseTimeout: '10 minute',
6+
retry: {
7+
maximumAttempts: 3,
8+
backoffCoefficient: 1,
9+
initialInterval: '2 minutes',
10+
},
11+
});
12+
13+
export async function missingPostWorkflow() {
14+
await searchForMissingThreeHoursPosts();
15+
while (true) {
16+
await sleep('1 hour');
17+
await searchForMissingThreeHoursPosts();
18+
}
19+
}

apps/orchestrator/src/workflows/post.workflow.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import {
55
startChild,
66
proxyActivities,
77
sleep,
8+
defineSignal,
9+
setHandler,
810
} from '@temporalio/workflow';
911
import dayjs from 'dayjs';
1012
import { Integration } from '@prisma/client';
@@ -42,6 +44,8 @@ const {
4244
},
4345
});
4446

47+
const poke = defineSignal('poke');
48+
4549
export async function postWorkflow({
4650
taskQueue,
4751
postId,
@@ -53,7 +57,6 @@ export async function postWorkflow({
5357
organizationId: string;
5458
postNow?: boolean;
5559
}) {
56-
5760
// Dynamic task queue, for concurrency
5861
const {
5962
postSocial,
@@ -65,6 +68,11 @@ export async function postWorkflow({
6568
processPlug,
6669
} = proxyTaskQueue(taskQueue);
6770

71+
let poked = false;
72+
setHandler(poke, () => {
73+
poked = true;
74+
});
75+
6876
const startTime = new Date();
6977
// get all the posts and comments to post
7078
const postsList = await getPostsList(organizationId, postId);
@@ -77,6 +85,9 @@ export async function postWorkflow({
7785

7886
// if it's a repeatable post, we should ignore this
7987
if (!postNow) {
88+
if (dayjs(post.publishDate).isBefore(dayjs())) {
89+
return;
90+
}
8091
await sleep(dayjs(post.publishDate).diff(dayjs(), 'millisecond'));
8192
}
8293

docker-compose.dev.yaml

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,87 @@ services:
5353
- postiz-network
5454
restart: always
5555

56+
temporal-elasticsearch:
57+
container_name: temporal-elasticsearch
58+
image: elasticsearch:7.17.27
59+
environment:
60+
- cluster.routing.allocation.disk.threshold_enabled=true
61+
- cluster.routing.allocation.disk.watermark.low=512mb
62+
- cluster.routing.allocation.disk.watermark.high=256mb
63+
- cluster.routing.allocation.disk.watermark.flood_stage=128mb
64+
- discovery.type=single-node
65+
- ES_JAVA_OPTS=-Xms256m -Xmx256m
66+
- xpack.security.enabled=false
67+
networks:
68+
- temporal-network
69+
expose:
70+
- 9200
71+
volumes:
72+
- /var/lib/elasticsearch/data
73+
74+
temporal-postgresql:
75+
container_name: temporal-postgresql
76+
image: postgres:16
77+
environment:
78+
POSTGRES_PASSWORD: temporal
79+
POSTGRES_USER: temporal
80+
networks:
81+
- temporal-network
82+
expose:
83+
- 5432
84+
volumes:
85+
- /var/lib/postgresql/data
86+
87+
temporal:
88+
container_name: temporal
89+
ports:
90+
- "7233:7233"
91+
image: temporalio/auto-setup:1.28.1
92+
depends_on:
93+
- temporal-postgresql
94+
- temporal-elasticsearch
95+
environment:
96+
- DB=postgres12
97+
- DB_PORT=5432
98+
- POSTGRES_USER=temporal
99+
- POSTGRES_PWD=temporal
100+
- POSTGRES_SEEDS=temporal-postgresql
101+
- DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
102+
- ENABLE_ES=true
103+
- ES_SEEDS=temporal-elasticsearch
104+
- ES_VERSION=v7
105+
- TEMPORAL_NAMESPACE=default
106+
networks:
107+
- temporal-network
108+
volumes:
109+
- ./dynamicconfig:/etc/temporal/config/dynamicconfig
110+
labels:
111+
kompose.volume.type: configMap
112+
113+
temporal-admin-tools:
114+
container_name: temporal-admin-tools
115+
image: temporalio/admin-tools:1.28.1-tctl-1.18.4-cli-1.4.1
116+
environment:
117+
- TEMPORAL_ADDRESS=temporal:7233
118+
- TEMPORAL_CLI_ADDRESS=temporal:7233
119+
networks:
120+
- temporal-network
121+
stdin_open: true
122+
depends_on:
123+
- temporal
124+
tty: true
125+
126+
temporal-ui:
127+
container_name: temporal-ui
128+
image: temporalio/ui:2.34.0
129+
environment:
130+
- TEMPORAL_ADDRESS=temporal:7233
131+
- TEMPORAL_CORS_ORIGINS=http://127.0.0.1:3000
132+
networks:
133+
- temporal-network
134+
ports:
135+
- "8080:8080"
136+
56137
volumes:
57138
redisinsight:
58139
postgres-volume:
@@ -61,3 +142,6 @@ volumes:
61142
networks:
62143
postiz-network:
63144
external: false
145+
temporal-network:
146+
driver: bridge
147+
name: temporal-network
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
system.forceSearchAttributesCacheRefreshOnRead:
2+
- value: true # Dev setup only. Please don't turn this on in production.
3+
constraints: {}

dynamicconfig/development-sql.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
limit.maxIDLength:
2+
- value: 255
3+
constraints: {}
4+
system.forceSearchAttributesCacheRefreshOnRead:
5+
- value: true # Dev setup only. Please don't turn this on in production.
6+
constraints: {}

libraries/nestjs-libraries/src/database/prisma/notifications/notification.service.ts

Lines changed: 11 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import { Injectable } from '@nestjs/common';
22
import { NotificationsRepository } from '@gitroom/nestjs-libraries/database/prisma/notifications/notifications.repository';
33
import { EmailService } from '@gitroom/nestjs-libraries/services/email.service';
44
import { OrganizationRepository } from '@gitroom/nestjs-libraries/database/prisma/organizations/organization.repository';
5-
import { ioRedis } from '@gitroom/nestjs-libraries/redis/redis.service';
65
import { TemporalService } from 'nestjs-temporal-core';
76
import { TypedSearchAttributes } from '@temporalio/common';
87
import { organizationId } from '@gitroom/nestjs-libraries/temporal/temporal.search.attribute';
@@ -32,13 +31,6 @@ export class NotificationService {
3231
);
3332
}
3433

35-
getNotificationsSince(organizationId: string, since: string) {
36-
return this._notificationRepository.getNotificationsSince(
37-
organizationId,
38-
since
39-
);
40-
}
41-
4234
async inAppNotification(
4335
orgId: string,
4436
subject: string,
@@ -56,8 +48,18 @@ export class NotificationService {
5648
try {
5749
await this._temporalService.client
5850
.getRawClient()
59-
?.workflow.start('digestEmailWorkflow', {
51+
?.workflow.signalWithStart('digestEmailWorkflow', {
6052
workflowId: 'digest_email_workflow_' + orgId,
53+
signal: 'email',
54+
signalArgs: [
55+
[
56+
{
57+
title: subject,
58+
message,
59+
type,
60+
},
61+
],
62+
],
6163
taskQueue: 'main',
6264
args: [{ organizationId: orgId }],
6365
typedSearchAttributes: new TypedSearchAttributes([
@@ -69,20 +71,6 @@ export class NotificationService {
6971
});
7072
} catch (err) {}
7173

72-
await this._temporalService.signalWorkflow(
73-
'digest_email_workflow_' + orgId,
74-
'email',
75-
[
76-
[
77-
{
78-
title: subject,
79-
message,
80-
type,
81-
},
82-
],
83-
]
84-
);
85-
8674
return;
8775
}
8876

@@ -111,45 +99,6 @@ export class NotificationService {
11199
}
112100
}
113101

114-
async getDigestTypes(orgId: string): Promise<NotificationType[]> {
115-
const typesKey = 'digest_types_' + orgId;
116-
const types = await ioRedis.smembers(typesKey);
117-
// Clean up the types key after reading
118-
await ioRedis.del(typesKey);
119-
return types as NotificationType[];
120-
}
121-
122-
async sendDigestEmailsToOrg(
123-
orgId: string,
124-
subject: string,
125-
message: string,
126-
types: NotificationType[]
127-
) {
128-
const userOrg = await this._organizationRepository.getAllUsersOrgs(orgId);
129-
const hasInfo = types.includes('info');
130-
const hasSuccess = types.includes('success');
131-
const hasFail = types.includes('fail');
132-
133-
for (const user of userOrg?.users || []) {
134-
// 'info' type is always sent regardless of preferences
135-
if (hasInfo) {
136-
await this.sendEmail(user.user.email, subject, message);
137-
continue;
138-
}
139-
140-
// For digest, check if user wants any of the notification types in the digest
141-
const wantsSuccess = hasSuccess && user.user.sendSuccessEmails;
142-
const wantsFail = hasFail && user.user.sendFailureEmails;
143-
144-
// Only send if user wants at least one type of notification in the digest
145-
if (!wantsSuccess && !wantsFail) {
146-
continue;
147-
}
148-
149-
await this.sendEmail(user.user.email, subject, message);
150-
}
151-
}
152-
153102
async sendEmail(to: string, subject: string, html: string, replyTo?: string) {
154103
await this._emailService.sendEmail(to, subject, html, replyTo);
155104
}

0 commit comments

Comments
 (0)