"""Post Seattle City Light outage events to Mastodon.

The script fetches the current outage events from the SCL events API,
compares them against the outages recorded in a local SQLite database
(``scl.db``), and then:

* posts a new thread (with a rendered map) for outages that are large enough,
* replies to the existing thread when tracked properties of an outage change,
* replies with a "resolved" notice once an outage disappears from the API.

Everything runs at module level; executing the file performs one full pass.
"""

import io
from datetime import datetime
from typing import Optional

import mastodon
import requests
import shapely
import yaml
from mastodon import Mastodon
from PIL import Image, ImageDraw, ImageFont
from sqlalchemy import create_engine, select
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column
from staticmap import CircleMarker, Polygon, StaticMap

from geospatial import convert_outage_geometry

# NOTE: %e and %l are platform strftime extensions (not part of the set Python
# documents as portable), so this format depends on the host C library.
post_datetime_format = "%b %e %l:%M %p"

# Upper bound, in seconds, for each outbound HTTP request made via `requests`,
# so a stalled remote API cannot hang the run forever.
request_timeout_seconds = 30

scl_events_url = "https://utilisocial.io/datacapable/v2/p/scl/map/events"
scl_events_response = requests.get(scl_events_url, timeout=request_timeout_seconds)
try:
    scl_events = scl_events_response.json()
except requests.JSONDecodeError:
    print("JSON could not be loaded from SCL API")
    raise

# Use a context manager so the config file handle is closed deterministically.
with open("config.yml") as config_file:
    config = yaml.safe_load(config_file)

stadiamaps_api_key = config["stadiamaps"]["api_key"]
nominatim_url = config["nominatim"]["api_base_url"]

mastodon_client = Mastodon(
    client_id=config["mastodon"]["client_id"],
    client_secret=config["mastodon"]["client_secret"],
    access_token=config["mastodon"]["access_token"],
    api_base_url=config["mastodon"]["api_base_url"],
)


class AttribStaticMap(StaticMap):
    """A ``StaticMap`` that stamps an attribution notice on the rendered image."""

    def __init__(self, *args, **kwargs):
        """Store the attribution text, then defer to ``StaticMap.__init__``."""
        self.attribution = "© Stadia Maps © OpenMapTiles © OpenStreetMap"
        super().__init__(*args, **kwargs)

    def _draw_features(self, image):
        """Draw the regular map features, then overlay the attribution text.

        The text is drawn in the bottom-right corner of ``image`` on top of a
        semi-transparent white rectangle.
        """
        super()._draw_features(image)

        # Fully transparent layer the size of the map; it is pasted over the
        # map at the end, using itself as the alpha mask.
        overlay = Image.new("RGBA", image.size, (255, 255, 255, 0))
        font = ImageFont.load_default()
        draw = ImageDraw.Draw(overlay)

        text_bbox = font.getbbox(self.attribution)
        text_position = (
            image.size[0] - text_bbox[2],
            image.size[1] - text_bbox[3],
        )
        offset = 2

        # Semi-transparent white backdrop behind the text.
        draw.rectangle(
            [
                (
                    text_position[0] - (2 * offset),
                    text_position[1] - (2 * offset),
                ),
                (
                    text_bbox[2] + text_position[0] + (2 * offset),
                    text_bbox[3] + text_position[1] + (2 * offset),
                ),
            ],
            fill=(255, 255, 255, 180),
        )

        # Draw the text itself at full opacity.
        draw.text(
            (text_position[0] - offset, text_position[1] - offset),
            self.attribution,
            font=font,
            fill="black",
        )

        image.paste(overlay, (0, 0), overlay)


