-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathosdataproc.py
executable file
·109 lines (88 loc) · 4.59 KB
/
osdataproc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import argparse
import subprocess
import os
import sys
import volumes
import yaml
def create(args):
    """Resolve the optional NFS volume, then apply the cluster configuration.

    Args:
        args: Merged option dict. ``args['nfs_volume']`` is replaced in place
            by the value returned from the ``volumes`` helpers when a volume
            was requested.

    Raises:
        SystemExit: A volume size was given (i.e. a new volume should be
            created) but ``volumes.get_volume_id`` did not return ``None``
            for the requested name.
    """
    requested_volume = args['nfs_volume']
    if requested_volume is not None:
        requested_size = args['volume_size']
        if requested_size is None:
            # No size given: look the existing volume up.
            args['nfs_volume'] = volumes.get_volume_id(requested_volume, to_create=False)
        elif volumes.get_volume_id(requested_volume, to_create=True) is None:
            # Size given and the lookup came back empty: create the volume.
            args['nfs_volume'] = volumes.create_volume(requested_volume, requested_size)
        else:
            sys.exit("Please use a unique volume name.")
    act(args, 'apply')
def destroy(args):
    """Destroy a cluster and, if requested, the volumes attached to its master.

    Args:
        args: Merged option dict. Reads ``args['cluster-name']`` and
            ``args['destroy-volumes']`` (the latter is the ``dest`` of the
            ``-d/--destroy-volumes`` flag).

    Raises:
        SystemExit: ``OS_USERNAME`` is not set in the environment.
    """
    # Guard before indexing os.environ: act() performs the same check, but
    # only after the lookup below, which would otherwise fail with a bare
    # KeyError traceback instead of the intended message.
    username = os.environ.get('OS_USERNAME')
    if username is None:
        sys.exit("openrc.sh must be sourced")

    # The attached volumes are collected *before* the teardown runs
    # (presumably because the attachment can no longer be queried once the
    # master instance is gone -- confirm against volumes.get_attached_volumes).
    master_name = username + "-" + args['cluster-name'] + "-master"
    volumes_to_destroy = volumes.get_attached_volumes(master_name)

    act(args, 'destroy')

    if args['destroy-volumes'] and volumes_to_destroy is not None:
        # destroy volumes that were attached to the instance
        volumes.destroy_volumes(volumes_to_destroy)
def update(args):
    """Run the ``update`` action of the run script with the given options.

    NOTE(review): cli() in this file registers no sub-command that dispatches
    to this function.
    """
    command = 'update'
    act(args, command)
def reboot(args):
    """Run the ``reboot`` action of the run script with the given options."""
    command = 'reboot'
    act(args, command)
def act(args, command):
    """Invoke the ``run`` script next to this file: ``init`` first, then *command*.

    Args:
        args: Merged option dict, forwarded to ``get_args`` to build the
            positional argument list.
        command: Action name passed to the run script as its first argument
            (e.g. ``'apply'``, ``'destroy'``).

    Raises:
        SystemExit: ``OS_USERNAME`` is not set, or either invocation of the
            run script finished with a non-zero return code (the code is
            propagated as this process's exit status).
    """
    if "OS_USERNAME" not in os.environ:
        sys.exit("openrc.sh must be sourced")

    osdataproc_home = os.path.dirname(os.path.realpath(__file__))
    run_args = get_args(args, command)

    # Do not carry on with the real command when initialisation failed.
    init_result = subprocess.run([f'{osdataproc_home}/run', 'init'])
    if init_result.returncode != 0:
        sys.exit(init_result.returncode)

    # Surface failures to the caller/shell instead of silently returning, so
    # follow-up steps (such as volume deletion in destroy) are not executed
    # after a failed run.
    command_result = subprocess.run(run_args)
    if command_result.returncode != 0:
        sys.exit(command_result.returncode)
