overwrite buffers that may contain sensitive data

also reduce the amount of memory copying that we do
This commit is contained in:
Hubert Chathi 2018-10-16 00:31:56 -04:00
parent 357d4ff479
commit 5ef6a844d6
6 changed files with 235 additions and 127 deletions

View file

@ -3,6 +3,8 @@ all: olm-python2 olm-python3
include/olm/olm.h: ../include/olm/olm.h ../include/olm/inbound_group_session.h ../include/olm/outbound_group_session.h include/olm/olm.h: ../include/olm/olm.h ../include/olm/inbound_group_session.h ../include/olm/outbound_group_session.h
mkdir -p include/olm mkdir -p include/olm
$(CPP) -I dummy -I ../include ../include/olm/olm.h -o include/olm/olm.h $(CPP) -I dummy -I ../include ../include/olm/olm.h -o include/olm/olm.h
# add memset to the header so that we can use it to clear buffers
echo 'void *memset(void *s, int c, size_t n);' >> include/olm/olm.h
olm-python2: include/olm/olm.h olm-python2: include/olm/olm.h
DEVELOP=$(DEVELOP) python2 setup.py build DEVELOP=$(DEVELOP) python2 setup.py build

View file

@ -26,6 +26,16 @@ except ImportError: # pragma: no cover
URANDOM = urandom # type: ignore URANDOM = urandom # type: ignore
def to_bytearray(string):
# type: (AnyStr) -> bytes
if isinstance(string, bytes):
return bytearray(string)
elif isinstance(string, str):
return bytearray(string, "utf-8")
raise TypeError("Invalid type {}".format(type(string)))
def to_bytes(string): def to_bytes(string):
# type: (AnyStr) -> bytes # type: (AnyStr) -> bytes
if isinstance(string, bytes): if isinstance(string, bytes):

View file

@ -37,7 +37,7 @@ from future.utils import bytes_to_native_str
# pylint: disable=no-name-in-module # pylint: disable=no-name-in-module
from _libolm import ffi, lib # type: ignore from _libolm import ffi, lib # type: ignore
from ._compat import URANDOM, to_bytes from ._compat import URANDOM, to_bytearray
from ._finalize import track_for_finalization from ._finalize import track_for_finalization
# This is imported only for type checking purposes # This is imported only for type checking purposes
@ -82,12 +82,12 @@ class Account(object):
random_length = lib.olm_create_account_random_length(self._account) random_length = lib.olm_create_account_random_length(self._account)
random = URANDOM(random_length) random = URANDOM(random_length)
random_buffer = ffi.new("char[]", random)
self._check_error( self._check_error(
lib.olm_create_account(self._account, random_buffer, lib.olm_create_account(self._account, ffi.from_buffer(random),
random_length)) random_length))
def _check_error(self, ret): def _check_error(self, ret):
# type: (int) -> None # type: (int) -> None
if ret != lib.olm_error(): if ret != lib.olm_error():
@ -111,15 +111,23 @@ class Account(object):
passphrase(str, optional): The passphrase to be used to encrypt passphrase(str, optional): The passphrase to be used to encrypt
the account. the account.
""" """
byte_key = bytes(passphrase, "utf-8") if passphrase else b"" byte_key = bytearray(passphrase, "utf-8") if passphrase else b""
key_buffer = ffi.new("char[]", byte_key)
pickle_length = lib.olm_pickle_account_length(self._account) pickle_length = lib.olm_pickle_account_length(self._account)
pickle_buffer = ffi.new("char[]", pickle_length) pickle_buffer = ffi.new("char[]", pickle_length)
self._check_error( try:
lib.olm_pickle_account(self._account, key_buffer, len(byte_key), self._check_error(
pickle_buffer, pickle_length)) lib.olm_pickle_account(self._account,
ffi.from_buffer(byte_key),
len(byte_key),
pickle_buffer,
pickle_length))
finally:
# zero out copies of the passphrase
for i in range(0, len(byte_key)):
byte_key[i] = 0
return ffi.unpack(pickle_buffer, pickle_length) return ffi.unpack(pickle_buffer, pickle_length)
@classmethod @classmethod
@ -143,15 +151,22 @@ class Account(object):
if not pickle: if not pickle:
raise ValueError("Pickle can't be empty") raise ValueError("Pickle can't be empty")
byte_key = bytes(passphrase, "utf-8") if passphrase else b"" byte_key = bytearray(passphrase, "utf-8") if passphrase else b""
key_buffer = ffi.new("char[]", byte_key) # copy because unpickle will destroy the buffer
pickle_buffer = ffi.new("char[]", pickle) pickle_buffer = ffi.new("char[]", pickle)
obj = cls.__new__(cls) obj = cls.__new__(cls)
ret = lib.olm_unpickle_account(obj._account, key_buffer, len(byte_key), try:
pickle_buffer, len(pickle)) ret = lib.olm_unpickle_account(obj._account,
obj._check_error(ret) ffi.from_buffer(byte_key),
len(byte_key),
pickle_buffer,
len(pickle))
obj._check_error(ret)
finally:
for i in range(0, len(byte_key)):
byte_key[i] = 0
return obj return obj
@ -178,14 +193,21 @@ class Account(object):
Args: Args:
message(str): The message to sign. message(str): The message to sign.
""" """
bytes_message = to_bytes(message) bytes_message = to_bytearray(message)
out_length = lib.olm_account_signature_length(self._account) out_length = lib.olm_account_signature_length(self._account)
message_buffer = ffi.new("char[]", bytes_message)
out_buffer = ffi.new("char[]", out_length) out_buffer = ffi.new("char[]", out_length)
self._check_error( try:
lib.olm_account_sign(self._account, message_buffer, self._check_error(
len(bytes_message), out_buffer, out_length)) lib.olm_account_sign(self._account,
ffi.from_buffer(bytes_message),
len(bytes_message), out_buffer,
out_length))
finally:
# clear out copies of the message, which may be plaintext
if bytes_message is not message:
for i in range(0, len(bytes_message)):
bytes_message[i] = 0
return bytes_to_native_str(ffi.unpack(out_buffer, out_length)) return bytes_to_native_str(ffi.unpack(out_buffer, out_length))
@ -214,10 +236,10 @@ class Account(object):
random_length = lib.olm_account_generate_one_time_keys_random_length( random_length = lib.olm_account_generate_one_time_keys_random_length(
self._account, count) self._account, count)
random = URANDOM(random_length) random = URANDOM(random_length)
random_buffer = ffi.new("char[]", random)
self._check_error( self._check_error(
lib.olm_account_generate_one_time_keys( lib.olm_account_generate_one_time_keys(
self._account, count, random_buffer, random_length)) self._account, count, ffi.from_buffer(random), random_length))
@property @property
def one_time_keys(self): def one_time_keys(self):