def classify_event_size(num_people: int) -> dict[str, str | bool]:
    """Classify an outage by the number of people it affects.

    Returns a dict with the keys ``size`` (``"Small"``, ``"Medium"`` or
    ``"Large"``), ``outage_color`` (hex colour used when drawing the map) and
    ``is_postable`` (``False`` only for small outages, i.e. fewer than 250
    people).
    """
    if num_people < 250:
        return {
            "size": "Small",
            "outage_color": "#F97316",
            "is_postable": False,
        }
    if num_people < 1000:
        return {
            "size": "Medium",
            "outage_color": "#EF4444",
            "is_postable": True,
        }
    return {
        "size": "Large",
        "outage_color": "#991B1B",
        "is_postable": True,
    }


def get_hashtag_string(event) -> str:
    """Build the hashtag line for a post about ``event``.

    The city comes from ``event["geoloc_city"]`` when present, otherwise from
    ``event["city"]``. A neighborhood hashtag is added when the event has a
    ``"neighborhood"`` entry that differs from the city. Names are title-cased
    and stripped of spaces to form the tags.
    """
    try:
        city = event["geoloc_city"]
    except KeyError:
        city = event["city"]

    neighborhood_text = ""
    if "neighborhood" in event:
        neighborhood = event["neighborhood"]
        if neighborhood != city:
            neighborhood_text = " #{}".format(neighborhood.title().replace(" ", ""))

    return "#SeattleCityLightOutage #SCLOutage{} #{}".format(
        neighborhood_text, city.title().replace(" ", "")
    )


