Subversion Repositories pub

Compare Revisions

No changes between revisions

Ignore whitespace Rev 617 → Rev 618

/relevation/branches/1.3.1/src/relevation/__main__.py
0,0 → 1,688
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
Relevation Password Printer
a command line interface to Revelation Password Manager.
 
Code based on Revelation's former BTS (no longer online, not archived?):
(ref1) code:
http://oss.wired-networks.net/bugzilla/attachment.cgi?id=13&action=view
(ref2) bug report:
http://oss.wired-networks.net/bugzilla/show_bug.cgi?id=111
-> http://web.archive.org/http://oss.wired-networks.net/bugzilla/show_bug.cgi?id=111
(ref3) http://docs.python.org/library/zlib.html
(ref4) http://pymotw.com/2/getpass/
 
$Id$
'''
# Relevation Password Printer
#
# Copyright (c) 2011,2012,2013,2014 Toni Corvera
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
 
# Subversion keyword placeholders (this file has svn:keywords set)
__date__ = '$Date$'
__revision__ = '$Rev$'
 
import ConfigParser
import getopt
import getpass
from lxml import etree
import locale
import os
import stat
import string
import sys
import zlib
# Help py2exe in packaging lxml
# <http://www.py2exe.org/index.cgi/WorkingWithVariousPackagesAndModules>
import lxml._elementpath as _dummy
import gzip # py2exe again
import hashlib # required by newer format
# PBKDF2 stolen from Revelation
from relevation import PBKDF2, __version__, __author__, RELEASE
 
# Pick the AES implementation: PyCrypto is preferred, cryptopy is the fallback.
# USE_PYCRYPTO records which one was found.
try:
    from Crypto.Cipher import AES
except ImportError:
    USE_PYCRYPTO = False
    try:
        from crypto.cipher import rijndael, cbc
        from crypto.cipher.base import noPadding
    except ImportError:
        # Neither backend is available, nothing can be decrypted
        sys.stderr.write('Either PyCrypto or cryptopy is required\n')
        raise
else:
    USE_PYCRYPTO = True
 
# Pseudo-standardized exit codes. On Linux (*NIX?) they are defined in the
# header </usr/include/sysexits.h> and available as attributes of 'os'.
# On Windows they aren't defined at all.

if not hasattr(os, 'EX_OK'):
    # Not provided by the platform: define the ones used here by hand
    for _name, _code in ( ( 'EX_OK', 0 ), ( 'EX_USAGE', 64 ), ( 'EX_DATAERR', 65 ),
                          ( 'EX_NOINPUT', 66 ), ( 'EX_SOFTWARE', 70 ), ( 'EX_IOERR', 74 ) ):
        setattr(os, _name, _code)
    del _name, _code
 
# Labels printed in front of the value of each known field id.
# Field ids not listed here are printed as they appear in the file.
# Every label ends with a colon so that all fields are printed alike.
TAGNAMES = {
    'generic-url': 'Url:',
    'generic-username': 'Username:',
    'generic-password': 'Password:',
    'generic-email': 'Email:',
    'generic-hostname': 'Hostname:',
    'generic-location': 'Location:',
    'generic-code': 'Code:',
    'generic-certificate': 'Certificate:',
    'generic-database': 'Database:',
    'generic-domain': 'Domain:',
    'generic-keyfile': 'Key file:',
    'generic-pin': 'PIN:',
    'generic-port': 'Port:',
}
# Ways of combining multiple search terms
MODE_AND = 'and'
MODE_OR = 'or'
 
# Errors
class RlvError(Exception):
    ''' Base class of the errors raised while reading a data file.
    Every instance carries:
      msg       human readable description
      exitCode  process exit status to use when the error is fatal
    '''
    exitCode = os.EX_SOFTWARE
    def __init__(self, msg = 'Unknown error', *args):
        # Hand the message to Exception too, so that 'args' and repr() are
        # meaningful
        Exception.__init__(self, msg, *args)
        self.msg = msg
    def __str__(self):
        return self.msg

class DecryptError(RlvError):
    ' The data could not be decrypted '
    exitCode = os.EX_DATAERR
    def __init__(self, msg = 'Failed to decrypt data. Wrong password?'):
        RlvError.__init__(self, msg)

class DecompressError(RlvError):
    ' The decrypted data could not be decompressed '
    exitCode = os.EX_DATAERR
    def __init__(self, msg = 'Failed to decompress data.'):
        RlvError.__init__(self, msg)

class DataFormatError(RlvError):
    ' The data is not laid out as expected '
    exitCode = os.EX_DATAERR
    def __init__(self, msg = 'Incorrect data format'):
        RlvError.__init__(self, msg)

class DataVersionError(RlvError):
    ' The data uses a format version that is not supported '
    exitCode = os.EX_DATAERR
    def __init__(self, msg = 'Data format version not supported'):
        RlvError.__init__(self, msg)
 
def printe(s):
    ' Write s, followed by a newline, to standard error '
    line = s + '\n'
    sys.stderr.write(line)
 
def usage(channel):
    ''' Write the command line help to channel.
    channel is any object with a write() method (e.g. sys.stdout).
    '''
    emit = channel.write
    emit('%s {-f passwordfile} {-p password | -0} [search] [search2] [...]\n' % sys.argv[0])
    emit('\nOptions:\n')
    # One string per output line, in the order they are printed
    option_help = (
        ' -f FILE, --file=FILE Revelation password file.\n',
        ' -p PASS, --password=PASS Master password.\n',
        ' -s SEARCH, --search=SEARCH Search for string.\n',
        ' -i, --case-insensitive Case insensitive search (default).\n',
        ' -c, --case-sensitive Case sensitive search.\n',
        ' -a, --ask Interactively ask for password.\n',
        ' Note it will be displayed in clear as you\n',
        ' type it.\n',
        ' -t TYPE, --type=TYPE Print only entries of type TYPE.\n',
        ' With no search string, prints all entries of\n',
        ' type TYPE.\n',
        ' -A, --and When multiple search terms are used, use an AND\n',
        ' operator to combine them.\n',
        ' -O, --or When multiple search terms are used, use an OR\n',
        ' operator to combine them.\n',
        ' -x, --xml Dump unencrypted XML document.\n',
        ' -0, --stdin Read password from standard input.\n',
        ' -h, --help Print help (this message).\n',
        ' --version Print the program\'s version information.\n',
    )
    for line in option_help:
        emit(line)
    emit('\n')
 
def _xpath_literal(text):
    ''' Quote text as an XPath 1.0 string literal.
    _xpath_literal(str) -> str

    XPath 1.0 string literals have no escape sequences, so a text containing
    both kinds of quotes has to be assembled with concat().
    '''
    if '"' not in text:
        return '"%s"' % text
    if "'" not in text:
        return "'%s'" % text
    # Double-quote every piece found between double quotes and put the quotes
    # themselves back as single-quoted literals
    pieces = [ '"%s"' % chunk for chunk in text.split('"') ]
    return 'concat(%s)' % ', \'"\', '.join(pieces)

def make_xpath_query(search_text=None, type_filter=None, ignore_case=True, negate_filter=False):
    ''' Construct the actual XPath expression
    make_xpath_query(str, str, bool, bool) -> str
     or
    make_xpath_query(list, str, bool, bool) -> str

    search_text    text to look for. Passing a list as this (first) argument
                   implies combining its elements in the search (AND)
    type_filter    entry type to select (or to exclude, see negate_filter)
    ignore_case    compare text without regard to (ASCII) letter case
    negate_filter  select entries whose type is NOT type_filter
    '''
    xpath = '/revelationdata//entry'
    if type_filter:
        sign = '='
        if negate_filter:
            sign = '!='
        xpath = '%s[@type%s%s]' % ( xpath, sign, _xpath_literal(type_filter) )
        if type_filter != 'folder':
            # Avoid printing folders since all their children are printed
            # alongside
            xpath += '[@type!="folder"]'
    if search_text:
        if isinstance(search_text, list):
            needles = search_text
        else:
            # FIXME: Used for OR's
            needles = [ search_text, ]
        selector = ''
        for search in needles:
            if ignore_case:
                # must pass lowercase to actually be case insensitive
                # XPath 2.0 has lower-case, upper-case, matches(..., -i) etc.
                selector += '//text()[contains(translate(., "ABCDEFGHIJKLMNOPQRSTUVWXYZ", "abcdefghijklmnopqrstuvwxyz"), %s)]/../..' % _xpath_literal(search.lower())
            else:
                selector += '//text()[contains(., %s)]/../..' % _xpath_literal(search)
        xpath = '%s%s' % ( xpath, selector )
    if not RELEASE:
        printe("> Xpath: %s\n" % xpath)
    return xpath
 
def dump_all_entries(xmldata):
    ''' Print every entry found in xmldata, without applying any filter.
    Returns whatever dump_result() returns for them.
    '''
    entries = etree.fromstring(xmldata).xpath('//entry')
    return dump_result(entries, 'all')
 
def dump_entries(xmldata, search_text=None, type_filter=None, ignore_case=True, negate_filter=False):
    ''' Print the entries of xmldata selected by the given criteria
    dump_entries(str, str, str, bool, bool) -> int
     or
    dump_entries(str, list, str, bool, bool) -> int

    The criteria are the ones accepted by make_xpath_query(); the return
    value is the one of dump_result().
    '''
    root = etree.fromstring(xmldata)
    query = make_xpath_query(search_text, type_filter, ignore_case, negate_filter)
    try:
        matches = root.xpath(query)
    except etree.XPathEvalError:
        if not RELEASE:
            printe('Failed with xpath expression: %s' % query)
        raise
    # Describe the query for the messages printed around the results
    if search_text:
        query_desc = '"%s"' % search_text
    else:
        query_desc = ''
    if type_filter:
        if negate_filter:
            neg = 'not '
        else:
            neg = ''
        if search_text:
            query_desc = '%s (\'%s%s\' entries)' % ( query_desc, neg, type_filter )
        else:
            query_desc = '%s%s entries' % ( neg, type_filter )
    return dump_result(matches, query_desc)
 
def dump_single_result(typeName, name, descr, notes, fields):
    ''' Print one entry.
    typeName, name, descr and notes are printed on a line each, followed by
    one line per ( label, value ) pair in the list 'fields'.
    A separator line goes to stderr, the entry itself to stdout.
    '''
    printe('-------------------------------------------------------------------------------')
    # The leading unicode string makes the joined text unicode
    pieces = [ u'\n',
               'Type: %s\n' % typeName,
               'Name: %s\n' % name,
               'Description: %s\n' % descr,
               'Notes: %s\n' % notes ]
    for field in fields:
        pieces.append('%s %s\n' % field) # field, value
    text = u''.join(pieces)
    try:
        # sys.stdout.encoding will be None if piped
        encoded = text.encode(sys.stdout.encoding or locale.getpreferredencoding())
    except UnicodeEncodeError:
        # E.g. console in ASCII ($ LC_ALL=C relevation)
        # TODO: Flag for notification
        encoded = text.encode('utf8')
    sys.stdout.write(encoded + '\n')
 
def dump_result(res, query_desc, dumpfn=dump_single_result):
    ''' Print query results.
    dump_result(list of entries, query description) -> int

    res         entry elements to print
    query_desc  description of the query, used in the messages around them
    dumpfn      function called as dumpfn(type, name, description, notes,
                fields) to print each entry
    Returns the number of entries in res (0 if there are none).
    '''
    def u8(s):
        ''' Return a unicode version of s, whether it's already a unicode
            string or a str encoded in UTF-8
        '''
        # Note the XML is in UTF-8, and that the extracted fields will be
        #  either type
        if isinstance(s, unicode):
            return s
        return unicode(s.decode('utf8'))

    header = '-> Search %s: ' % query_desc
    nr = len(res)
    if nr == 0:
        sys.stdout.write('%s %s\n' % ( header, 'No results' ))
        return 0
    sys.stdout.write('%s %d matches\n' % ( header, nr ))
    for entry in res:
        typeName = entry.get('type')
        name = None
        descr = None
        fields = []
        notes = None
        # Iterating an element yields its children
        for chld in entry:
            n = chld.tag
            val = chld.text
            if val is None:
                val = ''
            val = u8(val)
            if n == 'name':
                name = val
            elif n == 'description':
                descr = val
            elif n == 'field':
                idv = chld.get('id')
                # Known ids get a friendly label
                idv = TAGNAMES.get(idv, idv)
                # Maintain order => list
                fields.append(( idv, val ))
            elif n == 'notes':
                notes = val
        dumpfn(typeName, name, descr, notes, fields)
    plural = ''
    if nr > 1:
        plural = 's'
    printe('-------------------------------------------------------------------------------')
    printe('<- (end of %d result%s for {%s})\n' % ( nr, plural, query_desc ))
    return nr
 
def world_readable(path):
    ''' Tell whether the file at path can be read by everyone.
    On win32 no check is made and True is always returned.
    path must exist.
    '''
    assert os.path.exists(path)
    if sys.platform != 'win32':
        mode = os.stat(path).st_mode
        return ( mode & stat.S_IROTH ) != 0
    return True
 
def load_config():
    ''' Load configuration file if one is found
    load_config() -> ( str file, str pass, str mode )

    Reads the [relevation] section of ~/.relevation.conf. 'file' and 'pass'
    are None when not configured, 'mode' defaults to MODE_OR.
    '''
    cfg = os.path.join(os.path.expanduser('~'), '.relevation.conf')
    pw = None
    fl = None
    mode = MODE_OR
    if not os.path.isfile(cfg):
        return ( fl, pw, mode )
    if not os.access(cfg, os.R_OK): # exists but not readable
        printe('Configuration file (~/.relevation.conf) is not readable!')
        return ( fl, pw, mode )
    wr = world_readable(cfg)
    if wr and sys.platform != 'win32':
        printe('Configuration (~/.relevation.conf) is world-readable!!!')
    parser = ConfigParser.ConfigParser()
    parser.read(cfg)
    if not parser.has_section('relevation'):
        # Nothing configured (e.g. empty file), stick to the defaults
        return ( fl, pw, mode )
    ops = parser.options('relevation')
    if 'file' in ops:
        fl = os.path.expanduser(parser.get('relevation', 'file'))
    if 'password' in ops:
        if wr: # TODO: how to check in windows?
            printe('Your password can be read by anyone!!!')
        # raw: a password is taken literally, '%' must not be interpreted
        # as the start of an interpolation
        pw = parser.get('relevation', 'password', raw=True)
    if 'mode' in ops:
        mode = parser.get('relevation', 'mode')
        if mode not in ( MODE_AND, MODE_OR ):
            printe('Warning: Unknown mode \'%s\' set in configuration' % mode)
            mode = MODE_OR
    return ( fl, pw, mode )
 
class _DataReaderBase(object):
' Common methods for reading data files '
def validate_compressed_padding(self, data):
''' Checks that the gzip-compressed 'data' is padded correctly.
validate_compressed_padding(str) -> bool
'''
padlen = ord(data[-1])
for i in data[-padlen:]:
if ord(i) != padlen:
return False
return True
def validate_cipher_length(self, data):
''' Checks that encrypted 'data' has an appropriate length.
validate_cipher_length(str) -> bool
Encrypted data length must be a multiple of 16
'''
return ( len(data) % 16 == 0 )
def _aes_decrypt_ecb(self, key, data):
''' Decrypt AES cipher text in ECB mode
_aes_decrypt_ecb(str, str) -> str
This function will use the underlying, available, cipher module.
'''
if USE_PYCRYPTO:
c = AES.new(key)
cleardata = c.decrypt(data)
else:
c = rijndael.Rijndael(key, keySize=len(key), padding=noPadding())
cleardata = c.decrypt(data)
return cleardata
def _aes_decrypt_cbc(self, key, iv, data):
''' Decrypt AES cipher text in CBC mode
_aes_decrypt_ecb(str, str, str) -> str
This function will use the underlying, available, cipher module.
'''
if USE_PYCRYPTO:
c = AES.new(key, AES.MODE_CBC, iv)
cleardata = c.decrypt(data)
else:
bc = rijndael.Rijndael(key, keySize=len(key), padding=noPadding())
c = cbc.CBC(bc, padding=noPadding())
cleardata = c.decrypt(data, iv=iv)
return cleardata
def get_xml(self, data, password):
''' Extract the XML contents from the encrypted and compressed input.
get_xml(str, str) -> str
'''
pass
 
class DataReaderV1(_DataReaderBase):
    ''' Data reading for Revelation data files in the original format.
    Old format header:
     [0:12)  12B header: "rvl" 0x00, 0x01, 0x00
     [12:28) 16B ECB encrypted IV (for CBC-encrypted data)
     [28:]   CBC encrypted data
    '''
    def _decrypt_compressed_data(self, password, cipher_text):
        ''' Decrypt cipher_text using password.
        _decrypt_compressed_data(str, str) -> cleartext (gzipped xml)

        raises DataFormatError If cipher_text is too short or its length is
                               not appropriate for decryption
        '''
        # Minimum length of header
        if len(cipher_text) < 28:
            raise DataFormatError
        # Key <= Password, cut or zero-padded to exactly 32 bytes (a longer
        #  key would be rejected by the cipher)
        key = password[:32]
        key += (chr(0) * (32 - len(key)))
        # Extract IV
        iv = self._aes_decrypt_ecb(key, cipher_text[12:28])
        # Skip IV
        cipher_text = cipher_text[28:]
        # Input strings for decrypt must be a multiple of 16 in length
        if not self.validate_cipher_length(cipher_text):
            raise DataFormatError
        # Decrypt data, CBC mode
        return self._aes_decrypt_cbc(key, iv, cipher_text)

    def get_xml(self, data, password):
        ''' Extract the XML contents from the encrypted and compressed input.
        get_xml(str, str) -> str

        raises DataFormatError If the data layout or its padding is wrong
        raises DecompressError If the decrypted data can't be decompressed
        '''
        # Decrypt. Decrypted data is compressed
        cleardata_gz = self._decrypt_compressed_data(password, data)
        # Validate padding for decompression (there's none in empty data)
        if not cleardata_gz or not self.validate_compressed_padding(cleardata_gz):
            raise DataFormatError
        # Slicing (instead of indexing) gives ord() a one-byte string
        padlen = ord(cleardata_gz[-1:])
        try:
            # Note data is encoded in UTF-8 but not decoded yet (because
            #  the XML parser is too easy to choke in that case)
            # http://lxml.de/parsing.html#python-unicode-strings
            # Decompress actual data (15 is wbits [ref3] DON'T CHANGE, 2**15
            #  is the (initial) buf size)
            return zlib.decompress(cleardata_gz[:-padlen], 15, 2**15)
        except zlib.error:
            raise DecompressError
 
class DataReaderV2(_DataReaderBase):
    ''' Data reading for Revelation data files in the new format.
    New format header:
     [0:12)  12B header: "rvl" 0x00, 0x02, 0x00
     [12:20) 8B salt
     [20:36) 16B IV (for CBC-encrypted data)
     [36:]   CBC encrypted data
    The encryption key is derived from the password and salt through the
    PBKDF2 module.
    '''
    def _decrypt_compressed_data(self, password, cipher_text):
        ''' Decrypt cipher_text using password.
        _decrypt_compressed_data(str, str) -> cleartext (compressed xml)

        The first 32 bytes of the decrypted data are the SHA-256 hash of the
        remainder, which is what gets returned once checked.
        '''
        # The header must be complete
        if len(cipher_text) < 36:
            raise DataFormatError
        salt, iv = cipher_text[12:20], cipher_text[20:36]
        key = PBKDF2.PBKDF2(password, salt, iterations=12000).read(32)
        # What follows the header is the encrypted payload
        payload = cipher_text[36:]
        if not self.validate_cipher_length(payload):
            raise DataFormatError
        # Decrypt payload (CBC) and split it into hash and compressed data
        plain = self._aes_decrypt_cbc(key, iv, payload)
        expected_hash, compressed = plain[0:32], plain[32:]
        if hashlib.sha256(compressed).digest() != expected_hash:
            raise DecryptError
        return compressed

    def get_xml(self, data, password):
        ''' Extract the XML contents from the encrypted and compressed input.
        get_xml(str, str) -> str
        '''
        compressed = self._decrypt_compressed_data(password, data)
        # The padding must be right before it can be taken off
        if not self.validate_compressed_padding(compressed):
            raise DataFormatError
        padlen = ord(compressed[-1])
        try:
            xml = zlib.decompress(compressed[:-padlen])
        except zlib.error:
            raise DecompressError
        return xml
 
class DataReader(object):
    ''' Interface to read Revelation's data files '''
    def __init__(self, filename):
        ''' Loads file data and checks file format and data version for compatibility
        DataReader(str)

        raises IOError          If 'filename' not readable
        raises DataFormatError  If 'filename' not in Revelation format
        raises DataVersionError If filename not in a supported data format version
        '''
        self._impl = None
        self._data = None
        self._filename = filename
        if not os.access(filename, os.R_OK):
            raise IOError('File \'%s\' not accessible' % filename)
        with open(filename, "rb") as f:
            # Encrypted data
            self._data = f.read()
        self._check_header()
    def _check_header(self):
        ''' Checks the file header for compatibility.
        _check_header() -> None
        Selects the reader implementation matching the data version.

        raises DataFormatError  If the file isn't a Revelation data file
        raises DataVersionError If the data format is in an unsupported version
        '''
        header = self._data[0:12]
        # A file too short to hold the whole header can't be a data file
        if len(header) < 12 or header[0:4] != b"rvl\x00":
            raise DataFormatError
        # Sliced, not indexed, to get a one-byte string to compare against
        data_version = header[4:5]
        if data_version == b'\x01':
            self._impl = DataReaderV1()
        elif data_version == b'\x02':
            self._impl = DataReaderV2()
        else:
            raise DataVersionError
    def get_xml(self, password):
        ''' Decrypt and decompress file data
        get_xml(str) -> str
        '''
        return self._impl.get_xml(self._data, password)
 
