python: Import improved python bindings.

This commit imports the python bindings from:
    https://github.com/poljar/python-olm

The bindings are imported at commit c44b145818520d69eaaa350fb95afcb846125e0f

Minor modifications were made while importing:
    - Removed travis config
    - Removed Arch Linux PKGBUILD
    - Removed the html docs, they can be rebuild by running make html in
      the docs folder
    - Slightly modified the README

The new bindings feature some improvements over the old ones:
    - Python 2 and 3 support
    - Automatic memory management
    - Automatic memory clearing before it is freed
    - Type signatures via the python typing module
    - Full test coverage
    - Properties are utilized where it makes sense (e.g. account.id)

Signed-off-by: Damir Jelić <poljar@termina.org.uk>
This commit is contained in:
Damir Jelić 2018-07-08 12:19:16 +02:00 committed by Hubert Chathi
parent 2fccf44015
commit e3d6673371
29 changed files with 3310 additions and 0 deletions

11
python/.gitignore vendored Normal file
View file

@ -0,0 +1,11 @@
.coverage
.mypy_cache/
.ropeproject/
.pytest_cache/
packages/
python_olm.egg-info/
_libolm*
__pycache__
*.pyc
.hypothesis/
.tox/

177
python/LICENSE Normal file
View file

@ -0,0 +1,177 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS

3
python/MANIFEST.in Normal file
View file

@ -0,0 +1,3 @@
include olm.h
include Makefile
include olm_build.py

35
python/Makefile Normal file
View file

@ -0,0 +1,35 @@
PYTHON ?= python
all: olm
olm:
$(PYTHON) setup.py build
install: olm
$(PYTHON) setup.py install --skip-build -O1 --root=$(DESTDIR)
test: develop py2develop
python3 -m pytest
python2 -m pytest
python3 -m pytest --flake8 --benchmark-disable
python3 -m pytest --isort --benchmark-disable
python3 -m pytest --cov --cov-branch --benchmark-disable
clean:
-rm -r python_olm.egg-info/ dist/ __pycache__/
-rm *.so _libolm.o
-rm -r packages/
-rm -r build/
develop: _libolm.o
py2develop: _libolm.so
_libolm.so: include/olm/olm.h olm_build.py
python2 olm_build.py
-rm _libolm.c
_libolm.o: include/olm/olm.h olm_build.py
python3 olm_build.py
-rm _libolm.c
.PHONY: all olm install clean test develop

164
python/README.md Normal file
View file

@ -0,0 +1,164 @@
python-olm
==========
[![Travis Build Status](https://travis-ci.org/poljar/python-olm.svg?branch=master)](https://travis-ci.org/poljar/python-olm)
[![Codecov Coverage Status](https://codecov.io/gh/poljar/python-olm/branch/master/graph/badge.svg)](https://codecov.io/gh/poljar/python-olm)
Python bindings for Olm.
The specification of the Olm cryptographic ratchet which is used for peer to
peer sessions of this library can be found [here][4].
The specification of the Megolm cryptographic ratchet which is used for group
sessions of this library can be found [here][5].
An example of the implementation of the Olm and Megolm cryptographic protocol
can be found in the Matrix protocol for which the implementation guide can be
found [here][6].
The full API reference can be found [here][7].
# Accounts
Accounts create and hold the central identity of the Olm protocol, they consist of a fingerprint and identity
key pair. They also produce one time keys that are used to start peer to peer
encrypted communication channels.
## Account Creation
A new account is created with the Account class, it creates a new Olm key pair.
The public parts of the key pair are available using the identity_keys property
of the class.
```python
>>> alice = Account()
>>> alice.identity_keys
{'curve25519': '2PytGagXercwHjzQETLcMa3JOsaU2qkPIESaqoi59zE',
'ed25519': 'HHpOuFYdHwoa54GxSttz9YmaTmbuVU3js92UTUjYJgM'}
```
## One Time keys
One time keys need to be generated before people can start an encrypted peer to
peer channel to an account.
```python
>>> alice.generate_one_time_keys(1)
>>> alice.one_time_keys
{'curve25519': {'AAAAAQ': 'KiHoW6CIy905UC4V1Frmwr3VW8bTWkBL4uWtWFFllxM'}}
```
After the one time keys are published they should be marked as such so they
aren't reused.
```python
>>> alice.mark_keys_as_published()
>>> alice.one_time_keys
{'curve25519': {}}
```
## Pickling
Accounts should be stored for later reuse, storing an account is done with the
pickle method while the restoring step is done with the from_pickle class
method.
```python
>>> pickle = alice.pickle()
>>> restored = Account.from_pickle(pickle)
```
# Sessions
Sessions are used to create an encrypted peer to peer communication channel
between two accounts.
## Session Creation
```python
>>> alice = Account()
>>> bob = Account()
>>> bob.generate_one_time_keys(1)
>>> id_key = bob.identity_keys["curve25519"]
>>> one_time = list(bob.one_time_keys["curve25519"].values())[0]
>>> alice_session = OutboundSession(alice, id_key, one_time)
```
## Encryption
After an outbound session is created an encrypted message can be exchanged:
```python
>>> message = alice_session.encrypt("It's a secret to everybody")
>>> message.ciphertext
'AwogkL7RoakT9gnjcZMra+y39WXKRmnxBPEaEp6OSueIA0cSIJxGpBoP8YZ+CGweXQ10LujbXMgK88
xG/JZMQJ5ulK9ZGiC8TYrezNYr3qyIBLlecXr/9wnegvJaSFDmWDVOcf4XfyI/AwogqIZfAklRXGC5b
ZJcZxVxQGgJ8Dz4OQII8k0Dp8msUXwQACIQvagY1dO55Qvnk5PZ2GF+wdKnvj6Zxl2g'
>>> message.message_type
0
```
After the message is transfered, bob can create an InboundSession to decrypt the
message.
```python
>>> bob_session = InboundSession(bob, message)
>>> bob_session.decrypt(message)
"It's a secret to everybody"
```
## Pickling
Sessions like accounts can be stored for later use the API is the same as for
accounts.
```python
>>> pickle = session.pickle()
>>> restored = Session.from_pickle(pickle)
```
# Group Sessions
Group Sessions are used to create a one-to-many encrypted communication channel.
The group session key needs to be shared with all participants that should be able
to decrypt the group messages. Another thing to notice is that, since the group
session key is ratcheted every time a message is encrypted, the session key should
be shared before any messages are encrypted.
## Group Session Creation
Group sessions aren't bound to an account like peer-to-peer sessions so their
creation is straightforward.
```python
>>> alice_group = OutboundGroupSession()
>>> bob_inbound_group = InboundGroupSession(alice_group.session_key)
```
## Group Encryption
Group encryption is pretty simple. The important part is to share the session
key with all participants over a secure channel (e.g. peer-to-peer Olm
sessions).
```python
>>> message = alice_group.encrypt("It's a secret to everybody")
>>> bob_inbound_group.decrypt(message)
("It's a secret to everybody", 0)
```
## Pickling
Pickling works the same way as for peer-to-peer Olm sessions.
```python
>>> pickle = session.pickle()
>>> restored = InboundGroupSession.from_pickle(pickle)
```
[1]: https://git.matrix.org/git/olm/about/
[2]: https://git.matrix.org/git/olm/tree/python?id=f8c61b8f8432d0b0b38d57f513c5048fb42f22ab
[3]: https://cffi.readthedocs.io/en/latest/
[4]: https://git.matrix.org/git/olm/about/docs/olm.rst
[5]: https://git.matrix.org/git/olm/about/docs/megolm.rst
[6]: https://matrix.org/docs/guides/e2e_implementation.html
[7]: https://poljar.github.io/python-olm/html/index.html

20
python/docs/Makefile Normal file
View file

@ -0,0 +1,20 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
SPHINXPROJ = olm
SOURCEDIR = .
BUILDDIR = .
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

165
python/docs/conf.py Normal file
View file

@ -0,0 +1,165 @@
# -*- coding: utf-8 -*-
#
# Configuration file for the Sphinx documentation builder.
#
# This file does only contain a selection of the most common options. For a
# full list see the documentation:
# http://www.sphinx-doc.org/en/master/config
# -- Path setup --------------------------------------------------------------
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
import os
import sys
sys.path.insert(0, os.path.abspath('../'))
# -- Project information -----------------------------------------------------
project = 'python-olm'
copyright = '2018, Damir Jelić'
author = 'Damir Jelić'
# The short X.Y version
version = ''
# The full version, including alpha/beta/rc tags
release = '2.2'
# -- General configuration ---------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#
# needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.doctest',
'sphinx.ext.coverage',
'sphinx.ext.viewcode',
'sphinx.ext.githubpages',
'sphinx.ext.napoleon',
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix(es) of source filenames.
# You can specify multiple suffix as a list of string:
#
# source_suffix = ['.rst', '.md']
source_suffix = '.rst'
# The master toctree document.
master_doc = 'index'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
language = None
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path .
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'alabaster'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#
# html_theme_options = {}
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# Custom sidebar templates, must be a dictionary that maps document names
# to template names.
#
# The default sidebars (for documents that don't match any pattern) are
# defined by theme itself. Builtin themes are using these templates by
# default: ``['localtoc.html', 'relations.html', 'sourcelink.html',
# 'searchbox.html']``.
#
# html_sidebars = {}
# -- Options for HTMLHelp output ---------------------------------------------
# Output file base name for HTML help builder.
htmlhelp_basename = 'olmdoc'
# -- Options for LaTeX output ------------------------------------------------
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#
# 'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#
# 'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#
# 'preamble': '',
# Latex figure (float) alignment
#
# 'figure_align': 'htbp',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, 'olm.tex', 'olm Documentation',
'Damir Jelić', 'manual'),
]
# -- Options for manual page output ------------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
(master_doc, 'olm', 'olm Documentation',
[author], 1)
]
# -- Options for Texinfo output ----------------------------------------------
# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(master_doc, 'olm', 'olm Documentation',
author, 'olm', 'One line description of project.',
'Miscellaneous'),
]
# -- Extension configuration -------------------------------------------------

1
python/docs/index.html Normal file
View file

@ -0,0 +1 @@
<meta http-equiv="refresh" content="0; url=./html/index.html" />

19
python/docs/index.rst Normal file
View file

