Background
The public LittleHorse API is a GRPC service exposed by the LH Server. We have complete auto-generated documentation for our GRPC Service and Protocol Buffers on our docs site. For the highly curious readers, you can find the actual protocol buffer code that underpins our system in our source code repository.
Because the public LittleHorse API is a GRPC service, you may notice that the LHConfig
object in all three of our SDKs has a getStub()
method or equivalent. This returns the autogenerated GRPC client in the appropriate language.
This page describes several patterns in the LittleHorse API. Some of these patterns come directly from GRPC (such as error handling and status codes), and others, such as our implementation of cursor-based pagination, are specific to LittleHorse.
LittleHorse GRPC Quickstart
The entities in our GRPC service are protocol buffers. You can find our up-to-date API contract in our API documentation. Our SDKs in Java, Go, and Python ship with pre-compiled protobufs for LittleHorse, so you don't need to add an extra dependency or compile the protobuf yourself.
Below is an example of how to access a GRPC client, build a protobuf, and make a request in Java, Go, and Python. The request we will make is the rpc PutExternalEventDef
.
- Java
- Go
- Python
package io.littlehorse.quickstart;
import java.io.IOException;
import io.littlehorse.sdk.common.LHLibUtil;
import io.littlehorse.sdk.common.config.LHConfig;
import io.littlehorse.sdk.common.proto.LittleHorseGrpc.LittleHorseBlockingStub;
// All protobuf objects can be found in this package.
import io.littlehorse.sdk.common.proto.ExternalEventDef;
import io.littlehorse.sdk.common.proto.PutExternalEventDefRequest;
public class Main {
public static void main(String[] args) throws IOException {
// First, create an LHConfig. Using the default constructor loads the
// configurations from your environment variables.
LHConfig config = new LHConfig();
// Get a GRPC client. Java GRPC has two types: "Blocking" and regular.
// "Blocking" stubs are easier to work with as they are synchronous.
LittleHorseBlockingStub client = config.getBlockingStub();
// Build the request
PutExternalEventDefRequest req = PutExternalEventDefRequest.newBuilder()
.setName("my-external-event")
.build();
// Make the request
ExternalEventDef result = client.putExternalEventDef(req);
// Print the result in JSON format
System.out.println(LHLibUtil.protoToJson(result));
}
}
package main
import (
"context"
"log"
// Littlehorse functions are found in this package
"github.com/littlehorse-enterprises/littlehorse/sdk-go/littlehorse"
// All protobuf data structs and grpc clients are found in this package
"github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto"
)
// main registers an ExternalEventDef through the LittleHorse GRPC API and
// prints the server's response.
func main() {
	// Create a config using the environment variables.
	config := littlehorse.NewConfigFromEnv()

	// Load the client
	client, err := config.GetGrpcClient()
	if err != nil {
		log.Fatal(err)
	}

	// Create the request protobuf structure
	req := &lhproto.PutExternalEventDefRequest{
		Name: "my-external-event-def",
	}

	// Make the request. The result is only meaningful when err is nil, so
	// check the error before printing.
	result, err := (*client).PutExternalEventDef(context.Background(), req)
	if err != nil {
		log.Fatal(err)
	}
	littlehorse.PrintProto(result)
}
from littlehorse.config import LHConfig
# All protobuf models are in this package
from littlehorse.model import LittleHorseStub, PutExternalEventDefRequest, ExternalEventDef
# You can use this utility to print protobuf prettily (:
from google.protobuf.json_format import MessageToJson
if __name__ == '__main__':
    # LHConfig() with no arguments picks up its settings from environment variables.
    lh_config: LHConfig = LHConfig()

    # In GRPC terminology the client object is called a "stub".
    stub: LittleHorseStub = lh_config.stub()

    # Requests are protobuf messages: build one, then send it.
    put_request = PutExternalEventDefRequest(name="my-external-event-def")
    event_def: ExternalEventDef = stub.PutExternalEventDef(put_request)

    # Render the response as JSON and print it.
    response_json = MessageToJson(event_def)
    print(response_json)
