Split app
This commit is contained in:
496
blueprints/tasks.py
Normal file
496
blueprints/tasks.py
Normal file
@@ -0,0 +1,496 @@
|
||||
import json
|
||||
import traceback
|
||||
from datetime import datetime
|
||||
from datetime import timezone
|
||||
|
||||
import flask
|
||||
import requests
|
||||
from flask import current_app as app
|
||||
from little_boxes import activitypub as ap
|
||||
from little_boxes.errors import ActivityGoneError
|
||||
from little_boxes.errors import ActivityNotFoundError
|
||||
from little_boxes.errors import NotAnActivityError
|
||||
from little_boxes.httpsig import HTTPSigAuth
|
||||
from requests.exceptions import HTTPError
|
||||
|
||||
import activity_gc
|
||||
import activitypub
|
||||
import config
|
||||
from activitypub import Box
|
||||
from app_utils import MY_PERSON
|
||||
from app_utils import _add_answers_to_question
|
||||
from app_utils import back
|
||||
from app_utils import p
|
||||
from app_utils import post_to_outbox
|
||||
from config import DB
|
||||
from tasks import Tasks
|
||||
from utils import now
|
||||
from utils import opengraph
|
||||
from utils.meta import MetaKey
|
||||
from utils.meta import _meta
|
||||
from utils.notifications import set_inbox_flags
|
||||
|
||||
SIG_AUTH = HTTPSigAuth(config.KEY)
|
||||
|
||||
blueprint = flask.Blueprint("tasks", __name__)
|
||||
|
||||
|
||||
class TaskError(Exception):
|
||||
"""Raised to log the error for poussetaches."""
|
||||
|
||||
def __init__(self):
|
||||
self.message = traceback.format_exc()
|
||||
|
||||
|
||||
@blueprint.route("/task/update_question", methods=["POST"])
|
||||
def task_update_question():
|
||||
"""Sends an Update."""
|
||||
task = p.parse(flask.request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
iri = task.payload
|
||||
try:
|
||||
app.logger.info(f"Updating question {iri}")
|
||||
cc = [config.ID + "/followers"]
|
||||
doc = DB.activities.find_one({"box": Box.OUTBOX.value, "remote_id": iri})
|
||||
_add_answers_to_question(doc)
|
||||
question = ap.Question(**doc["activity"]["object"])
|
||||
|
||||
raw_update = dict(
|
||||
actor=question.id,
|
||||
object=question.to_dict(embed=True),
|
||||
attributedTo=MY_PERSON.id,
|
||||
cc=list(set(cc)),
|
||||
to=[ap.AS_PUBLIC],
|
||||
)
|
||||
raw_update["@context"] = config.DEFAULT_CTX
|
||||
|
||||
update = ap.Update(**raw_update)
|
||||
print(update)
|
||||
print(update.to_dict())
|
||||
post_to_outbox(update)
|
||||
|
||||
except HTTPError as err:
|
||||
app.logger.exception("request failed")
|
||||
if 400 >= err.response.status_code >= 499:
|
||||
app.logger.info("client error, no retry")
|
||||
return ""
|
||||
|
||||
raise TaskError() from err
|
||||
except Exception as err:
|
||||
app.logger.exception("task failed")
|
||||
raise TaskError() from err
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
@blueprint.route("/task/fetch_og_meta", methods=["POST"])
|
||||
def task_fetch_og_meta():
|
||||
task = p.parse(flask.request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
iri = task.payload
|
||||
try:
|
||||
activity = ap.fetch_remote_activity(iri)
|
||||
app.logger.info(f"activity={activity!r}")
|
||||
if activity.has_type(ap.ActivityType.CREATE):
|
||||
note = activity.get_object()
|
||||
links = opengraph.links_from_note(note.to_dict())
|
||||
og_metadata = opengraph.fetch_og_metadata(config.USER_AGENT, links)
|
||||
for og in og_metadata:
|
||||
if not og.get("image"):
|
||||
continue
|
||||
config.MEDIA_CACHE.cache_og_image(og["image"], iri)
|
||||
|
||||
app.logger.debug(f"OG metadata {og_metadata!r}")
|
||||
DB.activities.update_one(
|
||||
{"remote_id": iri}, {"$set": {"meta.og_metadata": og_metadata}}
|
||||
)
|
||||
|
||||
app.logger.info(f"OG metadata fetched for {iri}: {og_metadata}")
|
||||
except (ActivityGoneError, ActivityNotFoundError):
|
||||
app.logger.exception(f"dropping activity {iri}, skip OG metedata")
|
||||
return ""
|
||||
except requests.exceptions.HTTPError as http_err:
|
||||
if 400 <= http_err.response.status_code < 500:
|
||||
app.logger.exception("bad request, no retry")
|
||||
return ""
|
||||
app.logger.exception("failed to fetch OG metadata")
|
||||
raise TaskError() from http_err
|
||||
except Exception as err:
|
||||
app.logger.exception(f"failed to fetch OG metadata for {iri}")
|
||||
raise TaskError() from err
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
@blueprint.route("/task/cache_object", methods=["POST"])
|
||||
def task_cache_object():
|
||||
task = p.parse(flask.request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
iri = task.payload
|
||||
try:
|
||||
activity = ap.fetch_remote_activity(iri)
|
||||
app.logger.info(f"activity={activity!r}")
|
||||
obj = activity.get_object()
|
||||
DB.activities.update_one(
|
||||
{"remote_id": activity.id},
|
||||
{
|
||||
"$set": {
|
||||
"meta.object": obj.to_dict(embed=True),
|
||||
"meta.object_actor": activitypub._actor_to_meta(obj.get_actor()),
|
||||
}
|
||||
},
|
||||
)
|
||||
except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
|
||||
DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}})
|
||||
app.logger.exception(f"flagging activity {iri} as deleted, no object caching")
|
||||
except Exception as err:
|
||||
app.logger.exception(f"failed to cache object for {iri}")
|
||||
raise TaskError() from err
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
@blueprint.route("/task/finish_post_to_outbox", methods=["POST"]) # noqa:C901
|
||||
def task_finish_post_to_outbox():
|
||||
task = p.parse(flask.request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
iri = task.payload
|
||||
try:
|
||||
activity = ap.fetch_remote_activity(iri)
|
||||
app.logger.info(f"activity={activity!r}")
|
||||
|
||||
recipients = activity.recipients()
|
||||
|
||||
if activity.has_type(ap.ActivityType.DELETE):
|
||||
back.outbox_delete(MY_PERSON, activity)
|
||||
elif activity.has_type(ap.ActivityType.UPDATE):
|
||||
back.outbox_update(MY_PERSON, activity)
|
||||
elif activity.has_type(ap.ActivityType.CREATE):
|
||||
back.outbox_create(MY_PERSON, activity)
|
||||
elif activity.has_type(ap.ActivityType.ANNOUNCE):
|
||||
back.outbox_announce(MY_PERSON, activity)
|
||||
elif activity.has_type(ap.ActivityType.LIKE):
|
||||
back.outbox_like(MY_PERSON, activity)
|
||||
elif activity.has_type(ap.ActivityType.UNDO):
|
||||
obj = activity.get_object()
|
||||
if obj.has_type(ap.ActivityType.LIKE):
|
||||
back.outbox_undo_like(MY_PERSON, obj)
|
||||
elif obj.has_type(ap.ActivityType.ANNOUNCE):
|
||||
back.outbox_undo_announce(MY_PERSON, obj)
|
||||
elif obj.has_type(ap.ActivityType.FOLLOW):
|
||||
back.undo_new_following(MY_PERSON, obj)
|
||||
|
||||
app.logger.info(f"recipients={recipients}")
|
||||
activity = ap.clean_activity(activity.to_dict())
|
||||
|
||||
payload = json.dumps(activity)
|
||||
for recp in recipients:
|
||||
app.logger.debug(f"posting to {recp}")
|
||||
Tasks.post_to_remote_inbox(payload, recp)
|
||||
except (ActivityGoneError, ActivityNotFoundError):
|
||||
app.logger.exception(f"no retry")
|
||||
except Exception as err:
|
||||
app.logger.exception(f"failed to post to remote inbox for {iri}")
|
||||
raise TaskError() from err
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
@blueprint.route("/task/finish_post_to_inbox", methods=["POST"]) # noqa: C901
|
||||
def task_finish_post_to_inbox():
|
||||
task = p.parse(flask.request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
iri = task.payload
|
||||
try:
|
||||
activity = ap.fetch_remote_activity(iri)
|
||||
app.logger.info(f"activity={activity!r}")
|
||||
|
||||
if activity.has_type(ap.ActivityType.DELETE):
|
||||
back.inbox_delete(MY_PERSON, activity)
|
||||
elif activity.has_type(ap.ActivityType.UPDATE):
|
||||
back.inbox_update(MY_PERSON, activity)
|
||||
elif activity.has_type(ap.ActivityType.CREATE):
|
||||
back.inbox_create(MY_PERSON, activity)
|
||||
elif activity.has_type(ap.ActivityType.ANNOUNCE):
|
||||
back.inbox_announce(MY_PERSON, activity)
|
||||
elif activity.has_type(ap.ActivityType.LIKE):
|
||||
back.inbox_like(MY_PERSON, activity)
|
||||
elif activity.has_type(ap.ActivityType.FOLLOW):
|
||||
# Reply to a Follow with an Accept
|
||||
actor_id = activity.get_actor().id
|
||||
accept = ap.Accept(
|
||||
actor=config.ID,
|
||||
object={
|
||||
"type": "Follow",
|
||||
"id": activity.id,
|
||||
"object": activity.get_object_id(),
|
||||
"actor": actor_id,
|
||||
},
|
||||
to=[actor_id],
|
||||
published=now(),
|
||||
)
|
||||
post_to_outbox(accept)
|
||||
elif activity.has_type(ap.ActivityType.UNDO):
|
||||
obj = activity.get_object()
|
||||
if obj.has_type(ap.ActivityType.LIKE):
|
||||
back.inbox_undo_like(MY_PERSON, obj)
|
||||
elif obj.has_type(ap.ActivityType.ANNOUNCE):
|
||||
back.inbox_undo_announce(MY_PERSON, obj)
|
||||
elif obj.has_type(ap.ActivityType.FOLLOW):
|
||||
back.undo_new_follower(MY_PERSON, obj)
|
||||
except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
|
||||
app.logger.exception(f"no retry")
|
||||
except Exception as err:
|
||||
app.logger.exception(f"failed to cache attachments for {iri}")
|
||||
raise TaskError() from err
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
@blueprint.route("/task/cache_attachments", methods=["POST"])
|
||||
def task_cache_attachments():
|
||||
task = p.parse(flask.request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
iri = task.payload
|
||||
try:
|
||||
activity = ap.fetch_remote_activity(iri)
|
||||
app.logger.info(f"activity={activity!r}")
|
||||
# Generates thumbnails for the actor's icon and the attachments if any
|
||||
|
||||
obj = activity.get_object()
|
||||
|
||||
# Iter the attachments
|
||||
for attachment in obj._data.get("attachment", []):
|
||||
try:
|
||||
config.MEDIA_CACHE.cache_attachment(attachment, iri)
|
||||
except ValueError:
|
||||
app.logger.exception(f"failed to cache {attachment}")
|
||||
|
||||
app.logger.info(f"attachments cached for {iri}")
|
||||
|
||||
except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
|
||||
app.logger.exception(f"dropping activity {iri}, no attachment caching")
|
||||
except Exception as err:
|
||||
app.logger.exception(f"failed to cache attachments for {iri}")
|
||||
raise TaskError() from err
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
@blueprint.route("/task/cache_actor", methods=["POST"])
|
||||
def task_cache_actor() -> str:
|
||||
task = p.parse(flask.request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
iri = task.payload["iri"]
|
||||
try:
|
||||
activity = ap.fetch_remote_activity(iri)
|
||||
app.logger.info(f"activity={activity!r}")
|
||||
|
||||
# Fetch the Open Grah metadata if it's a `Create`
|
||||
if activity.has_type(ap.ActivityType.CREATE):
|
||||
Tasks.fetch_og_meta(iri)
|
||||
|
||||
actor = activity.get_actor()
|
||||
if actor.icon:
|
||||
if isinstance(actor.icon, dict) and "url" in actor.icon:
|
||||
config.MEDIA_CACHE.cache_actor_icon(actor.icon["url"])
|
||||
else:
|
||||
app.logger.warning(f"failed to parse icon {actor.icon} for {iri}")
|
||||
|
||||
if activity.has_type(ap.ActivityType.FOLLOW):
|
||||
if actor.id == config.ID:
|
||||
# It's a new following, cache the "object" (which is the actor we follow)
|
||||
DB.activities.update_one(
|
||||
{"remote_id": iri},
|
||||
{
|
||||
"$set": {
|
||||
"meta.object": activity.get_object().to_dict(embed=True)
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
# Cache the actor info
|
||||
DB.activities.update_one(
|
||||
{"remote_id": iri}, {"$set": {"meta.actor": actor.to_dict(embed=True)}}
|
||||
)
|
||||
|
||||
app.logger.info(f"actor cached for {iri}")
|
||||
if activity.has_type([ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE]):
|
||||
Tasks.cache_attachments(iri)
|
||||
|
||||
except (ActivityGoneError, ActivityNotFoundError):
|
||||
DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}})
|
||||
app.logger.exception(f"flagging activity {iri} as deleted, no actor caching")
|
||||
except Exception as err:
|
||||
app.logger.exception(f"failed to cache actor for {iri}")
|
||||
raise TaskError() from err
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
@blueprint.route("/task/forward_activity", methods=["POST"])
|
||||
def task_forward_activity():
|
||||
task = p.parse(flask.request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
iri = task.payload
|
||||
try:
|
||||
activity = ap.fetch_remote_activity(iri)
|
||||
recipients = back.followers_as_recipients()
|
||||
app.logger.debug(f"Forwarding {activity!r} to {recipients}")
|
||||
activity = ap.clean_activity(activity.to_dict())
|
||||
payload = json.dumps(activity)
|
||||
for recp in recipients:
|
||||
app.logger.debug(f"forwarding {activity!r} to {recp}")
|
||||
Tasks.post_to_remote_inbox(payload, recp)
|
||||
except Exception as err:
|
||||
app.logger.exception("task failed")
|
||||
raise TaskError() from err
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
@blueprint.route("/task/post_to_remote_inbox", methods=["POST"])
|
||||
def task_post_to_remote_inbox():
|
||||
"""Post an activity to a remote inbox."""
|
||||
task = p.parse(flask.request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
payload, to = task.payload["payload"], task.payload["to"]
|
||||
try:
|
||||
app.logger.info("payload=%s", payload)
|
||||
app.logger.info("generating sig")
|
||||
signed_payload = json.loads(payload)
|
||||
|
||||
# XXX Disable JSON-LD signature crap for now (as HTTP signatures are enough for most implementations)
|
||||
# Don't overwrite the signature if we're forwarding an activity
|
||||
# if "signature" not in signed_payload:
|
||||
# generate_signature(signed_payload, KEY)
|
||||
|
||||
app.logger.info("to=%s", to)
|
||||
resp = requests.post(
|
||||
to,
|
||||
data=json.dumps(signed_payload),
|
||||
auth=SIG_AUTH,
|
||||
headers={
|
||||
"Content-Type": config.HEADERS[1],
|
||||
"Accept": config.HEADERS[1],
|
||||
"User-Agent": config.USER_AGENT,
|
||||
},
|
||||
)
|
||||
app.logger.info("resp=%s", resp)
|
||||
app.logger.info("resp_body=%s", resp.text)
|
||||
resp.raise_for_status()
|
||||
except HTTPError as err:
|
||||
app.logger.exception("request failed")
|
||||
if 400 >= err.response.status_code >= 499:
|
||||
app.logger.info("client error, no retry")
|
||||
return ""
|
||||
|
||||
raise TaskError() from err
|
||||
except Exception as err:
|
||||
app.logger.exception("task failed")
|
||||
raise TaskError() from err
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
@blueprint.route("/task/fetch_remote_question", methods=["POST"])
|
||||
def task_fetch_remote_question():
|
||||
"""Fetch a remote question for implementation that does not send Update."""
|
||||
task = p.parse(flask.request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
iri = task.payload
|
||||
try:
|
||||
app.logger.info(f"Fetching remote question {iri}")
|
||||
local_question = DB.activities.find_one(
|
||||
{
|
||||
"box": Box.INBOX.value,
|
||||
"type": ap.ActivityType.CREATE.value,
|
||||
"activity.object.id": iri,
|
||||
}
|
||||
)
|
||||
remote_question = ap.get_backend().fetch_iri(iri, no_cache=True)
|
||||
# FIXME(tsileo): compute and set `meta.object_visiblity` (also update utils.py to do it)
|
||||
if (
|
||||
local_question
|
||||
and (
|
||||
local_question["meta"].get("voted_for")
|
||||
or local_question["meta"].get("subscribed")
|
||||
)
|
||||
and not DB.notifications.find_one({"activity.id": remote_question["id"]})
|
||||
):
|
||||
DB.notifications.insert_one(
|
||||
{
|
||||
"type": "question_ended",
|
||||
"datetime": datetime.now(timezone.utc).isoformat(),
|
||||
"activity": remote_question,
|
||||
}
|
||||
)
|
||||
|
||||
# Update the Create if we received it in the inbox
|
||||
if local_question:
|
||||
DB.activities.update_one(
|
||||
{"remote_id": local_question["remote_id"], "box": Box.INBOX.value},
|
||||
{"$set": {"activity.object": remote_question}},
|
||||
)
|
||||
|
||||
# Also update all the cached copies (Like, Announce...)
|
||||
DB.activities.update_many(
|
||||
{"meta.object.id": remote_question["id"]},
|
||||
{"$set": {"meta.object": remote_question}},
|
||||
)
|
||||
|
||||
except HTTPError as err:
|
||||
app.logger.exception("request failed")
|
||||
if 400 >= err.response.status_code >= 499:
|
||||
app.logger.info("client error, no retry")
|
||||
return ""
|
||||
|
||||
raise TaskError() from err
|
||||
except Exception as err:
|
||||
app.logger.exception("task failed")
|
||||
raise TaskError() from err
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
@blueprint.route("/task/cleanup", methods=["POST"])
|
||||
def task_cleanup():
|
||||
task = p.parse(flask.request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
activity_gc.perform()
|
||||
return ""
|
||||
|
||||
|
||||
@blueprint.route("/task/process_new_activity", methods=["POST"]) # noqa:c901
|
||||
def task_process_new_activity():
|
||||
"""Process an activity received in the inbox"""
|
||||
task = p.parse(flask.request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
iri = task.payload
|
||||
try:
|
||||
activity = ap.fetch_remote_activity(iri)
|
||||
app.logger.info(f"activity={activity!r}")
|
||||
|
||||
flags = {}
|
||||
|
||||
if not activity.published:
|
||||
flags[_meta(MetaKey.PUBLISHED)] = now()
|
||||
else:
|
||||
flags[_meta(MetaKey.PUBLISHED)] = activity.published
|
||||
|
||||
set_inbox_flags(activity, flags)
|
||||
app.logger.info(f"a={activity}, flags={flags!r}")
|
||||
if flags:
|
||||
DB.activities.update_one({"remote_id": activity.id}, {"$set": flags})
|
||||
|
||||
app.logger.info(f"new activity {iri} processed")
|
||||
if not activity.has_type(ap.ActivityType.DELETE):
|
||||
Tasks.cache_actor(iri)
|
||||
except (ActivityGoneError, ActivityNotFoundError):
|
||||
app.logger.exception(f"dropping activity {iri}, skip processing")
|
||||
return ""
|
||||
except Exception as err:
|
||||
app.logger.exception(f"failed to process new activity {iri}")
|
||||
raise TaskError() from err
|
||||
|
||||
return ""
|
Reference in New Issue
Block a user