186 lines
5.6 KiB
Python
Executable File
186 lines
5.6 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
# Export emoji in a Slack team as files
|
|
# https://github.com/smashwilson/slack-emojinator
|
|
|
|
import aiohttp
|
|
import argparse
|
|
import asyncio
|
|
import logging
|
|
import lxml.html
|
|
import os
|
|
import time
|
|
import re
|
|
from collections import namedtuple
|
|
|
|
# Record describing one custom emoji to download: where it lives, what it is
# called, and the file extension taken from its URL.
Emoji = namedtuple('Emoji', 'url name extension')

# Log INFO and above as "<timestamp><TAB><message>".
logging.basicConfig(format="%(asctime)-15s\t%(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)

# URL building blocks; BASE_URL is completed with the team name at runtime.
BASE_URL = 'https://{team_name}.slack.com'
EMOJI_ENDPOINT = '/customize/emoji'
EMOJI_API = '/api/emoji.adminList'

# Matches a line carrying `api_token: "..."` or `"api_token":"..."`;
# capture group 1 is the token value.
API_TOKEN_REGEX = r'.*(?:\"?api_token\"?):\s*\"([^"]+)\".*'
API_TOKEN_PATTERN = re.compile(API_TOKEN_REGEX)
|
|
|
|
|
|
def _argparse(argv=None):
    """Parse and validate the command-line arguments of the exporter.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            argparse reads ``sys.argv[1:]``, as before.

    Returns:
        The parsed ``argparse.Namespace`` with ``directory``, ``team_name``,
        ``cookie`` and ``concurrent_requests``.

    Raises:
        SystemExit: via ``parser.error`` when the team name or cookie is
            missing from both the command line and the environment, or when
            the concurrency limit is not a positive integer.
    """
    parser = argparse.ArgumentParser(
        description='Bulk export of emoji from a slack team'
    )
    parser.add_argument(
        'directory',
        help='Where do we store downloaded emoji?'
    )
    parser.add_argument(
        '--team-name', '-t',
        default=os.getenv('SLACK_TEAM'),
        help='Defaults to the $SLACK_TEAM environment variable.'
    )
    parser.add_argument(
        '--cookie', '-c',
        default=os.getenv('SLACK_COOKIE'),
        help='Defaults to the $SLACK_COOKIE environment variable.'
    )
    parser.add_argument(
        '--concurrent-requests', '-r',
        # argparse runs string defaults through `type`, so a value coming
        # from the environment is converted to int as well.
        default=os.getenv('CONCURRENT_REQUESTS', 200),
        type=int,
        help='Maximum concurrent requests. Defaults to the $CONCURRENT_REQUESTS environment variable or 200.'
    )
    args = parser.parse_args(argv)

    # Fail early with a usage message instead of building a URL or a header
    # out of None further down the line.
    if not args.team_name:
        parser.error('a team name is required (--team-name or $SLACK_TEAM)')
    if not args.cookie:
        parser.error('a cookie is required (--cookie or $SLACK_COOKIE)')
    # The value sizes a semaphore; zero would stall every download.
    if args.concurrent_requests < 1:
        parser.error('--concurrent-requests must be a positive integer')

    return args
|
|
|
|
|
|
def concurrent_http_get(max_concurrent: int, session: aiohttp.ClientSession):
    """Build a coroutine function that downloads emoji with bounded concurrency.

    Args:
        max_concurrent: Maximum number of downloads allowed in flight at once.
        session: Session used to issue the GET requests.

    Returns:
        A coroutine function ``http_get(emoji)`` that resolves to the tuple
        ``(emoji, body)``, where ``body`` is the raw bytes fetched from
        ``emoji.url``.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def http_get(emoji: Emoji):
        # `async with` is the supported way to hold a semaphore; the older
        # `with (await semaphore)` form no longer exists in current Python.
        async with semaphore:
            # The context manager releases the response even if reading fails.
            async with session.get(emoji.url) as response:
                body = await response.read()
        return emoji, body

    return http_get
|
|
|
|
|
|
def save_to_file(response: bytes, emoji: Emoji, directory: str):
    """Write a downloaded emoji body to ``<directory>/<name>.<extension>``.

    Args:
        response: Raw bytes of the downloaded emoji.
        emoji: The emoji record; its name and extension form the file name.
        directory: Directory the file is written into.
    """
    logger.info(f"Downloaded {emoji.name.ljust(20)} from {emoji.url}")

    file_name = f"{emoji.name}.{emoji.extension}"
    destination = os.path.join(directory, file_name)

    # Binary mode: the payload is image data, not text.
    with open(destination, 'wb') as target:
        target.write(response)
|
|
|
|
|
|
def _async_session(auth_cookie) -> aiohttp.ClientSession:
    """Create a client session that sends ``auth_cookie`` as its Cookie header.

    Args:
        auth_cookie: Value placed in the ``Cookie`` header of the session.

    Returns:
        A new ``aiohttp.ClientSession`` configured with that header.
    """
    default_headers = {"Cookie": auth_cookie}
    return aiohttp.ClientSession(headers=default_headers)
|
|
|
|
|
|
async def _fetch_api_token(session: aiohttp.ClientSession, base_url: str):
    """Fetch the emoji customization page and extract the API token from it.

    The page's inline ``text/javascript`` scripts are scanned line by line for
    an ``api_token`` assignment matching ``API_TOKEN_PATTERN``.

    Args:
        session: Session used to request the page.
        base_url: Team base URL; ``EMOJI_ENDPOINT`` is appended to it.

    Returns:
        The token string captured by ``API_TOKEN_PATTERN``.

    Raises:
        Exception: if the page does not return status 200, if lines mention
            ``api_token`` but none of them can be parsed, or if no line
            mentions ``api_token`` at all.
    """
    # Fetch the form first, to get an api_token.
    emoji_url = base_url + EMOJI_ENDPOINT

    async with session.get(emoji_url) as base_page:
        if base_page.status != 200:
            raise Exception(f"Failed to fetch token from '{emoji_url}', status {base_page.status}")

        text = await base_page.text()

    tree = lxml.html.fromstring(text)
    all_scripts = tree.xpath('//script[@type=\'text/javascript\']/text()')

    # Remember whether any line looked like a token line, so the two failure
    # modes (unparseable vs. absent) keep their distinct error messages.
    saw_candidate = False

    for script in all_scripts:
        for line in script.splitlines():
            if 'api_token' not in line:
                continue

            # api_token: "xoxs-12345-abcdefg....",
            # "api_token":"xoxs-12345-abcdefg....",
            match_group = API_TOKEN_PATTERN.match(line.strip())
            if match_group:
                return match_group.group(1)

            # An unrelated mention of api_token must not abort the search;
            # keep looking for a line that actually carries the value.
            saw_candidate = True

    if saw_candidate:
        raise Exception("Could not parse API token from remote data! Regex requires updating.")

    raise Exception("No api_token found in page")
|
|
|
|
|
|
async def _determine_all_emoji_urls(session: aiohttp.ClientSession, base_url: str, token: str):
    """Page through the emoji admin list and collect every non-alias emoji.

    Each page is requested with a POST to ``base_url + EMOJI_API``. A 429
    response is retried after a 5 second pause; any other non-200 status
    aborts the export.

    Args:
        session: Session used for the API requests.
        base_url: Team base URL.
        token: API token sent with every request.

    Returns:
        A list of ``Emoji`` records (url, name, extension), aliases excluded.

    Raises:
        Exception: if a page request returns a status other than 200 or 429.
    """
    api_url = base_url + EMOJI_API
    page = 1
    total_pages = None  # unknown until the first page has been read

    entries = list()

    while total_pages is None or page <= total_pages:
        data = {
            'token': token,
            'page': page,
            'count': 100
        }

        while True:
            # `async with` releases the connection on every path, including
            # the retry and error paths.
            async with session.post(api_url, data=data) as response:
                logger.info(f"loaded {response.real_url} (page {page})")

                if response.status == 200:
                    payload = await response.json()
                    break

                if response.status != 429:
                    raise Exception(f"Failed to load emoji from {response.request_info.real_url} (status {response.status})")

            logger.warning('Too many requests! Retrying in 5 seconds')
            # Non-blocking pause: time.sleep() here would freeze the whole
            # event loop rather than just this coroutine.
            await asyncio.sleep(5)

        for entry in payload['emoji']:
            url = str(entry['url'])
            name = str(entry['name'])
            extension = str(url.split('.')[-1])

            # slack uses 0/1 to represent false/true in the API
            if entry['is_alias'] != 0:
                logger.info(f"Skipping emoji \"{name}\", is alias of \"{entry['alias_for']}\"")
                continue

            entries.append(Emoji(url, name, extension))

        if total_pages is None:
            total_pages = int(payload['paging']['pages'])

        page += 1

    return entries
|
|
|
|
|
|
async def main():
    """Export every custom emoji of the configured team into a directory.

    Parses the command line, makes sure the output directory exists, fetches
    an API token, lists all non-alias emoji, then downloads them concurrently
    and writes each one to disk as it completes.

    Raises:
        Exception: if no custom emoji are found.
    """
    args = _argparse()

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(args.directory, exist_ok=True)

    base_url = BASE_URL.format(team_name=args.team_name)

    async with _async_session(args.cookie) as session:
        token = await _fetch_api_token(session, base_url)

        emojis = await _determine_all_emoji_urls(session, base_url, token)

        if not emojis:
            raise Exception('Failed to find any custom emoji')

        function_http_get = concurrent_http_get(args.concurrent_requests, session)

        # Save each emoji as soon as its download finishes, in completion
        # order rather than listing order.
        for future in asyncio.as_completed([function_http_get(emoji) for emoji in emojis]):
            emoji, data = await future
            save_to_file(data, emoji, args.directory)

    logger.info(f"Exported {len(emojis)} custom emoji to directory '{args.directory}'")
|
|
|
|
|
|
if __name__ == '__main__':
    # asyncio.run creates a fresh event loop, runs main() to completion and
    # closes the loop; calling get_event_loop() with no running loop is
    # deprecated.
    asyncio.run(main())
|