olm: Allow decryption functions to define how to handle unicode decode errors.

This patch changes the decryption functions not to fail if there was an
unicode decode error while converting the decrypted bytes plaintext into
a native python string.

Characters that cannot be decoded as unicode are now replaced with the
unicode replacement character (U+FFFD).

The old behaviour of raising an UnicodeDecodeError can be achieved by
passing the "strict" error handling scheme to the decrypt function.
This commit is contained in:
Damir Jelić 2019-06-18 13:46:57 +02:00
parent e1a4e6ebf1
commit 2f5590bf38
8 changed files with 81 additions and 19 deletions

View file

@ -33,7 +33,7 @@ from future.utils import bytes_to_native_str
# pylint: disable=no-name-in-module
from _libolm import ffi, lib # type: ignore
from ._compat import URANDOM, to_bytearray, to_bytes
from ._compat import URANDOM, to_bytearray, to_bytes, to_native_str
from ._finalize import track_for_finalization
@ -176,8 +176,8 @@ class InboundGroupSession(object):
raise OlmGroupSessionError(last_error)
def decrypt(self, ciphertext):
# type: (AnyStr) -> Tuple[str, int]
def decrypt(self, ciphertext, errors="replace"):
# type: (AnyStr, str) -> Tuple[str, int]
"""Decrypt a message
Returns a tuple of the decrypted plain-text and the message index of
@ -197,6 +197,13 @@ class InboundGroupSession(object):
Args:
ciphertext(str): Base64 encoded ciphertext containing the encrypted
message
unicode_errors(str, optional): The error handling scheme to use for
unicode decoding errors. The default is "replace" meaning that
the character that was unable to decode will be replaced with
the unicode replacement character (U+FFFD). Other possible
values are "strict", "ignore" and "xmlcharrefreplace" as well
as any other name registered with codecs.register_error that
can handle UnicodeEncodeErrors.
"""
if not ciphertext:
raise ValueError("Ciphertext can't be empty.")
@ -223,10 +230,10 @@ class InboundGroupSession(object):
self._check_error(plaintext_length)
plaintext = bytes_to_native_str(ffi.unpack(
plaintext_buffer,
plaintext_length
))
plaintext = to_native_str(
ffi.unpack(plaintext_buffer, plaintext_length),
errors=errors
)
# clear out copies of the plaintext
lib.memset(plaintext_buffer, 0, max_plaintext_length)

View file

@ -40,7 +40,7 @@ from future.utils import bytes_to_native_str
from _libolm import ffi, lib # type: ignore
from ._compat import URANDOM, to_bytearray
from ._compat import URANDOM, to_bytearray, to_native_str
from ._finalize import track_for_finalization
@ -313,8 +313,8 @@ class PkDecryption(object):
return obj
def decrypt(self, message):
# type (PkMessage) -> str
def decrypt(self, message, errors="replace"):
# type (PkMessage, str) -> str
"""Decrypt a previously encrypted Pk message.
Returns the decrypted plaintext.
@ -322,6 +322,13 @@ class PkDecryption(object):
Args:
message(PkMessage): the pk message to decrypt.
unicode_errors(str, optional): The error handling scheme to use for
unicode decoding errors. The default is "replace" meaning that
the character that was unable to decode will be replaced with
the unicode replacement character (U+FFFD). Other possible
values are "strict", "ignore" and "xmlcharrefreplace" as well
as any other name registered with codecs.register_error that
can handle UnicodeEncodeErrors.
"""
ephemeral_key = to_bytearray(message.ephemeral_key)
ephemeral_key_size = len(ephemeral_key)
@ -354,7 +361,7 @@ class PkDecryption(object):
# clear out copies of the plaintext
lib.memset(plaintext_buffer, 0, max_plaintext_length)
return bytes_to_native_str(plaintext)
return to_native_str(plaintext, errors=errors)
def _clear_pk_signing(pk_struct):

View file

