import asyncio import collections import logging import random from itertools import groupby from typing import List, Tuple, Dict, Union from .random_street_view import call_random_street_view, VALID_COUNTRIES as RSV_COUNTRIES from .urban_centers import urban_coord, VALID_COUNTRIES as URBAN_COUNTRIES from .shared import aiohttp_client, reverse_geocode from ..schemas import GameConfig, GenMethodEnum, CountryCode, CacheInfo, GeneratorInfo logger = logging.getLogger(__name__) generator_info = [ GeneratorInfo( generation_method=GenMethodEnum.rsv, country_locks=RSV_COUNTRIES ), GeneratorInfo( generation_method=GenMethodEnum.urban, country_locks=URBAN_COUNTRIES ), ] cache_names = { GenMethodEnum.rsv: "RSV", GenMethodEnum.urban: "Urban", } class ExhaustedSourceError(Exception): pass DIFFICULTY_1 = [ # Singapore - very small, pretty obvious from lang combo "sg", # Israel, Taiwan, Japan, South Korea, Greece, Poland - immediately obvious from language "il", "tw", "jp", "kr", "gr", "pl", # Hong Kong - distraction from Taiwan "hk", ] DIFFICULTY_2 = [ # USA! USA! USA! USA! (suck it Europe) "us", "us", "us", "us", # Western Europe minus a few more interesting ones "ie", "gb", "es", "fr", "be", "nl", "lu", "de", "ch", "li", "at", "it", "mt", # Southern Africa (b/c English) "za", "ls", "sz", "na", "bw", "zw", # New Zealand (b/c English) "nz", ] DIFFICULTY_3 = [ # Nordic languages "is", "fo", "se", "no", "dk", "gl", # Finno-urgic "fi", "ee", # Other Baltics "lv", "lt", # Central + Eastern Europe + Balkans (non-Cyrillic, non-Polish confusable) "cz", "sk", "hu", "ro", "si", "hr", "ba", "al", "md", # Cyrillic Balkans "bg", "rs", "me", "mk", # Turkey can also have its language confused with some of the above "tr", # Caucasus "am", "az", "ge", # SE Asia (partial - mainly the ones with non-Latin scripts) "bt", "np", "bd", "mm", "kh", "vn", "th", "la", ] DIFFICULTY_4 = [ # SE Asia (partial - mainly the ones with harder to differentiate languages) "id", "my", # Middle East "iq", "jo", "lb", "sa", "ae", "om", # North Africa "eg", "dz", "tn", "ma", # West Africa "sn", "gi", "ng", # East Africa "ke", "et", # Mexico + Central America + South America minus Brazil (i.e., all Spanish) "mx", "gt", "ni", "pa", "co", "ec", "pe", "bo", "ar", "cl", ] DIFFICULTY_5 = [ # Canada + Australia "ca", "au", # Brazil + Portugal (lol) "br", "pt", # Russia + other Cyrillic + Mongolia "ru", "ua", "by", "kz", "kg", "tj", "tm", "uz", "mn", # India (basically all photo orbs) "in", ] DIFFICULTY_X = [ # tropical/subtropical island nations "lk", "cv", "cu", "do", "jm", "mg", "mv", "pg", "ph", "ws", "tt", "pr", ] DIFFICULTY_TIER_ORDER = ( DIFFICULTY_1, DIFFICULTY_2, DIFFICULTY_3, DIFFICULTY_4, DIFFICULTY_5, DIFFICULTY_4, DIFFICULTY_3, DIFFICULTY_2, DIFFICULTY_1, DIFFICULTY_X, ) class PointStore: def __init__(self, cache_targets: Dict[Tuple[GenMethodEnum, CountryCode], int], rsv_country_retries: int = 5, urban_country_pool_size: int = 30, urban_country_retries: int = 30, urban_city_retries: int = 50, urban_city_retries_per_random_country: int = 10): self.cache_targets = cache_targets self.rsv_country_retries = rsv_country_retries self.urban_country_pool_size = urban_country_pool_size self.urban_country_retries = urban_country_retries self.urban_city_retries = urban_city_retries self.urban_city_retries_per_random_country = urban_city_retries_per_random_country self.store = collections.defaultdict(collections.deque) async def _gen_rsv_point(self, country: CountryCode): # RSV point function returns a collection of points, which should be cached for actual_country, points in groupby(await call_random_street_view(country), key=lambda p: p[0]): # but these points need to be cached according to the actual reverse geocoded country they are in self.store[(GenMethodEnum.rsv, actual_country)].extend(points) stock = self.store[(GenMethodEnum.rsv, country)] if len(stock) > 0: return stock.popleft() async def _gen_urban_point(self, countries: List[CountryCode], city_retries: int): for country in countries: logger.info(f"Selecting urban centers from {country}") pt = await urban_coord(country, city_retries=city_retries) if pt is not None: if pt[0] == country: return pt else: # TODO technically this is slightly wasted effort in rare edge cases self.store[(GenMethodEnum.urban, pt[0])].append(pt) # TODO I think all of this logic still gets stuck in the trap of generating points even when there's a stock in some edge cases # this needs a rewrite but I'm not doing that now async def get_point(self, generator: GenMethodEnum, country: Union[CountryCode, None], force_generate: bool = False) -> Tuple[str, float, float]: if country is None: # generating points across the whole world # for current generators, this means selecting a country at random if generator == GenMethodEnum.rsv: for _ in range(self.rsv_country_retries): # try a few countries before giving up, just in case one has no data country = random.choice(RSV_COUNTRIES) point = await self._gen_rsv_point(country) if point is not None: return point elif generator == GenMethodEnum.urban: # try many countries since finding an urban center point is harder countries = random.sample(URBAN_COUNTRIES, k=min(self.urban_country_pool_size, len(URBAN_COUNTRIES))) point = await self._gen_urban_point(countries, self.urban_city_retries_per_random_country) if point is not None: return point # if nothing could be done - inform the caller raise ExhaustedSourceError # generating points for a specific country # if we already have a point ready, just return it immediately if not force_generate: stock = self.store[(generator, country)] if len(stock) > 0: return stock.popleft() # otherwise, need to actually generate a new point if generator == GenMethodEnum.rsv: point = await self._gen_rsv_point(country) if point is not None: return point elif generator == GenMethodEnum.urban: point = await self._gen_urban_point((country for _ in range(self.urban_country_retries)), self.urban_city_retries) if point is not None: return point # finally, if all that fails, just inform the caller raise ExhaustedSourceError async def get_points(self, config: GameConfig) -> List[Tuple[str, float, float]]: """ Provide points according to the GameConfig. Return a list of valid geo points, as (2 character country code, latitude, longitude) tuples. In the event that the configured source cannot reasonably supply enough points, most likely due to time constraints, this will raise an ExhaustedSourceError. """ try: if config.generation_method == GenMethodEnum.diff_tiered: # in the case of using the "difficulty tiered" generator there is some special logic # assume that, in general, we want 10 points (4 normal rounds going up in difficulty, 1 max difficulty round, 4 normal going down, 1 nightmare tier) # if more are requested, it repeats. if less, it only goes that far. def make_point_task(tier, attempts): if attempts <= 0: raise ExhaustedSourceError try: country_lock = random.choice(tier) if country_lock in random_street_view.VALID_COUNTRIES: return self.get_point(GenMethodEnum.rsv, country_lock) elif country_lock in urban_centers.VALID_COUNTRIES: return self.get_point(GenMethodEnum.urban, country_lock) else: raise ExhaustedSourceError except: return make_point_task(tier, attempts - 1) point_tasks = [make_point_task(DIFFICULTY_TIER_ORDER[i % len(DIFFICULTY_TIER_ORDER)], 3) for i in range(config.rounds)] else: point_tasks = [self.get_point(config.generation_method, config.country_lock) for _ in range(config.rounds)] gathered = asyncio.gather(*point_tasks) return await asyncio.wait_for(gathered, 60) # TODO - it would be nice to keep partially generated sets around if there's a timeout or exhaustion except asyncio.TimeoutError: raise ExhaustedSourceError def get_cache_info(self) -> List[CacheInfo]: """ Get CacheInfo for all caches. """ return [CacheInfo(cache_name=f"{cache_names[g]}-{c}", size=len(ps)) for (g, c), ps in self.store.items()] async def _restock_source_impl(self, generator: GenMethodEnum, country: CountryCode): key = (generator, country) target = self.cache_targets.get(key, 0) stock = self.store[key] while len(stock) < target: # this check allows for RSV to do its multi-point restock stock.append(await self.get_point(*key, force_generate=True)) async def restock_source(self, config: GameConfig): """ Restock any caches associated with the GameConfig. """ if config.country_lock is None: return try: await self._restock_source_impl(config.generation_method, config.country_lock) except ExhaustedSourceError: # if the cache can't be restocked, that is bad, but not fatal logger.exception(f"Failed to fully restock point cache for {config}") async def restock_all(self, timeout: Union[int, float, None] = None): """ Restock all caches. """ restock_tasks = [self._restock_source_impl(gen, cc) for (gen, cc) in self.cache_targets.keys()] gathered = asyncio.gather(*restock_tasks) try: await asyncio.wait_for(gathered, timeout) except (asyncio.TimeoutError, ExhaustedSourceError): # if this task times out, it's fine, as it's just intended to be a best effort logger.exception("Failed to fully restock a point cache!") points = PointStore({ (GenMethodEnum.urban, "us"): 10, })