# Web-mirror page header (not part of the module):
#   mirror of https://github.com/cosmo-sims/cosmICweb-music.git,
#   synced 2024-09-19 16:53:43 +02:00
#   cosmICweb-music/cosmicweb_music/cosmICweb.py
#   (404 lines, 13 KiB, Python, executable file)

import os
import sys
import tempfile
import subprocess
from typing import NamedTuple, Any, List, Dict
import click
import requests
from datetime import datetime
import logging
# Logging: configure the root logger with a timestamped stream handler so the
# module-level ``logging.*`` calls used throughout this file are formatted.
logger = logging.getLogger()
formatter = logging.Formatter(
    fmt="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel("INFO")

# Module-wide constants
DEFAULT_URL = "https://cosmicweb.eu"
# Editor launched for interactive template editing; vim is the fallback when
# the EDITOR environment variable is not set.
EDITOR = os.getenv("EDITOR", "vim")
# Types
class Ellipsoid(NamedTuple):
    """Ellipsoidal region description for a single traceback radius.

    The annotations reflect how the fields are consumed in this module
    (the previous ``int`` annotations did not match that usage).
    """

    # Iterated as a flat sequence of numbers when the region center is written.
    center: List[float]
    # Rows 0, 1 and 2 are each iterated as a sequence of numbers.
    shape: List[List[float]]
    # Compared for equality against the requested traceback radius.
    traceback_radius: float
    # Stored as delivered by the server and not interpreted in this module.
    # NOTE(review): concrete type is not visible here - confirm against the API.
    radius_definition: Any
class DownloadConfig(NamedTuple):
    """Everything needed to build MUSIC configuration files for a set of halos."""

    # Name of the simulation the halos belong to.
    simulation_name: str
    # Labels used in log messages and output file names; parallel to halo_urls.
    halo_names: List[Any]
    # Per-halo API base URLs; "/ellipsoids" is appended when fetching.
    halo_urls: List[str]
    # Traceback radius used to pick one ellipsoid per halo.
    traceback_radius: float
    # Sent as "Token <api_token>" in the Authorization header.
    api_token: str
    # MUSIC settings keyed by section ("setup", "cosmology", "random",
    # "poisson"); each value is the text body of that section.
    MUSIC: Dict[str, str]
    # Extra configuration from the download store (currently unused downstream).
    settings: Dict[Any, Any]
    # Local time at which the configuration was fetched.
    accessed_at: datetime
class Args(NamedTuple):
    """Global command-line options shared by all sub-commands."""

    # Base URL of the cosmICweb server.
    url: str
    # Directory below which the configuration files are written.
    output_path: str
    # Value of the ``--common-directory`` flag (a boolean switch, not a path).
    common_directory: bool
    # Number of attempts made when downloading ellipsoids.
    attempts: int
def query_yes_no(question, default="yes"):
    """Prompt on stdout for a yes/no decision and return it as a bool.

    "question" is the text shown to the user.
    "default" is the answer assumed when the user only presses <Enter>.
        It must be "yes" (the default), "no" or None (in which case an
        explicit answer is required).

    Returns True for "yes" and False for "no"; keeps asking until a valid
    answer is read. Raises ValueError for any other "default".
    """
    answers = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
    prompts = {None: " [y/n] ", "yes": " [Y/n] ", "no": " [y/N] "}
    try:
        prompt = prompts[default]
    except (KeyError, TypeError):
        # Unknown (or unhashable) default value
        raise ValueError("invalid default answer: '%s'" % default)

    while True:
        sys.stdout.write(question + prompt)
        reply = input().lower()
        if reply == "" and default is not None:
            return answers[default]
        if reply in answers:
            return answers[reply]
        sys.stdout.write("Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n")
# Routines
def fetch_ellipsoids(url, api_token, attempts):
    """Download every ellipsoid offered at ``url``.

    The GET request is authenticated with ``api_token`` and tried up to
    ``attempts`` times. Any ``requests`` failure (HTTP error status,
    connection problem, timeout, ...) is logged as a warning and triggers
    the next attempt.

    Returns a list of ``Ellipsoid`` built from the JSON response, or None
    when no attempt succeeded (including ``attempts <= 0``).
    """
    # Count from 1 so the log reads "attempt 1/3" rather than "attempt 0/3".
    for attempt in range(1, attempts + 1):
        try:
            r = requests.get(url, headers={"Authorization": "Token " + api_token})
            # This will raise an error if not successful
            r.raise_for_status()
        except requests.exceptions.RequestException as err:
            # RequestException is the base class of HTTPError, ConnectionError
            # and Timeout, so transient network failures are retried as well.
            logging.warning(
                "Failed fetching (attempt {}/{}) ...".format(attempt, attempts)
            )
            logging.warning(err)
            continue
        return [
            Ellipsoid(
                center=entry["ellips_center"],
                shape=entry["ellips_matrix"],
                radius_definition=entry["radius_definition"],
                traceback_radius=entry["traceback_radius"],
            )
            for entry in r.json()
        ]
    logging.error("Unable to download ellipsoids from {}".format(url))
    return None
def fetch_ellipsoid(url, api_token, traceback_radius, attempts=3):
    """Return the ellipsoid at ``url`` whose traceback radius matches.

    Downloads all ellipsoids via ``fetch_ellipsoids`` and returns the first
    one whose ``traceback_radius`` equals the requested value. Returns None
    when the download failed or no ellipsoid matches.
    """
    candidates = fetch_ellipsoids(url, api_token, attempts)
    if candidates is None:
        return None
    for candidate in candidates:
        if candidate.traceback_radius == traceback_radius:
            return candidate
    return None
def fetch_downloadstore(cosmicweb_url, target):
    """Fetch the download configuration stored under ``target``.

    Requests ``<cosmicweb_url>/api/music/store/<target>`` and converts the
    JSON answer into a ``DownloadConfig``. Halo names are derived from the
    halo ids as ``halo_<id>``.

    On any request failure (HTTP error status, connection problem, timeout)
    the error is logged as critical and the process exits with status 1.
    """
    try:
        r = requests.get(cosmicweb_url + "/api/music/store/" + target)
        # This will raise an error if not successful
        r.raise_for_status()
    except requests.exceptions.RequestException as e:
        # Catch the base class so that connection failures take the same
        # log-and-exit path as HTTP error responses instead of a traceback.
        logging.critical("Failed downloading from cosmICweb.")
        logging.critical(e)
        sys.exit(1)
    content = r.json()
    sim = content["simulation"]
    halo_ids = content["halos"]
    halo_urls = [
        "{url}/simulation/{sid}/halo/{hid}".format(
            url=sim["api_url"], sid=sim["api_id"], hid=h
        )
        for h in halo_ids
    ]
    return DownloadConfig(
        simulation_name=sim["name"],
        halo_names=["halo_{}".format(h) for h in halo_ids],
        halo_urls=halo_urls,
        traceback_radius=content["traceback_radius"],
        api_token=sim["api_token"],
        MUSIC=sim["ics"],
        settings=content["configuration"],
        accessed_at=datetime.now(),
    )
def fetch_publication(cosmicweb_url, publication_name, traceback_radius):
    """Fetch the halos of a publication and wrap them in a ``DownloadConfig``.

    Requests ``<cosmicweb_url>/api/publications/<publication_name>``. Halo
    names and ids come from the response; ``traceback_radius`` is passed
    through unchanged and ``settings`` is left empty.

    On any request failure (HTTP error status, connection problem, timeout)
    the error is logged as critical and the process exits with status 1.
    """
    try:
        r = requests.get(cosmicweb_url + "/api/publications/" + publication_name)
        # This will raise an error if not successful
        r.raise_for_status()
    except requests.exceptions.RequestException as e:
        # Catch the base class so that connection failures take the same
        # log-and-exit path as HTTP error responses instead of a traceback.
        logging.critical("Failed downloading from cosmICweb.")
        logging.critical(e)
        sys.exit(1)
    content = r.json()
    sim = content["simulation"]
    halos = content["halos"]
    halo_names = [h["name"] for h in halos]
    halo_urls = [
        "{url}/simulation/{sid}/halo/{hid}".format(
            url=sim["api_url"], sid=sim["api_id"], hid=h["id"]
        )
        for h in halos
    ]
    return DownloadConfig(
        simulation_name=sim["name"],
        halo_names=halo_names,
        halo_urls=halo_urls,
        traceback_radius=traceback_radius,
        api_token=sim["api_token"],
        MUSIC=sim["ics"],
        settings={},
        accessed_at=datetime.now(),
    )
def edit_template(template):
    """Open ``template`` in the configured editor and return the edited text.

    The text is written to a temporary file, the editor named by ``EDITOR``
    is run on it, and the file content is read back once the editor exits.
    """
    with tempfile.NamedTemporaryFile(suffix=".tmp", mode="r+") as scratch:
        scratch.write(template)
        scratch.flush()
        # Call the editor. backupcopy=yes prevents vim from creating a copy
        # and renaming it (presumably so the edit stays visible through this
        # open handle).
        editor_cmd = [EDITOR, "+set backupcopy=yes", scratch.name]
        subprocess.call(editor_cmd)
        scratch.seek(0)
        return scratch.read()
def music_config_to_template(music_config, configuration):
    """Build the MUSIC template text from the per-section settings.

    ``music_config`` supplies the bodies of the "setup", "cosmology",
    "random" and "poisson" sections. A ``<ELLIPSOID_TEMPLATE>`` placeholder
    is emitted after the setup section. ``configuration`` is not used yet.
    """
    # TODO: apply configuration, add header
    pieces = [
        "[SETUP]\n",
        music_config["setup"],
        "\n\n<ELLIPSOID_TEMPLATE>\n\n",
        "[COSMOLOGY]\n",
        music_config["cosmology"],
        "\n\n",
        "[RANDOM]\n",
        music_config["random"],
        "\n\n",
        "[POISSON]\n",
        music_config["poisson"],
    ]
    return "".join(pieces)
def compose_template(template, ellipsoid):
    """Substitute the ellipsoid region lines into a MUSIC template.

    Every ``<ELLIPSOID_TEMPLATE>`` placeholder in ``template`` is replaced
    by the region type, the first three rows of ``ellipsoid.shape`` and
    ``ellipsoid.center``, with each number formatted as ``{:13.6e}``.
    """
    # TODO: add ellipsoid header (rtb, halo_name, etc)

    def render(values):
        # Comma-separated numbers in fixed-width scientific notation
        return ", ".join("{:13.6e}".format(v) for v in values)

    lines = ["region = ellipsoid\n"]
    for row in range(3):
        lines.append(
            "region_ellipsoid_matrix[{}] = {}\n".format(
                row, render(ellipsoid.shape[row])
            )
        )
    lines.append("region_ellipsoid_center = {}\n".format(render(ellipsoid.center)))
    return template.replace("<ELLIPSOID_TEMPLATE>", "".join(lines))
def write_music_file(output_file, music_config):
    """Write ``music_config`` to ``output_file``, creating missing directories.

    ``output_file`` may be a bare file name, in which case the file is
    written to the current working directory and nothing is created.
    """
    dirname = os.path.dirname(output_file)
    # dirname is "" for a bare file name; os.makedirs("") would raise, and the
    # current directory does not need to be created.
    if dirname and not os.path.exists(dirname):
        logging.debug("Creating directory {}".format(dirname))
        # exist_ok tolerates the directory appearing between check and call.
        os.makedirs(dirname, exist_ok=True)
    with open(output_file, "w") as f:
        f.write(music_config)
def call_music():
    """Placeholder for running MUSIC; currently does nothing."""
    return None
def process_config(config, args: Args):
    """Fetch the ellipsoids for all halos and write one MUSIC file per halo.

    Steps:
      1. download the ellipsoid matching ``config.traceback_radius`` for
         every halo in ``config``,
      2. build the MUSIC template and optionally let the user edit it,
      3. write a configuration file below ``args.output_path`` for every
         halo whose ellipsoid is available; the others are skipped with a
         warning.
    """
    # One entry per halo, in the order of config.halo_names; an entry is None
    # when the download failed or no ellipsoid matched.
    ellipsoids = []
    for halo_name, url in zip(config.halo_names, config.halo_urls):
        # format() instead of "+" so a non-string halo name cannot raise.
        logging.info("Fetching ellipsoids from halo {}".format(halo_name))
        ellipsoids.append(
            fetch_ellipsoid(
                url + "/ellipsoids",
                config.api_token,
                config.traceback_radius,
                args.attempts,
            )
        )
    # Edit template
    logging.info("Creating MUSIC template")
    music_template = music_config_to_template(config.MUSIC, config.settings)
    if query_yes_no(
        "Do you want to edit the MUSIC template before creating the IC files?\n"
        "(changing zstart, levelmin, levelmax, etc.)",
        default="no",
    ):
        logging.debug("Editing MUSIC template")
        music_template = edit_template(music_template)
        logging.debug("Finished editing MUSIC template")
    # Store template to file
    for halo_name, ellipsoid in zip(config.halo_names, ellipsoids):
        if ellipsoid is None:
            logging.warning(
                "Ellipsoid for halo {} not available, skipping".format(halo_name)
            )
            continue
        logging.info("Composing MUSIC configuration file for halo {}".format(halo_name))
        music_config = compose_template(music_template, ellipsoid)
        # NOTE(review): with the flag set each halo gets its own
        # sub-directory, which reads inverted relative to the flag name -
        # confirm the intended meaning of --common-directory.
        if args.common_directory and len(ellipsoids) > 1:
            output_file = os.path.join(args.output_path, str(halo_name), "ics.cfg")
        else:
            # No leading "/" here: os.path.join discards every earlier
            # component when a later one is absolute, which would place the
            # file in the filesystem root instead of args.output_path.
            output_file = os.path.join(
                args.output_path, "ics_{}.cfg".format(halo_name)
            )
        logging.info(
            "Storing MUSIC configuration file for halo {} in {}".format(
                halo_name, output_file
            )
        )
        write_music_file(output_file, music_config)
    # TODO: Execute MUSIC?
def downloadstore_mode(args: Args, target: str):
    """Handle the ``get`` sub-command: fetch a stored download and process it.

    When the output path is still the default "./", a directory named after
    the simulation is used instead.
    """
    logging.info("Fetching download configuration from the cosmICweb server")
    config = fetch_downloadstore(args.url, target)
    if args.output_path == "./":
        default_dir = "./cosmICweb-zooms-{}".format(config.simulation_name)
        args = args._replace(output_path=default_dir)
        logging.debug("Output directory set to " + args.output_path)
    logging.info("Download configuration successfully fetched")
    process_config(config, args)
def publication_mode(args: Args, publication_name: str, traceback_radius: int):
    """Handle the ``publication`` sub-command.

    Fetches the publication, redirects the output into a sub-directory
    named after it, and processes the resulting configuration.
    """
    message = "".join(
        ("Fetching publication ", publication_name, " from the cosmICweb server")
    )
    logging.info(message)
    config = fetch_publication(args.url, publication_name, traceback_radius)
    target_dir = os.path.join(args.output_path, publication_name)
    args = args._replace(output_path=target_dir)
    logging.debug("Output directory set to " + args.output_path)
    logging.info("Publication successfully fetched")
    process_config(config, args)
def dir_path(p):
    """Return ``p`` unchanged if it is an existing directory.

    Raises NotADirectoryError (carrying ``p``) otherwise.
    """
    if not os.path.isdir(p):
        raise NotADirectoryError(p)
    return p
# Top-level command group. It collects the global options into an ``Args``
# tuple and stores it on the click context for the sub-commands.
# (Kept as a comment, not a docstring: click would show a docstring as the
# group's help text.)
@click.group()
@click.option(
    "--url", default=DEFAULT_URL, help="overwrite URL of the cosmICweb server"
)
@click.option(
    "--output-path",
    type=dir_path,
    default="./",
    help="Download target for IC files. If downloading publication, will create a subfolder with the name of the publication",
)
@click.option("--common-directory", is_flag=True)
@click.option(
    "--attempts", type=int, default=3, help="number of attempts to download ellipsoids"
)
@click.option("--verbose", is_flag=True)
@click.pass_context
def cli(ctx, url, output_path, common_directory, attempts, verbose):
    # --verbose raises the root logger's verbosity to DEBUG.
    if verbose:
        logger.setLevel(logging.DEBUG)
    # Positional order matches the Args field order:
    # url, output_path, common_directory, attempts.
    ctx.obj = Args(url, output_path, common_directory, attempts)
# Sub-command ``get``: download via a target UUID from the download store.
@cli.command(help="Download ICs using a target UUID generated on cosmICweb")
@click.argument("target")
@click.pass_context
def get(ctx, target):
    # ctx.obj holds the Args tuple assembled by the ``cli`` group.
    downloadstore_mode(ctx.obj, target)
# Sub-command ``publication``: download the ICs of a named publication.
# The choices are given as strings because click releases that compare the
# raw command-line text against the choices reject integer choices; the
# selected value is converted back to int before it is used.
@cli.command(help="Download published ICs using the publication name")
@click.argument("publication_name")
@click.option(
    "--traceback_radius",
    type=click.Choice(["1", "2", "4", "10"]),
    default="2",
)
@click.pass_context
def publication(ctx, publication_name, traceback_radius):
    args: Args = ctx.obj
    publication_mode(args, publication_name, int(traceback_radius))
# Run the click command group when this file is executed as a script.
if __name__ == "__main__":
    cli()
# if __name__ == "__main__":
# parser = argparse.ArgumentParser()
# parser.add_argument(
# "--url",
# dest="cosmicweb_url",
# default=DEFAULT_URL,
# help="overwrite URL of the cosmicweb server",
# )
# parser.add_argument(
# "--output-path",
# type=dir_path,
# default="./",
# help="Download target for IC files. If downloading publication, will create a subfolder with the "
# "name of the publication",
# )
# parser.add_argument(
# "--common-directory", dest="create_subdirs", action="store_false"
# )
# parser.add_argument(
# "--attempts",
# type=int,
# default=3,
# help="number of attempts to download ellipsoids",
# )
# parser.add_argument("--verbose", action="store_true")
# subparsers = parser.add_subparsers(dest="mode")
# # Downloading from publications
# publication_parser = subparsers.add_parser(
# "publication", help="download publications"
# )
# publication_parser.add_argument("publication_name", help="name of the publication")
# publication_parser.add_argument(
# "--traceback_radius", type=int, choices=[1, 2, 4, 10], default=2, help=""
# )
# # Downloading from download object
# download_parser = subparsers.add_parser("get")
# download_parser.add_argument("target")
# args = parser.parse_args()
# if args.verbose:
# logger.setLevel("DEBUG")
# if args.mode == "get":
# downloadstore_mode(args)
# elif args.mode == "publication":
# publication_mode(args)
# else:
# raise NotImplementedError("unknown subparser")