import asyncio
import collections
import random
from typing import List, Tuple, Dict, Union

from .random_street_view import call_random_street_view, VALID_COUNTRIES as RSV_COUNTRIES
from .urban_centers import urban_coord_unlocked, urban_coord_ensured, VALID_COUNTRIES as URBAN_COUNTRIES
from .shared import ExhaustedSourceError, aiohttp_client
from ..schemas import GameConfig, GenMethodEnum, CountryCode, CacheInfo, GeneratorInfo

# One entry per generation method, paired with the country list exported by
# the module that implements it.
generator_info = [
    GeneratorInfo(
        generation_method=GenMethodEnum.rsv,
        country_locks=RSV_COUNTRIES
    ),
    GeneratorInfo(
        generation_method=GenMethodEnum.urban,
        country_locks=URBAN_COUNTRIES
    ),
]

# Prefix used for each generation method when naming caches in
# PointStore.get_cache_info().
cache_names = {
    GenMethodEnum.rsv: "RSV",
    GenMethodEnum.urban: "Urban",
}


class PointStore:
    """
    Generates geo points on demand and keeps spare points in per-source caches.

    Caches are deques keyed by (generation method, country code). Points are
    (country code, latitude, longitude) tuples.
    """

    def __init__(self, cache_targets: Dict[Tuple[GenMethodEnum, CountryCode], int]):
        """
        cache_targets maps a (generation method, country code) key to the
        number of points restock_source() should keep ready for that key.
        Keys that are absent are treated as having a target of 0.
        """
        self.cache_targets = cache_targets
        self.store = collections.defaultdict(collections.deque)

    async def generate_point(self, generator: GenMethodEnum,
                             country: Union[CountryCode, None]) -> Tuple[str, float, float]:
        """
        Produce one fresh point from the given generator, bypassing the cache.

        For RSV, any surplus points that come back in the same batch are added
        to the cache as a side effect. Raises ExhaustedSourceError if the
        generator is not recognised or the RSV batch comes back empty.
        """
        if generator == GenMethodEnum.rsv:
            # RSV point functions return a collection of points, which should be cached
            batch = list(await call_random_street_view(country))
            if not batch:
                # an empty batch would otherwise surface as a bare ValueError
                # from unpacking; report it the way callers are told to expect
                raise ExhaustedSourceError
            point, *surplus = batch
            # File every spare point under its OWN country code: `country` may
            # be None, and a batch is not guaranteed here to be single-country,
            # so keying everything on the first point could place a point in
            # the wrong country's cache.
            for extra in surplus:
                self.store[(generator, extra[0])].append(extra)
            return point
        elif generator == GenMethodEnum.urban:
            # urban center point functions only return a single point
            if country is None:
                return await urban_coord_unlocked()
            return await urban_coord_ensured(country, city_retries=50)
        else:
            raise ExhaustedSourceError

    async def get_point(self, generator: GenMethodEnum,
                        country: Union[CountryCode, None]) -> Tuple[str, float, float]:
        """
        Return one point, served from the cache when possible.

        The cache is only consulted when a country lock is given; unlocked
        requests always generate a fresh point.
        """
        if country is not None:
            # if we already have a point ready, just return it immediately
            # to avoid bias, we only do this in country-locking mode
            stock = self.store[(generator, country)]
            if stock:
                return stock.popleft()
        return await self.generate_point(generator, country)

    async def get_points(self, config: GameConfig) -> List[Tuple[str, float, float]]:
        """
        Provide points according to the GameConfig.

        Return a list of valid geo points, one per configured round, as
        (2 character country code, latitude, longitude) tuples. The points are
        requested concurrently.

        In the event that the configured source cannot reasonably supply enough points,
        most likely due to time constraints, this will raise an ExhaustedSourceError.
        """
        return await asyncio.gather(*(
            self.get_point(config.generation_method, config.country_lock)
            for _ in range(config.rounds)
        ))

    def get_cache_info(self) -> List[CacheInfo]:
        """
        Get CacheInfo for all caches.

        Each cache is named "<method prefix>-<country code>" and reported with
        its current number of stored points.
        """
        return [
            CacheInfo(cache_name=f"{cache_names[g]}-{c}", size=len(ps))
            for (g, c), ps in self.store.items()
        ]

    async def restock_source(self, config: GameConfig):
        """
        Restock any caches associated with the GameConfig.

        Does nothing when the config has no country lock, since only
        country-locked requests are served from the cache.
        """
        if config.country_lock is None:
            return
        key = (config.generation_method, config.country_lock)
        target = self.cache_targets.get(key, 0)
        stock = self.store[key]
        # generate_point() may itself add surplus points to this same deque
        # (RSV batches), so the length is re-checked on every iteration.
        while len(stock) < target:
            stock.append(await self.generate_point(*key))


# Shared module-level store; only US urban points have a restock target.
points = PointStore({
    (GenMethodEnum.urban, "us"): 10,
})