1
0
Fork 0
slack-emojinator/export.py

186 lines
5.6 KiB
Python
Executable File

#!/usr/bin/env python
# Export emoji in a Slack team as files
# https://github.com/smashwilson/slack-emojinator
import aiohttp
import argparse
import asyncio
import logging
import lxml.html
import os
import time
import re
from collections import namedtuple
# Lightweight record for one downloadable emoji: the URL it is fetched from,
# its name, and the file extension used when it is written to disk.
Emoji = namedtuple('Emoji', ['url', 'name', 'extension'])

# Script-wide logging: INFO level, timestamp and message separated by a tab.
logging.basicConfig(format="%(asctime)-15s\t%(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)

# URL building blocks: the team-specific host template and the two paths
# requested by this script (the customize page and the emoji listing API).
BASE_URL = 'https://{team_name}.slack.com'
EMOJI_ENDPOINT = '/customize/emoji'
EMOJI_API = '/api/emoji.adminList'

# Captures the quoted token value from a line such as
#   api_token: "xoxs-12345-abcdefg....",
#   "api_token":"xoxs-12345-abcdefg....",
API_TOKEN_REGEX = r'.*(?:\"?api_token\"?):\s*\"([^"]+)\".*'
API_TOKEN_PATTERN = re.compile(API_TOKEN_REGEX)
def _argparse(argv=None):
    """Parse the command-line options of the emoji export script.

    Args:
        argv: Optional list of argument strings to parse. ``None`` (the
            default) makes argparse read the process arguments, which is
            what existing callers rely on.

    Returns:
        The ``argparse.Namespace`` holding ``directory``, ``team_name``,
        ``cookie`` and ``concurrent_requests`` (converted to ``int``).
    """
    parser = argparse.ArgumentParser(
        # This script downloads emoji from a team, i.e. it is the export tool.
        description='Bulk export of emoji from a slack team'
    )
    parser.add_argument(
        'directory',
        help='Where do we store downloaded emoji?'
    )
    parser.add_argument(
        '--team-name', '-t',
        default=os.getenv('SLACK_TEAM'),
        help='Defaults to the $SLACK_TEAM environment variable.'
    )
    parser.add_argument(
        '--cookie', '-c',
        default=os.getenv('SLACK_COOKIE'),
        help='Defaults to the $SLACK_COOKIE environment variable.'
    )
    parser.add_argument(
        '--concurrent-requests', '-r',
        # argparse applies ``type`` to string defaults, so a value coming
        # from the environment is converted to int as well.
        default=os.getenv('CONCURRENT_REQUESTS', 200),
        type=int,
        help='Maximum concurrent requests. Defaults to the $CONCURRENT_REQUESTS environment variable or 200.'
    )
    return parser.parse_args(argv)
def concurrent_http_get(max_concurrent: int, session: aiohttp.ClientSession):
    """Build a coroutine function that downloads emoji with bounded concurrency.

    Args:
        max_concurrent: Maximum number of downloads allowed in flight at once.
        session: The client session used to issue the GET requests.

    Returns:
        A coroutine function ``http_get(emoji)`` that fetches ``emoji.url``
        and returns the tuple ``(emoji, body)`` where ``body`` is the raw
        response payload.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def http_get(emoji: Emoji):
        # ``with (await semaphore)`` was removed in Python 3.9; ``async with``
        # is the supported way to hold the semaphore for the download.
        async with semaphore:
            # The context manager releases the connection even when reading
            # the body raises.
            async with session.get(emoji.url) as response:
                body = await response.read()
        return emoji, body

    return http_get
def save_to_file(response: bytes, emoji: Emoji, directory: str):
    """Log the download and write the emoji body to ``<directory>/<name>.<extension>``.

    Args:
        response: Raw bytes of the downloaded emoji.
        emoji: Record supplying the name, extension and source URL.
        directory: Folder the file is written into.
    """
    logger.info(f"Downloaded {emoji.name.ljust(20)} from {emoji.url}")

    file_name = f"{emoji.name}.{emoji.extension}"
    target_path = os.path.join(directory, file_name)
    with open(target_path, 'wb') as handle:
        handle.write(response)
def _async_session(auth_cookie) -> aiohttp.ClientSession:
    """Create a client session that sends *auth_cookie* as its ``Cookie`` header."""
    cookie_headers = {"Cookie": auth_cookie}
    return aiohttp.ClientSession(headers=cookie_headers)
async def _fetch_api_token(session: aiohttp.ClientSession, base_url: str):
    """Load the emoji customization page and pull the API token out of it.

    The page's ``text/javascript`` script bodies are scanned line by line;
    the first line mentioning ``api_token`` must match ``API_TOKEN_PATTERN``.

    Args:
        session: Client session used for the request.
        base_url: Team base URL, to which ``EMOJI_ENDPOINT`` is appended.

    Returns:
        The token string captured by the pattern.

    Raises:
        Exception: If the page does not answer with status 200, if the first
            ``api_token`` line cannot be parsed, or if no such line exists.
    """
    # Fetch the form first, to get an api_token.
    emoji_url = base_url + EMOJI_ENDPOINT

    async with session.get(emoji_url) as base_page:
        if base_page.status != 200:
            raise Exception(f"Failed to fetch token from '{emoji_url}', status {base_page.status}")

        document = lxml.html.fromstring(await base_page.text())
        script_bodies = document.xpath('//script[@type=\'text/javascript\']/text()')

        # Lazily walk every script line, keeping only those naming the token.
        token_lines = (
            line
            for body in script_bodies
            for line in body.splitlines()
            if 'api_token' in line
        )
        first_hit = next(token_lines, None)

        if first_hit is not None:
            # api_token: "xoxs-12345-abcdefg....",
            # "api_token":"xoxs-12345-abcdefg....",
            found = API_TOKEN_PATTERN.match(first_hit.strip())
            if found:
                return found.group(1)
            raise Exception("Could not parse API token from remote data! Regex requires updating.")

    raise Exception("No api_token found in page")
async def _determine_all_emoji_urls(session: aiohttp.ClientSession, base_url: str, token: str):
    """Page through the emoji listing API and collect every non-alias emoji.

    Each page is requested with ``count=100``. A 429 answer is retried after
    a five second pause; any other non-200 status aborts.

    Args:
        session: Client session used for the POST requests.
        base_url: Team base URL, to which ``EMOJI_API`` is appended.
        token: API token sent with every request.

    Returns:
        A list of ``Emoji`` records (url, name, extension), aliases excluded.

    Raises:
        Exception: If a page answers with a status other than 200 or 429.
    """
    page = 1
    total_pages = None
    entries = []
    api_url = base_url + EMOJI_API

    while total_pages is None or page <= total_pages:
        data = {
            'token': token,
            'page': page,
            'count': 100,
        }

        while True:
            # ``async with`` releases the response on every path, including
            # the retry and error paths.
            async with session.post(api_url, data=data) as response:
                logger.info(f"loaded {response.real_url} (page {page})")

                if response.status == 200:
                    payload = await response.json()
                    break

                if response.status != 429:
                    raise Exception(f"Failed to load emoji from {response.request_info.real_url} (status {response.status})")

            logger.warning('Too many requests! Retrying in 5 seconds')
            # Non-blocking pause: time.sleep() here would stall the whole
            # event loop instead of just this coroutine.
            await asyncio.sleep(5)

        for entry in payload['emoji']:
            url = str(entry['url'])
            name = str(entry['name'])
            extension = str(url.split('.')[-1])

            # slack uses 0/1 to represent false/true in the API
            if entry['is_alias'] != 0:
                logger.info(f"Skipping emoji \"{name}\", is alias of \"{entry['alias_for']}\"")
                continue

            entries.append(Emoji(url, name, extension))

        # The page count is only read from the first answer.
        if total_pages is None:
            total_pages = int(payload['paging']['pages'])

        page += 1

    return entries
async def main():
    """Export every custom emoji of the configured team into a directory.

    Parses the command line, creates the target directory, obtains an API
    token, lists all emoji and downloads them concurrently, writing each
    one to disk as soon as its download completes.

    Raises:
        ValueError: If no team name or no cookie was supplied.
        Exception: If the listing contains no custom emoji (and whatever the
            token/listing helpers raise).
    """
    args = _argparse()

    # Without these the URL would be built from the literal text "None" and
    # the request would fail with an unrelated-looking error.
    if not args.team_name:
        raise ValueError('No Slack team name given; pass --team-name or set $SLACK_TEAM')
    if not args.cookie:
        raise ValueError('No Slack cookie given; pass --cookie or set $SLACK_COOKIE')

    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(args.directory, exist_ok=True)

    base_url = BASE_URL.format(team_name=args.team_name)

    async with _async_session(args.cookie) as session:
        token = await _fetch_api_token(session, base_url)

        emojis = await _determine_all_emoji_urls(session, base_url, token)

        if not emojis:
            raise Exception('Failed to find any custom emoji')

        function_http_get = concurrent_http_get(args.concurrent_requests, session)

        downloads = [function_http_get(emoji) for emoji in emojis]
        # Save files in completion order rather than submission order.
        for future in asyncio.as_completed(downloads):
            emoji, data = await future
            save_to_file(data, emoji, args.directory)

        logger.info(f"Exported {len(emojis)} custom emoji to directory '{args.directory}'")
if __name__ == '__main__':
    # asyncio.run() creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards. Calling asyncio.get_event_loop() with no
    # running loop is deprecated and left the loop unclosed.
    asyncio.run(main())