123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- import json
- import math
- import random
- import threading
- import collections
- import time
- import requests
- import haversine
# Google API key, with access to the Street View Static API.
# NOTE(review): this credential is committed in source; consider loading it
# from configuration/the environment instead and rotating the key.
google_api_key = "AIzaSyAqjCYR6Szph0X0H_iD6O1HenFhL9jySOo"
metadata_url = "https://maps.googleapis.com/maps/api/streetview/metadata"
mapcrunch_url = "http://www.mapcrunch.com/_r/"
rsv_url = "https://randomstreetview.com/data"


def _load_urban_centers(path):
    """
    Reads a "lat,lng" CSV file and returns a list of (latitude, longitude)
    float tuples, one per non-blank line of the file.
    """
    centers = []
    with open(path) as infile:
        for line in infile:
            # tolerate blank lines (e.g. a trailing newline at the end of the
            # file) instead of failing on the two-value unpack below
            if not line.strip():
                continue
            lat, lng = line.split(",")
            centers.append((float(lat.strip()), float(lng.strip())))
    return centers


# known urban centers, loaded once at import time; used as seeds by urban_coord
urban_centers_usa = _load_urban_centers("./urban-centers-usa.csv")
urban_centers_non_usa = _load_urban_centers("./urban-centers-non-usa.csv")
def point_has_streetview(lat, lng, timeout=10):
    """
    Returns True if the streetview metadata endpoint says a given point has
    data available, and False otherwise.

    lat, lng are sent to the endpoint as the "location" parameter ("lat,lng").
    timeout is the number of seconds handed to requests for this call, so that
    a stalled connection raises instead of blocking the caller indefinitely.

    A response that carries no "status" field is treated as "no data" (False).
    Connection errors, timeouts and non-JSON responses are not caught here.

    This function calls the streetview metadata endpoint - there is no quota consumed.
    """
    params = {
        "key": google_api_key,
        "location": f"{lat},{lng}",
    }
    js = requests.get(metadata_url, params=params, timeout=timeout).json()
    return js.get("status") == "OK"
def generate_coord(max_retries=100, only_america=False, timeout=10):
    """
    Returns (latitude, longitude) of usable coord (where google has data).

    This function will attempt at most max_retries calls to map crunch to fetch
    candidate points, and will exit as soon as a suitable candidate is found.
    If no suitable candidate is found in this allotted number of retries, None is
    returned.

    only_america restricts the query to country code 21; otherwise the country
    reported by the first response is reused for all following requests.
    timeout is the number of seconds handed to requests for each map crunch call.

    This function calls the streetview metadata endpoint - there is no quota consumed.
    """
    mc_url = mapcrunch_url + ("?c=21" if only_america else "")
    # the JSON payload is expected to be preceded by this guard statement
    guard_prefix = "while(1);"
    for _ in range(max_retries):
        body = requests.get(mc_url, timeout=timeout).text.strip()
        # drop the guard as a prefix; str.strip(chars) would treat it as a set
        # of characters and could also eat into the payload itself
        if body.startswith(guard_prefix):
            body = body[len(guard_prefix):]
        points_js = json.loads(body)
        if "c=" not in mc_url:
            mc_url += f"?c={points_js['country']}"  # lock to the first country randomed
        for lat, lng in points_js["points"]:
            if point_has_streetview(lat, lng):
                return (lat, lng)
    # every retry was used up without finding a point that has street view data
    return None
def call_random_street_view(only_america=False, timeout=10):
    """
    Returns an array of (some number of) tuples, each being (latitude, longitude).

    All points will be valid streetview coordinates. There is no guarantee as to the
    length of this array (it may be empty), but it will never be None.

    only_america requests country "us" instead of "all".
    timeout is the number of seconds handed to requests for the POST, so that a
    stalled connection raises instead of blocking the caller indefinitely.

    This function calls the streetview metadata endpoint - there is no quota consumed.
    """
    rsv_js = requests.post(
        rsv_url,
        data={"country": "us" if only_america else "all"},
        timeout=timeout,
    ).json()
    # a missing "success" flag is handled like an explicit failure
    if not rsv_js.get("success"):
        return []

    # a successful response without "locations" yields no points rather than a KeyError
    return [
        (point["lat"], point["lng"])
        for point in rsv_js.get("locations", [])
        if point_has_streetview(point["lat"], point["lng"])
    ]
def random_street_view_generator(only_america=False):
    """
    Returns a generator that hands out street view points one at a time,
    refilling its internal batch through call_random_street_view whenever the
    batch runs dry. The generator never terminates on its own.

    The returned generator calls the streetview metadata endpoint - there is no quota consumed.
    """
    batch = []
    while True:
        # keep asking until a call actually produces at least one point
        while not batch:
            batch = call_random_street_view(only_america=only_america)
        # points are handed out from the end of the current batch
        yield batch.pop()
