"""Post Seattle City Light outage notices and updates to Mastodon.

The script fetches the current outage events from the SCL map API, compares
them with the outages recorded in a local SQLite database, and then:

* posts a new status (with a rendered map when possible) for every outage
  that is not yet recorded and is large enough to be postable,
* replies to the most recent post of a recorded outage when its estimated
  restoration time, cause, size class or status changed,
* replies with a "resolved" notice for recorded outages that are no longer
  present in the API response.
"""

import io
import math
from datetime import datetime
from typing import Optional, Union

import requests
import yaml
from mastodon import Mastodon
from PIL import Image, ImageDraw, ImageFont
from sqlalchemy import create_engine, select
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column
from staticmap import Polygon, StaticMap

# NOTE(review): %e and %l are not among the strftime directives documented by
# Python; they depend on the platform C library - confirm the deployment target.
post_datetime_format = "%b %e %l:%M %p"

# Upper bound (seconds) for each outbound HTTP request so a stalled server
# cannot hang the run forever.
request_timeout_seconds = 30

scl_events_url = "https://utilisocial.io/datacapable/v2/p/scl/map/events"
scl_events_response = requests.get(scl_events_url, timeout=request_timeout_seconds)
try:
    scl_events = scl_events_response.json()
except requests.JSONDecodeError:
    print("JSON could not be loaded from SCL API")
    raise

# Use a context manager so the config file handle is closed deterministically.
with open("config.yml", encoding="utf-8") as config_file:
    config = yaml.safe_load(config_file)

stadiamaps_api_key = config["stadiamaps"]["api_key"]
nominatim_url = config["nominatim"]["api_base_url"]

mastodon = Mastodon(
    client_id=config["mastodon"]["client_id"],
    client_secret=config["mastodon"]["client_secret"],
    access_token=config["mastodon"]["access_token"],
    api_base_url=config["mastodon"]["api_base_url"],
)


class AttribStaticMap(StaticMap):
    """A ``StaticMap`` that stamps an attribution label on the rendered image."""

    def __init__(self, *args, **kwargs):
        self.attribution = "© Stadia Maps © OpenMapTiles © OpenStreetMap"
        super().__init__(*args, **kwargs)

    def _draw_features(self, image):
        """Draw the regular map features, then overlay the attribution text.

        The label is drawn on a transparent RGBA layer (a translucent white
        box with black text, anchored to the bottom-right corner) which is
        then pasted onto ``image`` using itself as the mask.
        """
        super()._draw_features(image)

        overlay = Image.new("RGBA", image.size, (255, 255, 255, 0))
        font = ImageFont.load_default()
        draw = ImageDraw.Draw(overlay)

        # getbbox() returns (left, top, right, bottom); indexes 2 and 3 are
        # used below as the label's width and height.
        text_size = font.getbbox(self.attribution)
        text_position = (
            image.size[0] - text_size[2],
            image.size[1] - text_size[3],
        )
        offset = 2

        # Translucent white background behind the label.
        draw.rectangle(
            [
                (
                    text_position[0] - (2 * offset),
                    text_position[1] - (2 * offset),
                ),
                (
                    text_size[2] + text_position[0] + (2 * offset),
                    text_size[3] + text_position[1] + (2 * offset),
                ),
            ],
            fill=(255, 255, 255, 180),
        )
        # Label text at full opacity.
        draw.text(
            (text_position[0] - offset, text_position[1] - offset),
            self.attribution,
            font=font,
            fill="black",
        )

        image.paste(overlay, (0, 0), overlay)


def classify_event_size(num_people: int) -> dict[str, Union[str, bool]]:
    """Classify an outage by the number of people it affects.

    Args:
        num_people: Number of people affected by the outage.

    Returns:
        A dict with the keys ``size`` ("Small", "Medium" or "Large"),
        ``outage_color`` (hex colour used for the map polygon) and
        ``is_postable`` (whether a new outage of this size gets its own post).
    """
    if num_people < 250:
        return {
            "size": "Small",
            "outage_color": "#F97316",
            "is_postable": False,
        }
    # Compare the argument itself; reading the module-level ``event`` here
    # would classify the wrong outage whenever the two differ.
    if num_people < 1000:
        return {
            "size": "Medium",
            "outage_color": "#EF4444",
            "is_postable": True,
        }
    return {
        "size": "Large",
        "outage_color": "#991B1B",
        "is_postable": True,
    }


