Apache Airflow is a popular workflow management platform, and a wide range of AWS services are integrated with it via the Amazon provider's operators. AWS Lambda is one of the integrated services, and it can be used to develop workflows efficiently. The current Lambda operator, however, just invokes a Lambda function; it can fail to report the invocation result of a function correctly and to record the exact error message from a failure. In this post, we’ll discuss a custom Lambda operator that handles those limitations.

Architecture

We’ll discuss a custom Lambda operator that extends the Lambda operator provided by the Amazon provider package. When a DAG creates a task that invokes a Lambda function, the operator updates the Lambda payload with a correlation ID that uniquely identifies the task. The correlation ID is added to every log message that the Lambda function generates. Finally, the custom operator filters the associated CloudWatch log events, prints the log messages and raises a runtime error when an error message is found. With this setup, we are able to identify the function invocation result correctly and to point to the exact error message if it fails. The source code of this post can be found in a GitHub repository.
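
To make the flow concrete, below is a minimal sketch of the idea using plain boto3 calls and the function name deployed later in this post - the custom operator automates exactly these steps.

# a sketch of the correlation ID flow with plain boto3 calls
import json
import uuid

import boto3

# generate an ID that uniquely identifies the task
correlation_id = str(uuid.uuid4())

# invoke the function with the correlation ID embedded in the payload
boto3.client("lambda").invoke(
    FunctionName="example-lambda-function",
    InvocationType="Event",
    Payload=json.dumps({"n": 10, "correlation_id": correlation_id}),
)

# later, filter the function's log group for events that carry the same ID
events = boto3.client("logs").filter_log_events(
    logGroupName="/aws/lambda/example-lambda-function",
    filterPattern=f'"{correlation_id}"',
)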

Lambda Setup

Lambda Function

The Logger utility of the Lambda Powertools Python package is used to record log messages. The correlation ID is added to the event payload, and the logger.inject_lambda_context decorator is configured to inject it into every log message. Note the Lambda context would be a better place for the correlation ID, as we can add a custom client context object to it. However, the client context is not recognised when an invocation is made asynchronously, so we have to add the ID to the event payload instead. Another decorator (middleware_before_after) logs messages before and after the function invocation. The latter message, which indicates the end of the function, is important because we can rely on it to identify whether a function completed without an error; if a function finishes with an error, that last log message won't be recorded. We can also check whether a function failed by looking for a log message whose level is ERROR, which is created by the logger.exception method. The Lambda event payload has two extra attributes - n for setting the number of iterations and to_fail for determining whether to raise an error.

# lambda/src/lambda_function.py
import time
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator

logger = Logger(log_record_order=["correlation_id", "level", "message", "location"])


@lambda_handler_decorator
def middleware_before_after(handler, event, context):
    logger.info("Function started")
    response = handler(event, context)
    logger.info("Function ended")
    return response


@logger.inject_lambda_context(correlation_id_path="correlation_id")
@middleware_before_after
def lambda_handler(event: dict, context: LambdaContext):
    num_iter = event.get("n", 10)
    to_fail = event.get("to_fail", False)
    logger.info(f"num_iter - {num_iter}, fail - {to_fail}")
    try:
        for n in range(num_iter):
            logger.info(f"iter - {n + 1}...")
            time.sleep(1)
        if to_fail:
            raise Exception
    except Exception as e:
        logger.exception("Function invocation failed...")
        raise RuntimeError("Unable to finish loop") from e
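
As an aside, below is a minimal sketch of what passing a correlation ID via the client context would look like. A client context is sent as base64-encoded JSON and, inside the handler, its custom attributes are available under context.client_context.custom - but, as noted above, it only reaches the function for synchronous invocations, which is why the event payload is used instead.

# sketch - passing a correlation ID via the client context (sync invocations only)
import base64
import json

import boto3

client_context = base64.b64encode(
    json.dumps({"custom": {"correlation_id": "<correlation-id>"}}).encode()
).decode()

boto3.client("lambda").invoke(
    FunctionName="example-lambda-function",
    InvocationType="RequestResponse",
    ClientContext=client_context,
)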

SAM Template

The Serverless Application Model (SAM) framework is used to deploy the Lambda function, and the Lambda Powertools Python package is added as a Lambda layer. The Lambda log group is configured to keep messages for only 1 day, which helps reduce the time spent filtering log events. By default, an asynchronously invoked Lambda function is retried twice on error - the default number of retry attempts is 2. It is set to 0 here because retry behaviour can be controlled by Airflow if necessary, and doing so makes it easier to track the function invocation status.

# lambda/template.yml
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: Lambda functions used to demonstrate Lambda invoke operator with S3 log extension

Globals:
  Function:
    MemorySize: 128
    Timeout: 30
    Runtime: python3.8
    Tracing: Active
    Environment:
      Variables:
        POWERTOOLS_SERVICE_NAME: airflow
        LOG_LEVEL: INFO
    Tags:
      Application: LambdaInvokeOperatorDemo
Resources:
  ExampleFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: example-lambda-function
      Description: Example lambda function
      CodeUri: src/
      Handler: lambda_function.lambda_handler
      Layers:
        - !Sub arn:aws:lambda:${AWS::Region}:017000801446:layer:AWSLambdaPowertoolsPython:26
  ExampleFunctionAsyncConfig:
    Type: AWS::Lambda::EventInvokeConfig
    Properties:
      FunctionName: !Ref ExampleFunction
      MaximumRetryAttempts: 0
      Qualifier: "$LATEST"
  LogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub "/aws/lambda/${ExampleFunction}"
      RetentionInDays: 1

Outputs:
  ExampleFunction:
    Value: !Ref ExampleFunction
    Description: Example lambda function name
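
With the template in place, the stack can be built and deployed with the SAM CLI - a typical sequence is shown below (the guided deploy prompts for a stack name, region and deployment bucket).

# run from the lambda/ directory
sam build
sam deploy --guided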

Lambda Operator

Lambda Invoke Function Operator

Below is the source of the Lambda invoke function operator. After invoking a Lambda function, the execute method checks whether the response status code indicates success and whether FunctionError is found in the response payload. When an invocation is made synchronously (RequestResponse invocation type), the operator can identify whether the invocation succeeded because the response is returned after the function finishes. However, it reports a generic error message on failure, and we have to visit CloudWatch Logs to check the exact error. It gets worse when the function is invoked asynchronously (Event invocation type), because the response is returned before the invocation finishes. In this case it is not even possible to check whether the invocation was successful.

...

class AwsLambdaInvokeFunctionOperator(BaseOperator):
    def __init__(
        self,
        *,
        function_name: str,
        log_type: Optional[str] = None,
        qualifier: Optional[str] = None,
        invocation_type: Optional[str] = None,
        client_context: Optional[str] = None,
        payload: Optional[str] = None,
        aws_conn_id: str = 'aws_default',
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.function_name = function_name
        self.payload = payload
        self.log_type = log_type
        self.qualifier = qualifier
        self.invocation_type = invocation_type
        self.client_context = client_context
        self.aws_conn_id = aws_conn_id

    def execute(self, context: 'Context'):
        hook = LambdaHook(aws_conn_id=self.aws_conn_id)
        success_status_codes = [200, 202, 204]
        self.log.info("Invoking AWS Lambda function: %s with payload: %s", self.function_name, self.payload)
        response = hook.invoke_lambda(
            function_name=self.function_name,
            invocation_type=self.invocation_type,
            log_type=self.log_type,
            client_context=self.client_context,
            payload=self.payload,
            qualifier=self.qualifier,
        )
        self.log.info("Lambda response metadata: %r", response.get("ResponseMetadata"))
        if response.get("StatusCode") not in success_status_codes:
            raise ValueError('Lambda function did not execute', json.dumps(response.get("ResponseMetadata")))
        payload_stream = response.get("Payload")
        payload = payload_stream.read().decode()
        if "FunctionError" in response:
            raise ValueError(
                'Lambda function execution resulted in error',
                {"ResponseMetadata": response.get("ResponseMetadata"), "Payload": payload},
            )
        self.log.info('Lambda function invocation succeeded: %r', response.get("ResponseMetadata"))
        return payload
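
To make the limitation concrete, below is a sketch of what the raw boto3 responses look like for the two invocation types (the exact values are illustrative).

import json

import boto3

client = boto3.client("lambda")

# synchronous invocation - the response arrives after the function finishes,
# so a failure is at least flagged via the FunctionError field
resp = client.invoke(
    FunctionName="example-lambda-function",
    InvocationType="RequestResponse",
    Payload=json.dumps({"n": 1, "to_fail": True}),
)
print(resp["StatusCode"])  # 200 even though the function raised an error
print(resp.get("FunctionError"))  # e.g. "Unhandled" - but no exact error message

# asynchronous invocation - the response only acknowledges that the event was
# queued, so it reveals nothing about whether the function will succeed or fail
resp = client.invoke(
    FunctionName="example-lambda-function",
    InvocationType="Event",
    Payload=json.dumps({"n": 1, "to_fail": True}),
)
print(resp["StatusCode"])  # 202, returned before the invocation finishes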

Custom Lambda Operator

The custom Lambda operator extends the Lambda invoke function operator. It updates the Lambda payload by adding a correlation ID, and the execute method is extended by the log_processor decorator function. As the name suggests, the decorator filters all log messages that include the correlation ID and prints them, looping over the lifetime of the invocation. While processing log events, it raises an error if an error message is found, and it stops when a message that indicates the end of the invocation is encountered. Finally, to handle the case where an invocation doesn't finish within the function's timeout, it raises an error at the end of the loop.

The main benefits of this approach are:

  • we don’t have to rewrite the Lambda invocation logic, as we extend the Lambda invoke function operator,
  • we can track a Lambda invocation status regardless of its invocation type, and
  • we are able to record all relevant log messages of an invocation.

# airflow/dags/lambda_operator.py
...

class CustomLambdaFunctionOperator(AwsLambdaInvokeFunctionOperator):
    def __init__(
        self,
        *,
        function_name: str,
        log_type: Optional[str] = None,
        qualifier: Optional[str] = None,
        invocation_type: Optional[str] = None,
        client_context: Optional[str] = None,
        payload: Optional[str] = None,
        aws_conn_id: str = "aws_default",
        correlation_id: Optional[str] = None,
        **kwargs,
    ):
        # generate the default per instantiation - a default of str(uuid4()) in the
        # signature would be evaluated only once at import time and shared by tasks
        correlation_id = correlation_id or str(uuid4())
        super().__init__(
            function_name=function_name,
            log_type=log_type,
            qualifier=qualifier,
            invocation_type=invocation_type,
            client_context=client_context,
            payload=json.dumps(
                {**json.loads((payload or "{}")), **{"correlation_id": correlation_id}}
            ),
            aws_conn_id=aws_conn_id,
            **kwargs,
        )
        self.correlation_id = correlation_id

    def log_processor(func):
        @functools.wraps(func)
        def wrapper_decorator(self, *args, **kwargs):
            payload = func(self, *args, **kwargs)
            function_timeout = self.get_function_timeout()
            self.process_log_events(function_timeout)
            return payload

        return wrapper_decorator

    @log_processor
    def execute(self, context: "Context"):
        return super().execute(context)

    def get_function_timeout(self):
        resp = boto3.client("lambda").get_function_configuration(FunctionName=self.function_name)
        return resp["Timeout"]

    def process_log_events(self, function_timeout: int):
        start_time = 0
        for _ in range(function_timeout):
            response_iterator = self.get_response_iterator(
                self.function_name, self.correlation_id, start_time
            )
            for page in response_iterator:
                for event in page["events"]:
                    start_time = event["timestamp"]
                    message = json.loads(event["message"])
                    print(message)
                    if message["level"] == "ERROR":
                        raise RuntimeError("ERROR found in log")
                    if message["message"] == "Function ended":
                        return
            time.sleep(1)
        raise RuntimeError("Lambda function end message not found after function timeout")

    @staticmethod
    def get_response_iterator(function_name: str, correlation_id: str, start_time: int):
        paginator = boto3.client("logs").get_paginator("filter_log_events")
        return paginator.paginate(
            logGroupName=f"/aws/lambda/{function_name}",
            filterPattern=f'"{correlation_id}"',
            startTime=start_time + 1,
        )

Unit Testing

Unit testing is performed for the main log processing method (process_log_events). A log events fixture is created by a closure function: depending on the case argument, it returns a list of log events that covers success, error or timeout. The fixture is used as the mock return value of the get_response_iterator method, and the three test cases cover each of the possible scenarios.

# airflow/tests/test_lambda_operator.py
import json
import pytest
from unittest.mock import MagicMock
from dags.lambda_operator import CustomLambdaFunctionOperator


@pytest.fixture
def log_events():
    def _(case):
        events = [
            {
                "timestamp": 1659296879605,
                "message": '{"correlation_id":"2850fda4-9005-4375-aca8-88dfdda222ba","level":"INFO","message":"Function started","location":"middleware_before_after:12","timestamp":"2022-07-31 19:47:59,605+0000","service":"airflow", ...}\n',
            },
            {
                "timestamp": 1659296879605,
                "message": '{"correlation_id":"2850fda4-9005-4375-aca8-88dfdda222ba","level":"INFO","message":"num_iter - 10, fail - False","location":"lambda_handler:23","timestamp":"2022-07-31 19:47:59,605+0000","service":"airflow", ...}\n',
            },
            {
                "timestamp": 1659296879605,
                "message": '{"correlation_id":"2850fda4-9005-4375-aca8-88dfdda222ba","level":"INFO","message":"iter - 1...","location":"lambda_handler:26","timestamp":"2022-07-31 19:47:59,605+0000","service":"airflow", ...}\n',
            },
        ]
        if case == "success":
            events.append(
                {
                    "timestamp": 1659296889620,
                    "message": '{"correlation_id":"2850fda4-9005-4375-aca8-88dfdda222ba","level":"INFO","message":"Function ended","location":"middleware_before_after:14","timestamp":"2022-07-31 19:48:09,619+0000","service":"airflow", ...}\n',
                }
            )
        elif case == "error":
            events.append(
                {
                    "timestamp": 1659296889629,
                    "message": '{"correlation_id":"2850fda4-9005-4375-aca8-88dfdda222ba","level":"ERROR","message":"Function invocation failed...","location":"lambda_handler:31","timestamp":"2022-07-31 19:48:09,628+0000","service":"airflow", ..., "exception":"Traceback (most recent call last):\\n  File \\"/var/task/lambda_function.py\\", line 29, in lambda_handler\\n    raise Exception\\nException","exception_name":"Exception","xray_trace_id":"1-62e6dc6f-30b8e51d000de0ee5a22086b"}\n',
                },
            )
        return [{"events": events}]

    return _


def test_process_log_events_success(log_events):
    success_resp = log_events("success")
    operator = CustomLambdaFunctionOperator(
        task_id="sync_w_error",
        function_name="",
        invocation_type="RequestResponse",
        payload=json.dumps({"n": 1, "to_fail": True}),
        aws_conn_id=None,
    )
    operator.get_response_iterator = MagicMock(return_value=success_resp)
    assert operator.process_log_events(1) is None


def test_process_log_events_fail_with_error(log_events):
    fail_resp = log_events("error")
    operator = CustomLambdaFunctionOperator(
        task_id="sync_w_error",
        function_name="",
        invocation_type="RequestResponse",
        payload=json.dumps({"n": 1, "to_fail": True}),
        aws_conn_id=None,
    )
    operator.get_response_iterator = MagicMock(return_value=fail_resp)
    with pytest.raises(RuntimeError) as e:
        operator.process_log_events(1)
    assert "ERROR found in log" == str(e.value)


def test_process_log_events_fail_by_timeout(log_events):
    fail_resp = log_events(None)
    operator = CustomLambdaFunctionOperator(
        task_id="sync_w_error",
        function_name="",
        invocation_type="RequestResponse",
        payload=json.dumps({"n": 1, "to_fail": True}),
        aws_conn_id=None,
    )
    operator.get_response_iterator = MagicMock(return_value=fail_resp)
    with pytest.raises(RuntimeError) as e:
        operator.process_log_events(1)
    assert "Lambda function end message not found after function timeout" == str(e.value)

The test results are shown below.

$ pytest airflow/tests/test_lambda_operator.py -v
============================================ test session starts =============================================
platform linux -- Python 3.8.10, pytest-7.1.2, pluggy-1.0.0 -- /home/jaehyeon/personal/revisit-lambda-operator/venv/bin/python3
cachedir: .pytest_cache
rootdir: /home/jaehyeon/personal/revisit-lambda-operator
plugins: anyio-3.6.1
collected 3 items

airflow/tests/test_lambda_operator.py::test_process_log_events_success PASSED                          [ 33%]
airflow/tests/test_lambda_operator.py::test_process_log_events_fail_with_error PASSED                  [ 66%]
airflow/tests/test_lambda_operator.py::test_process_log_events_fail_by_timeout PASSED                  [100%]

============================================= 3 passed in 1.34s ==============================================

Compare Operators

Docker Compose

In order to compare the two operators, the Airflow Docker quick start guide is simplified to use the Local Executor, so that both scheduling and task execution are handled by the airflow-scheduler service. Instead of creating an Airflow connection for invoking Lambda functions, the host AWS configuration is shared by volume-mapping (${HOME}/.aws to /home/airflow/.aws). Also, as I don’t use the default AWS profile but one named cevo, it is added to the scheduler service as an environment variable (AWS_PROFILE: "cevo").

# airflow/docker-compose.yaml
---
version: "3"
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.3.3}
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    # For backward compatibility, with Airflow <2.3
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__FERNET_KEY: ""
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth"
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ${HOME}/.aws:/home/airflow/.aws
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on: &airflow-common-depends-on
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    environment:
      <<: *airflow-common-env
      AWS_PROFILE: "cevo"
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: "true"
      _AIRFLOW_WWW_USER_CREATE: "true"
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ""
    user: "0:0"
    volumes:
      - .:/sources

volumes:
  postgres-db-volume:

The quick start guide requires a number of steps to initialise the environment before starting the services, and they are combined into a single shell script, shown below.

#!/usr/bin/env bash
# airflow/init.sh

## initialising environment
# remove docker-compose services
docker-compose down --volumes
# create folders to mount
rm -rf ./logs
mkdir -p ./dags ./logs ./plugins ./tests
# setting the right airflow user
echo -e "AIRFLOW_UID=$(id -u)" > .env
# initialise database
docker-compose up airflow-init

After finishing the initialisation steps, the Docker Compose services can be started by docker-compose up -d.
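
Putting the steps together, the bootstrap sequence looks as follows.

# run from the airflow/ directory
bash init.sh         # clean up, create folders and initialise the database
docker-compose up -d # start postgres, the webserver and the scheduler
docker-compose ps    # check that the services are up and healthy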

Lambda Invoke Function Operator

Two tasks are created with the Lambda invoke function operator. The first is invoked synchronously (RequestResponse) while the second is invoked asynchronously (Event). Both are configured to raise an error after 10 seconds.

# airflow/dags/example_without_logging.py
import os
import json
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator

LAMBDA_FUNCTION_NAME = os.getenv("LAMBDA_FUNCTION_NAME", "example-lambda-function")


def _set_payload(n: int = 10, to_fail: bool = True):
    return json.dumps({"n": n, "to_fail": to_fail})


with DAG(
    dag_id="example_without_logging",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    max_active_runs=2,
    concurrency=2,
    tags=["logging"],
    catchup=False,
) as dag:
    [
        AwsLambdaInvokeFunctionOperator(
            task_id="sync_w_error",
            function_name=LAMBDA_FUNCTION_NAME,
            invocation_type="RequestResponse",
            payload=_set_payload(),
            aws_conn_id=None,
        ),
        AwsLambdaInvokeFunctionOperator(
            task_id="async_w_error",
            function_name=LAMBDA_FUNCTION_NAME,
            invocation_type="Event",
            payload=_set_payload(),
            aws_conn_id=None,
        ),
    ]

As shown below, the task invoked asynchronously is incorrectly marked as successful. This is because effectively only the response status code is checked - the operator doesn’t wait until the invocation finishes. On the other hand, the task invoked synchronously is marked as failed. However, it doesn’t show the exact error that failed the invocation - see below for further details.

The error message is Lambda function execution resulted in error, which is the generic message constructed by the Lambda invoke function operator.

Custom Lambda Operator

Five tasks are created with the custom Lambda operator. The first four tasks cover success and failure by synchronous and asynchronous invocations, and the last task checks failure due to timeout.

# airflow/dags/example_with_logging.py
import os
import json
from datetime import datetime

from airflow import DAG
from lambda_operator import CustomLambdaFunctionOperator

LAMBDA_FUNCTION_NAME = os.getenv("LAMBDA_FUNCTION_NAME", "example-lambda-function")


def _set_payload(n: int = 10, to_fail: bool = True):
    return json.dumps({"n": n, "to_fail": to_fail})


with DAG(
    dag_id="example_with_logging",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    max_active_runs=2,
    concurrency=5,
    tags=["logging"],
    catchup=False,
) as dag:
    [
        CustomLambdaFunctionOperator(
            task_id="sync_w_error",
            function_name=LAMBDA_FUNCTION_NAME,
            invocation_type="RequestResponse",
            payload=_set_payload(),
            aws_conn_id=None,
        ),
        CustomLambdaFunctionOperator(
            task_id="async_w_error",
            function_name=LAMBDA_FUNCTION_NAME,
            invocation_type="Event",
            payload=_set_payload(),
            aws_conn_id=None,
        ),
        CustomLambdaFunctionOperator(
            task_id="sync_wo_error",
            function_name=LAMBDA_FUNCTION_NAME,
            invocation_type="RequestResponse",
            payload=_set_payload(to_fail=False),
            aws_conn_id=None,
        ),
        CustomLambdaFunctionOperator(
            task_id="async_wo_error",
            function_name=LAMBDA_FUNCTION_NAME,
            invocation_type="Event",
            payload=_set_payload(to_fail=False),
            aws_conn_id=None,
        ),
        CustomLambdaFunctionOperator(
            task_id="async_timeout_error",
            function_name=LAMBDA_FUNCTION_NAME,
            invocation_type="Event",
            payload=_set_payload(n=40, to_fail=False),
            aws_conn_id=None,
        ),
    ]

As expected, we see two successful tasks and three failed tasks. The custom Lambda operator tracks the Lambda function invocation status correctly.

Below are the log messages of the successful task invoked asynchronously. Each message includes the same correlation ID, and the last message from the Lambda function is Function ended.

The failed task by asynchronous invocation also shows all log messages, and it is possible to check what caused the invocation to fail.

The failure due to timeout doesn’t show an error message from the Lambda invocation. However, we can treat it as a failure because the message indicating the end of the invocation doesn’t appear within the function timeout.

The failure by synchronous invocation still doesn’t show the exact error message, because the error is raised before the log processing method is executed. Because of this, I advise invoking a Lambda function asynchronously.
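
Putting that advice together, a sketch of the pattern I’d reach for is shown below - an asynchronous invocation with any retry behaviour handled on the Airflow side (the retries value here is an arbitrary example).

CustomLambdaFunctionOperator(
    task_id="async_task",
    function_name=LAMBDA_FUNCTION_NAME,
    invocation_type="Event",  # async - log processing determines success or failure
    payload=json.dumps({"n": 10, "to_fail": False}),
    aws_conn_id=None,
    retries=1,  # retry via Airflow instead of Lambda's asynchronous retries
)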

Summary

In this post, we discussed the limitations of the Lambda invoke function operator and created a custom Lambda operator. The custom operator reports the invocation result of a function correctly and records the exact error message from a failure. A number of tasks were created to compare the two operators, and the results show that the custom operator handles those limitations successfully.