def urban_coord(max_retries=10, retries_per_point=30, max_dist_km=25, usa_chance=0.1):
    """
    Returns (latitude, longitude) of usable coord (where google has data) that is near
    a known urban center. Points will be at most max_dist_km kilometers away. This function will
    generate at most retries_per_point points around an urban center, and will try at most
    max_retries urban centers. If none of the generated points have street view data,
    this will return None. Otherwise, it will exit as soon as suitable point is found.

    The list of centers (USA or non-USA) is picked once per call; the USA list is
    used with probability usa_chance. The returned longitude is always within
    [-180, 180] degrees.

    This function calls the streetview metadata endpoint - there is no quota consumed.
    """
    src = urban_centers_usa if random.random() <= usa_chance else urban_centers_non_usa
    for _ in range(max_retries):
        # logic adapted from https://stackoverflow.com/a/7835325
        # start in a city
        city_lat, city_lng = random.choice(src)
        city_lat_rad = math.radians(city_lat)
        sin_lat = math.sin(city_lat_rad)
        cos_lat = math.cos(city_lat_rad)
        city_lng_rad = math.radians(city_lng)
        for _attempt in range(retries_per_point):
            # turn a random direction, and go random distance
            dist_km = random.random() * max_dist_km
            angle_rad = random.random() * 2 * math.pi
            # angular distance travelled along the sphere
            d_over_radius = dist_km / mean_earth_radius_km
            sin_dor = math.sin(d_over_radius)
            cos_dor = math.cos(d_over_radius)
            pt_lat_rad = math.asin(sin_lat * cos_dor + cos_lat * sin_dor * math.cos(angle_rad))
            pt_lng_rad = city_lng_rad + math.atan2(
                math.sin(angle_rad) * sin_dor * cos_lat,
                cos_dor - sin_lat * math.sin(pt_lat_rad),
            )
            pt_lat = math.degrees(pt_lat_rad)
            pt_lng = math.degrees(pt_lng_rad)
            # adding the offset to the city's longitude can step across the
            # antimeridian; wrap such values back into [-180, 180)
            if not -180.0 <= pt_lng <= 180.0:
                pt_lng = (pt_lng + 540.0) % 360.0 - 180.0
            if point_has_streetview(pt_lat, pt_lng):
                return (pt_lat, pt_lng)
    # none of the generated candidates had street view data
    return None
class PointSource:
    """
    Base class for a buffered supply of points.

    A stock of ready-to-use points is kept in a deque. get_points serves
    requests from that stock and starts a background thread to top the stock
    back up to stock_target. Subclasses provide the actual points by
    implementing _restock_impl.
    """

    def __init__(self, stock_target):
        self.stock = collections.deque()
        self.stock_target = stock_target
        # held by the one background top-up that is allowed to run at a time
        self._restock_lock = threading.Lock()

    def _restock_impl(self, n):
        """
        Returns a list of new points to add to the stock.
        Implementations of this method should try to return at least n points for performance.
        """
        raise NotImplementedError("Subclasses must implement this")

    def restock(self, n=None):
        """
        Adds at least n new points to the stock. When n is None, adds however
        many points the stock is currently short of stock_target (possibly none).

        _restock_impl is called repeatedly until it has delivered n points in
        total, so an implementation that never returns any points keeps this
        method looping.
        """
        if n is None:
            n = self.stock_target - len(self.stock)
        while n > 0:
            pts = self._restock_impl(n)
            self.stock.extend(pts)
            # if implementations of _restock_impl are well behaved, a single
            # pass is enough and this loop does not repeat.
            n -= len(pts)

    def _restock_in_background(self):
        """
        Thread target: tops the stock up to stock_target, unless another
        background top-up is already doing so.
        """
        # Every get_points call starts one of these threads. Without this guard,
        # overlapping threads would each fetch the full deficit they observed
        # and push the stock past stock_target.
        if not self._restock_lock.acquire(blocking=False):
            return
        try:
            # re-check after each pass: points may have been handed out while
            # the previous pass was still fetching
            while len(self.stock) < self.stock_target:
                self.restock()
        finally:
            self._restock_lock.release()

    def get_points(self, n=1):
        """
        Removes n points from the front of the stock and returns them as a list.

        If fewer than n points are stocked, n more are fetched synchronously
        first. A background top-up of the stock is started before returning.
        """
        # this terminates as long as restock does actually add enough new points.
        # unless this object is being rapidly drained by another thread,
        # the loop body runs at most once.
        while len(self.stock) < n:
            self.restock(n=n)
        pts = [self.stock.popleft() for _ in range(n)]
        threading.Thread(target=self._restock_in_background).start()
        return pts
class MapCrunchPointSource(PointSource):
    """
    Point source backed by generate_coord.

    max_retries and only_america are stored on the instance and forwarded to
    every generate_coord call.
    """

    def __init__(self, stock_target=20, max_retries=100, only_america=False):
        super().__init__(stock_target=stock_target)
        self.max_retries = max_retries
        self.only_america = only_america

    def _restock_impl(self, n):
        """
        Returns a list of exactly n points (empty when n is not positive),
        calling generate_coord until that many usable points were found.
        """
        found = []
        missing = n
        while missing > 0:
            candidate = generate_coord(
                max_retries=self.max_retries,
                only_america=self.only_america,
            )
            # generate_coord gives None when it ran out of retries; just try again
            if candidate is None:
                continue
            found.append(candidate)
            missing -= 1
        return found
