diff --git a/export.py b/export.py
index fc3c6b6..e2409c0 100755
--- a/export.py
+++ b/export.py
@@ -9,13 +9,20 @@ import asyncio
 import logging
 import lxml.html
 import os
-from typing import List
+import re
+from collections import namedtuple
+
+Emoji = namedtuple('Emoji', 'url name extension')
 
 logging.basicConfig(level=logging.INFO, format="%(asctime)-15s\t%(message)s")
 logger = logging.getLogger(__name__)
 
 BASE_URL = 'https://{team_name}.slack.com'
 EMOJI_ENDPOINT = '/customize/emoji'
+EMOJI_API = '/api/emoji.adminList'
+
+API_TOKEN_REGEX = r'.*(?:\"?api_token\"?):\s*\"([^"]+)\".*'
+API_TOKEN_PATTERN = re.compile(API_TOKEN_REGEX)
 
 
 def _argparse():
@@ -49,37 +56,100 @@
 def concurrent_http_get(max_concurrent: int, session: aiohttp.ClientSession):
     semaphore = asyncio.Semaphore(max_concurrent)
 
-    async def http_get(url, name):
+    async def http_get(emoji: Emoji):
         nonlocal semaphore
-        with (await semaphore):
-            response = await session.get(url)
+        # REVIEW FIX: `with (await semaphore)` was removed in Python 3.9; use `async with`.
+        async with semaphore:
+            response = await session.get(emoji.url)
             body = await response.content.read()
             await response.wait_for_close()
-            return body, name, url
+            return emoji, body
 
     return http_get
 
 
-def save_to_file(response, name: str, url: str, directory: str):
-    logger.info(f"Got {name.ljust(15)} {url}")
-    ext = url.split(".")[-1]
-    with open(os.path.join(directory, f"{name}.{ext}"), 'wb') as out:
+def save_to_file(response: bytes, emoji: Emoji, directory: str):
+    logger.info(f"Downloaded {emoji.name.ljust(20)} from {emoji.url}")
+    with open(os.path.join(directory, f"{emoji.name}.{emoji.extension}"), 'wb') as out:
         out.write(response)
 
 
-def parse_emoji_from_page(text: str) -> List[str]:
-    '''Given the text of an HTML page, retrieve a list of (relative) URLs to emoji.
-    :param text Raw HTML.
-    :return ['/path/to/first.png', '/path/to/second.png', ...]'''
-    tree = lxml.html.fromstring(text)
-    urls = tree.xpath(r'//td[@headers="custom_emoji_image"]/span/@data-original')
-    return urls
-
-
-def _async_session(auth_cookie):
+def _async_session(auth_cookie) -> aiohttp.ClientSession:
     return aiohttp.ClientSession(headers={"Cookie": auth_cookie})
 
 
+async def _fetch_api_token(session: aiohttp.ClientSession, base_url: str):
+    # Fetch the form first, to get an api_token.
+    emoji_url = base_url + EMOJI_ENDPOINT
+
+    async with session.get(emoji_url) as base_page:
+
+        if base_page.status != 200:
+            raise Exception(f"Failed to fetch token from '{emoji_url}', status {base_page.status}")
+
+        text = await base_page.text()
+        tree = lxml.html.fromstring(text)
+
+        all_scripts = tree.xpath('//script[@type=\'text/javascript\']/text()')
+
+        for script in all_scripts:
+            for line in script.splitlines():
+                if 'api_token' in line:
+                    # api_token: "xoxs-12345-abcdefg....",
+                    # "api_token":"xoxs-12345-abcdefg....",
+                    match_group = API_TOKEN_PATTERN.match(line.strip())
+
+                    if not match_group:
+                        raise Exception("Could not parse API token from remote data! Regex requires updating.")
+
+                    return match_group.group(1)
+
+    raise Exception("No api_token found in page")
+
+
+async def _determine_all_emoji_urls(session: aiohttp.ClientSession, base_url: str, token: str):
+    page = 1
+    total_pages = None
+
+    entries = list()
+
+    while total_pages is None or page <= total_pages:
+
+        data = {
+            'token': token,
+            'page': page,
+            'count': 100
+        }
+
+        response = await session.post(base_url + EMOJI_API, data=data)
+
+        logger.info(f"loaded {response.real_url} (page {page})")
+
+        if response.status != 200:
+            raise Exception(f"Failed to load emoji from {response.request_info.real_url} (status {response.status})")
+
+        json = await response.json()
+
+        for entry in json['emoji']:
+            url = str(entry['url'])
+            name = str(entry['name'])
+            extension = str(url.split('.')[-1])
+
+            # slack uses 0/1 to represent false/true in the API
+            if entry['is_alias'] != 0:
+                logger.info(f"Skipping emoji \"{name}\", is alias of \"{entry['alias_for']}\"")
+                continue
+
+            entries.append(Emoji(url, name, extension))
+
+        if total_pages is None:
+            total_pages = int(json['paging']['pages'])
+
+        page += 1
+
+    return entries
+
+
 async def main():
     args = _argparse()
 
@@ -87,41 +157,22 @@
         os.makedirs(args.directory)
 
     base_url = BASE_URL.format(team_name=args.team_name)
-    emoji_url = base_url + EMOJI_ENDPOINT
 
     async with _async_session(args.cookie) as session:
-        logger.info(f"Getting {emoji_url}")
+        token = await _fetch_api_token(session, base_url)
 
-        async with session.get(emoji_url) as base_page_q:
-            if base_page_q.status != 200:
-                logger.error(f"Failed to retrieve emoji list ({base_page_q.status})")
-                return
-            text = await base_page_q.text()
-            tree = lxml.html.fromstring(text)
+        emojis = await _determine_all_emoji_urls(session, base_url, token)
 
-        emoji_urls = []
-        emoji_urls.extend(parse_emoji_from_page(text))
-        other_emoji_pages = [f"{base_url}{p}" for p in
-                             tree.xpath(r'//div[@class="pagination pagination-centered"]'
-                                        r'/ul/li/a[.!="Next"]/@href[.!="#"]')
-                             if p != EMOJI_ENDPOINT]
-        logger.info(f"Getting other emoji from: {other_emoji_pages}")
-        for emoji_page in other_emoji_pages:
-            async with session.get(f"{emoji_page}") as page:
-                text = await page.text()
-                emoji_urls.extend(parse_emoji_from_page(text))
+        if len(emojis) == 0:
+            raise Exception('Failed to find any custom emoji')
 
-        emoji_names = [u.split('/')[-2] for u in emoji_urls]
+        function_http_get = concurrent_http_get(args.concurrent_requests, session)
 
-        logger.info(f"Parsed {len(emoji_names)} emojis")
-        assert len(emoji_names) > 0
+        for future in asyncio.as_completed([function_http_get(emoji) for emoji in emojis]):
+            emoji, data = await future
+            save_to_file(data, emoji, args.directory)
 
-        http_get = concurrent_http_get(args.concurrent_requests, session)
-        tasks = [http_get(emoji_url, emoji_name) for emoji_name, emoji_url in zip(emoji_names, emoji_urls)
-                 if "alias" not in emoji_url]
-        for future in asyncio.as_completed(tasks):
-            data, name, url = await future
-            save_to_file(data, name, url, args.directory)
+        logger.info(f"Exported {len(emojis)} custom emoji to directory '{args.directory}'")
 
 
 if __name__ == '__main__':