Skip to content

Conversation

@tylerjroach
Copy link
Member

  • PR title and description conform to Pull Request guidelines.

Issue #, if available:

Description of changes:

  • Still need to get Logger approved but I went ahead and built it in.

How did you test these changes?
(Please add a line here how the changes were tested)

Documentation update required?

  • No
  • Yes (Please include a PR link for the documentation update)

General Checklist

  • Added Unit Tests
  • Added Integration Tests
  • Security oriented best practices and standards are followed (e.g. using input sanitization, principle of least privilege, etc)

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

@tylerjroach tylerjroach requested a review from a team as a code owner April 11, 2025 18:25
@tylerjroach tylerjroach merged commit 9ee12ba into feat/appsync-events Apr 11, 2025
@tylerjroach tylerjroach deleted the tjroach/events-websocket-subscribe branch April 11, 2025 20:25
private val _connectResult = AtomicReference<Result<EventsWebSocket>?>(null)
private val _connectionInProgress = AtomicReference<Deferred<Result<EventsWebSocket>>?>(null)

fun getExistingWebSocket(): EventsWebSocket? = _connectResult.get()?.getOrNull()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be a val

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mattcreaser This is suspending. What are you suggesting here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not suspending?

    val existingWebSocket: EventsWebSocket?
        get() = _connectResult.get()?.getOrNull()

But this is just preference, there's no issue with current.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep sorry, was looking at wrong value. updated

private val logger: Logger?
) {
private val mutex = Mutex()
private val _connectResult = AtomicReference<Result<EventsWebSocket>?>(null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not have underscores in non-backing properties

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was treating these like backing properties because they were private and not exposed outside the class, holding a reference that was accessed for the getConnectedWebSocket() method. I'll go ahead and rename though as this isn't technically a backing property.

fun toEventsException(fallbackMessage: String? = null): EventsException {
return when (errorType) {
"UnauthorizedException" -> UnauthorizedException(message ?: fallbackMessage)
else -> EventsException(message = "$errorType: $message")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this should pass fallbackMessage if message is null

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch

val events = _events.asSharedFlow() // publicly exposed as read-only shared flow

private lateinit var webSocket: WebSocket
internal val isClosed = AtomicBoolean(false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't seem to be any advantage to having this be AtomicBoolean over a regular Boolean (possibly marked Volatile)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, volatile should be sufficient as the okhttp callback of onFailure or onClosed is the only place the value is updated (which should be from same thread/ownership).

}

override fun onOpen(webSocket: WebSocket, response: Response) {
val connectionInitMessage = json.encodeToString(WebSocketMessage.Send.ConnectionInit())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this function just be calling send(WebSocketMessage.Send.ConnectionInit())

* Gets the namespace of the logger.
* @return namespace for logger
*/
val namespace: String
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

namespace appears to be unused, is it supposed to be used as the tag of the logcat message?

The way I'd kind of expected this to work is for there to be an interface like:

interface LoggerFactory {
     fun logger(namespace: String): Logger
}

And then a particular events class (say EventsWebSocket) would do something like val logger = loggerFactory.logger("EventsWebSocket") so that it wouldn't have to prepend a TAG to every message.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, I have it set up like this:

private val events = Events(
    endpoint = endpoint,
    connectAuthorizer = apiKeyAuthorizer,
    defaultChannelAuthorizers = ChannelAuthorizers(
        subscribeAuthorizer = apiKeyAuthorizer,
        publishAuthorizer = apiKeyAuthorizer
    ),
    options = Events.Options(AndroidLogger("Events", LogLevel.DEBUG))
)
/**
 * Implementation of Logger interface that logs to Android's LogCat
 */
class AndroidLogger(
    override val namespace: String,
    override val thresholdLevel: LogLevel
) : Logger {

    override fun error(message: String) {
        if (!thresholdLevel.above(LogLevel.ERROR)) {
            Log.e(namespace, message)
        }
    }

    override fun error(message: String, error: Throwable?) {
        if (!thresholdLevel.above(LogLevel.ERROR)) {
            Log.e(namespace, message, error)
        }
    }

    override fun warn(message: String) {
        if (!thresholdLevel.above(LogLevel.WARN)) {
            Log.w(namespace, message)
        }
    }

    override fun warn(message: String, issue: Throwable?) {
        if (!thresholdLevel.above(LogLevel.WARN)) {
            Log.w(namespace, message, issue)
        }
    }

    override fun info(message: String) {
        if (!thresholdLevel.above(LogLevel.INFO)) {
            Log.i(namespace, message)
        }
    }

    override fun debug(message: String) {
        if (!thresholdLevel.above(LogLevel.DEBUG)) {
            Log.d(namespace, message)
        }
    }

    override fun verbose(message: String) {
        if (!thresholdLevel.above(LogLevel.VERBOSE)) {
            Log.v(namespace, message)
        }
    }
}

This was also on customer code side but it would be reasonable for us to include the implementation for AndroidLogger. This is all the same as how our current Amplify Logger works.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed after discussion.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants