Properly cache the actor
This commit is contained in:
44
tasks.py
44
tasks.py
@@ -2,6 +2,8 @@ import json
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
from typing import Dict
|
||||
from typing import Any
|
||||
|
||||
import requests
|
||||
from celery import Celery
|
||||
@@ -54,14 +56,48 @@ def process_new_activity(self, iri: str) -> None:
|
||||
)
|
||||
|
||||
log.info(f"new activity {iri} processed")
|
||||
cache_attachments.delay(iri)
|
||||
cache_actor.delay(iri)
|
||||
except (ActivityGoneError, ActivityNotFoundError):
|
||||
log.exception(f"dropping activity {iri}")
|
||||
log.exception(f"dropping activity {iri}, skip processing")
|
||||
except Exception as err:
|
||||
log.exception(f"failed to process new activity {iri}")
|
||||
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
|
||||
|
||||
|
||||
def _actor_to_meta(actor: ap.BaseActivity) -> Dict[str, Any]:
|
||||
return {
|
||||
"url": actor.url,
|
||||
"icon": actor.icon,
|
||||
"name": actor.name,
|
||||
"preferredUsername": actor.preferredUsername,
|
||||
}
|
||||
|
||||
|
||||
@app.task(bind=True, max_retries=12)
|
||||
def cache_actor(self, iri: str, also_cache_attachments: bool = True) -> None:
|
||||
try:
|
||||
activity = ap.fetch_remote_activity(iri)
|
||||
log.info(f"activity={activity!r}")
|
||||
|
||||
actor = activity.get_actor()
|
||||
|
||||
# Cache the actor info
|
||||
DB.activities.update_one(
|
||||
{"remote_id": iri}, {"$set": {"meta.actor": _actor_to_meta(actor)}}
|
||||
)
|
||||
|
||||
log.info(f"actor cached for {iri}")
|
||||
if also_cache_attachments:
|
||||
cache_attachments.delay(iri)
|
||||
|
||||
except (ActivityGoneError, ActivityNotFoundError):
|
||||
DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}})
|
||||
log.exception(f"flagging activity {iri} as deleted, no actor caching")
|
||||
except Exception as err:
|
||||
log.exception(f"failed to cache actor for {iri}")
|
||||
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
|
||||
|
||||
|
||||
@app.task(bind=True, max_retries=12)
|
||||
def cache_attachments(self, iri: str) -> None:
|
||||
try:
|
||||
@@ -88,9 +124,9 @@ def cache_attachments(self, iri: str) -> None:
|
||||
log.info(f"attachments cached for {iri}")
|
||||
|
||||
except (ActivityGoneError, ActivityNotFoundError):
|
||||
log.exception(f"dropping activity {iri}")
|
||||
log.exception(f"dropping activity {iri}, no attachment caching")
|
||||
except Exception as err:
|
||||
log.exception(f"failed to process new activity {iri}")
|
||||
log.exception(f"failed to cache attachments for {iri}")
|
||||
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user