Disable old cleanup and start working on GC, clean media cache
This commit is contained in:
208
app.py
208
app.py
@@ -19,6 +19,7 @@ from urllib.parse import urlparse
|
||||
|
||||
import bleach
|
||||
import emoji_unicode
|
||||
import html2text
|
||||
import mf2py
|
||||
import requests
|
||||
import timeago
|
||||
@@ -34,7 +35,6 @@ from flask import request
|
||||
from flask import session
|
||||
from flask import url_for
|
||||
from flask_wtf.csrf import CSRFProtect
|
||||
import html2text
|
||||
from itsdangerous import BadSignature
|
||||
from little_boxes import activitypub as ap
|
||||
from little_boxes.activitypub import ActivityType
|
||||
@@ -2675,14 +2675,14 @@ def task_fetch_og_meta():
|
||||
for og in og_metadata:
|
||||
if not og.get("image"):
|
||||
continue
|
||||
MEDIA_CACHE.cache_og_image2(og["image"], iri)
|
||||
MEDIA_CACHE.cache_og_image(og["image"], iri)
|
||||
|
||||
app.logger.debug(f"OG metadata {og_metadata!r}")
|
||||
DB.activities.update_one(
|
||||
{"remote_id": iri}, {"$set": {"meta.og_metadata": og_metadata}}
|
||||
)
|
||||
|
||||
app.logger.info(f"OG metadata fetched for {iri}")
|
||||
app.logger.info(f"OG metadata fetched for {iri}: {og_metadata}")
|
||||
except (ActivityGoneError, ActivityNotFoundError):
|
||||
app.logger.exception(f"dropping activity {iri}, skip OG metedata")
|
||||
return ""
|
||||
@@ -2893,33 +2893,12 @@ def task_cache_attachments():
|
||||
app.logger.info(f"activity={activity!r}")
|
||||
# Generates thumbnails for the actor's icon and the attachments if any
|
||||
|
||||
actor = activity.get_actor()
|
||||
|
||||
# Update the cached actor
|
||||
DB.actors.update_one(
|
||||
{"remote_id": iri},
|
||||
{"$set": {"remote_id": iri, "data": actor.to_dict(embed=True)}},
|
||||
upsert=True,
|
||||
)
|
||||
|
||||
if actor.icon:
|
||||
MEDIA_CACHE.cache(actor.icon["url"], Kind.ACTOR_ICON)
|
||||
|
||||
obj = None
|
||||
if activity.has_type(ap.ActivityType.CREATE):
|
||||
# This means a `Create` triggered the task
|
||||
obj = activity.get_object()
|
||||
elif activity.has_type(ap.CREATE_TYPES):
|
||||
# This means a `Announce` triggered the task
|
||||
obj = activity
|
||||
else:
|
||||
app.logger.warning(f"Don't know what to do with {activity!r}")
|
||||
return
|
||||
obj = activity.get_object()
|
||||
|
||||
# Iter the attachments
|
||||
for attachment in obj._data.get("attachment", []):
|
||||
try:
|
||||
MEDIA_CACHE.cache_attachment2(attachment, iri)
|
||||
MEDIA_CACHE.cache_attachment(attachment, iri)
|
||||
except ValueError:
|
||||
app.logger.exception(f"failed to cache {attachment}")
|
||||
|
||||
@@ -2938,22 +2917,28 @@ def task_cache_attachments():
|
||||
def task_cache_actor() -> str:
|
||||
task = p.parse(request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
iri, also_cache_attachments = (
|
||||
task.payload["iri"],
|
||||
task.payload.get("also_cache_attachments", True),
|
||||
)
|
||||
iri = task.payload["iri"]
|
||||
try:
|
||||
activity = ap.fetch_remote_activity(iri)
|
||||
app.logger.info(f"activity={activity!r}")
|
||||
|
||||
# FIXME(tsileo): OG meta for Announce?
|
||||
# Fetch the Open Grah metadata if it's a `Create`
|
||||
if activity.has_type(ap.ActivityType.CREATE):
|
||||
Tasks.fetch_og_meta(iri)
|
||||
|
||||
if activity.has_type([ap.ActivityType.LIKE, ap.ActivityType.ANNOUNCE]):
|
||||
# Cache the object if it's a `Like` or an `Announce` unrelated to the server outbox (because it will never get
|
||||
# displayed)
|
||||
if activity.has_type(
|
||||
[ap.ActivityType.LIKE, ap.ActivityType.ANNOUNCE]
|
||||
) and not activity.get_object_id().startswith(BASE_URL):
|
||||
Tasks.cache_object(iri)
|
||||
|
||||
actor = activity.get_actor()
|
||||
if actor.icon:
|
||||
if isinstance(actor.icon, dict) and "url" in actor.icon:
|
||||
MEDIA_CACHE.cache_actor_icon(actor.icon["url"])
|
||||
else:
|
||||
app.logger.warning(f"failed to parse icon {actor.icon} for {iri}")
|
||||
|
||||
if activity.has_type(ap.ActivityType.FOLLOW):
|
||||
if actor.id == ID:
|
||||
@@ -2973,12 +2958,8 @@ def task_cache_actor() -> str:
|
||||
)
|
||||
|
||||
app.logger.info(f"actor cached for {iri}")
|
||||
if also_cache_attachments and activity.has_type(ap.ActivityType.CREATE):
|
||||
if activity.has_type([ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE]):
|
||||
Tasks.cache_attachments(iri)
|
||||
elif also_cache_attachments and activity.has_type(ap.ActivityType.ANNOUNCE):
|
||||
obj = activity.get_object()
|
||||
Tasks.cache_attachments(obj.id)
|
||||
Tasks.cache_actor(obj.id)
|
||||
|
||||
except (ActivityGoneError, ActivityNotFoundError):
|
||||
DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}})
|
||||
@@ -3291,129 +3272,13 @@ def task_update_question():
|
||||
def task_cleanup():
|
||||
task = p.parse(request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
p.push({}, "/task/cleanup_part_1")
|
||||
# p.push({}, "/task/cleanup_part_1")
|
||||
return ""
|
||||
|
||||
|
||||
@app.route("/task/cleanup_part_1", methods=["POST"])
|
||||
def task_cleanup_part_1():
|
||||
task = p.parse(request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
d = (datetime.utcnow() - timedelta(days=15)).strftime("%Y-%m-%d")
|
||||
|
||||
# (We keep Follow and Accept forever)
|
||||
|
||||
# Announce and Like cleanup
|
||||
for ap_type in [ActivityType.ANNOUNCE, ActivityType.LIKE]:
|
||||
# Migrate old (before meta.keep activities on the fly)
|
||||
DB.activities.update_many(
|
||||
{
|
||||
"box": Box.INBOX.value,
|
||||
"type": ap_type.value,
|
||||
"meta.keep": {"$exists": False},
|
||||
"activity.object": {"$regex": f"^{BASE_URL}"},
|
||||
},
|
||||
{"$set": {"meta.keep": True}},
|
||||
)
|
||||
|
||||
DB.activities.update_many(
|
||||
{
|
||||
"box": Box.INBOX.value,
|
||||
"type": ap_type.value,
|
||||
"meta.keep": {"$exists": False},
|
||||
"activity.object.id": {"$regex": f"^{BASE_URL}"},
|
||||
},
|
||||
{"$set": {"meta.keep": True}},
|
||||
)
|
||||
|
||||
DB.activities.update_many(
|
||||
{
|
||||
"box": Box.INBOX.value,
|
||||
"type": ap_type.value,
|
||||
"meta.keep": {"$exists": False},
|
||||
},
|
||||
{"$set": {"meta.keep": False}},
|
||||
)
|
||||
# End of the migration
|
||||
|
||||
# Delete old activities
|
||||
DB.activities.delete_many(
|
||||
{
|
||||
"box": Box.INBOX.value,
|
||||
"type": ap_type.value,
|
||||
"meta.keep": False,
|
||||
"activity.published": {"$lt": d},
|
||||
}
|
||||
)
|
||||
|
||||
# And delete the soft-deleted one
|
||||
DB.activities.delete_many(
|
||||
{
|
||||
"box": Box.INBOX.value,
|
||||
"type": ap_type.value,
|
||||
"meta.keep": False,
|
||||
"meta.deleted": True,
|
||||
}
|
||||
)
|
||||
|
||||
# Create cleanup (more complicated)
|
||||
# The one that mention our actor
|
||||
DB.activities.update_many(
|
||||
{
|
||||
"box": Box.INBOX.value,
|
||||
"meta.keep": {"$exists": False},
|
||||
"activity.object.tag.href": {"$regex": f"^{BASE_URL}"},
|
||||
},
|
||||
{"$set": {"meta.keep": True}},
|
||||
)
|
||||
DB.activities.update_many(
|
||||
{
|
||||
"box": Box.REPLIES.value,
|
||||
"meta.keep": {"$exists": False},
|
||||
"activity.tag.href": {"$regex": f"^{BASE_URL}"},
|
||||
},
|
||||
{"$set": {"meta.keep": True}},
|
||||
)
|
||||
|
||||
# The replies of the outbox
|
||||
DB.activities.update_many(
|
||||
{"meta.thread_root_parent": {"$regex": f"^{BASE_URL}"}},
|
||||
{"$set": {"meta.keep": True}},
|
||||
)
|
||||
# Track all the threads we participated
|
||||
keep_threads = []
|
||||
for data in DB.activities.find(
|
||||
{
|
||||
"box": Box.OUTBOX.value,
|
||||
"type": ActivityType.CREATE.value,
|
||||
"meta.thread_root_parent": {"$exists": True},
|
||||
}
|
||||
):
|
||||
keep_threads.append(data["meta"]["thread_root_parent"])
|
||||
|
||||
for root_parent in set(keep_threads):
|
||||
DB.activities.update_many(
|
||||
{"meta.thread_root_parent": root_parent}, {"$set": {"meta.keep": True}}
|
||||
)
|
||||
|
||||
DB.activities.update_many(
|
||||
{
|
||||
"box": {"$in": [Box.REPLIES.value, Box.INBOX.value]},
|
||||
"meta.keep": {"$exists": False},
|
||||
},
|
||||
{"$set": {"meta.keep": False}},
|
||||
)
|
||||
|
||||
DB.activities.update_many(
|
||||
{
|
||||
"box": Box.OUTBOX.value,
|
||||
"type": {"$in": [ActivityType.CREATE.value, ActivityType.ANNOUNCE.value]},
|
||||
"meta.public": {"$exists": False},
|
||||
},
|
||||
{"$set": {"meta.public": True}},
|
||||
)
|
||||
|
||||
p.push({}, "/task/cleanup_part_2")
|
||||
return "OK"
|
||||
|
||||
|
||||
@@ -3421,25 +3286,6 @@ def task_cleanup_part_1():
|
||||
def task_cleanup_part_2():
|
||||
task = p.parse(request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
d = (datetime.utcnow() - timedelta(days=15)).strftime("%Y-%m-%d")
|
||||
|
||||
# Go over the old Create activities
|
||||
for data in DB.activities.find(
|
||||
{
|
||||
"box": Box.INBOX.value,
|
||||
"type": ActivityType.CREATE.value,
|
||||
"meta.keep": False,
|
||||
"activity.published": {"$lt": d},
|
||||
}
|
||||
).limit(5000):
|
||||
# Delete the cached attachment/
|
||||
for grid_item in MEDIA_CACHE.fs.find({"remote_id": data["remote_id"]}):
|
||||
MEDIA_CACHE.fs.delete(grid_item._id)
|
||||
DB.activities.delete_one({"_id": data["_id"]})
|
||||
|
||||
# FIXME(tsileo): cleanup cache from announces object
|
||||
|
||||
p.push({}, "/task/cleanup_part_3")
|
||||
return "OK"
|
||||
|
||||
|
||||
@@ -3447,20 +3293,4 @@ def task_cleanup_part_2():
|
||||
def task_cleanup_part_3():
|
||||
task = p.parse(request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
|
||||
d = (datetime.utcnow() - timedelta(days=15)).strftime("%Y-%m-%d")
|
||||
|
||||
# Delete old replies we don't care about
|
||||
DB.activities.delete_many(
|
||||
{"box": Box.REPLIES.value, "meta.keep": False, "activity.published": {"$lt": d}}
|
||||
)
|
||||
|
||||
# Remove all the attachments no tied to a remote_id (post celery migration)
|
||||
for grid_item in MEDIA_CACHE.fs.find(
|
||||
{"kind": {"$in": ["og", "attachment"]}, "remote_id": {"$exists": False}}
|
||||
):
|
||||
MEDIA_CACHE.fs.delete(grid_item._id)
|
||||
|
||||
# TODO(tsileo): iterator over "actor_icon" and look for unused one in a separate task
|
||||
|
||||
return "OK"
|
||||
|
Reference in New Issue
Block a user