Merge branch 'python/unicode_decode_errors' into 'master'
Python unicode decode errors when decrypting. See merge request matrix-org/olm!4
This commit is contained in:
commit
ae38f2c5a0
10 changed files with 98 additions and 19 deletions
|
@ -43,6 +43,9 @@ test: olm-python2 olm-python3
|
|||
PYTHONPATH=install-temp/3 python3 -m pytest --cov --cov-branch --benchmark-disable
|
||||
rm -rf install-temp
|
||||
|
||||
isort:
|
||||
isort -y -p olm
|
||||
|
||||
clean:
|
||||
rm -rf python_olm.egg-info/ dist/ __pycache__/
|
||||
rm -rf *.so _libolm.o
|
||||
|
|
|
@ -44,3 +44,24 @@ def to_bytes(string):
|
|||
return bytes(string, "utf-8")
|
||||
|
||||
raise TypeError("Invalid type {}".format(type(string)))
|
||||
|
||||
|
||||
def to_unicode_str(byte_string, errors="replace"):
|
||||
"""Turn a byte string into a unicode string.
|
||||
|
||||
Should be used everywhere where the input byte string might not be trusted
|
||||
and may contain invalid unicode values.
|
||||
|
||||
Args:
|
||||
byte_string (bytes): The bytestring that will be converted to a native
|
||||
string.
|
||||
errors (str, optional): The error handling scheme that should be used
|
||||
to handle unicode decode errors. Can be one of "strict" (raise an
|
||||
UnicodeDecodeError exception, "ignore" (remove the offending
|
||||
characters), "replace" (replace the offending character with
|
||||
U+FFFD), "xmlcharrefreplace" as well as any other name registered
|
||||
with codecs.register_error that can handle UnicodeEncodeErrors.
|
||||
|
||||
Returns the decoded native string.
|
||||
"""
|
||||
return byte_string.decode(encoding="utf-8", errors=errors)
|
||||
|
|
|
@ -33,7 +33,7 @@ from future.utils import bytes_to_native_str
|
|||
# pylint: disable=no-name-in-module
|
||||
from _libolm import ffi, lib # type: ignore
|
||||
|
||||
from ._compat import URANDOM, to_bytearray, to_bytes
|
||||
from ._compat import URANDOM, to_bytearray, to_bytes, to_unicode_str
|
||||
from ._finalize import track_for_finalization
|
||||
|
||||
|
||||
|
@ -176,8 +176,8 @@ class InboundGroupSession(object):
|
|||
|
||||
raise OlmGroupSessionError(last_error)
|
||||
|
||||
def decrypt(self, ciphertext):
|
||||
# type: (AnyStr) -> Tuple[str, int]
|
||||
def decrypt(self, ciphertext, unicode_errors="replace"):
|
||||
# type: (AnyStr, str) -> Tuple[str, int]
|
||||
"""Decrypt a message
|
||||
|
||||
Returns a tuple of the decrypted plain-text and the message index of
|
||||
|
@ -197,6 +197,13 @@ class InboundGroupSession(object):
|
|||
Args:
|
||||
ciphertext(str): Base64 encoded ciphertext containing the encrypted
|
||||
message
|
||||
unicode_errors(str, optional): The error handling scheme to use for
|
||||
unicode decoding errors. The default is "replace" meaning that
|
||||
the character that was unable to decode will be replaced with
|
||||
the unicode replacement character (U+FFFD). Other possible
|
||||
values are "strict", "ignore" and "xmlcharrefreplace" as well
|
||||
as any other name registered with codecs.register_error that
|
||||
can handle UnicodeEncodeErrors.
|
||||
"""
|
||||
if not ciphertext:
|
||||
raise ValueError("Ciphertext can't be empty.")
|
||||
|
@ -223,10 +230,10 @@ class InboundGroupSession(object):
|
|||
|
||||
self._check_error(plaintext_length)
|
||||
|
||||
plaintext = bytes_to_native_str(ffi.unpack(
|
||||
plaintext_buffer,
|
||||
plaintext_length
|
||||
))
|
||||
plaintext = to_unicode_str(
|
||||
ffi.unpack(plaintext_buffer, plaintext_length),
|
||||
errors=unicode_errors
|
||||
)
|
||||
|
||||
# clear out copies of the plaintext
|
||||
lib.memset(plaintext_buffer, 0, max_plaintext_length)
|
||||
|
|
|
@ -40,7 +40,7 @@ from future.utils import bytes_to_native_str
|
|||
|
||||
from _libolm import ffi, lib # type: ignore
|
||||
|
||||
from ._compat import URANDOM, to_bytearray
|
||||
from ._compat import URANDOM, to_bytearray, to_unicode_str
|
||||
from ._finalize import track_for_finalization
|
||||
|
||||
|
||||
|
@ -313,8 +313,8 @@ class PkDecryption(object):
|
|||
|
||||
return obj
|
||||
|
||||
def decrypt(self, message):
|
||||
# type (PkMessage) -> str
|
||||
def decrypt(self, message, unicode_errors="replace"):
|
||||
# type (PkMessage, str) -> str
|
||||
"""Decrypt a previously encrypted Pk message.
|
||||
|
||||
Returns the decrypted plaintext.
|
||||
|
@ -322,6 +322,13 @@ class PkDecryption(object):
|
|||
|
||||
Args:
|
||||
message(PkMessage): the pk message to decrypt.
|
||||
unicode_errors(str, optional): The error handling scheme to use for
|
||||
unicode decoding errors. The default is "replace" meaning that
|
||||
the character that was unable to decode will be replaced with
|
||||
the unicode replacement character (U+FFFD). Other possible
|
||||
values are "strict", "ignore" and "xmlcharrefreplace" as well
|
||||
as any other name registered with codecs.register_error that
|
||||
can handle UnicodeEncodeErrors.
|
||||
"""
|
||||
ephemeral_key = to_bytearray(message.ephemeral_key)
|
||||
ephemeral_key_size = len(ephemeral_key)
|
||||
|
@ -354,7 +361,7 @@ class PkDecryption(object):
|
|||
# clear out copies of the plaintext
|
||||
lib.memset(plaintext_buffer, 0, max_plaintext_length)
|
||||
|
||||
return bytes_to_native_str(plaintext)
|
||||
return to_unicode_str(plaintext, errors=unicode_errors)
|
||||
|
||||
|
||||
def _clear_pk_signing(pk_struct):
|
||||
|
|
|
@ -30,15 +30,15 @@ Examples:
|
|||
|
||||
"""
|
||||
|
||||
from functools import wraps
|
||||
from builtins import bytes
|
||||
from functools import wraps
|
||||
from typing import Optional
|
||||
|
||||
from future.utils import bytes_to_native_str
|
||||
|
||||
from _libolm import ffi, lib
|
||||
|
||||
from ._compat import URANDOM, to_bytes, to_bytearray
|
||||
from ._compat import URANDOM, to_bytearray, to_bytes
|
||||
from ._finalize import track_for_finalization
|
||||
|
||||
|
||||
|
|
|
@ -40,7 +40,7 @@ from future.utils import bytes_to_native_str
|
|||
# pylint: disable=no-name-in-module
|
||||
from _libolm import ffi, lib # type: ignore
|
||||
|
||||
from ._compat import URANDOM, to_bytearray, to_bytes
|
||||
from ._compat import URANDOM, to_bytearray, to_bytes, to_unicode_str
|
||||
from ._finalize import track_for_finalization
|
||||
|
||||
# This is imported only for type checking purposes
|
||||
|
@ -273,8 +273,8 @@ class Session(object):
|
|||
else: # pragma: no cover
|
||||
raise ValueError("Unknown message type")
|
||||
|
||||
def decrypt(self, message):
|
||||
# type: (_OlmMessage) -> str
|
||||
def decrypt(self, message, unicode_errors="replace"):
|
||||
# type: (_OlmMessage, str) -> str
|
||||
"""Decrypts a message using the session. Returns the plaintext string
|
||||
on success. Raises OlmSessionError on failure. If the base64 couldn't
|
||||
be decoded then the error message will be "INVALID_BASE64". If the
|
||||
|
@ -285,7 +285,14 @@ class Session(object):
|
|||
|
||||
Args:
|
||||
message(OlmMessage): The Olm message that will be decrypted. It can
|
||||
be either an OlmPreKeyMessage or an OlmMessage.
|
||||
be either an OlmPreKeyMessage or an OlmMessage.
|
||||
unicode_errors(str, optional): The error handling scheme to use for
|
||||
unicode decoding errors. The default is "replace" meaning that
|
||||
the character that was unable to decode will be replaced with
|
||||
the unicode replacement character (U+FFFD). Other possible
|
||||
values are "strict", "ignore" and "xmlcharrefreplace" as well
|
||||
as any other name registered with codecs.register_error that
|
||||
can handle UnicodeEncodeErrors.
|
||||
"""
|
||||
if not message.ciphertext:
|
||||
raise ValueError("Ciphertext can't be empty")
|
||||
|
@ -311,8 +318,10 @@ class Session(object):
|
|||
plaintext_buffer, max_plaintext_length
|
||||
)
|
||||
self._check_error(plaintext_length)
|
||||
plaintext = bytes_to_native_str(
|
||||
ffi.unpack(plaintext_buffer, plaintext_length))
|
||||
plaintext = to_unicode_str(
|
||||
ffi.unpack(plaintext_buffer, plaintext_length),
|
||||
errors=unicode_errors
|
||||
)
|
||||
|
||||
# clear out copies of the plaintext
|
||||
lib.memset(plaintext_buffer, 0, max_plaintext_length)
|
||||
|
|
|
@ -32,6 +32,7 @@ Examples:
|
|||
|
||||
# pylint: disable=redefined-builtin,unused-import
|
||||
from typing import AnyStr, Type
|
||||
|
||||
from future.utils import bytes_to_native_str
|
||||
|
||||
# pylint: disable=no-name-in-module
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import pytest
|
||||
|
||||
from olm import InboundGroupSession, OlmGroupSessionError, OutboundGroupSession
|
||||
|
@ -112,3 +113,16 @@ class TestClass(object):
|
|||
outbound = OutboundGroupSession()
|
||||
inbound = InboundGroupSession(outbound.session_key)
|
||||
del inbound
|
||||
|
||||
def test_invalid_unicode_decrypt(self):
|
||||
outbound = OutboundGroupSession()
|
||||
inbound = InboundGroupSession(outbound.session_key)
|
||||
|
||||
text = outbound.encrypt(b"\xed")
|
||||
plaintext, _ = inbound.decrypt(text)
|
||||
|
||||
print(plaintext)
|
||||
assert plaintext == u"<EFBFBD>"
|
||||
|
||||
plaintext, _ = inbound.decrypt(text, "ignore")
|
||||
assert plaintext == ""
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import pytest
|
||||
|
||||
from olm import (PkDecryption, PkDecryptionError, PkEncryption, PkSigning,
|
||||
|
@ -55,3 +56,10 @@ class TestClass(object):
|
|||
message = "This statement is true"
|
||||
signature = signing.sign(message)
|
||||
ed25519_verify(signing.public_key, message, signature)
|
||||
|
||||
def test_invalid_unicode_decrypt(self):
|
||||
decryption = PkDecryption()
|
||||
encryption = PkEncryption(decryption.public_key)
|
||||
message = encryption.encrypt(b"\xed")
|
||||
plaintext = decryption.decrypt(message)
|
||||
assert plaintext == u"<EFBFBD>"
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import pytest
|
||||
|
||||
from olm import (Account, InboundSession, OlmMessage, OlmPreKeyMessage,
|
||||
|
@ -141,3 +142,11 @@ class TestClass(object):
|
|||
|
||||
new_message = new_session.encrypt(plaintext)
|
||||
assert bob_session.matches(new_message) is False
|
||||
|
||||
def test_invalid_unicode_decrypt(self):
|
||||
alice, bob, session = self._create_session()
|
||||
message = session.encrypt(b"\xed")
|
||||
|
||||
bob_session = InboundSession(bob, message)
|
||||
plaintext = bob_session.decrypt(message)
|
||||
assert plaintext == u"<EFBFBD>"
|
||||
|
|
Loading…
Reference in a new issue