def get_args(args, command):
    """Build the argument vector for the ``run`` script.

    Args:
        args: Option dict; every key listed below must be present.
        command: Action name placed directly after the script path.

    Returns:
        A list of strings: the script path, *command*, then each option
        value converted with ``str()`` (so ``None`` becomes ``"None"``).

    Raises:
        KeyError: One of the expected keys is missing from *args*.
    """
    # NOTE: the downstream "run" script uses positional arguments, so the
    # order of this tuple is significant and must match what it expects.
    positional_keys = (
        "cluster-name",
        "num_workers",
        "public_key",
        "flavour",
        "network_name",
        "lustre_network",
        "image_name",
        "nfs_volume",
        "volume_size",
        "device_name",
        "floating_ip",
    )
    script_dir = os.path.dirname(os.path.realpath(__file__))
    return [f'{script_dir}/run', command, *(str(args[key]) for key in positional_keys)]
def cli():
    """Parse the command line, merge it over the vars.yml defaults and dispatch.

    The ``osdataproc`` mapping loaded from ``vars.yml`` (located next to this
    file) supplies default option values; every command-line value that is
    not ``None`` overrides the corresponding default. The merged dict is then
    passed to the handler selected by the sub-command.

    Raises:
        SystemExit: Raised by argparse on invalid usage, including when no
            sub-command is given.
    """
    parser = argparse.ArgumentParser(description='CLI tool to manage a Spark and Hadoop cluster')
    subparsers = parser.add_subparsers()

    # Note on key names: argparse keeps a positional's name verbatim as its
    # dest ('cluster-name'), while option dests have hyphens converted to
    # underscores ('num_workers', ...).
    parser_create = subparsers.add_parser('create', help='create a Spark cluster')
    parser_create.add_argument('cluster-name', help='name of the cluster to create')
    parser_create.add_argument('-n', '--num-workers', type=int, help='number of worker nodes')
    parser_create.add_argument('-p', '--public-key', help='path to public key file')
    parser_create.add_argument('-f', '--flavour', '--flavor', help='OpenStack flavour to use')
    parser_create.add_argument('--network-name', help='OpenStack network to use')
    parser_create.add_argument('--lustre-network', help='OpenStack Secure Lustre network to use')
    parser_create.add_argument('-i', '--image-name', help='OpenStack image to use - Ubuntu only')
    parser_create.add_argument('-v', '--nfs-volume', help='Name or ID of an nfs volume to attach to the cluster')
    # A volume is either created with a given size or attached at a device name.
    volume_create = parser_create.add_mutually_exclusive_group()
    volume_create.add_argument('-s', '--volume-size', help='Size of OpenStack volume to create')
    volume_create.add_argument('-d', '--device-name', help='Device mountpoint name of volume - see NFS.md')
    parser_create.add_argument('--floating-ip', help='OpenStack floating IP to associate to the master node - will automatically create one if not specified')
    parser_create.set_defaults(func=create)

    parser_destroy = subparsers.add_parser('destroy', help='destroy a Spark cluster')
    parser_destroy.add_argument('cluster-name', help='name of the cluster to destroy')
    parser_destroy.add_argument('-d', '--destroy-volumes', dest='destroy-volumes', action='store_true', help='also destroy volumes attached to cluster')
    parser_destroy.set_defaults(func=destroy)

    parser_reboot = subparsers.add_parser('reboot', help='reboot all worker nodes of a cluster, e.g. to pick up mount point changes')
    parser_reboot.add_argument('cluster-name', help='name of the cluster to reboot')
    parser_reboot.set_defaults(func=reboot)

    args = parser.parse_args()

    # Sub-commands are optional by default, so a bare invocation produces a
    # namespace without ``func``; report a usage error instead of failing
    # later with an AttributeError.
    if not hasattr(args, 'func'):
        parser.error('a subcommand is required')

    osdataproc_home = os.path.dirname(os.path.realpath(__file__))
    with open(f'{osdataproc_home}/vars.yml', 'r') as stream:
        defaults = yaml.safe_load(stream)

    # Command-line values win over file defaults; unset options (None) leave
    # the default in place.
    defaults['osdataproc'].update({k: v for k, v in vars(args).items() if v is not None})
    args.func(defaults['osdataproc'])
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    cli()