def main(argv):
    ''' Program entry point.
    main(list of command line arguments, program name excluded) -> None

    Options given on the command line override the configuration file.
    Exits with os.EX_USAGE on incorrect usage and with status 80 when the
    search produced no results.
    '''
    # values to search for
    needles = []
    caseInsensitive = True
    # type filters: ( 'type of entry to filter', negated? )
    searchTypes = []
    dump_xml = False

    printe('Relevation v%s, (c) 2011-2014 Toni Corvera\n' % __version__)

    # ---------- OPTIONS ---------- #
    ( datafile, password, mode ) = load_config()
    try:
        # gnu_getopt requires py >= 2.3
        ops, args = getopt.gnu_getopt(argv, 'f:p:s:0ciaht:xAO',
                                      [ 'file=', 'password=', 'search=', 'stdin',
                                        'case-sensitive', 'case-insensitive', 'ask',
                                        'help', 'version', 'type=', 'xml',
                                        'and', 'or' ])
    except getopt.GetoptError as err:
        # Diagnostics go to stderr, like the usage message that follows
        printe(str(err))
        usage(sys.stderr)
        sys.exit(os.EX_USAGE)
    if args:
        needles = args
    if ( '-h', '' ) in ops or ( '--help', '' ) in ops:
        usage(sys.stdout)
        sys.exit(os.EX_OK)
    if ( '--version', '' ) in ops:
        release = ''
        if not RELEASE:
            release = ' [DEBUG]'
        sys.stdout.write('Relevation version %s%s\n' % ( __version__, release ))
        sys.stdout.write('Python version %s\n' % sys.version)
        if USE_PYCRYPTO:
            import Crypto
            sys.stdout.write('PyCrypto version %s\n' % Crypto.__version__)
        else:
            # AFAIK cryptopy doesn't export version info
            sys.stdout.write('cryptopy\n')
        sys.exit(os.EX_OK)
    known_types = ( 'creditcard', 'cryptokey', 'database', 'door', 'email',
                    'folder', 'ftp', 'generic', 'phone', 'shell', 'website' )
    for opt, arg in ops:
        if opt in ( '-f', '--file' ):
            datafile = arg
        elif opt in ( '-p', '--password' ):
            password = arg
        elif opt in ( '-a', '--ask', '-0', '--stdin' ):
            prompt = ''
            if opt in ( '-a', '--ask' ):
                prompt = 'File password: '
            # see [ref4]
            if sys.stdin.isatty():
                password = getpass.getpass(prompt=prompt, stream=sys.stderr)
            else:
                # Not a terminal, getpass won't work
                password = sys.stdin.readline()
                # Take off the line terminator, if there is one. The input
                #  may end without it, and then every character belongs to
                #  the password
                if password.endswith('\n'):
                    password = password[:-1]
        elif opt in ( '-s', '--search' ):
            needles.append(arg)
        elif opt in ( '-i', '--case-insensitive' ):
            caseInsensitive = True
        elif opt in ( '-c', '--case-sensitive' ):
            caseInsensitive = False
        elif opt in ( '-t', '--type' ):
            iarg = arg.lower()
            neg = False
            # A leading dash negates the filter
            if iarg.startswith('-'):
                iarg = iarg[1:]
                neg = True
            if iarg not in known_types:
                printe('Warning: Type "%s" is not known by relevation.' % arg)
            searchTypes.append( ( iarg, neg ) )
        elif opt in ( '-x', '--xml' ):
            dump_xml = True
        elif opt in ( '-A', '--and' ):
            mode = MODE_AND
        elif opt in ( '-O', '--or' ):
            mode = MODE_OR
        else:
            printe('Unhandled option: %s' % opt)
            assert False, "internal error parsing options"
    if not datafile or not password:
        usage(sys.stderr)
        if not datafile:
            printe('Input password filename is required')
        if not password:
            printe('Password is required')
        sys.exit(os.EX_USAGE)
    # ---------- PASSWORDS FILE DECRYPTION AND DECOMPRESSION ---------- #
    xmldata = DataReader(datafile).get_xml(password)
    # ---------- QUERIES ---------- #
    if dump_xml:
        sys.stdout.write(xmldata)
        sys.stdout.write('\n')
        sys.exit(os.EX_OK)
    # Multiply values to search by type of searches
    numhits = 0

    if not ( needles or searchTypes ): # No search nor filters, print all
        numhits = dump_all_entries(xmldata)
    elif not searchTypes: # Simple case, all searches are text searches
        # Folders are filtered out: ( 'folder', negated )
        if mode == MODE_OR:
            for text in needles:
                numhits += dump_entries(xmldata, text, 'folder', caseInsensitive, True)
        else:
            assert mode == MODE_AND, "Unknown boolean operation mode"
            numhits += dump_entries(xmldata, needles, 'folder', caseInsensitive, True)
    elif needles:
        if mode == MODE_OR: # Do a search filtered for each type
            for text in needles:
                for ( sfilter, negate ) in searchTypes:
                    numhits += dump_entries(xmldata, text, sfilter, caseInsensitive,
                                            negate_filter=negate)
        else: # Do a combined search, filter for each type
            assert mode == MODE_AND, "Unknown boolean operation mode"
            for ( sfilter, negate ) in searchTypes:
                numhits += dump_entries(xmldata, needles, sfilter, caseInsensitive,
                                        negate_filter=negate)
    else: # Do a search only of types
        for ( sfilter, negate ) in searchTypes:
            numhits += dump_entries(xmldata, None, sfilter, negate_filter=negate)
    if numhits == 0:
        # Nothing found
        sys.exit(80)
 
if __name__ == '__main__':
    # Only used to print stack traces in debug (non-RELEASE) builds
    import traceback
    try:
        main(sys.argv[1:])
    except RlvError as e:
        # Errors in the data file: report and exit with the error's own status
        printe('Error: %s' % e.msg)
        if not RELEASE:
            traceback.print_exc()
        sys.exit(e.exitCode)
    except etree.XMLSyntaxError as e:
        printe('XML parsing error')
        if not RELEASE:
            traceback.print_exc()
        sys.exit(os.EX_DATAERR)
    except IOError as e:
        if not RELEASE:
            traceback.print_exc()
        printe(str(e))
        sys.exit(os.EX_IOERR)
 
# vim:set ts=4 et ai fileencoding=utf-8: #
Property changes:
Added: svn:keywords
+Rev Id Date
\ No newline at end of property