Move stuff around

Author: Thomas Sileo
Date: 2019-08-01 21:44:35 +02:00
Parent: 922c9bebb2
Commit: cd2ae3d7b0
12 changed files with 112 additions and 121 deletions

core/gc.py (new file, 182 lines)

@@ -0,0 +1,182 @@
import logging
from datetime import datetime
from datetime import timedelta
from time import perf_counter
from typing import Any
from typing import Dict
from typing import List
from little_boxes import activitypub as ap
import activitypub
from activitypub import Box
from config import DAYS_TO_KEEP
from config import ID
from config import ME
from config import MEDIA_CACHE
from utils.migrations import DB

back = activitypub.MicroblogPubBackend()
ap.use_backend(back)

MY_PERSON = ap.Person(**ME)

logger = logging.getLogger(__name__)


def threads_of_interest() -> List[str]:
out = set()
    # Fetch all the threads we've participated in
for data in DB.activities.find(
{
"meta.thread_root_parent": {"$exists": True},
"box": Box.OUTBOX.value,
"type": ap.ActivityType.CREATE.value,
}
):
out.add(data["meta"]["thread_root_parent"])
# Fetch all threads related to bookmarked activities
for data in DB.activities.find({"meta.bookmarked": True}):
# Keep the replies
out.add(data["meta"]["object_id"])
# And the whole thread if any
if "thread_root_parent" in data["meta"]:
out.add(data["meta"]["thread_root_parent"])
    return list(out)


def _keep(data: Dict[str, Any]) -> None:
    DB.activities.update_one({"_id": data["_id"]}, {"$set": {"meta.gc_keep": True}})


def perform() -> None:  # noqa: C901
start = perf_counter()
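    # Cutoff: anything published before this "YYYY-MM-DD" string is a GC candidate
    # (ISO 8601 timestamps compare chronologically as strings, so a plain $lt works)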
d = (datetime.utcnow() - timedelta(days=DAYS_TO_KEEP)).strftime("%Y-%m-%d")
toi = threads_of_interest()
logger.info(f"thread_of_interest={toi!r}")
create_deleted = 0
create_count = 0
# Go over the old Create activities
for data in DB.activities.find(
{
"box": Box.INBOX.value,
"type": ap.ActivityType.CREATE.value,
"activity.published": {"$lt": d},
"meta.gc_keep": {"$exists": False},
}
).limit(500):
try:
create_count += 1
remote_id = data["remote_id"]
meta = data["meta"]
activity = ap.parse_activity(data["activity"])
logger.info(f"activity={activity!r}")
# This activity has been bookmarked, keep it
if meta.get("bookmarked"):
_keep(data)
continue
# Inspect the object
obj = activity.get_object()
# This activity mentions the server actor, keep it
if obj.has_mention(ID):
_keep(data)
continue
            # This activity is a direct reply to one of the server actor's activities, keep it
in_reply_to = obj.get_in_reply_to()
if in_reply_to and in_reply_to.startswith(ID):
_keep(data)
continue
# This activity is part of a thread we want to keep, keep it
if in_reply_to and meta.get("thread_root_parent"):
thread_root_parent = meta["thread_root_parent"]
if thread_root_parent.startswith(ID) or thread_root_parent in toi:
_keep(data)
continue
# This activity was boosted or liked, keep it
if meta.get("boosted") or meta.get("liked"):
_keep(data)
continue
# TODO(tsileo): remove after tests
if meta.get("keep"):
logger.warning(
f"{activity!r} would not have been deleted, skipping for now"
)
_keep(data)
continue
# Delete the cached attachment
for grid_item in MEDIA_CACHE.fs.find({"remote_id": remote_id}):
MEDIA_CACHE.fs.delete(grid_item._id)
# Delete the activity
DB.activities.delete_one({"_id": data["_id"]})
create_deleted += 1
except Exception:
logger.exception(f"failed to process {data!r}")
after_gc_create = perf_counter()
time_to_gc_create = after_gc_create - start
logger.info(
f"{time_to_gc_create:.2f} seconds to analyze {create_count} Create, {create_deleted} deleted"
)
announce_count = 0
announce_deleted = 0
    # Go over the old Announce activities
for data in DB.activities.find(
{
"box": Box.INBOX.value,
"type": ap.ActivityType.ANNOUNCE.value,
"activity.published": {"$lt": d},
"meta.gc_keep": {"$exists": False},
}
).limit(500):
try:
announce_count += 1
remote_id = data["remote_id"]
meta = data["meta"]
activity = ap.parse_activity(data["activity"])
logger.info(f"activity={activity!r}")
# This activity has been bookmarked, keep it
if meta.get("bookmarked"):
_keep(data)
continue
object_id = activity.get_object_id()
# This announce is for a local activity (i.e. from the outbox), keep it
if object_id.startswith(ID):
_keep(data)
continue
for grid_item in MEDIA_CACHE.fs.find({"remote_id": remote_id}):
MEDIA_CACHE.fs.delete(grid_item._id)
            # TODO(tsileo): here for legacy reasons; this needs to be removed at some point
for grid_item in MEDIA_CACHE.fs.find({"remote_id": object_id}):
MEDIA_CACHE.fs.delete(grid_item._id)
# Delete the activity
DB.activities.delete_one({"_id": data["_id"]})
announce_deleted += 1
except Exception:
logger.exception(f"failed to process {data!r}")
after_gc_announce = perf_counter()
time_to_gc_announce = after_gc_announce - after_gc_create
logger.info(
f"{time_to_gc_announce:.2f} seconds to analyze {announce_count} Announce, {announce_deleted} deleted"
)

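Taken together, perform() is a batch garbage-collection pass: each run inspects at most 500 old inbox Create and 500 old Announce activities, keeps anything bookmarked, liked, boosted, mentioning the server actor, or belonging to a thread of interest, and deletes the rest along with their cached media. A minimal sketch of how it might be run on a schedule (the hourly loop is an assumption; this commit does not show how the job is triggered):

import time

from core import gc

while True:
    # Each pass handles at most 500 Create and 500 Announce activities,
    # so old backlogs are drained incrementally across runs.
    gc.perform()
    time.sleep(60 * 60)  # hypothetical hourly cadence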
core/indexes.py (new file, 76 lines)

@@ -0,0 +1,76 @@
import pymongo
from config import DB
from utils.meta import _meta
from utils.meta import MetaKey


def create_indexes():
if "trash" not in DB.collection_names():
DB.create_collection("trash", capped=True, size=50 << 20) # 50 MB
DB.command("compact", "activities")
DB.activities.create_index([(_meta(MetaKey.NOTIFICATION), pymongo.ASCENDING)])
DB.activities.create_index(
[(_meta(MetaKey.NOTIFICATION_UNREAD), pymongo.ASCENDING)]
)
DB.activities.create_index([("remote_id", pymongo.ASCENDING)])
DB.activities.create_index([("activity.object.id", pymongo.ASCENDING)])
DB.activities.create_index([("meta.thread_root_parent", pymongo.ASCENDING)])
DB.activities.create_index(
[
("meta.thread_root_parent", pymongo.ASCENDING),
("meta.deleted", pymongo.ASCENDING),
]
)
DB.activities.create_index(
[("activity.object.id", pymongo.ASCENDING), ("meta.deleted", pymongo.ASCENDING)]
)
DB.cache2.create_index(
[
("path", pymongo.ASCENDING),
("type", pymongo.ASCENDING),
("arg", pymongo.ASCENDING),
]
)
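    # TTL index: cache2 documents expire 12 hours after their "date" value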
DB.cache2.create_index("date", expireAfterSeconds=3600 * 12)
# Index for the block query
DB.activities.create_index(
[
("box", pymongo.ASCENDING),
("type", pymongo.ASCENDING),
("meta.undo", pymongo.ASCENDING),
]
)
# Index for count queries
DB.activities.create_index(
[
("box", pymongo.ASCENDING),
("type", pymongo.ASCENDING),
("meta.undo", pymongo.ASCENDING),
("meta.deleted", pymongo.ASCENDING),
]
)
DB.activities.create_index([("box", pymongo.ASCENDING)])
# Outbox query
DB.activities.create_index(
[
("box", pymongo.ASCENDING),
("type", pymongo.ASCENDING),
("meta.undo", pymongo.ASCENDING),
("meta.deleted", pymongo.ASCENDING),
("meta.public", pymongo.ASCENDING),
]
)
DB.activities.create_index(
[
("type", pymongo.ASCENDING),
("activity.object.type", pymongo.ASCENDING),
("activity.object.inReplyTo", pymongo.ASCENDING),
("meta.deleted", pymongo.ASCENDING),
]
)

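The index builders here are idempotent: pymongo's create_index() is a no-op when an identical index already exists, so create_indexes() can safely run at every startup. A hedged sketch of the call site (where the startup hook lives is an assumption, not part of this diff):

from core.indexes import create_indexes

# Safe to call unconditionally at boot; existing indexes are left untouched
# and only missing ones are built.
create_indexes()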
core/migrations.py (new file, 155 lines)

@@ -0,0 +1,155 @@
"""Migrations that will be run automatically at startup."""
from typing import Any
from typing import Dict
from urllib.parse import urlparse
from little_boxes import activitypub as ap
import activitypub
from config import ID
from utils.migrations import DB
from utils.migrations import Migration
from utils.migrations import logger
from utils.migrations import perform # noqa: just here for export
back = activitypub.MicroblogPubBackend()
ap.use_backend(back)


class _1_MetaMigration(Migration):
    """Add new metadata to simplify querying."""

    def __guess_visibility(self, data: Dict[str, Any]) -> ap.Visibility:
to = data.get("to", [])
cc = data.get("cc", [])
if ap.AS_PUBLIC in to:
return ap.Visibility.PUBLIC
elif ap.AS_PUBLIC in cc:
return ap.Visibility.UNLISTED
else:
            # Use a bit of a heuristic here: it's too expensive to fetch the actor, so assume
            # the followers collection URL contains "/followers" (which is true for most software).
            # At worst, the activity is classified as DIRECT, which behaves the same as
            # FOLLOWERS_ONLY (i.e. no Announce).
followers_only = False
for item in to:
if "/followers" in item:
followers_only = True
break
if not followers_only:
for item in cc:
if "/followers" in item:
followers_only = True
break
if followers_only:
return ap.Visibility.FOLLOWERS_ONLY
return ap.Visibility.DIRECT

    def migrate(self) -> None:  # noqa: C901  # too complex
for data in DB.activities.find():
logger.info(f"before={data}")
obj = data["activity"].get("object")
set_meta: Dict[str, Any] = {}
# Set `meta.object_id` (str)
if not data["meta"].get("object_id"):
set_meta["meta.object_id"] = None
if obj:
if isinstance(obj, str):
set_meta["meta.object_id"] = data["activity"]["object"]
elif isinstance(obj, dict):
obj_id = obj.get("id")
if obj_id:
set_meta["meta.object_id"] = obj_id
# Set `meta.object_visibility` (str)
if not data["meta"].get("object_visibility"):
set_meta["meta.object_visibility"] = None
object_id = data["meta"].get("object_id") or set_meta.get(
"meta.object_id"
)
if object_id:
obj = data["meta"].get("object") or data["activity"].get("object")
if isinstance(obj, dict):
set_meta["meta.object_visibility"] = self.__guess_visibility(
obj
).name
# Set `meta.actor_id` (str)
if not data["meta"].get("actor_id"):
set_meta["meta.actor_id"] = None
actor = data["activity"].get("actor")
if actor:
if isinstance(actor, str):
set_meta["meta.actor_id"] = data["activity"]["actor"]
elif isinstance(actor, dict):
actor_id = actor.get("id")
if actor_id:
set_meta["meta.actor_id"] = actor_id
# Set `meta.poll_answer` (bool)
if not data["meta"].get("poll_answer"):
set_meta["meta.poll_answer"] = False
if obj:
if isinstance(obj, dict):
if (
obj.get("name")
and not obj.get("content")
and obj.get("inReplyTo")
):
set_meta["meta.poll_answer"] = True
# Set `meta.visibility` (str)
if not data["meta"].get("visibility"):
set_meta["meta.visibility"] = self.__guess_visibility(
data["activity"]
).name
if not data["meta"].get("server"):
set_meta["meta.server"] = urlparse(data["remote_id"]).netloc
logger.info(f"meta={set_meta}\n")
if set_meta:
DB.activities.update_one({"_id": data["_id"]}, {"$set": set_meta})
class _2_FollowMigration(Migration):
"""Add new metadata to update the cached actor in Follow activities."""
def migrate(self) -> None:
actor_cache: Dict[str, Dict[str, Any]] = {}
for data in DB.activities.find({"type": ap.ActivityType.FOLLOW.value}):
try:
if data["meta"]["actor_id"] == ID:
# It's a "following"
actor = actor_cache.get(data["meta"]["object_id"])
if not actor:
actor = ap.parse_activity(
ap.get_backend().fetch_iri(
data["meta"]["object_id"], no_cache=True
)
).to_dict(embed=True)
if not actor:
raise ValueError(f"missing actor {data!r}")
actor_cache[actor["id"]] = actor
DB.activities.update_one(
{"_id": data["_id"]}, {"$set": {"meta.object": actor}}
)
else:
# It's a "followers"
actor = actor_cache.get(data["meta"]["actor_id"])
if not actor:
actor = ap.parse_activity(
ap.get_backend().fetch_iri(
data["meta"]["actor_id"], no_cache=True
)
).to_dict(embed=True)
if not actor:
raise ValueError(f"missing actor {data!r}")
actor_cache[actor["id"]] = actor
DB.activities.update_one(
{"_id": data["_id"]}, {"$set": {"meta.actor": actor}}
)
except Exception:
logger.exception("failed to process actor {data!r}")

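The visibility heuristic in _1_MetaMigration is the core of the migration, so a worked example helps. Restated as a standalone function with the same logic as __guess_visibility above (ap.AS_PUBLIC is the ActivityStreams public-collection URI; the example URL is hypothetical):

from little_boxes import activitypub as ap


def guess_visibility(data):
    # Same decision order as _1_MetaMigration.__guess_visibility
    to, cc = data.get("to", []), data.get("cc", [])
    if ap.AS_PUBLIC in to:
        return ap.Visibility.PUBLIC
    if ap.AS_PUBLIC in cc:
        return ap.Visibility.UNLISTED
    if any("/followers" in item for item in to + cc):
        return ap.Visibility.FOLLOWERS_ONLY
    return ap.Visibility.DIRECT


# e.g. a note addressed only to a followers collection:
guess_visibility({"to": ["https://example.com/users/alice/followers"], "cc": []})
# -> ap.Visibility.FOLLOWERS_ONLY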
core/shared.py (new file, 232 lines)

@@ -0,0 +1,232 @@
import os
from datetime import datetime
from datetime import timezone
from functools import wraps
from typing import Any
from typing import Dict
from typing import Union
import flask
import werkzeug
from bson.objectid import ObjectId
from flask import current_app as app
from flask import redirect
from flask import request
from flask import session
from flask import url_for
from flask_wtf.csrf import CSRFProtect
from little_boxes import activitypub as ap
from little_boxes.activitypub import format_datetime
from poussetaches import PousseTaches
import activitypub
from activitypub import Box
from activitypub import _answer_key
from config import DB
from config import ME
from tasks import Tasks

_Response = Union[flask.Response, werkzeug.wrappers.Response, str]

p = PousseTaches(
os.getenv("MICROBLOGPUB_POUSSETACHES_HOST", "http://localhost:7991"),
os.getenv("MICROBLOGPUB_INTERNAL_HOST", "http://localhost:5000"),
)
csrf = CSRFProtect()

back = activitypub.MicroblogPubBackend()
ap.use_backend(back)

MY_PERSON = ap.Person(**ME)


def add_response_headers(headers={}):
"""This decorator adds the headers passed in to the response"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
resp = flask.make_response(f(*args, **kwargs))
h = resp.headers
for header, value in headers.items():
h[header] = value
return resp
return decorated_function
    return decorator


def noindex(f):
    """This decorator passes X-Robots-Tag: noindex, nofollow"""
    return add_response_headers({"X-Robots-Tag": "noindex, nofollow"})(f)


def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not session.get("logged_in"):
return redirect(url_for("admin_login", next=request.url))
return f(*args, **kwargs)
    return decorated_function


def _get_ip():
    """Guess the IP address from the request. Only used for security purposes (failed logins or bad payloads).

    Geoip info will be returned if the "broxy" headers are set (broxy does the Geoip lookup
    using an offline database and appends these special headers).
    """
ip = request.headers.get("X-Forwarded-For", request.remote_addr)
geoip = None
if request.headers.get("Broxy-Geoip-Country"):
geoip = (
request.headers.get("Broxy-Geoip-Country")
+ "/"
+ request.headers.get("Broxy-Geoip-Region")
)
    return ip, geoip


def post_to_outbox(activity: ap.BaseActivity) -> str:
if activity.has_type(ap.CREATE_TYPES):
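        # A bare Note/Question/... is wrapped in a Create activity first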
activity = activity.build_create()

    # Assign the activity a random ID
obj_id = back.random_object_id()
activity.set_id(back.activity_url(obj_id), obj_id)
back.save(Box.OUTBOX, activity)
Tasks.cache_actor(activity.id)
Tasks.finish_post_to_outbox(activity.id)
    return activity.id


def _build_thread(data, include_children=True):  # noqa: C901
data["_requested"] = True
app.logger.info(f"_build_thread({data!r})")
root_id = data["meta"].get("thread_root_parent", data["activity"]["object"]["id"])
query = {
"$or": [{"meta.thread_root_parent": root_id}, {"activity.object.id": root_id}],
"meta.deleted": False,
}
replies = [data]
    for dat in DB.activities.find(query):
        if dat["type"][0] == ap.ActivityType.CREATE.value:
            replies.append(dat)
        elif dat["type"][0] == ap.ActivityType.UPDATE.value:
            continue
        else:
            # Make a Note/Question/... look like a Create
            dat = {
                "activity": {"object": dat["activity"]},
                "meta": dat["meta"],
                "_id": dat["_id"],
            }
            replies.append(dat)
replies = sorted(replies, key=lambda d: d["activity"]["object"]["published"])
# Index all the IDs in order to build a tree
idx = {}
replies2 = []
for rep in replies:
rep_id = rep["activity"]["object"]["id"]
if rep_id in idx:
continue
idx[rep_id] = rep.copy()
idx[rep_id]["_nodes"] = []
replies2.append(rep)
# Build the tree
for rep in replies2:
rep_id = rep["activity"]["object"]["id"]
if rep_id == root_id:
continue
reply_of = ap._get_id(rep["activity"]["object"].get("inReplyTo"))
try:
idx[reply_of]["_nodes"].append(rep)
except KeyError:
app.logger.info(f"{reply_of} is not there! skipping {rep}")
# Flatten the tree
thread = []
def _flatten(node, level=0):
node["_level"] = level
thread.append(node)
for snode in sorted(
idx[node["activity"]["object"]["id"]]["_nodes"],
key=lambda d: d["activity"]["object"]["published"],
):
_flatten(snode, level=level + 1)
try:
_flatten(idx[root_id])
except KeyError:
app.logger.info(f"{root_id} is not there! skipping")
    return thread


def paginated_query(db, q, limit=25, sort_key="_id"):
older_than = newer_than = None
query_sort = -1
first_page = not request.args.get("older_than") and not request.args.get(
"newer_than"
)
query_older_than = request.args.get("older_than")
query_newer_than = request.args.get("newer_than")
if query_older_than:
q["_id"] = {"$lt": ObjectId(query_older_than)}
elif query_newer_than:
q["_id"] = {"$gt": ObjectId(query_newer_than)}
query_sort = 1
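    # Fetch one extra document to detect whether a further page exists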
outbox_data = list(db.find(q, limit=limit + 1).sort(sort_key, query_sort))
outbox_len = len(outbox_data)
outbox_data = sorted(
outbox_data[:limit], key=lambda x: str(x[sort_key]), reverse=True
)
if query_older_than:
newer_than = str(outbox_data[0]["_id"])
if outbox_len == limit + 1:
older_than = str(outbox_data[-1]["_id"])
elif query_newer_than:
older_than = str(outbox_data[-1]["_id"])
if outbox_len == limit + 1:
newer_than = str(outbox_data[0]["_id"])
elif first_page and outbox_len == limit + 1:
older_than = str(outbox_data[-1]["_id"])
    return outbox_data, older_than, newer_than


def _add_answers_to_question(raw_doc: Dict[str, Any]) -> None:
activity = raw_doc["activity"]
if (
ap._has_type(activity["type"], ap.ActivityType.CREATE)
and "object" in activity
and ap._has_type(activity["object"]["type"], ap.ActivityType.QUESTION)
):
for choice in activity["object"].get("oneOf", activity["object"].get("anyOf")):
choice["replies"] = {
"type": ap.ActivityType.COLLECTION.value,
"totalItems": raw_doc["meta"]
.get("question_answers", {})
.get(_answer_key(choice["name"]), 0),
}
now = datetime.now(timezone.utc)
if format_datetime(now) >= activity["object"]["endTime"]:
activity["object"]["closed"] = activity["object"]["endTime"]