Custom emojis support
This commit is contained in:
47
utils/emojis.py
Normal file
47
utils/emojis.py
Normal file
@@ -0,0 +1,47 @@
|
||||
import mimetypes
|
||||
import re
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Set
|
||||
|
||||
from little_boxes import activitypub as ap
|
||||
|
||||
EMOJI_REGEX = re.compile(r"(:[\d\w]+:)")
|
||||
|
||||
EMOJIS: Dict[str, ap.Emoji] = {}
|
||||
EMOJIS_BY_NAME: Dict[str, ap.Emoji] = {}
|
||||
|
||||
|
||||
def _load_emojis(root_dir: Path, base_url: str) -> None:
|
||||
if EMOJIS:
|
||||
return
|
||||
for emoji in (root_dir / "static" / "emojis").iterdir():
|
||||
mt = mimetypes.guess_type(emoji.name)[0]
|
||||
if mt and mt.startswith("image/"):
|
||||
name = emoji.name.split(".")[0]
|
||||
ap_emoji = ap.Emoji(
|
||||
name=f":{name}:",
|
||||
updated=ap.format_datetime(datetime.fromtimestamp(0.0).astimezone()),
|
||||
id=f"{base_url}/emoji/{name}",
|
||||
icon={
|
||||
"mediaType": mt,
|
||||
"type": ap.ActivityType.IMAGE.value,
|
||||
"url": f"{base_url}/static/emojis/{emoji.name}",
|
||||
},
|
||||
)
|
||||
EMOJIS[emoji.name] = ap_emoji
|
||||
EMOJIS_BY_NAME[ap_emoji.name] = ap_emoji
|
||||
|
||||
|
||||
def tags(content: str) -> List[Dict[str, Any]]:
|
||||
tags: List[Dict[str, Any]] = []
|
||||
added: Set[str] = set()
|
||||
for e in re.findall(EMOJI_REGEX, content):
|
||||
if e not in added and e in EMOJIS_BY_NAME:
|
||||
tags.append(EMOJIS_BY_NAME[e].to_dict())
|
||||
added.add(e)
|
||||
|
||||
return tags
|
@@ -5,8 +5,11 @@ from enum import unique
|
||||
from functools import lru_cache
|
||||
from gzip import GzipFile
|
||||
from io import BytesIO
|
||||
from shutil import copyfileobj
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
|
||||
import gridfs
|
||||
import piexif
|
||||
@@ -31,13 +34,26 @@ def is_video(filename):
|
||||
return False
|
||||
|
||||
|
||||
def load(url: str, user_agent: str) -> Image:
|
||||
def _load(url: str, user_agent: str) -> Tuple[BytesIO, Optional[str]]:
|
||||
"""Initializes a `PIL.Image` from the URL."""
|
||||
out = BytesIO()
|
||||
with requests.get(url, stream=True, headers={"User-Agent": user_agent}) as resp:
|
||||
resp.raise_for_status()
|
||||
|
||||
resp.raw.decode_content = True
|
||||
return Image.open(BytesIO(resp.raw.read()))
|
||||
while 1:
|
||||
buf = resp.raw.read()
|
||||
if not buf:
|
||||
break
|
||||
out.write(buf)
|
||||
out.seek(0)
|
||||
return out, resp.headers.get("content-type")
|
||||
|
||||
|
||||
def load(url: str, user_agent: str) -> Image:
|
||||
"""Initializes a `PIL.Image` from the URL."""
|
||||
out, _ = _load(url, user_agent)
|
||||
return Image.open(out)
|
||||
|
||||
|
||||
def to_data_uri(img: Image) -> str:
|
||||
@@ -54,6 +70,7 @@ class Kind(Enum):
|
||||
ACTOR_ICON = "actor_icon"
|
||||
UPLOAD = "upload"
|
||||
OG_IMAGE = "og"
|
||||
EMOJI = "emoji"
|
||||
|
||||
|
||||
class MediaCache(object):
|
||||
@@ -173,6 +190,26 @@ class MediaCache(object):
|
||||
kind=Kind.ACTOR_ICON.value,
|
||||
)
|
||||
|
||||
def is_emoji_cached(self, url: str) -> bool:
|
||||
return bool(self.fs.find_one({"url": url, "kind": Kind.EMOJI.value}))
|
||||
|
||||
def cache_emoji(self, url: str, iri: str) -> None:
|
||||
if self.is_emoji_cached(url):
|
||||
return
|
||||
src, content_type = _load(url, self.user_agent)
|
||||
with BytesIO() as buf:
|
||||
with GzipFile(mode="wb", fileobj=buf) as g:
|
||||
copyfileobj(src, g)
|
||||
buf.seek(0)
|
||||
self.fs.put(
|
||||
buf,
|
||||
url=url,
|
||||
remote_id=iri,
|
||||
size=None,
|
||||
content_type=content_type or mimetypes.guess_type(url)[0],
|
||||
kind=Kind.EMOJI.value,
|
||||
)
|
||||
|
||||
def save_upload(self, obuf: BytesIO, filename: str) -> str:
|
||||
# Remove EXIF metadata
|
||||
if filename.lower().endswith(".jpg") or filename.lower().endswith(".jpeg"):
|
||||
|
@@ -91,6 +91,25 @@ ALLOWED_TAGS = [
|
||||
]
|
||||
|
||||
|
||||
@filters.app_template_filter()
|
||||
def replace_custom_emojis(content, note):
|
||||
print("\n" * 50)
|
||||
print("custom_replace", note)
|
||||
idx = {}
|
||||
for tag in note.get("tag", []):
|
||||
if tag.get("type") == "Emoji":
|
||||
# try:
|
||||
idx[tag["name"]] = _get_file_url(tag["icon"]["url"], None, Kind.EMOJI)
|
||||
|
||||
for emoji_name, emoji_url in idx.items():
|
||||
content = content.replace(
|
||||
emoji_name,
|
||||
f'<img class="custom-emoji" src="{emoji_url}" title="{emoji_name}" alt="{emoji_name}">',
|
||||
)
|
||||
|
||||
return content
|
||||
|
||||
|
||||
def clean_html(html):
|
||||
try:
|
||||
return bleach.clean(html, tags=ALLOWED_TAGS, strip=True)
|
||||
@@ -237,6 +256,9 @@ _FILE_URL_CACHE = LRUCache(4096)
|
||||
|
||||
|
||||
def _get_file_url(url, size, kind) -> str:
|
||||
if url.startswith(BASE_URL):
|
||||
return url
|
||||
|
||||
k = (url, size, kind)
|
||||
cached = _FILE_URL_CACHE.get(k)
|
||||
if cached:
|
||||
@@ -249,8 +271,6 @@ def _get_file_url(url, size, kind) -> str:
|
||||
return out
|
||||
|
||||
_logger.error(f"cache not available for {url}/{size}/{kind}")
|
||||
if url.startswith(BASE_URL):
|
||||
return url
|
||||
p = urlparse(url)
|
||||
return f"/p/{p.scheme}" + p._replace(scheme="").geturl()[1:]
|
||||
|
||||
|
Reference in New Issue
Block a user