class RSVPointSource(PointSource):
    """
    Point source backed by call_random_street_view.

    only_america is stored on the instance and forwarded to every call.
    """

    def __init__(self, stock_target=20, only_america=False):
        super().__init__(stock_target=stock_target)
        self.only_america = only_america

    def _restock_impl(self, n):
        """
        Returns a list of at least n points (empty when n is not positive).
        Whole batches are appended, so the list may hold more than n points.
        """
        collected = []
        while True:
            if len(collected) >= n:
                return collected
            collected += call_random_street_view(only_america=self.only_america)
class UrbanPointSource(PointSource):
    """
    Point source backed by urban_coord.

    max_retries, retries_per_point, max_dist_km and usa_chance are stored on
    the instance and forwarded to every urban_coord call.
    """

    def __init__(self, stock_target=20, max_retries=10, retries_per_point=30, max_dist_km=25, usa_chance=0.1):
        super().__init__(stock_target=stock_target)
        self.max_retries = max_retries
        self.retries_per_point = retries_per_point
        self.max_dist_km = max_dist_km
        self.usa_chance = usa_chance

    def _restock_impl(self, n):
        """
        Returns a list of exactly n points (empty when n is not positive),
        calling urban_coord until that many usable points were found.
        """
        found = []
        missing = n
        while missing > 0:
            candidate = urban_coord(
                max_retries=self.max_retries,
                retries_per_point=self.retries_per_point,
                max_dist_km=self.max_dist_km,
                usa_chance=self.usa_chance,
            )
            # urban_coord gives None when no candidate had data; just try again
            if candidate is None:
                continue
            found.append(candidate)
            missing -= 1
        return found
# average of two Earth radii in km (presumably equatorial and polar - confirm)
mean_earth_radius_km = 0.5 * (6378 + 6357)

# Distance thresholds for scoring, in kilometers, from farthest to nearest.

# beyond 1/4 of the Earth's circumference, a guess scores 0
max_dist_km = 0.5 * (math.pi * mean_earth_radius_km)  # this is about 10,000 km

# within 1/16 of the Earth's circumference, a guess scores at least 1000 points
quarter_of_max_km = 0.25 * max_dist_km  # this is about 2,500 km

# the average "radius" of a continent; within it, a guess scores at least 2000 points
# https://www.wolframalpha.com/input/?i=sqrt%28%28%28land+mass+of+earth%29+%2F+7%29%29+%2F+pi%29+in+kilometers
avg_continental_rad_km = 1468.0

# somewhat arbitrarily, within 1000 km a guess scores at least 3000 points
one_thousand = 1000.0

# the average "radius" of a country; within it, a guess scores at least 4000 points
# https://www.wolframalpha.com/input/?i=sqrt%28%28%28land+mass+of+earth%29+%2F+%28number+of+countries+on+earth%29%29+%2F+pi%29+in+kilometers
avg_country_rad_km = 479.7

# within 150m, a guess gets a perfect score of 5000
min_dist_km = 0.15
def score_within(raw_dist, min_dist, max_dist):
    """
    Maps a distance inside [min_dist, max_dist] onto an int score, easing
    from 1000 (at min_dist) down to 0 (at max_dist).
    """
    # position of raw_dist inside the band, stretched from [0.0, 1.0] to [0.0, 2.0]
    doubled = 2 * (raw_dist - min_dist) / (max_dist - min_dist)
    # quadratic ease-in-out: accelerate over the first half, decelerate over the second
    if doubled < 1:
        eased = (doubled ** 2) / 2
    else:
        eased = 1 - (((2 - doubled) ** 2) / 2)
    # eased runs 0 -> 1 across the band, so the score runs 1000 -> 0
    return int(1000 * (1 - eased))
def score(target, guess):
    """
    Scores a guess against a target, both given as (latitude, longitude) pairs.

    The int score lies in the (inclusive) range [0, 5000]; closer guesses
    score higher.

    Returns (score, distance in km)
    """
    dist_km = haversine.haversine(target, guess)
    if dist_km <= min_dist_km:
        return 5000, dist_km

    # (near edge, far edge, points guaranteed anywhere inside the band),
    # ordered from the nearest band to the farthest
    bands = (
        (min_dist_km, avg_country_rad_km, 4000),
        (avg_country_rad_km, one_thousand, 3000),
        (one_thousand, avg_continental_rad_km, 2000),
        (avg_continental_rad_km, quarter_of_max_km, 1000),
        (quarter_of_max_km, max_dist_km, 0),
    )
    for near_km, far_km, base_points in bands:
        if dist_km <= far_km:
            return base_points + score_within(dist_km, near_km, far_km), dist_km

    # dist_km > max_dist_km
    return 0, dist_km
|