Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/packages/services/assetlibrary/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"@types/jest": "26.0.23",
"@types/json-stable-stringify": "1.0.32",
"@types/jsonwebtoken": "9.0.1",
"@types/node": "^18.17.0",
"@types/prettyjson": "0.0.29",
"@typescript-eslint/eslint-plugin": "6.2.0",
"ajv": "6.12.3",
Expand Down
72 changes: 34 additions & 38 deletions source/packages/services/assetlibrary/src/authz/authz.full.dao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,17 @@
* and limitations under the License. *
*********************************************************************************************************************/
import { logger } from '@awssolutions/simple-cdf-logger';
import { process, structure } from 'gremlin';
import { process } from 'gremlin';
import { inject, injectable } from 'inversify';
import { BaseDaoFull } from '../data/base.full.dao';
import { ConnectionDaoFull } from '../data/connection.full.dao';
import { TYPES } from '../di/types';
import { Authorizations } from './authz.full.model';

const __ = process.statics; // eslint-disable-line no-underscore-dangle

@injectable()
export class AuthzDaoFull extends BaseDaoFull {
public constructor(
@inject('neptuneUrl') neptuneUrl: string,
@inject(TYPES.GraphSourceFactory) graphSourceFactory: () => structure.Graph
) {
super(neptuneUrl, graphSourceFactory);
}
export class AuthzDaoFull {
public constructor(@inject(TYPES.ConnectionDao) private connectionDao: ConnectionDaoFull) {}

public async listAuthorizedHierarchies(
deviceIds: string[],
Expand All @@ -48,39 +43,40 @@ export class AuthzDaoFull extends BaseDaoFull {
const ids: string[] = deviceIds.map((d) => `device___${d}`);
ids.push(...groupPaths.map((g) => `group___${g}`));

const conn = await super.getConnection();
const traverser = conn.traversal
.V(ids)
.as('entity')
.union(
// return an item if the entity exists
__.project('entity', 'exists')
.by(
__.select('entity').coalesce(
__.values('deviceId'),
__.values('groupPath')
const results = await this.connectionDao.withTraversal(async (conn) => {
const traverser = conn.traversal
.V(ids)
.as('entity')
.union(
// return an item if the entity exists
__.project('entity', 'exists')
.by(
__.select('entity').coalesce(
__.values('deviceId'),
__.values('groupPath')
)
)
.by(__.constant(true)),
// return an item if the entity is authorized
__.local(
__.until(__.has('groupPath', process.P.within(hierarchies)))
.repeat(
__.outE().has('isAuthCheck', true).otherV().simplePath().dedup()
)
.as('authorizedPath')
)
.by(__.constant(true)),
// return an item if the entity is authorized
__.local(
__.until(__.has('groupPath', process.P.within(hierarchies)))
.repeat(
__.outE().has('isAuthCheck', true).otherV().simplePath().dedup()
)
.as('authorizedPath')
)
.project('entity', 'authorizedPath')
.by(
__.select('entity').coalesce(
__.values('deviceId'),
__.values('groupPath')
.project('entity', 'authorizedPath')
.by(
__.select('entity').coalesce(
__.values('deviceId'),
__.values('groupPath')
)
)
)
.by(__.select('authorizedPath').values('groupPath'))
);
.by(__.select('authorizedPath').values('groupPath'))
);

const results = await traverser.toList();
return await traverser.toList();
});

logger.debug(
`authz.full.dao listAuthorizedHierarchies: results:${JSON.stringify(results)}`
Expand Down
57 changes: 28 additions & 29 deletions source/packages/services/assetlibrary/src/data/common.full.dao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
* and limitations under the License. *
*********************************************************************************************************************/
import { logger } from '@awssolutions/simple-cdf-logger';
import { process, structure } from 'gremlin';
import { process } from 'gremlin';
import { inject, injectable } from 'inversify';
import { TYPES } from '../di/types';
import { TypeUtils } from '../utils/typeUtils';
import { BaseDaoFull } from './base.full.dao';
import { ConnectionDaoFull } from './connection.full.dao';
import { FullAssembler } from './full.assembler';
import { RelatedEntityDto, VertexDto, isRelatedEntityDto, isVertexDto } from './full.model';
import { EntityTypeMap, ModelAttributeValue, SortKeys } from './model';
Expand All @@ -24,15 +24,12 @@ import { Node } from './node';
const __ = process.statics;

@injectable()
export class CommonDaoFull extends BaseDaoFull {
export class CommonDaoFull {
public constructor(
@inject('neptuneUrl') neptuneUrl: string,
@inject(TYPES.TypeUtils) private typeUtils: TypeUtils,
@inject(TYPES.FullAssembler) private fullAssembler: FullAssembler,
@inject(TYPES.GraphSourceFactory) graphSourceFactory: () => structure.Graph
) {
super(neptuneUrl, graphSourceFactory);
}
@inject(TYPES.ConnectionDao) private connectionDao: ConnectionDaoFull
) {}

public async listRelated(
entityDbId: string,
Expand Down Expand Up @@ -146,20 +143,21 @@ export class CommonDaoFull extends BaseDaoFull {
relatedUnion.range(offsetAsInt, offsetAsInt + countAsInt);

// build the main part of the query, unioning the related traversers with the main entity we want to return
const conn = await super.getConnection();
const traverser = conn.traversal
.V(entityDbId)
.as('main')
.union(
relatedUnion,
__.select('main').valueMap().with_(process.withOptions.tokens)
);
const results = await this.connectionDao.withTraversal(async (conn) => {
const traverser = conn.traversal
.V(entityDbId)
.as('main')
.union(
relatedUnion,
__.select('main').valueMap().with_(process.withOptions.tokens)
);

// execute and retrieve the results
logger.debug(
`common.full.dao listRelated: traverser: ${JSON.stringify(traverser.toString())}`
);
const results = await traverser.toList();
// execute and retrieve the results
logger.debug(
`common.full.dao listRelated: traverser: ${JSON.stringify(traverser.toString())}`
);
return await traverser.toList();
});
logger.debug(`common.full.dao listRelated: results: ${JSON.stringify(results)}`);

if (results === undefined || results.length === 0) {
Expand Down Expand Up @@ -188,15 +186,16 @@ export class CommonDaoFull extends BaseDaoFull {
return {};
}

const conn = await super.getConnection();
const query = conn.traversal
.V(entityDbIds)
.project('id', 'labels')
.by(__.coalesce(__.values('deviceId'), __.values('groupPath')))
.by(__.label().fold());
const results = await this.connectionDao.withTraversal(async (conn) => {
const query = conn.traversal
.V(entityDbIds)
.project('id', 'labels')
.by(__.coalesce(__.values('deviceId'), __.values('groupPath')))
.by(__.label().fold());

logger.silly(`common.full.dao getLabels: query: ${JSON.stringify(query)}`);
const results = await query.toList();
logger.silly(`common.full.dao getLabels: query: ${JSON.stringify(query)}`);
return await query.toList();
});
logger.silly(`common.full.dao getLabels: results: ${JSON.stringify(results)}`);

if ((results?.length ?? 0) === 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import { logger } from '@awssolutions/simple-cdf-logger';
import { driver, process, structure } from 'gremlin';
import { inject, injectable } from 'inversify';
import { TYPES } from '../di/types';
import { PrematurelyClosedConnectionError } from '../utils/errors';
import { retry } from '../utils/retry';

@injectable()
export class BaseDaoFull {
export class ConnectionDaoFull {
private _graph: structure.Graph;
private _conn: driver.DriverRemoteConnection | null;

Expand All @@ -28,40 +30,65 @@ export class BaseDaoFull {
this._conn = null;
}

protected async getConnection(): Promise<NeptuneConnection> {
logger.debug(`base.full.dao getConnection: in:`);

private async _withTraversal<T>(fn: (conn: NeptuneConnection) => Promise<T>): Promise<T> {
logger.debug(`connection.full.dao withTraversal: in:`);
if (this._conn == null) {
logger.debug(`base.full.dao getConnection: create new connection:`);
logger.debug(`connection.full.dao withTraversal: create new connection:`);
this._conn = new driver.DriverRemoteConnection(this.neptuneUrl, {
mimeType: 'application/vnd.gremlin-v2.0+json',
pingEnabled: false,
connectOnStartup: false,
});
this._conn.addListener('close', (code: number, message: string) => {
logger.info(`base.full.dao connection close: code: ${code}, message: ${message}`);
}

return new Promise((resolve, reject) => {
const closeListener = (code: number, message: string) => {
logger.info(
`connection.full.dao connection close: code: ${code}, message: ${message}`
);
this._conn = null;
if (code === 1006) {
throw new Error('Connection closed prematurely');
reject(new PrematurelyClosedConnectionError());
}
});
await this._conn.open();
}
};

logger.debug(`base.full.dao getConnection: withRemote:`);
const res = new NeptuneConnection(
this._graph.traversal().withRemote(this._conn),
);
this._conn.addListener('close', closeListener);

this._conn
.open()
.then(() => {
logger.debug(`connection.full.dao getConnection: withRemote:`);
const conn = new NeptuneConnection(
this._graph.traversal().withRemote(this._conn)
);
fn(conn)
.then(resolve)
.catch(reject)
.finally(() => {
this._conn?.removeListener('close', closeListener);
});
})
.catch((e) => {
this._conn?.removeListener('close', closeListener);
reject(e);
});
});
}

logger.debug(`base.full.dao getConnection: exit:`);
return res;
public async withTraversal<T>(fn: (conn: NeptuneConnection) => Promise<T>): Promise<T> {
return await retry(
async () => {
return await this._withTraversal(fn);
},
{
shouldRetry: (e) => e.name === 'PrematurelyClosedConnectionError',
}
);
}
}

export class NeptuneConnection {
constructor(
private _traversal: process.GraphTraversalSource,
) {}
constructor(private _traversal: process.GraphTraversalSource) {}

public get traversal(): process.GraphTraversalSource {
return this._traversal;
Expand Down
Loading