Project author: michaelconnor00

Project description:
Extension to DigitalGlobe/gbdxtools to provide local workflow execution.
Language: Python
Repository: git://github.com/michaelconnor00/gbdxrun.git
Created: 2017-09-18T18:44:26Z
Project community: https://github.com/michaelconnor00/gbdxrun

License:


gbdxrun

Extension to DigitalGlobe/gbdxtools to provide local workflow execution.

Known compatibility issues

Input port directories must be mounted into the Docker container. However, string input ports are written to the parent directory /mnt/work/input, which cannot be mounted if its children are already mounted (at least, this was the case at the time the code was written).

As a workaround, all string input ports are provided to the Docker container as environment variables of the form gbdx-input-port-<port_name>=<port_value>. Any task executed with gbdxrun must therefore be able to fetch string input ports from environment variables. For example, the GBDXTaskInterface:

    def __init__(self, work_path="/mnt/work/"):
        # >>> above not shown for clarity <<<
        string_input_ports = os.path.join(self.__work_path, 'input', "ports.json")
        if os.path.exists(string_input_ports):
            with open(string_input_ports, 'r') as f:
                self.__string_input_ports = self.load_json_string_inputs(f)
        else:
            self.logger.info("Input port file doesn't exist: %s, load from ENV VARS" % string_input_ports)
            self.__string_input_ports = self.load_env_string_inputs()

    @staticmethod
    def load_json_string_inputs(file_obj):
        """
        Load all string values from the json file.
        Note: ENVI requires lists and dictionaries as well as literal types,
        so each value should also be parsed.
        """
        string_inputs = json.load(file_obj)
        for key, value in string_inputs.iteritems():
            try:
                # Try to load the value
                string_inputs[key] = json.loads(value)
            except (TypeError, ValueError):
                # If it fails, do nothing.
                pass
        return string_inputs

    @staticmethod
    def load_env_string_inputs():
        # Allow string ports to be stored as env vars
        env_var_ports = {}
        for key, value in os.environ.iteritems():
            if key.startswith('gbdx-input-port-'):
                try:
                    # Try to parse to JSON first, for dict and list which will be str -> json.dump'd
                    env_var_ports[key[len('gbdx-input-port-'):]] = json.loads(json.loads(value))
                except ValueError as e:
                    # If the above fails, then the value is a string (not an embedded dict or list)
                    value = json.loads(value)
                    value = True if value == 'True' else value
                    value = False if value == 'False' else value
                    env_var_ports[key[len('gbdx-input-port-'):]] = value
        return env_var_ports
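
For illustration, the sketch below shows roughly how a container launch could look under this convention: directory ports are bind-mounted under /mnt/work/input/<port_name>, while string ports travel as JSON-encoded environment variables with the gbdx-input-port- prefix. This is only a minimal sketch using the Docker SDK for Python; the image name, host path, and port names are hypothetical, and whether gbdxrun itself uses the Docker SDK this way is not shown here.

    # Illustrative sketch only -- not gbdxrun's actual internals.
    # The image name, host path, and port names below are hypothetical.
    import json

    import docker  # Docker SDK for Python (pip install docker)

    client = docker.from_env()
    client.containers.run(
        'my-task-image',
        volumes={
            # A directory input port is bind-mounted under /mnt/work/input/<port_name>
            '/local/data/raster': {'bind': '/mnt/work/input/input_raster', 'mode': 'rw'},
        },
        environment={
            # A string input port is passed as gbdx-input-port-<port_name>,
            # JSON-encoded so the task can json.loads() it (see load_env_string_inputs above).
            'gbdx-input-port-data_type': json.dumps('float'),
        },
    )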

Usage example

Install

    pip install git+https://github.com/michaelconnor00/gbdxrun.git

Run a gbdxtools script

    from gbdxrun import gbdx

    task = gbdx.Task("Mytask")

    wf = gbdx.Workflow([task])
    wf.savedata(
        task.outputs.envi_task_definitions,
        location='some/dir/path'
    )

    wf.execute()

Run a gbdxtools script with a local task definition

    from gbdxrun import gbdx
    import time
    import json

    # set task_definition
    fp = '<FILEPATH_TO_TASK_DEFINITION.JSON>'
    with open(fp) as json_data:
        defn = json.load(json_data)

    keywords = {'task_definition': defn}

    task = gbdx.Task("ENVI_CastRaster", **keywords)
    task.inputs.input_raster = 'some/dir/path'
    task.inputs.data_type = "float"

    wf = gbdx.Workflow([task])
    wf.savedata(task.outputs.output_raster_uri, location='some/dir/path')

    print('Workflow ID: %s' % wf.execute())

    while not wf.complete:
        time.sleep(20)

    print('Workflow Complete')

The script can then be executed from a terminal (below), or you can use IPython or Jupyter to run the workflow interactively.

    > python myscript.py

Once the workflow is executed, you will see each task's stdout/stderr appear in your console/shell/browser.

Development

Contributing

Please contribute! Make pull requests directly to master. Before opening a pull request, please:

  • Ensure that all new functionality is covered by unit tests.
  • Verify that all unit tests pass. TODO: use Travis CI to run tests on PRs.
  • Ensure that all functionality is properly documented. TODO: use MkDocs or Sphinx to generate docs for readthedocs.org.
  • Pin all dependency versions in setup.py.
  • Update the Changelog with the new features, bug fixes, etc.

Run Tests

TO COME …

Create a new version

To create a new version (after all tests have passed and PR is merged):

    bumpversion ( major | minor | patch )
    git push --tags

Versioning follows the Semantic Versioning system.