336 lines
12 KiB
Python
336 lines
12 KiB
Python
# Copyright (C) 2019 Philipp Hörist <philipp AT hoerist.com>
|
|
# Copyright (C) 2015 Bahtiar `kalkin-` Gadimov <bahtiar@gadimov.de>
|
|
# Copyright (C) 2015 Daniel Gultsch <daniel@cgultsch.de>
|
|
#
|
|
# This file is part of OMEMO Gajim Plugin.
|
|
#
|
|
# OMEMO Gajim Plugin is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published
|
|
# by the Free Software Foundation; version 3 only.
|
|
#
|
|
# OMEMO Gajim Plugin is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with OMEMO Gajim Plugin. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import logging
|
|
import binascii
|
|
import threading
|
|
from enum import IntEnum, unique
|
|
from pathlib import Path
|
|
|
|
from gi.repository import GLib
|
|
from gi.repository import Gtk
|
|
from gi.repository import Gdk
|
|
|
|
from nbxmpp.namespaces import Namespace
|
|
|
|
from gajim import dialogs
|
|
from gajim.common import app, ged
|
|
from gajim.plugins import GajimPlugin
|
|
from gajim.plugins.plugins_i18n import _
|
|
from gajim.groupchat_control import GroupchatControl
|
|
|
|
AXOLOTL_MISSING = 'You are missing Python3-Axolotl or use an outdated version'
|
|
PROTOBUF_MISSING = "OMEMO can't import Google Protobuf, you can find help in " \
|
|
"the GitLab Wiki"
|
|
ERROR_MSG = ''
|
|
|
|
|
|
log = logging.getLogger('gajim.p.omemo')
|
|
if log.getEffectiveLevel() == logging.DEBUG:
|
|
log_axolotl = logging.getLogger('axolotl')
|
|
log_axolotl.setLevel(logging.DEBUG)
|
|
log_axolotl.addHandler(logging.StreamHandler())
|
|
log_axolotl.propagate = False
|
|
|
|
try:
|
|
import google.protobuf
|
|
except Exception as error:
|
|
log.error(error)
|
|
ERROR_MSG = PROTOBUF_MISSING
|
|
|
|
try:
|
|
import axolotl
|
|
except Exception as error:
|
|
log.error(error)
|
|
ERROR_MSG = AXOLOTL_MISSING
|
|
|
|
if not ERROR_MSG:
|
|
try:
|
|
from omemo.modules import omemo
|
|
from omemo import file_crypto
|
|
from omemo.gtk.key import KeyDialog
|
|
from omemo.gtk.config import OMEMOConfigDialog
|
|
from omemo.backend.aes import aes_encrypt_file
|
|
except Exception as error:
|
|
log.error(error)
|
|
ERROR_MSG = 'Error: %s' % error
|
|
|
|
|
|
@unique
|
|
class UserMessages(IntEnum):
|
|
QUERY_DEVICES = 0
|
|
NO_FINGERPRINTS = 1
|
|
UNDECIDED_FINGERPRINTS = 2
|
|
|
|
|
|
class OmemoPlugin(GajimPlugin):
|
|
def init(self):
|
|
# pylint: disable=attribute-defined-outside-init
|
|
if ERROR_MSG:
|
|
self.activatable = False
|
|
self.available_text = ERROR_MSG
|
|
self.config_dialog = None
|
|
return
|
|
self.encryption_name = 'OMEMO'
|
|
self.allow_groupchat = True
|
|
self.events_handlers = {
|
|
'omemo-new-fingerprint': (ged.PRECORE, self._on_new_fingerprints),
|
|
'signed-in': (ged.PRECORE, self._on_signed_in),
|
|
'muc-disco-update': (ged.GUI1, self._on_muc_disco_update),
|
|
'muc-joined': (ged.GUI1, self._on_muc_joined),
|
|
}
|
|
self.modules = [omemo]
|
|
self.config_dialog = OMEMOConfigDialog(self)
|
|
self.gui_extension_points = {
|
|
'hyperlink_handler': (self._file_decryption, None),
|
|
'encrypt' + self.encryption_name: (self._encrypt_message, None),
|
|
'gc_encrypt' + self.encryption_name: (
|
|
self._muc_encrypt_message, None),
|
|
'send_message' + self.encryption_name: (
|
|
self._before_sendmessage, None),
|
|
'encryption_dialog' + self.encryption_name: (
|
|
self._on_encryption_button_clicked, None),
|
|
'encryption_state' + self.encryption_name: (
|
|
self._encryption_state, None),
|
|
'update_caps': (self._update_caps, None)}
|
|
|
|
self.disabled_accounts = []
|
|
self._windows = {}
|
|
|
|
self.config_default_values = {
|
|
'DISABLED_ACCOUNTS': ([], ''),
|
|
'BLIND_TRUST': (True, ''),
|
|
'SHOW_HELP_FINGERPRINTS': (True, ''),
|
|
}
|
|
|
|
for account in self.config['DISABLED_ACCOUNTS']:
|
|
self.disabled_accounts.append(account)
|
|
|
|
self._load_css()
|
|
|
|
def _is_enabled_account(self, account):
|
|
if account in self.disabled_accounts:
|
|
return False
|
|
if account == 'Local':
|
|
return False
|
|
return True
|
|
|
|
@staticmethod
|
|
def get_omemo(account):
|
|
return app.connections[account].get_module('OMEMO')
|
|
|
|
@staticmethod
|
|
def _load_css():
|
|
path = Path(__file__).parent / 'gtk' / 'style.css'
|
|
try:
|
|
with path.open("r") as file:
|
|
css = file.read()
|
|
except Exception as exc:
|
|
log.error('Error loading css: %s', exc)
|
|
return
|
|
|
|
try:
|
|
provider = Gtk.CssProvider()
|
|
provider.load_from_data(bytes(css.encode('utf-8')))
|
|
Gtk.StyleContext.add_provider_for_screen(Gdk.Screen.get_default(),
|
|
provider, 610)
|
|
except Exception:
|
|
log.exception('Error loading application css')
|
|
|
|
def activate(self):
|
|
"""
|
|
Method called when the Plugin is activated in the PluginManager
|
|
"""
|
|
for account in app.connections:
|
|
if not self._is_enabled_account(account):
|
|
continue
|
|
self.get_omemo(account).activate()
|
|
|
|
def deactivate(self):
|
|
"""
|
|
Method called when the Plugin is deactivated in the PluginManager
|
|
"""
|
|
for account in app.connections:
|
|
if not self._is_enabled_account(account):
|
|
continue
|
|
self.get_omemo(account).deactivate()
|
|
|
|
def _on_signed_in(self, event):
|
|
account = event.conn.name
|
|
if not self._is_enabled_account(account):
|
|
return
|
|
self.get_omemo(account).on_signed_in()
|
|
|
|
def _on_muc_disco_update(self, event):
|
|
if not self._is_enabled_account(event.account):
|
|
return
|
|
self.get_omemo(event.account).on_muc_disco_update(event)
|
|
|
|
def _on_muc_joined(self, event):
|
|
if not self._is_enabled_account(event.account):
|
|
return
|
|
self.get_omemo(event.account).on_muc_joined(event)
|
|
|
|
def _update_caps(self, account, features):
|
|
if not self._is_enabled_account(account):
|
|
return
|
|
features.append('%s+notify' % Namespace.OMEMO_TEMP_DL)
|
|
|
|
@staticmethod
|
|
def activate_encryption(chat_control):
|
|
return True
|
|
|
|
def _muc_encrypt_message(self, conn, obj, callback):
|
|
account = conn.name
|
|
if not self._is_enabled_account(account):
|
|
return
|
|
self.get_omemo(account).encrypt_message(conn, obj, callback, True)
|
|
|
|
def _encrypt_message(self, conn, obj, callback):
|
|
account = conn.name
|
|
if not self._is_enabled_account(account):
|
|
return
|
|
self.get_omemo(account).encrypt_message(conn, obj, callback, False)
|
|
|
|
def _file_decryption(self, uri, instance, window):
|
|
file_crypto.FileDecryption(self).hyperlink_handler(
|
|
uri, instance, window)
|
|
|
|
def encrypt_file(self, file, _account, callback):
|
|
thread = threading.Thread(target=self._encrypt_file_thread,
|
|
args=(file, callback))
|
|
thread.daemon = True
|
|
thread.start()
|
|
|
|
@staticmethod
|
|
def _encrypt_file_thread(file, callback, *args, **kwargs):
|
|
result = aes_encrypt_file(file.get_data())
|
|
file.size = len(result.payload)
|
|
fragment = binascii.hexlify(result.iv + result.key).decode()
|
|
file.set_uri_transform_func(
|
|
lambda uri: 'aesgcm%s#%s' % (uri[5:], fragment))
|
|
file.set_encrypted_data(result.payload)
|
|
GLib.idle_add(callback, file)
|
|
|
|
@staticmethod
|
|
def _encryption_state(_chat_control, state):
|
|
state['visible'] = True
|
|
state['authenticated'] = True
|
|
|
|
def _on_encryption_button_clicked(self, chat_control):
|
|
self._show_fingerprint_window(chat_control)
|
|
|
|
def _before_sendmessage(self, chat_control):
|
|
account = chat_control.account
|
|
if not self._is_enabled_account(account):
|
|
return
|
|
contact = chat_control.contact
|
|
omemo = self.get_omemo(account)
|
|
self.new_fingerprints_available(chat_control)
|
|
if isinstance(chat_control, GroupchatControl):
|
|
room = chat_control.room_jid
|
|
if not omemo.is_omemo_groupchat(room):
|
|
dialogs.ErrorDialog(
|
|
_('Bad Configuration'),
|
|
_('To use OMEMO in a Groupchat, the Groupchat should be'
|
|
' non-anonymous and members-only.'))
|
|
chat_control.sendmessage = False
|
|
return
|
|
|
|
missing = True
|
|
for jid in omemo.backend.get_muc_members(room):
|
|
if not omemo.are_keys_missing(jid):
|
|
missing = False
|
|
if missing:
|
|
log.info('%s => No Trusted Fingerprints for %s',
|
|
account, room)
|
|
self.print_message(chat_control, UserMessages.NO_FINGERPRINTS)
|
|
chat_control.sendmessage = False
|
|
else:
|
|
# check if we have devices for the contact
|
|
if not omemo.backend.get_devices(contact.jid, without_self=True):
|
|
omemo.request_devicelist(contact.jid)
|
|
self.print_message(chat_control, UserMessages.QUERY_DEVICES)
|
|
chat_control.sendmessage = False
|
|
return
|
|
# check if bundles are missing for some devices
|
|
if omemo.backend.storage.hasUndecidedFingerprints(contact.jid):
|
|
log.info('%s => Undecided Fingerprints for %s',
|
|
account, contact.jid)
|
|
self.print_message(chat_control, UserMessages.UNDECIDED_FINGERPRINTS)
|
|
chat_control.sendmessage = False
|
|
else:
|
|
log.debug('%s => Sending Message to %s',
|
|
account, contact.jid)
|
|
|
|
def _on_new_fingerprints(self, event):
|
|
self.new_fingerprints_available(event.chat_control)
|
|
|
|
def new_fingerprints_available(self, chat_control):
|
|
jid = chat_control.contact.jid
|
|
account = chat_control.account
|
|
omemo = self.get_omemo(account)
|
|
if isinstance(chat_control, GroupchatControl):
|
|
for jid_ in omemo.backend.get_muc_members(chat_control.room_jid,
|
|
without_self=False):
|
|
fingerprints = omemo.backend.storage.getNewFingerprints(jid_)
|
|
if fingerprints:
|
|
self._show_fingerprint_window(
|
|
chat_control, fingerprints)
|
|
break
|
|
elif not isinstance(chat_control, GroupchatControl):
|
|
fingerprints = omemo.backend.storage.getNewFingerprints(jid)
|
|
if fingerprints:
|
|
self._show_fingerprint_window(
|
|
chat_control, fingerprints)
|
|
|
|
def _show_fingerprint_window(self, chat_control, fingerprints=None):
|
|
contact = chat_control.contact
|
|
account = chat_control.account
|
|
omemo = self.get_omemo(account)
|
|
transient = chat_control.parent_win.window
|
|
|
|
if 'dialog' not in self._windows:
|
|
is_groupchat = isinstance(chat_control, GroupchatControl)
|
|
self._windows['dialog'] = \
|
|
KeyDialog(self, contact, transient,
|
|
self._windows, groupchat=is_groupchat)
|
|
if fingerprints:
|
|
log.debug('%s => Showing Fingerprint Prompt for %s',
|
|
account, contact.jid)
|
|
omemo.backend.storage.setShownFingerprints(fingerprints)
|
|
else:
|
|
self._windows['dialog'].present()
|
|
self._windows['dialog'].update()
|
|
if fingerprints:
|
|
omemo.backend.storage.setShownFingerprints(fingerprints)
|
|
|
|
@staticmethod
|
|
def print_message(chat_control, kind):
|
|
msg = None
|
|
if kind == UserMessages.QUERY_DEVICES:
|
|
msg = _('No devices found. Query in progress...')
|
|
elif kind == UserMessages.NO_FINGERPRINTS:
|
|
msg = _('To send an encrypted message, you have to '
|
|
'first trust the fingerprint of your contact!')
|
|
elif kind == UserMessages.UNDECIDED_FINGERPRINTS:
|
|
msg = _('You have undecided fingerprints')
|
|
if msg is None:
|
|
return
|
|
chat_control.add_status_message(msg)
|