mirror of
https://github.com/cosmo-sims/cosmICweb-music.git
synced 2024-09-19 16:53:43 +02:00
405 lines
13 KiB
Python
Executable file
405 lines
13 KiB
Python
Executable file
import os
|
|
import sys
|
|
import tempfile
|
|
import subprocess
|
|
from typing import NamedTuple
|
|
|
|
import click
|
|
import requests
|
|
from collections import namedtuple
|
|
from datetime import datetime
|
|
import logging
|
|
|
|
# Logger: the root logger gets a stream handler with timestamped records
formatter = logging.Formatter(
    fmt="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)

logger = logging.getLogger()
logger.addHandler(handler)
# INFO by default; the --verbose CLI flag lowers this to DEBUG
logger.setLevel("INFO")
|
|
|
|
# Some constants
# Server queried when --url is not given on the command line
DEFAULT_URL = "https://cosmicweb.eu"
# Editor command taken from $EDITOR, falling back to vim when it is unset
EDITOR = os.getenv("EDITOR", "vim")
|
|
|
|
# Types
# One ellipsoid record; fetch_ellipsoids fills the fields from the JSON keys
# "ellips_center", "ellips_matrix", "traceback_radius" and "radius_definition".
Ellipsoid = namedtuple(
    "Ellipsoid",
    ("center", "shape", "traceback_radius", "radius_definition"),
)
|
|
# Everything needed to build the MUSIC configuration files of one download.
DownloadConfig = namedtuple(
    "DownloadConfig",
    (
        "simulation_name",  # name of the simulation the halos belong to
        "halo_names",  # one label per halo
        "halo_urls",  # API URL per halo, in the same order as halo_names
        "traceback_radius",  # selects which ellipsoid of a halo is used
        "api_token",  # sent as "Token ..." authorization header
        "MUSIC",  # mapping with "setup", "cosmology", "random", "poisson"
        "settings",  # extra configuration (not applied to the template yet)
        "accessed_at",  # datetime at which the configuration was fetched
    ),
)
|
|
|
|
|
|
class Args(NamedTuple):
    """Global command-line options shared by all sub-commands.

    Instances are immutable; use ``_replace`` to derive a modified copy.
    """

    # URL of the cosmICweb server
    url: str
    # Directory below which the configuration files are written
    output_path: str
    # Value of the --common-directory flag (a boolean, not a path)
    common_directory: bool
    # Number of attempts to download the ellipsoids of a halo
    attempts: int
|
|
|
|
|
|
|
|
def query_yes_no(question, default="yes"):
    """Ask a yes/no question on stdout and read the answer with input().

    "question" is the text shown to the user, followed by a [y/n] hint.
    "default" is the answer assumed when the user just hits <Enter>.
    It must be "yes" (the default), "no" or None (meaning the user has
    to type an answer).

    Returns True for "yes" and False for "no"; keeps asking until the
    reply is understood.  Raises ValueError for any other "default".
    """
    answers = {"yes": True, "y": True, "ye": True, "no": False, "n": False}

    # The capital letter in the hint marks the default answer
    if default is None:
        prompt = " [y/n] "
    elif default in ("yes", "no"):
        prompt = " [Y/n] " if default == "yes" else " [y/N] "
    else:
        raise ValueError("invalid default answer: '%s'" % default)

    while True:
        sys.stdout.write(question + prompt)
        reply = input().lower()
        if reply == "" and default is not None:
            return answers[default]
        if reply in answers:
            return answers[reply]
        sys.stdout.write("Please respond with 'yes' or 'no' (or 'y' or 'n').\n")
|
|
|
|
|
|
# Routines
def fetch_ellipsoids(url, api_token, attempts):
    """Download all ellipsoids available at ``url``.

    Sends a GET request authenticated with ``api_token`` and repeats it up
    to ``attempts`` times if the request fails.

    Returns a list of Ellipsoid tuples built from the JSON response, or
    None when no attempt succeeded (also when ``attempts`` is not positive).
    """
    for attempt in range(1, attempts + 1):
        try:
            r = requests.get(url, headers={"Authorization": "Token " + api_token})
            # This will raise an error if not successful
            r.raise_for_status()
        except requests.exceptions.RequestException as err:
            # RequestException is the base class of HTTPError, ConnectionError
            # and Timeout, so transient network problems are retried as well
            # instead of aborting the program with a traceback.
            logging.warning(
                "Failed fetching (attempt {}/{}) ...".format(attempt, attempts)
            )
            logging.warning(err)
        else:
            return [
                Ellipsoid(
                    center=entry["ellips_center"],
                    shape=entry["ellips_matrix"],
                    radius_definition=entry["radius_definition"],
                    traceback_radius=entry["traceback_radius"],
                )
                for entry in r.json()
            ]
    logging.error("Unable to download ellipsoids from {}".format(url))
    return None
|
|
|
|
|
|
def fetch_ellipsoid(url, api_token, traceback_radius, attempts=3):
    """Return the ellipsoid at ``url`` that matches ``traceback_radius``.

    Downloads all ellipsoids with fetch_ellipsoids and returns the first
    one whose ``traceback_radius`` equals the requested value.  Returns
    None when the download failed or no ellipsoid matches.
    """
    candidates = fetch_ellipsoids(url, api_token, attempts)
    if candidates is None:
        return None
    for candidate in candidates:
        if candidate.traceback_radius == traceback_radius:
            return candidate
    return None
|
|
|
|
|
|
def fetch_downloadstore(cosmicweb_url, target):
    """Fetch the download configuration stored under ``target``.

    Requests ``<cosmicweb_url>/api/music/store/<target>`` and converts the
    JSON response into a DownloadConfig.  Halos are named ``halo_<id>`` and
    their URLs are built from the simulation's API URL and id.

    Logs a critical message and exits the program with status 1 when the
    request fails.
    """
    try:
        r = requests.get(cosmicweb_url + "/api/music/store/" + target)
        # This will raise an error if not successful
        r.raise_for_status()
    except requests.exceptions.RequestException as e:
        # RequestException also covers connection errors and timeouts, so
        # every failed request ends with the same message instead of a
        # traceback.
        logging.critical("Failed downloading from cosmICweb.")
        logging.critical(e)
        sys.exit(1)
    content = r.json()
    sim = content["simulation"]
    halo_ids = content["halos"]
    halo_urls = [
        "{url}/simulation/{sid}/halo/{hid}".format(
            url=sim["api_url"], sid=sim["api_id"], hid=h
        )
        for h in halo_ids
    ]
    return DownloadConfig(
        simulation_name=sim["name"],
        halo_names=["halo_{}".format(h) for h in halo_ids],
        halo_urls=halo_urls,
        traceback_radius=content["traceback_radius"],
        api_token=sim["api_token"],
        MUSIC=sim["ics"],
        settings=content["configuration"],
        accessed_at=datetime.now(),
    )
|
|
|
|
|
|
def fetch_publication(cosmicweb_url, publication_name, traceback_radius):
    """Fetch the download configuration of a publication.

    Requests ``<cosmicweb_url>/api/publications/<publication_name>`` and
    converts the JSON response into a DownloadConfig.  Halo names and ids
    are read from the "name" and "id" entries of each halo; the given
    ``traceback_radius`` is stored unchanged and ``settings`` is empty.

    Logs a critical message and exits the program with status 1 when the
    request fails.
    """
    try:
        r = requests.get(cosmicweb_url + "/api/publications/" + publication_name)
        # This will raise an error if not successful
        r.raise_for_status()
    except requests.exceptions.RequestException as e:
        # RequestException also covers connection errors and timeouts, so
        # every failed request ends with the same message instead of a
        # traceback.
        logging.critical("Failed downloading from cosmICweb.")
        logging.critical(e)
        sys.exit(1)
    content = r.json()
    sim = content["simulation"]
    halos = content["halos"]
    halo_urls = [
        "{url}/simulation/{sid}/halo/{hid}".format(
            url=sim["api_url"], sid=sim["api_id"], hid=h["id"]
        )
        for h in halos
    ]
    return DownloadConfig(
        simulation_name=sim["name"],
        halo_names=[h["name"] for h in halos],
        halo_urls=halo_urls,
        traceback_radius=traceback_radius,
        api_token=sim["api_token"],
        MUSIC=sim["ics"],
        settings={},
        accessed_at=datetime.now(),
    )
|
|
|
|
|
|
def edit_template(template):
    """Open ``template`` in the user's editor and return the edited text.

    The text is written to a temporary file, the command stored in the
    module-level EDITOR constant is run on that file, and the file content
    is returned once the editor has exited.
    """
    import shlex  # only needed here; kept local to leave the file's imports alone

    # EDITOR may contain arguments (e.g. "code --wait"), so split it the way
    # a shell would; an empty value falls back to vim.
    command = shlex.split(EDITOR) or ["vim"]
    if os.path.basename(command[0]) in ("vi", "vim", "nvim", "gvim"):
        # backupcopy=yes prevents vim from creating copy and rename.
        # The option is vim syntax, so it is not passed to other editors.
        command.append("+set backupcopy=yes")

    with tempfile.NamedTemporaryFile(suffix=".tmp", mode="r+") as tf:
        tf.write(template)
        tf.flush()
        subprocess.call(command + [tf.name])
        # Read back by name: an editor that saves by writing a new file and
        # renaming it leaves the handle above pointing at the old content.
        with open(tf.name) as edited:
            return edited.read()
|
|
|
|
|
|
def music_config_to_template(music_config, configuration):
    """Assemble the MUSIC configuration template as one string.

    ``music_config`` must provide the text of the "setup", "cosmology",
    "random" and "poisson" sections.  The result contains the marker
    ``<ELLIPSOID_TEMPLATE>`` after the [SETUP] section, which
    compose_template later replaces with the region definition.

    ``configuration`` is accepted but not used yet.
    """
    # TODO: apply configuration, add header
    pieces = [
        "[SETUP]\n",
        music_config["setup"],
        "\n\n<ELLIPSOID_TEMPLATE>\n\n[COSMOLOGY]\n",
        music_config["cosmology"],
        "\n\n[RANDOM]\n",
        music_config["random"],
        "\n\n[POISSON]\n",
        music_config["poisson"],
    ]
    return "".join(pieces)
|
|
|
|
|
|
def compose_template(template, ellipsoid):
    """Insert the region definition of ``ellipsoid`` into ``template``.

    Every occurrence of ``<ELLIPSOID_TEMPLATE>`` is replaced by the lines
    describing an ellipsoidal region: the first three rows of
    ``ellipsoid.shape`` and ``ellipsoid.center``, each number formatted
    as ``{:13.6e}`` and separated by ", ".
    """
    # TODO: add ellipsoid header (rtb, halo_name, etc)

    def as_row(values):
        return ", ".join("{:13.6e}".format(value) for value in values)

    rows = [as_row(ellipsoid.shape[index]) for index in range(3)]
    center_row = as_row(ellipsoid.center)

    region_block = (
        "region = ellipsoid\n"
        "region_ellipsoid_matrix[0] = {s0}\n"
        "region_ellipsoid_matrix[1] = {s1}\n"
        "region_ellipsoid_matrix[2] = {s2}\n"
        "region_ellipsoid_center = {c}\n"
    ).format(s0=rows[0], s1=rows[1], s2=rows[2], c=center_row)
    return template.replace("<ELLIPSOID_TEMPLATE>", region_block)
|
|
|
|
|
|
def write_music_file(output_file, music_config):
    """Write ``music_config`` to ``output_file``, creating its directory.

    Missing parent directories are created first.  An existing file at
    ``output_file`` is overwritten.
    """
    dirname = os.path.dirname(output_file)
    # dirname is "" for a bare file name; os.makedirs("") would raise, and
    # the current directory needs no creation anyway.
    if dirname and not os.path.exists(dirname):
        logging.debug("Creating directory {}".format(dirname))
        # exist_ok guards against the directory appearing in the meantime
        os.makedirs(dirname, exist_ok=True)
    with open(output_file, "w") as f:
        f.write(music_config)
|
|
|
|
|
|
def call_music():
    """Placeholder for running MUSIC; currently does nothing."""
    return None
|
|
|
|
|
|
def process_config(config, args: Args):
    """Create one MUSIC configuration file per halo of ``config``.

    Steps:
    1. fetch the ellipsoid matching ``config.traceback_radius`` for every halo,
    2. build the MUSIC template and optionally let the user edit it,
    3. insert each ellipsoid into the template and write the result below
       ``args.output_path``.

    Halos whose ellipsoid could not be fetched are skipped with a warning.
    """
    ellipsoids = []
    for halo_name, url in zip(config.halo_names, config.halo_urls):
        logging.info("Fetching ellipsoids from halo " + halo_name)
        ellipsoids.append(
            fetch_ellipsoid(
                url + "/ellipsoids",
                config.api_token,
                config.traceback_radius,
                args.attempts,
            )
        )

    # Edit template
    logging.info("Creating MUSIC template")
    music_template = music_config_to_template(config.MUSIC, config.settings)

    if query_yes_no(
        "Do you want to edit the MUSIC template before creating the IC files?\n"
        "(changing zstart, levelmin, levelmax, etc.)",
        default="no",
    ):
        logging.debug("Editing MUSIC template")
        music_template = edit_template(music_template)
        logging.debug("Finished editing MUSIC template")

    # Store template to file
    for halo_name, ellipsoid in zip(config.halo_names, ellipsoids):
        if ellipsoid is None:
            logging.warning(
                "Ellipsoid for halo {} not available, skipping".format(halo_name)
            )
            continue
        logging.info("Composing MUSIC configuration file for halo {}".format(halo_name))
        music_config = compose_template(music_template, ellipsoid)
        # NOTE(review): per-halo sub-directories are used when
        # --common-directory IS set; the commented-out argparse code at the
        # end of this file stored the flag as create_subdirs=False, which
        # suggests the opposite meaning. Confirm which is intended.
        if args.common_directory and len(ellipsoids) > 1:
            output_file = os.path.join(args.output_path, str(halo_name), "ics.cfg")
        else:
            # The file name must not start with "/": os.path.join drops all
            # previous components when a later one is absolute, which would
            # place the file in the filesystem root.
            output_file = os.path.join(
                args.output_path, "ics_{}.cfg".format(halo_name)
            )
        logging.info(
            "Storing MUSIC configuration file for halo {} in {}".format(
                halo_name, output_file
            )
        )
        write_music_file(output_file, music_config)

    # TODO: Execute MUSIC?
|
|
|
|
|
|
def downloadstore_mode(args: Args, target: str):
    """Download the configuration stored under ``target`` and process it.

    When the output path is still the default "./", the files go to
    ``./cosmICweb-zooms-<simulation name>`` instead.
    """
    logging.info("Fetching download configuration from the cosmICweb server")
    config = fetch_downloadstore(args.url, target)
    if args.output_path == "./":
        # Args is a NamedTuple and cannot be modified in place (attribute
        # assignment raises AttributeError), so derive an updated copy.
        args = args._replace(
            output_path="./cosmICweb-zooms-{}".format(config.simulation_name)
        )
        logging.debug("Output directory set to " + args.output_path)
    logging.info("Download configuration successfully fetched")
    process_config(config, args)
|
|
|
|
|
|
def publication_mode(args: Args, publication_name: str, traceback_radius: int):
    """Download the configuration of a publication and process it.

    The files are written to a sub-directory of ``args.output_path`` named
    after the publication.
    """
    logging.info(
        "Fetching publication " + publication_name + " from the cosmICweb server"
    )
    config = fetch_publication(args.url, publication_name, traceback_radius)
    # Args is a NamedTuple and cannot be modified in place (attribute
    # assignment raises AttributeError), so derive an updated copy.
    args = args._replace(output_path=os.path.join(args.output_path, publication_name))
    logging.debug("Output directory set to " + args.output_path)
    logging.info("Publication successfully fetched")
    process_config(config, args)
|
|
|
|
|
|
def dir_path(p):
    """Return ``p`` unchanged if it is an existing directory.

    Raises NotADirectoryError otherwise.  Used as the value type of the
    --output-path option.
    """
    if not os.path.isdir(p):
        raise NotADirectoryError(p)
    return p
|
|
|
|
|
|
@click.group()
@click.option(
    "--url",
    default=DEFAULT_URL,
    help="overwrite URL of the cosmICweb server",
)
@click.option(
    "--output-path",
    type=dir_path,
    default="./",
    help="Download target for IC files. If downloading publication, will create a subfolder with the name of the publication",
)
@click.option("--common-directory", is_flag=True)
@click.option(
    "--attempts",
    type=int,
    default=3,
    help="number of attempts to download ellipsoids",
)
@click.option("--verbose", is_flag=True)
@click.pass_context
def cli(ctx, url, output_path, common_directory, attempts, verbose):
    # Entry point of the command group: stores the global options in the
    # click context so the sub-commands can read them from ctx.obj.
    # (Kept as a comment: click would display a docstring as help text.)
    if verbose:
        logger.setLevel("DEBUG")
    ctx.obj = Args(url, output_path, common_directory, attempts)
|
|
|
|
|
|
@cli.command(help="Download ICs using a target UUID generated on cosmICweb")
@click.argument("target")
@click.pass_context
def get(ctx, target):
    # ctx.obj holds the Args tuple created by the cli group callback
    downloadstore_mode(ctx.obj, target)
|
|
|
|
|
|
@cli.command(help="Download published ICs using the publication name")
@click.argument("publication_name")
@click.option(
    "--traceback_radius",
    # Choices are spelled as strings: values typed on the command line
    # arrive as str, and click releases that compare them to the choices
    # without conversion would reject every value against integer choices.
    type=click.Choice(["1", "2", "4", "10"]),
    default="2",
)
@click.pass_context
def publication(ctx, publication_name, traceback_radius):
    # ctx.obj holds the Args tuple created by the cli group callback
    args: Args = ctx.obj
    # publication_mode expects the radius as an integer
    publication_mode(args, publication_name, int(traceback_radius))
|
|
|
|
|
|
# Run the click command group when the file is executed as a script
if __name__ == "__main__":
    cli()
|
|
|
|
# if __name__ == "__main__":
|
|
# parser = argparse.ArgumentParser()
|
|
# parser.add_argument(
|
|
# "--url",
|
|
# dest="cosmicweb_url",
|
|
# default=DEFAULT_URL,
|
|
# help="overwrite URL of the cosmicweb server",
|
|
# )
|
|
# parser.add_argument(
|
|
# "--output-path",
|
|
# type=dir_path,
|
|
# default="./",
|
|
# help="Download target for IC files. If downloading publication, will create a subfolder with the "
|
|
# "name of the publication",
|
|
# )
|
|
# parser.add_argument(
|
|
# "--common-directory", dest="create_subdirs", action="store_false"
|
|
# )
|
|
# parser.add_argument(
|
|
# "--attempts",
|
|
# type=int,
|
|
# default=3,
|
|
# help="number of attempts to download ellipsoids",
|
|
# )
|
|
# parser.add_argument("--verbose", action="store_true")
|
|
|
|
# subparsers = parser.add_subparsers(dest="mode")
|
|
# # Downloading from publications
|
|
# publication_parser = subparsers.add_parser(
|
|
# "publication", help="download publications"
|
|
# )
|
|
# publication_parser.add_argument("publication_name", help="name of the publication")
|
|
# publication_parser.add_argument(
|
|
# "--traceback_radius", type=int, choices=[1, 2, 4, 10], default=2, help=""
|
|
# )
|
|
# # Downloading from download object
|
|
# download_parser = subparsers.add_parser("get")
|
|
# download_parser.add_argument("target")
|
|
|
|
# args = parser.parse_args()
|
|
|
|
# if args.verbose:
|
|
# logger.setLevel("DEBUG")
|
|
|
|
# if args.mode == "get":
|
|
# downloadstore_mode(args)
|
|
# elif args.mode == "publication":
|
|
# publication_mode(args)
|
|
# else:
|
|
# raise NotImplementedError("unknown subparser")
|