Fix the tasks and formatting
This commit is contained in:
41
tasks.py
41
tasks.py
@@ -5,18 +5,18 @@ import random
|
||||
|
||||
import requests
|
||||
from celery import Celery
|
||||
from little_boxes import activitypub as ap
|
||||
from little_boxes.httpsig import HTTPSigAuth
|
||||
from little_boxes.linked_data_sig import generate_signature
|
||||
from requests.exceptions import HTTPError
|
||||
|
||||
from little_boxes import activitypub as ap
|
||||
import activitypub
|
||||
from config import DB
|
||||
from config import HEADERS
|
||||
from config import KEY
|
||||
from config import USER_AGENT
|
||||
from little_boxes.httpsig import HTTPSigAuth
|
||||
from little_boxes.linked_data_sig import generate_signature
|
||||
from utils.opengraph import fetch_og_metadata
|
||||
from utils.media import Kind
|
||||
from config import MEDIA_CACHE
|
||||
from config import USER_AGENT
|
||||
from utils.media import Kind
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
app = Celery(
|
||||
@@ -25,6 +25,10 @@ app = Celery(
|
||||
SigAuth = HTTPSigAuth(KEY)
|
||||
|
||||
|
||||
back = activitypub.MicroblogPubBackend()
|
||||
ap.use_backend(back)
|
||||
|
||||
|
||||
@app.task(bind=True, max_retries=12)
|
||||
def process_new_activity(self, iri: str) -> None:
|
||||
try:
|
||||
@@ -32,7 +36,7 @@ def process_new_activity(self, iri: str) -> None:
|
||||
log.info(f"activity={activity!r}")
|
||||
|
||||
tag_stream = False
|
||||
if activity.has_type(ap.ActivityType.ANNOUCE):
|
||||
if activity.has_type(ap.ActivityType.ANNOUNCE):
|
||||
tag_stream = True
|
||||
elif activity.has_type(ap.ActivityType.CREATE):
|
||||
note = activity.get_object()
|
||||
@@ -40,7 +44,9 @@ def process_new_activity(self, iri: str) -> None:
|
||||
tag_stream = True
|
||||
|
||||
log.info(f"{iri} tag_stream={tag_stream}")
|
||||
DB.update_one({"remote_id": activity.id}, {"$set": {"meta.stream": tag_stream}})
|
||||
DB.activities.update_one(
|
||||
{"remote_id": activity.id}, {"$set": {"meta.stream": tag_stream}}
|
||||
)
|
||||
|
||||
log.info(f"new activity {iri} processed")
|
||||
except Exception as err:
|
||||
@@ -71,7 +77,7 @@ def cache_attachments(self, iri: str) -> None:
|
||||
for attachment in activity.get_object()._data.get("attachment", []):
|
||||
MEDIA_CACHE.cache(attachment["url"], Kind.ATTACHMENT)
|
||||
|
||||
log.info(f"attachmwents cached for {iri}")
|
||||
log.info(f"attachments cached for {iri}")
|
||||
|
||||
except Exception as err:
|
||||
log.exception(f"failed to process new activity {iri}")
|
||||
@@ -105,20 +111,3 @@ def post_to_inbox(self, payload: str, to: str) -> None:
|
||||
log.info("client error, no retry")
|
||||
return
|
||||
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
|
||||
|
||||
|
||||
@app.task(bind=True, max_retries=12)
|
||||
def fetch_og(self, col, remote_id):
|
||||
try:
|
||||
log.info("fetch_og_meta remote_id=%s col=%s", remote_id, col)
|
||||
if col == "INBOX":
|
||||
log.info(
|
||||
"%d links saved", fetch_og_metadata(USER_AGENT, DB.inbox, remote_id)
|
||||
)
|
||||
elif col == "OUTBOX":
|
||||
log.info(
|
||||
"%d links saved", fetch_og_metadata(USER_AGENT, DB.outbox, remote_id)
|
||||
)
|
||||
except Exception as err:
|
||||
self.log.exception("failed")
|
||||
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
|
||||
|
Reference in New Issue
Block a user