commit wip
This commit is contained in:
parent
93936bee88
commit
78f01601ef
3 changed files with 174 additions and 10 deletions
BIN
map.png
Normal file
BIN
map.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 153 KiB |
125
map_test.py
Normal file
125
map_test.py
Normal file
|
@ -0,0 +1,125 @@
|
|||
import math
|
||||
|
||||
import requests
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
from staticmap import Polygon, StaticMap
|
||||
|
||||
|
||||
class AttribStaticMap(StaticMap, object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.attribution = "© Stadia Maps © OpenMapTiles © OpenStreetMap"
|
||||
super(AttribStaticMap, self).__init__(*args, **kwargs)
|
||||
|
||||
def _draw_features(self, image):
|
||||
super(AttribStaticMap, self)._draw_features(image)
|
||||
|
||||
txt = Image.new("RGBA", image.size, (255, 255, 255, 0))
|
||||
# get a font
|
||||
# fnt = ImageFont.truetype('FreeMono.ttf', 12)
|
||||
fnt = ImageFont.load_default()
|
||||
# get a drawing context
|
||||
d = ImageDraw.Draw(txt)
|
||||
|
||||
textSize = fnt.getbbox(self.attribution)
|
||||
textPosition = (image.size[0] - textSize[2], image.size[1] - textSize[3])
|
||||
offset = 2
|
||||
options = {"fill": (255, 255, 255, 180)}
|
||||
d.rectangle(
|
||||
[
|
||||
(textPosition[0] - (2 * offset), textPosition[1] - (2 * offset)),
|
||||
(
|
||||
textSize[2] + textPosition[0] + (2 * offset),
|
||||
textSize[3] + textPosition[1] + (2 * offset),
|
||||
),
|
||||
],
|
||||
**options
|
||||
)
|
||||
|
||||
# draw text, full opacity
|
||||
d.text(
|
||||
(textPosition[0] - offset, textPosition[1] - offset),
|
||||
self.attribution,
|
||||
font=fnt,
|
||||
fill="black",
|
||||
)
|
||||
|
||||
image.paste(txt, (0, 0), txt)
|
||||
|
||||
|
||||
scl_events_url = "https://utilisocial.io/datacapable/v2/p/scl/map/events"
|
||||
scl_events_response = requests.get(scl_events_url)
|
||||
try:
|
||||
scl_events = scl_events_response.json()
|
||||
except requests.JSONDecodeError:
|
||||
print("JSON could not be loaded from SCL API")
|
||||
raise
|
||||
|
||||
with open("stadiamaps_api_key.secret", "r+") as stadiamaps_api_key_file:
|
||||
# Reading from a file
|
||||
stadiamaps_api_key = stadiamaps_api_key_file.read()
|
||||
|
||||
map = AttribStaticMap(
|
||||
512,
|
||||
512,
|
||||
url_template="https://tiles.stadiamaps.com/tiles/outdoors/{z}/{x}/{y}.png?api_key="
|
||||
+ stadiamaps_api_key,
|
||||
)
|
||||
for ring in scl_events[0]["polygons"]["rings"]:
|
||||
polygon = Polygon(ring, "#F973167F", "#F97316", simplify=True)
|
||||
map.add_polygon(polygon)
|
||||
image = map.render()
|
||||
image.save("map.png", "PNG")
|
||||
|
||||
|
||||
try:
|
||||
def num2deg(xtile, ytile, zoom):
|
||||
n = 1 << zoom
|
||||
lon_deg = xtile / n * 360.0 - 180.0
|
||||
lat_rad = math.atan(math.sinh(math.pi * (1 - 2 * ytile / n)))
|
||||
lat_deg = math.degrees(lat_rad)
|
||||
return lat_deg, lon_deg
|
||||
|
||||
center_lat_lon = num2deg(map.x_center, map.y_center, map.zoom)
|
||||
|
||||
geocode_url = "http://gruezi-skyros.srv.gruezi.net:6664/reverse?lat={lat}&lon={lon}&format=geocodejson".format(
|
||||
lat=center_lat_lon[0], lon=center_lat_lon[1]
|
||||
)
|
||||
geocode_headers = {"User-Agent": "seattlecitylight-mastodon-bot"}
|
||||
geocode_response = requests.get(geocode_url, headers=geocode_headers)
|
||||
try:
|
||||
geocode = geocode_response.json()
|
||||
except requests.JSONDecodeError:
|
||||
print("JSON could not be loaded from nominatim API")
|
||||
raise
|
||||
|
||||
if geocode["features"][0]["properties"]["geocoding"]["city"] != "Seattle":
|
||||
city_not_seattle_text = " of {}".format(
|
||||
geocode["features"][0]["properties"]["geocoding"]["city"]
|
||||
)
|
||||
else:
|
||||
city_not_seattle_text = ""
|
||||
|
||||
if "locality" in geocode["features"][0]["properties"]["geocoding"]:
|
||||
locality = geocode["features"][0]["properties"]["geocoding"]
|
||||
if locality == "Uptown":
|
||||
locality = "Lower Queen Anne"
|
||||
|
||||
alt_text = "A map showing the location of the outage, centered around {} in the {} area{}.".format(
|
||||
geocode["features"][0]["properties"]["geocoding"]["name"],
|
||||
locality,
|
||||
city_not_seattle_text,
|
||||
)
|
||||
elif "district" in geocode["features"][0]["properties"]["geocoding"]:
|
||||
alt_text = "A map showing the location of the outage, centered around {} in the {} area{}.".format(
|
||||
geocode["features"][0]["properties"]["geocoding"]["name"],
|
||||
geocode["features"][0]["properties"]["geocoding"]["district"],
|
||||
city_not_seattle_text,
|
||||
)
|
||||
else:
|
||||
alt_text = "A map showing the location of the outage, centered around {} in {}.".format(
|
||||
geocode["features"][0]["properties"]["geocoding"]["name"],
|
||||
geocode["features"][0]["properties"]["geocoding"]["city"],
|
||||
)
|
||||
except Exception:
|
||||
alt_text = "A map showing the location of the outage."
|
||||
print(alt_text)
|
59
scl.py
59
scl.py
|
@ -5,13 +5,15 @@ from typing import Optional
|
|||
|
||||
import mastodon
|
||||
import requests
|
||||
import sqlalchemy.types as types
|
||||
import staticmap
|
||||
import yaml
|
||||
from mastodon import Mastodon
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
from shapely import Geometry, MultiPolygon, Point, Polygon, from_wkb, to_wkb
|
||||
from sqlalchemy import create_engine, select
|
||||
from sqlalchemy.exc import NoResultFound
|
||||
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column
|
||||
from staticmap import Polygon, StaticMap
|
||||
|
||||
post_datetime_format = "%b %e %l:%M %p"
|
||||
|
||||
|
@ -34,7 +36,7 @@ mastodon_client = Mastodon(
|
|||
)
|
||||
|
||||
|
||||
class AttribStaticMap(StaticMap, object):
|
||||
class AttribStaticMap(staticmap.StaticMap, object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.attribution = "© Stadia Maps © OpenMapTiles © OpenStreetMap"
|
||||
super(AttribStaticMap, self).__init__(*args, **kwargs)
|
||||
|
@ -103,6 +105,16 @@ def get_hashtag_string(event) -> str:
|
|||
return hashtag_string
|
||||
|
||||
|
||||
def convert_outage_geometry(event) -> MultiPolygon:
|
||||
assert event["polygons"]["type"] == "polygon"
|
||||
assert event["polygons"]["hasZ"] is False
|
||||
assert event["polygons"]["hasM"] is False
|
||||
polygon_list = []
|
||||
for ring in event["polygons"]["rings"]:
|
||||
polygon_list.append(Polygon(ring))
|
||||
return MultiPolygon(polygon_list)
|
||||
|
||||
|
||||
def do_initial_post(
|
||||
event, event_class, start_time: datetime, estimated_restoration_time: datetime
|
||||
) -> dict[str, str | None]:
|
||||
|
@ -119,7 +131,7 @@ def do_initial_post(
|
|||
)
|
||||
assert event["polygons"]["type"] == "polygon"
|
||||
for ring in event["polygons"]["rings"]:
|
||||
polygon = Polygon(
|
||||
polygon = staticmap.Polygon(
|
||||
ring,
|
||||
# Appending 7F to the fill_color makes it 50% transparent
|
||||
fill_color="{}7F".format(event_class["outage_color"]),
|
||||
|
@ -204,11 +216,14 @@ def do_initial_post(
|
|||
|
||||
with io.BytesIO() as map_image_file:
|
||||
map_image.save(map_image_file, format="PNG", optimize=True)
|
||||
map_media_post = mastodon_client.media_post(
|
||||
map_image_file.getvalue(),
|
||||
mime_type="image/png",
|
||||
description=alt_text,
|
||||
)
|
||||
if __debug__:
|
||||
print("Would have uploaded the map media here")
|
||||
else:
|
||||
map_media_post = mastodon_client.media_post(
|
||||
map_image_file.getvalue(),
|
||||
mime_type="image/png",
|
||||
description=alt_text,
|
||||
)
|
||||
map_media_post_id = map_media_post["id"]
|
||||
|
||||
except Exception as e:
|
||||
|
@ -251,8 +266,21 @@ Cause: {}
|
|||
return {"post_id": post_id, "map_media_post_id": map_media_post_id}
|
||||
|
||||
|
||||
class GeometryWkb(types.TypeDecorator):
|
||||
impl = types.LargeBinary
|
||||
cache_ok = True
|
||||
|
||||
def process_bind_param(self, value, dialect):
|
||||
return to_wkb(value)
|
||||
|
||||
def process_result_value(self, value, dialect):
|
||||
return from_wkb(value)
|
||||
|
||||
|
||||
class Base(DeclarativeBase):
|
||||
pass
|
||||
type_annotation_map = {
|
||||
Geometry: GeometryWkb,
|
||||
}
|
||||
|
||||
|
||||
class SclOutage(Base):
|
||||
|
@ -270,9 +298,10 @@ class SclOutage(Base):
|
|||
start_time: Mapped[datetime] = mapped_column()
|
||||
num_people: Mapped[int] = mapped_column()
|
||||
max_num_people: Mapped[int] = mapped_column()
|
||||
outage_geometries: Mapped[Geometry] = mapped_column()
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"SclOutage(scl_outage_id={self.scl_outage_id!r}, most_recent_post_id={self.most_recent_post_id!r}, initial_post_id={self.initial_post_id!r}, map_media_post_id={self.map_media_post_id!r}, last_updated_time={self.last_updated_time!r}, no_longer_in_response_time={self.no_longer_in_response_time!r}), start_time={self.start_time!r}), num_people={self.num_people!r}), max_num_people={self.max_num_people!r})"
|
||||
return f"SclOutage(scl_outage_id={self.scl_outage_id!r}, most_recent_post_id={self.most_recent_post_id!r}, initial_post_id={self.initial_post_id!r}, map_media_post_id={self.map_media_post_id!r}, last_updated_time={self.last_updated_time!r}, no_longer_in_response_time={self.no_longer_in_response_time!r}), start_time={self.start_time!r}), num_people={self.num_people!r}), max_num_people={self.max_num_people!r}), outage_geometries={self.outage_geometries!r}"
|
||||
|
||||
|
||||
engine = create_engine("sqlite:///scl.db")
|
||||
|
@ -297,6 +326,9 @@ with Session(engine) as session:
|
|||
else:
|
||||
status = None
|
||||
|
||||
outage_geometries = convert_outage_geometry(event)
|
||||
scl_outage_location = Point(event["latitude"], event["longitude"])
|
||||
|
||||
try:
|
||||
hashtag_string = get_hashtag_string(event)
|
||||
existing_record = lookup_result.one()
|
||||
|
@ -329,6 +361,10 @@ with Session(engine) as session:
|
|||
# Used to determine the maximum number of people affected by this outage, to determine if it's worth posting about
|
||||
existing_record.max_num_people = event["numPeople"]
|
||||
max_event_class = classify_event_size(existing_record.max_num_people)
|
||||
if existing_record.outage_geometries != outage_geometries:
|
||||
print("updating geometries")
|
||||
existing_record.outage_geometries = outage_geometries
|
||||
|
||||
|
||||
if updated_properties:
|
||||
updated_properties.sort()
|
||||
|
@ -368,6 +404,8 @@ with Session(engine) as session:
|
|||
existing_record.map_media_post_id = initial_post_result[
|
||||
"map_media_post_id"
|
||||
]
|
||||
else:
|
||||
print("Existing record was found, and no properties were updated.")
|
||||
session.commit()
|
||||
|
||||
except NoResultFound:
|
||||
|
@ -400,6 +438,7 @@ with Session(engine) as session:
|
|||
start_time=start_time,
|
||||
num_people=event["numPeople"],
|
||||
max_num_people=event["numPeople"],
|
||||
outage_geometries=outage_geometries,
|
||||
)
|
||||
session.add(new_outage_record)
|
||||
session.commit()
|
||||
|
|
Loading…
Add table
Reference in a new issue