1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- import asyncio
- import collections
- import random
- from typing import List, Tuple, Dict, Union
- from .random_street_view import call_random_street_view, VALID_COUNTRIES as RSV_COUNTRIES
- from .urban_centers import urban_coord_unlocked, urban_coord_ensured, VALID_COUNTRIES as URBAN_COUNTRIES
- from .shared import ExhaustedSourceError, aiohttp_client
- from ..schemas import GameConfig, GenMethodEnum, CountryCode, CacheInfo, GeneratorInfo
- generator_info = [
- GeneratorInfo(
- generation_method=GenMethodEnum.rsv,
- country_locks=RSV_COUNTRIES
- ),
- GeneratorInfo(
- generation_method=GenMethodEnum.urban,
- country_locks=URBAN_COUNTRIES
- ),
- ]
- cache_names = {
- GenMethodEnum.rsv: "RSV",
- GenMethodEnum.urban: "Urban",
- }
- class PointStore:
- def __init__(self, cache_targets: Dict[Tuple[GenMethodEnum, CountryCode], int]):
- self.cache_targets = cache_targets
- self.store = collections.defaultdict(collections.deque)
- async def generate_point(self, generator: GenMethodEnum, country: Union[CountryCode, None]) -> Tuple[str, float, float]:
- if generator == GenMethodEnum.rsv:
- # RSV point functions return a collection of points, which should be cached
- point, *points = await call_random_street_view(country)
- # use the country on the point - since country itself might be None
- self.store[(generator, point[0])].extend(points)
- return point
- elif generator == GenMethodEnum.urban:
- # urban center point functions only return a single point
- if country is None:
- return await urban_coord_unlocked()
- return await urban_coord_ensured(country, city_retries=50)
- else:
- raise ExhaustedSourceError()
- async def get_point(self, generator: GenMethodEnum, country: Union[CountryCode, None]) -> Tuple[str, float, float]:
- if country is not None:
- # if we already have a point ready, just return it immediately
- # to avoid bias, we only do this in country-locking mode
- stock = self.store[(generator, country)]
- if len(stock) > 0:
- return stock.popleft()
- return await self.generate_point(generator, country)
- async def get_points(self, config: GameConfig) -> List[Tuple[str, float, float]]:
- """
- Provide points according to the GameConfig.
- Return a list of at least n valid geo points, as
- (2 character country code, latitude, longitude) tuples.
- In the event that the configured source cannot reasonably supply enough points,
- most likely due to time constraints, this will raise an ExhaustedSourceError.
- """
- return await asyncio.gather(*[self.get_point(config.generation_method, config.country_lock) for _ in range(config.rounds)])
- def get_cache_info(self) -> List[CacheInfo]:
- """
- Get CacheInfo for all caches.
- """
- return [CacheInfo(cache_name=f"{cache_names[g]}-{c}", size=len(ps)) for (g, c), ps in self.store.items()]
- async def restock_source(self, config: GameConfig):
- """
- Restock any caches associated with the GameConfig.
- """
- if config.country_lock is None:
- return
- key = (config.generation_method, config.country_lock)
- target = self.cache_targets.get(key, 0)
- stock = self.store[key]
- while len(stock) < target:
- stock.append(await self.generate_point(*key))
- points = PointStore({
- (GenMethodEnum.urban, "us"): 10,
- })
|