def do_initial_post(
    event,
    event_class,
    start_time: datetime,
    estimated_restoration_time: datetime,
    outage_geometries: shapely.MultiPolygon,
) -> dict[str, str | None]:
    """Publish the first (public) Mastodon post for an outage.

    A map of the outage is rendered and uploaded as an attachment, and the
    centroid of ``outage_geometries`` is reverse geocoded to describe the
    affected area. Both steps are best effort: if describing the location
    fails, generic alt text and the city from the SCL API are used; if
    rendering or uploading the map fails, the post goes out without it.

    Side effect: on successful geocoding, ``event`` gains a ``"neighborhood"``
    or ``"geoloc_city"`` entry, which ``get_hashtag_string`` then picks up.

    Args:
        event: Outage event dict from the SCL API response.
        event_class: Result of ``classify_event_size`` for the event.
        start_time: When the outage started.
        estimated_restoration_time: Estimated time of restoration.
        outage_geometries: Geometry whose centroid locates the outage.

    Returns:
        A dict with ``post_id`` (ID of the new status) and
        ``map_media_post_id`` (ID of the uploaded map, or ``None``).
    """
    map_media_post_id = None
    # Fallback location from the SCL API in case one couldn't be reverse geocoded
    area_text = event["city"]

    try:
        outage_map = AttribStaticMap(
            512,
            512,
            url_template="https://tiles.stadiamaps.com/tiles/outdoors/{z}/{x}/{y}.png?api_key="
            + stadiamaps_api_key,
        )

        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if event["polygons"]["type"] != "polygon":
            raise ValueError(
                "Unexpected outage geometry type: {}".format(event["polygons"]["type"])
            )

        for ring in event["polygons"]["rings"]:
            polygon = Polygon(
                ring,
                # Appending 7F to the fill_color makes it 50% transparent
                fill_color="{}7F".format(event_class["outage_color"]),
                outline_color=event_class["outage_color"],
                simplify=True,
            )
            outage_map.add_polygon(polygon)

        try:
            outage_center: shapely.Point = outage_geometries.centroid
            if outage_center.geom_type != "Point":
                raise ValueError("Outage center is not a point")
            # Check to make sure the calculated lat and lon are sane enough:
            # bounded by the NW corner (48, -122.6) and the SE corner (47.2, -122).
            if not (
                47.2 < outage_center.y < 48 and -122.6 < outage_center.x < -122
            ):
                raise ValueError(
                    "Outage center {}, {} is outside of the expected area".format(
                        outage_center.y, outage_center.x
                    )
                )

            # White disc underneath a smaller coloured disc gives the marker
            # a white outline.
            marker_outline = CircleMarker(
                (outage_center.x, outage_center.y), "white", 18
            )
            marker = CircleMarker(
                (outage_center.x, outage_center.y), event_class["outage_color"], 12
            )
            outage_map.add_marker(marker_outline)
            outage_map.add_marker(marker)

            # Zoom level 17 ensures that we won't get any building/POI names, just street names
            geocode_url = "{nominatim_url}/reverse?lat={lat}&lon={lon}&format=geocodejson&zoom=17".format(
                nominatim_url=nominatim_url,
                lat=outage_center.y,
                lon=outage_center.x,
            )
            geocode_headers = {"User-Agent": "seattlecitylight-mastodon-bot"}
            geocode_response = requests.get(
                geocode_url,
                headers=geocode_headers,
                timeout=request_timeout_seconds,
            )
            try:
                geocode = geocode_response.json()
            except requests.JSONDecodeError:
                print("JSON could not be loaded from nominatim API")
                raise

            geocoding = geocode["features"][0]["properties"]["geocoding"]
            city = geocoding["city"]
            city_not_seattle_text = "" if city == "Seattle" else " of {}".format(city)
            street = geocoding["name"]

            # Prefer the locality, except for large outages, which are
            # described by the district instead.
            if "locality" in geocoding and event_class["size"] != "Large":
                neighborhood = geocoding["locality"]
                if neighborhood == "Uptown":
                    neighborhood = "Lower Queen Anne"
            elif "district" in geocoding:
                neighborhood = geocoding["district"]
            else:
                neighborhood = None

            if neighborhood is not None:
                alt_text = "A map showing the location of the outage, centered around {} in the {} area{}.".format(
                    street,
                    neighborhood,
                    city_not_seattle_text,
                )
                area_text = "the {} area{}".format(neighborhood, city_not_seattle_text)
                event["neighborhood"] = neighborhood
            else:
                alt_text = "A map showing the location of the outage, centered around {} in {}.".format(
                    street,
                    city,
                )
                area_text = city
                event["geoloc_city"] = city
        except Exception as e:
            # Best effort: fall back to generic alt text, but report why.
            print(e)
            print("Could not describe the outage location. Using generic alt text.")
            alt_text = "A map showing the location of the outage."

        map_image = outage_map.render()
        with io.BytesIO() as map_image_file:
            map_image.save(map_image_file, format="PNG", optimize=True)
            map_media_post = mastodon_client.media_post(
                map_image_file.getvalue(),
                mime_type="image/png",
                description=alt_text,
            )
        map_media_post_id = map_media_post["id"]
    except Exception as e:
        print(e)
        print(
            "Ran into an issue with generating/uploading the map. Will post without it."
        )
        map_media_post = None

    hashtag_string = get_hashtag_string(event)
    post_text = """Seattle City Light is reporting a {} outage in {}.

Start Date: {}
Est. Restoration: {}
Cause: {}

{}""".format(
        event_class["size"].lower(),
        area_text,
        start_time.strftime(post_datetime_format),
        estimated_restoration_time.strftime(post_datetime_format),
        event["cause"],
        hashtag_string,
    )

    print(
        "Posting the following to Mastodon, with a post length of {}:\n{}".format(
            len(post_text), post_text
        )
    )
    post_result = mastodon_client.status_post(
        status=post_text,
        media_ids=map_media_post,
        visibility="public",
        language="en",
    )

    return {"post_id": post_result["id"], "map_media_post_id": map_media_post_id}


class Base(DeclarativeBase):
    """Declarative base class for the ORM models in this script."""