View file

@ -33,7 +33,7 @@ from future.utils import bytes_to_native_str
# pylint: disable=no-name-in-module # pylint: disable=no-name-in-module
from _libolm import ffi, lib # type: ignore from _libolm import ffi, lib # type: ignore
from ._compat import URANDOM, to_bytes from ._compat import URANDOM, to_bytearray, to_bytes
from ._finalize import track_for_finalization from ._finalize import track_for_finalization
@ -78,12 +78,17 @@ class InboundGroupSession(object):
if False: # pragma: no cover if False: # pragma: no cover
self._session = self._session # type: ffi.cdata self._session = self._session # type: ffi.cdata
byte_session_key = to_bytes(session_key) byte_session_key = to_bytearray(session_key)
key_buffer = ffi.new("char[]", byte_session_key) try:
ret = lib.olm_init_inbound_group_session( ret = lib.olm_init_inbound_group_session(
self._session, key_buffer, len(byte_session_key) self._session,
) ffi.from_buffer(byte_session_key), len(byte_session_key)
)
finally:
if byte_session_key is not session_key:
for i in range(0, len(byte_session_key)):
byte_session_key[i] = 0
self._check_error(ret) self._check_error(ret)
def pickle(self, passphrase=""): def pickle(self, passphrase=""):
@ -98,19 +103,23 @@ class InboundGroupSession(object):
passphrase(str, optional): The passphrase to be used to encrypt passphrase(str, optional): The passphrase to be used to encrypt
the session. the session.
""" """
byte_passphrase = bytes(passphrase, "utf-8") if passphrase else b"" byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b""
passphrase_buffer = ffi.new("char[]", byte_passphrase)
pickle_length = lib.olm_pickle_inbound_group_session_length( pickle_length = lib.olm_pickle_inbound_group_session_length(
self._session) self._session)
pickle_buffer = ffi.new("char[]", pickle_length) pickle_buffer = ffi.new("char[]", pickle_length)
ret = lib.olm_pickle_inbound_group_session( try:
self._session, passphrase_buffer, len(byte_passphrase), ret = lib.olm_pickle_inbound_group_session(
pickle_buffer, pickle_length self._session,
) ffi.from_buffer(byte_passphrase), len(byte_passphrase),
pickle_buffer, pickle_length
self._check_error(ret) )
self._check_error(ret)
finally:
# clear out copies of the passphrase
for i in range(0, len(byte_passphrase)):
byte_passphrase[i] = 0
return ffi.unpack(pickle_buffer, pickle_length) return ffi.unpack(pickle_buffer, pickle_length)
@ -135,20 +144,25 @@ class InboundGroupSession(object):
if not pickle: if not pickle:
raise ValueError("Pickle can't be empty") raise ValueError("Pickle can't be empty")
byte_passphrase = bytes(passphrase, "utf-8") if passphrase else b"" byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b""
passphrase_buffer = ffi.new("char[]", byte_passphrase) # copy because unpickle will destroy the buffer
pickle_buffer = ffi.new("char[]", pickle) pickle_buffer = ffi.new("char[]", pickle)
obj = cls.__new__(cls) obj = cls.__new__(cls)
ret = lib.olm_unpickle_inbound_group_session( try:
obj._session, ret = lib.olm_unpickle_inbound_group_session(
passphrase_buffer, obj._session,
len(byte_passphrase), ffi.from_buffer(byte_passphrase),
pickle_buffer, len(byte_passphrase),
len(pickle) pickle_buffer,
) len(pickle)
obj._check_error(ret) )
obj._check_error(ret)
finally:
# clear out copies of the passphrase
for i in range(0, len(byte_passphrase)):
byte_passphrase[i] = 0
return obj return obj
@ -189,12 +203,15 @@ class InboundGroupSession(object):
byte_ciphertext = to_bytes(ciphertext) byte_ciphertext = to_bytes(ciphertext)
# copy because max_plaintext_length will destroy the buffer
ciphertext_buffer = ffi.new("char[]", byte_ciphertext) ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
max_plaintext_length = lib.olm_group_decrypt_max_plaintext_length( max_plaintext_length = lib.olm_group_decrypt_max_plaintext_length(
self._session, ciphertext_buffer, len(byte_ciphertext) self._session, ciphertext_buffer, len(byte_ciphertext)
) )
self._check_error(max_plaintext_length)
plaintext_buffer = ffi.new("char[]", max_plaintext_length) plaintext_buffer = ffi.new("char[]", max_plaintext_length)
# copy because max_plaintext_length will destroy the buffer
ciphertext_buffer = ffi.new("char[]", byte_ciphertext) ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
message_index = ffi.new("uint32_t*") message_index = ffi.new("uint32_t*")
@ -206,10 +223,15 @@ class InboundGroupSession(object):
self._check_error(plaintext_length) self._check_error(plaintext_length)
return bytes_to_native_str(ffi.unpack( plaintext = bytes_to_native_str(ffi.unpack(
plaintext_buffer, plaintext_buffer,
plaintext_length plaintext_length
)), message_index[0] ))
# clear out copies of the plaintext
lib.memset(plaintext_buffer, 0, max_plaintext_length)
return plaintext, message_index[0]
@property @property
def id(self): def id(self):
@ -281,15 +303,19 @@ class InboundGroupSession(object):
""" """
obj = cls.__new__(cls) obj = cls.__new__(cls)
byte_session_key = to_bytes(session_key) byte_session_key = to_bytearray(session_key)
key_buffer = ffi.new("char[]", byte_session_key) try:
ret = lib.olm_import_inbound_group_session( ret = lib.olm_import_inbound_group_session(
obj._session, obj._session,
key_buffer, ffi.from_buffer(byte_session_key),
len(byte_session_key) len(byte_session_key)
) )
obj._check_error(ret) obj._check_error(ret)
finally:
if byte_session_key is not session_key:
for i in range(0, len(byte_session_key)):
byte_session_key[i] = 0
return obj return obj
@ -323,10 +349,9 @@ class OutboundGroupSession(object):
self._session self._session
) )
random = URANDOM(random_length) random = URANDOM(random_length)
random_buffer = ffi.new("char[]", random)
ret = lib.olm_init_outbound_group_session( ret = lib.olm_init_outbound_group_session(
self._session, random_buffer, random_length self._session, ffi.from_buffer(random), random_length
) )
self._check_error(ret) self._check_error(ret)
@ -353,17 +378,23 @@ class OutboundGroupSession(object):
passphrase(str, optional): The passphrase to be used to encrypt passphrase(str, optional): The passphrase to be used to encrypt
the session. the session.
""" """
byte_passphrase = bytes(passphrase, "utf-8") if passphrase else b"" byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b""
passphrase_buffer = ffi.new("char[]", byte_passphrase)
pickle_length = lib.olm_pickle_outbound_group_session_length( pickle_length = lib.olm_pickle_outbound_group_session_length(
self._session) self._session)
pickle_buffer = ffi.new("char[]", pickle_length) pickle_buffer = ffi.new("char[]", pickle_length)
ret = lib.olm_pickle_outbound_group_session( try:
self._session, passphrase_buffer, len(byte_passphrase), ret = lib.olm_pickle_outbound_group_session(
pickle_buffer, pickle_length self._session,
) ffi.from_buffer(byte_passphrase), len(byte_passphrase),
self._check_error(ret) pickle_buffer, pickle_length
)
self._check_error(ret)
finally:
# clear out copies of the passphrase
for i in range(0, len(byte_passphrase)):
byte_passphrase[i] = 0
return ffi.unpack(pickle_buffer, pickle_length) return ffi.unpack(pickle_buffer, pickle_length)
@classmethod @classmethod
@ -387,20 +418,25 @@ class OutboundGroupSession(object):
if not pickle: if not pickle:
raise ValueError("Pickle can't be empty") raise ValueError("Pickle can't be empty")
byte_passphrase = bytes(passphrase, "utf-8") if passphrase else b"" byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b""
passphrase_buffer = ffi.new("char[]", byte_passphrase) # copy because unpickle will destroy the buffer
pickle_buffer = ffi.new("char[]", pickle) pickle_buffer = ffi.new("char[]", pickle)
obj = cls.__new__(cls) obj = cls.__new__(cls)
ret = lib.olm_unpickle_outbound_group_session( try:
obj._session, ret = lib.olm_unpickle_outbound_group_session(
passphrase_buffer, obj._session,
len(byte_passphrase), ffi.from_buffer(byte_passphrase),
pickle_buffer, len(byte_passphrase),
len(pickle) pickle_buffer,
) len(pickle)
obj._check_error(ret) )
obj._check_error(ret)
finally:
# clear out copies of the passphrase
for i in range(0, len(byte_passphrase)):
byte_passphrase[i] = 0
return obj return obj
@ -414,21 +450,26 @@ class OutboundGroupSession(object):
plaintext(str): A string that will be encrypted using the group plaintext(str): A string that will be encrypted using the group
session. session.
""" """
byte_plaintext = to_bytes(plaintext) byte_plaintext = to_bytearray(plaintext)
message_length = lib.olm_group_encrypt_message_length( message_length = lib.olm_group_encrypt_message_length(
self._session, len(byte_plaintext) self._session, len(byte_plaintext)
) )
message_buffer = ffi.new("char[]", message_length) message_buffer = ffi.new("char[]", message_length)
plaintext_buffer = ffi.new("char[]", byte_plaintext) try:
ret = lib.olm_group_encrypt(
self._session,
ffi.from_buffer(byte_plaintext), len(byte_plaintext),
message_buffer, message_length,
)
self._check_error(ret)
finally:
# clear out copies of plaintext
if byte_plaintext is not plaintext:
for i in range(0, len(byte_plaintext)):
byte_plaintext[i] = 0
ret = lib.olm_group_encrypt(
self._session,
plaintext_buffer, len(byte_plaintext),
message_buffer, message_length,
)
self._check_error(ret)
return bytes_to_native_str(ffi.unpack(message_buffer, message_length)) return bytes_to_native_str(ffi.unpack(message_buffer, message_length))
@property @property

