Author: olitheolix

Description: Asynchronous Kubernetes Client
Language: Python
Repository: git://github.com/olitheolix/aiokubernetes.git
Created: 2018-06-29T15:02:59Z
Project page: https://github.com/olitheolix/aiokubernetes

License: Apache License 2.0

Sync and Async Kubernetes Client

See the documentation and examples for usage instructions.

Key Features

Getting Started

    pip install aiokubernetes

List all Pods

    import asyncio

    import aiokubernetes as k8s


    async def main():
        # Load default kubeconfig file and create an aiohttp client instance.
        config = k8s.utils.load_config(warn=False)
        client = k8s.clients.get_aiohttp(config)
        proxy = k8s.api_proxy.Proxy(config)

        # Ask Kubernetes for all Pods.
        cargs = k8s.api.CoreV1Api(proxy).list_pod_for_all_namespaces(watch=False)
        ret = await client.request(**cargs)

        # Ensure the API call to Kubernetes did succeed.
        assert ret.status == 200

        # Optional: wrap the JSON response into a Swagger generated Python object.
        obj = k8s.swagger.unpack(await ret.read())

        # Print the pod names.
        for i in obj.items:
            print(f"{i.metadata.namespace} {i.metadata.name}")

        # Close all pending client connections.
        await client.close()


    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
        loop.close()
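
Because the Swagger wrapping step is optional, the raw response body can also be handled as plain JSON. The following is a minimal sketch, assuming the body is a Kubernetes list response, that is, a JSON object with an `items` array whose entries carry `metadata.namespace` and `metadata.name`. The helper name `pod_names` and the sample payload are made up for illustration and are not part of aiokubernetes.

```python
import json


def pod_names(raw):
    """Return "<namespace> <name>" for every item in a Kubernetes list response.

    `raw` is the undecoded response body (bytes or str), for instance the
    value of `await ret.read()` in the example above.
    """
    body = json.loads(raw)
    # `items` may be missing or null for an empty list response.
    items = body.get("items") or []
    return [
        f"{item['metadata']['namespace']} {item['metadata']['name']}"
        for item in items
    ]


# Hand-written stand-in for a real API response (illustrative only).
sample = b"""
{
  "kind": "PodList",
  "apiVersion": "v1",
  "items": [
    {"metadata": {"namespace": "default", "name": "web-0"}},
    {"metadata": {"namespace": "kube-system", "name": "kube-dns-abc12"}}
  ]
}
"""

for line in pod_names(sample):
    print(line)
```

Working on the plain dictionary avoids the generated model classes entirely, at the cost of losing attribute access such as `i.metadata.name`.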

Mix Synchronous and Asynchronous Clients To List Pods

The following example lists pods, just like the previous one. The difference is
that it does so twice, once with the asynchronous AioHttp library and once with
the synchronous Requests library. The example does not use any convenience
function to configure the SSL certificates for the (a)sync client. This is to
emphasise that aiokubernetes does not add any secret sauce to use the
Kubernetes API because Kubernetes is, after all, just that: an API.
aiokubernetes conveniently compiles the necessary headers, request body and URL
for the selected resource endpoint. The rest is up to you.

    import asyncio
    import aiohttp
    import requests
    import ssl

    import aiokubernetes as k8s


    async def main():
        # Load default kubeconfig file.
        config = k8s.utils.load_config(warn=False)

        # --- Compile the API request parameters to list all Pods in the cluster ---
        # `cargs` is a dict with keys for 'data' (body), 'headers', 'method' (eg
        # GET, POST), and 'url'. This encompasses the necessary information _any_
        # REST client would need to successfully list the K8s Pods.
        proxy = k8s.api_proxy.Proxy(config)
        cargs = k8s.api.CoreV1Api(proxy).list_pod_for_all_namespaces(watch=False)

        # --- Setup the AioHttp client ---
        connector = aiohttp.TCPConnector(
            limit=4,
            ssl_context=ssl.create_default_context(cafile=config.ssl_ca_cert),
            verify_ssl=True,
        )
        client_aiohttp = aiohttp.ClientSession(connector=connector)

        # --- Setup the Requests client ---
        client_requests = requests.Session()
        client_requests.verify = config.ssl_ca_cert

        # --- Ask with asynchronous client ---
        ret = await client_aiohttp.request(**cargs)
        assert ret.status == 200
        text_async = await ret.text()
        await client_aiohttp.close()

        # --- Ask with synchronous client ---
        ret = client_requests.request(**cargs, verify=config.ssl_ca_cert)
        assert ret.status_code == 200
        text_sync = ret.text

        # --- Verify that both clients returned the same data ---
        assert text_sync == text_async

        # Optional: wrap the raw (byte encoded) response into a Swagger object.
        obj = k8s.swagger.unpack(text_sync.encode('utf8'))

        # Print the pod names.
        for i in obj.items:
            print(f"{i.metadata.namespace} {i.metadata.name}")


    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
        loop.close()
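
To illustrate that the compiled request parameters are plain data, the sketch below feeds a dictionary with the four keys named in the example ('data', 'headers', 'method' and 'url') into `urllib.request.Request` from the standard library. The dictionary is written by hand here: its URL and header values are placeholders, not the output of aiokubernetes. No request is sent.

```python
import urllib.request

# Hypothetical stand-in for the `cargs` dict from the example above. Only the
# four key names come from that example; the values are placeholders.
cargs = {
    "method": "GET",
    "url": "https://kubernetes.example:6443/api/v1/pods",
    "headers": {"Accept": "application/json"},
    "data": None,
}

# urllib's Request constructor accepts the same four keyword arguments, so the
# dict can be unpacked directly. This only builds the request object.
req = urllib.request.Request(**cargs)

print(req.get_method())
print(req.full_url)
print(req.get_header("Accept"))
```

The same unpacking idiom is what the AioHttp and Requests calls in the example rely on. Authentication and SSL setup stay the caller's responsibility.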

Concurrently Watch Multiple Resources

Watch several Kubernetes resources for 5 seconds then shut down gracefully.

    import asyncio

    import aiokubernetes as k8s


    async def watch_resource(client, cargs):
        """Consume and print the events as they stream in."""
        # Use helper class to consume the K8s events via an async iterator.
        watch = k8s.watch.AioHttpClientWatch(client.request(**cargs))
        async for event in watch:
            print(f"{event.name} {event.obj.kind} {event.obj.metadata.name}")


    async def main():
        # Load default kubeconfig file and create an aiohttp client instance.
        config = k8s.utils.load_config(warn=False)
        client = k8s.clients.get_aiohttp(config)
        proxy = k8s.api_proxy.Proxy(config)

        # Namespaces and Pods are in the K8s Core API, Deployments in an
        # extension (unless you run a very recent K8s version).
        corev1 = k8s.CoreV1Api(proxy)
        extv1beta = k8s.ExtensionsV1beta1Api(proxy)

        # Compile the necessary request parameters (headers, body, parameters, ...)
        # for the API calls we want to make. We also specify watch=True because we
        # want to listen to changes.
        cargs_ns = corev1.list_namespace(timeout_seconds=5, watch=True)
        cargs_pods = corev1.list_pod_for_all_namespaces(timeout_seconds=5, watch=True)
        cargs_deploy = extv1beta.list_deployment_for_all_namespaces(
            timeout_seconds=5, watch=True
        )

        # Define and dispatch the tasks.
        tasks = [
            watch_resource(client, cargs_ns),
            watch_resource(client, cargs_pods),
            watch_resource(client, cargs_deploy),
        ]
        await asyncio.gather(*tasks)

        # Close all pending connections.
        await client.close()


    if __name__ == '__main__':
        # Setup event loop and start the program.
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.ensure_future(main()))
        loop.close()
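
The concurrency pattern in this example does not depend on Kubernetes itself. Below is a minimal sketch of the same structure using only the standard library: the hypothetical async generator `fake_watch` stands in for `k8s.watch.AioHttpClientWatch` and yields made-up events, while `asyncio.gather` runs the consumers concurrently as in `main()` above. It uses `asyncio.run`, which requires Python 3.7 or newer.

```python
import asyncio


async def fake_watch(kind, names, delay):
    """Hypothetical event source: yield one (event, kind, name) tuple per name."""
    for name in names:
        await asyncio.sleep(delay)
        yield ("ADDED", kind, name)


async def watch_resource(stream):
    """Consume an async iterator until it is exhausted and collect its events."""
    events = []
    async for event in stream:
        events.append(event)
    return events


async def main():
    # One consumer per resource type. gather() runs them concurrently and
    # returns the results in the order the awaitables were passed in.
    return await asyncio.gather(
        watch_resource(fake_watch("Namespace", ["default"], 0.01)),
        watch_resource(fake_watch("Pod", ["web-0", "web-1"], 0.01)),
    )


if __name__ == "__main__":
    for events in asyncio.run(main()):
        print(events)
```

Each consumer returns once its stream ends, which mirrors how the real watches finish when the `timeout_seconds=5` window expires, so `gather` completes without any explicit cancellation.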

Source code

    git clone https://github.com/olitheolix/aiokubernetes.git
    cd aiokubernetes
    pip install -r requirements.txt
    pip install -r test-requirements.txt

Feel free to submit pull requests and issues.

Unit Tests

We use pytest:

    # Run tests sequentially.
    pytest

    # Run tests in 8 parallel processes.
    pytest -n8

CI Tests

Travis runs all tests in Docker containers (see .travis for details).

You can also run these containerised tests locally to ensure Travis on GitHub
will approve your build before you push it.

    docker run -ti -v`pwd`:/src python:3.6-alpine3.7 /bin/ash -c "cd /src; ash scripts/run-ci-tests.sh"
    docker run -ti -v`pwd`:/src python:3.7-alpine3.7 /bin/ash -c "cd /src; ash scripts/run-ci-tests.sh"

Motivation

Like Python Kubernetes and kubernetes_asyncio, this client library uses the
Swagger-generated API and model classes. Unlike those two libraries, it only
supports Python 3.6+. All Python 2.x code paths are gone, as is the dependency
on six.

Also unlike kubernetes_asyncio, aiokubernetes is not backwards compatible with
the synchronous Python Kubernetes client. The differences are, however, minor
from a user’s point of view (see the examples/).

The primary difference is the increased emphasis on “Explicit is better than
implicit” (Zen of Python). To that
end, aiokubernetes never creates implicit HTTP session objects (the user
must supply ApiClient instances) and the library always returns the
native HTTP response object.

The HTTP response object is particularly useful for debugging because it
provides trivial access to the URL, method, path, headers, status code and raw
response data before it is de-serialised into a Swagger class.
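
As a sketch of what this looks like in practice, the hypothetical helper below (not part of aiokubernetes) summarises a response for a log message. It assumes an object with `method`, `url`, `status` and `headers` attributes, as an aiohttp response provides. A `types.SimpleNamespace` stands in for a real response so the snippet runs without a cluster.

```python
from types import SimpleNamespace


def describe_response(ret):
    """Return a one-line summary of an HTTP response for debugging."""
    content_type = ret.headers.get("Content-Type", "unknown")
    return f"{ret.method} {ret.url} -> {ret.status} ({content_type})"


# Stand-in for a native response object; all values are made up.
fake = SimpleNamespace(
    method="GET",
    url="https://kubernetes.example:6443/api/v1/pods",
    status=200,
    headers={"Content-Type": "application/json"},
)

print(describe_response(fake))
```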

The plan is to simplify the new ApiClient further and cover it with tests
along the way because the auto-generated api_client.py and rest.py classes
have none, unfortunately.

License

aiokubernetes is licensed under the Apache 2 license.