Skip to content

Add workaround for reconnection issue to documentation #241

Open
@lsafer-meemer

Description

Is your feature request related to a problem? Please describe.
It is a problem that client reconnection is not addressed

Describe the solution you'd like
This is just a workaround, but here we go. It primarly uses flows and might be a
solution to be integrated with the library.

@Mr3zee if you see this not fit as an issue I am very sorry. but I thought it might help someone

Some utilities:

fun <T> Flow<T>.firstShareStateInBlocking(scope: CoroutineScope): StateFlow<T> {
    val started: SharingStarted = SharingStarted.Eagerly
    return shareIn(scope, started, replay = 1).let {
        it.stateIn(scope, started, runBlocking { it.first() })
    }
}

// this is helpful if using compose
fun <T : R, R> Flow<T>.collectAsStateIn(
    coroutineScope: CoroutineScope,
    initial: R,
): State<R> {
    val state = mutableStateOf(initial)
    onEach { state.value = it }.launchIn(coroutineScope)
    return state
}

Creating a client state that reacts to session changes and reconnection requests

@OptIn(ExperimentalCoroutinesApi::class)
fun createRPCClientState(
    options: RpcClientOptions,
    httpClient: HttpClient,
    dataStore: DataStore<Preferences>,
    reConnectRequestFlow: Flow<Unit>,
    coroutineScope: CoroutineScope,
): StateFlow<RPCClient> {
    return dataStore.data
        .map { it[PREF_RPC_SESSION] }
        .distinctUntilChanged()
        .flatMapLatest { newSession ->
            merge(
                flowOf(newSession),
                reConnectRequestFlow
                    .map { newSession },
            )
        }
        .mapLatest { newSession ->
            httpClient.rpc(options.baseurl) {
                newSession?.let { bearerAuth(it) }
            }
        }
        .runningReduce { oldClient, newClient ->
            oldClient.webSocketSession.close()
            newClient
        }
        .firstShareStateInBlocking(coroutineScope)
}

And to use the client (in compose, but can be used elsewhere):

val remoteServiceState = rpcClientState
        .map { it.withService<MyRemoteService>() }
        .firstShareStateInBlocking(viewModelScope) // <-- or other scope

@OptIn(ExperimentalCoroutinesApi::class)
val someFlowValue by remoteServiceState
    .flatMapLatest { it.someServiceFlow }
    .onEach { newFlowValue ->  /**/  } // optional - to peek the value when changed
    .collectAsStateIn(viewModelScope, initial = null)  // <-- or other scope

Metadata

Assignees

Labels

featureNew feature or request

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions