mirror of
https://github.com/MatomoCamp/matomocamp-companion-android.git
synced 2024-09-19 16:13:46 +02:00
new implementation of tickerFlow and synchronizedTickerFlow based on kotlin.time.Duration and TimeSource
This commit is contained in:
parent
8949b4764d
commit
ad9ff4fc83
5 changed files with 57 additions and 29 deletions
|
@ -1,31 +1,51 @@
|
|||
package be.digitalia.fosdem.flow
|
||||
|
||||
import android.os.SystemClock
|
||||
import be.digitalia.fosdem.utils.ElapsedRealTimeSource
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.SharingStarted
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.flow
|
||||
import java.util.Arrays
|
||||
import kotlin.time.Duration
|
||||
import kotlin.time.ExperimentalTime
|
||||
import kotlin.time.TimeMark
|
||||
import kotlin.time.TimeSource
|
||||
|
||||
fun tickerFlow(periodInMillis: Long): Flow<Unit> = flow {
|
||||
fun tickerFlow(period: Duration): Flow<Unit> = flow {
|
||||
while (true) {
|
||||
emit(Unit)
|
||||
delay(periodInMillis)
|
||||
delay(period)
|
||||
}
|
||||
}
|
||||
|
||||
@OptIn(ExperimentalTime::class)
|
||||
fun synchronizedTickerFlow(period: Duration, subscriptionCount: StateFlow<Int>): Flow<Unit> {
|
||||
return synchronizedTickerFlow(period, subscriptionCount, ElapsedRealTimeSource)
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a ticker Flow which remembers the time of the last emission of the previous collection.
|
||||
* It only supports one subscriber at a time.
|
||||
* Creates a ticker Flow which delays emitting a value until there is at least one subscription.
|
||||
* timeSource needs to be monotonic.
|
||||
*/
|
||||
fun rememberTickerFlow(periodInMillis: Long): Flow<Unit> {
|
||||
var nextEmissionTime = 0L
|
||||
@ExperimentalTime
|
||||
fun synchronizedTickerFlow(
|
||||
period: Duration,
|
||||
subscriptionCount: StateFlow<Int>,
|
||||
timeSource: TimeSource
|
||||
): Flow<Unit> {
|
||||
return flow {
|
||||
delay(nextEmissionTime - SystemClock.elapsedRealtime())
|
||||
while (true) {
|
||||
emit(Unit)
|
||||
nextEmissionTime = SystemClock.elapsedRealtime() + periodInMillis
|
||||
delay(periodInMillis)
|
||||
var nextEmissionTimeMark: TimeMark? = null
|
||||
flow {
|
||||
nextEmissionTimeMark?.let { delay(-it.elapsedNow()) }
|
||||
while (true) {
|
||||
emit(Unit)
|
||||
nextEmissionTimeMark = timeSource.markNow() + period
|
||||
delay(period)
|
||||
}
|
||||
}
|
||||
.flowWhileShared(subscriptionCount, SharingStarted.WhileSubscribed())
|
||||
.collect(this)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
package be.digitalia.fosdem.utils
|
||||
|
||||
import android.os.SystemClock
|
||||
import kotlin.time.AbstractLongTimeSource
|
||||
import kotlin.time.DurationUnit
|
||||
import kotlin.time.ExperimentalTime
|
||||
|
||||
@ExperimentalTime
|
||||
object ElapsedRealTimeSource : AbstractLongTimeSource(DurationUnit.NANOSECONDS) {
|
||||
override fun read(): Long = SystemClock.elapsedRealtimeNanos()
|
||||
override fun toString(): String = "TimeSource(SystemClock.elapsedRealtimeNanos())"
|
||||
}
|
|
@ -8,9 +8,8 @@ import be.digitalia.fosdem.BuildConfig
|
|||
import be.digitalia.fosdem.alarms.AppAlarmManager
|
||||
import be.digitalia.fosdem.db.BookmarksDao
|
||||
import be.digitalia.fosdem.db.ScheduleDao
|
||||
import be.digitalia.fosdem.flow.flowWhileShared
|
||||
import be.digitalia.fosdem.flow.rememberTickerFlow
|
||||
import be.digitalia.fosdem.flow.stateFlow
|
||||
import be.digitalia.fosdem.flow.synchronizedTickerFlow
|
||||
import be.digitalia.fosdem.flow.versionedResourceFlow
|
||||
import be.digitalia.fosdem.model.Event
|
||||
import be.digitalia.fosdem.parsers.ExportedBookmarksParser
|
||||
|
@ -20,7 +19,6 @@ import kotlinx.coroutines.Dispatchers
|
|||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.SharingStarted
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.filterNotNull
|
||||
import kotlinx.coroutines.flow.flatMapLatest
|
||||
|
@ -30,8 +28,8 @@ import okio.buffer
|
|||
import okio.source
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.TimeUnit
|
||||
import javax.inject.Inject
|
||||
import kotlin.time.Duration.Companion.minutes
|
||||
|
||||
@HiltViewModel
|
||||
class BookmarksViewModel @Inject constructor(
|
||||
|
@ -48,8 +46,7 @@ class BookmarksViewModel @Inject constructor(
|
|||
upcomingOnlyStateFlow.filterNotNull().flatMapLatest { upcomingOnly ->
|
||||
if (upcomingOnly) {
|
||||
// Refresh upcoming bookmarks every 2 minutes
|
||||
rememberTickerFlow(REFRESH_PERIOD)
|
||||
.flowWhileShared(subscriptionCount, SharingStarted.WhileSubscribed())
|
||||
synchronizedTickerFlow(REFRESH_PERIOD, subscriptionCount)
|
||||
.flatMapLatest {
|
||||
getObservableBookmarks(Instant.now() - TIME_OFFSET, subscriptionCount)
|
||||
}
|
||||
|
@ -89,7 +86,7 @@ class BookmarksViewModel @Inject constructor(
|
|||
}
|
||||
|
||||
companion object {
|
||||
private val REFRESH_PERIOD = TimeUnit.MINUTES.toMillis(2L)
|
||||
private val REFRESH_PERIOD = 2.minutes
|
||||
|
||||
// In upcomingOnly mode, events that just started are still shown for 5 minutes
|
||||
private val TIME_OFFSET = Duration.ofMinutes(5L)
|
||||
|
|
|
@ -10,7 +10,8 @@ import androidx.paging.cachedIn
|
|||
import be.digitalia.fosdem.db.ScheduleDao
|
||||
import be.digitalia.fosdem.flow.countSubscriptionsFlow
|
||||
import be.digitalia.fosdem.flow.flowWhileShared
|
||||
import be.digitalia.fosdem.flow.rememberTickerFlow
|
||||
import be.digitalia.fosdem.flow.stateFlow
|
||||
import be.digitalia.fosdem.flow.synchronizedTickerFlow
|
||||
import be.digitalia.fosdem.model.StatusEvent
|
||||
import dagger.hilt.android.lifecycle.HiltViewModel
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
|
@ -20,21 +21,19 @@ import kotlinx.coroutines.flow.distinctUntilChanged
|
|||
import kotlinx.coroutines.flow.filterNotNull
|
||||
import kotlinx.coroutines.flow.flatMapLatest
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.stateIn
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.TimeUnit
|
||||
import javax.inject.Inject
|
||||
import kotlin.time.Duration.Companion.minutes
|
||||
|
||||
@HiltViewModel
|
||||
class LiveViewModel @Inject constructor(scheduleDao: ScheduleDao) : ViewModel() {
|
||||
|
||||
// Share a single ticker providing the time to ensure both lists are synchronized
|
||||
private val ticker: Flow<Instant> =
|
||||
rememberTickerFlow(REFRESH_PERIOD)
|
||||
private val ticker: Flow<Instant> = stateFlow(viewModelScope, null) { subscriptionCount ->
|
||||
synchronizedTickerFlow(REFRESH_PERIOD, subscriptionCount)
|
||||
.map { Instant.now() }
|
||||
.stateIn(viewModelScope, SharingStarted.WhileSubscribed(), null)
|
||||
.filterNotNull()
|
||||
}.filterNotNull()
|
||||
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
private fun createLiveEventsHotFlow(
|
||||
|
@ -59,7 +58,7 @@ class LiveViewModel @Inject constructor(scheduleDao: ScheduleDao) : ViewModel()
|
|||
}
|
||||
|
||||
companion object {
|
||||
private val REFRESH_PERIOD = TimeUnit.MINUTES.toMillis(1L)
|
||||
private val REFRESH_PERIOD = 1.minutes
|
||||
private val NEXT_EVENTS_INTERVAL = Duration.ofMinutes(30L)
|
||||
}
|
||||
}
|
|
@ -22,7 +22,7 @@ import kotlinx.coroutines.flow.flowOf
|
|||
import kotlinx.coroutines.flow.map
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.time.Duration.Companion.minutes
|
||||
|
||||
class TrackScheduleListViewModel @AssistedInject constructor(
|
||||
scheduleDao: ScheduleDao,
|
||||
|
@ -61,6 +61,6 @@ class TrackScheduleListViewModel @AssistedInject constructor(
|
|||
}
|
||||
|
||||
companion object {
|
||||
private val TIME_REFRESH_PERIOD = TimeUnit.MINUTES.toMillis(1L)
|
||||
private val TIME_REFRESH_PERIOD = 1.minutes
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue