diff --git a/map.png b/map.png new file mode 100644 index 0000000..2135caa Binary files /dev/null and b/map.png differ diff --git a/map_test.py b/map_test.py new file mode 100644 index 0000000..e1f2362 --- /dev/null +++ b/map_test.py @@ -0,0 +1,125 @@ +import math + +import requests +from PIL import Image, ImageDraw, ImageFont +from staticmap import Polygon, StaticMap + + +class AttribStaticMap(StaticMap, object): + def __init__(self, *args, **kwargs): + self.attribution = "© Stadia Maps © OpenMapTiles © OpenStreetMap" + super(AttribStaticMap, self).__init__(*args, **kwargs) + + def _draw_features(self, image): + super(AttribStaticMap, self)._draw_features(image) + + txt = Image.new("RGBA", image.size, (255, 255, 255, 0)) + # get a font + # fnt = ImageFont.truetype('FreeMono.ttf', 12) + fnt = ImageFont.load_default() + # get a drawing context + d = ImageDraw.Draw(txt) + + textSize = fnt.getbbox(self.attribution) + textPosition = (image.size[0] - textSize[2], image.size[1] - textSize[3]) + offset = 2 + options = {"fill": (255, 255, 255, 180)} + d.rectangle( + [ + (textPosition[0] - (2 * offset), textPosition[1] - (2 * offset)), + ( + textSize[2] + textPosition[0] + (2 * offset), + textSize[3] + textPosition[1] + (2 * offset), + ), + ], + **options + ) + + # draw text, full opacity + d.text( + (textPosition[0] - offset, textPosition[1] - offset), + self.attribution, + font=fnt, + fill="black", + ) + + image.paste(txt, (0, 0), txt) + + +scl_events_url = "https://utilisocial.io/datacapable/v2/p/scl/map/events" +scl_events_response = requests.get(scl_events_url) +try: + scl_events = scl_events_response.json() +except requests.JSONDecodeError: + print("JSON could not be loaded from SCL API") + raise + +with open("stadiamaps_api_key.secret", "r+") as stadiamaps_api_key_file: + # Reading from a file + stadiamaps_api_key = stadiamaps_api_key_file.read() + +map = AttribStaticMap( + 512, + 512, + url_template="https://tiles.stadiamaps.com/tiles/outdoors/{z}/{x}/{y}.png?api_key=" + + stadiamaps_api_key, +) +for ring in scl_events[0]["polygons"]["rings"]: + polygon = Polygon(ring, "#F973167F", "#F97316", simplify=True) + map.add_polygon(polygon) +image = map.render() +image.save("map.png", "PNG") + + +try: + def num2deg(xtile, ytile, zoom): + n = 1 << zoom + lon_deg = xtile / n * 360.0 - 180.0 + lat_rad = math.atan(math.sinh(math.pi * (1 - 2 * ytile / n))) + lat_deg = math.degrees(lat_rad) + return lat_deg, lon_deg + + center_lat_lon = num2deg(map.x_center, map.y_center, map.zoom) + + geocode_url = "http://gruezi-skyros.srv.gruezi.net:6664/reverse?lat={lat}&lon={lon}&format=geocodejson".format( + lat=center_lat_lon[0], lon=center_lat_lon[1] + ) + geocode_headers = {"User-Agent": "seattlecitylight-mastodon-bot"} + geocode_response = requests.get(geocode_url, headers=geocode_headers) + try: + geocode = geocode_response.json() + except requests.JSONDecodeError: + print("JSON could not be loaded from nominatim API") + raise + + if geocode["features"][0]["properties"]["geocoding"]["city"] != "Seattle": + city_not_seattle_text = " of {}".format( + geocode["features"][0]["properties"]["geocoding"]["city"] + ) + else: + city_not_seattle_text = "" + + if "locality" in geocode["features"][0]["properties"]["geocoding"]: + locality = geocode["features"][0]["properties"]["geocoding"] + if locality == "Uptown": + locality = "Lower Queen Anne" + + alt_text = "A map showing the location of the outage, centered around {} in the {} area{}.".format( + geocode["features"][0]["properties"]["geocoding"]["name"], + locality, + city_not_seattle_text, + ) + elif "district" in geocode["features"][0]["properties"]["geocoding"]: + alt_text = "A map showing the location of the outage, centered around {} in the {} area{}.".format( + geocode["features"][0]["properties"]["geocoding"]["name"], + geocode["features"][0]["properties"]["geocoding"]["district"], + city_not_seattle_text, + ) + else: + alt_text = "A map showing the location of the outage, centered around {} in {}.".format( + geocode["features"][0]["properties"]["geocoding"]["name"], + geocode["features"][0]["properties"]["geocoding"]["city"], + ) +except Exception: + alt_text = "A map showing the location of the outage." +print(alt_text) diff --git a/scl.py b/scl.py index fe1ed00..047c41d 100644 --- a/scl.py +++ b/scl.py @@ -5,13 +5,15 @@ from typing import Optional import mastodon import requests +import sqlalchemy.types as types +import staticmap import yaml from mastodon import Mastodon from PIL import Image, ImageDraw, ImageFont +from shapely import Geometry, MultiPolygon, Point, Polygon, from_wkb, to_wkb from sqlalchemy import create_engine, select from sqlalchemy.exc import NoResultFound from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column -from staticmap import Polygon, StaticMap post_datetime_format = "%b %e %l:%M %p" @@ -34,7 +36,7 @@ mastodon_client = Mastodon( ) -class AttribStaticMap(StaticMap, object): +class AttribStaticMap(staticmap.StaticMap, object): def __init__(self, *args, **kwargs): self.attribution = "© Stadia Maps © OpenMapTiles © OpenStreetMap" super(AttribStaticMap, self).__init__(*args, **kwargs) @@ -103,6 +105,16 @@ def get_hashtag_string(event) -> str: return hashtag_string +def convert_outage_geometry(event) -> MultiPolygon: + assert event["polygons"]["type"] == "polygon" + assert event["polygons"]["hasZ"] is False + assert event["polygons"]["hasM"] is False + polygon_list = [] + for ring in event["polygons"]["rings"]: + polygon_list.append(Polygon(ring)) + return MultiPolygon(polygon_list) + + def do_initial_post( event, event_class, start_time: datetime, estimated_restoration_time: datetime ) -> dict[str, str | None]: @@ -119,7 +131,7 @@ def do_initial_post( ) assert event["polygons"]["type"] == "polygon" for ring in event["polygons"]["rings"]: - polygon = Polygon( + polygon = staticmap.Polygon( ring, # Appending 7F to the fill_color makes it 50% transparent fill_color="{}7F".format(event_class["outage_color"]), @@ -204,11 +216,14 @@ def do_initial_post( with io.BytesIO() as map_image_file: map_image.save(map_image_file, format="PNG", optimize=True) - map_media_post = mastodon_client.media_post( - map_image_file.getvalue(), - mime_type="image/png", - description=alt_text, - ) + if __debug__: + print("Would have uploaded the map media here") + else: + map_media_post = mastodon_client.media_post( + map_image_file.getvalue(), + mime_type="image/png", + description=alt_text, + ) map_media_post_id = map_media_post["id"] except Exception as e: @@ -251,8 +266,21 @@ Cause: {} return {"post_id": post_id, "map_media_post_id": map_media_post_id} +class GeometryWkb(types.TypeDecorator): + impl = types.LargeBinary + cache_ok = True + + def process_bind_param(self, value, dialect): + return to_wkb(value) + + def process_result_value(self, value, dialect): + return from_wkb(value) + + class Base(DeclarativeBase): - pass + type_annotation_map = { + Geometry: GeometryWkb, + } class SclOutage(Base): @@ -270,9 +298,10 @@ class SclOutage(Base): start_time: Mapped[datetime] = mapped_column() num_people: Mapped[int] = mapped_column() max_num_people: Mapped[int] = mapped_column() + outage_geometries: Mapped[Geometry] = mapped_column() def __repr__(self) -> str: - return f"SclOutage(scl_outage_id={self.scl_outage_id!r}, most_recent_post_id={self.most_recent_post_id!r}, initial_post_id={self.initial_post_id!r}, map_media_post_id={self.map_media_post_id!r}, last_updated_time={self.last_updated_time!r}, no_longer_in_response_time={self.no_longer_in_response_time!r}), start_time={self.start_time!r}), num_people={self.num_people!r}), max_num_people={self.max_num_people!r})" + return f"SclOutage(scl_outage_id={self.scl_outage_id!r}, most_recent_post_id={self.most_recent_post_id!r}, initial_post_id={self.initial_post_id!r}, map_media_post_id={self.map_media_post_id!r}, last_updated_time={self.last_updated_time!r}, no_longer_in_response_time={self.no_longer_in_response_time!r}), start_time={self.start_time!r}), num_people={self.num_people!r}), max_num_people={self.max_num_people!r}), outage_geometries={self.outage_geometries!r}" engine = create_engine("sqlite:///scl.db") @@ -297,6 +326,9 @@ with Session(engine) as session: else: status = None + outage_geometries = convert_outage_geometry(event) + scl_outage_location = Point(event["latitude"], event["longitude"]) + try: hashtag_string = get_hashtag_string(event) existing_record = lookup_result.one() @@ -329,6 +361,10 @@ with Session(engine) as session: # Used to determine the maximum number of people affected by this outage, to determine if it's worth posting about existing_record.max_num_people = event["numPeople"] max_event_class = classify_event_size(existing_record.max_num_people) + if existing_record.outage_geometries != outage_geometries: + print("updating geometries") + existing_record.outage_geometries = outage_geometries + if updated_properties: updated_properties.sort() @@ -368,6 +404,8 @@ with Session(engine) as session: existing_record.map_media_post_id = initial_post_result[ "map_media_post_id" ] + else: + print("Existing record was found, and no properties were updated.") session.commit() except NoResultFound: @@ -400,6 +438,7 @@ with Session(engine) as session: start_time=start_time, num_people=event["numPeople"], max_num_people=event["numPeople"], + outage_geometries=outage_geometries, ) session.add(new_outage_record) session.commit()