class SclOutage(Base):
    """ORM record of an outage seen in the SCL API, stored in ``scl_outages``."""

    __tablename__ = "scl_outages"

    # Set from the event's "id"; used to match events across runs.
    scl_outage_id: Mapped[int] = mapped_column(primary_key=True, unique=True)
    # Set from the event's "identifier"; appended to the resolved-post hashtag.
    outage_user_id: Mapped[str] = mapped_column()
    # Post that the next reply in the thread is attached to.
    most_recent_post_id: Mapped[Optional[str]] = mapped_column()
    # Empty until the outage has been posted about.
    initial_post_id: Mapped[Optional[str]] = mapped_column()
    map_media_post_id: Mapped[Optional[str]] = mapped_column()
    last_updated_time: Mapped[datetime] = mapped_column()
    estimated_restoration_time: Mapped[datetime] = mapped_column()
    cause: Mapped[str] = mapped_column()
    status: Mapped[Optional[str]] = mapped_column()
    # Set once the outage is no longer present in the API response.
    no_longer_in_response_time: Mapped[Optional[datetime]] = mapped_column()
    start_time: Mapped[datetime] = mapped_column()
    num_people: Mapped[int] = mapped_column()
    # Highest num_people seen so far; decides whether the outage is postable.
    max_num_people: Mapped[int] = mapped_column()

    def __repr__(self) -> str:
        """Return a debugging representation with the key tracking fields."""
        return (
            f"SclOutage(scl_outage_id={self.scl_outage_id!r}, "
            f"most_recent_post_id={self.most_recent_post_id!r}, "
            f"initial_post_id={self.initial_post_id!r}, "
            f"map_media_post_id={self.map_media_post_id!r}, "
            f"last_updated_time={self.last_updated_time!r}, "
            f"no_longer_in_response_time={self.no_longer_in_response_time!r}, "
            f"start_time={self.start_time!r}, "
            f"num_people={self.num_people!r}, "
            f"max_num_people={self.max_num_people!r})"
        )


engine = create_engine("sqlite:///scl.db")
Base.metadata.create_all(engine)

