Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
6d9d0fe
feat: retryNotify job
zasulskii Aug 13, 2024
f87cf29
feat: add logs
zasulskii Aug 13, 2024
aae9a93
feat: config validation, postinstall script
zasulskii Aug 13, 2024
0fb0171
chore: fix index to config rename
zasulskii Aug 13, 2024
89f3a90
fix: logger error message
zasulskii Aug 13, 2024
2a32735
feat: refactor notification service, remove DI
zasulskii Aug 14, 2024
c267081
chore: remove unused file
zasulskii Aug 14, 2024
ac109d9
feat: shutdown, event-based txs
zasulskii Aug 15, 2024
2ff63de
feat: fast transactions fetch after service is shutdown
zasulskii Aug 15, 2024
63c5a07
feat: remove transactions jobs, refactor to transaction channel
zasulskii Aug 16, 2024
2a68205
feat: transaction channel destroy
zasulskii Aug 19, 2024
60f456d
refactor: message to messageMany while processing txs to notify
zasulskii Aug 19, 2024
59defd6
fix: handle transaction ref to unsub from socket
zasulskii Aug 20, 2024
23feef3
docs(README.md): register/unregister device transaction payload
bludnic Dec 4, 2024
cccf684
chore(package.json): upgrade husky
bludnic Dec 4, 2024
1528248
chore: gitignore .env
bludnic Dec 4, 2024
0649db0
chore(package.json): upgrade dependencies
bludnic Dec 4, 2024
a249cf7
fix(build): disable DTS (build error)
bludnic Dec 4, 2024
a0866f6
Merge remote-tracking branch 'origin/dev' into dev
bludnic Dec 4, 2024
9077201
chore: gitignore package-lock.json
bludnic Dec 4, 2024
0432d7d
chore(Node): enable source map support for stack traces
bludnic Dec 6, 2024
71fd99b
chore: configure dotenv
bludnic Dec 6, 2024
d02b247
refactor: remove DATABASE_URL from config (use ENV instead)
bludnic Dec 6, 2024
d29d8b5
fix: unhandled error when processing a Signal Transaction
bludnic Dec 6, 2024
7ac2cc3
chore: configure Vitest
bludnic Dec 7, 2024
d3a103e
chore(prisma): `deviceId` should be provided by the client
bludnic Dec 7, 2024
3ad6f12
feat(zod): add runtime type for `SignalMessagePayload`
bludnic Dec 7, 2024
aa3c4d7
refactor: simplify fastify server
bludnic Dec 7, 2024
9bc5571
refactor: simplify logger
bludnic Dec 7, 2024
3f953dd
refactor(prisma): simplify `checkConnection()`
bludnic Dec 7, 2024
9173b16
refactor(config): prettify
bludnic Dec 7, 2024
d14ddda
refactor(prettier): print semicolons at the ends of statements
bludnic Dec 7, 2024
bb23787
refactor: organize gitignore
bludnic Dec 7, 2024
e04a87f
chore(TypeScript): enable JSON modules
bludnic Dec 7, 2024
dfeebc4
refactor: simplify the config
bludnic Dec 7, 2024
c557205
chore(npm): add `typecheck` command
bludnic Dec 7, 2024
7e88715
refactor(config): sort imports
bludnic Dec 7, 2024
46af467
docs(README.md): provider should be uppercased
bludnic Dec 7, 2024
fdc0777
chore: add .env.example
bludnic Dec 7, 2024
5d16a65
refactor(firebase): rename config example
bludnic Dec 7, 2024
4695e4d
chore: upgrade fastify
bludnic Dec 7, 2024
58599eb
chore(fastify): add zod validation
bludnic Dec 7, 2024
e5ffe76
refactor(TransactionsChannel): simplify
bludnic Dec 7, 2024
3af6933
refactor: improve TransactionChannel
bludnic Dec 8, 2024
2844196
feat(TransactionChannel): type on/emit methods
bludnic Dec 8, 2024
3a9edbf
feat: refactor service
bludnic Dec 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion eslint.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export default tseslint.config({
'@typescript-eslint/no-unsafe-assignment': 'error',
'@typescript-eslint/no-unsafe-call': 'error',
'@typescript-eslint/no-unsafe-member-access': 'error',
'@typescript-eslint/no-unsafe-return': 'error'
'@typescript-eslint/no-unsafe-return': 'error',
'@typescript-eslint/no-explicit-any': 'off'
}
})
44 changes: 0 additions & 44 deletions src/events/channels.ts

