Handler
Your handler can be used to process any asynchronous workload. It can also be used to run ML models with a variety of frameworks, such as PyTorch, ONNX, scikit-learn, XGBoost, and TensorFlow (if not using SavedModels).
If you plan on deploying models with TensorFlow in SavedModel format, you can also use the TensorFlow Handler, which was built specifically for this purpose.
Project files
Cortex makes all files in the project directory (i.e. the directory which contains cortex.yaml) available for use in your handler implementation. Python bytecode files (*.pyc, *.pyo, *.pyd), files or folders that start with ., and the API configuration file (e.g. cortex.yaml) are excluded.
The following files can also be added at the root of the project's directory:
.cortexignore file, which follows the same syntax and behavior as a .gitignore file. This may be necessary if you are reaching the size limit for your project directory (32 MB).
.env file, which exports environment variables that can be used in the handler. Each line of this file must follow the VARIABLE=value format.
For example, if your directory looks like this:
./my-classifier/
├── cortex.yaml
├── values.json
├── handler.py
├── ...
└── requirements.txt
You can access values.json in your Handler class like this:
# handler.py
import json

class Handler:
    def __init__(self, config):
        # paths are relative to the project directory
        with open('values.json', 'r') as values_file:
            values = json.load(values_file)
        self.values = values
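Values declared in the .env file can be read in a similar way. The following is a minimal sketch, assuming the exported variables appear in the handler's process environment as described above; the variable name MODEL_BUCKET and its fallback value are hypothetical.

```python
import os


class Handler:
    def __init__(self, config):
        # MODEL_BUCKET is a hypothetical variable that would be declared in
        # .env with a line such as: MODEL_BUCKET=my-bucket
        # The fallback value is used when the variable is not set.
        self.model_bucket = os.environ.get("MODEL_BUCKET", "default-bucket")
```

Reading the variable with a fallback keeps the handler usable when the .env file is absent, for example during local testing.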
Interface
# initialization code and variables can be declared here in global scope

class Handler:
    def __init__(self, config, metrics_client):
        """(Required) Called once before the API becomes available. Performs
        setup such as downloading/initializing the model or downloading a
        vocabulary.

        Args:
            config (required): Dictionary passed from API configuration (if
                specified). This may contain information on where to download
                the model and/or metadata.
            metrics_client (optional): The Cortex metrics client, which allows
                you to push custom metrics in order to build custom dashboards
                in Grafana.
        """
        pass

    def handle_async(self, payload, request_id):
        """(Required) Called once per request. Preprocesses the request payload
        (if necessary), runs the workload, and postprocesses the resulting output
        (if necessary).

        Args:
            payload (optional): The request payload (see below for the possible
                payload types).
            request_id (optional): The request ID string that identifies a workload.

        Returns:
            JSON-serializable result.
        """
        pass
For proper separation of concerns, it is recommended to use the constructor's config parameter for information such as where to download the model and initialization files from, or any configurable model parameters. You define config in your API configuration, and it is passed through to your handler's constructor.
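As an illustration of this pattern, here is a minimal sketch of a handler that takes its settings from config. The keys model_path and threshold, and the score field in the payload, are hypothetical names chosen for this example rather than anything Cortex defines.

```python
class Handler:
    def __init__(self, config):
        # "model_path" and "threshold" are hypothetical keys that would be
        # defined under config in the API configuration.
        self.model_path = config["model_path"]
        self.threshold = float(config.get("threshold", 0.5))

    def handle_async(self, payload, request_id):
        # Placeholder workload: compare a score from the payload against the
        # configured threshold instead of running a real model.
        score = float(payload["score"])
        return {"request_id": request_id, "positive": score >= self.threshold}
```

Keeping such values in config means the same handler code can be deployed with different models or parameters by changing only the API configuration.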
Your API can accept requests with different types of payloads. Navigate to the API requests section to learn how headers can be used to change the type of payload that is passed into your handle_async method.
At this moment, the AsyncAPI handle_async method can only return JSON-serializable objects. Navigate to the API responses section to learn more about the return value.
API requests
The type of the payload parameter in handle_async(self, payload) can vary based on the content type of the request. The payload parameter is parsed according to the Content-Type header in the request. Here are the parsing rules (see below for examples):
For Content-Type: application/json, payload will be the parsed JSON body.
For Content-Type: text/plain, payload will be a string. utf-8 encoding is assumed, unless specified otherwise (e.g. via Content-Type: text/plain; charset=us-ascii).
For all other Content-Type values, payload will be the raw bytes of the request body.
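The rules above can be sketched as a small function. This is only an illustration of the described behavior, not Cortex's actual implementation; the function name parse_payload is hypothetical.

```python
import json


def parse_payload(content_type, body):
    """Convert raw request bytes into a payload following the rules above."""
    # Split e.g. "text/plain; charset=us-ascii" into media type and parameters.
    media_type, _, params = content_type.partition(";")
    media_type = media_type.strip().lower()

    if media_type == "application/json":
        return json.loads(body)  # parsed JSON body

    if media_type == "text/plain":
        charset = "utf-8"  # assumed unless the header says otherwise
        for param in params.split(";"):
            key, _, value = param.partition("=")
            if key.strip().lower() == "charset" and value.strip():
                charset = value.strip().strip('"')
        return body.decode(charset)

    return body  # any other content type: raw bytes
```

For instance, parse_payload("application/octet-stream", b"\x00\x01") hands back the same bytes object unchanged, while a text/plain body is decoded to a str.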
Here are some examples:
JSON data
Making the request
curl http://***.amazonaws.com/my-api \
-X POST -H "Content-Type: application/json" \
-d '{"key": "value"}'
Reading the payload
When sending a JSON payload, the payload parameter will be a Python object:
class Handler:
    def __init__(self, config):
        pass

    def handle_async(self, payload):
        print(payload["key"])  # prints "value"
Binary data
Making the request
curl http://***.amazonaws.com/my-api \
-X POST -H "Content-Type: application/octet-stream" \
--data-binary @object.pkl
Reading the payload
Since the Content-Type: application/octet-stream header is used, the payload parameter will be a bytes object:
import pickle

class Handler:
    def __init__(self, config):
        pass

    def handle_async(self, payload):
        obj = pickle.loads(payload)  # deserialize the payload bytes
        print(obj["key"])  # prints "value"
Here's an example if the binary data is an image:
from PIL import Image
import io

class Handler:
    def __init__(self, config):
        pass

    def handle_async(self, payload):
        img = Image.open(io.BytesIO(payload))  # read the payload bytes as an image
        print(img.size)
Text data
Making the request
curl http://***.amazonaws.com/my-api \
-X POST -H "Content-Type: text/plain" \
-d "hello world"
Reading the payload
Since the Content-Type: text/plain header is used, the payload parameter will be a string object:
class Handler:
    def __init__(self, config):
        pass

    def handle_async(self, payload):
        print(payload)  # prints "hello world"
API responses
The return value of your handle_async() method must be a JSON-serializable dictionary. The result for each request will remain queryable for 7 days after the request was completed.
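As a minimal sketch of what this means in practice, the handler below converts a value that is not JSON-serializable (a set) into a list before returning it. The word-counting workload and the result keys are hypothetical and only serve the example.

```python
import json


class Handler:
    def __init__(self, config):
        pass

    def handle_async(self, payload):
        # Hypothetical workload: collect the unique words in a text payload.
        unique_words = set(payload.split())
        # A set is not JSON-serializable, so return a sorted list instead.
        return {"unique_words": sorted(unique_words), "count": len(unique_words)}


# Local check: json.dumps raises TypeError if the result cannot be serialized.
result = Handler({}).handle_async("to be or not to be")
serialized = json.dumps(result)
```

Running json.dumps on a sample result locally is a quick way to catch values such as sets, bytes, or framework-specific array types before deploying.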
Chaining APIs
It is possible to make requests from one API to another within a Cortex cluster. All running APIs are accessible from within the handler at http://api-<api_name>:8888/, where <api_name> is the name of the API you are making a request to.
For example, if there is an API named text-generator running in the cluster, you could make a request to it from a different API by using:
import requests

class Handler:
    def handle_async(self, payload):
        response = requests.post("http://api-text-generator:8888/", json={"text": "machine learning is"})
        # ...
Structured logging
You can use Cortex's logger in your handler implementation to log in JSON. This will enrich your logs with Cortex's metadata, and you can add custom metadata to the logs by adding key-value pairs to the extra key when using the logger. For example:
...
from cortex_internal.lib.log import logger as log

class Handler:
    def handle_async(self, payload):
        log.info("received payload", extra={"payload": payload})
The dictionary passed in via the extra key will be flattened by one level, e.g.:
{"asctime": "2021-01-19 15:14:05,291", "levelname": "INFO", "message": "received payload", "process": 235, "payload": "this movie is awesome"}
To avoid overriding essential Cortex metadata, please refrain from specifying the following extra keys: asctime, levelname, message, labels, and process. Log lines greater than 5 MB in size will be ignored.
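One way to guard against accidentally using a reserved key is to filter the dictionary before passing it to the logger. The helper below is a hypothetical sketch, not part of Cortex; only the list of reserved keys is taken from the text above.

```python
# Keys that carry Cortex's own metadata, as listed above.
RESERVED_KEYS = {"asctime", "levelname", "message", "labels", "process"}


def safe_extra(extra):
    """Return a copy of extra without keys that clash with Cortex's metadata."""
    return {key: value for key, value in extra.items() if key not in RESERVED_KEYS}


# Intended use inside a handler (log is the Cortex logger imported earlier):
#   log.info("received payload", extra=safe_extra({"payload": payload}))
```

Dropping the clashing keys silently is a design choice for this sketch; renaming them (for example with a prefix) would preserve the values instead.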