View file

@ -40,7 +40,7 @@ from future.utils import bytes_to_native_str
# pylint: disable=no-name-in-module # pylint: disable=no-name-in-module
from _libolm import ffi, lib # type: ignore from _libolm import ffi, lib # type: ignore
from ._compat import URANDOM, to_bytes from ._compat import URANDOM, to_bytearray, to_bytes
from ._finalize import track_for_finalization from ._finalize import track_for_finalization
# This is imported only for type checking purposes # This is imported only for type checking purposes
@ -164,15 +164,22 @@ class Session(object):
passphrase(str, optional): The passphrase to be used to encrypt passphrase(str, optional): The passphrase to be used to encrypt
the session. the session.
""" """
byte_key = bytes(passphrase, "utf-8") if passphrase else b"" byte_key = bytearray(passphrase, "utf-8") if passphrase else b""
key_buffer = ffi.new("char[]", byte_key)
pickle_length = lib.olm_pickle_session_length(self._session) pickle_length = lib.olm_pickle_session_length(self._session)
pickle_buffer = ffi.new("char[]", pickle_length) pickle_buffer = ffi.new("char[]", pickle_length)
self._check_error( try:
lib.olm_pickle_session(self._session, key_buffer, len(byte_key), self._check_error(
pickle_buffer, pickle_length)) lib.olm_pickle_session(self._session,
ffi.from_buffer(byte_key),
len(byte_key),
pickle_buffer, pickle_length))
finally:
# clear out copies of the passphrase
for i in range(0, len(byte_key)):
byte_key[i] = 0
return ffi.unpack(pickle_buffer, pickle_length) return ffi.unpack(pickle_buffer, pickle_length)
@classmethod @classmethod
@ -196,16 +203,23 @@ class Session(object):
if not pickle: if not pickle:
raise ValueError("Pickle can't be empty") raise ValueError("Pickle can't be empty")
byte_key = bytes(passphrase, "utf-8") if passphrase else b"" byte_key = bytearray(passphrase, "utf-8") if passphrase else b""
key_buffer = ffi.new("char[]", byte_key) # copy because unpickle will destroy the buffer
pickle_buffer = ffi.new("char[]", pickle) pickle_buffer = ffi.new("char[]", pickle)
session = cls.__new__(cls) session = cls.__new__(cls)
ret = lib.olm_unpickle_session(session._session, key_buffer, try:
len(byte_key), pickle_buffer, ret = lib.olm_unpickle_session(session._session,
len(pickle)) ffi.from_buffer(byte_key),
session._check_error(ret) len(byte_key),
pickle_buffer,
len(pickle))
session._check_error(ret)
finally:
# clear out copies of the passphrase
for i in range(0, len(byte_key)):
byte_key[i] = 0
return session return session
@ -217,29 +231,32 @@ class Session(object):
Args: Args:
plaintext(str): The plaintext message that will be encrypted. plaintext(str): The plaintext message that will be encrypted.
""" """
byte_plaintext = to_bytes(plaintext) byte_plaintext = to_bytearray(plaintext)
r_length = lib.olm_encrypt_random_length(self._session) r_length = lib.olm_encrypt_random_length(self._session)
random = URANDOM(r_length) random = URANDOM(r_length)
random_buffer = ffi.new("char[]", random)
message_type = lib.olm_encrypt_message_type(self._session) try:
message_type = lib.olm_encrypt_message_type(self._session)
self._check_error(message_type) self._check_error(message_type)
ciphertext_length = lib.olm_encrypt_message_length( ciphertext_length = lib.olm_encrypt_message_length(
self._session, len(plaintext) self._session, len(byte_plaintext)
) )
ciphertext_buffer = ffi.new("char[]", ciphertext_length) ciphertext_buffer = ffi.new("char[]", ciphertext_length)
plaintext_buffer = ffi.new("char[]", byte_plaintext) self._check_error(lib.olm_encrypt(
self._session,
self._check_error(lib.olm_encrypt( ffi.from_buffer(byte_plaintext), len(byte_plaintext),
self._session, ffi.from_buffer(random), r_length,
plaintext_buffer, len(byte_plaintext), ciphertext_buffer, ciphertext_length,
random_buffer, r_length, ))
ciphertext_buffer, ciphertext_length, finally:
)) # clear out copies of plaintext
if byte_plaintext is not plaintext:
for i in range(0, len(byte_plaintext)):
byte_plaintext[i] = 0
if message_type == lib.OLM_MESSAGE_TYPE_PRE_KEY: if message_type == lib.OLM_MESSAGE_TYPE_PRE_KEY:
return OlmPreKeyMessage( return OlmPreKeyMessage(
@ -274,22 +291,34 @@ class Session(object):
raise ValueError("Ciphertext can't be empty") raise ValueError("Ciphertext can't be empty")
byte_ciphertext = to_bytes(message.ciphertext) byte_ciphertext = to_bytes(message.ciphertext)
# make a copy the ciphertext buffer, because
# olm_decrypt_max_plaintext_length wants to destroy something
ciphertext_buffer = ffi.new("char[]", byte_ciphertext) ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
max_plaintext_length = lib.olm_decrypt_max_plaintext_length( max_plaintext_length = lib.olm_decrypt_max_plaintext_length(
self._session, message.message_type, ciphertext_buffer, self._session, message.message_type, ciphertext_buffer,
len(byte_ciphertext) len(byte_ciphertext)
) )
self._check_error(max_plaintext_length)
plaintext_buffer = ffi.new("char[]", max_plaintext_length) plaintext_buffer = ffi.new("char[]", max_plaintext_length)
# make a copy the ciphertext buffer, because
# olm_decrypt_max_plaintext_length wants to destroy something
ciphertext_buffer = ffi.new("char[]", byte_ciphertext) ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
plaintext_length = lib.olm_decrypt( plaintext_length = lib.olm_decrypt(
self._session, message.message_type, ciphertext_buffer, self._session, message.message_type,
len(byte_ciphertext), plaintext_buffer, max_plaintext_length ciphertext_buffer, len(byte_ciphertext),
plaintext_buffer, max_plaintext_length
) )
self._check_error(plaintext_length) self._check_error(plaintext_length)
return bytes_to_native_str( plaintext = bytes_to_native_str(
ffi.unpack(plaintext_buffer, plaintext_length)) ffi.unpack(plaintext_buffer, plaintext_length))
# clear out copies of the plaintext
lib.memset(plaintext_buffer, 0, max_plaintext_length)
return plaintext
@property @property
def id(self): def id(self):
# type: () -> str # type: () -> str
@ -331,16 +360,16 @@ class Session(object):
ret = None ret = None
byte_ciphertext = to_bytes(message.ciphertext) byte_ciphertext = to_bytes(message.ciphertext)
# make a copy, because olm_matches_inbound_session(_from) will distroy
# it
message_buffer = ffi.new("char[]", byte_ciphertext) message_buffer = ffi.new("char[]", byte_ciphertext)
if identity_key: if identity_key:
byte_id_key = to_bytes(identity_key) byte_id_key = to_bytes(identity_key)
identity_key_buffer = ffi.new("char[]", byte_id_key)
ret = lib.olm_matches_inbound_session_from( ret = lib.olm_matches_inbound_session_from(
self._session, self._session,
identity_key_buffer, len(byte_id_key), ffi.from_buffer(byte_id_key), len(byte_id_key),
message_buffer, len(byte_ciphertext) message_buffer, len(byte_ciphertext)
) )
@ -447,14 +476,11 @@ class OutboundSession(Session):
self._session) self._session)
random = URANDOM(session_random_length) random = URANDOM(session_random_length)
random_buffer = ffi.new("char[]", random)
identity_key_buffer = ffi.new("char[]", byte_id_key)
one_time_key_buffer = ffi.new("char[]", byte_one_time)
self._check_error(lib.olm_create_outbound_session( self._check_error(lib.olm_create_outbound_session(
self._session, self._session,
account._account, account._account,
identity_key_buffer, len(byte_id_key), ffi.from_buffer(byte_id_key), len(byte_id_key),
one_time_key_buffer, len(byte_one_time), ffi.from_buffer(byte_one_time), len(byte_one_time),
random_buffer, session_random_length ffi.from_buffer(random), session_random_length
)) ))

