Developing Task Workers

Each LittleHorse SDK provides an LHTaskWorker object or struct which lets you turn an arbitrary function or method into a LittleHorse Task.

Basics

The LHTaskWorker object allows you to create and start a Task Worker.

To create a Task Worker, you need to do the following:

  1. Create an LHConfig (see this configuration documentation).
  2. Write a Task Worker class with an annotated @LHTaskMethod method.
  3. Create an LHTaskWorker object with your config and Task Worker Object.
  4. Register the TaskDef with worker.registerTaskDef().
  5. Start the worker with worker.start().

Let's build a Task Worker for a TaskDef named my-task that takes in a String and returns a String. First, the Task Worker Object:

class MyWorker {

    @LHTaskMethod("my-task")
    public String doTheTask(String input) {
        return "The input I got was: " + input;
    }
}

The @LHTaskMethod annotation tells the LH Library that doTheTask is the method that should be called for every my-task Task Run that is scheduled.
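To make the role of the annotation concrete, here is a minimal sketch of how a library could locate an annotated method by TaskDef name and invoke it through reflection. It is not the LittleHorse implementation: TaskMethodSketch, GreetingWorkerSketch, and AnnotationDispatchSketch are hypothetical stand-ins written only for this illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-in for @LHTaskMethod; NOT the SDK's annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface TaskMethodSketch {
    String value();
}

// Hypothetical Task Worker Object, mirroring MyWorker above.
class GreetingWorkerSketch {
    @TaskMethodSketch("my-task")
    public String doTheTask(String input) {
        return "The input I got was: " + input;
    }
}

class AnnotationDispatchSketch {
    // Finds the method annotated with the given TaskDef name and calls it
    // with a single argument. Reflection failures become unchecked exceptions.
    static Object dispatch(Object workerObject, String taskDefName, Object arg) {
        for (Method m : workerObject.getClass().getDeclaredMethods()) {
            TaskMethodSketch marker = m.getAnnotation(TaskMethodSketch.class);
            if (marker != null && marker.value().equals(taskDefName)) {
                try {
                    m.setAccessible(true);
                    return m.invoke(workerObject, arg);
                } catch (ReflectiveOperationException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        throw new IllegalArgumentException("No method annotated for " + taskDefName);
    }

    public static void main(String[] args) {
        // Prints: The input I got was: hello
        System.out.println(dispatch(new GreetingWorkerSketch(), "my-task", "hello"));
    }
}
```

The point of the sketch is only that the annotation value, not the method name, ties the method to the TaskDef.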

Finally, we can create an LHTaskWorker, register the TaskDef, and start the worker:

MyWorker workerObject = new MyWorker();
LHConfig config = new LHConfig();

LHTaskWorker worker = new LHTaskWorker(workerObject, "my-task", config);
worker.registerTaskDef(false);
worker.start();

To shut down gracefully, call worker.close().
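One common way to make sure close() runs is a JVM shutdown hook. The sketch below shows the pattern with a hypothetical stand-in, ClosableWorkerSketch, in place of a real LHTaskWorker. It assumes only that the worker exposes a close() method, as stated above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a started Task Worker; only close() matters here.
class ClosableWorkerSketch implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    @Override
    public void close() {
        // compareAndSet makes repeated close() calls harmless.
        if (closed.compareAndSet(false, true)) {
            System.out.println("worker closed");
        }
    }

    boolean isClosed() {
        return closed.get();
    }
}

class ShutdownHookSketch {
    // Registers a hook that closes the worker when the JVM exits.
    // The hook thread is returned so callers can remove it if needed.
    static Thread closeOnShutdown(AutoCloseable worker) {
        Thread hook = new Thread(() -> {
            try {
                worker.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        closeOnShutdown(new ClosableWorkerSketch());
        // The hook runs once main returns and the JVM begins to exit.
    }
}
```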

Advanced Usage

The Task Worker library has some features that make advanced use cases easier.

Throwing Workflow EXCEPTIONs

As described in our Failure Handling Concept Docs, LittleHorse distinguishes between technical ERRORs and business EXCEPTIONs:

  • A technical ERROR denotes a technological failure, such as a Timeout caused by a network outage, or an unexpected error returned by your Task Worker.
  • A Business EXCEPTION represents an unhappy-path case in your business logic, such as when an item is out of stock or a credit card got declined.

If your Task Worker throws an uncaught error (depending on your language), then it is treated as a LittleHorse ERROR with the error code LHErrorType.TASK_FAILURE. However, sometimes your Task Worker notices that a business process-level failure (what LittleHorse calls an EXCEPTION) has occurred. For example, the Task Worker could notice that a credit card got declined. In this case, you can make the TaskRun throw a LittleHorse EXCEPTION by using the LHTaskException object.

In the following example, we will throw the out-of-stock user-defined business EXCEPTION if the item is out of stock.

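@LHTaskMethod("ship-item")
public String shipItem() throws Exception {
    if (isOutOfStock()) {
        // Fails the TaskRun with the business EXCEPTION "out-of-stock".
        throw new LHTaskException("out-of-stock", "some descriptive message");
    }
    // Happy path: the method is declared to return a String.
    return "shipped";
}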
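The ERROR versus EXCEPTION split can be illustrated without the SDK. In the sketch below, BusinessExceptionSketch is a hypothetical stand-in for LHTaskException, and classify() is an invented helper that mimics how an outcome would be categorized. It is a simplified model of the rule described above, not the library's own logic.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for LHTaskException: carries a business exception name.
class BusinessExceptionSketch extends RuntimeException {
    final String name;

    BusinessExceptionSketch(String name, String message) {
        super(message);
        this.name = name;
    }
}

class FailureClassifierSketch {
    // Runs the task body and reports how its outcome would be categorized.
    static String classify(Supplier<String> taskBody) {
        try {
            return "COMPLETED: " + taskBody.get();
        } catch (BusinessExceptionSketch e) {
            // Deliberately thrown by the task: a business EXCEPTION.
            return "EXCEPTION: " + e.name;
        } catch (RuntimeException e) {
            // Anything else uncaught: a technical ERROR.
            return "ERROR: TASK_FAILURE";
        }
    }

    static String shipItem(boolean outOfStock) {
        if (outOfStock) {
            throw new BusinessExceptionSketch("out-of-stock", "item is out of stock");
        }
        return "shipped";
    }

    public static void main(String[] args) {
        System.out.println(classify(() -> shipItem(false))); // COMPLETED: shipped
        System.out.println(classify(() -> shipItem(true)));  // EXCEPTION: out-of-stock
        System.out.println(classify(() -> {
            throw new IllegalStateException("database unreachable");
        })); // ERROR: TASK_FAILURE
    }
}
```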

JSON Deserialization

In some SDKs, LittleHorse will automatically deserialize JSON variables into objects or structs for you.

Let's say we have a class `MyCar` as follows:

class MyCar {
    String make;
    String model;

    public MyCar(String make, String model) {
        this.make = make;
        this.model = model;
    }

    // getters, setters omitted
}

Suppose one of the Variables in our WfSpec (for example, my-obj) is of type JSON_OBJ.

Let's say there's a TaskDef called json-example with one input variable of type JSON_OBJ. We can have a Task Worker defined as follows:

class MyWorker {

    @LHTaskMethod("json-example")
    public void executeTask(MyCar input) {
        System.out.println(input.getMake());
        System.out.println(input.getModel());
    }
}

The library deserializes JSON such as {"make": "Ford", "model": "Explorer"} into an actual MyCar object.
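As a rough illustration of the mapping from JSON text to an object, the toy sketch below turns a flat JSON object into a car object. It is deliberately naive: the regular expression handles only string-valued fields without escaped quotes, and CarSketch and FlatJsonSketch are hypothetical names. How the SDK actually deserializes is not shown in this document, so treat this purely as a picture of the input and the result.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical counterpart of MyCar, kept minimal for the sketch.
class CarSketch {
    final String make;
    final String model;

    CarSketch(String make, String model) {
        this.make = make;
        this.model = model;
    }
}

class FlatJsonSketch {
    // Matches "key": "value" pairs; no nesting, numbers, or escapes.
    private static final Pattern STRING_FIELD =
        Pattern.compile("\"([^\"]+)\"\\s*:\\s*\"([^\"]*)\"");

    // Collects every string-valued field of a flat JSON object.
    static Map<String, String> parseFlatObject(String json) {
        Map<String, String> fields = new HashMap<>();
        Matcher m = STRING_FIELD.matcher(json);
        while (m.find()) {
            fields.put(m.group(1), m.group(2));
        }
        return fields;
    }

    // Maps the "make" and "model" fields onto a CarSketch.
    static CarSketch toCar(String json) {
        Map<String, String> fields = parseFlatObject(json);
        return new CarSketch(fields.get("make"), fields.get("model"));
    }

    public static void main(String[] args) {
        CarSketch car = toCar("{\"make\": \"Ford\", \"model\": \"Explorer\"}");
        System.out.println(car.make + " " + car.model); // Ford Explorer
    }
}
```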

Accessing Metadata

Sometimes, your Task Worker needs to know something about where the TaskRun came from. Each LittleHorse SDK offers a WorkerContext object or struct that exposes this metadata to the Task Worker.

To access this metadata for the Task Run being executed, add a WorkerContext parameter as the last parameter of your Task Method.

Let's say you have a TaskDef with one input parameter of type INT. You can access the WorkerContext by doing the following:

class SomeWorker {

    @LHTaskMethod("my-task")
    public void doTask(long inputLong, WorkerContext context) {
        String wfRunId = context.getWfRunId();
        TaskRunId taskRunId = context.getTaskRunId();
        NodeRunId nodeRunId = context.getNodeRunId();

        Date timeWhenTaskWasScheduled = context.getScheduledTime();

        context.log(
            "This is a message that gets sent to the log output on the scheduler"
        );

        int attemptNumber = context.getAttemptNumber();
        if (attemptNumber == 0) {
            // then this is the first time this Task Run has been attempted.
        } else {
            // then this is a retry.
        }

        // This value is constant across all attempts for this TaskRun.
        // Useful for retrying calls to third-party APIs that accept
        // idempotency keys, such as Stripe.
        String idempotencyKey = context.getIdempotencyKey();
    }
}

Best Practices

Client ID

Every Task Worker instance should have a unique LHC_CLIENT_ID set in its configuration. This is important so that you can audit which client executed which Task, and also so that the LH Server can efficiently assign partitions of work to your Task Workers.
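One way to obtain a distinct client ID per instance is to combine a service prefix, the host name, and a random suffix. The sketch below builds such a value and stores it under the LHC_CLIENT_ID key in a java.util.Properties object. The naming scheme, the reliance on the HOSTNAME environment variable, and the ClientIdSketch class are assumptions made for illustration. How the properties are handed to your LHConfig is left out.

```java
import java.util.Properties;
import java.util.UUID;

class ClientIdSketch {
    // Builds an ID such as "shipping-worker-<host>-<uuid>".
    // HOSTNAME is often set in containers; fall back if it is missing.
    static String uniqueClientId(String prefix) {
        String host = System.getenv().getOrDefault("HOSTNAME", "unknown-host");
        return prefix + "-" + host + "-" + UUID.randomUUID();
    }

    // Returns properties carrying only the client ID setting.
    static Properties workerProperties(String prefix) {
        Properties props = new Properties();
        props.setProperty("LHC_CLIENT_ID", uniqueClientId(prefix));
        return props;
    }

    public static void main(String[] args) {
        Properties props = workerProperties("shipping-worker");
        System.out.println(props.getProperty("LHC_CLIENT_ID"));
    }
}
```

The random suffix keeps two workers on the same host from colliding, while the prefix and host name keep the ID readable in audits.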

Idempotence

As with any workflow engine, your tasks should be idempotent wherever possible. You can use the NodeRunId from WorkerContext::getNodeRunId(), or the value returned by getIdempotencyKey(), as an idempotency key.
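A minimal sketch of what using an idempotency key buys you: the side effect runs once per key, and retries get the stored result back. IdempotentChargeSketch and its in-memory map are hypothetical. In a real Task Worker the key would come from the WorkerContext, and the deduplication would normally live in the downstream system or a durable store rather than in process memory.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class IdempotentChargeSketch {
    // Results keyed by idempotency key; in-memory only for illustration.
    private final Map<String, String> resultsByKey = new ConcurrentHashMap<>();
    private final AtomicInteger sideEffects = new AtomicInteger();

    // Performs the "charge" at most once per key; retries reuse the result.
    String chargeOnce(String idempotencyKey, long amountCents) {
        return resultsByKey.computeIfAbsent(idempotencyKey, key -> {
            sideEffects.incrementAndGet(); // stands in for the real side effect
            return "charged " + amountCents + " cents";
        });
    }

    int sideEffectCount() {
        return sideEffects.get();
    }

    public static void main(String[] args) {
        IdempotentChargeSketch charges = new IdempotentChargeSketch();
        charges.chargeOnce("task-run-1", 500); // first attempt
        charges.chargeOnce("task-run-1", 500); // retry with the same key
        System.out.println(charges.sideEffectCount()); // 1
    }
}
```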