Error Handling
The LittleHorse API uses the standard GRPC Error Codes, and we strictly follow the conventions described in the official documentation. The most common error codes you will encounter are NOT_FOUND
, FAILED_PRECONDITION
, ALREADY_EXISTS
, and INVALID_ARGUMENT
.
When handling errors from the LittleHorse API, you should treat the status code as a machine-readable signal, and the error message should be treated as a human-readable debugging aid. Your control flow logic should not depend on the content of the error message; it should only pay attention to the error status code.
LittleHorse currently does not utilize the GRPC Trailers to send error content in the form of a well-formed protobuf message.
In the example below, we will make a RunWf
request and provide the id
of the WfRun
which we want to run. Such a request will fail with an ALREADY_EXISTS
error if the WfRun
already exists. The example below will show you how to catch such an error.
- Java
- Go
- Python
package io.littlehorse.quickstart;
import java.io.IOException;
import io.grpc.StatusRuntimeException;
import io.grpc.Status.Code;
import io.littlehorse.sdk.common.config.LHConfig;
import io.littlehorse.sdk.common.proto.LittleHorseGrpc.LittleHorseBlockingStub;
import io.littlehorse.sdk.common.proto.RunWfRequest;
/**
 * Runs a WfRun with a caller-chosen id and shows how to recognize the
 * ALREADY_EXISTS status when that id is already taken.
 */