View file

@ -36,7 +36,7 @@ from typing import AnyStr, Type
# pylint: disable=no-name-in-module # pylint: disable=no-name-in-module
from _libolm import ffi, lib # type: ignore from _libolm import ffi, lib # type: ignore
from ._compat import to_bytes from ._compat import to_bytearray, to_bytes
from ._finalize import track_for_finalization from ._finalize import track_for_finalization
@ -80,13 +80,20 @@ class _Utility(object):
cls._allocate() cls._allocate()
byte_key = to_bytes(key) byte_key = to_bytes(key)
byte_message = to_bytes(message) byte_message = to_bytearray(message)
byte_signature = to_bytes(signature) byte_signature = to_bytes(signature)
cls._check_error( try:
lib.olm_ed25519_verify(cls._utility, byte_key, len(byte_key), cls._check_error(
byte_message, len(byte_message), lib.olm_ed25519_verify(cls._utility, byte_key, len(byte_key),
byte_signature, len(byte_signature))) ffi.from_buffer(byte_message),
len(byte_message),
byte_signature, len(byte_signature)))
finally:
# clear out copies of the message, which may be a plaintext
if byte_message is not message:
for i in range(0, len(byte_message)):
byte_message[i] = 0
def ed25519_verify(key, message, signature): def ed25519_verify(key, message, signature):