Merge pull request #68 from matrix-org/poljar-python

Poljar's improved python bindings
This commit is contained in:
Hubert Chathi 2018-10-19 11:34:11 -04:00 committed by GitHub
commit 1d880f9711
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
39 changed files with 2466 additions and 1366 deletions

View file

@ -161,6 +161,7 @@ $(RELEASE_TARGET): $(RELEASE_OBJECTS)
$(OLM_LDFLAGS) \ $(OLM_LDFLAGS) \
$(OUTPUT_OPTION) $(RELEASE_OBJECTS) $(OUTPUT_OPTION) $(RELEASE_OBJECTS)
ln -sf libolm.$(SO).$(VERSION) $(BUILD_DIR)/libolm.$(SO).$(MAJOR) ln -sf libolm.$(SO).$(VERSION) $(BUILD_DIR)/libolm.$(SO).$(MAJOR)
ln -sf libolm.$(SO).$(VERSION) $(BUILD_DIR)/libolm.$(SO)
debug: $(DEBUG_TARGET) debug: $(DEBUG_TARGET)
.PHONY: debug .PHONY: debug

View file

@ -19,8 +19,7 @@ Usage notes:
make fuzzers make fuzzers
3. Some of the tests (eg ``fuzz_decrypt`` and ``fuzz_group_decrypt``) require a 3. Some of the tests (eg ``fuzz_decrypt`` and ``fuzz_group_decrypt``) require a
session file. You can use the ones generated by the python test script session file. You can create one by pickling an Olm session.
(``python/test.sh``).
4. Make some work directories: 4. Make some work directories:

View file

@ -8,14 +8,6 @@ rm -f olm-*.tgz
make lib make lib
make test make test
virtualenv env
. env/bin/activate
pip install pyyaml
pip install pep8
./python/test_olm.sh
pep8 -v python
. ~/.emsdk_set_env.sh . ~/.emsdk_set_env.sh
make js make js
(cd javascript && npm install && npm run test) (cd javascript && npm install && npm run test)

View file

@ -1,225 +0,0 @@
b _olm_enc_input
r
l
p key
p key_lenght
p key_length
b _olm_enc_input
r
key[12]
p key[12]
p key[11]
key[11]='\0'
p key[11]='\0'
p key[11]
key_length=12
p key_length=12
n
c
b _olm_enc_input
r
r
r
b olm_decrypt
r
l
b 677
c
s
fin
s
s
fin
s
s
fin
s
l
n
l
l
s
s
n
l
n
l
p reader
p *this
n
p chain
p receiver_chains
p receiver_chains.length()
p receiver_chains.size()
p reader
p reader.ratchet_key
r
r
b olm_account_one_time_keys
r
l
s
n
p *this
p one_time_keys
p one_time_keys.length
p one_time_keys.length()
p one_time_keys.len()
p one_time_keys.size()
p one_time_keys.count()
p one_time_keys.data
p one_time_keys._data
p &one_time_keys._data
l
n
q
r
b olm_create_inbound_session
r
b olm_create_inbound_session_from
r
r
r
b olm_create_inbound_session_from
r
b olm_create_inbound_session
b olm_create_inbound_session
r
l
n
l
s
b olm_create_inbound_session
r
l
l
n
s
f
s
fin
s
s
fin
s
l
n
l
l -
l
l
l
n
p our_one_time_key
p *our_one_time_key
l
n
l
n
p bob_one_time_key
p alice_identity_key
p alice_base_key
p bob_identity_key
x alice_identity_key
x &alice_identity_key
x /32x &alice_identity_key
x /32b &alice_identity_key
l
l
l
n
b olm_decrypt
c
l
l
b 'olm::Session::decrypt'
c
l
l
n
l
n
p reader
p reader
5*128
p 5*128
p 0xb0 - 0x80
p 0xb0 - 0x80 + 640
l
n
s
l
n
p reader
n
l
n
p max_length
p reader.ciphertext_length
l
n
l
p receiver_chains
p &receiver_chains ._data
p &receiver_chains ._data[1]
n
s
s
l
n
p new_chain.index
p reader.counter
n
l
l
n
s
s
n
l
x key
x /16b key
l
l
n
p keys
_olm_crypto_aes_decrypt_cbc&keys.aes_key, &keys.aes_iv, ciphertext, ciphertext_length, plaintext
p _olm_crypto_aes_decrypt_cbc(&keys.aes_key, &keys.aes_iv, ciphertext, ciphertext_length, plaintext)
p plaintext
r
b olm_account_identity_keys
l
r
b olm_unpickle_account
r
l
n
p object.last_error
l
l -
l
b 268
r
c
s
l
l
p end-pos
x /246b pos
x /246x pos
x /82x pos+164
x /82x pos+132
pos
p pos
x /246x pos
r
r
b olm_create_outbound_session
r
n
l
p id_key_length
p ot_key_length
p olm::decode_base64_length(id_key_length)
p olm::decode_base64_length(ot_key_length)
p CURVE25519_KEY_LENGTH

15
python/.gitignore vendored
View file

