Developing Task Workers

Each LittleHorse SDK provides an LHTaskWorker object or struct which lets you turn an arbitrary function or method into a LittleHorse Task.

Quickstart

The LHTaskWorker object allows you to create and start a Task Worker in all three of our SDKs. Below, you will find ready-to-compile programs that:

  1. Register a TaskDef called greet which takes in one STR variable as input.
  2. Start a Task Worker that polls the LH Cluster for tasks to execute.

To create a Task Worker, you need to do the following:

  1. Create an LHConfig (see this configuration documentation).
  2. Write a Task Worker class with a method annotated with @LHTaskMethod.
  3. Create an LHTaskWorker object with your config and Task Worker object.
  4. Register the TaskDef with worker.registerTaskDef().
  5. Finally, call worker.start().

Let's build a Task Worker for a TaskDef named greet that takes in a String and returns a String. The program below defines the Task Worker object (MyWorker) and a Main class that registers the TaskDef and starts the worker:

package io.littlehorse.quickstart;

import java.io.IOException;
import io.littlehorse.sdk.common.config.LHConfig;
import io.littlehorse.sdk.worker.LHTaskMethod;
import io.littlehorse.sdk.worker.LHTaskWorker;

class MyWorker {
    @LHTaskMethod("greet")
    public String greeting(String firstName) {
        String result = "Hello there, " + firstName + "!";
        System.out.println(result);
        return result;
    }
}

public class Main {
    public static void main(String[] args) throws IOException, InterruptedException {
        LHConfig config = new LHConfig();

        MyWorker executable = new MyWorker();
        LHTaskWorker greetWorker = new LHTaskWorker(executable, "greet", config);
        Runtime.getRuntime().addShutdownHook(new Thread(greetWorker::close));

        greetWorker.registerTaskDef();

        greetWorker.start();
    }
}
tip

The TaskDef is generated by the registerTaskDef() call, which uses reflection to determine the parameter names. If you compile with the -parameters javac flag (for example, by setting options.compilerArgs << '-parameters' in Gradle), the resulting TaskDef variable names will match your method's parameter names rather than arg0, arg1, ..., argN.
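The effect of the -parameters flag can be observed with plain JDK reflection. The following is a minimal sketch, not the SDK's internal code: ParameterNamesDemo and parameterNames are hypothetical names invented for this illustration, and the greeting method is a stand-in that uses no LittleHorse classes.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.ArrayList;
import java.util.List;

class ParameterNamesDemo {

    // Stand-in for a Task Method; no LittleHorse classes are needed here.
    public String greeting(String firstName) {
        return "Hello there, " + firstName + "!";
    }

    // Collects the parameter names that reflection reports for the named method.
    static List<String> parameterNames(Class<?> clazz, String methodName) {
        List<String> names = new ArrayList<>();
        for (Method method : clazz.getDeclaredMethods()) {
            if (!method.getName().equals(methodName)) {
                continue;
            }
            for (Parameter parameter : method.getParameters()) {
                names.add(parameter.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Prints [firstName] when compiled with -parameters, [arg0] otherwise.
        System.out.println(parameterNames(ParameterNamesDemo.class, "greeting"));
    }
}
```

Compiling this class once with and once without javac -parameters shows the difference between descriptive and synthesized names.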

Advanced Usage

The Task Worker library has some features that make advanced use cases easier.

Throwing Workflow EXCEPTIONs

As described in our Failure Handling Concept Docs, LittleHorse distinguishes between technical ERRORs and business EXCEPTIONs:

  • A technical ERROR denotes a technological failure, such as a timeout caused by a network outage, or an unexpected error returned by your Task Worker.
  • A business EXCEPTION represents an unhappy-path case in your business logic, such as when an item is out of stock or a credit card is declined.

If your Task Worker throws an uncaught error or exception (the exact mechanism depends on your language), it is treated as a LittleHorse ERROR with the error code LHErrorType.TASK_FAILURE. Sometimes, however, your Task Worker detects a business-process-level failure (what LittleHorse calls an EXCEPTION), such as a declined credit card. In this case, you can make the TaskRun throw a LittleHorse EXCEPTION by using the LHTaskException object.

info

The LittleHorse EXCEPTION result is NOT retryable. That means that if your Task Method throws an LHTaskException, it will not be retried. If it throws any error/exception other than the LHTaskException, it will be treated as a LittleHorse ERROR, which is retryable.
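The retry rule above can be illustrated with a small, self-contained simulation. This is only a sketch of the semantics and makes no claim about how LittleHorse implements them: RetrySemanticsSketch, BusinessException, and runWithRetries are hypothetical names, BusinessException stands in for LHTaskException, and the retry loop here runs locally purely for illustration.

```java
import java.util.function.Supplier;

class RetrySemanticsSketch {

    // Hypothetical stand-in for a business EXCEPTION such as LHTaskException.
    static class BusinessException extends RuntimeException {
        BusinessException(String message) {
            super(message);
        }
    }

    // Runs the task up to maxAttempts times and describes the final outcome.
    static String runWithRetries(Supplier<String> task, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return "COMPLETED after " + (attempt + 1) + " attempt(s): " + task.get();
            } catch (BusinessException e) {
                // Business EXCEPTION: retrying cannot help, so stop immediately.
                return "EXCEPTION after " + (attempt + 1) + " attempt(s): " + e.getMessage();
            } catch (RuntimeException e) {
                // Technical ERROR: try again if any attempts remain.
            }
        }
        return "ERROR after " + maxAttempts + " attempt(s)";
    }
}
```

A task that fails twice with an ordinary exception and then succeeds completes on the third attempt, while a task that throws BusinessException stops after the first.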

In the following example, we will throw the out-of-stock user-defined business EXCEPTION if the item is out of stock.

package io.littlehorse.quickstart;

import io.littlehorse.sdk.common.exception.LHTaskException;
import io.littlehorse.sdk.worker.LHTaskMethod;

class MyWorker {
    @LHTaskMethod("ship-item")
    public String shipItem(String itemSku) {
        // isOutOfStock() is your own business logic (not shown here).
        if (isOutOfStock(itemSku)) {
            throw new LHTaskException("out-of-stock", "Some human readable message");
        }
        return "Item " + itemSku + " successfully shipped!";
    }
}

The first argument to the LHTaskException constructor is the name of the EXCEPTION we are going to throw. This is useful if you want to be able to catch specific EXCEPTIONs with specific types in your Failure Handlers. The second argument is a human-readable error message that shows up on the NodeRun's output as the error_message field, which is useful for debugging purposes.

If you want to throw a Failure that has content which can be caught in your Failure Handler using the INPUT variable name, you can use a third argument named content. It is optional in Python and is available in an overloaded method signature in Java. The following example shows how you might throw such an EXCEPTION:

package io.littlehorse.quickstart;

import io.littlehorse.sdk.common.exception.LHTaskException;
import io.littlehorse.sdk.worker.LHTaskMethod;

class MyWorker {
    @LHTaskMethod("ship-item")
    public String shipItem(String itemSku) {
        if (isOutOfStock(itemSku)) {
            int daysUntilBackInStock = calculateDaysUntilBackInStock(itemSku);

            // The `content` of the `Failure` that is thrown will be an INT variable containing
            // the number of days until the item is expected to be back in stock.
            throw new LHTaskException(
                "out-of-stock",
                "Some human readable message",
                daysUntilBackInStock);
        }
        return "Item " + itemSku + " successfully shipped!";
    }
}

JSON Deserialization

In some SDKs, LittleHorse will automatically deserialize JSON variables into objects or structs for you.

Let's say we have a class MyCar as follows:

class MyCar {
    String make;
    String model;

    public MyCar(String make, String model) {
        this.make = make;
        this.model = model;
    }

    // getters, setters omitted
}

Suppose one of the Variables (for example, my-obj) in our WfSpec is of type JSON_OBJ, and there is a TaskDef called json-example with one input variable of type JSON_OBJ. We can define a Task Worker as follows:

class MyWorker {

    @LHTaskMethod("json-example")
    public void executeTask(MyCar input) {
        System.out.println(input.getMake());
        System.out.println(input.getModel());
    }
}

The library will deserialize JSON such as {"make": "Ford", "model": "Explorer"} into an actual MyCar object.

Accessing Metadata

Sometimes, your Task Worker needs to know something about where the TaskRun came from. Each LittleHorse SDK offers a WorkerContext object or struct that exposes this metadata to the Task Worker.

If you need to access metadata about the Task Run that is being executed, you can add a WorkerContext parameter to the end of your method signature for the Task Method.

Let's say you have a TaskDef with one input parameter of type INT. You can access the WorkerContext by doing the following:

class SomeWorker {

    @LHTaskMethod("my-task")
    public void doTask(long inputLong, WorkerContext context) {
        String wfRunId = context.getWfRunId();
        TaskRunId taskRunId = context.getTaskRunId();
        NodeRunId nodeRunId = context.getNodeRunId();

        Date timeWhenTaskWasScheduled = context.getScheduledTime();

        context.log(
            "This is a message that gets sent to the log output on the scheduler"
        );

        int attemptNumber = context.getAttemptNumber();
        if (attemptNumber == 0) {
            // then this is the first time this Task Run has been attempted.
        } else {
            // then this is a retry.
        }

        // This is a constant value between all attempts for this TaskRun.
        // Useful to allow retries to third-party APIs that accept idempotency
        // keys, such as Stripe.
        String idempotencyKey = context.getIdempotencyKey();
    }
}

Best Practices

Client ID

Every Task Worker instance should have a unique LHC_CLIENT_ID set in its configuration. This is important so that you can audit which client executed which Task, and also so that the LH Server can efficiently assign partitions of work to your Task Workers.
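One way to give each worker instance a distinct ID is to combine a readable prefix with a random suffix. The sketch below uses only the JDK; ClientIdSketch, uniqueClientId, and workerProperties are hypothetical names, and how the resulting properties are handed to your LHConfig is left to the configuration documentation rather than assumed here.

```java
import java.util.Properties;
import java.util.UUID;

class ClientIdSketch {

    // Builds a client ID from a human-readable prefix plus a random UUID suffix.
    static String uniqueClientId(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }

    // Returns properties containing only the LHC_CLIENT_ID setting.
    static Properties workerProperties(String prefix) {
        Properties props = new Properties();
        props.setProperty("LHC_CLIENT_ID", uniqueClientId(prefix));
        return props;
    }
}
```

The prefix keeps the ID recognizable when auditing, while the random suffix keeps two instances of the same worker from colliding.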

Idempotence

With all workflow engines, it is best when your tasks are idempotent. You can use the NodeRunId from WorkerContext::getNodeRunId(), or the value returned by WorkerContext::getIdempotencyKey(), as an idempotency key.
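To illustrate what an idempotency key buys you, here is a minimal in-memory sketch. IdempotentExecutor and executeOnce are hypothetical names, not part of the SDK, and a production system would rely on a durable store or on the idempotency support of the third-party API rather than a map held in memory.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class IdempotentExecutor {

    // Remembers the result produced for each idempotency key (in memory only).
    private final Map<String, String> resultsByKey = new ConcurrentHashMap<>();

    // Runs the side effect at most once per key; repeat calls with the same key
    // return the stored result instead of running the side effect again.
    String executeOnce(String idempotencyKey, Supplier<String> sideEffect) {
        return resultsByKey.computeIfAbsent(idempotencyKey, key -> sideEffect.get());
    }
}
```

Inside a Task Method, the key passed to executeOnce could be the value of context.getIdempotencyKey(), so that a retried attempt reuses the first attempt's result.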