def num2deg(xtile: float, ytile: float, zoom: int) -> tuple[float, float]:
    """Convert tile coordinates at ``zoom`` into ``(lat_deg, lon_deg)``."""
    n = 1 << zoom
    lon_deg = xtile / n * 360.0 - 180.0
    lat_rad = math.atan(math.sinh(math.pi * (1 - 2 * ytile / n)))
    lat_deg = math.degrees(lat_rad)
    return lat_deg, lon_deg


def _describe_outage_location(geocoding: dict, event_size: str) -> tuple[str, str]:
    """Build the map alt text and the post's area text from a geocoding result.

    Args:
        geocoding: The ``properties.geocoding`` object of the first feature
            in the reverse-geocoding response.
        event_size: The ``size`` value returned by ``classify_event_size``.

    Returns:
        ``(alt_text, area_text)``.

    Raises:
        KeyError: If ``city`` or ``name`` is missing from ``geocoding``.
    """
    city = geocoding["city"]
    city_not_seattle_text = "" if city == "Seattle" else " of {}".format(city)
    street = geocoding["name"]

    # NOTE(review): the locality is skipped for "Large" outages, presumably
    # because they span more than one locality - confirm.
    if "locality" in geocoding and event_size != "Large":
        neighborhood = geocoding["locality"]
        # Report the "Uptown" locality as "Lower Queen Anne".
        if neighborhood == "Uptown":
            neighborhood = "Lower Queen Anne"
    elif "district" in geocoding:
        neighborhood = geocoding["district"]
    else:
        alt_text = "A map showing the location of the outage, centered around {} in {}.".format(
            street, city
        )
        return alt_text, city

    alt_text = "A map showing the location of the outage, centered around {} in the {} area{}.".format(
        street, neighborhood, city_not_seattle_text
    )
    area_text = "the {} area{}".format(neighborhood, city_not_seattle_text)
    return alt_text, area_text


class Base(DeclarativeBase):
    """Declarative base for the ORM models in this script."""


class SclOutage(Base):
    """One outage that has been posted about, with its Mastodon post ids."""

    __tablename__ = "scl_outages"

    # ``id`` of the event in the SCL API response.
    scl_outage_id: Mapped[int] = mapped_column(primary_key=True, unique=True)
    # ``identifier`` of the event; used to build the per-outage hashtag.
    outage_user_id: Mapped[str] = mapped_column()
    # Post that the next update/resolution reply is attached to.
    most_recent_post_id: Mapped[Optional[str]] = mapped_column()
    initial_post_id: Mapped[Optional[str]] = mapped_column()
    # None when the initial post was made without a map.
    map_media_post_id: Mapped[Optional[str]] = mapped_column()
    last_updated_time: Mapped[datetime] = mapped_column()
    estimated_restoration_time: Mapped[datetime] = mapped_column()
    cause: Mapped[str] = mapped_column()
    status: Mapped[Optional[str]] = mapped_column()
    # Set once the outage is no longer present in the API response.
    no_longer_in_response_time: Mapped[Optional[datetime]] = mapped_column()
    start_time: Mapped[datetime] = mapped_column()
    num_people: Mapped[int] = mapped_column()
    # Highest ``numPeople`` value seen for this outage.
    max_num_people: Mapped[int] = mapped_column()

    def __repr__(self) -> str:
        return (
            f"SclOutage(scl_outage_id={self.scl_outage_id!r}, "
            f"most_recent_post_id={self.most_recent_post_id!r}, "
            f"initial_post_id={self.initial_post_id!r}, "
            f"map_media_post_id={self.map_media_post_id!r}, "
            f"last_updated_time={self.last_updated_time!r}, "
            f"no_longer_in_response_time={self.no_longer_in_response_time!r}, "
            f"start_time={self.start_time!r}, "
            f"num_people={self.num_people!r}, "
            f"max_num_people={self.max_num_people!r})"
        )


engine = create_engine("sqlite:///scl.db")
Base.metadata.create_all(engine)

