@@ -23,6 +23,8 @@ import io.reactivex.schedulers.Schedulers
2323import kotlinx.coroutines.Dispatchers
2424import kotlinx.coroutines.Job
2525import kotlinx.coroutines.channels.Channel
26+ import kotlinx.coroutines.channels.Channel.Factory
27+ import kotlinx.coroutines.coroutineScope
2628import kotlinx.coroutines.delay
2729import kotlinx.coroutines.ensureActive
2830import kotlinx.coroutines.job
@@ -315,6 +317,183 @@ class NearbyParentFragmentPresenter
315317 loadPlacesDataAsync(nearbyPlaceGroups, scope)
316318 }
317319
320+ /* *
321+ * Iterates through MarkerPlaceGroups and attempts to load the places from the local
322+ * Place cache/repository. If a Place is in the cache, data from the Place is set into the
323+ * associated MarkerPlaceGroup. Else, the index is added to the indicesToUpdate list.
324+ *
325+ * @param updatedGroups The MarkerPlaceGroups that contain Place entity IDs used to search the
326+ * local cache for more information about the Place.
327+ *
328+ * @param indicesToUpdate The list of indices in updatedGroups where the associated Place
329+ * was not stored in the local cache and will need to be retrieved in some other way.
330+ */
331+ suspend fun loadCachedPlaces (
332+ updatedGroups : MutableList <MarkerPlaceGroup >,
333+ indicesToUpdate : MutableList <Int >
334+ ) {
335+
336+ for (i in 0 .. updatedGroups.lastIndex) {
337+ val repoPlace = placesRepository.fetchPlace(updatedGroups[i].place.entityID)
338+ if (repoPlace != null && repoPlace.name != null && repoPlace.name != " " ){
339+ updatedGroups[i].isBookmarked =
340+ bookmarkLocationDao.findBookmarkLocation(repoPlace.name)
341+
342+ updatedGroups[i].place.apply {
343+ name = repoPlace.name
344+ isMonument = repoPlace.isMonument
345+ pic = repoPlace.pic ? : " "
346+ exists = repoPlace.exists ? : true
347+ longDescription = repoPlace.longDescription ? : " "
348+ language = repoPlace.language
349+ siteLinks = repoPlace.siteLinks
350+ }
351+ } else {
352+ indicesToUpdate.add(i)
353+ }
354+ }
355+
356+ }
357+
358+ /* *
359+ * Creates a Channel object that contains numbers divided into separate batches/lists.
360+ * Each batch/list will have a maximum size.
361+ *
362+ * @param numbers The list of numbers that will be divided into multiple batches/lists.
363+ * @param batchSize The maximum size of each batch/list which the numbers will be placed into.
364+ *
365+ * @return The Channel object which contains batches/lists of numbers.
366+ */
367+ suspend fun createBatches (numbers : MutableList <Int >, batchSize : Int ): Channel <List <Int >> {
368+
369+ val batches = Channel <List <Int >>(Channel .UNLIMITED )
370+
371+ for (i in numbers.indices step batchSize) {
372+ batches.send(
373+ numbers.slice(
374+ i until (i + batchSize).coerceAtMost(
375+ numbers.size
376+ )
377+ )
378+ )
379+ }
380+
381+ batches.close()
382+ return batches
383+ }
384+
385+ /* *
386+ * Method used by an individual thread/coroutine to request batch Place data from WikiData.
387+ * This is intended to be used by many threads in parallel to speed up data retrieval.
388+ *
389+ * If a WikiData server error occurs, this method will launch individual threads for each Place
390+ * because a single server error for one Place can cause WikiData to respond with an error
391+ * for the entire batch.
392+ *
393+ * @param batchedIndicesToFetch Contains the batches of indices. These indices correspond to
394+ * Places in updatedGroups that need more information fetched from WikiData.
395+ * @param updatedGroups List containing incomplete Place data
396+ * @param collectResults Holds the Place data fetched from Wikidata.
397+ */
398+ private suspend fun fetchPlacesThreadMethod (
399+ batchedIndicesToFetch : Channel <List <Int >>,
400+ updatedGroups : MutableList <MarkerPlaceGroup >,
401+ collectResults : Channel <List <Pair <Int , MarkerPlaceGroup >>>
402+ ) = coroutineScope {
403+ for (indices in batchedIndicesToFetch) {
404+ ensureActive()
405+ try {
406+ val fetchedPlaces =
407+ nearbyController.getPlaces(indices.map { updatedGroups[it].place })
408+ collectResults.send(
409+ fetchedPlaces.mapIndexed { index, place ->
410+ Pair (indices[index], MarkerPlaceGroup (
411+ bookmarkLocationDao.findBookmarkLocation(place.name),
412+ place
413+ ))
414+ }
415+ )
416+ } catch (e: Exception ) {
417+ Timber .tag(" NearbyPinDetails" ).e(e)
418+ // HTTP request failed. Try individual places
419+ for (i in indices) {
420+ launch {
421+ val onePlaceBatch = mutableListOf<Pair <Int , MarkerPlaceGroup >>()
422+ try {
423+ val fetchedPlace = nearbyController.getPlaces(
424+ mutableListOf (updatedGroups[i].place)
425+ )
426+
427+ onePlaceBatch.add(Pair (i, MarkerPlaceGroup (
428+ bookmarkLocationDao.findBookmarkLocation(
429+ fetchedPlace[0 ].name
430+ ),
431+ fetchedPlace[0 ]
432+ )))
433+ } catch (e: Exception ) {
434+ Timber .tag(" NearbyPinDetails" ).e(e)
435+ onePlaceBatch.add(Pair (i, updatedGroups[i]))
436+ }
437+ collectResults.send(onePlaceBatch)
438+ }
439+ }
440+ }
441+ }
442+ }
443+
444+ /* *
445+ * Launches a specific number of threads to retrieve Place data in parallel from the
446+ * WikiData servers.
447+ *
448+ * @param numThreads The number of threads to launch in parallel
449+ * @param batchedIndicesToFetch Contains the batches of indices. These indices correspond to
450+ * Places in updatedGroups that need more information fetched from WikiData.
451+ * @param updatedGroups List containing Place data
452+ * @param collectResults Holds the Place data fetched from Wikidata.
453+ */
454+ private suspend fun launchFetchPlacesThreads (
455+ numThreads : Int ,
456+ batchedIndicesToFetch : Channel <List <Int >>,
457+ updatedGroups : MutableList <MarkerPlaceGroup >,
458+ collectResults : Channel <List <Pair <Int , MarkerPlaceGroup >>>
459+ ) = coroutineScope {
460+ repeat(numThreads) {
461+ launch(Dispatchers .IO ) {
462+ fetchPlacesThreadMethod(batchedIndicesToFetch, updatedGroups, collectResults)
463+ }
464+ }
465+ }
466+
467+ /* *
468+ * Places the fetched WikiData Place data into associated MarkerPlaceGroups
469+ *
470+ * @param resultList The Place data fetched from WikiData
471+ * @param updatedGroups The MarkerPlaceGroups which will be updated with the data fetched
472+ * from WikiData
473+ */
474+ private fun processResults (
475+ resultList : List <Pair <Int , MarkerPlaceGroup >>,
476+ updatedGroups : MutableList <MarkerPlaceGroup >
477+ ) {
478+
479+ for ((index, fetchedPlaceGroup) in resultList) {
480+ val existingPlace = updatedGroups[index].place
481+ val finalPlaceGroup = MarkerPlaceGroup (
482+ fetchedPlaceGroup.isBookmarked,
483+ fetchedPlaceGroup.place.apply {
484+ location = existingPlace.location
485+ distance = existingPlace.distance
486+ isMonument = existingPlace.isMonument
487+ }
488+ )
489+ updatedGroups[index] = finalPlaceGroup
490+ placesRepository
491+ .save(finalPlaceGroup.place)
492+ .subscribeOn(Schedulers .io())
493+ .subscribe()
494+ }
495+ }
496+
318497 /* *
319498 * Load the places' details from cache and Wikidata query, and update these details on the map
320499 * as and when they arrive.
@@ -341,101 +520,25 @@ class NearbyParentFragmentPresenter
341520 val updatedGroups = nearbyPlaceGroups.toMutableList()
342521 // first load cached places:
343522 val indicesToUpdate = mutableListOf<Int >()
344- for (i in 0 .. updatedGroups.lastIndex) {
345- val repoPlace = placesRepository.fetchPlace(updatedGroups[i].place.entityID)
346- if (repoPlace != null && repoPlace.name != null && repoPlace.name != " " ){
347- updatedGroups[i].isBookmarked = bookmarkLocationDao.findBookmarkLocation(repoPlace.name)
348- updatedGroups[i].place.apply {
349- name = repoPlace.name
350- isMonument = repoPlace.isMonument
351- pic = repoPlace.pic ? : " "
352- exists = repoPlace.exists ? : true
353- longDescription = repoPlace.longDescription ? : " "
354- language = repoPlace.language
355- siteLinks = repoPlace.siteLinks
356- }
357- } else {
358- indicesToUpdate.add(i)
359- }
360- }
523+
524+ loadCachedPlaces(updatedGroups, indicesToUpdate)
525+
361526 schedulePlacesUpdate(updatedGroups, force = true )
362527 // channel for lists of indices of places, each list to be fetched in a single request
363- val fetchPlacesChannel = Channel <List <Int >>(Channel .UNLIMITED )
364- var totalBatches = 0
365- for (i in indicesToUpdate.indices step LoadPlacesAsyncOptions .BATCH_SIZE ) {
366- ++ totalBatches
367- fetchPlacesChannel.send(
368- indicesToUpdate.slice(
369- i until (i + LoadPlacesAsyncOptions .BATCH_SIZE ).coerceAtMost(
370- indicesToUpdate.size
371- )
372- )
373- )
374- }
375- fetchPlacesChannel.close()
376- val collectResults = Channel <List <Pair <Int , MarkerPlaceGroup >>>(totalBatches)
377- repeat(LoadPlacesAsyncOptions .CONNECTION_COUNT ) {
378- launch(Dispatchers .IO ) {
379- for (indices in fetchPlacesChannel) {
380- ensureActive()
381- try {
382- val fetchedPlaces =
383- nearbyController.getPlaces(indices.map { updatedGroups[it].place })
384- collectResults.send(
385- fetchedPlaces.mapIndexed { index, place ->
386- Pair (indices[index], MarkerPlaceGroup (
387- bookmarkLocationDao.findBookmarkLocation(place.name),
388- place
389- ))
390- }
391- )
392- } catch (e: Exception ) {
393- Timber .tag(" NearbyPinDetails" ).e(e)
394- // HTTP request failed. Try individual places
395- for (i in indices) {
396- launch {
397- val onePlaceBatch = mutableListOf<Pair <Int , MarkerPlaceGroup >>()
398- try {
399- val fetchedPlace = nearbyController.getPlaces(
400- mutableListOf (updatedGroups[i].place)
401- )
402-
403- onePlaceBatch.add(Pair (i, MarkerPlaceGroup (
404- bookmarkLocationDao.findBookmarkLocation(
405- fetchedPlace[0 ].name
406- ),
407- fetchedPlace[0 ]
408- )))
409- } catch (e: Exception ) {
410- Timber .tag(" NearbyPinDetails" ).e(e)
411- onePlaceBatch.add(Pair (i, updatedGroups[i]))
412- }
413- collectResults.send(onePlaceBatch)
414- }
415- }
416- }
417- }
418- }
419- }
528+ val batchedIndicesToFetch =
529+ createBatches(indicesToUpdate, LoadPlacesAsyncOptions .BATCH_SIZE )
530+
531+ val collectResults = Channel <List <Pair <Int , MarkerPlaceGroup >>>(Factory .UNLIMITED )
532+
533+ launchFetchPlacesThreads(LoadPlacesAsyncOptions .CONNECTION_COUNT , batchedIndicesToFetch,
534+ updatedGroups, collectResults)
535+
420536 var collectCount = 0
421537 while (collectCount < indicesToUpdate.size) {
422538 val resultList = collectResults.receive()
423- for ((index, fetchedPlaceGroup) in resultList) {
424- val existingPlace = updatedGroups[index].place
425- val finalPlaceGroup = MarkerPlaceGroup (
426- fetchedPlaceGroup.isBookmarked,
427- fetchedPlaceGroup.place.apply {
428- location = existingPlace.location
429- distance = existingPlace.distance
430- isMonument = existingPlace.isMonument
431- }
432- )
433- updatedGroups[index] = finalPlaceGroup
434- placesRepository
435- .save(finalPlaceGroup.place)
436- .subscribeOn(Schedulers .io())
437- .subscribe()
438- }
539+
540+ processResults(resultList, updatedGroups)
541+
439542 // handle any places clicked
440543 if (clickedPlacesIndex < clickedPlaces.size) {
441544 val clickedPlacesBacklog = hashMapOf<LatLng , Place >()
0 commit comments