Update export.py script to handle new Slack emoji page (resolves #27)
This commit is contained in:
parent
1c2b5aae3f
commit
ea44a2a453
142
export.py
142
export.py
|
@ -9,13 +9,20 @@ import asyncio
|
|||
import logging
import lxml.html
import os
from typing import List
import re
from collections import namedtuple

# One custom emoji: absolute image URL, display name, and file extension.
Emoji = namedtuple('Emoji', 'url name extension')

logging.basicConfig(level=logging.INFO, format="%(asctime)-15s\t%(message)s")
logger = logging.getLogger(__name__)

# Slack endpoints: the customize page (scraped for an api_token) and the
# admin emoji-list API (used to enumerate all custom emoji).
BASE_URL = 'https://{team_name}.slack.com'
EMOJI_ENDPOINT = '/customize/emoji'
EMOJI_API = '/api/emoji.adminList'

# Matches both `api_token: "xoxs-..."` and `"api_token":"xoxs-..."` as they
# appear in the customize page's inline scripts; group 1 is the token.
API_TOKEN_REGEX = r'.*(?:\"?api_token\"?):\s*\"([^"]+)\".*'
API_TOKEN_PATTERN = re.compile(API_TOKEN_REGEX)
|
||||
|
||||
|
||||
def _argparse():
|
||||
|
@ -49,37 +56,99 @@ def _argparse():
|
|||
def concurrent_http_get(max_concurrent: int, session: aiohttp.ClientSession):
    """Build a download coroutine that limits request concurrency.

    :param max_concurrent: maximum number of in-flight HTTP requests.
    :param session: shared aiohttp client session used for every request.
    :return: coroutine ``http_get(emoji)`` that resolves to ``(emoji, body)``
             where ``body`` is the raw image bytes.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def http_get(emoji: Emoji):
        # `async with` acquires the semaphore and guarantees release even on
        # error; the original `with (await semaphore)` form is deprecated and
        # was removed in Python 3.10. The closure reads `semaphore` directly,
        # so no `nonlocal` declaration is needed.
        async with semaphore:
            response = await session.get(emoji.url)
            body = await response.content.read()
            await response.wait_for_close()
        return emoji, body

    return http_get
|
||||
|
||||
|
||||
def save_to_file(response: bytes, emoji: Emoji, directory: str):
    """Persist one downloaded emoji image as ``<directory>/<name>.<extension>``."""
    logger.info(f"Downloaded {emoji.name.ljust(20)} from {emoji.url}")
    target = os.path.join(directory, f"{emoji.name}.{emoji.extension}")
    with open(target, 'wb') as out:
        out.write(response)
|
||||
|
||||
|
||||
def parse_emoji_from_page(text: str) -> List[str]:
    '''Extract the (relative) emoji image URLs from an emoji-customize HTML page.

    :param text Raw HTML.
    :return ['/path/to/first.png', '/path/to/second.png', ...]'''
    document = lxml.html.fromstring(text)
    # Each custom emoji row carries its image URL in a data-original attribute.
    return document.xpath(r'//td[@headers="custom_emoji_image"]/span/@data-original')
|
||||
|
||||
|
||||
def _async_session(auth_cookie) -> aiohttp.ClientSession:
    """Create an aiohttp session authenticated via the given Cookie header value."""
    headers = {"Cookie": auth_cookie}
    return aiohttp.ClientSession(headers=headers)
|
||||
|
||||
|
||||
async def _fetch_api_token(session: aiohttp.ClientSession, base_url: str):
    """Scrape the web ``api_token`` out of the team's emoji customize page.

    :param session: authenticated aiohttp session.
    :param base_url: team base URL (e.g. ``https://team.slack.com``).
    :return: the token string (group 1 of API_TOKEN_PATTERN).
    :raises Exception: on a non-200 page fetch, an unparseable token line,
                       or no token present at all.
    """
    # Fetch the form first, to get an api_token.
    emoji_url = base_url + EMOJI_ENDPOINT

    async with session.get(emoji_url) as base_page:
        if base_page.status != 200:
            raise Exception(f"Failed to fetch token from '{emoji_url}', status {base_page.status}")
        text = await base_page.text()

    tree = lxml.html.fromstring(text)
    all_scripts = tree.xpath('//script[@type=\'text/javascript\']/text()')

    for script in all_scripts:
        for line in script.splitlines():
            if 'api_token' not in line:
                continue
            # api_token: "xoxs-12345-abcdefg....",
            # "api_token":"xoxs-12345-abcdefg....",
            match_group = API_TOKEN_PATTERN.match(line.strip())
            if not match_group:
                raise Exception("Could not parse API token from remote data! Regex requires updating.")
            return match_group.group(1)

    raise Exception("No api_token found in page")
|
||||
|
||||
|
||||
async def _determine_all_emoji_urls(session: aiohttp.ClientSession, base_url: str, token: str):
    """Page through Slack's emoji.adminList API and collect every real emoji.

    Alias entries are skipped (they point at another emoji's image).

    :param session: authenticated aiohttp session.
    :param base_url: team base URL.
    :param token: web api_token obtained from _fetch_api_token.
    :return: list of Emoji namedtuples.
    :raises Exception: when any API page returns a non-200 status.
    """
    page = 1
    total_pages = None

    entries = list()

    while total_pages is None or page <= total_pages:
        data = {
            'token': token,
            'page': page,
            'count': 100
        }

        # `async with` releases the pooled connection even if parsing raises;
        # the original left the response unreleased on every iteration.
        async with session.post(base_url + EMOJI_API, data=data) as response:
            logger.info(f"loaded {response.real_url} (page {page})")

            if response.status != 200:
                raise Exception(f"Failed to load emoji from {response.request_info.real_url} (status {response.status})")

            # Renamed from `json` to avoid shadowing the stdlib module name.
            payload = await response.json()

        for entry in payload['emoji']:
            url = str(entry['url'])
            name = str(entry['name'])

            # slack uses 0/1 to represent false/true in the API
            if entry['is_alias'] != 0:
                logger.info(f"Skipping emoji \"{name}\", is alias of \"{entry['alias_for']}\"")
                continue

            # `url` is already a str, so the split result needs no cast.
            extension = url.split('.')[-1]
            entries.append(Emoji(url, name, extension))

        if total_pages is None:
            total_pages = int(payload['paging']['pages'])

        page += 1

    return entries
|
||||
|
||||
|
||||
async def main():
|
||||
args = _argparse()
|
||||
|
||||
|
@ -87,41 +156,22 @@ async def main():
|
|||
os.makedirs(args.directory)
|
||||
|
||||
base_url = BASE_URL.format(team_name=args.team_name)
|
||||
emoji_url = base_url + EMOJI_ENDPOINT
|
||||
|
||||
async with _async_session(args.cookie) as session:
|
||||
logger.info(f"Getting {emoji_url}")
|
||||
token = await _fetch_api_token(session, base_url)
|
||||
|
||||
async with session.get(emoji_url) as base_page_q:
|
||||
if base_page_q.status != 200:
|
||||
logger.error(f"Failed to retrieve emoji list ({base_page_q.status})")
|
||||
return
|
||||
text = await base_page_q.text()
|
||||
tree = lxml.html.fromstring(text)
|
||||
emojis = await _determine_all_emoji_urls(session, base_url, token)
|
||||
|
||||
emoji_urls = []
|
||||
emoji_urls.extend(parse_emoji_from_page(text))
|
||||
other_emoji_pages = [f"{base_url}{p}" for p in
|
||||
tree.xpath(r'//div[@class="pagination pagination-centered"]'
|
||||
r'/ul/li/a[.!="Next"]/@href[.!="#"]')
|
||||
if p != EMOJI_ENDPOINT]
|
||||
logger.info(f"Getting other emoji from: {other_emoji_pages}")
|
||||
for emoji_page in other_emoji_pages:
|
||||
async with session.get(f"{emoji_page}") as page:
|
||||
text = await page.text()
|
||||
emoji_urls.extend(parse_emoji_from_page(text))
|
||||
if len(emojis) == 0:
|
||||
raise Exception('Failed to find any custom emoji')
|
||||
|
||||
emoji_names = [u.split('/')[-2] for u in emoji_urls]
|
||||
function_http_get = concurrent_http_get(args.concurrent_requests, session)
|
||||
|
||||
logger.info(f"Parsed {len(emoji_names)} emojis")
|
||||
assert len(emoji_names) > 0
|
||||
for future in asyncio.as_completed([function_http_get(emoji) for emoji in emojis]):
|
||||
emoji, data = await future
|
||||
save_to_file(data, emoji, args.directory)
|
||||
|
||||
http_get = concurrent_http_get(args.concurrent_requests, session)
|
||||
tasks = [http_get(emoji_url, emoji_name) for emoji_name, emoji_url in zip(emoji_names, emoji_urls)
|
||||
if "alias" not in emoji_url]
|
||||
for future in asyncio.as_completed(tasks):
|
||||
data, name, url = await future
|
||||
save_to_file(data, name, url, args.directory)
|
||||
logger.info(f"Exported {len(emojis)} custom emoji to directory '{args.directory}'")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
Loading…
Reference in New Issue