with Session(engine) as session:
    for event in scl_events:
        print("Processing outage with Internal ID {}".format(event["id"]))

        # The API timestamps are divided by 1000 before conversion
        # (presumably milliseconds since the epoch).
        start_time = datetime.fromtimestamp(event["startTime"] / 1000)
        last_updated_time = datetime.fromtimestamp(event["lastUpdatedTime"] / 1000)
        estimated_restoration_time = datetime.fromtimestamp(event["etrTime"] / 1000)

        lookup_statement = select(SclOutage).where(
            SclOutage.scl_outage_id == event["id"]
        )
        event_class = classify_event_size(event["numPeople"])
        status = event.get("status")
        outage_geometries = convert_outage_geometry(event)

        # Keep the try body minimal: only the lookup can raise NoResultFound.
        try:
            existing_record = session.scalars(lookup_statement).one()
        except NoResultFound:
            existing_record = None

        if existing_record is None:
            print("Existing record not found")
            post_id = None
            map_media_post_id = None
            if not event_class["is_postable"]:
                print(
                    "Outage is {} considered postable, will not post".format(
                        event_class["size"]
                    )
                )
            else:
                initial_post_result = do_initial_post(
                    event,
                    event_class,
                    start_time,
                    estimated_restoration_time,
                    outage_geometries,
                )
                post_id = initial_post_result["post_id"]
                map_media_post_id = initial_post_result["map_media_post_id"]

            # Unposted outages are recorded too, so they can be posted later
            # if they grow.
            new_outage_record = SclOutage(
                scl_outage_id=event["id"],
                outage_user_id=event["identifier"],
                most_recent_post_id=post_id,
                initial_post_id=post_id,
                map_media_post_id=map_media_post_id,
                last_updated_time=last_updated_time,
                estimated_restoration_time=estimated_restoration_time,
                cause=event["cause"],
                status=status,
                start_time=start_time,
                num_people=event["numPeople"],
                max_num_people=event["numPeople"],
            )
            session.add(new_outage_record)
            session.commit()
            continue

        # Known outage: collect which tracked properties changed.
        updated_properties = []
        updated_entries = []

        if estimated_restoration_time != existing_record.estimated_restoration_time:
            existing_record.estimated_restoration_time = estimated_restoration_time
            updated_properties.append("estimated restoration")
            updated_entries.append(
                "Est. Restoration: {}".format(
                    estimated_restoration_time.strftime(post_datetime_format)
                )
            )

        if event["cause"] != existing_record.cause:
            existing_record.cause = event["cause"]
            updated_properties.append("cause")
            updated_entries.append("Cause: {}".format(event["cause"]))

        # Compare size classes before num_people is overwritten below.
        previous_event_class = classify_event_size(existing_record.num_people)
        if event_class["size"] != previous_event_class["size"]:
            updated_properties.append("outage size")
            updated_entries.append("Outage Size: {}".format(event_class["size"]))

        if status != existing_record.status:
            existing_record.status = status
            updated_properties.append("status")
            updated_entries.append("Status: {}".format(status))

        if existing_record.num_people != event["numPeople"]:
            existing_record.num_people = event["numPeople"]

        if existing_record.max_num_people < event["numPeople"]:
            # Used to determine the maximum number of people affected by this outage, to determine if it's worth posting about
            existing_record.max_num_people = event["numPeople"]

        max_event_class = classify_event_size(existing_record.max_num_people)

        if updated_properties:
            updated_properties.sort()
            updated_entries.sort()

            if len(updated_properties) == 1:
                updated_entries.insert(
                    0,
                    "The {} of this outage has been updated.\n".format(
                        updated_properties[0]
                    ),
                )
            else:
                # TODO: this currently just smashes all of the properties together with commas, it'd be nice to make it actually format it like a sentence
                updated_entries.insert(
                    0,
                    "The {} of this outage have been updated.\n".format(
                        ", ".join(updated_properties)
                    ),
                )

            if max_event_class["is_postable"] and existing_record.initial_post_id:
                # Already posted about: reply in the existing thread.
                post_result = mastodon_client.status_post(
                    status="\n".join(updated_entries),
                    in_reply_to_id=existing_record.most_recent_post_id,
                    visibility="unlisted",
                    language="en",
                )
                existing_record.most_recent_post_id = post_result["id"]
            elif max_event_class["is_postable"]:
                print(
                    "Posting an event that grew above the threshold required to post"
                )
                initial_post_result = do_initial_post(
                    event,
                    event_class,
                    start_time,
                    estimated_restoration_time,
                    outage_geometries,
                )
                existing_record.initial_post_id = initial_post_result["post_id"]
                existing_record.most_recent_post_id = initial_post_result["post_id"]
                existing_record.map_media_post_id = initial_post_result[
                    "map_media_post_id"
                ]

        session.commit()

    lookup_active_outages_statement = select(SclOutage).where(
        SclOutage.no_longer_in_response_time == None  # noqa: E711 - Syntax must stay this way for SQLAlchemy
    )

    # Built once so each active outage is checked with a set lookup instead
    # of a scan over the whole response.
    current_event_ids = {current_event["id"] for current_event in scl_events}

    for active_outage in session.scalars(lookup_active_outages_statement):
        # An empty response never marks outages as resolved.
        if not scl_events or active_outage.scl_outage_id in current_event_ids:
            continue

        # Event ID no longer exists in response
        if active_outage.most_recent_post_id:
            try:
                post_result = mastodon_client.status_post(
                    status="This outage is reported to be resolved.\n\n#SeattleCityLightOutage #SCLOutage #SCLOutage{}".format(
                        active_outage.outage_user_id
                    ),
                    in_reply_to_id=active_outage.most_recent_post_id,
                    visibility="public",
                    language="en",
                )
                active_outage.most_recent_post_id = post_result["id"]
            except mastodon.MastodonNotFoundError:
                print(
                    "The outage post couldn't be replied to, it was externally deleted."
                )

        active_outage.no_longer_in_response_time = datetime.now()

    session.commit()