@ -30,15 +30,15 @@ Examples:
"""
from functools import wraps
from builtins import bytes
from functools import wraps
from typing import Optional
from future.utils import bytes_to_native_str
from _libolm import ffi, lib
from ._compat import URANDOM, to_bytes, to_bytearray
from ._compat import URANDOM, to_bytearray, to_bytes
from ._finalize import track_for_finalization

View file

@ -40,7 +40,7 @@ from future.utils import bytes_to_native_str
# pylint: disable=no-name-in-module
from _libolm import ffi, lib # type: ignore
from ._compat import URANDOM, to_bytearray, to_bytes
from ._compat import URANDOM, to_bytearray, to_bytes, to_native_str
from ._finalize import track_for_finalization
# This is imported only for type checking purposes
@ -273,8 +273,8 @@ class Session(object):
else: # pragma: no cover
raise ValueError("Unknown message type")
def decrypt(self, message):
# type: (_OlmMessage) -> str
def decrypt(self, message, errors="replace"):
# type: (_OlmMessage, str) -> str
"""Decrypts a message using the session. Returns the plaintext string
on success. Raises OlmSessionError on failure. If the base64 couldn't
be decoded then the error message will be "INVALID_BASE64". If the
@ -285,7 +285,14 @@ class Session(object):
Args:
message(OlmMessage): The Olm message that will be decrypted. It can
be either an OlmPreKeyMessage or an OlmMessage.
be either an OlmPreKeyMessage or an OlmMessage.
unicode_errors(str, optional): The error handling scheme to use for
unicode decoding errors. The default is "replace" meaning that
the character that was unable to decode will be replaced with
the unicode replacement character (U+FFFD). Other possible
values are "strict", "ignore" and "xmlcharrefreplace" as well
as any other name registered with codecs.register_error that
can handle UnicodeEncodeErrors.
"""
if not message.ciphertext:
raise ValueError("Ciphertext can't be empty")
@ -311,8 +318,10 @@ class Session(object):
plaintext_buffer, max_plaintext_length
)
self._check_error(plaintext_length)
plaintext = bytes_to_native_str(
ffi.unpack(plaintext_buffer, plaintext_length))
plaintext = to_native_str(
ffi.unpack(plaintext_buffer, plaintext_length),
errors=errors
)
# clear out copies of the plaintext
lib.memset(plaintext_buffer, 0, max_plaintext_length)

View file

@ -32,6 +32,7 @@ Examples:
# pylint: disable=redefined-builtin,unused-import
from typing import AnyStr, Type
from future.utils import bytes_to_native_str
# pylint: disable=no-name-in-module

View file

@ -1,3 +1,7 @@
# -*- coding: utf-8 -*-
from builtins import bytes
import pytest
from olm import InboundGroupSession, OlmGroupSessionError, OutboundGroupSession
@ -112,3 +116,16 @@ class TestClass(object):
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
del inbound
def test_invalid_unicode_decrypt(self):
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
text = outbound.encrypt(bytes([0xed]))
plaintext, _ = inbound.decrypt(text)
print(plaintext)
assert plaintext == "<EFBFBD>"
plaintext, _ = inbound.decrypt(text, "ignore")
assert plaintext == ""

View file

@ -1,3 +1,6 @@
# -*- coding: utf-8 -*-
from builtins import bytes
import pytest
from olm import (PkDecryption, PkDecryptionError, PkEncryption, PkSigning,
@ -55,3 +58,10 @@ class TestClass(object):
message = "This statement is true"
signature = signing.sign(message)
ed25519_verify(signing.public_key, message, signature)
def test_invalid_unicode_decrypt(self):
decryption = PkDecryption()
encryption = PkEncryption(decryption.public_key)
message = encryption.encrypt(bytes([0xed]))
plaintext = decryption.decrypt(message)
assert plaintext == "<EFBFBD>"

View file

@ -1,3 +1,6 @@
# -*- coding: utf-8 -*-
from builtins import bytes
import pytest
from olm import (Account, InboundSession, OlmMessage, OlmPreKeyMessage,
@ -141,3 +144,11 @@ class TestClass(object):
new_message = new_session.encrypt(plaintext)
assert bob_session.matches(new_message) is False
def test_invalid_unicode_decrypt(self):
alice, bob, session = self._create_session()
message = session.encrypt(bytes([0xed]))
bob_session = InboundSession(bob, message)
plaintext = bob_session.decrypt(message)
assert plaintext == "<EFBFBD>"