Follow pagination for >500 emoji
At some point, Slack started paginating the customize/emoji page for teams with huge emoji collections, so the export script would only detect the 500 emoji on the first page (ordered alphabetically). The exporter now follows the pagination links and exports the remaining pages as well, restoring full functionality. Tested on a team with ~1300 emoji (3 pages).
parent f4dfff6fd9
commit 25a3405192

export.py (84 changed lines)
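The heart of the change is in main(): after fetching the first customize/emoji page, the script reads the pagination widget, queues up the remaining pages, and parses each one with the new parse_emoji_from_page helper. As a rough standalone illustration of the XPath involved (only the selectors come from the diff below; the sample_html fragment is made up):

import lxml.html

# Hypothetical stand-in for a paginated /customize/emoji page; the real
# markup is whatever Slack serves, this only mimics the pagination widget.
sample_html = '''
<div class="pagination pagination-centered">
  <ul>
    <li><a href="#">1</a></li>
    <li><a href="/customize/emoji?page=2">2</a></li>
    <li><a href="/customize/emoji?page=3">3</a></li>
    <li><a href="/customize/emoji?page=2">Next</a></li>
  </ul>
</div>
'''

tree = lxml.html.fromstring(sample_html)
# Same XPath as in the diff: take every pagination link, skipping the
# "Next" button and the "#" placeholder for the current page.
pages = tree.xpath(r'//div[@class="pagination pagination-centered"]'
                   r'/ul/li/a[.!="Next"]/@href[.!="#"]')
print(pages)  # ['/customize/emoji?page=2', '/customize/emoji?page=3']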
@@ -3,21 +3,19 @@
 # Export emoji in a Slack team as files
 # https://github.com/smashwilson/slack-emojinator
 
-import requests
-import lxml.html
-
+import aiohttp
 import argparse
-import os
-import shutil
-import asyncio, aiohttp
+import asyncio
 import logging
-
-from upload import _session
+import lxml.html
+import os
+from typing import List
 
 logging.basicConfig(level=logging.INFO, format="%(asctime)-15s\t%(message)s")
 logger = logging.getLogger(__name__)
 
-URL = "https://{team_name}.slack.com/customize/emoji"
+BASE_URL = 'https://{team_name}.slack.com'
+EMOJI_ENDPOINT = '/customize/emoji'
 
 
 def _argparse():
@@ -46,8 +44,9 @@ def _argparse():
     args = parser.parse_args()
     return args
 
-def concurrent_http_get(num_chunks: int, session: aiohttp.ClientSession):
-    semaphore = asyncio.Semaphore(num_chunks)
+
+def concurrent_http_get(max_concurrent: int, session: aiohttp.ClientSession):
+    semaphore = asyncio.Semaphore(max_concurrent)
 
     async def http_get(url, name):
         nonlocal semaphore
@@ -56,47 +55,74 @@ def concurrent_http_get(num_chunks: int, session: aiohttp.ClientSession):
             body = await response.content.read()
             await response.wait_for_close()
             return body, name, url
 
     return http_get
 
-def handle_response(response, name: str, url: str, directory: str):
+
+def save_to_file(response, name: str, url: str, directory: str):
     logger.info(f"Got {name.ljust(15)} {url}")
     ext = url.split(".")[-1]
     with open(os.path.join(directory, f"{name}.{ext}"), 'wb') as out:
         out.write(response)
 
 
+def parse_emoji_from_page(text: str) -> List[str]:
+    '''Given the text of an HTML page, retrieve a list of (relative) URLs to emoji.
+    :param text Raw HTML.
+    :return ['/path/to/first.png', '/path/to/second.png', ...]'''
+    tree = lxml.html.fromstring(text)
+    urls = tree.xpath(r'//td[@headers="custom_emoji_image"]/span/@data-original')
+    return urls
+
+
 def _async_session(auth_cookie):
     return aiohttp.ClientSession(headers={"Cookie": auth_cookie})
 
 
 async def main():
     args = _argparse()
 
     if not os.path.exists(args.directory):
         os.makedirs(args.directory)
 
-    async with _async_session(args.cookie) as session:
-        endpoint = URL.format(team_name=args.team_name)
-        logger.info(f"Getting {endpoint}")
-        resp = await session.get(endpoint)
-        async with resp:
-            if resp.status != 200:
-                logger.error(f"Failed to retrieve emoji list ({resp.status})")
-                return
-            text = await resp.text()
-            tree = lxml.html.fromstring(text)
-            urls = tree.xpath(r'//td[@headers="custom_emoji_image"]/span/@data-original')
-            names = [u.split('/')[-2] for u in urls]
+    base_url = BASE_URL.format(team_name=args.team_name)
+    emoji_url = base_url + EMOJI_ENDPOINT
 
-            logger.info(f"Parsed {len(names)} emojis")
-            assert len(names) > 0
+    async with _async_session(args.cookie) as session:
+        logger.info(f"Getting {emoji_url}")
 
+        async with session.get(emoji_url) as base_page_q:
+            if base_page_q.status != 200:
+                logger.error(f"Failed to retrieve emoji list ({base_page_q.status})")
+                return
+            text = await base_page_q.text()
+            tree = lxml.html.fromstring(text)
+
+            emoji_urls = []
+            emoji_urls.extend(parse_emoji_from_page(text))
+            other_emoji_pages = [f"{base_url}{p}" for p in
+                                 tree.xpath(r'//div[@class="pagination pagination-centered"]'
+                                            r'/ul/li/a[.!="Next"]/@href[.!="#"]')
+                                 if p != EMOJI_ENDPOINT]
+            logger.info(f"Getting other emoji from: {other_emoji_pages}")
+            for emoji_page in other_emoji_pages:
+                async with session.get(f"{emoji_page}") as page:
+                    text = await page.text()
+                    emoji_urls.extend(parse_emoji_from_page(text))
+
+            emoji_names = [u.split('/')[-2] for u in emoji_urls]
+
+            logger.info(f"Parsed {len(emoji_names)} emojis")
+            assert len(emoji_names) > 0
+
             http_get = concurrent_http_get(args.concurrent_requests, session)
-            tasks = [http_get(emoji_url, emoji_name) for emoji_name, emoji_url in zip(names, urls) if "alias" not in emoji_url]
+            tasks = [http_get(emoji_url, emoji_name) for emoji_name, emoji_url in zip(emoji_names, emoji_urls)
+                     if "alias" not in emoji_url]
             for future in asyncio.as_completed(tasks):
                 data, name, url = await future
-                handle_response(data, name, url, args.directory)
+                save_to_file(data, name, url, args.directory)
+
 
 if __name__ == '__main__':
     loop = asyncio.get_event_loop()
     loop.run_until_complete(main())
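Not touched by this commit, but useful context for the download step above: concurrent_http_get builds a closure that uses an asyncio.Semaphore to cap the number of emoji downloads in flight. A minimal self-contained sketch of that pattern (fetch_all and its placeholder URLs are hypothetical, not the script's own API):

import asyncio
import aiohttp

async def fetch_all(urls, max_concurrent=5):
    # Cap concurrent requests with a semaphore, as concurrent_http_get does.
    semaphore = asyncio.Semaphore(max_concurrent)

    async with aiohttp.ClientSession() as session:
        async def fetch(url):
            async with semaphore:
                async with session.get(url) as response:
                    return await response.read()

        # as_completed yields results in finish order, like the export loop.
        return [await f for f in asyncio.as_completed([fetch(u) for u in urls])]

# Hypothetical usage, mirroring the script's pre-3.10 event-loop idiom:
# loop = asyncio.get_event_loop()
# bodies = loop.run_until_complete(fetch_all(["https://example.com/a.png"]))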