Async Callbacks with AWS Step Functions and Task Tokens

Zac Charles
7 min read · Oct 7, 2019

Step Functions is one of my favourite AWS services. I enjoy experimenting with it, especially when new features are released. In my opinion, two of the most important changes since launch have been the additional Service Integrations and Task Tokens.

Prior to the new Service Integrations, if you wanted to interact with other AWS services from your Step Function, you needed to invoke a Lambda function that would call the other service. On Nov 29, 2018, AWS added eight new Service Integrations that allow Step Functions to directly call other core services like DynamoDB, SQS, and SNS.

Service Integrations give Step Functions a nice way to access other services. On the other hand, until recently, the only way the outside world could interact with a running Step Function was to abort it. That changed on May 23, 2019 when Task Tokens were released as an extension to the Task state.

Now, when you append .waitForTaskToken to a Task state’s resource, an opaque token is generated for you to pass to another service. For example, you could include the token in the message you’re sending to an SQS queue. The Step Function will then pause until a task success or failure API call is made containing the token. A success callback also includes a JSON string that becomes the output of the Task state.
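In sketch form, such a callback Task looks something like this in Amazon States Language (the state name and queue URL are placeholders; the full example later in this post uses the Serverless Framework’s YAML form of the same idea):

```json
{
  "WaitForWorker": {
    "Type": "Task",
    "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
    "Parameters": {
      "QueueUrl": "https://sqs.eu-west-1.amazonaws.com/123456789012/example-queue",
      "MessageBody": {
        "Input.$": "$",
        "TaskToken.$": "$$.Task.Token"
      }
    },
    "End": true
  }
}
```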

Service Integrations combined with Task Tokens enable native asynchronous communication. Previously, you’d achieve this by polling a database or having multiple Step Functions, both of which are more expensive and complex. The poster child for this functionality is implementing manual approval steps. However, it can also be used in automated flows to deal with temporal coupling.

Temporal coupling occurs when one service can’t continue without a response from another service. For example, a Step Function is temporally coupled to another service if it synchronously calls its REST API. Imagine an order processing Step Function that needs to call a Payment API to authorize a payment. In these cases, you must pay for a Lambda function to wait and you might pay for retries.

To remove temporal coupling, the requester should be able to send requests while the other service is down, and the other service should be able to respond while the requester is down. Also, following serverless principles, nothing should happen or cost anything until the response is ready. That means no polling and no synchronous waiting.

In this post, I’ll show how to implement asynchronous communication from a Step Function to another service via an SQS queue. I assume you have some Step Functions knowledge already, but will provide links where possible.

👩‍🏫 Basic example

Let’s take a look at the most basic example. To make things easier, I’m using the Serverless Framework with the Serverless Step Functions plugin. For simplicity, there will only be one service talking to itself. In the real world, this would usually be two separate services communicating.

The full code for this example can be found here on GitHub 📥.

⚡ serverless.yml

The example service is written in Node.js, and uses two plugins:
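The plugins block looks roughly like this. The first is the Serverless Step Functions plugin mentioned above; the second I’m assuming is serverless-iam-roles-per-function, since a per-function iamRoleStatements property appears further down:

```yaml
plugins:
  - serverless-step-functions           # define Step Functions in serverless.yml
  - serverless-iam-roles-per-function   # assumed: provides per-function iamRoleStatements
```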

The SQS queue defined in the resources section will buffer requests from the Step Function to the “worker” service. Using a queue means the Step Function can send requests while the worker service is down, and the worker can process them when it’s healthy again.
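A sketch of the resources section, assuming the queue’s logical name is WorkerQueue (the real names in the repository may differ):

```yaml
resources:
  Resources:
    WorkerQueue:
      Type: AWS::SQS::Queue
      Properties:
        QueueName: ${self:service}-worker-queue
```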

The stepFunctions section comes from the first plugin. Here I define a very simple Step Function with one Task state named PublishAndWait.

Take a look at the Resource property of the PublishAndWait state. On the end is .waitForTaskToken, which converts this Task from a simple API call into one that subsequently waits for a callback.

In the Parameters property I provide the URL of the queue defined earlier. I also configure the MessageBody so that it gets the input of the Task state and the Task Token.
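Putting that together, here’s a sketch of the stepFunctions section. The state machine name and the use of Ref to get the queue URL are my assumptions; the Resource ARN and the $$.Task.Token context path are the standard forms:

```yaml
stepFunctions:
  stateMachines:
    asyncCallbackExample:
      definition:
        StartAt: PublishAndWait
        States:
          PublishAndWait:
            Type: Task
            Resource: arn:aws:states:::sqs:sendMessage.waitForTaskToken
            Parameters:
              QueueUrl:
                Ref: WorkerQueue               # Ref on an SQS queue returns its URL
              MessageBody:
                Input.$: "$"                   # the Task state's input
                TaskToken.$: "$$.Task.Token"   # the generated callback token
            End: true
```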

Below is an example of what the messages sent to SQS will look like. Since the Task is the first state, its input comes from the Step Function execution itself.
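Something like this, assuming the execution was started with an input of {"number": 12} and the MessageBody keys shown above:

```json
{
  "Input": {
    "number": 12
  },
  "TaskToken": "<opaque task token, roughly 620 characters long>"
}
```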

Lastly, I define the worker function. It’s configured with an SQS event trigger linked to the same queue the Step Function will send to.

The iamRoleStatements property comes from the second plugin. I’m allowing the states:SendTaskSuccess and states:SendTaskFailure actions so the worker can notify the Step Function when the response is ready. These actions don’t support resource-level permissions, so Resource is all (*). This is nice because the worker doesn’t need to know about all of the Step Functions that may send it requests. The task tokens themselves are around 620 characters long, which seems secure enough.
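A sketch of the worker function definition with its event source and IAM statements; the handler path and event wiring are assumptions based on the description above:

```yaml
functions:
  worker:
    handler: worker.handler
    events:
      - sqs:
          arn:
            Fn::GetAtt: [WorkerQueue, Arn]   # the same queue the Step Function sends to
    iamRoleStatements:
      - Effect: Allow
        Action:
          - states:SendTaskSuccess
          - states:SendTaskFailure
        Resource: "*"   # these actions don't support resource-level permissions
```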

🤖 worker.js

As I said earlier, the worker would normally be a different service, but it’s defined here for the sake of simplicity.

I’m importing the uuid package to do some fake work, along with the AWS SDK from which I create a Step Functions API client.

The sleep function is just a little helper that I’ll use to make it look like the worker is doing something difficult.
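A sketch of that setup, assuming the v2 AWS SDK that Lambda’s Node.js runtimes bundled at the time:

```javascript
const { v4: uuidv4 } = require('uuid'); // used to add something unique to the output
const AWS = require('aws-sdk');

// Step Functions API client used to send the callbacks
const stepFunctions = new AWS.StepFunctions();

// Helper to make it look like the worker is doing something difficult
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));
```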

Let’s dive into the handler logic.

It starts by sleeping for 2 seconds to pretend to be working. Then we check whether the input number is even, which determines the result.

If number is even, we call SendTaskSuccess. We must JSON.stringify the output before assigning it to the output parameter. uuidv4() is called to give the output some spice.

On the other hand, if number is odd, we call SendTaskFailure to report an error and its cause. You could match that error name with Retry and Catch on the Task state for error handling.
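Continuing from the setup above, here’s a sketch of the whole handler. The message body keys (Input, TaskToken), the input shape ({"number": 12}), and the error name are illustrative assumptions:

```javascript
module.exports.handler = async event => {
  // SQS can deliver a batch of records, so handle each message in turn
  for (const record of event.Records) {
    const { Input, TaskToken } = JSON.parse(record.body);

    // Pretend to be doing some hard work
    await sleep(2000);

    if (Input.number % 2 === 0) {
      // Even input: report success; output must be a JSON string
      await stepFunctions.sendTaskSuccess({
        taskToken: TaskToken,
        output: JSON.stringify({ number: Input.number, result: uuidv4() })
      }).promise();
    } else {
      // Odd input: report failure with an error name and a cause
      await stepFunctions.sendTaskFailure({
        taskToken: TaskToken,
        error: 'NumberIsOdd',
        cause: `${Input.number} is an odd number`
      }).promise();
    }
  }
};
```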

👀 Trying it out

After deploying the service, we can try it out by starting a new execution of the Step Function using the AWS console. Here I pass the number 12, which is even, so I expect it to succeed.
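In other words, the execution input is a small JSON document; I’m assuming the field is called number, as in the worker sketch above:

```json
{
  "number": 12
}
```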

The Execution event history can be used to keep track of the execution.

  1. ExecutionStarted — The Step Function execution was started.
  2. TaskStateEntered — It transitioned to the PublishAndWait state.
  3. TaskScheduled — The sqs:sendMessage.waitForTaskToken task was scheduled to run behind the scenes.
  4. TaskStarted — The behind the scenes task runner picked up our task.
  5. TaskSubmitted — The message was sent to the SQS queue. You can see the SQS API RequestId at the bottom.

❄️ The execution is paused at this point.

6. TaskSucceeded — Over five seconds have elapsed. The worker handled the message and called SendTaskSuccess. See the spicy UUID in the result? 🌶️

7. TaskStateExited — The Step Function is awake again and has exited the PublishAndWait state. The output from the worker became the state’s output.

8. ExecutionSucceeded — There was only one state, so the execution is done.

Scrolling back up to the diagram, we can click on the PublishAndWait state to see the same input, and the output containing the result.

☠️ Failures

Let’s quickly see what failures look like. This time I’ll pass a 7 in the execution input.

As you probably guessed, the Execution event history now shows a TaskFailed record. I didn’t configure any error handling, so the error bubbled up and caused the execution to fail with ExecutionFailed.

Scrolling back up again we can see the same Error and Cause that the worker returned in the SendTaskFailure API call.

🤔 Conclusion

I’ve shown that Step Functions can communicate with other services via SQS queues. Not only that, but the Step Function can go to sleep while waiting for a response.

The Step Function is no longer temporally coupled to the worker service, but this came at the cost of latency. The worker sleeps for 2 seconds but execution pauses for around 5 seconds.

Another trade-off is that the worker service needs to know it should call the Step Functions API and needs permission to do so. I know some people will see this as a problem, so I’ll provide alternatives in a follow-up post.

Service Integrations and Task Tokens open up a lot of interesting patterns. For example, you could call two services and await both responses with a Parallel state. Something I didn’t mention is configuring the Task to time out while waiting for the callback, which is good for optional requests or error handling. What other uses have you found?
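On the timeout option specifically: callback Tasks still honour TimeoutSeconds (and HeartbeatSeconds), so a sketch like the one below would fail with a States.Timeout error if no callback arrives within five minutes, which Retry or Catch could then handle:

```yaml
PublishAndWait:
  Type: Task
  Resource: arn:aws:states:::sqs:sendMessage.waitForTaskToken
  TimeoutSeconds: 300   # fail with States.Timeout if no callback within 5 minutes
  # Parameters: same QueueUrl and MessageBody as in the earlier sketch
  End: true
```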

Please let me know if you have any questions or would like to hear more about a particular topic.

For more like this, please follow me on Medium and Twitter.