This file was deleted.

167 changes: 167 additions & 0 deletions src/events/channels/transactionsChannel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import EventEmitter from 'events'
import { AnyTransaction, WebSocketClient } from 'adamant-api'
import { schedule } from 'node-cron'
import { logger } from '../../modules/logger.js'
import { config } from '../../config/index.js'
import { adamantClient } from '../../modules/adamantClient.js'
import { prisma } from '../../modules/prisma.js'
import { JobName } from '@prisma/client'
import { isSignalTx } from '../../services/parser/isSignalTx.js'
import { isTxToNotify } from '../../services/parser/isTxToNotify.js'
import { ChatMessageTransaction } from 'adamant-api/dist/api/generated.js'

class TransactionsChannel extends EventEmitter {
// Specify types of events
override emit(event: 'newSignalMessage', tx: ChatMessageTransaction): boolean
override emit(event: 'newMessage', tx: AnyTransaction): boolean
override emit(event: string, ...args: never[]): boolean {
return super.emit(event, ...args)
}

override on(
event: 'newSignalMessage',
listener: (tx: ChatMessageTransaction) => void
): this
override on(event: 'newMessage', listener: (tx: AnyTransaction) => void): this
override on(event: string, listener: (...args: any[]) => void): this {
return super.on(event, listener)
}

private isLocked: boolean
private processedTxs: { [key: string]: AnyTransaction } = {} // cache for processed transactions
adamantSocket?: WebSocketClient

constructor() {
super()
this.isLocked = false
}

initSocket() {
adamantClient.initSocket({
wsType: 'ws',
admAddress: config.adamantAccount.address
})
logger.info(
`Adamant Client socket initialized on ${config.adamantAccount.address} address`
)

if (adamantClient.socket) {
adamantClient.socket.on((tx) => {
this.handleTransaction(tx)
})
adamantClient.socket.catch((error) => logger.error(error))

this.adamantSocket = adamantClient.socket
}
}

startJob() {
logger.info(
`Spawned transaction parser job with ${config.app.txCheckInterval} interval`
)
return schedule(config.app.txCheckInterval, async () => {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using schedule, you can use a regular setTimeout that invokes itself after each iteration. In this case, you can remove the this.isLocked check, as invoking the callback multiple times won't be possible.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additionally add destroy method to this class that will:

  1. Stop the timer
  2. Stop listening for adamantClient events (adamantClient.socket.off(callback))

Invoke transactionsChannel.destroy() from main.ts on SIGINT/SIGTERM

if (this.isLocked) return

this.isLocked = true

try {
const getHeightResponse = await adamantClient.getHeight()
const currentHeight = getHeightResponse.success
? getHeightResponse.height
: 0

let jobStatus = await prisma.cronJobStatus.findFirst({
where: { jobName: JobName.TRANSACTIONS }
})

if (!jobStatus) {
jobStatus = await prisma.cronJobStatus.create({
data: {
jobName: JobName.TRANSACTIONS,
state: JSON.stringify({
lastHeight: currentHeight
})
}
})
}

const lastCheckHeight = (
JSON.parse(jobStatus.state as string) as { lastHeight: number }
).lastHeight

// Determine fetch interval
let heightToFetch = lastCheckHeight + config.app.heightSkipPerHeight

if (heightToFetch > currentHeight) {
this.isLocked = false
return
}

if (currentHeight - heightToFetch > 1) {
// If the gap is more than 1 block, fetch transactions in chunks of 1000 blocks
heightToFetch = lastCheckHeight + 1000

if (currentHeight - heightToFetch < 0) {
// if the gap reaches currenHeight and more, than set it to currentHeight
heightToFetch = currentHeight
}
}

const txs = await adamantClient.getTransactions({
fromHeight: lastCheckHeight,
and: {
toHeight: heightToFetch
},
returnAsset: 1
})

if (!txs.success) {
this.isLocked = false
return
}

txs.transactions.forEach((tx) => {
if (tx.height < currentHeight - config.notify.latestHeightToNotify) {
// skip if transaction is too old
return
}

this.handleTransaction(tx)
})

await prisma.cronJobStatus.update({
where: { jobName: JobName.TRANSACTIONS },
data: {
state: JSON.stringify({
lastHeight: heightToFetch
})
}
})
} catch (error) {
logger.error(error, 'Error while running transactions channel job')
} finally {
this.isLocked = false
}
})
}

handleTransaction(tx: AnyTransaction) {
if (this.processedTxs[tx.id]) {
delete this.processedTxs[tx.id] // removing from cache because we got tx again from rest api or socket
return
}

this.processedTxs[tx.id] = tx

if (isSignalTx(tx)) {
logger.info(
`Got signal transaction to (un)subscribe to notifications, txId: ${tx.id}, processing...`
)
this.emit('newSignalMessage', tx as ChatMessageTransaction)
} else if (isTxToNotify(tx)) {
this.emit('newMessage', tx)
}
}
}

