#!/usr/bin/env python import sys import time import os import math import optparse import boto import boto.ec2 import subprocess packages="" script_header="""#!/bin/bash append() { file="$1" shift $@ | tee -a "$file" } suappend() { file="$1" shift $@ | sudo tee -a "$file" } try() { log="$HOME/$1" shift msg="$1" shift (echo "------ `date`: $@") >> $log ($@ 2>&1) | tee -a "$log" | tail -1 > tmp-log ret="${PIPESTATUS[0]}" if [ 0 != "$ret" ] then echo 1>&2 "Failed during $msg: `cat tmp-log`" rm tmp-log return 1 fi rm tmp-log return 0 } try1() { if ! try "$@" then echo 1>&2 "Retrying..." sleep 10 if ! try "$@" then echo 1>&2 "Retrying again..." sleep 20 if ! try "$@" then echo 1>&2 "Giving up." exit 1 fi fi fi return 0 } """ exec_args=["-cfgRoundtripTimeout=20000000","+RTS","-K20000000"] local_files = ["KMeans.hs","MakeData.hs","KMeansCommon.hs"] node_config_data = {"apts":"ghc cabal-install git-core libghc6-binary-dev libghc6-mtl-dev libghc6-network-dev libghc6-stm-dev"} node_config_script = script_header+""" cd try cfg.log "update apt repository" sudo apt-get -q -y update try cfg.log "install apt packages" sudo apt-get -q -y install %(apts)s try1 cfg.log "cabalupdate" cabal update try1 cfg.log "cabalify" cabal install pureMD5 utf8-string directory crypto-api tagged data-default cereal semigroups try cfg.log "retrieve git repository" git clone git://github.com/jepst/CloudHaskell.git cd CloudHaskell try cfg.log "configure local repository" cabal configure try1 cfg.log "build local repository" cabal build try1 cfg.log "install local repoistory" cabal install cd try cfg.log "compile executable" ghc -O3 --make KMeans.hs """ % node_config_data def writefile(fname,content): with open(fname,"w") as f: f.write(content) def err(s): print s sys.exit(1) def waitforallup(instances): while True: pending = False running = False for instance in instances: if instance.state == u'pending': pending = True instance.update() elif instance.state == u'running': running = True else: print("Encountered unexpected instance state: "+instance.state) # and terminate remaining instances? if pending: time.sleep(3) else: break ssh_opts=["-o","BatchMode=yes","-o","ServerAliveInterval=240","-o","StrictHostKeyChecking=no"] def allinstances_xfer(keyfile,username,instances,thefile): processes = [] for i in instances: cmd=["scp","-r","-q","-i",keyfile]+ssh_opts + thefile + [username+"@"+i.public_dns_name+":"] while True: subproc=subprocess.Popen(cmd,stderr=subprocess.STDOUT) result = subproc.wait() if result == 0: break else: print "Problem transfering",thefile,"to",i.public_dns_name," and retrying" time.sleep(60) def allinstances_recv(keyfile,username,instances,thefile): processes = [] for i in instances: cmd=["scp","-r","-q","-i",keyfile]+ssh_opts+ [ username+"@"+i.public_dns_name+":"+thefile,thefile] subproc=subprocess.Popen(cmd,stderr=subprocess.STDOUT) processes.append(subproc) for (i,p) in zip(instances,processes): result = p.wait() if result != 0: err("Error with remote receiving on instance "+i.id+" at "+i.public_dns_name) def allinstances_runscript(keyfile,username,instances,scriptmaker): processes = [] counter=0 for i in instances: text = scriptmaker(i) fname="tmp-"+str(counter) # use random nm here writefile(fname,text) cmd1=["scp","-r","-q","-i",keyfile]+ssh_opts+[ fname ,username+"@"+i.public_dns_name+":"] cmd2=["ssh","-q","-i",keyfile]+ssh_opts+["-l",username,"-o","StrictHostKeyChecking=no", i.public_dns_name,"bash",fname] cmd = ["bash","-c",(" ".join(cmd1))+" ; "+(" ".join(cmd2))] subproc=subprocess.Popen(cmd,stderr=subprocess.STDOUT) processes.append(subproc) counter=counter+1 time.sleep(5) for (i,p) in zip(instances,processes): result = p.wait() if result != 0: err("Error during remote script execution on instance "+i.id+" at "+i.public_dns_name) for i in range(counter): os.remove("tmp-"+str(i)) def allinstances_exec(keyfile,username,instances,thecmd): processes = [] for i in instances: cmd=["ssh","-i",keyfile,"-l",username]+ssh_opts+[ i.public_dns_name]+thecmd # print " ".join(cmd) subproc=subprocess.Popen(cmd,stderr=subprocess.STDOUT) processes.append(subproc) for (i,p) in zip(instances,processes): result = p.wait() if result != 0: err("Error "+str(result)+" with remote process on instance "+i.id+" at "+i.public_dns_name) def allinstances_spawn(keyfile,username,instances,thecmd): processes = [] for i in instances: cmd=["ssh","-n","-q","-i",keyfile,"-l",username]+ssh_opts+[ i.public_dns_name]+thecmd subproc=subprocess.Popen(cmd,stderr=subprocess.STDOUT) processes.append(subproc) time.sleep(5) def hostscript(instance): hostsfile = "try cfg.log 'setup hostname' suappend /etc/hosts echo 127.0.1.1 `hostname`" return hostsfile def main(): parser = optparse.OptionParser() parser.add_option("-q", "--datamultiple",help="Data multiplier (default 1)",dest="data_multiple",default=1,type="int") parser.add_option("-r", "--region", help="Region (default eu-west-1)", dest="region", default="eu-west-1") parser.add_option("-i", "--image", help="Virtual machine image (default ami-311f2b45)",dest="ami",default="ami-311f2b45") # other useful ubuntu instances at # and here http://uec-images.ubuntu.com/releases/10.04/release/ # and http://aws.amazon.com/ec2/instance-types/ parser.add_option("-m", "--nmapers", help="Number of mapper nodes (default 10)",dest="mapper_nodes",type="int",default=10) parser.add_option("-n", "--nreducers", help="Number of reducer nodes (default 3)",dest="reducer_nodes",type="int",default=3) parser.add_option("-p", "--nodesperhost",help="Number of nodes to start on each host (default 4)",dest="nodes_per_host",type="int",default=4) parser.add_option("-k", "--keypair",help="Keypair for starting EC2 instances (default aws) -- will look for corresponding file in ~/.ssh/",dest="keypair",default="aws") parser.add_option("-s", "--securitygroup",help="Name of security group for launching instances (default default)",dest="securitygroup",default="default") parser.add_option("-t","--instancetype",help="Name of instance type for launching instances (default m1.small)",dest="instancetype",default="m1.small") parser.add_option("-l","--login-user",help="Username for connecting to instances (default ubuntu)",dest="username",default="ubuntu") (options, args) = parser.parse_args() if os.sep in options.keypair: s = os.path.split(options.keypair)[-1] keypair = s.split(".pem")[0] keypairfile = os.path.expanduser(options.keypair) if not os.path.exists(keypairfile): err("Specified keypair file "+keypairfile+" doesn't exist") else: keypairbase = options.keypair attempts = [os.path.join("~",".ssh,",keypairbase), os.path.join("~",".ssh,",keypairbase+".pem"), os.path.join("~",".ec2",keypairbase), os.path.join("~",".ec2",keypairbase+".pem") ] for attempt in attempts: path = os.path.expanduser(attempt) if os.path.exists(path): keypair = options.keypair keypairfile = path break else: err("Could not find a keyfile matching \""+keypairbase+"\" in ~/.ssh or ~/.ec2") try: for r in boto.ec2.regions(): if r.name == options.region: region = r break else: err("Region %s not found." % options.region) except AttributeError: err("Can't connect to AWS. Set AWS_SECRET_ACCESS_KEY and AWS_ACCESS_KEY_ID environment variables to the values visible under your Security Credentials screen.") ec2 = boto.connect_ec2(region=region) num_nodes = 1 + options.mapper_nodes + options.reducer_nodes numhosts = int (math.ceil (float(num_nodes) / options.nodes_per_host)) print "Starting "+str(numhosts)+" instances of image "+options.ami+"." image = ec2.get_all_images(image_ids=[options.ami])[0] reservation = image.run(numhosts,numhosts,key_name=keypair,instance_type=options.instancetype,security_groups=[options.securitygroup]) waitforallup(reservation.instances) time.sleep(30) print "Configuring instances." allinstances_xfer(keypairfile,options.username,reservation.instances,local_files) allinstances_runscript(keypairfile,options.username,reservation.instances,lambda a: node_config_script + hostscript(a) + "\n") config_file="cfgKnownHosts "+(" ".join([i.private_dns_name for i in reservation.instances])) writefile("config",config_file) allinstances_xfer(keypairfile,options.username,reservation.instances,["config"]) print "Transfering data." mappers=[] reducers=[] master=[] insts = reservation.instances * options.nodes_per_host master.append(insts.pop(0)) for i in range(options.mapper_nodes): mappers.append(insts.pop(0)) for i in range(options.reducer_nodes): reducers.append(insts.pop(0)) allinstances_xfer(keypairfile,options.username,reservation.instances,["kmeans-points","kmeans-clusters"]) print "Starting worker nodes." allinstances_spawn(keypairfile,options.username,mappers,["./KMeans","-cfgRole=MAPPER"]+exec_args) allinstances_spawn(keypairfile,options.username,reducers,["./KMeans","-cfgRole=REDUCER"]+exec_args) time.sleep(30) print "Starting master node on "+master[0].public_dns_name+"." starttime=time.time() allinstances_exec(keypairfile,options.username,master,["./KMeans","-cfgRole=MASTER"]+exec_args) endtime=time.time() # print "Completed computation in "+str(endtime-starttime)+" seconds." # print "Retrieving results." # allinstances_recv(keypairfile,options.username,master,"kmeans-converged") print "Shutting down instances." for i in reservation.instances: i.stop() def safe_main(): try: main() except boto.exception.EC2ResponseError as a: err("Error from EC2 service: "+a.error_message+": "+a.reason) if __name__ == "__main__": safe_main()