@ -1,5 +1,12 @@
.coverage
.mypy_cache/
.ropeproject/
.pytest_cache/
packages/
python_olm.egg-info/
_libolm*
__pycache__
*.pyc *.pyc
/*.account .hypothesis/
/*.session .tox/
/*.group_session include/
/group_message

3
python/MANIFEST.in Normal file
View file

@ -0,0 +1,3 @@
include olm.h
include Makefile
include olm_build.py

43
python/Makefile Normal file
View file

@ -0,0 +1,43 @@
all: olm-python2 olm-python3
include/olm/olm.h: ../include/olm/olm.h ../include/olm/inbound_group_session.h ../include/olm/outbound_group_session.h
mkdir -p include/olm
$(CPP) -I dummy -I ../include ../include/olm/olm.h -o include/olm/olm.h
# add memset to the header so that we can use it to clear buffers
echo 'void *memset(void *s, int c, size_t n);' >> include/olm/olm.h
olm-python2: include/olm/olm.h
DEVELOP=$(DEVELOP) python2 setup.py build
olm-python3: include/olm/olm.h
DEVELOP=$(DEVELOP) python3 setup.py build
install: install-python2 install-python3
install-python2: olm-python2
python2 setup.py install --skip-build -O1 --root=$(DESTDIR)
install-python3: olm-python3
python3 setup.py install --skip-build -O1 --root=$(DESTDIR)
test: olm-python2 olm-python3
rm -rf install-temp
mkdir -p install-temp/2 install-temp/3
PYTHONPATH=install-temp/2 python2 setup.py install --skip-build --install-lib install-temp/2 --install-script install-temp/bin
PYTHONPATH=install-temp/3 python3 setup.py install --skip-build --install-lib install-temp/3 --install-script install-temp/bin
PYTHONPATH=install-temp/3 python3 -m pytest
PYTHONPATH=install-temp/2 python2 -m pytest
PYTHONPATH=install-temp/3 python3 -m pytest --flake8 --benchmark-disable
PYTHONPATH=install-temp/3 python3 -m pytest --isort --benchmark-disable
PYTHONPATH=install-temp/3 python3 -m pytest --cov --cov-branch --benchmark-disable
rm -rf install-temp
clean:
rm -rf python_olm.egg-info/ dist/ __pycache__/
rm -rf *.so _libolm.o
rm -rf packages/
rm -rf build/
rm -rf install-temp/
rm -rf include/
.PHONY: all olm-python2 olm-python3 install install-python2 install-python3 clean test

161
python/README.md Normal file
View file

@ -0,0 +1,161 @@
python-olm
==========
Python bindings for Olm.
The specification of the Olm cryptographic ratchet which is used for peer to
peer sessions of this library can be found [here][4].
The specification of the Megolm cryptographic ratchet which is used for group
sessions of this library can be found [here][5].
An example of the implementation of the Olm and Megolm cryptographic protocol
can be found in the Matrix protocol for which the implementation guide can be
found [here][6].
The full API reference can be found [here][7].
# Accounts
Accounts create and hold the central identity of the Olm protocol, they consist of a fingerprint and identity
key pair. They also produce one time keys that are used to start peer to peer
encrypted communication channels.
## Account Creation
A new account is created with the Account class, it creates a new Olm key pair.
The public parts of the key pair are available using the identity_keys property
of the class.
```python
>>> alice = Account()
>>> alice.identity_keys
{'curve25519': '2PytGagXercwHjzQETLcMa3JOsaU2qkPIESaqoi59zE',
'ed25519': 'HHpOuFYdHwoa54GxSttz9YmaTmbuVU3js92UTUjYJgM'}
```
## One Time keys
One time keys need to be generated before people can start an encrypted peer to
peer channel to an account.
```python
>>> alice.generate_one_time_keys(1)
>>> alice.one_time_keys
{'curve25519': {'AAAAAQ': 'KiHoW6CIy905UC4V1Frmwr3VW8bTWkBL4uWtWFFllxM'}}
```
After the one time keys are published they should be marked as such so they
aren't reused.
```python
>>> alice.mark_keys_as_published()
>>> alice.one_time_keys
{'curve25519': {}}
```
## Pickling
Accounts should be stored for later reuse, storing an account is done with the
pickle method while the restoring step is done with the from_pickle class
method.
```python
>>> pickle = alice.pickle()
>>> restored = Account.from_pickle(pickle)
```
# Sessions
Sessions are used to create an encrypted peer to peer communication channel
between two accounts.
## Session Creation
```python
>>> alice = Account()
>>> bob = Account()
>>> bob.generate_one_time_keys(1)
>>> id_key = bob.identity_keys["curve25519"]
>>> one_time = list(bob.one_time_keys["curve25519"].values())[0]
>>> alice_session = OutboundSession(alice, id_key, one_time)
```
## Encryption
After an outbound session is created an encrypted message can be exchanged:
```python
>>> message = alice_session.encrypt("It's a secret to everybody")
>>> message.ciphertext
'AwogkL7RoakT9gnjcZMra+y39WXKRmnxBPEaEp6OSueIA0cSIJxGpBoP8YZ+CGweXQ10LujbXMgK88
xG/JZMQJ5ulK9ZGiC8TYrezNYr3qyIBLlecXr/9wnegvJaSFDmWDVOcf4XfyI/AwogqIZfAklRXGC5b
ZJcZxVxQGgJ8Dz4OQII8k0Dp8msUXwQACIQvagY1dO55Qvnk5PZ2GF+wdKnvj6Zxl2g'
>>> message.message_type
0
```
After the message is transfered, bob can create an InboundSession to decrypt the
message.
```python
>>> bob_session = InboundSession(bob, message)
>>> bob_session.decrypt(message)
"It's a secret to everybody"
```
## Pickling
Sessions like accounts can be stored for later use the API is the same as for
accounts.
```python
>>> pickle = session.pickle()
>>> restored = Session.from_pickle(pickle)
```
# Group Sessions
Group Sessions are used to create a one-to-many encrypted communication channel.
The group session key needs to be shared with all participants that should be able
to decrypt the group messages. Another thing to notice is that, since the group
session key is ratcheted every time a message is encrypted, the session key should
be shared before any messages are encrypted.
## Group Session Creation
Group sessions aren't bound to an account like peer-to-peer sessions so their
creation is straightforward.
```python
>>> alice_group = OutboundGroupSession()
>>> bob_inbound_group = InboundGroupSession(alice_group.session_key)
```
## Group Encryption
Group encryption is pretty simple. The important part is to share the session
key with all participants over a secure channel (e.g. peer-to-peer Olm
sessions).
```python
>>> message = alice_group.encrypt("It's a secret to everybody")
>>> bob_inbound_group.decrypt(message)
("It's a secret to everybody", 0)
```
## Pickling
Pickling works the same way as for peer-to-peer Olm sessions.
```python
>>> pickle = session.pickle()
>>> restored = InboundGroupSession.from_pickle(pickle)
```
[1]: https://git.matrix.org/git/olm/about/
[2]: https://git.matrix.org/git/olm/tree/python?id=f8c61b8f8432d0b0b38d57f513c5048fb42f22ab
[3]: https://cffi.readthedocs.io/en/latest/
[4]: https://git.matrix.org/git/olm/about/docs/olm.rst
[5]: https://git.matrix.org/git/olm/about/docs/megolm.rst
[6]: https://matrix.org/docs/guides/e2e_implementation.html
[7]: https://poljar.github.io/python-olm/html/index.html

20
python/docs/Makefile Normal file
View file

@ -0,0 +1,20 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
SPHINXPROJ = olm
SOURCEDIR = .
BUILDDIR = .
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

165
python/docs/conf.py Normal file
View file

@ -0,0 +1,165 @@
# -*- coding: utf-8 -*-
#
# Configuration file for the Sphinx documentation builder.
#
# This file does only contain a selection of the most common options. For a
# full list see the documentation:
# http://www.sphinx-doc.org/en/master/config
# -- Path setup --------------------------------------------------------------
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
import os
import sys
sys.path.insert(0, os.path.abspath('../'))
# -- Project information -----------------------------------------------------
project = 'python-olm'
copyright = '2018, Damir Jelić'
author = 'Damir Jelić'
# The short X.Y version
version = ''
# The full version, including alpha/beta/rc tags
release = '2.2'
# -- General configuration ---------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#
# needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.doctest',
'sphinx.ext.coverage',
'sphinx.ext.viewcode',
'sphinx.ext.githubpages',
'sphinx.ext.napoleon',
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix(es) of source filenames.
# You can specify multiple suffix as a list of string:
#
# source_suffix = ['.rst', '.md']
source_suffix = '.rst'
# The master toctree document.
master_doc = 'index'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
language = None
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path .
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'alabaster'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#
# html_theme_options = {}
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# Custom sidebar templates, must be a dictionary that maps document names
# to template names.
#
# The default sidebars (for documents that don't match any pattern) are
# defined by theme itself. Builtin themes are using these templates by
# default: ``['localtoc.html', 'relations.html', 'sourcelink.html',
# 'searchbox.html']``.
#
# html_sidebars = {}
# -- Options for HTMLHelp output ---------------------------------------------
# Output file base name for HTML help builder.
htmlhelp_basename = 'olmdoc'
# -- Options for LaTeX output ------------------------------------------------
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#
# 'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#
# 'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#
# 'preamble': '',
# Latex figure (float) alignment
#
# 'figure_align': 'htbp',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, 'olm.tex', 'olm Documentation',
'Damir Jelić', 'manual'),
]
# -- Options for manual page output ------------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
(master_doc, 'olm', 'olm Documentation',
[author], 1)
]
# -- Options for Texinfo output ----------------------------------------------
# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(master_doc, 'olm', 'olm Documentation',
author, 'olm', 'One line description of project.',
'Miscellaneous'),
]
# -- Extension configuration -------------------------------------------------

1
python/docs/index.html Normal file
View file

@ -0,0 +1 @@
<meta http-equiv="refresh" content="0; url=./html/index.html" />

19
python/docs/index.rst Normal file
View file

@ -0,0 +1,19 @@
.. olm documentation master file, created by
sphinx-quickstart on Sun Jun 17 15:57:08 2018.
Welcome to olm's documentation!
===============================
.. toctree::
Olm API reference <olm.rst>
:maxdepth: 2
:caption: Contents:
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`

36
python/docs/make.bat Normal file
View file

@ -0,0 +1,36 @@
@ECHO OFF
pushd %~dp0
REM Command file for Sphinx documentation
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=.
set BUILDDIR=_build
set SPHINXPROJ=olm
if "%1" == "" goto help
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.http://sphinx-doc.org/
exit /b 1
)
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS%
goto end
:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS%
:end
popd

34
python/docs/olm.rst Normal file
View file

@ -0,0 +1,34 @@
olm package
===========
olm.account module
------------------
.. automodule:: olm.account
:members:
:undoc-members:
:show-inheritance:
olm.group\_session module
-------------------------
.. automodule:: olm.group_session
:members:
:undoc-members:
:show-inheritance:
olm.session module
------------------
.. automodule:: olm.session
:members:
:undoc-members:
:show-inheritance:
olm.utility module
------------------
.. automodule:: olm.utility
:members:
:undoc-members:
:show-inheritance:

2
python/dummy/README Normal file
View file

@ -0,0 +1,2 @@
Dummy header files, so that we can generate the function list for cffi from the
olm header files.

0
python/dummy/stddef.h Normal file
View file

0
python/dummy/stdint.h Normal file
View file

View file

@ -1,5 +1,38 @@
from .account import Account # -*- coding: utf-8 -*-
from .session import Session # libolm python bindings
from .outbound_group_session import OutboundGroupSession # Copyright © 2015-2017 OpenMarket Ltd
from .inbound_group_session import InboundGroupSession # Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
from .utility import ed25519_verify #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Olm Python bindings
~~~~~~~~~~~~~~~~~~~~~
| This package implements python bindings for the libolm C library.
| © Copyright 2015-2017 by OpenMarket Ltd
| © Copyright 2018 by Damir Jelić
"""
from .utility import ed25519_verify, OlmVerifyError
from .account import Account, OlmAccountError
from .session import (
Session,
InboundSession,
OutboundSession,
OlmSessionError,
OlmMessage,
OlmPreKeyMessage
)
from .group_session import (
InboundGroupSession,
OutboundGroupSession,
OlmGroupSessionError
)

View file

@ -1,468 +0,0 @@
#!/usr/bin/env python
from __future__ import print_function
import argparse
import json
import os
import sys
import yaml
from . import *
def read_base64_file(filename):
"""Read a base64 file, dropping any CR/LF characters"""
with open(filename, "rb") as f:
return f.read().translate(None, "\r\n")
def build_arg_parser():
parser = argparse.ArgumentParser()
parser.add_argument("--key", help="Account encryption key", default="")
commands = parser.add_subparsers()
create_account = commands.add_parser("create_account",
help="Create a new account")
create_account.add_argument("account_file", help="Local account file")
def do_create_account(args):
if os.path.exists(args.account_file):
sys.stderr.write("Account %r file already exists" % (
args.account_file,
))
sys.exit(1)
account = Account()
account.create()
with open(args.account_file, "wb") as f:
f.write(account.pickle(args.key))
create_account.set_defaults(func=do_create_account)
keys = commands.add_parser("keys", help="List public keys for an account")
keys.add_argument("account_file", help="Local account file")
keys.add_argument("--json", action="store_true", help="Output as JSON")
def do_keys(args):
account = Account()
account.unpickle(args.key, read_base64_file(args.account_file))
result = {
"account_keys": account.identity_keys(),
"one_time_keys": account.one_time_keys(),
}
try:
if args.json:
json.dump(result, sys.stdout, indent=4)
else:
yaml.safe_dump(result, sys.stdout, default_flow_style=False)
except:
pass
keys.set_defaults(func=do_keys)
def do_id_key(args):
account = Account()
account.unpickle(args.key, read_base64_file(args.account_file))
print(account.identity_keys()['curve25519'])
id_key = commands.add_parser(
"identity_key",
help="Get the public part of the identity key for an account",
)
id_key.add_argument("account_file", help="Local account file")
id_key.set_defaults(func=do_id_key)
def do_signing_key(args):
account = Account()
account.unpickle(args.key, read_base64_file(args.account_file))
print(account.identity_keys()['ed25519'])
signing_key = commands.add_parser(
"signing_key",
help="Get the public part of the signing key for an account",
)
signing_key.add_argument("account_file", help="Local account file")
signing_key.set_defaults(func=do_signing_key)
def do_one_time_key(args):
account = Account()
account.unpickle(args.key, read_base64_file(args.account_file))
keys = account.one_time_keys()['curve25519'].values()
key_num = args.key_num
if key_num < 1 or key_num > len(keys):
print(
"Invalid key number %i: %i keys available" % (
key_num, len(keys),
), file=sys.stderr,
)
sys.exit(1)
print(keys[key_num-1])
one_time_key = commands.add_parser(
"one_time_key",
help="Get a one-time key for the account",
)
one_time_key.add_argument("account_file", help="Local account file")
one_time_key.add_argument("--key-num", "-n", type=int, default=1,
help="Index of key to retrieve (default: 1)")
one_time_key.set_defaults(func=do_one_time_key)
sign = commands.add_parser("sign", help="Sign a message")
sign.add_argument("account_file", help="Local account file")
sign.add_argument("message_file", help="Message to sign")
sign.add_argument("signature_file", help="Signature to output")
def do_sign(args):
account = Account()
account.unpickle(args.key, read_base64_file(args.account_file))
with open_in(args.message_file) as f:
message = f.read()
signature = account.sign(message)
with open_out(args.signature_file) as f:
f.write(signature)
sign.set_defaults(func=do_sign)
generate_keys = commands.add_parser("generate_keys",
help="Generate one time keys")
generate_keys.add_argument("account_file", help="Local account file")
generate_keys.add_argument("count", type=int,
help="Number of keys to generate")
def do_generate_keys(args):
account = Account()
account.unpickle(args.key, read_base64_file(args.account_file))
account.generate_one_time_keys(args.count)
with open(args.account_file, "wb") as f:
f.write(account.pickle(args.key))
generate_keys.set_defaults(func=do_generate_keys)
outbound = commands.add_parser("outbound",
help="Create an outbound session")
outbound.add_argument("account_file", help="Local account file")
outbound.add_argument("session_file", help="Local session file")
outbound.add_argument("identity_key", help="Remote identity key")
outbound.add_argument("one_time_key", help="Remote one time key")
def do_outbound(args):
if os.path.exists(args.session_file):
sys.stderr.write("Session %r file already exists" % (
args.session_file,
))
sys.exit(1)
account = Account()
account.unpickle(args.key, read_base64_file(args.account_file))
session = Session()
session.create_outbound(
account, args.identity_key, args.one_time_key
)
with open(args.session_file, "wb") as f:
f.write(session.pickle(args.key))
outbound.set_defaults(func=do_outbound)
def open_in(path):
if path == "-":
return sys.stdin
else:
return open(path, "rb")
def open_out(path):
if path == "-":
return sys.stdout
else:
return open(path, "wb")
inbound = commands.add_parser("inbound", help="Create an inbound session")
inbound.add_argument("account_file", help="Local account file")
inbound.add_argument("session_file", help="Local session file")
inbound.add_argument("message_file", help="Message", default="-")
inbound.add_argument("plaintext_file", help="Plaintext", default="-")
def do_inbound(args):
if os.path.exists(args.session_file):
sys.stderr.write("Session %r file already exists" % (
args.session_file,
))
sys.exit(1)
account = Account()
account.unpickle(args.key, read_base64_file(args.account_file))
with open_in(args.message_file) as f:
message_type = f.read(8)
message = f.read()
if message_type != "PRE_KEY ":
sys.stderr.write("Expecting a PRE_KEY message")
sys.exit(1)
session = Session()
session.create_inbound(account, message)
plaintext = session.decrypt(0, message)
with open(args.session_file, "wb") as f:
f.write(session.pickle(args.key))
with open_out(args.plaintext_file) as f:
f.write(plaintext)
inbound.set_defaults(func=do_inbound)
session_id = commands.add_parser("session_id", help="Session ID")
session_id.add_argument("session_file", help="Local session file")
def do_session_id(args):
session = Session()
session.unpickle(args.key, read_base64_file(args.session_file))
sys.stdout.write(session.session_id() + "\n")
session_id.set_defaults(func=do_session_id)
encrypt = commands.add_parser("encrypt", help="Encrypt a message")
encrypt.add_argument("session_file", help="Local session file")
encrypt.add_argument("plaintext_file", help="Plaintext", default="-")
encrypt.add_argument("message_file", help="Message", default="-")
def do_encrypt(args):
session = Session()
session.unpickle(args.key, read_base64_file(args.session_file))
with open_in(args.plaintext_file) as f:
plaintext = f.read()
message_type, message = session.encrypt(plaintext)
with open(args.session_file, "wb") as f:
f.write(session.pickle(args.key))
with open_out(args.message_file) as f:
f.write(["PRE_KEY ", "MESSAGE "][message_type])
f.write(message)
encrypt.set_defaults(func=do_encrypt)
decrypt = commands.add_parser("decrypt", help="Decrypt a message")
decrypt.add_argument("session_file", help="Local session file")
decrypt.add_argument("message_file", help="Message", default="-")
decrypt.add_argument("plaintext_file", help="Plaintext", default="-")
def do_decrypt(args):
session = Session()
session.unpickle(args.key, read_base64_file(args.session_file))
with open_in(args.message_file) as f:
message_type = f.read(8)
message = f.read()
if message_type not in {"PRE_KEY ", "MESSAGE "}:
sys.stderr.write("Expecting a PRE_KEY or MESSAGE message")
sys.exit(1)
message_type = 1 if message_type == "MESSAGE " else 0
plaintext = session.decrypt(message_type, message)
with open(args.session_file, "wb") as f:
f.write(session.pickle(args.key))
with open_out(args.plaintext_file) as f:
f.write(plaintext)
decrypt.set_defaults(func=do_decrypt)
outbound_group = commands.add_parser(
"outbound_group",
help="Create an outbound group session",
)
outbound_group.add_argument("session_file",
help="Local group session file")
outbound_group.set_defaults(func=do_outbound_group)
group_credentials = commands.add_parser(
"group_credentials",
help="Export the current outbound group session credentials",
)
group_credentials.add_argument(
"session_file",
help="Local outbound group session file",
)
group_credentials.add_argument(
"credentials_file",
help="File to write credentials to (default stdout)",
type=argparse.FileType('w'), nargs='?',
default=sys.stdout,
)
group_credentials.set_defaults(func=do_group_credentials)
group_encrypt = commands.add_parser(
"group_encrypt",
help="Encrypt a group message",
)
group_encrypt.add_argument("session_file",
help="Local outbound group session file")
group_encrypt.add_argument("plaintext_file",
help="Plaintext file (default stdin)",
type=argparse.FileType('rb'), nargs='?',
default=sys.stdin)
group_encrypt.add_argument("message_file",
help="Message file (default stdout)",
type=argparse.FileType('w'), nargs='?',
default=sys.stdout)
group_encrypt.set_defaults(func=do_group_encrypt)
inbound_group = commands.add_parser(
"inbound_group",
help=("Create an inbound group session based on credentials from an " +
"outbound group session"))
inbound_group.add_argument("session_file",
help="Local inbound group session file")
inbound_group.add_argument(
"credentials_file",
help="File to read credentials from (default stdin)",
type=argparse.FileType('r'), nargs='?',
default=sys.stdin,
)
inbound_group.set_defaults(func=do_inbound_group)
import_inbound_group = commands.add_parser(
"import_inbound_group",
help="Create an inbound group session based an exported inbound group"
)
import_inbound_group.add_argument("session_file",
help="Local inbound group session file")
import_inbound_group.add_argument(
"export_file",
help="File to read credentials from (default stdin)",
type=argparse.FileType('r'), nargs='?',
default=sys.stdin,
)
import_inbound_group.set_defaults(func=do_import_inbound_group)
group_decrypt = commands.add_parser("group_decrypt",
help="Decrypt a group message")
group_decrypt.add_argument("session_file",
help="Local inbound group session file")
group_decrypt.add_argument("message_file",
help="Message file (default stdin)",
type=argparse.FileType('r'), nargs='?',
default=sys.stdin)
group_decrypt.add_argument("plaintext_file",
help="Plaintext file (default stdout)",
type=argparse.FileType('wb'), nargs='?',
default=sys.stdout)
group_decrypt.set_defaults(func=do_group_decrypt)
export_inbound_group = commands.add_parser(
"export_inbound_group",
help="Export the keys for an inbound group session",
)
export_inbound_group.add_argument(
"session_file", help="Local inbound group session file",
)
export_inbound_group.add_argument(
"export_file", help="File to export to (default stdout)",
type=argparse.FileType('w'), nargs='?',
default=sys.stdout,
)
export_inbound_group.add_argument(
"--message_index",
help=("Index to export session at. Defaults to the earliest known " +
"index"),
type=int,
)
export_inbound_group.set_defaults(func=do_export_inbound_group)
ed25519_verify = commands.add_parser("ed25519_verify",
help="Verify an ed25519 signature")
ed25519_verify.add_argument(
"signing_key",
help="Public signing key used to create the signature"
)
ed25519_verify.add_argument("signature",
help="Signature to be verified")
ed25519_verify.add_argument("message_file",
help="Message file (default stdin)",
type=argparse.FileType('r'), nargs='?',
default=sys.stdin)
ed25519_verify.set_defaults(func=do_verify_ed25519_signature)
return parser
def do_outbound_group(args):
if os.path.exists(args.session_file):
sys.stderr.write("Session %r file already exists" % (
args.session_file,
))
sys.exit(1)
session = OutboundGroupSession()
with open(args.session_file, "wb") as f:
f.write(session.pickle(args.key))
def do_group_encrypt(args):
session = OutboundGroupSession()
session.unpickle(args.key, read_base64_file(args.session_file))
plaintext = args.plaintext_file.read()
message = session.encrypt(plaintext)
with open(args.session_file, "wb") as f:
f.write(session.pickle(args.key))
args.message_file.write(message)
def do_group_credentials(args):
session = OutboundGroupSession()
session.unpickle(args.key, read_base64_file(args.session_file))
result = {
'message_index': session.message_index(),
'session_key': session.session_key(),
}
json.dump(result, args.credentials_file, indent=4)
def do_inbound_group(args):
if os.path.exists(args.session_file):
sys.stderr.write("Session %r file already exists\n" % (
args.session_file,
))
sys.exit(1)
credentials = json.load(args.credentials_file)
for k in ('session_key', ):
if k not in credentials:
sys.stderr.write("Credentials file is missing %s\n" % k)
sys.exit(1)
session = InboundGroupSession()
session.init(credentials['session_key'])
with open(args.session_file, "wb") as f:
f.write(session.pickle(args.key))
def do_import_inbound_group(args):
if os.path.exists(args.session_file):
sys.stderr.write("Session %r file already exists\n" % (
args.session_file,
))
sys.exit(1)
data = args.export_file.read().translate(None, "\r\n")
session = InboundGroupSession()
session.import_session(data)
with open(args.session_file, "wb") as f:
f.write(session.pickle(args.key))
def do_group_decrypt(args):
session = InboundGroupSession()
session.unpickle(args.key, read_base64_file(args.session_file))
message = args.message_file.read()
plaintext, message_index = session.decrypt(message)
with open(args.session_file, "wb") as f:
f.write(session.pickle(args.key))
args.plaintext_file.write(plaintext)
def do_export_inbound_group(args):
session = InboundGroupSession()
session.unpickle(args.key, read_base64_file(args.session_file))
index = args.message_index
if index is None:
# default to first known index
index = session.first_known_index()
args.export_file.write(session.export_session(index))
def do_verify_ed25519_signature(args):
message = args.message_file.read()
ed25519_verify(args.signing_key, message, args.signature)
if __name__ == '__main__':
parser = build_arg_parser()
args = parser.parse_args()
args.func(args)

View file

@ -0,0 +1,9 @@
__title__ = "python-olm"
__description__ = ("python CFFI bindings for the olm "
"cryptographic ratchet library")
__url__ = "https://github.com/poljar/python-olm"
__version__ = "0.1"
__author__ = "Damir Jelić"
__author_email__ = "poljar@termina.org.uk"
__license__ = "Apache 2.0"
__copyright__ = "Copyright 2018 Damir Jelić"

View file

@ -1,17 +0,0 @@
import os.path
from ctypes import *
lib = cdll.LoadLibrary(os.path.join(
os.path.dirname(__file__), "..", "..", "build", "libolm.so.2")
)
lib.olm_error.argtypes = []
lib.olm_error.restypes = c_size_t
ERR = lib.olm_error()
class OlmError(Exception):
pass

46
python/olm/_compat.py Normal file
View file

@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
# libolm python bindings
# Copyright © 2015-2017 OpenMarket Ltd
# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from builtins import bytes, str
from typing import AnyStr
try:
import secrets
URANDOM = secrets.token_bytes # pragma: no cover
except ImportError: # pragma: no cover
from os import urandom
URANDOM = urandom # type: ignore
def to_bytearray(string):
# type: (AnyStr) -> bytes
if isinstance(string, bytes):
return bytearray(string)
elif isinstance(string, str):
return bytearray(string, "utf-8")
raise TypeError("Invalid type {}".format(type(string)))
def to_bytes(string):
# type: (AnyStr) -> bytes
if isinstance(string, bytes):
return string
elif isinstance(string, str):
return bytes(string, "utf-8")
raise TypeError("Invalid type {}".format(type(string)))

65
python/olm/_finalize.py Normal file
View file

@ -0,0 +1,65 @@
# The MIT License (MIT)
# Copyright (c) 2010 Benjamin Peterson <benjamin@python.org>
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
# OR OTHER DEALINGS IN THE SOFTWARE.
"""Finalization with weakrefs
This is designed for avoiding __del__.
"""
from __future__ import print_function
import sys
import traceback
import weakref
__author__ = "Benjamin Peterson <benjamin@python.org>"
class OwnerRef(weakref.ref):
"""A simple weakref.ref subclass, so attributes can be added."""
pass
def _run_finalizer(ref):
"""Internal weakref callback to run finalizers"""
del _finalize_refs[id(ref)]
finalizer = ref.finalizer
item = ref.item
try:
finalizer(item)
except Exception: # pragma: no cover
print("Exception running {}:".format(finalizer), file=sys.stderr)
traceback.print_exc()
_finalize_refs = {}
def track_for_finalization(owner, item, finalizer):
"""Register an object for finalization.
``owner`` is the the object which is responsible for ``item``.
``finalizer`` will be called with ``item`` as its only argument when
``owner`` is destroyed by the garbage collector.
"""
ref = OwnerRef(owner, _run_finalizer)
ref.item = item
ref.finalizer = finalizer
_finalize_refs[id(ref)] = ref

View file

@ -1,136 +1,271 @@
# -*- coding: utf-8 -*-
# libolm python bindings
# Copyright © 2015-2017 OpenMarket Ltd
# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""libolm Account module.
This module contains the account part of the Olm library. It contains a single
Account class which handles the creation of new accounts as well as the storing
and restoring of them.
Examples:
>>> acc = Account()
>>> account.identity_keys()
>>> account.generate_one_time_keys(1)
"""
import json import json
from os import urandom # pylint: disable=redefined-builtin,unused-import
from builtins import bytes, super
from typing import AnyStr, Dict, Optional, Type
from ._base import * from future.utils import bytes_to_native_str
lib.olm_account_size.argtypes = [] # pylint: disable=no-name-in-module
lib.olm_account_size.restype = c_size_t from _libolm import ffi, lib # type: ignore
lib.olm_account.argtypes = [c_void_p] from ._compat import URANDOM, to_bytearray
lib.olm_account.restype = c_void_p from ._finalize import track_for_finalization
lib.olm_account_last_error.argtypes = [c_void_p] # This is imported only for type checking purposes
lib.olm_account_last_error.restype = c_char_p if False:
from .session import Session # pragma: no cover
def account_errcheck(res, func, args): def _clear_account(account):
if res == ERR: # type: (ffi.cdata) -> None
raise OlmError("%s: %s" % ( lib.olm_clear_account(account)
func.__name__, lib.olm_account_last_error(args[0])
))
return res
def account_function(func, *types): class OlmAccountError(Exception):
func.argtypes = (c_void_p,) + types """libolm Account error exception."""
func.restypes = c_size_t
func.errcheck = account_errcheck
account_function(
lib.olm_pickle_account, c_void_p, c_size_t, c_void_p, c_size_t
)
account_function(
lib.olm_unpickle_account, c_void_p, c_size_t, c_void_p, c_size_t
)
account_function(lib.olm_create_account_random_length)
account_function(lib.olm_create_account, c_void_p, c_size_t)
account_function(lib.olm_account_identity_keys_length)
account_function(lib.olm_account_identity_keys, c_void_p, c_size_t)
account_function(lib.olm_account_signature_length)
account_function(lib.olm_account_sign, c_void_p, c_size_t, c_void_p, c_size_t)
account_function(lib.olm_account_one_time_keys_length)
account_function(lib.olm_account_one_time_keys, c_void_p, c_size_t)
account_function(lib.olm_account_mark_keys_as_published)
account_function(lib.olm_account_max_number_of_one_time_keys)
account_function(lib.olm_pickle_account_length)
account_function(
lib.olm_account_generate_one_time_keys_random_length,
c_size_t
)
account_function(
lib.olm_account_generate_one_time_keys,
c_size_t,
c_void_p, c_size_t
)
account_function(
lib.olm_remove_one_time_keys,
c_void_p # Session
)
class Account(object): class Account(object):
"""libolm Account class."""
def __new__(cls):
# type: (Type[Account]) -> Account
obj = super().__new__(cls)
obj._buf = ffi.new("char[]", lib.olm_account_size())
obj._account = lib.olm_account(obj._buf)
track_for_finalization(obj, obj._account, _clear_account)
return obj
def __init__(self): def __init__(self):
self.buf = create_string_buffer(lib.olm_account_size()) # type: () -> None
self.ptr = lib.olm_account(self.buf) """Create a new Olm account.
def create(self): Creates a new account and its matching identity key pair.
random_length = lib.olm_create_account_random_length(self.ptr)
random = urandom(random_length)
random_buffer = create_string_buffer(random)
lib.olm_create_account(self.ptr, random_buffer, random_length)
def pickle(self, key): Raises OlmAccountError on failure. If there weren't enough random bytes
key_buffer = create_string_buffer(key) for the account creation the error message for the exception will be
pickle_length = lib.olm_pickle_account_length(self.ptr) NOT_ENOUGH_RANDOM.
pickle_buffer = create_string_buffer(pickle_length) """
lib.olm_pickle_account( # This is needed to silence mypy not knowing the type of _account.
self.ptr, key_buffer, len(key), pickle_buffer, pickle_length # There has to be a better way for this.
) if False: # pragma: no cover
return pickle_buffer.raw self._account = self._account # type: ffi.cdata
def unpickle(self, key, pickle): random_length = lib.olm_create_account_random_length(self._account)
key_buffer = create_string_buffer(key) random = URANDOM(random_length)
pickle_buffer = create_string_buffer(pickle)
lib.olm_unpickle_account(
self.ptr, key_buffer, len(key), pickle_buffer, len(pickle)
)
self._check_error(
lib.olm_create_account(self._account, ffi.from_buffer(random),
random_length))
def _check_error(self, ret):
# type: (int) -> None
if ret != lib.olm_error():
return
last_error = bytes_to_native_str(
ffi.string((lib.olm_account_last_error(self._account))))
raise OlmAccountError(last_error)
def pickle(self, passphrase=""):
# type: (Optional[str]) -> bytes
"""Store an Olm account.
Stores an account as a base64 string. Encrypts the account using the
supplied passphrase. Returns a byte object containing the base64
encoded string of the pickled account. Raises OlmAccountError on
failure.
Args:
passphrase(str, optional): The passphrase to be used to encrypt
the account.
"""
byte_key = bytearray(passphrase, "utf-8") if passphrase else b""
pickle_length = lib.olm_pickle_account_length(self._account)
pickle_buffer = ffi.new("char[]", pickle_length)
try:
self._check_error(
lib.olm_pickle_account(self._account,
ffi.from_buffer(byte_key),
len(byte_key),
pickle_buffer,
pickle_length))
finally:
# zero out copies of the passphrase
for i in range(0, len(byte_key)):
byte_key[i] = 0
return ffi.unpack(pickle_buffer, pickle_length)
@classmethod
def from_pickle(cls, pickle, passphrase=""):
# type: (bytes, Optional[str]) -> Account
"""Load a previously stored olm account.
Loads an account from a pickled base64-encoded string and returns an
Account object. Decrypts the account using the supplied passphrase.
Raises OlmAccountError on failure. If the passphrase doesn't match the
one used to encrypt the account then the error message for the
exception will be "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded
then the error message will be "INVALID_BASE64".
Args:
pickle(bytes): Base64 encoded byte string containing the pickled
account
passphrase(str, optional): The passphrase used to encrypt the
account.
"""
if not pickle:
raise ValueError("Pickle can't be empty")
byte_key = bytearray(passphrase, "utf-8") if passphrase else b""
# copy because unpickle will destroy the buffer
pickle_buffer = ffi.new("char[]", pickle)
obj = cls.__new__(cls)
try:
ret = lib.olm_unpickle_account(obj._account,
ffi.from_buffer(byte_key),
len(byte_key),
pickle_buffer,
len(pickle))
obj._check_error(ret)
finally:
for i in range(0, len(byte_key)):
byte_key[i] = 0
return obj
@property
def identity_keys(self): def identity_keys(self):
out_length = lib.olm_account_identity_keys_length(self.ptr) # type: () -> Dict[str, str]
out_buffer = create_string_buffer(out_length) """dict: Public part of the identity keys of the account."""
lib.olm_account_identity_keys( out_length = lib.olm_account_identity_keys_length(self._account)
self.ptr, out_buffer = ffi.new("char[]", out_length)
out_buffer, out_length
) self._check_error(
return json.loads(out_buffer.raw) lib.olm_account_identity_keys(self._account, out_buffer,
out_length))
return json.loads(ffi.unpack(out_buffer, out_length).decode("utf-8"))
def sign(self, message): def sign(self, message):
out_length = lib.olm_account_signature_length(self.ptr) # type: (AnyStr) -> str
message_buffer = create_string_buffer(message) """Signs a message with this account.
out_buffer = create_string_buffer(out_length)
lib.olm_account_sign(
self.ptr, message_buffer, len(message), out_buffer, out_length
)
return out_buffer.raw
def one_time_keys(self): Signs a message with the private ed25519 identity key of this account.
out_length = lib.olm_account_one_time_keys_length(self.ptr) Returns the signature.
out_buffer = create_string_buffer(out_length) Raises OlmAccountError on failure.
lib.olm_account_one_time_keys(self.ptr, out_buffer, out_length)
return json.loads(out_buffer.raw) Args:
message(str): The message to sign.
"""
bytes_message = to_bytearray(message)
out_length = lib.olm_account_signature_length(self._account)
out_buffer = ffi.new("char[]", out_length)
try:
self._check_error(
lib.olm_account_sign(self._account,
ffi.from_buffer(bytes_message),
len(bytes_message), out_buffer,
out_length))
finally:
# clear out copies of the message, which may be plaintext
if bytes_message is not message:
for i in range(0, len(bytes_message)):
bytes_message[i] = 0
return bytes_to_native_str(ffi.unpack(out_buffer, out_length))
@property
def max_one_time_keys(self):
# type: () -> int
"""int: The maximum number of one-time keys the account can store."""
return lib.olm_account_max_number_of_one_time_keys(self._account)
def mark_keys_as_published(self): def mark_keys_as_published(self):
lib.olm_account_mark_keys_as_published(self.ptr) # type: () -> None
"""Mark the current set of one-time keys as being published."""
def max_number_of_one_time_keys(self): lib.olm_account_mark_keys_as_published(self._account)
return lib.olm_account_max_number_of_one_time_keys(self.ptr)
def generate_one_time_keys(self, count): def generate_one_time_keys(self, count):
# type: (int) -> None
"""Generate a number of new one-time keys.
If the total number of keys stored by this account exceeds
max_one_time_keys() then the old keys are discarded.
Raises OlmAccountError on error.
Args:
count(int): The number of keys to generate.
"""
random_length = lib.olm_account_generate_one_time_keys_random_length( random_length = lib.olm_account_generate_one_time_keys_random_length(
self.ptr, count self._account, count)
) random = URANDOM(random_length)
random = urandom(random_length)
random_buffer = create_string_buffer(random) self._check_error(
lib.olm_account_generate_one_time_keys( lib.olm_account_generate_one_time_keys(
self.ptr, count, random_buffer, random_length self._account, count, ffi.from_buffer(random), random_length))
)
@property
def one_time_keys(self):
# type: () -> Dict[str, Dict[str, str]]
"""dict: The public part of the one-time keys for this account."""
out_length = lib.olm_account_one_time_keys_length(self._account)
out_buffer = ffi.new("char[]", out_length)
self._check_error(
lib.olm_account_one_time_keys(self._account, out_buffer,
out_length))
return json.loads(ffi.unpack(out_buffer, out_length).decode("utf-8"))
def remove_one_time_keys(self, session): def remove_one_time_keys(self, session):
lib.olm_remove_one_time_keys( # type: (Session) -> None
self.ptr, """Remove used one-time keys.
session.ptr
)
def clear(self): Removes the one-time keys that the session used from the account.
pass Raises OlmAccountError on failure. If the account doesn't have any
matching one-time keys then the error message of the exception will be
"BAD_MESSAGE_KEY_ID".
Args:
session(Session): An Olm Session object that was created with this
account.
"""
self._check_error(lib.olm_remove_one_time_keys(self._account,
session._session))

525
python/olm/group_session.py Normal file
View file

@ -0,0 +1,525 @@
# -*- coding: utf-8 -*-
# libolm python bindings
# Copyright © 2015-2017 OpenMarket Ltd
# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""libolm Group session module.
This module contains the group session part of the Olm library. It contains two
classes for creating inbound and outbound group sessions.
Examples:
>>> outbound = OutboundGroupSession()
>>> InboundGroupSession(outbound.session_key)
"""
# pylint: disable=redefined-builtin,unused-import
from builtins import bytes, super
from typing import AnyStr, Optional, Tuple, Type
from future.utils import bytes_to_native_str
# pylint: disable=no-name-in-module
from _libolm import ffi, lib # type: ignore
from ._compat import URANDOM, to_bytearray, to_bytes
from ._finalize import track_for_finalization
def _clear_inbound_group_session(session):
# type: (ffi.cdata) -> None
lib.olm_clear_inbound_group_session(session)
def _clear_outbound_group_session(session):
# type: (ffi.cdata) -> None
lib.olm_clear_outbound_group_session(session)
class OlmGroupSessionError(Exception):
"""libolm Group session error exception."""
class InboundGroupSession(object):
"""Inbound group session for encrypted multiuser communication."""
def __new__(
cls, # type: Type[InboundGroupSession]
session_key=None # type: Optional[str]
):
# type: (...) -> InboundGroupSession
obj = super().__new__(cls)
obj._buf = ffi.new("char[]", lib.olm_inbound_group_session_size())
obj._session = lib.olm_inbound_group_session(obj._buf)
track_for_finalization(obj, obj._session, _clear_inbound_group_session)
return obj
def __init__(self, session_key):
# type: (AnyStr) -> None
"""Create a new inbound group session.
Start a new inbound group session, from a key exported from
an outbound group session.
Raises OlmGroupSessionError on failure. The error message of the
exception will be "OLM_INVALID_BASE64" if the session key is not valid
base64 and "OLM_BAD_SESSION_KEY" if the session key is invalid.
"""
if False: # pragma: no cover
self._session = self._session # type: ffi.cdata
byte_session_key = to_bytearray(session_key)
try:
ret = lib.olm_init_inbound_group_session(
self._session,
ffi.from_buffer(byte_session_key), len(byte_session_key)
)
finally:
if byte_session_key is not session_key:
for i in range(0, len(byte_session_key)):
byte_session_key[i] = 0
self._check_error(ret)
def pickle(self, passphrase=""):
# type: (Optional[str]) -> bytes
"""Store an inbound group session.
Stores a group session as a base64 string. Encrypts the session using
the supplied passphrase. Returns a byte object containing the base64
encoded string of the pickled session.
Args:
passphrase(str, optional): The passphrase to be used to encrypt
the session.
"""
byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b""
pickle_length = lib.olm_pickle_inbound_group_session_length(
self._session)
pickle_buffer = ffi.new("char[]", pickle_length)
try:
ret = lib.olm_pickle_inbound_group_session(
self._session,
ffi.from_buffer(byte_passphrase), len(byte_passphrase),
pickle_buffer, pickle_length
)
self._check_error(ret)
finally:
# clear out copies of the passphrase
for i in range(0, len(byte_passphrase)):
byte_passphrase[i] = 0
return ffi.unpack(pickle_buffer, pickle_length)
@classmethod
def from_pickle(cls, pickle, passphrase=""):
# type: (bytes, Optional[str]) -> InboundGroupSession
"""Load a previously stored inbound group session.
Loads an inbound group session from a pickled base64 string and returns
an InboundGroupSession object. Decrypts the session using the supplied
passphrase. Raises OlmSessionError on failure. If the passphrase
doesn't match the one used to encrypt the session then the error
message for the exception will be "BAD_ACCOUNT_KEY". If the base64
couldn't be decoded then the error message will be "INVALID_BASE64".
Args:
pickle(bytes): Base64 encoded byte string containing the pickled
session
passphrase(str, optional): The passphrase used to encrypt the
session
"""
if not pickle:
raise ValueError("Pickle can't be empty")
byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b""
# copy because unpickle will destroy the buffer
pickle_buffer = ffi.new("char[]", pickle)
obj = cls.__new__(cls)
try:
ret = lib.olm_unpickle_inbound_group_session(
obj._session,
ffi.from_buffer(byte_passphrase),
len(byte_passphrase),
pickle_buffer,
len(pickle)
)
obj._check_error(ret)
finally:
# clear out copies of the passphrase
for i in range(0, len(byte_passphrase)):
byte_passphrase[i] = 0
return obj
def _check_error(self, ret):
# type: (int) -> None
if ret != lib.olm_error():
return
last_error = bytes_to_native_str(ffi.string(
lib.olm_inbound_group_session_last_error(self._session)))
raise OlmGroupSessionError(last_error)
def decrypt(self, ciphertext):
# type: (AnyStr) -> Tuple[str, int]
"""Decrypt a message
Returns a tuple of the decrypted plain-text and the message index of
the decrypted message or raises OlmGroupSessionError on failure.
On failure the error message of the exception will be:
* OLM_INVALID_BASE64 if the message is not valid base64
* OLM_BAD_MESSAGE_VERSION if the message was encrypted with an
unsupported version of the protocol
* OLM_BAD_MESSAGE_FORMAT if the message headers could not be
decoded
* OLM_BAD_MESSAGE_MAC if the message could not be verified
* OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key
corresponding to the message's index (i.e., it was sent before
the session key was shared with us)
Args:
ciphertext(str): Base64 encoded ciphertext containing the encrypted
message
"""
if not ciphertext:
raise ValueError("Ciphertext can't be empty.")
byte_ciphertext = to_bytes(ciphertext)
# copy because max_plaintext_length will destroy the buffer
ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
max_plaintext_length = lib.olm_group_decrypt_max_plaintext_length(
self._session, ciphertext_buffer, len(byte_ciphertext)
)
self._check_error(max_plaintext_length)
plaintext_buffer = ffi.new("char[]", max_plaintext_length)
# copy because max_plaintext_length will destroy the buffer
ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
message_index = ffi.new("uint32_t*")
plaintext_length = lib.olm_group_decrypt(
self._session, ciphertext_buffer, len(byte_ciphertext),
plaintext_buffer, max_plaintext_length,
message_index
)
self._check_error(plaintext_length)
plaintext = bytes_to_native_str(ffi.unpack(
plaintext_buffer,
plaintext_length
))
# clear out copies of the plaintext
lib.memset(plaintext_buffer, 0, max_plaintext_length)
return plaintext, message_index[0]
@property
def id(self):
# type: () -> str
"""str: A base64 encoded identifier for this session."""
id_length = lib.olm_inbound_group_session_id_length(self._session)
id_buffer = ffi.new("char[]", id_length)
ret = lib.olm_inbound_group_session_id(
self._session,
id_buffer,
id_length
)
self._check_error(ret)
return bytes_to_native_str(ffi.unpack(id_buffer, id_length))
@property
def first_known_index(self):
# type: () -> int
"""int: The first message index we know how to decrypt."""
return lib.olm_inbound_group_session_first_known_index(self._session)
def export_session(self, message_index):
# type: (int) -> str
"""Export an inbound group session
Export the base64-encoded ratchet key for this session, at the given
index, in a format which can be used by import_session().
Raises OlmGroupSessionError on failure. The error message for the
exception will be:
* OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key
corresponding to the given index (ie, it was sent before the
session key was shared with us)
Args:
message_index(int): The message index at which the session should
be exported.
"""
export_length = lib.olm_export_inbound_group_session_length(
self._session)
export_buffer = ffi.new("char[]", export_length)
ret = lib.olm_export_inbound_group_session(
self._session,
export_buffer,
export_length,
message_index
)
self._check_error(ret)
export_str = bytes_to_native_str(ffi.unpack(export_buffer, export_length))
# clear out copies of the key
lib.memset(export_buffer, 0, export_length)
return export_str
@classmethod
def import_session(cls, session_key):
# type: (AnyStr) -> InboundGroupSession
"""Create an InboundGroupSession from an exported session key.
Creates an InboundGroupSession with an previously exported session key,
raises OlmGroupSessionError on failure. The error message for the
exception will be:
* OLM_INVALID_BASE64 if the session_key is not valid base64
* OLM_BAD_SESSION_KEY if the session_key is invalid
Args:
session_key(str): The exported session key with which the inbound
group session will be created
"""
obj = cls.__new__(cls)
byte_session_key = to_bytearray(session_key)
try:
ret = lib.olm_import_inbound_group_session(
obj._session,
ffi.from_buffer(byte_session_key),
len(byte_session_key)
)
obj._check_error(ret)
finally:
# clear out copies of the key
if byte_session_key is not session_key:
for i in range(0, len(byte_session_key)):
byte_session_key[i] = 0
return obj
class OutboundGroupSession(object):
"""Outbound group session for encrypted multiuser communication."""
def __new__(cls):
# type: (Type[OutboundGroupSession]) -> OutboundGroupSession
obj = super().__new__(cls)
obj._buf = ffi.new("char[]", lib.olm_outbound_group_session_size())
obj._session = lib.olm_outbound_group_session(obj._buf)
track_for_finalization(
obj,
obj._session,
_clear_outbound_group_session
)
return obj
def __init__(self):
# type: () -> None
"""Create a new outbound group session.
Start a new outbound group session. Raises OlmGroupSessionError on
failure.
"""
if False: # pragma: no cover
self._session = self._session # type: ffi.cdata
random_length = lib.olm_init_outbound_group_session_random_length(
self._session
)
random = URANDOM(random_length)
ret = lib.olm_init_outbound_group_session(
self._session, ffi.from_buffer(random), random_length
)
self._check_error(ret)
def _check_error(self, ret):
# type: (int) -> None
if ret != lib.olm_error():
return
last_error = bytes_to_native_str(ffi.string(
lib.olm_outbound_group_session_last_error(self._session)
))
raise OlmGroupSessionError(last_error)
def pickle(self, passphrase=""):
# type: (Optional[str]) -> bytes
"""Store an outbound group session.
Stores a group session as a base64 string. Encrypts the session using
the supplied passphrase. Returns a byte object containing the base64
encoded string of the pickled session.
Args:
passphrase(str, optional): The passphrase to be used to encrypt
the session.
"""
byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b""
pickle_length = lib.olm_pickle_outbound_group_session_length(
self._session)
pickle_buffer = ffi.new("char[]", pickle_length)
try:
ret = lib.olm_pickle_outbound_group_session(
self._session,
ffi.from_buffer(byte_passphrase), len(byte_passphrase),
pickle_buffer, pickle_length
)
self._check_error(ret)
finally:
# clear out copies of the passphrase
for i in range(0, len(byte_passphrase)):
byte_passphrase[i] = 0
return ffi.unpack(pickle_buffer, pickle_length)
@classmethod
def from_pickle(cls, pickle, passphrase=""):
# type: (bytes, Optional[str]) -> OutboundGroupSession
"""Load a previously stored outbound group session.
Loads an outbound group session from a pickled base64 string and
returns an OutboundGroupSession object. Decrypts the session using the
supplied passphrase. Raises OlmSessionError on failure. If the
passphrase doesn't match the one used to encrypt the session then the
error message for the exception will be "BAD_ACCOUNT_KEY". If the
base64 couldn't be decoded then the error message will be
"INVALID_BASE64".
Args:
pickle(bytes): Base64 encoded byte string containing the pickled
session
passphrase(str, optional): The passphrase used to encrypt the
"""
if not pickle:
raise ValueError("Pickle can't be empty")
byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b""
# copy because unpickle will destroy the buffer
pickle_buffer = ffi.new("char[]", pickle)
obj = cls.__new__(cls)
try:
ret = lib.olm_unpickle_outbound_group_session(
obj._session,
ffi.from_buffer(byte_passphrase),
len(byte_passphrase),
pickle_buffer,
len(pickle)
)
obj._check_error(ret)
finally:
# clear out copies of the passphrase
for i in range(0, len(byte_passphrase)):
byte_passphrase[i] = 0
return obj
def encrypt(self, plaintext):
# type: (AnyStr) -> str
"""Encrypt a message.
Returns the encrypted ciphertext.
Args:
plaintext(str): A string that will be encrypted using the group
session.
"""
byte_plaintext = to_bytearray(plaintext)
message_length = lib.olm_group_encrypt_message_length(
self._session, len(byte_plaintext)
)
message_buffer = ffi.new("char[]", message_length)
try:
ret = lib.olm_group_encrypt(
self._session,
ffi.from_buffer(byte_plaintext), len(byte_plaintext),
message_buffer, message_length,
)
self._check_error(ret)
finally:
# clear out copies of plaintext
if byte_plaintext is not plaintext:
for i in range(0, len(byte_plaintext)):
byte_plaintext[i] = 0
return bytes_to_native_str(ffi.unpack(message_buffer, message_length))
@property
def id(self):
# type: () -> str
"""str: A base64 encoded identifier for this session."""
id_length = lib.olm_outbound_group_session_id_length(self._session)
id_buffer = ffi.new("char[]", id_length)
ret = lib.olm_outbound_group_session_id(
self._session,
id_buffer,
id_length
)
self._check_error(ret)
return bytes_to_native_str(ffi.unpack(id_buffer, id_length))
@property
def message_index(self):
# type: () -> int
"""int: The current message index of the session.
Each message is encrypted with an increasing index. This is the index
for the next message.
"""
return lib.olm_outbound_group_session_message_index(self._session)
@property
def session_key(self):
# type: () -> str
"""The base64-encoded current ratchet key for this session.
Each message is encrypted with a different ratchet key. This function
returns the ratchet key that will be used for the next message.
"""
key_length = lib.olm_outbound_group_session_key_length(self._session)
key_buffer = ffi.new("char[]", key_length)
ret = lib.olm_outbound_group_session_key(
self._session,
key_buffer,
key_length
)
self._check_error(ret)
return bytes_to_native_str(ffi.unpack(key_buffer, key_length))

View file

@ -1,138 +0,0 @@
import json
from ._base import *
lib.olm_inbound_group_session_size.argtypes = []
lib.olm_inbound_group_session_size.restype = c_size_t
lib.olm_inbound_group_session.argtypes = [c_void_p]
lib.olm_inbound_group_session.restype = c_void_p
lib.olm_inbound_group_session_last_error.argtypes = [c_void_p]
lib.olm_inbound_group_session_last_error.restype = c_char_p
def inbound_group_session_errcheck(res, func, args):
if res == ERR:
raise OlmError("%s: %s" % (
func.__name__, lib.olm_inbound_group_session_last_error(args[0])
))
return res
def inbound_group_session_function(func, *types):
func.argtypes = (c_void_p,) + types
func.restypes = c_size_t
func.errcheck = inbound_group_session_errcheck
inbound_group_session_function(
lib.olm_pickle_inbound_group_session,
c_void_p, c_size_t, c_void_p, c_size_t,
)
inbound_group_session_function(
lib.olm_unpickle_inbound_group_session,
c_void_p, c_size_t, c_void_p, c_size_t,
)
inbound_group_session_function(
lib.olm_init_inbound_group_session, c_void_p, c_size_t
)
inbound_group_session_function(
lib.olm_import_inbound_group_session, c_void_p, c_size_t
)
inbound_group_session_function(
lib.olm_group_decrypt_max_plaintext_length, c_void_p, c_size_t
)
inbound_group_session_function(
lib.olm_group_decrypt,
c_void_p, c_size_t, # message
c_void_p, c_size_t, # plaintext
POINTER(c_uint32), # message_index
)
inbound_group_session_function(
lib.olm_inbound_group_session_id_length,
)
inbound_group_session_function(
lib.olm_inbound_group_session_id,
c_void_p, c_size_t,
)
lib.olm_inbound_group_session_first_known_index.argtypes = (c_void_p,)
lib.olm_inbound_group_session_first_known_index.restypes = c_uint32
inbound_group_session_function(
lib.olm_export_inbound_group_session_length,
)
inbound_group_session_function(
lib.olm_export_inbound_group_session, c_void_p, c_size_t, c_uint32,
)
class InboundGroupSession(object):
def __init__(self):
self.buf = create_string_buffer(lib.olm_inbound_group_session_size())
self.ptr = lib.olm_inbound_group_session(self.buf)
def pickle(self, key):
key_buffer = create_string_buffer(key)
pickle_length = lib.olm_pickle_inbound_group_session_length(self.ptr)
pickle_buffer = create_string_buffer(pickle_length)
lib.olm_pickle_inbound_group_session(
self.ptr, key_buffer, len(key), pickle_buffer, pickle_length
)
return pickle_buffer.raw
def unpickle(self, key, pickle):
key_buffer = create_string_buffer(key)
pickle_buffer = create_string_buffer(pickle)
lib.olm_unpickle_inbound_group_session(
self.ptr, key_buffer, len(key), pickle_buffer, len(pickle)
)
def init(self, session_key):
key_buffer = create_string_buffer(session_key)
lib.olm_init_inbound_group_session(
self.ptr, key_buffer, len(session_key)
)
def import_session(self, session_key):
key_buffer = create_string_buffer(session_key)
lib.olm_import_inbound_group_session(
self.ptr, key_buffer, len(session_key)
)
def decrypt(self, message):
message_buffer = create_string_buffer(message)
max_plaintext_length = lib.olm_group_decrypt_max_plaintext_length(
self.ptr, message_buffer, len(message)
)
plaintext_buffer = create_string_buffer(max_plaintext_length)
message_buffer = create_string_buffer(message)
message_index = c_uint32()
plaintext_length = lib.olm_group_decrypt(
self.ptr, message_buffer, len(message),
plaintext_buffer, max_plaintext_length,
byref(message_index)
)
return plaintext_buffer.raw[:plaintext_length], message_index.value
def session_id(self):
id_length = lib.olm_inbound_group_session_id_length(self.ptr)
id_buffer = create_string_buffer(id_length)
lib.olm_inbound_group_session_id(self.ptr, id_buffer, id_length)
return id_buffer.raw
def first_known_index(self):
return lib.olm_inbound_group_session_first_known_index(self.ptr)
def export_session(self, message_index):
length = lib.olm_export_inbound_group_session_length(self.ptr)
buffer = create_string_buffer(length)
lib.olm_export_inbound_group_session(self.ptr, buffer, length,
message_index)
return buffer.raw

View file

@ -1,134 +0,0 @@
import json
from os import urandom
from ._base import *
lib.olm_outbound_group_session_size.argtypes = []
lib.olm_outbound_group_session_size.restype = c_size_t
lib.olm_outbound_group_session.argtypes = [c_void_p]
lib.olm_outbound_group_session.restype = c_void_p
lib.olm_outbound_group_session_last_error.argtypes = [c_void_p]
lib.olm_outbound_group_session_last_error.restype = c_char_p
def outbound_group_session_errcheck(res, func, args):
if res == ERR:
raise OlmError("%s: %s" % (
func.__name__, lib.olm_outbound_group_session_last_error(args[0])
))
return res
def outbound_group_session_function(func, *types):
func.argtypes = (c_void_p,) + types
func.restypes = c_size_t
func.errcheck = outbound_group_session_errcheck
outbound_group_session_function(
lib.olm_pickle_outbound_group_session,
c_void_p, c_size_t, c_void_p, c_size_t,
)
outbound_group_session_function(
lib.olm_unpickle_outbound_group_session,
c_void_p, c_size_t, c_void_p, c_size_t,
)
outbound_group_session_function(
lib.olm_init_outbound_group_session_random_length,
)
outbound_group_session_function(
lib.olm_init_outbound_group_session,
c_void_p, c_size_t,
)
lib.olm_outbound_group_session_message_index.argtypes = [c_void_p]
lib.olm_outbound_group_session_message_index.restype = c_uint32
outbound_group_session_function(
lib.olm_group_encrypt_message_length,
c_size_t,
)
outbound_group_session_function(
lib.olm_group_encrypt,
c_void_p, c_size_t, # Plaintext
c_void_p, c_size_t, # Message
)
outbound_group_session_function(
lib.olm_outbound_group_session_id_length,
)
outbound_group_session_function(
lib.olm_outbound_group_session_id,
c_void_p, c_size_t,
)
outbound_group_session_function(
lib.olm_outbound_group_session_key_length,
)
outbound_group_session_function(
lib.olm_outbound_group_session_key,
c_void_p, c_size_t,
)
class OutboundGroupSession(object):
def __init__(self):
self.buf = create_string_buffer(lib.olm_outbound_group_session_size())
self.ptr = lib.olm_outbound_group_session(self.buf)
random_length = lib.olm_init_outbound_group_session_random_length(
self.ptr
)
random = urandom(random_length)
random_buffer = create_string_buffer(random)
lib.olm_init_outbound_group_session(
self.ptr, random_buffer, random_length
)
def pickle(self, key):
key_buffer = create_string_buffer(key)
pickle_length = lib.olm_pickle_outbound_group_session_length(self.ptr)
pickle_buffer = create_string_buffer(pickle_length)
lib.olm_pickle_outbound_group_session(
self.ptr, key_buffer, len(key), pickle_buffer, pickle_length
)
return pickle_buffer.raw
def unpickle(self, key, pickle):
key_buffer = create_string_buffer(key)
pickle_buffer = create_string_buffer(pickle)
lib.olm_unpickle_outbound_group_session(
self.ptr, key_buffer, len(key), pickle_buffer, len(pickle)
)
def encrypt(self, plaintext):
message_length = lib.olm_group_encrypt_message_length(
self.ptr, len(plaintext)
)
message_buffer = create_string_buffer(message_length)
plaintext_buffer = create_string_buffer(plaintext)
lib.olm_group_encrypt(
self.ptr,
plaintext_buffer, len(plaintext),
message_buffer, message_length,
)
return message_buffer.raw
def session_id(self):
id_length = lib.olm_outbound_group_session_id_length(self.ptr)
id_buffer = create_string_buffer(id_length)
lib.olm_outbound_group_session_id(self.ptr, id_buffer, id_length)
return id_buffer.raw
def message_index(self):
return lib.olm_outbound_group_session_message_index(self.ptr)
def session_key(self):
key_length = lib.olm_outbound_group_session_key_length(self.ptr)
key_buffer = create_string_buffer(key_length)
lib.olm_outbound_group_session_key(self.ptr, key_buffer, key_length)
return key_buffer.raw

View file

@ -1,204 +1,486 @@
from os import urandom # -*- coding: utf-8 -*-
# libolm python bindings
# Copyright © 2015-2017 OpenMarket Ltd
# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""libolm Session module.
from ._base import * This module contains the Olm Session part of the Olm library.
It is used to establish a peer-to-peer encrypted communication channel between
two Olm accounts.
Examples:
>>> alice = Account()
>>> bob = Account()
>>> bob.generate_one_time_keys(1)
>>> id_key = bob.identity_keys['curve25519']
>>> one_time = list(bob.one_time_keys["curve25519"].values())[0]
>>> session = OutboundSession(alice, id_key, one_time)
"""
# pylint: disable=redefined-builtin,unused-import
from builtins import bytes, super
from typing import AnyStr, Optional, Type
from future.utils import bytes_to_native_str
# pylint: disable=no-name-in-module
from _libolm import ffi, lib # type: ignore
from ._compat import URANDOM, to_bytearray, to_bytes
from ._finalize import track_for_finalization
# This is imported only for type checking purposes
if False:
from .account import Account # pragma: no cover
lib.olm_session_size.argtypes = [] class OlmSessionError(Exception):
lib.olm_session_size.restype = c_size_t """libolm Session exception."""
lib.olm_session.argtypes = [c_void_p]
lib.olm_session.restype = c_void_p
lib.olm_session_last_error.argtypes = [c_void_p]
lib.olm_session_last_error.restype = c_char_p
def session_errcheck(res, func, args): class _OlmMessage(object):
if res == ERR: def __init__(self, ciphertext, message_type):
raise OlmError("%s: %s" % ( # type: (AnyStr, ffi.cdata) -> None
func.__name__, lib.olm_session_last_error(args[0]) if not ciphertext:
)) raise ValueError("Ciphertext can't be empty")
return res
# I don't know why mypy wants a type annotation here nor why AnyStr
# doesn't work
self.ciphertext = ciphertext # type: ignore
self.message_type = message_type
def __str__(self):
# type: () -> str
type_to_prefix = {
lib.OLM_MESSAGE_TYPE_PRE_KEY: "PRE_KEY",
lib.OLM_MESSAGE_TYPE_MESSAGE: "MESSAGE"
}
prefix = type_to_prefix[self.message_type]
return "{} {}".format(prefix, self.ciphertext)
def session_function(func, *types): class OlmPreKeyMessage(_OlmMessage):
func.argtypes = (c_void_p,) + types """Olm prekey message class
func.restypes = c_size_t
func.errcheck = session_errcheck
session_function(lib.olm_session_last_error) Prekey messages are used to establish an Olm session. After the first
session_function( message exchange the session switches to normal messages
lib.olm_pickle_session, c_void_p, c_size_t, c_void_p, c_size_t """
)
session_function( def __init__(self, ciphertext):
lib.olm_unpickle_session, c_void_p, c_size_t, c_void_p, c_size_t # type: (AnyStr) -> None
) """Create a new Olm prekey message with the supplied ciphertext
session_function(lib.olm_create_outbound_session_random_length)
session_function( Args:
lib.olm_create_outbound_session, ciphertext(str): The ciphertext of the prekey message.
c_void_p, # Account """
c_void_p, c_size_t, # Identity Key _OlmMessage.__init__(self, ciphertext, lib.OLM_MESSAGE_TYPE_PRE_KEY)
c_void_p, c_size_t, # One Time Key
c_void_p, c_size_t, # Random def __repr__(self):
) # type: () -> str
session_function( return "OlmPreKeyMessage({})".format(self.ciphertext)
lib.olm_create_inbound_session,
c_void_p, # Account
c_void_p, c_size_t, # Pre Key Message class OlmMessage(_OlmMessage):
) """Olm message class"""
session_function(
lib.olm_create_inbound_session_from, def __init__(self, ciphertext):
c_void_p, # Account # type: (AnyStr) -> None
c_void_p, c_size_t, # Identity Key """Create a new Olm message with the supplied ciphertext
c_void_p, c_size_t, # Pre Key Message
) Args:
session_function(lib.olm_session_id_length) ciphertext(str): The ciphertext of the message.
session_function(lib.olm_session_id, c_void_p, c_size_t) """
session_function(lib.olm_matches_inbound_session, c_void_p, c_size_t) _OlmMessage.__init__(self, ciphertext, lib.OLM_MESSAGE_TYPE_MESSAGE)
session_function(
lib.olm_matches_inbound_session_from, def __repr__(self):
c_void_p, c_size_t, # Identity Key # type: () -> str
c_void_p, c_size_t, # Pre Key Message return "OlmMessage({})".format(self.ciphertext)
)
session_function(lib.olm_pickle_session_length)
session_function(lib.olm_encrypt_message_type) def _clear_session(session):
session_function(lib.olm_encrypt_random_length) # type: (ffi.cdata) -> None
session_function(lib.olm_encrypt_message_length, c_size_t) lib.olm_clear_session(session)
session_function(
lib.olm_encrypt,
c_void_p, c_size_t, # Plaintext
c_void_p, c_size_t, # Random
c_void_p, c_size_t, # Message
)
session_function(
lib.olm_decrypt_max_plaintext_length,
c_size_t, # Message Type
c_void_p, c_size_t, # Message
)
session_function(
lib.olm_decrypt,
c_size_t, # Message Type
c_void_p, c_size_t, # Message
c_void_p, c_size_t, # Plaintext
)
class Session(object): class Session(object):
"""libolm Session class.
This is an abstract class that can't be instantiated except when unpickling
a previously pickled InboundSession or OutboundSession object with
from_pickle.
"""
def __new__(cls):
# type: (Type[Session]) -> Session
obj = super().__new__(cls)
obj._buf = ffi.new("char[]", lib.olm_session_size())
obj._session = lib.olm_session(obj._buf)
track_for_finalization(obj, obj._session, _clear_session)
return obj
def __init__(self): def __init__(self):
self.buf = create_string_buffer(lib.olm_session_size()) # type: () -> None
self.ptr = lib.olm_session(self.buf) if type(self) is Session:
raise TypeError("Session class may not be instantiated.")
def pickle(self, key): if False:
key_buffer = create_string_buffer(key) self._session = self._session # type: ffi.cdata
pickle_length = lib.olm_pickle_session_length(self.ptr)
pickle_buffer = create_string_buffer(pickle_length)
lib.olm_pickle_session(
self.ptr, key_buffer, len(key), pickle_buffer, pickle_length
)
return pickle_buffer.raw
def unpickle(self, key, pickle): def _check_error(self, ret):
key_buffer = create_string_buffer(key) # type: (int) -> None
pickle_buffer = create_string_buffer(pickle) if ret != lib.olm_error():
lib.olm_unpickle_session( return
self.ptr, key_buffer, len(key), pickle_buffer, len(pickle)
)
def create_outbound(self, account, identity_key, one_time_key): last_error = bytes_to_native_str(
r_length = lib.olm_create_outbound_session_random_length(self.ptr) ffi.string(lib.olm_session_last_error(self._session)))
random = urandom(r_length)
random_buffer = create_string_buffer(random)
identity_key_buffer = create_string_buffer(identity_key)
one_time_key_buffer = create_string_buffer(one_time_key)
lib.olm_create_outbound_session(
self.ptr,
account.ptr,
identity_key_buffer, len(identity_key),
one_time_key_buffer, len(one_time_key),
random_buffer, r_length
)
def create_inbound(self, account, one_time_key_message): raise OlmSessionError(last_error)
one_time_key_message_buffer = create_string_buffer(
one_time_key_message
)
lib.olm_create_inbound_session(
self.ptr,
account.ptr,
one_time_key_message_buffer, len(one_time_key_message)
)
def create_inbound_from(self, account, identity_key, one_time_key_message): def pickle(self, passphrase=""):
identity_key_buffer = create_string_buffer(identity_key) # type: (Optional[str]) -> bytes
one_time_key_message_buffer = create_string_buffer( """Store an Olm session.
one_time_key_message
)
lib.olm_create_inbound_session_from(
self.ptr,
account.ptr,
identity_key_buffer, len(identity_key),
one_time_key_message_buffer, len(one_time_key_message)
)
def session_id(self): Stores a session as a base64 string. Encrypts the session using the
id_length = lib.olm_session_id_length(self.ptr) supplied passphrase. Returns a byte object containing the base64
id_buffer = create_string_buffer(id_length) encoded string of the pickled session. Raises OlmSessionError on
lib.olm_session_id(self.ptr, id_buffer, id_length) failure.
return id_buffer.raw
def matches_inbound(self, one_time_key_message): Args:
one_time_key_message_buffer = create_string_buffer( passphrase(str, optional): The passphrase to be used to encrypt
one_time_key_message, the session.
) """
return bool(lib.olm_matches_inbound_session( byte_key = bytearray(passphrase, "utf-8") if passphrase else b""
self.ptr,
one_time_key_message_buffer, len(one_time_key_message)
))
def matches_inbound_from(self, identity_key, one_time_key_message): pickle_length = lib.olm_pickle_session_length(self._session)
identity_key_buffer = create_string_buffer(identity_key) pickle_buffer = ffi.new("char[]", pickle_length)
one_time_key_message_buffer = create_string_buffer(
one_time_key_message, try:
) self._check_error(
return bool(lib.olm_matches_inbound_session( lib.olm_pickle_session(self._session,
self.ptr, ffi.from_buffer(byte_key),
identity_key_buffer, len(identity_key), len(byte_key),
one_time_key_message_buffer, len(one_time_key_message) pickle_buffer, pickle_length))
)) finally:
# clear out copies of the passphrase
for i in range(0, len(byte_key)):
byte_key[i] = 0
return ffi.unpack(pickle_buffer, pickle_length)
@classmethod
def from_pickle(cls, pickle, passphrase=""):
# type: (bytes, Optional[str]) -> Session
"""Load a previously stored Olm session.
Loads a session from a pickled base64 string and returns a Session
object. Decrypts the session using the supplied passphrase. Raises
OlmSessionError on failure. If the passphrase doesn't match the one
used to encrypt the session then the error message for the
exception will be "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded
then the error message will be "INVALID_BASE64".
Args:
pickle(bytes): Base64 encoded byte string containing the pickled
session
passphrase(str, optional): The passphrase used to encrypt the
session.
"""
if not pickle:
raise ValueError("Pickle can't be empty")
byte_key = bytearray(passphrase, "utf-8") if passphrase else b""
# copy because unpickle will destroy the buffer
pickle_buffer = ffi.new("char[]", pickle)
session = cls.__new__(cls)
try:
ret = lib.olm_unpickle_session(session._session,
ffi.from_buffer(byte_key),
len(byte_key),
pickle_buffer,
len(pickle))
session._check_error(ret)
finally:
# clear out copies of the passphrase
for i in range(0, len(byte_key)):
byte_key[i] = 0
return session
def encrypt(self, plaintext): def encrypt(self, plaintext):
r_length = lib.olm_encrypt_random_length(self.ptr) # type: (AnyStr) -> _OlmMessage
random = urandom(r_length) """Encrypts a message using the session. Returns the ciphertext as a
random_buffer = create_string_buffer(random) base64 encoded string on success. Raises OlmSessionError on failure.
message_type = lib.olm_encrypt_message_type(self.ptr) Args:
message_length = lib.olm_encrypt_message_length( plaintext(str): The plaintext message that will be encrypted.
self.ptr, len(plaintext) """
) byte_plaintext = to_bytearray(plaintext)
message_buffer = create_string_buffer(message_length)
plaintext_buffer = create_string_buffer(plaintext) r_length = lib.olm_encrypt_random_length(self._session)
random = URANDOM(r_length)
lib.olm_encrypt( try:
self.ptr, message_type = lib.olm_encrypt_message_type(self._session)
plaintext_buffer, len(plaintext),
random_buffer, r_length, self._check_error(message_type)
message_buffer, message_length,
) ciphertext_length = lib.olm_encrypt_message_length(
return message_type, message_buffer.raw self._session, len(byte_plaintext)
)
ciphertext_buffer = ffi.new("char[]", ciphertext_length)
self._check_error(lib.olm_encrypt(
self._session,
ffi.from_buffer(byte_plaintext), len(byte_plaintext),
ffi.from_buffer(random), r_length,
ciphertext_buffer, ciphertext_length,
))
finally:
# clear out copies of plaintext
if byte_plaintext is not plaintext:
for i in range(0, len(byte_plaintext)):
byte_plaintext[i] = 0
if message_type == lib.OLM_MESSAGE_TYPE_PRE_KEY:
return OlmPreKeyMessage(
bytes_to_native_str(ffi.unpack(
ciphertext_buffer,
ciphertext_length
)))
elif message_type == lib.OLM_MESSAGE_TYPE_MESSAGE:
return OlmMessage(
bytes_to_native_str(ffi.unpack(
ciphertext_buffer,
ciphertext_length
)))
else: # pragma: no cover
raise ValueError("Unknown message type")
def decrypt(self, message):
# type: (_OlmMessage) -> str
"""Decrypts a message using the session. Returns the plaintext string
on success. Raises OlmSessionError on failure. If the base64 couldn't
be decoded then the error message will be "INVALID_BASE64". If the
message is for an unsupported version of the protocol the error message
will be "BAD_MESSAGE_VERSION". If the message couldn't be decoded then
the error message will be "BAD_MESSAGE_FORMAT". If the MAC on the
message was invalid then the error message will be "BAD_MESSAGE_MAC".
Args:
message(OlmMessage): The Olm message that will be decrypted. It can
be either an OlmPreKeyMessage or an OlmMessage.
"""
if not message.ciphertext:
raise ValueError("Ciphertext can't be empty")
byte_ciphertext = to_bytes(message.ciphertext)
# make a copy the ciphertext buffer, because
# olm_decrypt_max_plaintext_length wants to destroy something
ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
def decrypt(self, message_type, message):
message_buffer = create_string_buffer(message)
max_plaintext_length = lib.olm_decrypt_max_plaintext_length( max_plaintext_length = lib.olm_decrypt_max_plaintext_length(
self.ptr, message_type, message_buffer, len(message) self._session, message.message_type, ciphertext_buffer,
len(byte_ciphertext)
) )
plaintext_buffer = create_string_buffer(max_plaintext_length) self._check_error(max_plaintext_length)
message_buffer = create_string_buffer(message) plaintext_buffer = ffi.new("char[]", max_plaintext_length)
# make a copy the ciphertext buffer, because
# olm_decrypt_max_plaintext_length wants to destroy something
ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
plaintext_length = lib.olm_decrypt( plaintext_length = lib.olm_decrypt(
self.ptr, message_type, message_buffer, len(message), self._session, message.message_type,
ciphertext_buffer, len(byte_ciphertext),
plaintext_buffer, max_plaintext_length plaintext_buffer, max_plaintext_length
) )
return plaintext_buffer.raw[:plaintext_length] self._check_error(plaintext_length)
plaintext = bytes_to_native_str(
ffi.unpack(plaintext_buffer, plaintext_length))
def clear(self): # clear out copies of the plaintext
pass lib.memset(plaintext_buffer, 0, max_plaintext_length)
return plaintext
@property
def id(self):
# type: () -> str
"""str: An identifier for this session. Will be the same for both
ends of the conversation.
"""
id_length = lib.olm_session_id_length(self._session)
id_buffer = ffi.new("char[]", id_length)
self._check_error(
lib.olm_session_id(self._session, id_buffer, id_length)
)
return bytes_to_native_str(ffi.unpack(id_buffer, id_length))
def matches(self, message, identity_key=None):
# type: (OlmPreKeyMessage, Optional[AnyStr]) -> bool
"""Checks if the PRE_KEY message is for this in-bound session.
This can happen if multiple messages are sent to this session before
this session sends a message in reply. Returns True if the session
matches. Returns False if the session does not match. Raises
OlmSessionError on failure. If the base64 couldn't be decoded then the
error message will be "INVALID_BASE64". If the message was for an
unsupported protocol version then the error message will be
"BAD_MESSAGE_VERSION". If the message couldn't be decoded then then the
error message will be * "BAD_MESSAGE_FORMAT".
Args:
message(OlmPreKeyMessage): The Olm prekey message that will checked
if it is intended for this session.
identity_key(str, optional): The identity key of the sender. To
check if the message was also sent using this identity key.
"""
if not isinstance(message, OlmPreKeyMessage):
raise TypeError("Matches can only be called with prekey messages.")
if not message.ciphertext:
raise ValueError("Ciphertext can't be empty")
ret = None
byte_ciphertext = to_bytes(message.ciphertext)
# make a copy, because olm_matches_inbound_session(_from) will distroy
# it
message_buffer = ffi.new("char[]", byte_ciphertext)
if identity_key:
byte_id_key = to_bytes(identity_key)
ret = lib.olm_matches_inbound_session_from(
self._session,
ffi.from_buffer(byte_id_key), len(byte_id_key),
message_buffer, len(byte_ciphertext)
)
else:
ret = lib.olm_matches_inbound_session(
self._session,
message_buffer, len(byte_ciphertext))
self._check_error(ret)
return bool(ret)
class InboundSession(Session):
"""Inbound Olm session for p2p encrypted communication.
"""
def __new__(cls, account, message, identity_key=None):
# type: (Account, OlmPreKeyMessage, Optional[AnyStr]) -> Session
return super().__new__(cls)
def __init__(self, account, message, identity_key=None):
# type: (Account, OlmPreKeyMessage, Optional[AnyStr]) -> None
"""Create a new inbound Olm session.
Create a new in-bound session for sending/receiving messages from an
incoming prekey message. Raises OlmSessionError on failure. If the
base64 couldn't be decoded then error message will be "INVALID_BASE64".
If the message was for an unsupported protocol version then
the errror message will be "BAD_MESSAGE_VERSION". If the message
couldn't be decoded then then the error message will be
"BAD_MESSAGE_FORMAT". If the message refers to an unknown one-time
key then the error message will be "BAD_MESSAGE_KEY_ID".
Args:
account(Account): The Olm Account that will be used to create this
session.
message(OlmPreKeyMessage): The Olm prekey message that will checked
that will be used to create this session.
identity_key(str, optional): The identity key of the sender. To
check if the message was also sent using this identity key.
"""
if not message.ciphertext:
raise ValueError("Ciphertext can't be empty")
super().__init__()
byte_ciphertext = to_bytes(message.ciphertext)
message_buffer = ffi.new("char[]", byte_ciphertext)
if identity_key:
byte_id_key = to_bytes(identity_key)
identity_key_buffer = ffi.new("char[]", byte_id_key)
self._check_error(lib.olm_create_inbound_session_from(
self._session,
account._account,
identity_key_buffer, len(byte_id_key),
message_buffer, len(byte_ciphertext)
))
else:
self._check_error(lib.olm_create_inbound_session(
self._session,
account._account,
message_buffer, len(byte_ciphertext)
))
class OutboundSession(Session):
"""Outbound Olm session for p2p encrypted communication."""
def __new__(cls, account, identity_key, one_time_key):
# type: (Account, AnyStr, AnyStr) -> Session
return super().__new__(cls)
def __init__(self, account, identity_key, one_time_key):
# type: (Account, AnyStr, AnyStr) -> None
"""Create a new outbound Olm session.
Creates a new outbound session for sending messages to a given
identity key and one-time key.
Raises OlmSessionError on failure. If the keys couldn't be decoded as
base64 then the error message will be "INVALID_BASE64".
Args:
account(Account): The Olm Account that will be used to create this
session.
identity_key(str): The identity key of the person with whom we want
to start the session.
one_time_key(str): A one-time key from the person with whom we want
to start the session.
"""
if not identity_key:
raise ValueError("Identity key can't be empty")
if not one_time_key:
raise ValueError("One-time key can't be empty")
super().__init__()
byte_id_key = to_bytes(identity_key)
byte_one_time = to_bytes(one_time_key)
session_random_length = lib.olm_create_outbound_session_random_length(
self._session)
random = URANDOM(session_random_length)
self._check_error(lib.olm_create_outbound_session(
self._session,
account._account,
ffi.from_buffer(byte_id_key), len(byte_id_key),
ffi.from_buffer(byte_one_time), len(byte_one_time),
ffi.from_buffer(random), session_random_length
))

View file

@ -1,56 +1,110 @@
from ._base import lib, c_void_p, c_size_t, c_char_p, \ # -*- coding: utf-8 -*-
create_string_buffer, ERR, OlmError # libolm python bindings
# Copyright © 2015-2017 OpenMarket Ltd
# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""libolm Utility module.
lib.olm_utility_size.argtypes = [] This module contains utilities for olm.
lib.olm_utility_size.restype = c_size_t It only contains the ed25519_verify function for signature verification.
lib.olm_utility.argtypes = [c_void_p] Examples:
lib.olm_utility.restype = c_void_p >>> alice = Account()
lib.olm_utility_last_error.argtypes = [c_void_p] >>> message = "Test"
lib.olm_utility_last_error.restype = c_char_p >>> signature = alice.sign(message)
>>> signing_key = alice.identity_keys["ed25519"]
>>> ed25519_verify(signing_key, message, signature)
"""
# pylint: disable=redefined-builtin,unused-import
from typing import AnyStr, Type
# pylint: disable=no-name-in-module
from _libolm import ffi, lib # type: ignore
from ._compat import to_bytearray, to_bytes
from ._finalize import track_for_finalization
def utility_errcheck(res, func, args): def _clear_utility(utility): # pragma: no cover
if res == ERR: # type: (ffi.cdata) -> None
raise OlmError("%s: %s" % ( lib.olm_clear_utility(utility)
func.__name__, lib.olm_utility_last_error(args[0])
))
return res
def utility_function(func, *types): class OlmVerifyError(Exception):
func.argtypes = (c_void_p,) + types """libolm signature verification exception."""
func.restypes = c_size_t
func.errcheck = utility_errcheck
utility_function(
lib.olm_ed25519_verify,
c_void_p, c_size_t, # key, key_length
c_void_p, c_size_t, # message, message_length
c_void_p, c_size_t, # signature, signature_length
)
class Utility(object): class _Utility(object):
def __init__(self): # pylint: disable=too-few-public-methods
self.buf = create_string_buffer(lib.olm_utility_size()) """libolm Utility class."""
self.ptr = lib.olm_utility(self.buf)
_utility = None _buf = None
_utility = None
@classmethod
def _allocate(cls):
# type: (Type[_Utility]) -> None
cls._buf = ffi.new("char[]", lib.olm_utility_size())
cls._utility = lib.olm_utility(cls._buf)
track_for_finalization(cls, cls._utility, _clear_utility)
@classmethod
def _check_error(cls, ret):
# type: (int) -> None
if ret != lib.olm_error():
return
raise OlmVerifyError("{}".format(
ffi.string(lib.olm_utility_last_error(
cls._utility)).decode("utf-8")))
@classmethod
def _ed25519_verify(cls, key, message, signature):
# type: (Type[_Utility], AnyStr, AnyStr, AnyStr) -> None
if not cls._utility:
cls._allocate()
byte_key = to_bytes(key)
byte_message = to_bytearray(message)
byte_signature = to_bytes(signature)
try:
cls._check_error(
lib.olm_ed25519_verify(cls._utility, byte_key, len(byte_key),
ffi.from_buffer(byte_message),
len(byte_message),
byte_signature, len(byte_signature)))
finally:
# clear out copies of the message, which may be a plaintext
if byte_message is not message:
for i in range(0, len(byte_message)):
byte_message[i] = 0
def ed25519_verify(key, message, signature): def ed25519_verify(key, message, signature):
""" Verify an ed25519 signature. Raises an OlmError if verification fails. # type: (AnyStr, AnyStr, AnyStr) -> None
"""Verify an ed25519 signature.
Raises an OlmVerifyError if verification fails.
Args: Args:
key(bytes): The ed25519 public key used for signing. key(str): The ed25519 public key used for signing.
message(bytes): The signed message. message(str): The signed message.
signature(bytes): The message signature. signature(bytes): The message signature.
""" """
global _utility return _Utility._ed25519_verify(key, message, signature)
if not _utility:
_utility = Utility()
lib.olm_ed25519_verify(_utility.ptr,
key, len(key),
message, len(message),
signature, len(signature))

51
python/olm_build.py Normal file
View file

@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
# libolm python bindings
# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
#
# Permission to use, copy, modify, and/or distribute this software for
# any purpose with or without fee is hereby granted, provided that the
# above copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import unicode_literals
import os
from cffi import FFI
ffibuilder = FFI()
PATH = os.path.dirname(__file__)
DEVELOP = os.environ.get("DEVELOP")
compile_args = ["-I../include"]
link_args = ["-L../build"]
if DEVELOP and DEVELOP.lower() in ["yes", "true", "1"]:
link_args.append('-Wl,-rpath=../build')
ffibuilder.set_source(
"_libolm",
r"""
#include <olm/olm.h>
#include <olm/inbound_group_session.h>
#include <olm/outbound_group_session.h>
""",
libraries=["olm"],
extra_compile_args=compile_args,
extra_link_args=link_args)
with open(os.path.join(PATH, "include/olm/olm.h")) as f:
ffibuilder.cdef(f.read(), override=True)
if __name__ == "__main__":
ffibuilder.compile(verbose=True)

3
python/requirements.txt Normal file
View file

@ -0,0 +1,3 @@
future
cffi
typing

8
python/setup.cfg Normal file
View file

@ -0,0 +1,8 @@
[tool:pytest]
testpaths = tests
flake8-ignore =
olm/*.py F401
tests/*.py W503
[coverage:run]
omit=olm/__version__.py

27
python/setup.py Normal file
View file

@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
import os
from codecs import open
from setuptools import setup
here = os.path.abspath(os.path.dirname(__file__))
about = {}
with open(os.path.join(here, "olm", "__version__.py"), "r", "utf-8") as f:
exec(f.read(), about)
setup(
name=about["__title__"],
version=about["__version__"],
description=about["__description__"],
author=about["__author__"],
author_email=about["__author_email__"],
url=about["__url__"],
license=about["__license__"],
packages=["olm"],
setup_requires=["cffi>=1.0.0"],
cffi_modules=["olm_build.py:ffibuilder"],
install_requires=["cffi>=1.0.0", "future", "typing"],
zip_safe=False
)

View file

@ -0,0 +1,7 @@
pytest
hypothesis
pytest-flake8
pytest-isort
pytest-cov
pytest-benchmark
aspectlib

View file

@ -1,46 +0,0 @@
#! /bin/bash
set -e
cd `dirname $0`
OLM="python -m olm"
ALICE_ACCOUNT=alice.account
ALICE_SESSION=alice.session
ALICE_GROUP_SESSION=alice.group_session
BOB_ACCOUNT=bob.account
BOB_SESSION=bob.session
BOB_GROUP_SESSION=bob.group_session
CHARLIE_GROUP_SESSION=charlie.group_session
rm -f $ALICE_ACCOUNT $BOB_ACCOUNT
rm -f $ALICE_SESSION $BOB_SESSION
rm -f $ALICE_GROUP_SESSION $BOB_GROUP_SESSION $CHARLIE_GROUP_SESSION
$OLM create_account $ALICE_ACCOUNT
$OLM create_account $BOB_ACCOUNT
$OLM generate_keys $BOB_ACCOUNT 1
BOB_IDENTITY_KEY="$($OLM identity_key $BOB_ACCOUNT)"
BOB_ONE_TIME_KEY="$($OLM one_time_key $BOB_ACCOUNT)"
$OLM outbound $ALICE_ACCOUNT $ALICE_SESSION "$BOB_IDENTITY_KEY" "$BOB_ONE_TIME_KEY"
echo "Hello world" | $OLM encrypt $ALICE_SESSION - - | $OLM inbound $BOB_ACCOUNT $BOB_SESSION - -
### group sessions
$OLM outbound_group $ALICE_GROUP_SESSION
$OLM group_credentials $ALICE_GROUP_SESSION | $OLM inbound_group $BOB_GROUP_SESSION
echo "Hello group" | $OLM group_encrypt $ALICE_GROUP_SESSION - group_message
$OLM group_decrypt $BOB_GROUP_SESSION group_message
$OLM export_inbound_group $BOB_GROUP_SESSION | $OLM import_inbound_group $CHARLIE_GROUP_SESSION
$OLM group_decrypt $CHARLIE_GROUP_SESSION group_message
### Sign/verify
ALICE_SIGNING_KEY="$($OLM signing_key $ALICE_ACCOUNT)"
sig="$(echo "Test message" | $OLM sign $ALICE_ACCOUNT - -)"
echo "Test message" | $OLM ed25519_verify $ALICE_SIGNING_KEY $sig -

View file

@ -0,0 +1,100 @@
from builtins import int
import pytest
from hypothesis import given
from hypothesis.strategies import text
from olm import Account, OlmAccountError, OlmVerifyError, ed25519_verify
from olm._compat import to_bytes
class TestClass(object):
def test_to_bytes(self):
assert isinstance(to_bytes("a"), bytes)
assert isinstance(to_bytes(u"a"), bytes)
assert isinstance(to_bytes(b"a"), bytes)
assert isinstance(to_bytes(r"a"), bytes)
with pytest.raises(TypeError):
to_bytes(0)
def test_account_creation(self):
alice = Account()
assert alice.identity_keys
assert len(alice.identity_keys) == 2
def test_account_pickle(self):
alice = Account()
pickle = alice.pickle()
assert (alice.identity_keys == Account.from_pickle(pickle)
.identity_keys)
def test_invalid_unpickle(self):
with pytest.raises(ValueError):
Account.from_pickle(b"")
def test_passphrase_pickle(self):
alice = Account()
passphrase = "It's a secret to everybody"
pickle = alice.pickle(passphrase)
assert (alice.identity_keys == Account.from_pickle(
pickle, passphrase).identity_keys)
def test_wrong_passphrase_pickle(self):
alice = Account()
passphrase = "It's a secret to everybody"
pickle = alice.pickle(passphrase)
with pytest.raises(OlmAccountError):
Account.from_pickle(pickle, "")
def test_one_time_keys(self):
alice = Account()
alice.generate_one_time_keys(10)
one_time_keys = alice.one_time_keys
assert one_time_keys
assert len(one_time_keys["curve25519"]) == 10
def test_max_one_time_keys(self):
alice = Account()
assert isinstance(alice.max_one_time_keys, int)
def test_publish_one_time_keys(self):
alice = Account()
alice.generate_one_time_keys(10)
one_time_keys = alice.one_time_keys
assert one_time_keys
assert len(one_time_keys["curve25519"]) == 10
alice.mark_keys_as_published()
assert not alice.one_time_keys["curve25519"]
def test_clear(self):
alice = Account()
del alice
@given(text())
def test_valid_signature(self, message):
alice = Account()
signature = alice.sign(message)
signing_key = alice.identity_keys["ed25519"]
assert signature
assert signing_key
ed25519_verify(signing_key, message, signature)
@given(text())
def test_invalid_signature(self, message):
alice = Account()
bob = Account()
signature = alice.sign(message)
signing_key = bob.identity_keys["ed25519"]
assert signature
assert signing_key
with pytest.raises(OlmVerifyError):
ed25519_verify(signing_key, message, signature)

View file

@ -0,0 +1,114 @@
import pytest
from olm import InboundGroupSession, OlmGroupSessionError, OutboundGroupSession
class TestClass(object):
def test_session_create(self):
OutboundGroupSession()
def test_session_id(self):
session = OutboundGroupSession()
assert isinstance(session.id, str)
def test_session_index(self):
session = OutboundGroupSession()
assert isinstance(session.message_index, int)
assert session.message_index == 0
def test_outbound_pickle(self):
session = OutboundGroupSession()
pickle = session.pickle()
assert (session.id == OutboundGroupSession.from_pickle(
pickle).id)
def test_invalid_unpickle(self):
with pytest.raises(ValueError):
OutboundGroupSession.from_pickle(b"")
with pytest.raises(ValueError):
InboundGroupSession.from_pickle(b"")
def test_inbound_create(self):
outbound = OutboundGroupSession()
InboundGroupSession(outbound.session_key)
def test_invalid_decrypt(self):
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
with pytest.raises(ValueError):
inbound.decrypt("")
def test_inbound_pickle(self):
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
pickle = inbound.pickle()
InboundGroupSession.from_pickle(pickle)
def test_inbound_export(self):
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
imported = InboundGroupSession.import_session(
inbound.export_session(inbound.first_known_index)
)
assert "Test", 0 == imported.decrypt(outbound.encrypt("Test"))
def test_first_index(self):
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
index = inbound.first_known_index
assert isinstance(index, int)
def test_encrypt(self, benchmark):
benchmark.weave(OutboundGroupSession.encrypt, lazy=True)
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
assert "Test", 0 == inbound.decrypt(outbound.encrypt("Test"))
def test_decrypt(self, benchmark):
benchmark.weave(InboundGroupSession.decrypt, lazy=True)
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
assert "Test", 0 == inbound.decrypt(outbound.encrypt("Test"))
def test_decrypt_twice(self):
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
outbound.encrypt("Test 1")
message, index = inbound.decrypt(outbound.encrypt("Test 2"))
assert isinstance(index, int)
assert ("Test 2", 1) == (message, index)
def test_decrypt_failure(self):
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
eve_outbound = OutboundGroupSession()
with pytest.raises(OlmGroupSessionError):
inbound.decrypt(eve_outbound.encrypt("Test"))
def test_id(self):
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
assert outbound.id == inbound.id
def test_inbound_fail(self):
with pytest.raises(TypeError):
InboundGroupSession()
def test_oubtound_pickle_fail(self):
outbound = OutboundGroupSession()
pickle = outbound.pickle("Test")
with pytest.raises(OlmGroupSessionError):
OutboundGroupSession.from_pickle(pickle)
def test_outbound_clear(self):
session = OutboundGroupSession()
del session
def test_inbound_clear(self):
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
del inbound

View file

@ -0,0 +1,143 @@
import pytest
from olm import (Account, InboundSession, OlmMessage, OlmPreKeyMessage,
OlmSessionError, OutboundSession, Session)
class TestClass(object):
def _create_session(self):
alice = Account()
bob = Account()
bob.generate_one_time_keys(1)
id_key = bob.identity_keys["curve25519"]
one_time = list(bob.one_time_keys["curve25519"].values())[0]
session = OutboundSession(alice, id_key, one_time)
return alice, bob, session
def test_session_create(self):
_, _, session_1 = self._create_session()
_, _, session_2 = self._create_session()
assert session_1
assert session_2
assert session_1.id != session_2.id
assert isinstance(session_1.id, str)
def test_session_clear(self):
_, _, session = self._create_session()
del session
def test_invalid_session_create(self):
with pytest.raises(TypeError):
Session()
def test_session_pickle(self):
alice, bob, session = self._create_session()
Session.from_pickle(session.pickle()).id == session.id
def test_session_invalid_pickle(self):
with pytest.raises(ValueError):
Session.from_pickle(b"")
def test_wrong_passphrase_pickle(self):
alice, bob, session = self._create_session()
passphrase = "It's a secret to everybody"
pickle = alice.pickle(passphrase)
with pytest.raises(OlmSessionError):
Session.from_pickle(pickle, "")
def test_encrypt(self):
plaintext = "It's a secret to everybody"
alice, bob, session = self._create_session()
message = session.encrypt(plaintext)
assert (repr(message)
== "OlmPreKeyMessage({})".format(message.ciphertext))
assert (str(message)
== "PRE_KEY {}".format(message.ciphertext))
bob_session = InboundSession(bob, message)
assert plaintext == bob_session.decrypt(message)
def test_empty_message(self):
with pytest.raises(ValueError):
OlmPreKeyMessage("")
empty = OlmPreKeyMessage("x")
empty.ciphertext = ""
alice, bob, session = self._create_session()
with pytest.raises(ValueError):
session.decrypt(empty)
def test_inbound_with_id(self):
plaintext = "It's a secret to everybody"
alice, bob, session = self._create_session()
message = session.encrypt(plaintext)
alice_id = alice.identity_keys["curve25519"]
bob_session = InboundSession(bob, message, alice_id)
assert plaintext == bob_session.decrypt(message)
def test_two_messages(self):
plaintext = "It's a secret to everybody"
alice, bob, session = self._create_session()
message = session.encrypt(plaintext)
alice_id = alice.identity_keys["curve25519"]
bob_session = InboundSession(bob, message, alice_id)
bob.remove_one_time_keys(bob_session)
assert plaintext == bob_session.decrypt(message)
bob_plaintext = "Grumble, Grumble"
bob_message = bob_session.encrypt(bob_plaintext)
assert (repr(bob_message)
== "OlmMessage({})".format(bob_message.ciphertext))
assert bob_plaintext == session.decrypt(bob_message)
def test_matches(self):
plaintext = "It's a secret to everybody"
alice, bob, session = self._create_session()
message = session.encrypt(plaintext)
alice_id = alice.identity_keys["curve25519"]
bob_session = InboundSession(bob, message, alice_id)
assert plaintext == bob_session.decrypt(message)
message_2nd = session.encrypt("Hey! Listen!")
assert bob_session.matches(message_2nd) is True
assert bob_session.matches(message_2nd, alice_id) is True
def test_invalid(self):
alice, bob, session = self._create_session()
message = OlmMessage("x")
with pytest.raises(TypeError):
session.matches(message)
message = OlmPreKeyMessage("x")
message.ciphertext = ""
with pytest.raises(ValueError):
session.matches(message)
with pytest.raises(ValueError):
InboundSession(bob, message)
with pytest.raises(ValueError):
OutboundSession(alice, "", "x")
with pytest.raises(ValueError):
OutboundSession(alice, "x", "")
def test_doesnt_match(self):
plaintext = "It's a secret to everybody"
alice, bob, session = self._create_session()
message = session.encrypt(plaintext)
alice_id = alice.identity_keys["curve25519"]
bob_session = InboundSession(bob, message, alice_id)
_, _, new_session = self._create_session()
new_message = new_session.encrypt(plaintext)
assert bob_session.matches(new_message) is False

43
python/tox.ini Normal file
View file

@ -0,0 +1,43 @@
# content of: tox.ini , put in same dir as setup.py
[tox]
envlist = py27,py36,pypy,{py2,py3}-cov,coverage
[testenv]
basepython =
py27: python2.7
py36: python3.6
pypy: pypy
py2: python2.7
py3: python3.6
deps = -rrequirements.txt
-rtest-requirements.txt
passenv = TOXENV CI TRAVIS TRAVIS_*
commands = pytest --benchmark-disable
usedevelop = True
[testenv:py2-cov]
commands =
pytest --cov-report term-missing --cov=olm --benchmark-disable --cov-branch
setenv =
COVERAGE_FILE=.coverage.py2
[testenv:py3-cov]
commands =
py.test --cov=olm --cov-report term-missing --benchmark-disable --cov-branch
setenv =
COVERAGE_FILE=.coverage.py3
[testenv:coverage]
basepython = python3.6
commands =
coverage erase
coverage combine
coverage xml
coverage report --show-missing
codecov -e TOXENV
deps =
coverage
codecov>=1.4.0
setenv =
COVERAGE_FILE=.coverage