"""Post Seattle City Light outage updates to Mastodon.

On every run the script downloads the current outage events from the SCL
feed, compares each one against the record stored in a local SQLite database,
and posts either an initial status (with a rendered map) or a threaded reply
describing what changed. Outages that disappeared from the feed get a closing
reply and are marked as no longer active.
"""

import io
import re
from datetime import datetime
from typing import Optional, Tuple

import mastodon
import requests
import shapely
import staticmap
import yaml
from mastodon import Mastodon
from mastodon.return_types import MediaAttachment
from PIL import Image, ImageDraw, ImageFont
from shapely import Geometry
from sqlalchemy import create_engine, select
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

from geospatial import DBGeometry, convert_outage_geometry
from posttext import list_to_sentence

REQUESTS_HEADERS = {"User-Agent": "seattlecitylight-mastodon-bot"}
# Applied to every outbound HTTP request so a stalled server cannot hang a run.
REQUESTS_TIMEOUT_SECONDS = 10
# NOTE(review): %e and %l are platform strftime extensions (space-padded day
# and 12-hour clock) - confirm they are supported on the deployment platform.
POST_DATETIME_FORMAT = "%b %e %l:%M %p"


def _check_service_health(service_name: str, health_check_url: str) -> None:
    """Request *health_check_url* and re-raise any failure after logging it.

    Uses ``raise_for_status()`` rather than ``assert response.ok`` so the
    check still runs when Python is started with ``-O``.
    """
    try:
        response = requests.get(
            health_check_url,
            headers=REQUESTS_HEADERS,
            timeout=REQUESTS_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
    except Exception:
        print("{} health check failed".format(service_name))
        raise


scl_events_url = "https://utilisocial.io/datacapable/v2/p/scl/map/events"
scl_events_response = requests.get(
    scl_events_url, timeout=REQUESTS_TIMEOUT_SECONDS
)
try:
    scl_events = scl_events_response.json()
except requests.JSONDecodeError:
    print("JSON could not be loaded from SCL API")
    raise

# Context manager so the config file handle is closed deterministically.
with open("config.yml", encoding="utf-8") as config_file:
    config = yaml.safe_load(config_file)

osm_url_template = config["osm"]["url_template"]
osm_attribution = config["osm"]["attribution"]
nominatim_url = config["nominatim"]["api_base_url"]

mastodon_client = Mastodon(
    client_id=config["mastodon"]["client_id"],
    client_secret=config["mastodon"]["client_secret"],
    access_token=config["mastodon"]["access_token"],
    api_base_url=config["mastodon"]["api_base_url"],
)

try:
    mastodon_client.account_verify_credentials()
except Exception:
    print("Mastodon health check failed")
    raise

if config["nominatim"]["health_check_url"]:
    _check_service_health("Nominatim", config["nominatim"]["health_check_url"])

if config["osm"]["health_check_url"]:
    _check_service_health("OSM", config["osm"]["health_check_url"])


class AttribStaticMap(staticmap.StaticMap):
    """A ``StaticMap`` that stamps the tile attribution onto the rendered image."""

    def __init__(self, *args, **kwargs):
        self.attribution = osm_attribution
        super().__init__(*args, **kwargs)

    def _draw_features(self, image):
        """Draw the map features, then the attribution in the bottom-right corner."""
        super()._draw_features(image)

        overlay = Image.new("RGBA", image.size, (255, 255, 255, 0))
        font = ImageFont.truetype("fonts/Cabin-Regular.ttf", 24)
        draw = ImageDraw.Draw(overlay)

        text_box = font.getbbox(self.attribution)
        text_right, text_bottom = text_box[2], text_box[3]
        text_x = image.size[0] - text_right
        text_y = image.size[1] - text_bottom
        offset = 2

        # Semi-transparent backing rectangle so the text stays legible on tiles
        draw.rectangle(
            [
                (text_x - (2 * offset), text_y - (2 * offset)),
                (
                    text_right + text_x + (2 * offset),
                    text_bottom + text_y + (2 * offset),
                ),
            ],
            fill=(255, 255, 255, 180),
        )

        # draw text, full opacity
        draw.text(
            (text_x - offset, text_y - offset),
            self.attribution,
            font=font,
            fill="black",
        )

        image.paste(overlay, (0, 0), overlay)


def classify_event_size(num_people: int) -> dict[str, str | bool]:
    """Return the size label, map colour and postability for an outage.

    Fewer than 250 people is "Small" (not postable), fewer than 1000 is
    "Medium", anything else is "Large".
    """
    if num_people < 250:
        return {
            "size": "Small",
            "outage_color": "#F97316",
            "is_postable": False,
        }
    if num_people < 1000:
        return {
            "size": "Medium",
            "outage_color": "#EF4444",
            "is_postable": True,
        }
    return {
        "size": "Large",
        "outage_color": "#991B1B",
        "is_postable": True,
    }


def clean_hashtag_text(hashtag_text: str) -> str:
    """Title-case *hashtag_text* and strip non-word characters, ``^`` and ``_``."""
    return re.sub(r"[\W^_]+", "", hashtag_text.title())


def get_hashtag_string(event) -> str:
    """Build the hashtag line for *event*.

    Prefers the reverse-geocoded city (``geoloc_city``) over the feed's
    ``city``. A neighborhood hashtag is added only when the event has a
    ``neighborhood`` that differs from the city.

    Raises:
        KeyError: if the event has neither ``geoloc_city`` nor ``city``.
    """
    try:
        city = event["geoloc_city"]
    except KeyError:
        city = event["city"]

    neighborhood_text = ""
    neighborhood = event.get("neighborhood", city)
    if neighborhood != city:
        neighborhood_text = " #{}".format(clean_hashtag_text(neighborhood))

    return "#SeattleCityLightOutage #SCLOutage{} #{}".format(
        neighborhood_text, clean_hashtag_text(city)
    )


def generate_post_map_image(
    event, event_class, outage_geometries: shapely.MultiPolygon
) -> Tuple[MediaAttachment, str]:
    """Render the outage map, upload it to Mastodon and describe its location.

    The centroid of *outage_geometries* is reverse geocoded through Nominatim
    to produce the alt text and the human-readable area. As a side effect the
    geocoded values are written back onto *event* as ``geoloc_city`` and,
    when available, ``neighborhood``. Any geocoding failure falls back to a
    generic alt text.

    Returns:
        A ``(media attachment, area text)`` tuple.

    Raises:
        ValueError: if the event's polygons are not of type ``"polygon"``.
    """
    # Fallback location from the SCL API in case one couldn't be reverse geocoded
    area_text = event["city"]

    outage_map = AttribStaticMap(
        1024,
        1024,
        url_template=osm_url_template,
        tile_size=512,
    )

    # Explicit raise instead of assert so the check survives ``python -O``.
    if event["polygons"]["type"] != "polygon":
        raise ValueError(
            "Unexpected polygons type: {!r}".format(event["polygons"]["type"])
        )

    for ring in event["polygons"]["rings"]:
        polygon = staticmap.Polygon(
            ring,
            # Appending 7F to the fill_color makes it 50% transparent
            fill_color="{}7F".format(event_class["outage_color"]),
            outline_color=event_class["outage_color"],
        )
        outage_map.add_polygon(polygon)

    try:
        outage_center: shapely.Point = outage_geometries.centroid
        if outage_center.geom_type != "Point":
            raise ValueError("Outage centroid is not a point")

        # Check to make sure the calculated lat and lon are sane enough
        # (NW corner: 48, -122.6 / SE corner: 47.2, -122)
        if not (
            47.2 < outage_center.y < 48 and -122.6 < outage_center.x < -122
        ):
            raise ValueError("Outage centroid is outside the expected area")

        # Zoom level 17 ensures that we won't get any building/POI names, just street names
        geocode_url = "{nominatim_url}/reverse?lat={lat}&lon={lon}&format=geocodejson&zoom=17".format(
            nominatim_url=nominatim_url,
            lat=outage_center.y,
            lon=outage_center.x,
        )
        geocode_response = requests.get(
            geocode_url,
            headers=REQUESTS_HEADERS,
            timeout=REQUESTS_TIMEOUT_SECONDS,
        )
        try:
            geocode = geocode_response.json()
        except requests.JSONDecodeError:
            print("JSON could not be loaded from nominatim API")
            raise

        geocoding = geocode["features"][0]["properties"]["geocoding"]
        city = geocoding["city"]
        street = geocoding["name"]
        event["geoloc_city"] = city

        city_not_seattle_text = "" if city == "Seattle" else " of {}".format(city)

        if "locality" in geocoding and event_class["size"] != "Large":
            locality = geocoding["locality"]
            if locality == "Uptown":
                locality = "Lower Queen Anne"
            alt_text = "A map showing the location of the outage, centered around {} in the {} area{}.".format(
                street,
                locality,
                city_not_seattle_text,
            )
            area_text = "the {} area{}".format(locality, city_not_seattle_text)
            event["neighborhood"] = locality
        elif "district" in geocoding:
            district = geocoding["district"]
            alt_text = "A map showing the location of the outage, centered around {} in the {} area{}.".format(
                street,
                district,
                city_not_seattle_text,
            )
            area_text = "the {} area{}".format(
                district,
                city_not_seattle_text,
            )
            event["neighborhood"] = district
        else:
            alt_text = "A map showing the location of the outage, centered around {} in {}.".format(
                street,
                city,
            )
            area_text = city
    except Exception:
        # Deliberate best effort: the map is still useful without a location.
        alt_text = "A map showing the location of the outage."

    map_image = outage_map.render()

    with io.BytesIO() as map_image_file:
        map_image.save(map_image_file, format="WebP", method=6)
        map_media_post = mastodon_client.media_post(
            map_image_file.getvalue(),  # type: ignore
            mime_type="image/webp",
            description=alt_text,
        )

    return (map_media_post, area_text)


def do_initial_post(
    event,
    event_class,
    start_time: datetime,
    estimated_restoration_time: datetime,
    outage_geometries: shapely.MultiPolygon,
) -> dict[str, str | None]:
    """Publish the first status for an outage.

    Map generation is best effort: if it fails the status is posted without
    media and the area falls back to the feed's ``city``.

    Returns:
        A dict with ``post_id``, ``map_media_post_id`` (``None`` when no map
        was uploaded), ``neighborhood`` and ``city`` (each ``None`` when
        unknown).
    """
    map_media_post = None
    # Same fallback generate_post_map_image uses, so a map failure does not
    # produce a post reading "outage in .".
    area_text = event.get("city", "")

    try:
        map_media_post, area_text = generate_post_map_image(
            event, event_class, outage_geometries
        )
    except Exception as e:
        print(e)
        print(
            "Ran into an issue with generating/uploading the map. Will post without it."
        )

    hashtag_string = get_hashtag_string(event)

    est_restoration_post_text = ""
    if estimated_restoration_time > datetime.now():
        est_restoration_post_text = "\nEst. Restoration: {}\n".format(
            estimated_restoration_time.strftime(POST_DATETIME_FORMAT)
        )

    # NOTE(review): the blank-line layout of this template is a best-effort
    # reconstruction - confirm it against previously published posts.
    post_text = """Seattle City Light is reporting a {} outage in {}.

Start Date: {}{}
Cause: {}

{}""".format(
        event_class["size"].lower(),
        area_text,
        start_time.strftime(POST_DATETIME_FORMAT),
        est_restoration_post_text,
        event["cause"],
        hashtag_string,
    )

    print(
        "Posting the following to Mastodon, with a post length of {}:\n{}".format(
            len(post_text), post_text
        )
    )
    post_result = mastodon_client.status_post(
        status=post_text,
        media_ids=map_media_post,
        visibility="public",
        language="en",
    )

    return {
        "post_id": post_result["id"],
        "map_media_post_id": (
            map_media_post["id"] if map_media_post is not None else None
        ),
        # generate_post_map_image records the geocoded values on the event.
        "neighborhood": event.get("neighborhood"),
        "city": event.get("geoloc_city", event.get("city")),
    }


class Base(DeclarativeBase):
    """Declarative base that maps shapely geometries to ``DBGeometry`` columns."""

    type_annotation_map = {
        Geometry: DBGeometry,
    }


class SclOutage(Base):
    """Persisted state of one outage, keyed by the feed's event id."""

    __tablename__ = "scl_outages"

    scl_outage_id: Mapped[int] = mapped_column(primary_key=True, unique=True)
    outage_user_id: Mapped[str] = mapped_column()
    most_recent_post_id: Mapped[Optional[str]] = mapped_column()
    initial_post_id: Mapped[Optional[str]] = mapped_column()
    map_media_post_id: Mapped[Optional[str]] = mapped_column()
    last_updated_time: Mapped[datetime] = mapped_column()
    estimated_restoration_time: Mapped[datetime] = mapped_column()
    cause: Mapped[str] = mapped_column()
    status: Mapped[Optional[str]] = mapped_column()
    no_longer_in_response_time: Mapped[Optional[datetime]] = mapped_column()
    start_time: Mapped[datetime] = mapped_column()
    num_people: Mapped[int] = mapped_column()
    max_num_people: Mapped[int] = mapped_column()
    neighborhood: Mapped[Optional[str]] = mapped_column()
    city: Mapped[Optional[str]] = mapped_column()
    outage_geometries: Mapped[Optional[Geometry]] = mapped_column()
    geometries_modified: Mapped[Optional[bool]] = mapped_column()

    def __repr__(self) -> str:
        return (
            f"SclOutage(scl_outage_id={self.scl_outage_id!r}, "
            f"most_recent_post_id={self.most_recent_post_id!r}, "
            f"initial_post_id={self.initial_post_id!r}, "
            f"map_media_post_id={self.map_media_post_id!r}, "
            f"last_updated_time={self.last_updated_time!r}, "
            f"no_longer_in_response_time={self.no_longer_in_response_time!r}, "
            f"start_time={self.start_time!r}, "
            f"num_people={self.num_people!r}, "
            f"max_num_people={self.max_num_people!r}, "
            f"neighborhood={self.neighborhood!r}, "
            f"city={self.city!r}, "
            f"outage_geometries={self.outage_geometries!r}, "
            f"geometries_modified={self.geometries_modified!r})"
        )


def _apply_event_changes(
    existing_record: SclOutage,
    event,
    event_class,
    status,
    estimated_restoration_time: datetime,
    outage_geometries,
) -> Tuple[list[str], list[str], bool]:
    """Copy changed feed values onto *existing_record*.

    Returns:
        ``(updated_properties, updated_entries, geometries_changed)`` where
        the first two describe the changes worth announcing.
    """
    updated_properties: list[str] = []
    updated_entries: list[str] = []

    est_restoration_diff_mins = (
        abs(
            (
                estimated_restoration_time
                - existing_record.estimated_restoration_time
            ).total_seconds()
        )
        / 60
    )
    # Only post if estimated restoration time has changed by 60m or more
    if est_restoration_diff_mins >= 60:
        existing_record.estimated_restoration_time = estimated_restoration_time
        if estimated_restoration_time > datetime.now():
            # New estimated restoration time is in the future, so likely to be a real time
            updated_properties.append("estimated restoration")
            updated_entries.append(
                "Est. Restoration: {}".format(
                    estimated_restoration_time.strftime(POST_DATETIME_FORMAT)
                )
            )

    if event["cause"] != existing_record.cause:
        existing_record.cause = event["cause"]
        updated_properties.append("cause")
        updated_entries.append("Cause: {}".format(event["cause"]))

    # Compare against the stored head count before it is overwritten below.
    previous_event_class = classify_event_size(existing_record.num_people)
    if event_class["size"] != previous_event_class["size"]:
        updated_properties.append("outage size")
        updated_entries.append("Outage Size: {}".format(event_class["size"]))

    if status != existing_record.status:
        existing_record.status = status
        updated_properties.append("status")
        updated_entries.append("Status: {}".format(status))

    if existing_record.num_people != event["numPeople"]:
        existing_record.num_people = event["numPeople"]
        if existing_record.max_num_people < event["numPeople"]:
            # Used to determine the maximum number of people affected by this
            # outage, to determine if it's worth posting about
            existing_record.max_num_people = event["numPeople"]

    geometries_changed = existing_record.outage_geometries != outage_geometries
    if geometries_changed:
        print("Geometries modified")
        updated_properties.append("area")
        existing_record.outage_geometries = outage_geometries
        existing_record.geometries_modified = True

    return updated_properties, updated_entries, geometries_changed


def _build_update_text(
    updated_properties: list[str], updated_entries: list[str]
) -> str:
    """Format the reply text announcing which properties changed."""
    properties = sorted(updated_properties)
    entries = sorted(updated_entries)
    if len(properties) == 1:
        header = "The {} of this outage has been updated.\n".format(
            properties[0]
        )
    else:
        header = "The {} of this outage have been updated.\n".format(
            list_to_sentence(properties)
        )
    return "\n".join([header, *entries])


def _post_outage_update(
    existing_record: SclOutage,
    event,
    event_class,
    outage_geometries,
    updated_properties: list[str],
    updated_entries: list[str],
    geometries_changed: bool,
) -> None:
    """Reply to the outage's most recent post with the changed properties."""
    map_media_post = None
    if geometries_changed:
        # Best effort, mirroring do_initial_post: a failed map render/upload
        # must not prevent the textual update from being posted.
        try:
            map_media_post, _ = generate_post_map_image(
                event, event_class, outage_geometries
            )
        except Exception as e:
            print(e)
            print(
                "Ran into an issue with generating/uploading the map. Will post without it."
            )

    try:
        post_result = mastodon_client.status_post(
            status=_build_update_text(updated_properties, updated_entries),
            in_reply_to_id=existing_record.most_recent_post_id,
            media_ids=map_media_post,
            visibility="public",
            language="en",
        )
    except mastodon.MastodonNotFoundError:
        print("Could not post a reply to the existing post, skip this update")
    else:
        existing_record.most_recent_post_id = post_result["id"]


def _process_existing_outage(
    existing_record: SclOutage,
    event,
    event_class,
    status,
    start_time: datetime,
    estimated_restoration_time: datetime,
    outage_geometries,
) -> None:
    """Update a known outage and post about it when warranted.

    The caller is responsible for committing the session afterwards.
    """
    updated_properties, updated_entries, geometries_changed = (
        _apply_event_changes(
            existing_record,
            event,
            event_class,
            status,
            estimated_restoration_time,
            outage_geometries,
        )
    )
    # Postability is judged on the largest size the outage ever reached.
    max_event_class = classify_event_size(existing_record.max_num_people)

    if not updated_properties:
        print("Existing record was found, and no properties were updated.")
        return

    if not max_event_class["is_postable"]:
        return

    if existing_record.initial_post_id:
        _post_outage_update(
            existing_record,
            event,
            event_class,
            outage_geometries,
            updated_properties,
            updated_entries,
            geometries_changed,
        )
        return

    print("Posting an event that grew above the threshold required to post")
    initial_post_result = do_initial_post(
        event,
        event_class,
        start_time,
        estimated_restoration_time,
        outage_geometries,
    )
    existing_record.neighborhood = initial_post_result.get("neighborhood")
    existing_record.city = initial_post_result.get("city")
    existing_record.initial_post_id = initial_post_result["post_id"]
    existing_record.most_recent_post_id = initial_post_result["post_id"]
    existing_record.map_media_post_id = initial_post_result["map_media_post_id"]


def _create_outage_record(
    event,
    event_class,
    status,
    start_time: datetime,
    last_updated_time: datetime,
    estimated_restoration_time: datetime,
    outage_geometries,
) -> SclOutage:
    """Post about a newly seen outage (when postable) and build its record."""
    post_id = None
    map_media_post_id = None
    neighborhood = None
    city = None

    if not event_class["is_postable"]:
        print(
            "Outage is {}, which is not considered postable; will not post".format(
                event_class["size"]
            )
        )
    else:
        initial_post_result = do_initial_post(
            event,
            event_class,
            start_time,
            estimated_restoration_time,
            outage_geometries,
        )
        post_id = initial_post_result["post_id"]
        map_media_post_id = initial_post_result["map_media_post_id"]
        neighborhood = initial_post_result.get("neighborhood")
        city = initial_post_result.get("city")

    return SclOutage(
        scl_outage_id=event["id"],
        outage_user_id=event["identifier"],
        most_recent_post_id=post_id,
        initial_post_id=post_id,
        map_media_post_id=map_media_post_id,
        last_updated_time=last_updated_time,
        estimated_restoration_time=estimated_restoration_time,
        cause=event["cause"],
        status=status,
        start_time=start_time,
        num_people=event["numPeople"],
        max_num_people=event["numPeople"],
        neighborhood=neighborhood,
        city=city,
        outage_geometries=outage_geometries,
    )


def _close_missing_outages(session: Session, current_events) -> None:
    """Mark active outages that vanished from the feed and reply to their threads.

    Nothing is closed when *current_events* is empty, so an empty feed
    response cannot close every tracked outage at once.
    """
    if not current_events:
        return

    # Build the id set once instead of scanning the feed for every record.
    current_event_ids = {current_event["id"] for current_event in current_events}

    lookup_active_outages_statement = select(SclOutage).where(
        SclOutage.no_longer_in_response_time == None  # noqa: E711 - Syntax must stay this way for SQLAlchemy
    )
    # Materialize the rows first because the loop commits as it goes.
    for active_outage in session.scalars(lookup_active_outages_statement).all():
        if active_outage.scl_outage_id in current_event_ids:
            continue

        # Event ID no longer exists in response
        if active_outage.most_recent_post_id:
            try:
                post_result = mastodon_client.status_post(
                    status="This outage is no longer in the SCL feed, which usually means it's either been resolved, or split into multiple smaller outages.\n\n#SeattleCityLightOutage #SCLOutage",
                    in_reply_to_id=active_outage.most_recent_post_id,
                    visibility="public",
                    language="en",
                )
                active_outage.most_recent_post_id = post_result["id"]
            except mastodon.MastodonNotFoundError:
                print(
                    "The outage post couldn't be replied to, it was externally deleted."
                )

        active_outage.no_longer_in_response_time = datetime.now()
        # Commit per outage so a later failure cannot cause a duplicate reply.
        session.commit()


engine = create_engine("sqlite:///scl.db")
Base.metadata.create_all(engine)

with Session(engine) as session:
    for event in scl_events:
        print("Processing outage with Internal ID {}".format(event["id"]))

        # The feed reports timestamps in milliseconds since the epoch.
        start_time = datetime.fromtimestamp(event["startTime"] / 1000)
        last_updated_time = datetime.fromtimestamp(event["lastUpdatedTime"] / 1000)
        estimated_restoration_time = datetime.fromtimestamp(event["etrTime"] / 1000)

        event_class = classify_event_size(event["numPeople"])
        status = event.get("status")
        outage_geometries = convert_outage_geometry(event)

        lookup_statement = select(SclOutage).where(
            SclOutage.scl_outage_id == event["id"]
        )
        # Keep the try body minimal: only the lookup can raise NoResultFound.
        try:
            existing_record = session.scalars(lookup_statement).one()
        except NoResultFound:
            existing_record = None

        if existing_record is not None:
            _process_existing_outage(
                existing_record,
                event,
                event_class,
                status,
                start_time,
                estimated_restoration_time,
                outage_geometries,
            )
        else:
            print("Existing record not found")
            session.add(
                _create_outage_record(
                    event,
                    event_class,
                    status,
                    start_time,
                    last_updated_time,
                    estimated_restoration_time,
                    outage_geometries,
                )
            )
        session.commit()

    _close_missing_outages(session, scl_events)
    session.commit()