public class Main {
    public static void main(String[] args) throws IOException {
        LHConfig lhConfig = new LHConfig();
        LittleHorseBlockingStub stub = lhConfig.getBlockingStub();

        // A given id can belong to at most one WfRun.
        String wfRunId = "some-wf-run-id";

        // Run a WfSpec, fixing the WfRun's id up front.
        RunWfRequest request = RunWfRequest.newBuilder()
                .setWfSpecName("quickstart")
                .setId(wfRunId)
                .build();

        try {
            stub.runWf(request);
        } catch (StatusRuntimeException grpcError) {
            // Every GRPC failure surfaces as a `StatusRuntimeException` (a
            // `RuntimeException`) that carries an `io.grpc.Status`.
            boolean alreadyExists = grpcError.getStatus().getCode() == Code.ALREADY_EXISTS;
            if (!alreadyExists) {
                System.out.println("Yikes, we have a different error.");
                throw grpcError;
            }
            System.out.println("The wfRun already exists!");
        }
    }
}
package main
import (
"context"
"fmt"
"log"
"github.com/littlehorse-enterprises/littlehorse/sdk-go/littlehorse"
"github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto"
// Use the GRPC utilities to inspect GRPC errors
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// main runs a WfRun with a caller-chosen id and shows how to recognize the
// AlreadyExists status code when that id is already taken.
func main() {
	// Get a client. The returned client must not be used when err is non-nil.
	config := littlehorse.NewConfigFromEnv()
	client, err := config.GetGrpcClient()
	if err != nil {
		log.Fatal(err)
	}

	wfRunID := "my-wf-run-id"
	req := &lhproto.RunWfRequest{
		Id:         &wfRunID,
		WfSpecName: "quickstart",
	}

	result, err := (*client).RunWf(context.Background(), req)
	if err == nil {
		littlehorse.PrintProto(result)
		return
	}

	// First, check whether it is a GRPC error: ok is false when err does not
	// carry a GRPC status.
	st, ok := status.FromError(err)
	switch {
	case !ok:
		// Handle non-GRPC errors
		fmt.Println("Got a non-GRPC error")
		log.Fatal(err)
	case st.Code() == codes.AlreadyExists:
		fmt.Println("The WfRun with the specified ID already exists!")
	default:
		fmt.Println("Got another error from LittleHorse")
		log.Fatal(err)
	}
}
Go reports errors as ordinary return values rather than exceptions, so you must check the returned error explicitly and then use status.FromError to recover the GRPC status code from it.
import grpc
from littlehorse.config import LHConfig
from littlehorse.model import RunWfRequest
config = LHConfig()
client = config.stub()

# Only one WfRun may exist per id, so reusing this id yields ALREADY_EXISTS.
wf_run_id = "obi-wan"
run_request = RunWfRequest(wf_spec_name="quickstart", id=wf_run_id)

try:
    client.RunWf(run_request)
except grpc.RpcError as rpc_error:
    if rpc_error.code() != grpc.StatusCode.ALREADY_EXISTS:
        # Any other status is unexpected here; let it propagate.
        raise rpc_error
    # A WfRun already exists with that id.
    print("WfRun with specified id already exists!")
In python GRPC, the .code()
method is an internal method of the RpcError
class so your code linters may complain about it. The alternative to using that method is not much better: the grpc-status
package requires adding another dependency to your project and is also EXPERIMENTAL
.
Note, however, that the .code()
method is safe to use as the littlehorse-client
package has tested it and pins to a version with which it works.
Read-Only Requests and Mutating Requests
In LittleHorse, there are two predominant types of GRPC requests:
- Read-Only Requests, which simply return information about the current state of the system without altering it (e.g. get a
WfRun
via the rpc GetWfRun
), and
- Mutating Requests, which may alter the state of the system (e.g. run a
WfRun
via the rpc RunWf
).
In LittleHorse, all Read-Only Requests (with the exception of rpc Whoami
) start with Get
, List
, or Search
. Any other request is a Mutating Request.
All Mutating Requests in LittleHorse can be made idempotent if you pass in the proper information. For example, if you pass in the id
field on the rpc RunWf
, you can safely retry the request multiple times and only one WfRun
will be created.
List and Search
List Requests and Search Requests are highly similar, with one distinct difference: a List Request returns a series of objects, whereas a Search Request returns a series of object IDs. For example, the rpc ListTaskRun
returns a TaskRunList
(list of TaskRun
's) whereas the rpc SearchTaskRun
returns a TaskRunIdList
(list of TaskRunId
's).
Generally, List Requests list all objects of a certain type belonging to a specific WfRun
, but that is an observation and not a rule.
What is an LH API Object?
Something that is stored in the LittleHorse Data Store and can be retrieved through some request rpc GetFoo
in the LittleHorse API is referred to as a "LittleHorse API Object". Some common types of LH API Objects are WfRun
, WfSpec
, TaskRun
, and TaskDef
.
An "Object Id" is a unique identifier for an LH API Object and contains all of the necessary information required to retrieve the LH API Object from the API via a request rpc GetFoo
, such as: rpc GetWfRun
, rpc GetWfSpec
, rpc GetTaskRun
, and rpc GetTaskDef
.
For a given LH API Object Type (in this example, TaskRun
), it is common to have some or all of the following requests:
// Object Id for a TaskRun: the information needed to retrieve a TaskRun
// through the rpc GetTaskRun.
message TaskRunId {
// NOTE(review): presumably identifies the WfRun that the TaskRun belongs to; confirm.
WfRunId wf_run_id = 1;
// NOTE(review): presumably distinguishes this TaskRun from others in the same WfRun; confirm.
string guid = 2;
}
// One page of TaskRunId's; this is the response type of the rpc SearchTaskRun.
message TaskRunIdList {
// The TaskRunId's contained in this page.
repeated TaskRunId results = 1;
// Cursor to pass in the next request to fetch the following page.
optional bytes bookmark = 2; // for cursor-based pagination
}
// One page of full TaskRun objects; this is the response type of the rpc ListTaskRun.
message TaskRunList {
// The TaskRun's contained in this page.
repeated TaskRun results = 1;
// Cursor to pass in the next request to fetch the following page.
optional bytes bookmark = 2; // for cursor-based pagination
}
// ...
// Excerpt of the LittleHorse GRPC service showing the read requests for TaskRun.
service LittleHorse {
// ...
// Get: retrieves a single TaskRun by its TaskRunId.
rpc GetTaskRun(TaskRunId) returns(TaskRun) {}
// List Request: returns the TaskRun objects themselves.
rpc ListTaskRun(ListTaskRunRequest) returns (TaskRunList) {}
// Search Request: returns only the TaskRunId's of matching TaskRuns.
rpc SearchTaskRun(SearchTaskRunRequest) returns (TaskRunIdList) {}
}
Pagination
Both List Requests and Search Requests alike use Cursor-Based Pagination. For an example, we will look at the rpc SearchTaskRun
. Note the optional bytes bookmark
field and the optional int32 limit
field.
// Searches for TaskRuns by various criteria. Results are paginated with a
// cursor: see `bookmark` and `limit`.
message SearchTaskRunRequest {
// Cursor taken from the `bookmark` of a previous response; pass it to fetch the next page.
optional bytes bookmark = 1;
// Maximum number of results to return in a single request.
optional int32 limit = 2;
// NOTE(review): presumably the name of the TaskDef whose TaskRuns are searched; confirm.
string task_def_name = 3;
// NOTE(review): presumably restricts results to TaskRuns with this status; confirm.
optional TaskStatus status = 4;
// NOTE(review): presumably lower and upper bounds on the TaskRun start time; confirm.
optional google.protobuf.Timestamp earliest_start = 5;
optional google.protobuf.Timestamp latest_start = 6;
}
The limit
field determines the maximum number of results to be returned in a single request.
Recall the optional bytes bookmark
field in the TaskRunIdList
proto. The TaskRunIdList
is the response format for the request rpc SearchTaskRun
. If the rpc SearchTaskRun
has more results than can be returned in one request (see limit
), then the bookmark
field of the TaskRunIdList
message is set to a byte-string that serves as a cursor.
To retrieve the next page of results, simply pass in the bookmark
from your previous request to your next request.
The example below shows how to iterate through a paginated list of TaskRunId
's.
- Java
- Go
- Python
package io.littlehorse.quickstart;
import java.io.IOException;
import io.littlehorse.sdk.common.LHLibUtil;
import io.littlehorse.sdk.common.config.LHConfig;
import io.littlehorse.sdk.common.proto.LittleHorseGrpc.LittleHorseBlockingStub;
import io.littlehorse.sdk.common.proto.SearchTaskRunRequest;
import io.littlehorse.sdk.common.proto.TaskRunId;
import io.littlehorse.sdk.common.proto.TaskRunIdList;
public class Main {
public static void main(String[] args) throws IOException {
LHConfig config = new LHConfig();
LittleHorseBlockingStub client = config.getBlockingStub();
TaskRunIdList results = client.searchTaskRun(SearchTaskRunRequest.newBuilder()
.setTaskDefName("greet")
.setLimit(5)
.build());
processTaskRuns(results);
while (results.hasBookmark()) {
results = client.searchTaskRun(SearchTaskRunRequest.newBuilder()
.setTaskDefName("greet")
.setLimit(5)
.setBookmark(results.getBookmark())
.build());
processTaskRuns(results);
}
}
private static void processTaskRuns(TaskRunIdList taskRuns) {
System.out.println("Processing a batch of size: " + taskRuns.getResultsCount());
for (TaskRunId taskRun : taskRuns.getResultsList()) {
System.out.println(LHLibUtil.protoToJson(taskRun));
}
}
}
package main
import (
	"context"
	"fmt"
	"log"
	"strconv"

	"github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto"
	"github.com/littlehorse-enterprises/littlehorse/sdk-go/littlehorse"
	// Use the GRPC utilities to inspect GRPC errors
)
// main pages through every result of SearchTaskRun, following the
// cursor-based bookmark returned with each page.
func main() {
	// Get a client. The returned client must not be used when err is non-nil.
	config := littlehorse.NewConfigFromEnv()
	client, err := config.GetGrpcClient()
	if err != nil {
		log.Fatal(err)
	}

	limit := int32(5)
	req := lhproto.SearchTaskRunRequest{
		TaskDefName: "greet",
		Limit:       &limit,
	}

	// Fetch a page, process it, then carry its bookmark into the next request.
	// A nil bookmark means the page just processed was the last one.
	for {
		results, err := (*client).SearchTaskRun(context.Background(), &req)
		if err != nil {
			// results is not meaningful on error, so stop instead of processing it.
			log.Fatal(err)
		}
		processTaskRuns(results)

		if results.Bookmark == nil {
			break
		}
		req.Bookmark = results.Bookmark
	}
}
// processTaskRuns prints the size of one page of search results, followed by
// every TaskRunId in that page.
func processTaskRuns(taskRuns *lhproto.TaskRunIdList) {
	ids := taskRuns.Results
	batchSize := strconv.Itoa(len(ids))
	fmt.Println("Processing a batch of size " + batchSize)

	for i := range ids {
		littlehorse.PrintProto(ids[i])
	}
}
from littlehorse.config import LHConfig
from littlehorse.model import *
from google.protobuf.json_format import MessageToJson
def process_task_runs(task_run_ids: TaskRunIdList):
    """Print the size of one page of search results, then each id in it as JSON."""
    page = task_run_ids.results
    print(f"Processing a batch of size {len(page)}")
    for entry in page:
        print(MessageToJson(entry))
if __name__ == '__main__':
    lh_config = LHConfig()
    stub = lh_config.stub()

    # First page: no bookmark yet.
    page: TaskRunIdList = stub.SearchTaskRun(
        SearchTaskRunRequest(task_def_name="greet", limit=5)
    )
    process_task_runs(page)

    # `HasField()` is the proper way to check for presence of an `optional`
    # field in python Protobuf; a present bookmark means another page exists.
    while page.HasField("bookmark"):
        # Same criteria as before, plus the bookmark from the previous call.
        next_request = SearchTaskRunRequest(
            task_def_name="greet",
            limit=5,
            bookmark=page.bookmark,
        )
        page = stub.SearchTaskRun(next_request)
        process_task_runs(page)