HUGE cleanup
This commit is contained in:
207
activitypub.py
207
activitypub.py
@@ -1,6 +1,7 @@
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
@@ -17,12 +18,12 @@ from config import ID
|
||||
from config import ME
|
||||
from config import USER_AGENT
|
||||
from config import USERNAME
|
||||
from little_boxes import strtobool
|
||||
from little_boxes import activitypub as ap
|
||||
from little_boxes import strtobool
|
||||
from little_boxes.activitypub import _to_list
|
||||
from little_boxes.backend import Backend
|
||||
from little_boxes.collection import parse_collection as ap_parse_collection
|
||||
from little_boxes.errors import Error
|
||||
from little_boxes.activitypub import _to_list
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -46,6 +47,12 @@ def ensure_it_is_me(f):
|
||||
return wrapper
|
||||
|
||||
|
||||
class Box(Enum):
|
||||
INBOX = "inbox"
|
||||
OUTBOX = "outbox"
|
||||
REPLIES = "replies"
|
||||
|
||||
|
||||
class MicroblogPubBackend(Backend):
|
||||
"""Implements a Little Boxes backend, backed by MongoDB."""
|
||||
|
||||
@@ -68,10 +75,11 @@ class MicroblogPubBackend(Backend):
|
||||
"""URL for activity link."""
|
||||
return f"{BASE_URL}/note/{obj_id}"
|
||||
|
||||
@ensure_it_is_me
|
||||
def outbox_new(self, as_actor: ap.Person, activity: ap.BaseActivity) -> None:
|
||||
DB.outbox.insert_one(
|
||||
def save(self, box: Box, activity: ap.BaseActivity) -> None:
|
||||
"""Custom helper for saving an activity to the DB."""
|
||||
DB.activities.insert_one(
|
||||
{
|
||||
"box": box.value,
|
||||
"activity": activity.to_dict(),
|
||||
"type": _to_list(activity.type),
|
||||
"remote_id": activity.id,
|
||||
@@ -79,11 +87,16 @@ class MicroblogPubBackend(Backend):
|
||||
}
|
||||
)
|
||||
|
||||
@ensure_it_is_me
|
||||
def outbox_new(self, as_actor: ap.Person, activity: ap.BaseActivity) -> None:
|
||||
self.save(Box.OUTBOX, activity)
|
||||
|
||||
@ensure_it_is_me
|
||||
def outbox_is_blocked(self, as_actor: ap.Person, actor_id: str) -> bool:
|
||||
return bool(
|
||||
DB.outbox.find_one(
|
||||
DB.activities.find_one(
|
||||
{
|
||||
"box": Box.OUTBOX.value,
|
||||
"type": ap.ActivityType.BLOCK.value,
|
||||
"activity.object": actor_id,
|
||||
"meta.undo": False,
|
||||
@@ -101,14 +114,14 @@ class MicroblogPubBackend(Backend):
|
||||
if iri.endswith("/activity"):
|
||||
iri = iri.replace("/activity", "")
|
||||
is_a_note = True
|
||||
data = DB.outbox.find_one({"remote_id": iri})
|
||||
if data:
|
||||
if is_a_note:
|
||||
return data["activity"]["object"]
|
||||
data = DB.activities.find_one({"box": Box.OUTBOX.value, "remote_id": iri})
|
||||
if data and is_a_note:
|
||||
return data["activity"]["object"]
|
||||
elif data:
|
||||
return data["activity"]
|
||||
else:
|
||||
# Check if the activity is stored in the inbox
|
||||
data = DB.inbox.find_one({"remote_id": iri})
|
||||
data = DB.activities.find_one({"remote_id": iri})
|
||||
if data:
|
||||
return data["activity"]
|
||||
|
||||
@@ -117,18 +130,11 @@ class MicroblogPubBackend(Backend):
|
||||
|
||||
@ensure_it_is_me
|
||||
def inbox_check_duplicate(self, as_actor: ap.Person, iri: str) -> bool:
|
||||
return bool(DB.inbox.find_one({"remote_id": iri}))
|
||||
return bool(DB.activities.find_one({"box": Box.INBOX.value, "remote_id": iri}))
|
||||
|
||||
@ensure_it_is_me
|
||||
def inbox_new(self, as_actor: ap.Person, activity: ap.BaseActivity) -> None:
|
||||
DB.inbox.insert_one(
|
||||
{
|
||||
"activity": activity.to_dict(),
|
||||
"type": _to_list(activity.type),
|
||||
"remote_id": activity.id,
|
||||
"meta": {"undo": False, "deleted": False},
|
||||
}
|
||||
)
|
||||
self.save(Box.INBOX, activity)
|
||||
|
||||
@ensure_it_is_me
|
||||
def post_to_remote_inbox(self, as_actor: ap.Person, payload: str, to: str) -> None:
|
||||
@@ -161,41 +167,37 @@ class MicroblogPubBackend(Backend):
|
||||
def inbox_like(self, as_actor: ap.Person, like: ap.Like) -> None:
|
||||
obj = like.get_object()
|
||||
# Update the meta counter if the object is published by the server
|
||||
DB.outbox.update_one(
|
||||
{"activity.object.id": obj.id}, {"$inc": {"meta.count_like": 1}}
|
||||
DB.activities.update_one(
|
||||
{"box": Box.OUTBOX.value, "activity.object.id": obj.id},
|
||||
{"$inc": {"meta.count_like": 1}},
|
||||
)
|
||||
|
||||
@ensure_it_is_me
|
||||
def inbox_undo_like(self, as_actor: ap.Person, like: ap.Like) -> None:
|
||||
obj = like.get_object()
|
||||
# Update the meta counter if the object is published by the server
|
||||
DB.outbox.update_one(
|
||||
{"activity.object.id": obj.id}, {"$inc": {"meta.count_like": -1}}
|
||||
DB.activities.update_one(
|
||||
{"box": Box.OUTBOX.value, "activity.object.id": obj.id},
|
||||
{"$inc": {"meta.count_like": -1}},
|
||||
)
|
||||
|
||||
@ensure_it_is_me
|
||||
def outbox_like(self, as_actor: ap.Person, like: ap.Like) -> None:
|
||||
obj = like.get_object()
|
||||
# Unlikely, but an actor can like it's own post
|
||||
DB.outbox.update_one(
|
||||
{"activity.object.id": obj.id}, {"$inc": {"meta.count_like": 1}}
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": obj.id},
|
||||
{"$inc": {"meta.count_like": 1}, "$set": {"meta.liked": like.id}},
|
||||
)
|
||||
|
||||
# Keep track of the like we just performed
|
||||
DB.inbox.update_one(
|
||||
{"activity.object.id": obj.id}, {"$set": {"meta.liked": like.id}}
|
||||
DB.activities.update_one(
|
||||
{"remote_id": like.id}, {"$set": {"meta.object": obj.to_dict(embed=True)}}
|
||||
)
|
||||
|
||||
@ensure_it_is_me
|
||||
def outbox_undo_like(self, as_actor: ap.Person, like: ap.Like) -> None:
|
||||
obj = like.get_object()
|
||||
# Unlikely, but an actor can like it's own post
|
||||
DB.outbox.update_one(
|
||||
{"activity.object.id": obj.id}, {"$inc": {"meta.count_like": -1}}
|
||||
)
|
||||
|
||||
DB.inbox.update_one(
|
||||
{"activity.object.id": obj.id}, {"$set": {"meta.liked": False}}
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": obj.id},
|
||||
{"$inc": {"meta.count_like": -1}, "$set": {"meta.liked": False}},
|
||||
)
|
||||
|
||||
@ensure_it_is_me
|
||||
@@ -204,57 +206,57 @@ class MicroblogPubBackend(Backend):
|
||||
"object"
|
||||
].startswith("http"):
|
||||
# TODO(tsileo): actually drop it without storing it and better logging, also move the check somewhere else
|
||||
# or remote it?
|
||||
logger.warn(
|
||||
f'received an Annouce referencing an OStatus notice ({announce._data["object"]}), dropping the message'
|
||||
)
|
||||
return
|
||||
# FIXME(tsileo): Save/cache the object, and make it part of the stream so we can fetch it
|
||||
if isinstance(announce._data["object"], str):
|
||||
obj_iri = announce._data["object"]
|
||||
else:
|
||||
obj_iri = self.get_object().id
|
||||
|
||||
DB.outbox.update_one(
|
||||
{"activity.object.id": obj_iri}, {"$inc": {"meta.count_boost": 1}}
|
||||
obj = announce.get_object()
|
||||
DB.activities.update_one(
|
||||
{"remote_id": announce.id},
|
||||
{"$set": {"meta.object": obj.to_dict(embed=True)}},
|
||||
)
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": obj.id}, {"$inc": {"meta.count_boost": 1}}
|
||||
)
|
||||
|
||||
@ensure_it_is_me
|
||||
def inbox_undo_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None:
|
||||
obj = announce.get_object()
|
||||
# Update the meta counter if the object is published by the server
|
||||
DB.outbox.update_one(
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": obj.id}, {"$inc": {"meta.count_boost": -1}}
|
||||
)
|
||||
|
||||
@ensure_it_is_me
|
||||
def outbox_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None:
|
||||
obj = announce.get_object()
|
||||
DB.inbox.update_one(
|
||||
DB.activities.update_one(
|
||||
{"remote_id": announce.id},
|
||||
{"$set": {"meta.object": obj.to_dict(embed=True)}},
|
||||
)
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": obj.id}, {"$set": {"meta.boosted": announce.id}}
|
||||
)
|
||||
|
||||
@ensure_it_is_me
|
||||
def outbox_undo_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None:
|
||||
obj = announce.get_object()
|
||||
DB.inbox.update_one(
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": obj.id}, {"$set": {"meta.boosted": False}}
|
||||
)
|
||||
|
||||
@ensure_it_is_me
|
||||
def inbox_delete(self, as_actor: ap.Person, delete: ap.Delete) -> None:
|
||||
if not DB.inbox.find_one_and_update(
|
||||
{"activity.object.id": delete.get_object().id},
|
||||
{"$set": {"meta.deleted": True}},
|
||||
):
|
||||
DB.threads.update_one(
|
||||
{"activity.object.id": delete.get_object().id},
|
||||
{"$set": {"meta.deleted": True}},
|
||||
)
|
||||
|
||||
obj = delete.get_object()
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": obj.id}, {"$set": {"meta.deleted": True}}
|
||||
)
|
||||
|
||||
if obj.ACTIVITY_TYPE != ap.ActivityType.NOTE:
|
||||
obj = ap.parse_activity(
|
||||
DB.inbox.find_one(
|
||||
DB.activities.find_one(
|
||||
{
|
||||
"activity.object.id": delete.get_object().id,
|
||||
"type": ap.ActivityType.CREATE.value,
|
||||
@@ -268,14 +270,14 @@ class MicroblogPubBackend(Backend):
|
||||
|
||||
@ensure_it_is_me
|
||||
def outbox_delete(self, as_actor: ap.Person, delete: ap.Delete) -> None:
|
||||
DB.outbox.update_one(
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": delete.get_object().id},
|
||||
{"$set": {"meta.deleted": True}},
|
||||
)
|
||||
obj = delete.get_object()
|
||||
if delete.get_object().ACTIVITY_TYPE != ap.ActivityType.NOTE:
|
||||
obj = ap.parse_activity(
|
||||
DB.outbox.find_one(
|
||||
DB.activities.find_one(
|
||||
{
|
||||
"activity.object.id": delete.get_object().id,
|
||||
"type": ap.ActivityType.CREATE.value,
|
||||
@@ -289,15 +291,10 @@ class MicroblogPubBackend(Backend):
|
||||
def inbox_update(self, as_actor: ap.Person, update: ap.Update) -> None:
|
||||
obj = update.get_object()
|
||||
if obj.ACTIVITY_TYPE == ap.ActivityType.NOTE:
|
||||
if not DB.inbox.find_one_and_update(
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": obj.id},
|
||||
{"$set": {"activity.object": obj.to_dict()}},
|
||||
):
|
||||
DB.threads.update_one(
|
||||
{"activity.object.id": obj.id},
|
||||
{"$set": {"activity.object": obj.to_dict()}},
|
||||
)
|
||||
|
||||
)
|
||||
# FIXME(tsileo): handle update actor amd inbox_update_note/inbox_update_actor
|
||||
|
||||
@ensure_it_is_me
|
||||
@@ -322,7 +319,7 @@ class MicroblogPubBackend(Backend):
|
||||
|
||||
print(f"updating note from outbox {obj!r} {update}")
|
||||
logger.info(f"updating note from outbox {obj!r} {update}")
|
||||
DB.outbox.update_one({"activity.object.id": obj["id"]}, update)
|
||||
DB.activities.update_one({"activity.object.id": obj["id"]}, update)
|
||||
# FIXME(tsileo): should send an Update (but not a partial one, to all the note's recipients
|
||||
# (create a new Update with the result of the update, and send it without saving it?)
|
||||
|
||||
@@ -340,18 +337,10 @@ class MicroblogPubBackend(Backend):
|
||||
if not in_reply_to:
|
||||
pass
|
||||
|
||||
if not DB.inbox.find_one_and_update(
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": in_reply_to},
|
||||
{"$inc": {"meta.count_reply": -1, "meta.count_direct_reply": -1}},
|
||||
):
|
||||
if not DB.outbox.find_one_and_update(
|
||||
{"activity.object.id": in_reply_to},
|
||||
{"$inc": {"meta.count_reply": -1, "meta.count_direct_reply": -1}},
|
||||
):
|
||||
DB.threads.update_one(
|
||||
{"activity.object.id": in_reply_to},
|
||||
{"$inc": {"meta.count_reply": -1, "meta.count_direct_reply": -1}},
|
||||
)
|
||||
)
|
||||
|
||||
@ensure_it_is_me
|
||||
def _handle_replies(self, as_actor: ap.Person, create: ap.Create) -> None:
|
||||
@@ -365,24 +354,14 @@ class MicroblogPubBackend(Backend):
|
||||
root_reply = in_reply_to
|
||||
reply = ap.fetch_remote_activity(root_reply, expected=ap.ActivityType.NOTE)
|
||||
|
||||
if not DB.inbox.find_one_and_update(
|
||||
creply = DB.activities.find_one_and_update(
|
||||
{"activity.object.id": in_reply_to},
|
||||
{"$inc": {"meta.count_reply": 1, "meta.count_direct_reply": 1}},
|
||||
):
|
||||
if not DB.outbox.find_one_and_update(
|
||||
{"activity.object.id": in_reply_to},
|
||||
{"$inc": {"meta.count_reply": 1, "meta.count_direct_reply": 1}},
|
||||
):
|
||||
# It means the activity is not in the inbox, and not in the outbox, we want to save it
|
||||
DB.threads.insert_one(
|
||||
{
|
||||
"activity": reply.to_dict(),
|
||||
"type": _to_list(reply.type),
|
||||
"remote_id": reply.id,
|
||||
"meta": {"undo": False, "deleted": False},
|
||||
}
|
||||
)
|
||||
new_threads.append(reply.id)
|
||||
)
|
||||
if not creply:
|
||||
# It means the activity is not in the inbox, and not in the outbox, we want to save it
|
||||
self.save(Box.REPLIES, reply)
|
||||
new_threads.append(reply.id)
|
||||
|
||||
while reply is not None:
|
||||
in_reply_to = reply.inReplyTo
|
||||
@@ -391,25 +370,15 @@ class MicroblogPubBackend(Backend):
|
||||
root_reply = in_reply_to
|
||||
reply = ap.fetch_remote_activity(root_reply, expected=ap.ActivityType.NOTE)
|
||||
q = {"activity.object.id": root_reply}
|
||||
if not DB.inbox.count(q) and not DB.outbox.count(q):
|
||||
DB.threads.insert_one(
|
||||
{
|
||||
"activity": reply.to_dict(),
|
||||
"type": _to_list(reply.type),
|
||||
"remote_id": reply.id,
|
||||
"meta": {"undo": False, "deleted": False},
|
||||
}
|
||||
)
|
||||
if not DB.activities.count(q):
|
||||
self.save(Box.REPLIES, reply)
|
||||
new_threads.append(reply.id)
|
||||
|
||||
q = {"remote_id": create.id}
|
||||
if not DB.inbox.find_one_and_update(
|
||||
q, {"$set": {"meta.thread_root_parent": root_reply}}
|
||||
):
|
||||
DB.outbox.update_one(q, {"$set": {"meta.thread_root_parent": root_reply}})
|
||||
|
||||
DB.threads.update(
|
||||
{"remote_id": {"$in": new_threads}},
|
||||
DB.activities.update_one(
|
||||
{"remote_id": create.id}, {"$set": {"meta.thread_root_parent": root_reply}}
|
||||
)
|
||||
DB.activities.update(
|
||||
{"box": Box.REPLIES.value, "remote_id": {"$in": new_threads}},
|
||||
{"$set": {"meta.thread_root_parent": root_reply}},
|
||||
)
|
||||
|
||||
@@ -423,7 +392,9 @@ def gen_feed():
|
||||
fg.description(f"{USERNAME} notes")
|
||||
fg.logo(ME.get("icon", {}).get("url"))
|
||||
fg.language("en")
|
||||
for item in DB.outbox.find({"type": "Create"}, limit=50):
|
||||
for item in DB.activities.find(
|
||||
{"box": Box.OUTBOX.value, "type": "Create"}, limit=50
|
||||
):
|
||||
fe = fg.add_entry()
|
||||
fe.id(item["activity"]["object"].get("url"))
|
||||
fe.link(href=item["activity"]["object"].get("url"))
|
||||
@@ -435,7 +406,9 @@ def gen_feed():
|
||||
def json_feed(path: str) -> Dict[str, Any]:
|
||||
"""JSON Feed (https://jsonfeed.org/) document."""
|
||||
data = []
|
||||
for item in DB.outbox.find({"type": "Create"}, limit=50):
|
||||
for item in DB.activities.find(
|
||||
{"box": Box.OUTBOX.value, "type": "Create"}, limit=50
|
||||
):
|
||||
data.append(
|
||||
{
|
||||
"id": item["id"],
|
||||
@@ -471,11 +444,15 @@ def build_inbox_json_feed(
|
||||
data = []
|
||||
cursor = None
|
||||
|
||||
q: Dict[str, Any] = {"type": "Create", "meta.deleted": False}
|
||||
q: Dict[str, Any] = {
|
||||
"type": "Create",
|
||||
"meta.deleted": False,
|
||||
"box": Box.INBOX.value,
|
||||
}
|
||||
if request_cursor:
|
||||
q["_id"] = {"$lt": request_cursor}
|
||||
|
||||
for item in DB.inbox.find(q, limit=50).sort("_id", -1):
|
||||
for item in DB.activities.find(q, limit=50).sort("_id", -1):
|
||||
actor = ap.get_backend().fetch_iri(item["activity"]["actor"])
|
||||
data.append(
|
||||
{
|
||||
|
Reference in New Issue
Block a user