@ -0,0 +1,19 @@
.. olm documentation master file, created by
sphinx-quickstart on Sun Jun 17 15:57:08 2018.
Welcome to olm's documentation!
===============================
.. toctree::
Olm API reference <olm.rst>
:maxdepth: 2
:caption: Contents:
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`

36
python/docs/make.bat Normal file
View file

@ -0,0 +1,36 @@
@ECHO OFF
pushd %~dp0
REM Command file for Sphinx documentation
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=.
set BUILDDIR=_build
set SPHINXPROJ=olm
if "%1" == "" goto help
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.http://sphinx-doc.org/
exit /b 1
)
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS%
goto end
:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS%
:end
popd

34
python/docs/olm.rst Normal file
View file

@ -0,0 +1,34 @@
olm package
===========
olm.account module
------------------
.. automodule:: olm.account
:members:
:undoc-members:
:show-inheritance:
olm.group\_session module
-------------------------
.. automodule:: olm.group_session
:members:
:undoc-members:
:show-inheritance:
olm.session module
------------------
.. automodule:: olm.session
:members:
:undoc-members:
:show-inheritance:
olm.utility module
------------------
.. automodule:: olm.utility
:members:
:undoc-members:
:show-inheritance:

787
python/include/olm/olm.h Normal file
View file

@ -0,0 +1,787 @@
/* Copyright 2015, 2016 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
static const size_t OLM_MESSAGE_TYPE_PRE_KEY = 0;
static const size_t OLM_MESSAGE_TYPE_MESSAGE = 1;
typedef struct OlmAccount OlmAccount;
typedef struct OlmSession OlmSession;
typedef struct OlmUtility OlmUtility;
/** Get the version number of the library.
* Arguments will be updated if non-null.
*/
void olm_get_library_version(uint8_t *major, uint8_t *minor, uint8_t *patch);
/** The size of an account object in bytes */
size_t olm_account_size();
/** The size of a session object in bytes */
size_t olm_session_size();
/** The size of a utility object in bytes */
size_t olm_utility_size();
/** Initialise an account object using the supplied memory
* The supplied memory must be at least olm_account_size() bytes */
OlmAccount * olm_account(
void * memory
);
/** Initialise a session object using the supplied memory
* The supplied memory must be at least olm_session_size() bytes */
OlmSession * olm_session(
void * memory
);
/** Initialise a utility object using the supplied memory
* The supplied memory must be at least olm_utility_size() bytes */
OlmUtility * olm_utility(
void * memory
);
/** The value that olm will return from a function if there was an error */
size_t olm_error();
/** A null terminated string describing the most recent error to happen to an
* account */
const char * olm_account_last_error(
OlmAccount * account
);
/** A null terminated string describing the most recent error to happen to a
* session */
const char * olm_session_last_error(
OlmSession * session
);
/** A null terminated string describing the most recent error to happen to a
* utility */
const char * olm_utility_last_error(
OlmUtility * utility
);
/** Clears the memory used to back this account */
size_t olm_clear_account(
OlmAccount * account
);
/** Clears the memory used to back this session */
size_t olm_clear_session(
OlmSession * session
);
/** Clears the memory used to back this utility */
size_t olm_clear_utility(
OlmUtility * utility
);
/** Returns the number of bytes needed to store an account */
size_t olm_pickle_account_length(
OlmAccount * account
);
/** Returns the number of bytes needed to store a session */
size_t olm_pickle_session_length(
OlmSession * session
);
/** Stores an account as a base64 string. Encrypts the account using the
* supplied key. Returns the length of the pickled account on success.
* Returns olm_error() on failure. If the pickle output buffer
* is smaller than olm_pickle_account_length() then
* olm_account_last_error() will be "OUTPUT_BUFFER_TOO_SMALL" */
size_t olm_pickle_account(
OlmAccount * account,
void const * key, size_t key_length,
void * pickled, size_t pickled_length
);
/** Stores a session as a base64 string. Encrypts the session using the
* supplied key. Returns the length of the pickled session on success.
* Returns olm_error() on failure. If the pickle output buffer
* is smaller than olm_pickle_session_length() then
* olm_session_last_error() will be "OUTPUT_BUFFER_TOO_SMALL" */
size_t olm_pickle_session(
OlmSession * session,
void const * key, size_t key_length,
void * pickled, size_t pickled_length
);
/** Loads an account from a pickled base64 string. Decrypts the account using
* the supplied key. Returns olm_error() on failure. If the key doesn't
* match the one used to encrypt the account then olm_account_last_error()
* will be "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded then
* olm_account_last_error() will be "INVALID_BASE64". The input pickled
* buffer is destroyed */
size_t olm_unpickle_account(
OlmAccount * account,
void const * key, size_t key_length,
void * pickled, size_t pickled_length
);
/** Loads a session from a pickled base64 string. Decrypts the session using
* the supplied key. Returns olm_error() on failure. If the key doesn't
* match the one used to encrypt the account then olm_session_last_error()
* will be "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded then
* olm_session_last_error() will be "INVALID_BASE64". The input pickled
* buffer is destroyed */
size_t olm_unpickle_session(
OlmSession * session,
void const * key, size_t key_length,
void * pickled, size_t pickled_length
);
/** The number of random bytes needed to create an account.*/
size_t olm_create_account_random_length(
OlmAccount * account
);
/** Creates a new account. Returns olm_error() on failure. If weren't
* enough random bytes then olm_account_last_error() will be
* "NOT_ENOUGH_RANDOM" */
size_t olm_create_account(
OlmAccount * account,
void * random, size_t random_length
);
/** The size of the output buffer needed to hold the identity keys */
size_t olm_account_identity_keys_length(
OlmAccount * account
);
/** Writes the public parts of the identity keys for the account into the
* identity_keys output buffer. Returns olm_error() on failure. If the
* identity_keys buffer was too small then olm_account_last_error() will be
* "OUTPUT_BUFFER_TOO_SMALL". */
size_t olm_account_identity_keys(
OlmAccount * account,
void * identity_keys, size_t identity_key_length
);
/** The length of an ed25519 signature encoded as base64. */
size_t olm_account_signature_length(
OlmAccount * account
);
/** Signs a message with the ed25519 key for this account. Returns olm_error()
* on failure. If the signature buffer was too small then
* olm_account_last_error() will be "OUTPUT_BUFFER_TOO_SMALL" */
size_t olm_account_sign(
OlmAccount * account,
void const * message, size_t message_length,
void * signature, size_t signature_length
);
/** The size of the output buffer needed to hold the one time keys */
size_t olm_account_one_time_keys_length(
OlmAccount * account
);
/** Writes the public parts of the unpublished one time keys for the account
* into the one_time_keys output buffer.
* <p>
* The returned data is a JSON-formatted object with the single property
* <tt>curve25519</tt>, which is itself an object mapping key id to
* base64-encoded Curve25519 key. For example:
* <pre>
* {
* curve25519: {
* "AAAAAA": "wo76WcYtb0Vk/pBOdmduiGJ0wIEjW4IBMbbQn7aSnTo",
* "AAAAAB": "LRvjo46L1X2vx69sS9QNFD29HWulxrmW11Up5AfAjgU"
* }
* }
* </pre>
* Returns olm_error() on failure.
* <p>
* If the one_time_keys buffer was too small then olm_account_last_error()
* will be "OUTPUT_BUFFER_TOO_SMALL". */
size_t olm_account_one_time_keys(
OlmAccount * account,
void * one_time_keys, size_t one_time_keys_length
);
/** Marks the current set of one time keys as being published. */
size_t olm_account_mark_keys_as_published(
OlmAccount * account
);
/** The largest number of one time keys this account can store. */
size_t olm_account_max_number_of_one_time_keys(
OlmAccount * account
);
/** The number of random bytes needed to generate a given number of new one
* time keys. */
size_t olm_account_generate_one_time_keys_random_length(
OlmAccount * account,
size_t number_of_keys
);
/** Generates a number of new one time keys. If the total number of keys stored
* by this account exceeds max_number_of_one_time_keys() then the old keys are
* discarded. Returns olm_error() on error. If the number of random bytes is
* too small then olm_account_last_error() will be "NOT_ENOUGH_RANDOM". */
size_t olm_account_generate_one_time_keys(
OlmAccount * account,
size_t number_of_keys,
void * random, size_t random_length
);
/** The number of random bytes needed to create an outbound session */
size_t olm_create_outbound_session_random_length(
OlmSession * session
);
/** Creates a new out-bound session for sending messages to a given identity_key
* and one_time_key. Returns olm_error() on failure. If the keys couldn't be
* decoded as base64 then olm_session_last_error() will be "INVALID_BASE64"
* If there weren't enough random bytes then olm_session_last_error() will
* be "NOT_ENOUGH_RANDOM". */
size_t olm_create_outbound_session(
OlmSession * session,
OlmAccount * account,
void const * their_identity_key, size_t their_identity_key_length,
void const * their_one_time_key, size_t their_one_time_key_length,
void * random, size_t random_length
);
/** Create a new in-bound session for sending/receiving messages from an
* incoming PRE_KEY message. Returns olm_error() on failure. If the base64
* couldn't be decoded then olm_session_last_error will be "INVALID_BASE64".
* If the message was for an unsupported protocol version then
* olm_session_last_error() will be "BAD_MESSAGE_VERSION". If the message
* couldn't be decoded then then olm_session_last_error() will be
* "BAD_MESSAGE_FORMAT". If the message refers to an unknown one time
* key then olm_session_last_error() will be "BAD_MESSAGE_KEY_ID". */
size_t olm_create_inbound_session(
OlmSession * session,
OlmAccount * account,
void * one_time_key_message, size_t message_length
);
/** Create a new in-bound session for sending/receiving messages from an
* incoming PRE_KEY message. Returns olm_error() on failure. If the base64
* couldn't be decoded then olm_session_last_error will be "INVALID_BASE64".
* If the message was for an unsupported protocol version then
* olm_session_last_error() will be "BAD_MESSAGE_VERSION". If the message
* couldn't be decoded then then olm_session_last_error() will be
* "BAD_MESSAGE_FORMAT". If the message refers to an unknown one time
* key then olm_session_last_error() will be "BAD_MESSAGE_KEY_ID". */
size_t olm_create_inbound_session_from(
OlmSession * session,
OlmAccount * account,
void const * their_identity_key, size_t their_identity_key_length,
void * one_time_key_message, size_t message_length
);
/** The length of the buffer needed to return the id for this session. */
size_t olm_session_id_length(
OlmSession * session
);
/** An identifier for this session. Will be the same for both ends of the
* conversation. If the id buffer is too small then olm_session_last_error()
* will be "OUTPUT_BUFFER_TOO_SMALL". */
size_t olm_session_id(
OlmSession * session,
void * id, size_t id_length
);
int olm_session_has_received_message(
OlmSession *session
);
/** Checks if the PRE_KEY message is for this in-bound session. This can happen
* if multiple messages are sent to this account before this account sends a
* message in reply. Returns 1 if the session matches. Returns 0 if the session
* does not match. Returns olm_error() on failure. If the base64
* couldn't be decoded then olm_session_last_error will be "INVALID_BASE64".
* If the message was for an unsupported protocol version then
* olm_session_last_error() will be "BAD_MESSAGE_VERSION". If the message
* couldn't be decoded then then olm_session_last_error() will be
* "BAD_MESSAGE_FORMAT". */
size_t olm_matches_inbound_session(
OlmSession * session,
void * one_time_key_message, size_t message_length
);
/** Checks if the PRE_KEY message is for this in-bound session. This can happen
* if multiple messages are sent to this account before this account sends a
* message in reply. Returns 1 if the session matches. Returns 0 if the session
* does not match. Returns olm_error() on failure. If the base64
* couldn't be decoded then olm_session_last_error will be "INVALID_BASE64".
* If the message was for an unsupported protocol version then
* olm_session_last_error() will be "BAD_MESSAGE_VERSION". If the message
* couldn't be decoded then then olm_session_last_error() will be
* "BAD_MESSAGE_FORMAT". */
size_t olm_matches_inbound_session_from(
OlmSession * session,
void const * their_identity_key, size_t their_identity_key_length,
void * one_time_key_message, size_t message_length
);
/** Removes the one time keys that the session used from the account. Returns
* olm_error() on failure. If the account doesn't have any matching one time
* keys then olm_account_last_error() will be "BAD_MESSAGE_KEY_ID". */
size_t olm_remove_one_time_keys(
OlmAccount * account,
OlmSession * session
);
/** The type of the next message that olm_encrypt() will return. Returns
* OLM_MESSAGE_TYPE_PRE_KEY if the message will be a PRE_KEY message.
* Returns OLM_MESSAGE_TYPE_MESSAGE if the message will be a normal message.
* Returns olm_error on failure. */
size_t olm_encrypt_message_type(
OlmSession * session
);
/** The number of random bytes needed to encrypt the next message. */
size_t olm_encrypt_random_length(
OlmSession * session
);
/** The size of the next message in bytes for the given number of plain-text
* bytes. */
size_t olm_encrypt_message_length(
OlmSession * session,
size_t plaintext_length
);
/** Encrypts a message using the session. Returns the length of the message in
* bytes on success. Writes the message as base64 into the message buffer.
* Returns olm_error() on failure. If the message buffer is too small then
* olm_session_last_error() will be "OUTPUT_BUFFER_TOO_SMALL". If there
* weren't enough random bytes then olm_session_last_error() will be
* "NOT_ENOUGH_RANDOM". */
size_t olm_encrypt(
OlmSession * session,
void const * plaintext, size_t plaintext_length,
void * random, size_t random_length,
void * message, size_t message_length
);
/** The maximum number of bytes of plain-text a given message could decode to.
* The actual size could be different due to padding. The input message buffer
* is destroyed. Returns olm_error() on failure. If the message base64
* couldn't be decoded then olm_session_last_error() will be
* "INVALID_BASE64". If the message is for an unsupported version of the
* protocol then olm_session_last_error() will be "BAD_MESSAGE_VERSION".
* If the message couldn't be decoded then olm_session_last_error() will be
* "BAD_MESSAGE_FORMAT". */
size_t olm_decrypt_max_plaintext_length(
OlmSession * session,
size_t message_type,
void * message, size_t message_length
);
/** Decrypts a message using the session. The input message buffer is destroyed.
* Returns the length of the plain-text on success. Returns olm_error() on
* failure. If the plain-text buffer is smaller than
* olm_decrypt_max_plaintext_length() then olm_session_last_error()
* will be "OUTPUT_BUFFER_TOO_SMALL". If the base64 couldn't be decoded then
* olm_session_last_error() will be "INVALID_BASE64". If the message is for
* an unsupported version of the protocol then olm_session_last_error() will
* be "BAD_MESSAGE_VERSION". If the message couldn't be decoded then
* olm_session_last_error() will be BAD_MESSAGE_FORMAT".
* If the MAC on the message was invalid then olm_session_last_error() will
* be "BAD_MESSAGE_MAC". */
size_t olm_decrypt(
OlmSession * session,
size_t message_type,
void * message, size_t message_length,
void * plaintext, size_t max_plaintext_length
);
/** The length of the buffer needed to hold the SHA-256 hash. */
size_t olm_sha256_length(
OlmUtility * utility
);
/** Calculates the SHA-256 hash of the input and encodes it as base64. If the
* output buffer is smaller than olm_sha256_length() then
* olm_session_last_error() will be "OUTPUT_BUFFER_TOO_SMALL". */
size_t olm_sha256(
OlmUtility * utility,
void const * input, size_t input_length,
void * output, size_t output_length
);
/** Verify an ed25519 signature. If the key was too small then
* olm_session_last_error will be "INVALID_BASE64". If the signature was invalid
* then olm_session_last_error() will be "BAD_MESSAGE_MAC". */
size_t olm_ed25519_verify(
OlmUtility * utility,
void const * key, size_t key_length,
void const * message, size_t message_length,
void * signature, size_t signature_length
);
typedef struct OlmOutboundGroupSession OlmOutboundGroupSession;
/** get the size of an outbound group session, in bytes. */
size_t olm_outbound_group_session_size();
/**
* Initialise an outbound group session object using the supplied memory
* The supplied memory should be at least olm_outbound_group_session_size()
* bytes.
*/
OlmOutboundGroupSession * olm_outbound_group_session(
void *memory
);
/**
* A null terminated string describing the most recent error to happen to a
* group session */
const char *olm_outbound_group_session_last_error(
const OlmOutboundGroupSession *session
);
/** Clears the memory used to back this group session */
size_t olm_clear_outbound_group_session(
OlmOutboundGroupSession *session
);
/** Returns the number of bytes needed to store an outbound group session */
size_t olm_pickle_outbound_group_session_length(
const OlmOutboundGroupSession *session
);
/**
* Stores a group session as a base64 string. Encrypts the session using the
* supplied key. Returns the length of the session on success.
*
* Returns olm_error() on failure. If the pickle output buffer
* is smaller than olm_pickle_outbound_group_session_length() then
* olm_outbound_group_session_last_error() will be "OUTPUT_BUFFER_TOO_SMALL"
*/
size_t olm_pickle_outbound_group_session(
OlmOutboundGroupSession *session,
void const * key, size_t key_length,
void * pickled, size_t pickled_length
);
/**
* Loads a group session from a pickled base64 string. Decrypts the session
* using the supplied key.
*
* Returns olm_error() on failure. If the key doesn't match the one used to
* encrypt the account then olm_outbound_group_session_last_error() will be
* "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded then
* olm_outbound_group_session_last_error() will be "INVALID_BASE64". The input
* pickled buffer is destroyed
*/
size_t olm_unpickle_outbound_group_session(
OlmOutboundGroupSession *session,
void const * key, size_t key_length,
void * pickled, size_t pickled_length
);
/** The number of random bytes needed to create an outbound group session */
size_t olm_init_outbound_group_session_random_length(
const OlmOutboundGroupSession *session
);
/**
* Start a new outbound group session. Returns olm_error() on failure. On
* failure last_error will be set with an error code. The last_error will be
* NOT_ENOUGH_RANDOM if the number of random bytes was too small.
*/
size_t olm_init_outbound_group_session(
OlmOutboundGroupSession *session,
uint8_t *random, size_t random_length
);
/**
* The number of bytes that will be created by encrypting a message
*/
size_t olm_group_encrypt_message_length(
OlmOutboundGroupSession *session,
size_t plaintext_length
);
/**
* Encrypt some plain-text. Returns the length of the encrypted message or
* olm_error() on failure. On failure last_error will be set with an
* error code. The last_error will be OUTPUT_BUFFER_TOO_SMALL if the output
* buffer is too small.
*/
size_t olm_group_encrypt(
OlmOutboundGroupSession *session,
uint8_t const * plaintext, size_t plaintext_length,
uint8_t * message, size_t message_length
);
/**
* Get the number of bytes returned by olm_outbound_group_session_id()
*/
size_t olm_outbound_group_session_id_length(
const OlmOutboundGroupSession *session
);
/**
* Get a base64-encoded identifier for this session.
*
* Returns the length of the session id on success or olm_error() on
* failure. On failure last_error will be set with an error code. The
* last_error will be OUTPUT_BUFFER_TOO_SMALL if the id buffer was too
* small.
*/
size_t olm_outbound_group_session_id(
OlmOutboundGroupSession *session,
uint8_t * id, size_t id_length
);
/**
* Get the current message index for this session.
*
* Each message is sent with an increasing index; this returns the index for
* the next message.
*/
uint32_t olm_outbound_group_session_message_index(
OlmOutboundGroupSession *session
);
/**
* Get the number of bytes returned by olm_outbound_group_session_key()
*/
size_t olm_outbound_group_session_key_length(
const OlmOutboundGroupSession *session
);
/**
* Get the base64-encoded current ratchet key for this session.
*
* Each message is sent with a different ratchet key. This function returns the
* ratchet key that will be used for the next message.
*
* Returns the length of the ratchet key on success or olm_error() on
* failure. On failure last_error will be set with an error code. The
* last_error will be OUTPUT_BUFFER_TOO_SMALL if the buffer was too small.
*/
size_t olm_outbound_group_session_key(
OlmOutboundGroupSession *session,
uint8_t * key, size_t key_length
);
typedef struct OlmInboundGroupSession OlmInboundGroupSession;
/** get the size of an inbound group session, in bytes. */
size_t olm_inbound_group_session_size();
/**
* Initialise an inbound group session object using the supplied memory
* The supplied memory should be at least olm_inbound_group_session_size()
* bytes.
*/
OlmInboundGroupSession * olm_inbound_group_session(
void *memory
);
/**
* A null terminated string describing the most recent error to happen to a
* group session */
const char *olm_inbound_group_session_last_error(
const OlmInboundGroupSession *session
);
/** Clears the memory used to back this group session */
size_t olm_clear_inbound_group_session(
OlmInboundGroupSession *session
);
/** Returns the number of bytes needed to store an inbound group session */
size_t olm_pickle_inbound_group_session_length(
const OlmInboundGroupSession *session
);
/**
* Stores a group session as a base64 string. Encrypts the session using the
* supplied key. Returns the length of the session on success.
*
* Returns olm_error() on failure. If the pickle output buffer
* is smaller than olm_pickle_inbound_group_session_length() then
* olm_inbound_group_session_last_error() will be "OUTPUT_BUFFER_TOO_SMALL"
*/
size_t olm_pickle_inbound_group_session(
OlmInboundGroupSession *session,
void const * key, size_t key_length,
void * pickled, size_t pickled_length
);
/**
* Loads a group session from a pickled base64 string. Decrypts the session
* using the supplied key.
*
* Returns olm_error() on failure. If the key doesn't match the one used to
* encrypt the account then olm_inbound_group_session_last_error() will be
* "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded then
* olm_inbound_group_session_last_error() will be "INVALID_BASE64". The input
* pickled buffer is destroyed
*/
size_t olm_unpickle_inbound_group_session(
OlmInboundGroupSession *session,
void const * key, size_t key_length,
void * pickled, size_t pickled_length
);
/**
* Start a new inbound group session, from a key exported from
* olm_outbound_group_session_key
*
* Returns olm_error() on failure. On failure last_error will be set with an
* error code. The last_error will be:
*
* * OLM_INVALID_BASE64 if the session_key is not valid base64
* * OLM_BAD_SESSION_KEY if the session_key is invalid
*/
size_t olm_init_inbound_group_session(
OlmInboundGroupSession *session,
/* base64-encoded keys */
uint8_t const * session_key, size_t session_key_length
);
/**
* Import an inbound group session, from a previous export.
*
* Returns olm_error() on failure. On failure last_error will be set with an
* error code. The last_error will be:
*
* * OLM_INVALID_BASE64 if the session_key is not valid base64
* * OLM_BAD_SESSION_KEY if the session_key is invalid
*/
size_t olm_import_inbound_group_session(
OlmInboundGroupSession *session,
/* base64-encoded keys; note that it will be overwritten with the base64-decoded
data. */
uint8_t const * session_key, size_t session_key_length
);
/**
* Get an upper bound on the number of bytes of plain-text the decrypt method
* will write for a given input message length. The actual size could be
* different due to padding.
*
* The input message buffer is destroyed.
*
* Returns olm_error() on failure.
*/
size_t olm_group_decrypt_max_plaintext_length(
OlmInboundGroupSession *session,
uint8_t * message, size_t message_length
);
/**
* Decrypt a message.
*
* The input message buffer is destroyed.
*
* Returns the length of the decrypted plain-text, or olm_error() on failure.
*
* On failure last_error will be set with an error code. The last_error will
* be:
* * OLM_OUTPUT_BUFFER_TOO_SMALL if the plain-text buffer is too small
* * OLM_INVALID_BASE64 if the message is not valid base-64
* * OLM_BAD_MESSAGE_VERSION if the message was encrypted with an unsupported
* version of the protocol
* * OLM_BAD_MESSAGE_FORMAT if the message headers could not be decoded
* * OLM_BAD_MESSAGE_MAC if the message could not be verified
* * OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key corresponding to the
* message's index (ie, it was sent before the session key was shared with
* us)
*/
size_t olm_group_decrypt(
OlmInboundGroupSession *session,
/* input; note that it will be overwritten with the base64-decoded
message. */
uint8_t * message, size_t message_length,
/* output */
uint8_t * plaintext, size_t max_plaintext_length,
uint32_t * message_index
);
/**
* Get the number of bytes returned by olm_inbound_group_session_id()
*/
size_t olm_inbound_group_session_id_length(
const OlmInboundGroupSession *session
);
/**
* Get a base64-encoded identifier for this session.
*
* Returns the length of the session id on success or olm_error() on
* failure. On failure last_error will be set with an error code. The
* last_error will be OUTPUT_BUFFER_TOO_SMALL if the id buffer was too
* small.
*/
size_t olm_inbound_group_session_id(
OlmInboundGroupSession *session,
uint8_t * id, size_t id_length
);
/**
* Get the first message index we know how to decrypt.
*/
uint32_t olm_inbound_group_session_first_known_index(
const OlmInboundGroupSession *session
);
/**
* Check if the session has been verified as a valid session.
*
* (A session is verified either because the original session share was signed,
* or because we have subsequently successfully decrypted a message.)
*
* This is mainly intended for the unit tests, currently.
*/
int olm_inbound_group_session_is_verified(
const OlmInboundGroupSession *session
);
/**
* Get the number of bytes returned by olm_export_inbound_group_session()
*/
size_t olm_export_inbound_group_session_length(
const OlmInboundGroupSession *session
);
/**
* Export the base64-encoded ratchet key for this session, at the given index,
* in a format which can be used by olm_import_inbound_group_session
*
* Returns the length of the ratchet key on success or olm_error() on
* failure. On failure last_error will be set with an error code. The
* last_error will be:
* * OUTPUT_BUFFER_TOO_SMALL if the buffer was too small
* * OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key corresponding to the
* given index (ie, it was sent before the session key was shared with
* us)
*/
size_t olm_export_inbound_group_session(
OlmInboundGroupSession *session,
uint8_t * key, size_t key_length, uint32_t message_index
);

26
python/olm/__init__.py Normal file
View file

@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
# libolm python bindings
# Copyright © 2015-2017 OpenMarket Ltd
# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
"""
Olm Python bindings
~~~~~~~~~~~~~~~~~~~~~
| This package implements python bindings for the libolm C library.
| © Copyright 2015-2017 by OpenMarket Ltd
| © Copyright 2018 by Damir Jelić
"""
from .utility import ed25519_verify, OlmVerifyError
from .account import Account, OlmAccountError
from .session import (
Session,
InboundSession,
OutboundSession,
OlmSessionError,
OlmMessage,
OlmPreKeyMessage
)
from .group_session import (
InboundGroupSession,
OutboundGroupSession,
OlmGroupSessionError
)

View file

@ -0,0 +1,9 @@
__title__ = "python-olm"
__description__ = ("python CFFI bindings for the olm "
"cryptographic ratchet library")
__url__ = "https://github.com/poljar/python-olm"
__version__ = "0.1"
__author__ = "Damir Jelić"
__author_email__ = "poljar@termina.org.uk"
__license__ = "Apache 2.0"
__copyright__ = "Copyright 2018 Damir Jelić"

24
python/olm/_compat.py Normal file
View file

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
# libolm python bindings
# Copyright © 2015-2017 OpenMarket Ltd
# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
from builtins import bytes, str
from typing import AnyStr
try:
import secrets
URANDOM = secrets.token_bytes # pragma: no cover
except ImportError: # pragma: no cover
from os import urandom
URANDOM = urandom # type: ignore
def to_bytes(string):
# type: (AnyStr) -> bytes
if isinstance(string, bytes):
return string
elif isinstance(string, str):
return bytes(string, "utf-8")
raise TypeError("Invalid type {}".format(type(string)))

65
python/olm/_finalize.py Normal file
View file

@ -0,0 +1,65 @@
# The MIT License (MIT)
# Copyright (c) 2010 Benjamin Peterson <benjamin@python.org>
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
# OR OTHER DEALINGS IN THE SOFTWARE.
"""Finalization with weakrefs
This is designed for avoiding __del__.
"""
from __future__ import print_function
import sys
import traceback
import weakref
__author__ = "Benjamin Peterson <benjamin@python.org>"
class OwnerRef(weakref.ref):
"""A simple weakref.ref subclass, so attributes can be added."""
pass
def _run_finalizer(ref):
"""Internal weakref callback to run finalizers"""
del _finalize_refs[id(ref)]
finalizer = ref.finalizer
item = ref.item
try:
finalizer(item)
except Exception: # pragma: no cover
print("Exception running {}:".format(finalizer), file=sys.stderr)
traceback.print_exc()
_finalize_refs = {}
def track_for_finalization(owner, item, finalizer):
"""Register an object for finalization.
``owner`` is the the object which is responsible for ``item``.
``finalizer`` will be called with ``item`` as its only argument when
``owner`` is destroyed by the garbage collector.
"""
ref = OwnerRef(owner, _run_finalizer)
ref.item = item
ref.finalizer = finalizer
_finalize_refs[id(ref)] = ref

239
python/olm/account.py Normal file
View file

@ -0,0 +1,239 @@
# -*- coding: utf-8 -*-
# libolm python bindings
# Copyright © 2015-2017 OpenMarket Ltd
# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
"""libolm Account module.
This module contains the account part of the Olm library. It contains a single
Account class which handles the creation of new accounts as well as the storing
and restoring of them.
Examples:
>>> acc = Account()
>>> account.identity_keys()
>>> account.generate_one_time_keys(1)
"""
import json
# pylint: disable=redefined-builtin,unused-import
from builtins import bytes, super
from typing import AnyStr, Dict, Optional, Type
from future.utils import bytes_to_native_str
# pylint: disable=no-name-in-module
from _libolm import ffi, lib # type: ignore
from ._compat import URANDOM, to_bytes
from ._finalize import track_for_finalization
# This is imported only for type checking purposes
if False:
from .session import Session # pragma: no cover
def _clear_account(account):
# type: (ffi.cdata) -> None
lib.olm_clear_account(account)
class OlmAccountError(Exception):
"""libolm Account error exception."""
class Account(object):
"""libolm Account class."""
def __new__(cls):
# type: (Type[Account]) -> Account
obj = super().__new__(cls)
obj._buf = ffi.new("char[]", lib.olm_account_size())
obj._account = lib.olm_account(obj._buf)
track_for_finalization(obj, obj._account, _clear_account)
return obj
def __init__(self):
# type: () -> None
"""Create a new Olm account.
Creates a new account and its matching identity key pair.
Raises OlmAccountError on failure. If there weren't enough random bytes
for the account creation the error message for the exception will be
NOT_ENOUGH_RANDOM.
"""
# This is needed to silence mypy not knowing the type of _account.
# There has to be a better way for this.
if False: # pragma: no cover
self._account = self._account # type: ffi.cdata
random_length = lib.olm_create_account_random_length(self._account)
random = URANDOM(random_length)
random_buffer = ffi.new("char[]", random)
self._check_error(
lib.olm_create_account(self._account, random_buffer,
random_length))
def _check_error(self, ret):
# type: (int) -> None
if ret != lib.olm_error():
return
last_error = bytes_to_native_str(
ffi.string((lib.olm_account_last_error(self._account))))
raise OlmAccountError(last_error)
def pickle(self, passphrase=""):
# type: (Optional[str]) -> bytes
"""Store an Olm account.
Stores an account as a base64 string. Encrypts the account using the
supplied passphrase. Returns a byte object containing the base64
encoded string of the pickled account. Raises OlmAccountError on
failure.
Args:
passphrase(str, optional): The passphrase to be used to encrypt
the account.
"""
byte_key = bytes(passphrase, "utf-8") if passphrase else b""
key_buffer = ffi.new("char[]", byte_key)
pickle_length = lib.olm_pickle_account_length(self._account)
pickle_buffer = ffi.new("char[]", pickle_length)
self._check_error(
lib.olm_pickle_account(self._account, key_buffer, len(byte_key),
pickle_buffer, pickle_length))
return ffi.unpack(pickle_buffer, pickle_length)
@classmethod
def from_pickle(cls, pickle, passphrase=""):
# type: (bytes, Optional[str]) -> Account
"""Load a previously stored olm account.
Loads an account from a pickled base64-encoded string and returns an
Account object. Decrypts the account using the supplied passphrase.
Raises OlmAccountError on failure. If the passphrase doesn't match the
one used to encrypt the account then the error message for the
exception will be "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded
then the error message will be "INVALID_BASE64".
Args:
pickle(bytes): Base64 encoded byte string containing the pickled
account
passphrase(str, optional): The passphrase used to encrypt the
account.
"""
if not pickle:
raise ValueError("Pickle can't be empty")
byte_key = bytes(passphrase, "utf-8") if passphrase else b""
key_buffer = ffi.new("char[]", byte_key)
pickle_buffer = ffi.new("char[]", pickle)
obj = cls.__new__(cls)
ret = lib.olm_unpickle_account(obj._account, key_buffer, len(byte_key),
pickle_buffer, len(pickle))
obj._check_error(ret)
return obj
@property
def identity_keys(self):
# type: () -> Dict[str, str]
"""dict: Public part of the identity keys of the account."""
out_length = lib.olm_account_identity_keys_length(self._account)
out_buffer = ffi.new("char[]", out_length)
self._check_error(
lib.olm_account_identity_keys(self._account, out_buffer,
out_length))
return json.loads(ffi.unpack(out_buffer, out_length).decode("utf-8"))
def sign(self, message):
# type: (AnyStr) -> str
"""Signs a message with this account.
Signs a message with the private ed25519 identity key of this account.
Returns the signature.
Raises OlmAccountError on failure.
Args:
message(str): The message to sign.
"""
bytes_message = to_bytes(message)
out_length = lib.olm_account_signature_length(self._account)
message_buffer = ffi.new("char[]", bytes_message)
out_buffer = ffi.new("char[]", out_length)
self._check_error(
lib.olm_account_sign(self._account, message_buffer,
len(bytes_message), out_buffer, out_length))
return bytes_to_native_str(ffi.unpack(out_buffer, out_length))
@property
def max_one_time_keys(self):
# type: () -> int
"""int: The maximum number of one-time keys the account can store."""
return lib.olm_account_max_number_of_one_time_keys(self._account)
def mark_keys_as_published(self):
# type: () -> None
"""Mark the current set of one-time keys as being published."""
lib.olm_account_mark_keys_as_published(self._account)
def generate_one_time_keys(self, count):
# type: (int) -> None
"""Generate a number of new one-time keys.
If the total number of keys stored by this account exceeds
max_one_time_keys() then the old keys are discarded.
Raises OlmAccountError on error. If the number of random bytes is
too small then the error message of the exception will be
NOT_ENOUGH_RANDOM.
Args:
count(int): The number of keys to generate.
"""
random_length = lib.olm_account_generate_one_time_keys_random_length(
self._account, count)
random = URANDOM(random_length)
random_buffer = ffi.new("char[]", random)
self._check_error(
lib.olm_account_generate_one_time_keys(
self._account, count, random_buffer, random_length))
@property
def one_time_keys(self):
# type: () -> Dict[str, Dict[str, str]]
"""dict: The public part of the one-time keys for this account."""
out_length = lib.olm_account_one_time_keys_length(self._account)
out_buffer = ffi.new("char[]", out_length)
self._check_error(
lib.olm_account_one_time_keys(self._account, out_buffer,
out_length))
return json.loads(ffi.unpack(out_buffer, out_length).decode("utf-8"))
def remove_one_time_keys(self, session):
# type: (Session) -> None
"""Remove used one-time keys.
Removes the one-time keys that the session used from the account.
Raises OlmAccountError on failure. If the account doesn't have any
matching one-time keys then the error message of the exception will be
"BAD_MESSAGE_KEY_ID".
Args:
session(Session): An Olm Session object that was created with this
account.
"""
self._check_error(lib.olm_remove_one_time_keys(self._account,
session._session))

467
python/olm/group_session.py Normal file
View file

@ -0,0 +1,467 @@
# -*- coding: utf-8 -*-
# libolm python bindings
# Copyright © 2015-2017 OpenMarket Ltd
# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
"""libolm Group session module.
This module contains the group session part of the Olm library. It contains two
classes for creating inbound and outbound group sessions.
Examples:
>>> outbound = OutboundGroupSession()
>>> InboundGroupSession(outbound.session_key)
"""
# pylint: disable=redefined-builtin,unused-import
from builtins import bytes, super
from typing import AnyStr, Optional, Tuple, Type
from future.utils import bytes_to_native_str
# pylint: disable=no-name-in-module
from _libolm import ffi, lib # type: ignore
from ._compat import URANDOM, to_bytes
from ._finalize import track_for_finalization
def _clear_inbound_group_session(session):
# type: (ffi.cdata) -> None
lib.olm_clear_inbound_group_session(session)
def _clear_outbound_group_session(session):
# type: (ffi.cdata) -> None
lib.olm_clear_outbound_group_session(session)
class OlmGroupSessionError(Exception):
"""libolm Group session error exception."""
class InboundGroupSession(object):
"""Inbound group session for encrypted multiuser communication."""
def __new__(
cls, # type: Type[InboundGroupSession]
session_key=None # type: Optional[str]
):
# type: (...) -> InboundGroupSession
obj = super().__new__(cls)
obj._buf = ffi.new("char[]", lib.olm_inbound_group_session_size())
obj._session = lib.olm_inbound_group_session(obj._buf)
track_for_finalization(obj, obj._session, _clear_inbound_group_session)
return obj
def __init__(self, session_key):
# type: (AnyStr) -> None
"""Create a new inbound group session.
Start a new inbound group session, from a key exported from
an outbound group session.
Raises OlmGroupSessionError on failure. The error message of the
exception will be "OLM_INVALID_BASE64" if the session key is not valid
base64 and "OLM_BAD_SESSION_KEY" if the session key is invalid.
"""
if False: # pragma: no cover
self._session = self._session # type: ffi.cdata
byte_session_key = to_bytes(session_key)
key_buffer = ffi.new("char[]", byte_session_key)
ret = lib.olm_init_inbound_group_session(
self._session, key_buffer, len(byte_session_key)
)
self._check_error(ret)
def pickle(self, passphrase=""):
# type: (Optional[str]) -> bytes
"""Store an inbound group session.
Stores a group session as a base64 string. Encrypts the session using
the supplied passphrase. Returns a byte object containing the base64
encoded string of the pickled session.
Args:
passphrase(str, optional): The passphrase to be used to encrypt
the session.
"""
byte_passphrase = bytes(passphrase, "utf-8") if passphrase else b""
passphrase_buffer = ffi.new("char[]", byte_passphrase)
pickle_length = lib.olm_pickle_inbound_group_session_length(
self._session)
pickle_buffer = ffi.new("char[]", pickle_length)
ret = lib.olm_pickle_inbound_group_session(
self._session, passphrase_buffer, len(byte_passphrase),
pickle_buffer, pickle_length
)
self._check_error(ret)
return ffi.unpack(pickle_buffer, pickle_length)
@classmethod
def from_pickle(cls, pickle, passphrase=""):
# type: (bytes, Optional[str]) -> InboundGroupSession
"""Load a previously stored inbound group session.
Loads an inbound group session from a pickled base64 string and returns
an InboundGroupSession object. Decrypts the session using the supplied
passphrase. Raises OlmSessionError on failure. If the passphrase
doesn't match the one used to encrypt the session then the error
message for the exception will be "BAD_ACCOUNT_KEY". If the base64
couldn't be decoded then the error message will be "INVALID_BASE64".
Args:
pickle(bytes): Base64 encoded byte string containing the pickled
session
passphrase(str, optional): The passphrase used to encrypt the
session
"""
if not pickle:
raise ValueError("Pickle can't be empty")
byte_passphrase = bytes(passphrase, "utf-8") if passphrase else b""
passphrase_buffer = ffi.new("char[]", byte_passphrase)
pickle_buffer = ffi.new("char[]", pickle)
obj = cls.__new__(cls)
ret = lib.olm_unpickle_inbound_group_session(
obj._session,
passphrase_buffer,
len(byte_passphrase),
pickle_buffer,
len(pickle)
)
obj._check_error(ret)
return obj
def _check_error(self, ret):
# type: (int) -> None
if ret != lib.olm_error():
return
last_error = bytes_to_native_str(ffi.string(
lib.olm_inbound_group_session_last_error(self._session)))
raise OlmGroupSessionError(last_error)
def decrypt(self, ciphertext):
# type: (AnyStr) -> Tuple[str, int]
"""Decrypt a message
Returns a tuple of the decrypted plain-text and the message index of
the decrypted message or raises OlmGroupSessionError on failure.
On failure the error message of the exception will be:
* OLM_INVALID_BASE64 if the message is not valid base64
* OLM_BAD_MESSAGE_VERSION if the message was encrypted with an
unsupported version of the protocol
* OLM_BAD_MESSAGE_FORMAT if the message headers could not be
decoded
* OLM_BAD_MESSAGE_MAC if the message could not be verified
* OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key
corresponding to the message's index (i.e., it was sent before
the session key was shared with us)
Args:
ciphertext(str): Base64 encoded ciphertext containing the encrypted
message
"""
if not ciphertext:
raise ValueError("Ciphertext can't be empty.")
byte_ciphertext = to_bytes(ciphertext)
ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
max_plaintext_length = lib.olm_group_decrypt_max_plaintext_length(
self._session, ciphertext_buffer, len(byte_ciphertext)
)
plaintext_buffer = ffi.new("char[]", max_plaintext_length)
ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
message_index = ffi.new("uint32_t*")
plaintext_length = lib.olm_group_decrypt(
self._session, ciphertext_buffer, len(byte_ciphertext),
plaintext_buffer, max_plaintext_length,
message_index
)
self._check_error(plaintext_length)
return bytes_to_native_str(ffi.unpack(
plaintext_buffer,
plaintext_length
)), message_index[0]
@property
def id(self):
# type: () -> str
"""str: A base64 encoded identifier for this session."""
id_length = lib.olm_inbound_group_session_id_length(self._session)
id_buffer = ffi.new("char[]", id_length)
ret = lib.olm_inbound_group_session_id(
self._session,
id_buffer,
id_length
)
self._check_error(ret)
return bytes_to_native_str(ffi.unpack(id_buffer, id_length))
@property
def first_known_index(self):
# type: () -> int
"""int: The first message index we know how to decrypt."""
return lib.olm_inbound_group_session_first_known_index(self._session)
def export_session(self, message_index):
# type: (int) -> str
"""Export an inbound group session
Export the base64-encoded ratchet key for this session, at the given
index, in a format which can be used by import_session().
Raises OlmGroupSessionError on failure. The error message for the
exception will be:
* OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key
corresponding to the given index (ie, it was sent before the
session key was shared with us)
Args:
message_index(int): The message index at which the session should
be exported.
"""
export_length = lib.olm_export_inbound_group_session_length(
self._session)
export_buffer = ffi.new("char[]", export_length)
ret = lib.olm_export_inbound_group_session(
self._session,
export_buffer,
export_length,
message_index
)
self._check_error(ret)
return bytes_to_native_str(ffi.unpack(export_buffer, export_length))
@classmethod
def import_session(cls, session_key):
# type: (AnyStr) -> InboundGroupSession
"""Create an InboundGroupSession from an exported session key.
Creates an InboundGroupSession with an previously exported session key,
raises OlmGroupSessionError on failure. The error message for the
exception will be:
* OLM_INVALID_BASE64 if the session_key is not valid base64
* OLM_BAD_SESSION_KEY if the session_key is invalid
Args:
session_key(str): The exported session key with which the inbound
group session will be created
"""
obj = cls.__new__(cls)
byte_session_key = to_bytes(session_key)
key_buffer = ffi.new("char[]", byte_session_key)
ret = lib.olm_import_inbound_group_session(
obj._session,
key_buffer,
len(byte_session_key)
)
obj._check_error(ret)
return obj
class OutboundGroupSession(object):
"""Outbound group session for encrypted multiuser communication."""
def __new__(cls):
# type: (Type[OutboundGroupSession]) -> OutboundGroupSession
obj = super().__new__(cls)
obj._buf = ffi.new("char[]", lib.olm_outbound_group_session_size())
obj._session = lib.olm_outbound_group_session(obj._buf)
track_for_finalization(
obj,
obj._session,
_clear_outbound_group_session
)
return obj
def __init__(self):
# type: () -> None
"""Create a new outbound group session.
Start a new outbound group session. Raises OlmGroupSessionError on
failure. If there weren't enough random bytes for the session creation
the error message for the exception will be NOT_ENOUGH_RANDOM.
"""
if False: # pragma: no cover
self._session = self._session # type: ffi.cdata
random_length = lib.olm_init_outbound_group_session_random_length(
self._session
)
random = URANDOM(random_length)
random_buffer = ffi.new("char[]", random)
ret = lib.olm_init_outbound_group_session(
self._session, random_buffer, random_length
)
self._check_error(ret)
def _check_error(self, ret):
# type: (int) -> None
if ret != lib.olm_error():
return
last_error = bytes_to_native_str(ffi.string(
lib.olm_outbound_group_session_last_error(self._session)
))
raise OlmGroupSessionError(last_error)
def pickle(self, passphrase=""):
# type: (Optional[str]) -> bytes
"""Store an outbound group session.
Stores a group session as a base64 string. Encrypts the session using
the supplied passphrase. Returns a byte object containing the base64
encoded string of the pickled session.
Args:
passphrase(str, optional): The passphrase to be used to encrypt
the session.
"""
byte_passphrase = bytes(passphrase, "utf-8") if passphrase else b""
passphrase_buffer = ffi.new("char[]", byte_passphrase)
pickle_length = lib.olm_pickle_outbound_group_session_length(
self._session)
pickle_buffer = ffi.new("char[]", pickle_length)
ret = lib.olm_pickle_outbound_group_session(
self._session, passphrase_buffer, len(byte_passphrase),
pickle_buffer, pickle_length
)
self._check_error(ret)
return ffi.unpack(pickle_buffer, pickle_length)
@classmethod
def from_pickle(cls, pickle, passphrase=""):
# type: (bytes, Optional[str]) -> OutboundGroupSession
"""Load a previously stored outbound group session.
Loads an outbound group session from a pickled base64 string and
returns an OutboundGroupSession object. Decrypts the session using the
supplied passphrase. Raises OlmSessionError on failure. If the
passphrase doesn't match the one used to encrypt the session then the
error message for the exception will be "BAD_ACCOUNT_KEY". If the
base64 couldn't be decoded then the error message will be
"INVALID_BASE64".
Args:
pickle(bytes): Base64 encoded byte string containing the pickled
session
passphrase(str, optional): The passphrase used to encrypt the
"""
if not pickle:
raise ValueError("Pickle can't be empty")
byte_passphrase = bytes(passphrase, "utf-8") if passphrase else b""
passphrase_buffer = ffi.new("char[]", byte_passphrase)
pickle_buffer = ffi.new("char[]", pickle)
obj = cls.__new__(cls)
ret = lib.olm_unpickle_outbound_group_session(
obj._session,
passphrase_buffer,
len(byte_passphrase),
pickle_buffer,
len(pickle)
)
obj._check_error(ret)
return obj
def encrypt(self, plaintext):
# type: (AnyStr) -> str
"""Encrypt a message.
Returns the encrypted ciphertext.
Args:
plaintext(str): A string that will be encrypted using the group
session.
"""
byte_plaintext = to_bytes(plaintext)
message_length = lib.olm_group_encrypt_message_length(
self._session, len(byte_plaintext)
)
message_buffer = ffi.new("char[]", message_length)
plaintext_buffer = ffi.new("char[]", byte_plaintext)
ret = lib.olm_group_encrypt(
self._session,
plaintext_buffer, len(byte_plaintext),
message_buffer, message_length,
)
self._check_error(ret)
return bytes_to_native_str(ffi.unpack(message_buffer, message_length))
@property
def id(self):
# type: () -> str
"""str: A base64 encoded identifier for this session."""
id_length = lib.olm_outbound_group_session_id_length(self._session)
id_buffer = ffi.new("char[]", id_length)
ret = lib.olm_outbound_group_session_id(
self._session,
id_buffer,
id_length
)
self._check_error(ret)
return bytes_to_native_str(ffi.unpack(id_buffer, id_length))
@property
def message_index(self):
# type: () -> int
"""int: The current message index of the session.
Each message is encrypted with an increasing index. This is the index
for the next message.
"""
return lib.olm_outbound_group_session_message_index(self._session)
@property
def session_key(self):
# type: () -> str
"""The base64-encoded current ratchet key for this session.
Each message is encrypted with a different ratchet key. This function
returns the ratchet key that will be used for the next message.
"""
key_length = lib.olm_outbound_group_session_key_length(self._session)
key_buffer = ffi.new("char[]", key_length)
ret = lib.olm_outbound_group_session_key(
self._session,
key_buffer,
key_length
)
self._check_error(ret)
return bytes_to_native_str(ffi.unpack(key_buffer, key_length))

452
python/olm/session.py Normal file
View file

@ -0,0 +1,452 @@
# -*- coding: utf-8 -*-
# libolm python bindings
# Copyright © 2015-2017 OpenMarket Ltd
# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
"""libolm Session module.
This module contains the Olm Session part of the Olm library.
It is used to establish a peer-to-peer encrypted communication channel between
two Olm accounts.
Examples:
>>> alice = Account()
>>> bob = Account()
>>> bob.generate_one_time_keys(1)
>>> id_key = bob.identity_keys['curve25519']
>>> one_time = list(bob.one_time_keys["curve25519"].values())[0]
>>> session = OutboundSession(alice, id_key, one_time)
"""
# pylint: disable=redefined-builtin,unused-import
from builtins import bytes, super
from typing import AnyStr, Optional, Type
from future.utils import bytes_to_native_str
# pylint: disable=no-name-in-module
from _libolm import ffi, lib # type: ignore
from ._compat import URANDOM, to_bytes
from ._finalize import track_for_finalization
# This is imported only for type checking purposes
if False:
from .account import Account # pragma: no cover
class OlmSessionError(Exception):
"""libolm Session exception."""
class _OlmMessage(object):
def __init__(self, ciphertext, message_type):
# type: (AnyStr, ffi.cdata) -> None
if not ciphertext:
raise ValueError("Ciphertext can't be empty")
# I don't know why mypy wants a type annotation here nor why AnyStr
# doesn't work
self.ciphertext = ciphertext # type: ignore
self.message_type = message_type
def __str__(self):
# type: () -> str
type_to_prefix = {
lib.OLM_MESSAGE_TYPE_PRE_KEY: "PRE_KEY",
lib.OLM_MESSAGE_TYPE_MESSAGE: "MESSAGE"
}
prefix = type_to_prefix[self.message_type]
return "{} {}".format(prefix, self.ciphertext)
class OlmPreKeyMessage(_OlmMessage):
"""Olm prekey message class
Prekey messages are used to establish an Olm session. After the first
message exchange the session switches to normal messages
"""
def __init__(self, ciphertext):
# type: (AnyStr) -> None
"""Create a new Olm prekey message with the supplied ciphertext
Args:
ciphertext(str): The ciphertext of the prekey message.
"""
_OlmMessage.__init__(self, ciphertext, lib.OLM_MESSAGE_TYPE_PRE_KEY)
def __repr__(self):
# type: () -> str
return "OlmPreKeyMessage({})".format(self.ciphertext)
class OlmMessage(_OlmMessage):
"""Olm message class"""
def __init__(self, ciphertext):
# type: (AnyStr) -> None
"""Create a new Olm message with the supplied ciphertext
Args:
ciphertext(str): The ciphertext of the message.
"""
_OlmMessage.__init__(self, ciphertext, lib.OLM_MESSAGE_TYPE_MESSAGE)
def __repr__(self):
# type: () -> str
return "OlmMessage({})".format(self.ciphertext)
def _clear_session(session):
# type: (ffi.cdata) -> None
lib.olm_clear_session(session)
class Session(object):
"""libolm Session class.
This is an abstract class that can't be instantiated except when unpickling
a previously pickled InboundSession or OutboundSession object with
from_pickle.
"""
def __new__(cls):
# type: (Type[Session]) -> Session
obj = super().__new__(cls)
obj._buf = ffi.new("char[]", lib.olm_session_size())
obj._session = lib.olm_session(obj._buf)
track_for_finalization(obj, obj._session, _clear_session)
return obj
def __init__(self):
# type: () -> None
if type(self) is Session:
raise TypeError("Session class may not be instantiated.")
if False:
self._session = self._session # type: ffi.cdata
def _check_error(self, ret):
# type: (int) -> None
if ret != lib.olm_error():
return
last_error = bytes_to_native_str(
ffi.string(lib.olm_session_last_error(self._session)))
raise OlmSessionError(last_error)
def pickle(self, passphrase=""):
# type: (Optional[str]) -> bytes
"""Store an Olm session.
Stores a session as a base64 string. Encrypts the session using the
supplied passphrase. Returns a byte object containing the base64
encoded string of the pickled session. Raises OlmSessionError on
failure.
Args:
passphrase(str, optional): The passphrase to be used to encrypt
the session.
"""
byte_key = bytes(passphrase, "utf-8") if passphrase else b""
key_buffer = ffi.new("char[]", byte_key)
pickle_length = lib.olm_pickle_session_length(self._session)
pickle_buffer = ffi.new("char[]", pickle_length)
self._check_error(
lib.olm_pickle_session(self._session, key_buffer, len(byte_key),
pickle_buffer, pickle_length))
return ffi.unpack(pickle_buffer, pickle_length)
@classmethod
def from_pickle(cls, pickle, passphrase=""):
# type: (bytes, Optional[str]) -> Session
"""Load a previously stored Olm session.
Loads a session from a pickled base64 string and returns a Session
object. Decrypts the session using the supplied passphrase. Raises
OlmSessionError on failure. If the passphrase doesn't match the one
used to encrypt the session then the error message for the
exception will be "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded
then the error message will be "INVALID_BASE64".
Args:
pickle(bytes): Base64 encoded byte string containing the pickled
session
passphrase(str, optional): The passphrase used to encrypt the
session.
"""
if not pickle:
raise ValueError("Pickle can't be empty")
byte_key = bytes(passphrase, "utf-8") if passphrase else b""
key_buffer = ffi.new("char[]", byte_key)
pickle_buffer = ffi.new("char[]", pickle)
session = cls.__new__(cls)
ret = lib.olm_unpickle_session(session._session, key_buffer,
len(byte_key), pickle_buffer,
len(pickle))
session._check_error(ret)
return session
def encrypt(self, plaintext):
# type: (AnyStr) -> _OlmMessage
"""Encrypts a message using the session. Returns the ciphertext as a
base64 encoded string on success. Raises OlmSessionError on failure. If
there weren't enough random bytes to encrypt the message the error
message for the exception will be NOT_ENOUGH_RANDOM.
Args:
plaintext(str): The plaintext message that will be encrypted.
"""
byte_plaintext = to_bytes(plaintext)
r_length = lib.olm_encrypt_random_length(self._session)
random = URANDOM(r_length)
random_buffer = ffi.new("char[]", random)
message_type = lib.olm_encrypt_message_type(self._session)
self._check_error(message_type)
ciphertext_length = lib.olm_encrypt_message_length(
self._session, len(plaintext)
)
ciphertext_buffer = ffi.new("char[]", ciphertext_length)
plaintext_buffer = ffi.new("char[]", byte_plaintext)
self._check_error(lib.olm_encrypt(
self._session,
plaintext_buffer, len(byte_plaintext),
random_buffer, r_length,
ciphertext_buffer, ciphertext_length,
))
if message_type == lib.OLM_MESSAGE_TYPE_PRE_KEY:
return OlmPreKeyMessage(
bytes_to_native_str(ffi.unpack(
ciphertext_buffer,
ciphertext_length
)))
elif message_type == lib.OLM_MESSAGE_TYPE_MESSAGE:
return OlmMessage(
bytes_to_native_str(ffi.unpack(
ciphertext_buffer,
ciphertext_length
)))
else: # pragma: no cover
raise ValueError("Unknown message type")
def decrypt(self, message):
# type: (_OlmMessage) -> str
"""Decrypts a message using the session. Returns the plaintext string
on success. Raises OlmSessionError on failure. If the base64 couldn't
be decoded then the error message will be "INVALID_BASE64". If the
message is for an unsupported version of the protocol the error message
will be "BAD_MESSAGE_VERSION". If the message couldn't be decoded then
the error message will be "BAD_MESSAGE_FORMAT". If the MAC on the
message was invalid then the error message will be "BAD_MESSAGE_MAC".
Args:
message(OlmMessage): The Olm message that will be decrypted. It can
be either an OlmPreKeyMessage or an OlmMessage.
"""
if not message.ciphertext:
raise ValueError("Ciphertext can't be empty")
byte_ciphertext = to_bytes(message.ciphertext)
ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
max_plaintext_length = lib.olm_decrypt_max_plaintext_length(
self._session, message.message_type, ciphertext_buffer,
len(byte_ciphertext)
)
plaintext_buffer = ffi.new("char[]", max_plaintext_length)
ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
plaintext_length = lib.olm_decrypt(
self._session, message.message_type, ciphertext_buffer,
len(byte_ciphertext), plaintext_buffer, max_plaintext_length
)
self._check_error(plaintext_length)
return bytes_to_native_str(
ffi.unpack(plaintext_buffer, plaintext_length))
@property
def id(self):
# type: () -> str
"""str: An identifier for this session. Will be the same for both
ends of the conversation.
"""
id_length = lib.olm_session_id_length(self._session)
id_buffer = ffi.new("char[]", id_length)
self._check_error(
lib.olm_session_id(self._session, id_buffer, id_length)
)
return bytes_to_native_str(ffi.unpack(id_buffer, id_length))
def matches(self, message, identity_key=None):
# type: (OlmPreKeyMessage, Optional[AnyStr]) -> bool
"""Checks if the PRE_KEY message is for this in-bound session.
This can happen if multiple messages are sent to this session before
this session sends a message in reply. Returns True if the session
matches. Returns False if the session does not match. Raises
OlmSessionError on failure. If the base64 couldn't be decoded then the
error message will be "INVALID_BASE64". If the message was for an
unsupported protocol version then the error message will be
"BAD_MESSAGE_VERSION". If the message couldn't be decoded then then the
error message will be * "BAD_MESSAGE_FORMAT".
Args:
message(OlmPreKeyMessage): The Olm prekey message that will checked
if it is intended for this session.
identity_key(str, optional): The identity key of the sender. To
check if the message was also sent using this identity key.
"""
if not isinstance(message, OlmPreKeyMessage):
raise TypeError("Matches can only be called with prekey messages.")
if not message.ciphertext:
raise ValueError("Ciphertext can't be empty")
ret = None
byte_ciphertext = to_bytes(message.ciphertext)
message_buffer = ffi.new("char[]", byte_ciphertext)
if identity_key:
byte_id_key = to_bytes(identity_key)
identity_key_buffer = ffi.new("char[]", byte_id_key)
ret = lib.olm_matches_inbound_session_from(
self._session,
identity_key_buffer, len(byte_id_key),
message_buffer, len(byte_ciphertext)
)
else:
ret = lib.olm_matches_inbound_session(
self._session,
message_buffer, len(byte_ciphertext))
self._check_error(ret)
return bool(ret)
class InboundSession(Session):
"""Inbound Olm session for p2p encrypted communication.
"""
def __new__(cls, account, message, identity_key=None):
# type: (Account, OlmPreKeyMessage, Optional[AnyStr]) -> Session
return super().__new__(cls)
def __init__(self, account, message, identity_key=None):
# type: (Account, OlmPreKeyMessage, Optional[AnyStr]) -> None
"""Create a new inbound Olm session.
Create a new in-bound session for sending/receiving messages from an
incoming prekey message. Raises OlmSessionError on failure. If the
base64 couldn't be decoded then error message will be "INVALID_BASE64".
If the message was for an unsupported protocol version then
the errror message will be "BAD_MESSAGE_VERSION". If the message
couldn't be decoded then then the error message will be
"BAD_MESSAGE_FORMAT". If the message refers to an unknown one-time
key then the error message will be "BAD_MESSAGE_KEY_ID".
Args:
account(Account): The Olm Account that will be used to create this
session.
message(OlmPreKeyMessage): The Olm prekey message that will checked
that will be used to create this session.
identity_key(str, optional): The identity key of the sender. To
check if the message was also sent using this identity key.
"""
if not message.ciphertext:
raise ValueError("Ciphertext can't be empty")
super().__init__()
byte_ciphertext = to_bytes(message.ciphertext)
message_buffer = ffi.new("char[]", byte_ciphertext)
if identity_key:
byte_id_key = to_bytes(identity_key)
identity_key_buffer = ffi.new("char[]", byte_id_key)
self._check_error(lib.olm_create_inbound_session_from(
self._session,
account._account,
identity_key_buffer, len(byte_id_key),
message_buffer, len(byte_ciphertext)
))
else:
self._check_error(lib.olm_create_inbound_session(
self._session,
account._account,
message_buffer, len(byte_ciphertext)
))
class OutboundSession(Session):
"""Outbound Olm session for p2p encrypted communication."""
def __new__(cls, account, identity_key, one_time_key):
# type: (Account, AnyStr, AnyStr) -> Session
return super().__new__(cls)
def __init__(self, account, identity_key, one_time_key):
# type: (Account, AnyStr, AnyStr) -> None
"""Create a new outbound Olm session.
Creates a new outbound session for sending messages to a given
identity key and one-time key.
Raises OlmSessionError on failure. If the keys couldn't be decoded as
base64 then the error message will be "INVALID_BASE64". If there
weren't enough random bytes for the session creation the error message
for the exception will be NOT_ENOUGH_RANDOM.
Args:
account(Account): The Olm Account that will be used to create this
session.
identity_key(str): The identity key of the person with whom we want
to start the session.
one_time_key(str): A one-time key from the person with whom we want
to start the session.
"""
if not identity_key:
raise ValueError("Identity key can't be empty")
if not one_time_key:
raise ValueError("One-time key can't be empty")
super().__init__()
byte_id_key = to_bytes(identity_key)
byte_one_time = to_bytes(one_time_key)
session_random_length = lib.olm_create_outbound_session_random_length(
self._session)
random = URANDOM(session_random_length)
random_buffer = ffi.new("char[]", random)
identity_key_buffer = ffi.new("char[]", byte_id_key)
one_time_key_buffer = ffi.new("char[]", byte_one_time)
self._check_error(lib.olm_create_outbound_session(
self._session,
account._account,
identity_key_buffer, len(byte_id_key),
one_time_key_buffer, len(byte_one_time),
random_buffer, session_random_length
))

91
python/olm/utility.py Normal file
View file

@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-
# libolm python bindings
# Copyright © 2015-2017 OpenMarket Ltd
# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
"""libolm Utility module.
This module contains utilities for olm.
It only contains the ed25519_verify function for signature verification.
Examples:
>>> alice = Account()
>>> message = "Test"
>>> signature = alice.sign(message)
>>> signing_key = alice.identity_keys["ed25519"]
>>> ed25519_verify(signing_key, message, signature)
"""
# pylint: disable=redefined-builtin,unused-import
from typing import AnyStr, Type
# pylint: disable=no-name-in-module
from _libolm import ffi, lib # type: ignore
from ._compat import to_bytes
from ._finalize import track_for_finalization
def _clear_utility(utility): # pragma: no cover
# type: (ffi.cdata) -> None
lib.olm_clear_utility(utility)
class OlmVerifyError(Exception):
"""libolm signature verification exception."""
class _Utility(object):
# pylint: disable=too-few-public-methods
"""libolm Utility class."""
_buf = None
_utility = None
@classmethod
def _allocate(cls):
# type: (Type[_Utility]) -> None
cls._buf = ffi.new("char[]", lib.olm_utility_size())
cls._utility = lib.olm_utility(cls._buf)
track_for_finalization(cls, cls._utility, _clear_utility)
@classmethod
def _check_error(cls, ret):
# type: (int) -> None
if ret != lib.olm_error():
return
raise OlmVerifyError("{}".format(
ffi.string(lib.olm_utility_last_error(
cls._utility)).decode("utf-8")))
@classmethod
def _ed25519_verify(cls, key, message, signature):
# type: (Type[_Utility], AnyStr, AnyStr, AnyStr) -> None
if not cls._utility:
cls._allocate()
byte_key = to_bytes(key)
byte_message = to_bytes(message)
byte_signature = to_bytes(signature)
cls._check_error(
lib.olm_ed25519_verify(cls._utility, byte_key, len(byte_key),
byte_message, len(byte_message),
byte_signature, len(byte_signature)))
def ed25519_verify(key, message, signature):
# type: (AnyStr, AnyStr, AnyStr) -> None
"""Verify an ed25519 signature.
Raises an OlmVerifyError if verification fails.
Args:
key(str): The ed25519 public key used for signing.
message(str): The signed message.
signature(bytes): The message signature.
"""
return _Utility._ed25519_verify(key, message, signature)

40
python/olm_build.py Normal file
View file

@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
# libolm python bindings
# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
#
# Permission to use, copy, modify, and/or distribute this software for
# any purpose with or without fee is hereby granted, provided that the
# above copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import unicode_literals
import os
from cffi import FFI
ffibuilder = FFI()
PATH = os.path.dirname(__file__)
ffibuilder.set_source(
"_libolm",
r"""
#include <olm/olm.h>
#include <olm/inbound_group_session.h>
#include <olm/outbound_group_session.h>
""", libraries=["olm"])
with open(os.path.join(PATH, "include/olm/olm.h")) as f:
ffibuilder.cdef(f.read(), override=True)
if __name__ == "__main__":
ffibuilder.compile(verbose=True)

3
python/requirements.txt Normal file
View file

@ -0,0 +1,3 @@
future
cffi
typing

8
python/setup.cfg Normal file
View file

@ -0,0 +1,8 @@
[tool:pytest]
testpaths = tests
flake8-ignore =
olm/*.py F401
tests/*.py W503
[coverage:run]
omit=olm/__version__.py

27
python/setup.py Normal file
View file

@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
import os
from codecs import open
from setuptools import setup
here = os.path.abspath(os.path.dirname(__file__))
about = {}
with open(os.path.join(here, "olm", "__version__.py"), "r", "utf-8") as f:
exec(f.read(), about)
setup(
name=about["__title__"],
version=about["__version__"],
description=about["__description__"],
author=about["__author__"],
author_email=about["__author_email__"],
url=about["__url__"],
license=about["__license__"],
packages=["olm"],
setup_requires=["cffi>=1.0.0"],
cffi_modules=["olm_build.py:ffibuilder"],
install_requires=["cffi>=1.0.0", "future", "typing"],
zip_safe=False
)

View file

@ -0,0 +1,7 @@
pytest
hypothesis
pytest-flake8
pytest-isort
pytest-cov
pytest-benchmark
aspectlib

View file

@ -0,0 +1,100 @@
from builtins import int
import pytest
from hypothesis import given
from hypothesis.strategies import text
from olm import Account, OlmAccountError, OlmVerifyError, ed25519_verify
from olm._compat import to_bytes
class TestClass(object):
def test_to_bytes(self):
assert isinstance(to_bytes("a"), bytes)
assert isinstance(to_bytes(u"a"), bytes)
assert isinstance(to_bytes(b"a"), bytes)
assert isinstance(to_bytes(r"a"), bytes)
with pytest.raises(TypeError):
to_bytes(0)
def test_account_creation(self):
alice = Account()
assert alice.identity_keys
assert len(alice.identity_keys) == 2
def test_account_pickle(self):
alice = Account()
pickle = alice.pickle()
assert (alice.identity_keys == Account.from_pickle(pickle)
.identity_keys)
def test_invalid_unpickle(self):
with pytest.raises(ValueError):
Account.from_pickle(b"")
def test_passphrase_pickle(self):
alice = Account()
passphrase = "It's a secret to everybody"
pickle = alice.pickle(passphrase)
assert (alice.identity_keys == Account.from_pickle(
pickle, passphrase).identity_keys)
def test_wrong_passphrase_pickle(self):
alice = Account()
passphrase = "It's a secret to everybody"
pickle = alice.pickle(passphrase)
with pytest.raises(OlmAccountError):
Account.from_pickle(pickle, "")
def test_one_time_keys(self):
alice = Account()
alice.generate_one_time_keys(10)
one_time_keys = alice.one_time_keys
assert one_time_keys
assert len(one_time_keys["curve25519"]) == 10
def test_max_one_time_keys(self):
alice = Account()
assert isinstance(alice.max_one_time_keys, int)
def test_publish_one_time_keys(self):
alice = Account()
alice.generate_one_time_keys(10)
one_time_keys = alice.one_time_keys
assert one_time_keys
assert len(one_time_keys["curve25519"]) == 10
alice.mark_keys_as_published()
assert not alice.one_time_keys["curve25519"]
def test_clear(self):
alice = Account()
del alice
@given(text())
def test_valid_signature(self, message):
alice = Account()
signature = alice.sign(message)
signing_key = alice.identity_keys["ed25519"]
assert signature
assert signing_key
ed25519_verify(signing_key, message, signature)
@given(text())
def test_invalid_signature(self, message):
alice = Account()
bob = Account()
signature = alice.sign(message)
signing_key = bob.identity_keys["ed25519"]
assert signature
assert signing_key
with pytest.raises(OlmVerifyError):
ed25519_verify(signing_key, message, signature)

View file

@ -0,0 +1,114 @@
import pytest
from olm import InboundGroupSession, OlmGroupSessionError, OutboundGroupSession
class TestClass(object):
def test_session_create(self):
OutboundGroupSession()
def test_session_id(self):
session = OutboundGroupSession()
assert isinstance(session.id, str)
def test_session_index(self):
session = OutboundGroupSession()
assert isinstance(session.message_index, int)
assert session.message_index == 0
def test_outbound_pickle(self):
session = OutboundGroupSession()
pickle = session.pickle()
assert (session.id == OutboundGroupSession.from_pickle(
pickle).id)
def test_invalid_unpickle(self):
with pytest.raises(ValueError):
OutboundGroupSession.from_pickle(b"")
with pytest.raises(ValueError):
InboundGroupSession.from_pickle(b"")
def test_inbound_create(self):
outbound = OutboundGroupSession()
InboundGroupSession(outbound.session_key)
def test_invalid_decrypt(self):
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
with pytest.raises(ValueError):
inbound.decrypt("")
def test_inbound_pickle(self):
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
pickle = inbound.pickle()
InboundGroupSession.from_pickle(pickle)
def test_inbound_export(self):
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
imported = InboundGroupSession.import_session(
inbound.export_session(inbound.first_known_index)
)
assert "Test", 0 == imported.decrypt(outbound.encrypt("Test"))
def test_first_index(self):
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
index = inbound.first_known_index
assert isinstance(index, int)
def test_encrypt(self, benchmark):
benchmark.weave(OutboundGroupSession.encrypt, lazy=True)
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
assert "Test", 0 == inbound.decrypt(outbound.encrypt("Test"))
def test_decrypt(self, benchmark):
benchmark.weave(InboundGroupSession.decrypt, lazy=True)
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
assert "Test", 0 == inbound.decrypt(outbound.encrypt("Test"))
def test_decrypt_twice(self):
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
outbound.encrypt("Test 1")
message, index = inbound.decrypt(outbound.encrypt("Test 2"))
assert isinstance(index, int)
assert ("Test 2", 1) == (message, index)
def test_decrypt_failure(self):
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
eve_outbound = OutboundGroupSession()
with pytest.raises(OlmGroupSessionError):
inbound.decrypt(eve_outbound.encrypt("Test"))
def test_id(self):
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
assert outbound.id == inbound.id
def test_inbound_fail(self):
with pytest.raises(TypeError):
InboundGroupSession()
def test_oubtound_pickle_fail(self):
outbound = OutboundGroupSession()
pickle = outbound.pickle("Test")
with pytest.raises(OlmGroupSessionError):
OutboundGroupSession.from_pickle(pickle)
def test_outbound_clear(self):
session = OutboundGroupSession()
del session
def test_inbound_clear(self):
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
del inbound

View file

@ -0,0 +1,143 @@
import pytest
from olm import (Account, InboundSession, OlmMessage, OlmPreKeyMessage,
OlmSessionError, OutboundSession, Session)
class TestClass(object):
def _create_session(self):
alice = Account()
bob = Account()
bob.generate_one_time_keys(1)
id_key = bob.identity_keys["curve25519"]
one_time = list(bob.one_time_keys["curve25519"].values())[0]
session = OutboundSession(alice, id_key, one_time)
return alice, bob, session
def test_session_create(self):
_, _, session_1 = self._create_session()
_, _, session_2 = self._create_session()
assert session_1
assert session_2
assert session_1.id != session_2.id
assert isinstance(session_1.id, str)
def test_session_clear(self):
_, _, session = self._create_session()
del session
def test_invalid_session_create(self):
with pytest.raises(TypeError):
Session()
def test_session_pickle(self):
alice, bob, session = self._create_session()
Session.from_pickle(session.pickle()).id == session.id
def test_session_invalid_pickle(self):
with pytest.raises(ValueError):
Session.from_pickle(b"")
def test_wrong_passphrase_pickle(self):
alice, bob, session = self._create_session()
passphrase = "It's a secret to everybody"
pickle = alice.pickle(passphrase)
with pytest.raises(OlmSessionError):
Session.from_pickle(pickle, "")
def test_encrypt(self):
plaintext = "It's a secret to everybody"
alice, bob, session = self._create_session()
message = session.encrypt(plaintext)
assert (repr(message)
== "OlmPreKeyMessage({})".format(message.ciphertext))
assert (str(message)
== "PRE_KEY {}".format(message.ciphertext))
bob_session = InboundSession(bob, message)
assert plaintext == bob_session.decrypt(message)
def test_empty_message(self):
with pytest.raises(ValueError):
OlmPreKeyMessage("")
empty = OlmPreKeyMessage("x")
empty.ciphertext = ""
alice, bob, session = self._create_session()
with pytest.raises(ValueError):
session.decrypt(empty)
def test_inbound_with_id(self):
plaintext = "It's a secret to everybody"
alice, bob, session = self._create_session()
message = session.encrypt(plaintext)
alice_id = alice.identity_keys["curve25519"]
bob_session = InboundSession(bob, message, alice_id)
assert plaintext == bob_session.decrypt(message)
def test_two_messages(self):
plaintext = "It's a secret to everybody"
alice, bob, session = self._create_session()
message = session.encrypt(plaintext)
alice_id = alice.identity_keys["curve25519"]
bob_session = InboundSession(bob, message, alice_id)
bob.remove_one_time_keys(bob_session)
assert plaintext == bob_session.decrypt(message)
bob_plaintext = "Grumble, Grumble"
bob_message = bob_session.encrypt(bob_plaintext)
assert (repr(bob_message)
== "OlmMessage({})".format(bob_message.ciphertext))
assert bob_plaintext == session.decrypt(bob_message)
def test_matches(self):
plaintext = "It's a secret to everybody"
alice, bob, session = self._create_session()
message = session.encrypt(plaintext)
alice_id = alice.identity_keys["curve25519"]
bob_session = InboundSession(bob, message, alice_id)
assert plaintext == bob_session.decrypt(message)
message_2nd = session.encrypt("Hey! Listen!")
assert bob_session.matches(message_2nd) is True
assert bob_session.matches(message_2nd, alice_id) is True
def test_invalid(self):
alice, bob, session = self._create_session()
message = OlmMessage("x")
with pytest.raises(TypeError):
session.matches(message)
message = OlmPreKeyMessage("x")
message.ciphertext = ""
with pytest.raises(ValueError):
session.matches(message)
with pytest.raises(ValueError):
InboundSession(bob, message)
with pytest.raises(ValueError):
OutboundSession(alice, "", "x")
with pytest.raises(ValueError):
OutboundSession(alice, "x", "")
def test_doesnt_match(self):
plaintext = "It's a secret to everybody"
alice, bob, session = self._create_session()
message = session.encrypt(plaintext)
alice_id = alice.identity_keys["curve25519"]
bob_session = InboundSession(bob, message, alice_id)
_, _, new_session = self._create_session()
new_message = new_session.encrypt(plaintext)
assert bob_session.matches(new_message) is False

43
python/tox.ini Normal file
View file

@ -0,0 +1,43 @@
# content of: tox.ini , put in same dir as setup.py
[tox]
envlist = py27,py36,pypy,{py2,py3}-cov,coverage
[testenv]
basepython =
py27: python2.7
py36: python3.6
pypy: pypy
py2: python2.7
py3: python3.6
deps = -rrequirements.txt
-rtest-requirements.txt
passenv = TOXENV CI TRAVIS TRAVIS_*
commands = pytest --benchmark-disable
usedevelop = True
[testenv:py2-cov]
commands =
pytest --cov-report term-missing --cov=olm --benchmark-disable --cov-branch
setenv =
COVERAGE_FILE=.coverage.py2
[testenv:py3-cov]
commands =
py.test --cov=olm --cov-report term-missing --benchmark-disable --cov-branch
setenv =
COVERAGE_FILE=.coverage.py3
[testenv:coverage]
basepython = python3.6
commands =
coverage erase
coverage combine
coverage xml
coverage report --show-missing
codecov -e TOXENV
deps =
coverage
codecov>=1.4.0
setenv =
COVERAGE_FILE=.coverage