export const transactionsChannel = new TransactionsChannel()
26 changes: 3 additions & 23 deletions src/events/handlers.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,17 @@
import {
notifyMessagesChannel,
signalMessagesChannel,
transactionsChannel
} from './channels.js'
import { txsParser } from '../services/txsParser.js'
import { AnyTransaction } from 'adamant-api'
import { transactionsChannel } from './channels/transactionsChannel.js'
import { logger } from '../modules/logger.js'
import { ChatMessageTransaction } from 'adamant-api/dist/api/generated.js'
import { processSignalTransaction } from '../services/processSignalTransaction.js'
import { processTransactionToNotify } from '../services/processTransactionToNotify.js'
import { prisma } from '../modules/prisma.js'

export const spawnEventHandlers = () => {
const processedTxs: { [key: string]: AnyTransaction } = {} // cache for processed transactions

transactionsChannel.on('newTransaction', (tx: AnyTransaction) => {
if (processedTxs[tx.id]) {
delete processedTxs[tx.id] // removing from cache because we got tx again from rest api or socket
return
}

processedTxs[tx.id] = tx

txsParser(tx).catch((e) => logger.error(e, 'Failed to parsed transaction'))
})

signalMessagesChannel.on('newSignalMessage', (tx: ChatMessageTransaction) => {
transactionsChannel.on('newSignalMessage', (tx) => {
processSignalTransaction(tx).catch((e) =>
logger.error(e, 'Failed to process signal transaction')
)
})

notifyMessagesChannel.on('newMessage', async (tx: AnyTransaction) => {
transactionsChannel.on('newMessage', async (tx) => {
let devices = await prisma.device.findMany({
where: { admAddress: tx.recipientId }
})
Expand Down
4 changes: 2 additions & 2 deletions src/jobs/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { spawnTransactionsJobs } from './transactionsJobs.js'
import { transactionsChannel } from '../events/channels/transactionsChannel.js'
import { spawnRetryNotifyJob } from './retryNotifyJob.js'

export const spawnJobs = () => {
const transactionJob = spawnTransactionsJobs()
const transactionJob = transactionsChannel.startJob()
const retryNotifyJob = spawnRetryNotifyJob()

transactionJob.start()
Expand Down
50 changes: 27 additions & 23 deletions src/jobs/retryNotifyJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,34 @@ export const spawnRetryNotifyJob = () => {

isLocked = true

const transactionsToNotify = await prisma.notifyTransaction.findMany({
where: {
isNotified: false,
lastNotifyDate: { not: null }
},
include: {
device: true
}
})

await Promise.all(
transactionsToNotify.map(async (transaction) => {
const tx = JSON.parse(transaction.admTx as string) as AnyTransaction

logger.info(
`Got notification that failed to send, processing... DeviceId: ${transaction.device.id}, provider: ${transaction.device.pushServiceProvider}, admTxId: ${tx.id}`
)

processTransactionToNotify(tx, transaction.device).catch((e) =>
logger.error(e, 'Failed to process failed transaction to notify')
)
try {
const transactionsToNotify = await prisma.notifyTransaction.findMany({
where: {
isNotified: false,
lastNotifyDate: { not: null }
},
include: {
device: true
}
})
)

isLocked = false
await Promise.all(
transactionsToNotify.map(async (transaction) => {
const tx = JSON.parse(transaction.admTx as string) as AnyTransaction

logger.info(
`Got notification that failed to send, processing... DeviceId: ${transaction.device.id}, provider: ${transaction.device.pushServiceProvider}, admTxId: ${tx.id}`
)

processTransactionToNotify(tx, transaction.device).catch((e) =>
logger.error(e, 'Failed to process failed transaction to notify')
)
})
)
} catch (err) {
logger.error(err, 'Error while running retry notify job')
} finally {
isLocked = false
}
})
}
Loading