Source code for httk.core.crypto

#
#    The high-throughput toolkit (httk)
#    Copyright (C) 2012-2015 Rickard Armiento
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
Provides a few central and very helpful functions for cryptographic hashes, etc.
"""
import hashlib, os.path, base64, re, sys, codecs
from httk.core.basic import print_, unicode_type
from httk.core import ed25519

if sys.version_info[0] == 3:
    import configparser
else:
    import ConfigParser as configparser

try:
    import bz2
except Exception:
    pass

from httk.core.ioadapters import IoAdapterFileReader, IoAdapterFileWriter
from httk.core.basic import nested_split


[docs]def hexhash_str(data, prepend=None): s = hashlib.sha1() s.update("httk\0".encode("utf-8")) if prepend is not None: s.update(prepend.encode("utf-8")) s.update("\0".encode("utf-8")) s.update(data.encode("utf-8")) s.update("\0%u\0".encode("utf-8") % len(data)) return s.hexdigest()
[docs]def tuple_to_hexhash(t): return hexhash_str(tuple_to_str(t))
[docs]def hexhash_ioa(ioa, prepend=None): def chunks(f, size=8192): while True: s = f.read(size) if not s: break yield s ioa = IoAdapterFileReader.use(ioa) f = ioa.file s = hashlib.sha1() size = 0 s.update("httk\0") if prepend is not None: s.update("\0") s.update(prepend) for chunk in chunks(f): size += len(chunk) s.update(chunk) ioa.close() s.update("\0%u\0" % size) return s.hexdigest()
# def generate_manifest_for_files(codename,codever, basedir, filenames,fakenames=None,skip_filenames=False,write=False,excludes=None): # hash_manifest=codename+" "+codever+"\n\n" # # We should always exclude the ht.manifest file form the manifest # if excludes == None: # excludes = ['ht.manifest'] # else: # excludes.append('ht.manifest') # # for i in range(len(filenames)): # f = filenames[i] # if skip_filenames: # try: # hash_manifest += hexhash_ioa(f)+"\n" # except IOError: # hash_manifest += "0\n" # else: # if fakenames != None and fakenames[i] != None: # relpath = fakenames[i] # else: # common_prefix = os.path.commonprefix([basedir, f]) # if common_prefix == '': # relpath = f # else: # relpath = os.path.relpath(f, common_prefix) # if relpath in excludes: # continue # try: # hash_manifest += relpath+" "+hexhash_ioa(f)+"\n" # except IOError: # hash_manifest += relpath+" "+"0\n" # # hash_hex = hexhash_str(hash_manifest) # # if write != None: # ioa = IoAdapterFileWriter.use(os.path.join(basedir,"ht.manifest")) # ioa.file.write(hash_manifest) # ioa.file.close() # # return (hash_manifest,hash_hex) # def generate_manifest_for_dir(codename,codever, basedir, excludes=None, fakenames=None, skip_filenames=False,write=False): # filelist = [] # # for root, dirs, files in os.walk(basedir): # filelist += [os.path.join(root,x) for x in files] # # Make sure we always generate the same manifest # filelist = sorted(filelist) # # return generate_manifest_for_files(codename, codever, basedir, filelist, fakenames=fakenames, skip_filenames=skip_filenames,write=write,excludes=excludes) # # # #raise Exception("generate_manifest_for_dir: Not implemented, sorry") # # def check_or_generate_manifest_for_dir(codename,codever, basedir, excludes=None, fakenames=None, skip_filenames=False): # manifestpath = os.path.join(basedir,"ht.manifest") # if excludes == None: # excludes = ['ht.manifest'] # else: # excludes.append('ht.manifest') # # if os.path.exists(manifestpath): # # Just verify that manifest is correct # (manifest, hexhash) = generate_manifest_for_dir(codename,codever, basedir, excludes=None, fakenames=None, skip_filenames=False,write=False) # test = IoAdapterFileReader.use(manifestpath).file.read() # hexhash1 = hexhash_str(manifest) # hexhash2 = hexhash_str(test) # hexhash3 = hexhash_ioa(manifestpath) # #print(hexhash,hexhash1, hexhash2,hexhash3) # #print("-----") # #print(test) # #print("-----") # #print(manifest) # #print("-----") # if hexhash != hexhash2: # raise Exception("Manifest mismatch, the ht.manifest in the directory does not match the actual files! Hashes are:"+str(hexhash)+" vs. "+str(hexhash2)) # else: # # Generate a new manifest # (manifest, hexhash) = generate_manifest_for_dir(codename,codever, basedir, excludes=None, fakenames=None, skip_filenames=False,write=True) # # return (manifest, hexhash)
[docs]def tuple_to_str(t): strlist = [] for i in t: if isinstance(i, tuple): tuplestr = "\n" tuplestr += tuple_to_str(i) #tuplestr += "\n" strlist.append(tuplestr) else: strlist.append(unicode_type(i)) return " ".join(strlist)
[docs]def read_keys(keydir): f = open(os.path.join(keydir, 'key1.priv'), "r") b64sk = f.read() f.close() sk = base64.b64decode(b64sk) pk = ed25519.publickey(sk) return (sk, pk)
[docs]def sha256file(filename): def chunks(f, size=8192): while True: s = f.read(size) if not s: break yield s f = open(filename, 'rb') s = hashlib.sha256() for chunk in chunks(f): s.update(chunk) f.close() return s.hexdigest()
[docs]def manifest_dir(basedir, manifestfile, excludespath, keydir, sk, pk, debug=False, force=False): message = "" excludes = [] if os.path.exists(os.path.join(excludespath, "excludes")): f = open(os.path.join(excludespath, "excludes")) excludes = [x.strip() for x in f.readlines()] f.close() try: cp = configparser.ConfigParser() cp.read(os.path.join(excludespath, "config")) excludestr = cp.get('main', 'excludes').strip() excludes += nested_split(excludestr, '[', ']') except (configparser.NoOptionError, configparser.NoSectionError): pass if len(excludes) == 0: excludes = ['.*~'] excludes += ['ht\.manifest\..*', 'ht\.project/keys', 'ht\.project/manifest', 'ht\.project/computers', 'ht\.project/excludes', 'ht\.project/tags', 'ht\.project/references', 'ht\.tmp\..*'] f = open(os.path.join(keydir, 'key1.pub'), "r") pubkey = f.readlines()[0].strip() f.close() manifestfile.write(pubkey+"\n") message += pubkey+"\n" for root, unsorteddirs, unsortedfiles in os.walk(keydir, topdown=True, followlinks=False): files = sorted(unsortedfiles) for filename in files: if filename != 'key1.pub' and re.match(".*\.pub", filename): f = open(os.path.join(root, filename), "r") filedata = f.readlines() f.close() pubkey = filedata[0].strip() comment = filedata[1].strip() manifestfile.write(pubkey+" "+str(comment)+"\n") message += pubkey+" "+str(comment)+"\n" manifestfile.write("\n") message += "\n" for root, unsorteddirs, unsortedfiles in os.walk(basedir, topdown=True, followlinks=False): if root == basedir: root = "" else: root = os.path.relpath(root, basedir) files = sorted(unsortedfiles) dirs = sorted(unsorteddirs) filenames = [os.path.join(root, x) for x in files] for i, f in enumerate(files): filename = filenames[i] for exclude in excludes: if re.match(exclude, f) is not None or re.match(exclude, filename) is not None: break else: truefilename = os.path.join(basedir, filename) hh = sha256file(truefilename) manifestfile.write(hh+" "+filename+"\n") message += hh+" "+filename+"\n" if debug: print("Adding:", hh+" "+filename) keepdirs = [] for d in dirs: fulldir = os.path.join(root, d) for exclude in excludes: if re.match(exclude, d) is not None or re.match(exclude, fulldir) is not None: break else: if d.startswith("ht.task.") or os.path.exists(os.path.join(fulldir, 'ht.config')): if force or (not os.path.exists(os.path.join(fulldir, 'ht.manifest.bz2'))): submanifestfile = bz2.BZ2File(os.path.join(basedir, fulldir, 'ht.tmp.manifest.bz2'), 'w') print("Generating manifest:", os.path.join(basedir, fulldir, 'ht.manifest.bz2')) manifest_dir(os.path.join(basedir, fulldir), submanifestfile, os.path.join(basedir, fulldir, 'ht.config'), keydir, sk, pk) submanifestfile.close() os.rename(os.path.join(basedir, fulldir, 'ht.tmp.manifest.bz2'), os.path.join(basedir, fulldir, 'ht.manifest.bz2')) hh = sha256file(os.path.join(basedir, fulldir, 'ht.manifest.bz2')) manifestfile.write(hh+" "+fulldir+"/\n") message += hh+" "+fulldir+"/\n" if debug: print("Adding:", hh+" "+fulldir+"/ ") else: keepdirs += [d] unsorteddirs[:] = keepdirs #print("===="+message+"====") sig = ed25519.signature(message, sk, pk) b64sig = base64.b64encode(sig) manifestfile.write("\n") manifestfile.write(b64sig) manifestfile.write("\n")
#def generate_rsa_keys(path,extraargs=[]): # args = ['ssh-keygen','-q','-N','','-f']+extraargs+[path] # p = Popen(args, stdout=PIPE) # stdout = p.communicate()[0] # if p.returncode != 0: # raise Exception("httk.crypto.crypto: failed to generate keys") # return stdout.strip() # def generate_keys_ssl(key,pubkey): # args = ['openssl','genrsa','-out',key,'-passout','pass:','2048'] # p = Popen(args, stdout=PIPE) # stdout = p.communicate()[0] # if p.returncode != 0: # raise Exception("httk.crypto.crypto: failed to generate keys") # args = ['openssl','rsa','-pubout','-out',pubkey,'-in',key] # p = Popen(args, stdout=PIPE) # stdout += p.communicate()[0] # if p.returncode != 0: # raise Exception("httk.crypto.crypto: failed to generate keys") # return stdout # # def sign_message_ssl(msg,key): # args = ['openssl','genrsa','-sign','-key',key] # p = Popen(args, stdout=PIPE, stdin=msg) # stdout = p.communicate()[0] # if p.returncode != 0: # raise Exception("httk.crypto.crypto: failed to sign message") # return stdout # # def encrypt_message_ssl(msg,pubkey): # args = ['openssl','rsautl','-encrypt','-pubin','-inkey',pubkey] # p = Popen(args, stdout=PIPE, stdin=msg) # stdout = p.communicate()[0] # if p.returncode != 0: # raise Exception("httk.crypto.crypto: failed to encrypt message") # return stdout # # def decrypt_message_sll(msg,key): # args = ['openssl','rsautl','-decrypt','-inkey',key] # p = Popen(args, stdout=PIPE, stdin=msg) # stdout = p.communicate()[0] # if p.returncode != 0: # raise Exception("httk.crypto.crypto: failed to decrypt message") # return stdout
[docs]def generate_keys(public_key_path, secret_key_path): """ Generates a public and a private key pair and stores them in respective files """ try: secret_key = os.urandom(64) #sr = random.SystemRandom() #secret_key = sr.getrandbits(512) except NotImplementedError: raise Exception("crypto.generate_keys: Running on a system without a safe random number generator,cannot create safe cryptographic keys on this system.") public_key = ed25519.publickey(secret_key) b64secret_key = base64.b64encode(secret_key) b64public_key = base64.b64encode(public_key) pubfile = IoAdapterFileWriter.use(public_key_path) pubfile.file.write(codecs.decode(b64public_key,'utf-8')) pubfile.close() secfile = IoAdapterFileWriter.use(secret_key_path) secfile.file.write(codecs.decode(b64secret_key,'utf-8')) secfile.close()
[docs]def get_crypto_signature(message, secret_key=None, keyfile=None): if keyfile: ioa = IoAdapterFileReader.use(keyfile) secret_key = ioa.file.read() ioa.close() secret_key = base64.b64decode(secret_key) public_key = ed25519.publickey(secret_key) signature = ed25519.signature(message, secret_key, public_key) b64signature = base64.b64encode(signature) return b64signature
[docs]def verify_crytpo_signature(signature, message, public_key=None, keyfile=None): if keyfile: ioa = IoAdapterFileReader.use(keyfile) public_key = ioa.file.read() ioa.close() binsignature = base64.b64decode(signature) binpublic_key = base64.b64decode(public_key) return ed25519.checkvalid(binsignature, message, binpublic_key)
[docs]def verify_crytpo_signature_old(signature, message, public_key_path): ioa = IoAdapterFileReader.use(public_key_path) b64public_key = ioa.file.read() ioa.close() binsignature = base64.b64decode(signature) public_key = base64.b64decode(b64public_key) return ed25519.checkvalid(binsignature, message, public_key)
[docs]def main(): print_("Generating keys, this may take some time.") generate_keys("/tmp/pub.key", "/tmp/priv.key") message = "This is my message." print_("Signing message") my_signature = get_crypto_signature(message, keyfile="/tmp/priv.key") print_("Signature is") print_(my_signature) print_("Check if signature is valid") result = verify_crytpo_signature(my_signature, message, keyfile="/tmp/pub.key") print_("True message validates", result) assert(result == True) forged_message = "This is not my message." result = verify_crytpo_signature(my_signature, forged_message, keyfile="/tmp/pub.key") print_("Forged message validates", result) assert(result == False) print_("Finished")
if __name__ == "__main__": main()