Big cleanup part 2 (#58)

* Cleanup little-boxes stuff

* Force html5lib for parsing OG data

* Bugfixes
This commit is contained in:
Thomas Sileo
2019-08-04 16:34:30 +02:00
committed by GitHub
parent d38c43ebe8
commit a21121308f
11 changed files with 402 additions and 125 deletions

View File

@@ -1,5 +1,4 @@
import hashlib
import json
import logging
import os
from datetime import datetime
@@ -33,6 +32,8 @@ from core.tasks import Tasks
logger = logging.getLogger(__name__)
_NewMeta = Dict[str, Any]
ACTORS_CACHE = LRUCache(maxsize=256)
MY_PERSON = ap.Person(**ME)
@@ -98,6 +99,9 @@ def _is_local_reply(create: ap.Create) -> bool:
class MicroblogPubBackend(Backend):
"""Implements a Little Boxes backend, backed by MongoDB."""
def base_url(self) -> str:
return BASE_URL
def debug_mode(self) -> bool:
return strtobool(os.getenv("MICROBLOGPUB_DEBUG", "false"))
@@ -108,29 +112,19 @@ class MicroblogPubBackend(Backend):
def extra_inboxes(self) -> List[str]:
return EXTRA_INBOXES
def base_url(self) -> str:
"""Base URL config."""
return BASE_URL
def activity_url(self, obj_id):
"""URL for activity link."""
return f"{BASE_URL}/outbox/{obj_id}"
def note_url(self, obj_id):
"""URL for activity link."""
return f"{BASE_URL}/note/{obj_id}"
def save(self, box: Box, activity: ap.BaseActivity) -> None:
"""Custom helper for saving an activity to the DB."""
visibility = ap.get_visibility(activity)
is_public = False
if visibility in [ap.Visibility.PUBLIC, ap.Visibility.UNLISTED]:
is_public = True
object_id = None
try:
object_id = activity.get_object_id()
except Exception: # TODO(tsileo): should be ValueError, but replies trigger a KeyError on object
pass
object_visibility = None
if activity.has_type(
[ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE, ap.ActivityType.LIKE]
@@ -695,25 +689,6 @@ class MicroblogPubBackend(Backend):
{"$set": {"meta.thread_root_parent": root_reply}},
)
def post_to_outbox(self, activity: ap.BaseActivity) -> None:
if activity.has_type(ap.CREATE_TYPES):
activity = activity.build_create()
self.save(Box.OUTBOX, activity)
# Assign create a random ID
obj_id = self.random_object_id()
activity.set_id(self.activity_url(obj_id), obj_id)
recipients = activity.recipients()
logger.info(f"recipients={recipients}")
activity = ap.clean_activity(activity.to_dict())
payload = json.dumps(activity)
for recp in recipients:
logger.debug(f"posting to {recp}")
self.post_to_remote_inbox(self.get_actor(), payload, recp)
def gen_feed():
fg = FeedGenerator()

View File

@@ -17,3 +17,11 @@ class CollectionName(Enum):
def find_one_activity(q: _Q) -> _Doc:
return DB[CollectionName.ACTIVITIES.value].find_one(q)
def update_one_activity(q: _Q, update: _Q) -> None:
DB[CollectionName.ACTIVITIES.value].update_one(q, update)
def update_many_activities(q: _Q, update: _Q) -> None:
DB[CollectionName.ACTIVITIES.value].update_many(q, update)

181
core/inbox.py Normal file
View File

@@ -0,0 +1,181 @@
import logging
from functools import singledispatch
from typing import Any
from typing import Dict
from little_boxes import activitypub as ap
from little_boxes.errors import NotAnActivityError
import config
from core.activitypub import _answer_key
from core.db import DB
from core.meta import Box
from core.shared import MY_PERSON
from core.shared import back
from core.shared import post_to_outbox
from core.tasks import Tasks
from utils import now
_logger = logging.getLogger(__name__)
_NewMeta = Dict[str, Any]
@singledispatch
def process_inbox(activity: ap.BaseActivity, new_meta: _NewMeta) -> None:
_logger.warning(f"skipping {activity!r}")
return None
@process_inbox.register
def _delete_process_inbox(delete: ap.Delete, new_meta: _NewMeta) -> None:
_logger.info(f"process_inbox activity={delete!r}")
obj_id = delete.get_object_id()
_logger.debug("delete object={obj_id}")
try:
obj = ap.fetch_remote_activity(obj_id)
_logger.info(f"inbox_delete handle_replies obj={obj!r}")
in_reply_to = obj.get_in_reply_to() if obj.inReplyTo else None
if obj.has_type(ap.CREATE_TYPES):
in_reply_to = ap._get_id(
DB.activities.find_one(
{"meta.object_id": obj_id, "type": ap.ActivityType.CREATE.value}
)["activity"]["object"].get("inReplyTo")
)
if in_reply_to:
back._handle_replies_delete(MY_PERSON, in_reply_to)
except Exception:
_logger.exception(f"failed to handle delete replies for {obj_id}")
DB.activities.update_one(
{"meta.object_id": obj_id, "type": "Create"}, {"$set": {"meta.deleted": True}}
)
# Foce undo other related activities
DB.activities.update({"meta.object_id": obj_id}, {"$set": {"meta.undo": True}})
@process_inbox.register
def _update_process_inbox(update: ap.Update, new_meta: _NewMeta) -> None:
_logger.info(f"process_inbox activity={update!r}")
obj = update.get_object()
if obj.ACTIVITY_TYPE == ap.ActivityType.NOTE:
DB.activities.update_one(
{"activity.object.id": obj.id}, {"$set": {"activity.object": obj.to_dict()}}
)
elif obj.has_type(ap.ActivityType.QUESTION):
choices = obj._data.get("oneOf", obj.anyOf)
total_replies = 0
_set = {}
for choice in choices:
answer_key = _answer_key(choice["name"])
cnt = choice["replies"]["totalItems"]
total_replies += cnt
_set[f"meta.question_answers.{answer_key}"] = cnt
_set["meta.question_replies"] = total_replies
DB.activities.update_one(
{"box": Box.INBOX.value, "activity.object.id": obj.id}, {"$set": _set}
)
# Also update the cached copies of the question (like Announce and Like)
DB.activities.update_many(
{"meta.object.id": obj.id}, {"$set": {"meta.object": obj.to_dict()}}
)
# FIXME(tsileo): handle update actor amd inbox_update_note/inbox_update_actor
@process_inbox.register
def _create_process_inbox(create: ap.Create, new_meta: _NewMeta) -> None:
_logger.info(f"process_inbox activity={create!r}")
# If it's a `Quesiion`, trigger an async task for updating it later (by fetching the remote and updating the
# local copy)
question = create.get_object()
if question.has_type(ap.ActivityType.QUESTION):
Tasks.fetch_remote_question(question)
back._handle_replies(MY_PERSON, create)
@process_inbox.register
def _announce_process_inbox(announce: ap.Announce, new_meta: _NewMeta) -> None:
_logger.info(f"process_inbox activity={announce!r}")
# TODO(tsileo): actually drop it without storing it and better logging, also move the check somewhere else
# or remove it?
try:
obj = announce.get_object()
except NotAnActivityError:
_logger.exception(
f'received an Annouce referencing an OStatus notice ({announce._data["object"]}), dropping the message'
)
return
if obj.has_type(ap.ActivityType.QUESTION):
Tasks.fetch_remote_question(obj)
DB.activities.update_one(
{"remote_id": announce.id},
{
"$set": {
"meta.object": obj.to_dict(embed=True),
"meta.object_actor": obj.get_actor().to_dict(embed=True),
}
},
)
DB.activities.update_one(
{"activity.object.id": obj.id}, {"$inc": {"meta.count_boost": 1}}
)
@process_inbox.register
def _like_process_inbox(like: ap.Like, new_meta: _NewMeta) -> None:
_logger.info(f"process_inbox activity={like!r}")
obj = like.get_object()
# Update the meta counter if the object is published by the server
DB.activities.update_one(
{"box": Box.OUTBOX.value, "activity.object.id": obj.id},
{"$inc": {"meta.count_like": 1}},
)
@process_inbox.register
def _follow_process_inbox(activity: ap.Follow, new_meta: _NewMeta) -> None:
_logger.info(f"process_inbox activity={activity!r}")
# Reply to a Follow with an Accept
actor_id = activity.get_actor().id
accept = ap.Accept(
actor=config.ID,
object={
"type": "Follow",
"id": activity.id,
"object": activity.get_object_id(),
"actor": actor_id,
},
to=[actor_id],
published=now(),
)
post_to_outbox(accept)
@process_inbox.register
def _undo_process_inbox(activity: ap.Undo, new_meta: _NewMeta) -> None:
_logger.info(f"process_inbox activity={activity!r}")
obj = activity.get_object()
DB.activities.update_one({"remote_id": obj.id}, {"$set": {"meta.undo": True}})
if obj.has_type(ap.ActivityType.LIKE):
# Update the meta counter if the object is published by the server
DB.activities.update_one(
{
"box": Box.OUTBOX.value,
"meta.object_id": obj.get_object_id(),
"type": ap.ActivityType.CREATE.value,
},
{"$inc": {"meta.count_like": -1}},
)
elif obj.has_type(ap.ActivityType.ANNOUNCE):
announced = obj.get_object()
# Update the meta counter if the object is published by the server
DB.activities.update_one(
{"activity.object.id": announced.id}, {"$inc": {"meta.count_boost": -1}}
)

148
core/outbox.py Normal file
View File

@@ -0,0 +1,148 @@
import logging
from datetime import datetime
from functools import singledispatch
from typing import Any
from typing import Dict
from little_boxes import activitypub as ap
from core.db import DB
from core.db import find_one_activity
from core.db import update_many_activities
from core.shared import MY_PERSON
from core.shared import back
from core.tasks import Tasks
_logger = logging.getLogger(__name__)
_NewMeta = Dict[str, Any]
@singledispatch
def process_outbox(activity: ap.BaseActivity, new_meta: _NewMeta) -> None:
_logger.warning(f"skipping {activity!r}")
return None
@process_outbox.register
def _delete_process_outbox(delete: ap.Delete, new_meta: _NewMeta) -> None:
_logger.info(f"process_outbox activity={delete!r}")
obj_id = delete.get_object_id()
# Flag everything referencing the deleted object as deleted (except the Delete activity itself)
update_many_activities(
{"meta.object_id": obj_id, "remote_id": {"$ne": delete.id}},
{"$set": {"meta.deleted": True, "meta.undo": True}},
)
# If the deleted activity was in DB, decrease some threads-related counter
data = find_one_activity(
{"meta.object_id": obj_id, "type": ap.ActivityType.CREATE.value}
)
_logger.info(f"found local copy of deleted activity: {data}")
if data:
obj = ap.parse_activity(data["activity"]).get_object()
_logger.info(f"obj={obj!r}")
in_reply_to = obj.get_in_reply_to()
if in_reply_to:
DB.activities.update_one(
{"activity.object.id": in_reply_to},
{"$inc": {"meta.count_reply": -1, "meta.count_direct_reply": -1}},
)
@process_outbox.register
def _update_process_outbox(update: ap.Update, new_meta: _NewMeta) -> None:
_logger.info(f"process_outbox activity={update!r}")
obj = update._data["object"]
update_prefix = "activity.object."
to_update: Dict[str, Any] = {"$set": dict(), "$unset": dict()}
to_update["$set"][f"{update_prefix}updated"] = (
datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
)
for k, v in obj.items():
if k in ["id", "type"]:
continue
if v is None:
to_update["$unset"][f"{update_prefix}{k}"] = ""
else:
to_update["$set"][f"{update_prefix}{k}"] = v
if len(to_update["$unset"]) == 0:
del to_update["$unset"]
_logger.info(f"updating note from outbox {obj!r} {to_update}")
DB.activities.update_one({"activity.object.id": obj["id"]}, to_update)
# FIXME(tsileo): should send an Update (but not a partial one, to all the note's recipients
# (create a new Update with the result of the update, and send it without saving it?)
@process_outbox.register
def _create_process_outbox(create: ap.Create, new_meta: _NewMeta) -> None:
_logger.info(f"process_outbox activity={create!r}")
back._handle_replies(MY_PERSON, create)
@process_outbox.register
def _announce_process_outbox(announce: ap.Announce, new_meta: _NewMeta) -> None:
_logger.info(f"process_outbox activity={announce!r}")
obj = announce.get_object()
if obj.has_type(ap.ActivityType.QUESTION):
Tasks.fetch_remote_question(obj)
DB.activities.update_one(
{"remote_id": announce.id},
{
"$set": {
"meta.object": obj.to_dict(embed=True),
"meta.object_actor": obj.get_actor().to_dict(embed=True),
}
},
)
DB.activities.update_one(
{"activity.object.id": obj.id}, {"$set": {"meta.boosted": announce.id}}
)
@process_outbox.register
def _like_process_outbox(like: ap.Like, new_meta: _NewMeta) -> None:
_logger.info(f"process_outbox activity={like!r}")
obj = like.get_object()
if obj.has_type(ap.ActivityType.QUESTION):
Tasks.fetch_remote_question(obj)
DB.activities.update_one(
{"activity.object.id": obj.id},
{"$inc": {"meta.count_like": 1}, "$set": {"meta.liked": like.id}},
)
@process_outbox.register
def _undo_process_outbox(undo: ap.Undo, new_meta: _NewMeta) -> None:
_logger.info(f"process_outbox activity={undo!r}")
obj = undo.get_object()
DB.activities.update_one({"remote_id": obj.id}, {"$set": {"meta.undo": True}})
# Undo Like
if obj.has_type(ap.ActivityType.LIKE):
liked = obj.get_object_id()
DB.activities.update_one(
{"activity.object.id": liked},
{"$inc": {"meta.count_like": -1}, "$set": {"meta.liked": False}},
)
elif obj.has_type(ap.ActivityType.ANNOUNCE):
announced = obj.get_object_id()
DB.activities.update_one(
{"activity.object.id": announced}, {"$set": {"meta.boosted": False}}
)
# Undo Follow (undo new following)
elif obj.has_type(ap.ActivityType.FOLLOW):
pass
# do nothing

View File

@@ -1,13 +1,13 @@
import binascii
import os
from datetime import datetime
from datetime import timezone
from functools import wraps
from typing import Any
from typing import Dict
from typing import Union
from urllib.parse import urljoin
import flask
import werkzeug
from bson.objectid import ObjectId
from flask import current_app as app
from flask import redirect
@@ -19,6 +19,7 @@ from little_boxes import activitypub as ap
from little_boxes.activitypub import format_datetime
from poussetaches import PousseTaches
from config import BASE_URL
from config import DB
from config import ME
from core import activitypub
@@ -26,7 +27,8 @@ from core.activitypub import _answer_key
from core.meta import Box
from core.tasks import Tasks
_Response = Union[flask.Response, werkzeug.wrappers.Response, str]
# _Response = Union[flask.Response, werkzeug.wrappers.Response, str, Any]
_Response = Any
p = PousseTaches(
os.getenv("MICROBLOGPUB_POUSSETACHES_HOST", "http://localhost:7991"),
@@ -69,7 +71,7 @@ def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not session.get("logged_in"):
return redirect(url_for("admin_login", next=request.url))
return redirect(url_for("admin.admin_login", next=request.url))
return f(*args, **kwargs)
return decorated_function
@@ -92,14 +94,26 @@ def _get_ip():
return ip, geoip
def activity_url(item_id: str) -> str:
return urljoin(BASE_URL, url_for("outbox_detail", item_id=item_id))
def post_to_outbox(activity: ap.BaseActivity) -> str:
if activity.has_type(ap.CREATE_TYPES):
activity = activity.build_create()
# Assign create a random ID
obj_id = back.random_object_id()
activity.set_id(back.activity_url(obj_id), obj_id)
obj_id = binascii.hexlify(os.urandom(8)).decode("utf-8")
uri = activity_url(obj_id)
activity._data["id"] = uri
if activity.has_type(ap.ActivityType.CREATE):
activity._data["object"]["id"] = urljoin(
BASE_URL, url_for("outbox_activity", item_id=obj_id)
)
activity._data["object"]["url"] = urljoin(
BASE_URL, url_for("note_by_id", note_id=obj_id)
)
activity.reset_object_cache()
back.save(Box.OUTBOX, activity)
Tasks.cache_actor(activity.id)