with Session(engine) as session:
    for event in scl_events:
        print("Processing outage with Internal ID {}".format(event["id"]))

        # The API timestamps are divided by 1000 before conversion
        # (i.e. they are treated as milliseconds).
        start_time = datetime.fromtimestamp(event["startTime"] / 1000)
        last_updated_time = datetime.fromtimestamp(event["lastUpdatedTime"] / 1000)
        estimated_restoration_time = datetime.fromtimestamp(event["etrTime"] / 1000)

        event_class = classify_event_size(event["numPeople"])
        status = event.get("status")
        hashtag_string = "#SeattleCityLightOutage #SCLOutage #SCLOutage{}".format(
            event["identifier"]
        )

        lookup_statement = select(SclOutage).where(
            SclOutage.scl_outage_id == event["id"]
        )
        # Keep the try body down to the one call that can raise NoResultFound.
        try:
            existing_record = session.scalars(lookup_statement).one()
        except NoResultFound:
            existing_record = None

        if existing_record is not None:
            # Known outage: collect what changed and reply with an update.
            # NOTE(review): last_updated_time is only written when the record
            # is created; confirm whether it should follow lastUpdatedTime.
            updated_properties = []
            updated_entries = []

            if estimated_restoration_time != existing_record.estimated_restoration_time:
                existing_record.estimated_restoration_time = estimated_restoration_time
                updated_properties.append("estimated restoration")
                updated_entries.append(
                    "Est. Restoration: {}".format(
                        estimated_restoration_time.strftime(post_datetime_format)
                    )
                )

            if event["cause"] != existing_record.cause:
                existing_record.cause = event["cause"]
                updated_properties.append("cause")
                updated_entries.append("Cause: {}".format(event["cause"]))

            # Must be computed before num_people is overwritten below.
            previous_event_class = classify_event_size(existing_record.num_people)
            if event_class["size"] != previous_event_class["size"]:
                updated_properties.append("outage size")
                updated_entries.append("Outage Size: {}".format(event_class["size"]))

            if status != existing_record.status:
                existing_record.status = status
                updated_properties.append("status")
                updated_entries.append("Status: {}".format(status))

            if existing_record.num_people != event["numPeople"]:
                existing_record.num_people = event["numPeople"]

            if existing_record.max_num_people < event["numPeople"]:
                # Used to determine the maximum number of people affected by
                # this outage, to determine if it's worth posting about
                existing_record.max_num_people = event["numPeople"]

            if updated_properties:
                updated_properties.sort()
                updated_entries.sort()

                if len(updated_properties) == 1:
                    summary = "The {} of this outage has been updated.\n".format(
                        updated_properties[0]
                    )
                else:
                    # TODO: this currently just smashes all of the properties
                    # together with commas, it'd be nice to make it actually
                    # format it like a sentence
                    summary = "The {} of this outage have been updated.\n".format(
                        ", ".join(updated_properties)
                    )
                updated_entries.insert(0, summary)

                # Blank line between the changed fields and the hashtags.
                updated_entries.append("")
                updated_entries.append(hashtag_string)

                mastodon_post_result = mastodon.status_post(
                    status="\n".join(updated_entries),
                    in_reply_to_id=existing_record.most_recent_post_id,
                    visibility="public",
                    language="en",
                )
                existing_record.most_recent_post_id = mastodon_post_result["id"]

            session.commit()
            continue

        print("Existing record not found")
        if not event_class["is_postable"]:
            print("Outage is not considered postable, will not post")
            continue

        # Fallback location from the SCL API in case one couldn't be reverse geocoded
        area_text = event["city"]
        try:
            outage_map = AttribStaticMap(
                512,
                512,
                url_template="https://tiles.stadiamaps.com/tiles/outdoors/{z}/{x}/{y}.png?api_key="
                + stadiamaps_api_key,
            )
            # Explicit raise instead of assert so the check survives ``python -O``.
            if event["polygons"]["type"] != "polygon":
                raise ValueError(
                    "Unexpected geometry type: {!r}".format(event["polygons"]["type"])
                )
            for ring in event["polygons"]["rings"]:
                outage_map.add_polygon(
                    Polygon(
                        ring,
                        # Appending 7F to the fill_color makes it 50% transparent
                        fill_color="{}7F".format(event_class["outage_color"]),
                        outline_color=event_class["outage_color"],
                        simplify=True,
                    )
                )
            map_image = outage_map.render()

            # Reverse geocoding is best effort: any failure only degrades the
            # alt text and keeps the fallback area text.
            try:
                # The centre and zoom are read from the map after render().
                center_lat, center_lon = num2deg(
                    outage_map.x_center, outage_map.y_center, outage_map.zoom
                )
                # Check to make sure the calculated lat and lon are sane enough
                # (NW corner 48, -122.6; SE corner 47.2, -122).
                if not (47.2 < center_lat < 48 and -122.6 < center_lon < -122):
                    raise ValueError(
                        "Map centre ({}, {}) is outside the expected area".format(
                            center_lat, center_lon
                        )
                    )

                # Zoom level 17 ensures that we won't get any building/POI
                # names, just street names
                geocode_url = "{nominatim_url}/reverse?lat={lat}&lon={lon}&format=geocodejson&zoom=17".format(
                    nominatim_url=nominatim_url,
                    lat=center_lat,
                    lon=center_lon,
                )
                geocode_headers = {"User-Agent": "seattlecitylight-mastodon-bot"}
                geocode_response = requests.get(
                    geocode_url,
                    headers=geocode_headers,
                    timeout=request_timeout_seconds,
                )
                try:
                    geocode = geocode_response.json()
                except requests.JSONDecodeError:
                    print("JSON could not be loaded from nominatim API")
                    raise

                alt_text, area_text = _describe_outage_location(
                    geocode["features"][0]["properties"]["geocoding"],
                    event_class["size"],
                )
            except Exception:
                alt_text = "A map showing the location of the outage."

            with io.BytesIO() as map_image_file:
                map_image.save(map_image_file, format="PNG", optimize=True)
                map_media_post = mastodon.media_post(
                    map_image_file.getvalue(),
                    mime_type="image/png",
                    description=alt_text,
                )
        except Exception as e:
            print(e)
            print(
                "Ran into an issue with generating/uploading the map. Will post without it."
            )
            map_media_post = None

        post_text = (
            "Seattle City Light is reporting a {} outage in {}.\n"
            "\n"
            "Start Date: {}\n"
            "Est. Restoration: {}\n"
            "Cause: {}\n"
            "\n"
            "{}"
        ).format(
            event_class["size"].lower(),
            area_text,
            start_time.strftime(post_datetime_format),
            estimated_restoration_time.strftime(post_datetime_format),
            event["cause"],
            hashtag_string,
        )

        print(
            "Posting the following to Mastodon, with a post length of {}:\n{}".format(
                len(post_text), post_text
            )
        )
        mastodon_post_result = mastodon.status_post(
            status=post_text,
            media_ids=map_media_post,
            visibility="public",
            language="en",
        )

        new_outage_record = SclOutage(
            scl_outage_id=event["id"],
            outage_user_id=event["identifier"],
            most_recent_post_id=mastodon_post_result["id"],
            initial_post_id=mastodon_post_result["id"],
            # map_media_post is None when the map could not be generated or
            # uploaded; subscripting it would raise after the status has
            # already been posted, so the outage would never be recorded.
            map_media_post_id=(
                map_media_post["id"] if map_media_post is not None else None
            ),
            last_updated_time=last_updated_time,
            estimated_restoration_time=estimated_restoration_time,
            cause=event["cause"],
            status=status,
            start_time=start_time,
            num_people=event["numPeople"],
            max_num_people=event["numPeople"],
        )
        session.add(new_outage_record)
        session.commit()

    # Recorded outages that are still marked active but have disappeared from
    # the API response are announced as resolved.
    current_event_ids = {event["id"] for event in scl_events}
    lookup_active_outages_statement = select(SclOutage).where(
        SclOutage.no_longer_in_response_time.is_(None)
    )
    # Materialise the rows first because the loop commits while iterating.
    for active_outage in session.scalars(lookup_active_outages_statement).all():
        if active_outage.scl_outage_id in current_event_ids:
            continue

        # Event ID no longer exists in response
        mastodon_post_result = mastodon.status_post(
            status="This outage is reported to be resolved.\n\n#SeattleCityLightOutage #SCLOutage #SCLOutage{}".format(
                active_outage.outage_user_id
            ),
            in_reply_to_id=active_outage.most_recent_post_id,
            visibility="public",
            language="en",
        )
        active_outage.most_recent_post_id = mastodon_post_result["id"]
        active_outage.no_longer_in_response_time = datetime.now()
        # Commit per outage so a later failure cannot cause a duplicate
        # "resolved" post on the next run.
        session.commit()