shared.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. from typing import List, Tuple, Union, Dict
  2. import collections
  3. import logging
  4. import requests
  5. # Google API key, with access to Street View Static API
  6. # this can be safely committed due to permission restriction
  7. google_api_key = "AIzaSyAqjCYR6Szph0X0H_iD6O1HenFhL9jySOo"
  8. metadata_url = "https://maps.googleapis.com/maps/api/streetview/metadata"
  9. logger = logging.getLogger(__name__)
  10. def point_has_streetview(lat, lng):
  11. """
  12. Returns True if the streetview metadata endpoint says a given point has
  13. data available, and False otherwise.
  14. This function calls the streetview metadata endpoint - there is no quota consumed.
  15. """
  16. return requests.get(metadata_url, params={
  17. "key": google_api_key,
  18. "location": f"{lat},{lng}",
  19. }).json()["status"] == "OK"
  20. class ExhaustedSourceError(Exception):
  21. def __init__(self, partial=[]):
  22. self.partial = partial
  23. class GeoPointSource:
  24. """
  25. Abstract base class for a source of geo points
  26. """
  27. def get_name(self) -> str:
  28. """
  29. Return a human-readable name for this point source, for debugging purposes.
  30. """
  31. raise NotImplemented("Must be implemented by subclasses")
  32. def get_points(self, n: int) -> List[Tuple[str, float, float]]:
  33. """
  34. Return a list of at least n valid geo points, as
  35. (2 character country code, latitude, longitude) tuples.
  36. In the event that the GeoPointSource cannot reasonably supply enough points,
  37. most likely due to time constraints, it should raise an ExhaustedSourceError.
  38. """
  39. raise NotImplemented("Must be implemented by subclasses")
  40. class CachedGeoPointSource(GeoPointSource):
  41. """
  42. Wrapper tool for maintaing a cache of points from a GeoPointSource to
  43. make get_points faster, at the exchange of needing to restock those
  44. points after the fact. This can be done in another thread, however, to
  45. hide this cost from the user.
  46. """
  47. def __init__(self, source: GeoPointSource, stock_target: int):
  48. self.source = source
  49. self.stock = collections.deque()
  50. self.stock_target = stock_target
  51. def get_name(self):
  52. return f"Cached({self.source.get_name()}, {self.stock_target})"
  53. def restock(self, n: Union[int, None] = None):
  54. """
  55. Restock at least n points into this source.
  56. If n is not provided, it will default to stock_target, as set during the
  57. construction of this point source.
  58. """
  59. n = n if n is not None else self.stock_target - len(self.stock)
  60. if n > 0:
  61. logger.info(f"Restocking {type(self).__name__} with {n} points")
  62. try:
  63. pts = self.source.get_points(n)
  64. except ExhaustedSourceError as e:
  65. pts = e.partial # take what we can get
  66. self.stock.extend(pts)
  67. diff = n - len(pts)
  68. if diff > 0:
  69. # if implementations of source.get_points are well behaved, this will
  70. # never actually need to recurse to finish the job.
  71. self.restock(n=diff)
  72. logger.info(f"Finished restocking {type(self).__name__}")
  73. def get_points(self, n: int) -> List[Tuple[str, float, float]]:
  74. """
  75. Pull n points from the current stock.
  76. It is recommended to call CachedGeoPointSource.restock after this, to ensure
  77. the stock is not depleted. If possible, calling restock in another thread is
  78. recommended, as it can be a long operation depending on implementation.
  79. """
  80. if len(self.stock) >= n:
  81. pts = []
  82. for _ in range(n):
  83. pts.append(self.stock.popleft())
  84. return pts
  85. self.restock(n=n)
  86. # this is safe as long as restock does actually add enough new points.
  87. # unless this object is being rapidly drained by another thread,
  88. # this will recur at most once.
  89. return self.get_points(n=n)
  90. class GeoPointSourceGroup:
  91. """
  92. Container of multiple GeoPointSources, each with some key.
  93. """
  94. def __init__(self, sources: Dict[str, GeoPointSource], default: GeoPointSource):
  95. self.sources = sources
  96. self.default = default
  97. self.cached = [s for s in sources.values() if isinstance(s, CachedGeoPointSource)]
  98. if isinstance(default, CachedGeoPointSource):
  99. self.cached.append(default)
  100. def restock(self, key: Union[str, None] = None):
  101. """
  102. Restock a CachedGeoPointSources managed by this group.
  103. If the targeted GeoPointSource is uncached, this method does nothing.
  104. """
  105. src = self.sources.get(key, self.default)
  106. if isinstance(src, CachedGeoPointSource):
  107. src.restock()
  108. def get_points_from(self, n: int, key: Union[str, None] = None) -> List[Tuple[str, float, float]]:
  109. """
  110. Return a list of at least n valid geo points, for a given key. If no key
  111. is provided, or no matching GeoPointSource is found, the default
  112. GeoPointSource will be used.
  113. """
  114. return self.sources.get(key, self.default).get_points(n)