import logging
import random

from .shared import aiohttp_client, point_has_streetview, ExhaustedSourceError

RSV_URL = "https://randomstreetview.com/data"
VALID_COUNTRIES = (
    "ad", "au", "ar", "bd", "be", "bt", "bw", "br", "bg", "kh", "ca", "cl",
    "hr", "co", "cz", "dk", "ae", "ee", "fi", "fr", "de", "gr", "hu", "hk",
    "is", "id", "ie", "it", "il", "jp", "lv", "lt", "my", "mx", "nl", "nz",
    "no", "pe", "pl", "pt", "ro", "ru", "sg", "sk", "si", "za", "kr", "es",
    "sz", "se", "ch", "tw", "th", "ua", "gb", "us",
)

logger = logging.getLogger(__name__)


async def call_random_street_view(country_lock, max_attempts=5):
    """
    Fetch candidate points from randomstreetview.com and keep the valid ones.

    Returns a non-empty list of tuples, each being
    (country_lock, latitude, longitude). Every returned point has passed
    point_has_streetview. The country code in each tuple is country_lock
    exactly as it was passed in (or as it was randomly chosen); only the
    value sent to the remote service is lower-cased.

    If the country_lock provided is None, one is chosen at random from
    VALID_COUNTRIES.

    Up to max_attempts requests are made. An attempt is counted as failed
    when the request or JSON decoding raises, when the response does not
    report success, or when none of the returned locations pass validation.
    If every attempt fails, ExhaustedSourceError is raised.

    Per the original author's note, validation uses the streetview metadata
    endpoint, so no quota is consumed.
    """
    if country_lock is None:
        country_lock = random.choice(VALID_COUNTRIES)

    for _ in range(max_attempts):
        logger.info("Attempting RSV...")
        try:
            async with aiohttp_client.post(
                RSV_URL, data={"country": country_lock.lower()}
            ) as response:
                # content_type=None disables aiohttp's content-type check
                # before decoding the body as JSON.
                rsv_js = await response.json(content_type=None)
                logger.info("Got back %s", rsv_js.keys())
        except Exception:
            # Deliberately not a bare ``except``: task cancellation
            # (asyncio.CancelledError), KeyboardInterrupt and SystemExit
            # must propagate instead of being retried.
            logger.exception("Failed RSV")
            continue

        # A payload without a truthy "success" flag is a failed attempt.
        if not rsv_js.get("success"):
            continue

        # Validate sequentially; a missing/empty "locations" yields no points.
        points = [
            (country_lock, point["lat"], point["lng"])
            for point in rsv_js.get("locations") or []
            if await point_has_streetview(point["lat"], point["lng"])
        ]
        if points:
            return points

    # Every attempt failed (or max_attempts was not positive).
    raise ExhaustedSourceError()