Await Workflow Events
You can await a WorkflowEvent using the AwaitWorkflowEvent RPC call.
AwaitWorkflowEventRequest
To do so, you need to create an AwaitWorkflowEventRequest. The protobuf definition is:
    message AwaitWorkflowEventRequest {
      WfRunId wf_run_id = 1;
      repeated WorkflowEventDefId event_def_ids = 2;
      repeated WorkflowEventId workflow_events_to_ignore = 3;
    }
Request Parameters
The request has three fields. Only wfRunId must be set; the other two may be left empty:
- wfRunId is the ID of the WfRun that the WorkflowEvent is thrown from.
- eventDefIds is a repeated field containing the IDs of the WorkflowEventDefs whose thrown WorkflowEvents you want to await. The request returns the first matching WorkflowEvent thrown. If this field is empty, the request returns the first WorkflowEvent thrown by the WfRun.
- workflowEventsToIgnore is a repeated field of IDs of WorkflowEvents that you want to ignore. This lets the client skip WorkflowEvents that it has already awaited. See Ignoring WorkflowEvents for an example.
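The selection rule described by these three fields can be modelled in a few lines of plain Python. This is only a sketch of the documented behavior, not the server's implementation: the Event class, the first_matching function, and the event names are hypothetical stand-ins, and it assumes that "first" means the earliest event in the order the WfRun threw them.

```python
from dataclasses import dataclass
from typing import Iterable, Optional, Sequence


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for a WorkflowEvent: its def name plus a sequence number."""
    event_def: str
    number: int


def first_matching(
    thrown: Sequence[Event],
    event_defs: Iterable[str] = (),
    to_ignore: Iterable[Event] = (),
) -> Optional[Event]:
    """Pick the first thrown event that is wanted and not ignored."""
    wanted = set(event_defs)
    ignored = set(to_ignore)
    for event in thrown:  # assumed to be in the order the events were thrown
        if event in ignored:
            continue  # listed in workflowEventsToIgnore
        if wanted and event.event_def not in wanted:
            continue  # eventDefIds is non-empty and does not include this def
        return event
    return None  # the real RPC would keep waiting here instead of returning


thrown = [Event("order-placed", 0), Event("order-shipped", 0), Event("order-placed", 1)]
print(first_matching(thrown))                                # empty eventDefIds
print(first_matching(thrown, event_defs=["order-shipped"]))  # one def requested
print(first_matching(thrown, ["order-placed"], [thrown[0]])) # first event ignored
```

With an empty eventDefIds list the model returns the very first event, and listing an already-received event in to_ignore makes the next matching one come back instead.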
Examples
Awaiting a WorkflowEvent
- Java

    WorkflowEvent event = client.awaitWorkflowEvent(AwaitWorkflowEventRequest.newBuilder()
            .setWfRunId(WfRunId.newBuilder().setId("your-workflow-run-id"))
            .addEventDefIds(WorkflowEventDefId.newBuilder().setName("my-workflow-event-def"))
            .build());

- Python

    async def main() -> None:
        config = get_config()
        client = config.stub()
        await_workflow_event_request = AwaitWorkflowEventRequest(
            wf_run_id=WfRunId(id="your-wf-run-id"),
            event_def_ids=[WorkflowEventDefId(name="my-workflow-event-def")],
            workflow_events_to_ignore=None)
        event: WorkflowEvent = client.AwaitWorkflowEvent(await_workflow_event_request)

- Go

    config := littlehorse.NewConfigFromEnv()
    client, err := config.GetGrpcClient()
    event, err := (*client).AwaitWorkflowEvent(context.Background(),
        &lhproto.AwaitWorkflowEventRequest{
            WfRunId: &lhproto.WfRunId{
                Id: "your-workflow-run-id",
            },
            EventDefIds: []*lhproto.WorkflowEventDefId{
                &lhproto.WorkflowEventDefId{
                    Name: "my-workflow-event-def",
                },
            },
        },
    )
Awaiting Events from Multiple WorkflowEventDefs
If your workflow throws multiple types of WorkflowEvents, you can specify multiple WorkflowEventDefs in your request. The request will return the first matching WorkflowEvent thrown by your workflow.
- Java

    WorkflowEvent event = client.awaitWorkflowEvent(AwaitWorkflowEventRequest.newBuilder()
            .setWfRunId(WfRunId.newBuilder().setId("your-workflow-run-id"))
            .addEventDefIds(WorkflowEventDefId.newBuilder().setName("my-workflow-event-def"))
            .addEventDefIds(WorkflowEventDefId.newBuilder().setName("another-workflow-event-def"))
            .build());

- Python

    async def main() -> None:
        config = get_config()
        client = config.stub()
        await_workflow_event_request = AwaitWorkflowEventRequest(
            wf_run_id=WfRunId(id="your-wf-run-id"),
            event_def_ids=[WorkflowEventDefId(name="my-workflow-event-def"),
                           WorkflowEventDefId(name="another-workflow-event-def")],
            workflow_events_to_ignore=None)
        event: WorkflowEvent = client.AwaitWorkflowEvent(await_workflow_event_request)

- Go

    config := littlehorse.NewConfigFromEnv()
    client, err := config.GetGrpcClient()
    event, err := (*client).AwaitWorkflowEvent(context.Background(),
        &lhproto.AwaitWorkflowEventRequest{
            WfRunId: &lhproto.WfRunId{
                Id: "your-workflow-run-id",
            },
            EventDefIds: []*lhproto.WorkflowEventDefId{
                &lhproto.WorkflowEventDefId{
                    Name: "my-workflow-event-def",
                },
                &lhproto.WorkflowEventDefId{
                    Name: "another-workflow-event-def",
                },
            },
        },
    )
Using Deadlines
Upon the execution of a THROW_EVENT node, LittleHorse will always ensure that your WorkflowEvents are thrown and returned to any clients awaiting them. However, you may still find it useful to set a gRPC deadline on your AwaitWorkflowEvent request in case a WorkflowEvent is not thrown within a specified period of time.
You can configure a gRPC deadline for any LittleHorse Client request, not just AwaitWorkflowEvent. If the request does not complete within the specified time, the client cancels it automatically and the call fails with gRPC status code 4 DEADLINE_EXCEEDED.
To use deadlines with our Java SDK, you can call the LittleHorseBlockingStub#withDeadlineAfter() method before your gRPC request method call.
    Properties props = getConfigProps();
    LHConfig config = new LHConfig(props);
    LittleHorseBlockingStub client = config.getBlockingStub();
    WorkflowEvent event = client.withDeadlineAfter(1000, TimeUnit.MILLISECONDS)
            .awaitWorkflowEvent(AwaitWorkflowEventRequest.newBuilder()
                    .setWfRunId(WfRunId.newBuilder().setId("your-workflow-run-id"))
                    .build());
Parameters
The LittleHorseBlockingStub#withDeadlineAfter() method takes two parameters:
- long duration: how long to wait for the request to complete
- TimeUnit unit: the unit of time for the duration value
To use deadlines with our Python SDK, you can use the timeout parameter in any gRPC request method call.
    async def main() -> None:
        config = get_config()
        client = config.stub()
        await_workflow_event_request = AwaitWorkflowEventRequest(
            wf_run_id=WfRunId(id="your-wf-run-id"),
            event_def_ids=[WorkflowEventDefId(name="my-workflow-event-def")],
            workflow_events_to_ignore=None)
        event: WorkflowEvent = client.AwaitWorkflowEvent(await_workflow_event_request, timeout=1)
Parameters
Every LittleHorseStub request method accepts a timeout parameter, which is the number of seconds that the client will wait before cancelling your request.
To use deadlines in our Go SDK, you can use the context package's WithTimeout() function to wrap your context with a fixed deadline.
    config := littlehorse.NewConfigFromEnv()
    client, err := config.GetGrpcClient()
    contextWithTimeout, cancel := context.WithTimeout(context.Background(), time.Millisecond*1000)
    defer cancel() // Ensure that cancel is called to release resources
    event, err := (*client).AwaitWorkflowEvent(contextWithTimeout,
        &lhproto.AwaitWorkflowEventRequest{
            WfRunId: &lhproto.WfRunId{
                Id: "your-workflow-run-id",
            },
            EventDefIds: []*lhproto.WorkflowEventDefId{
                &lhproto.WorkflowEventDefId{
                    Name: "my-workflow-event-def",
                },
            },
        },
    )
Parameters
The context.WithTimeout() function takes two parameters:
- parent Context: the context of your request, usually context.Background()
- timeout time.Duration: how long to wait before the deadline expires
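Once a deadline is set, the caller usually has to decide what an expired deadline means. The sketch below is one possible way to treat it as "no event yet" in Python. The await_or_none helper is hypothetical, and it relies on the assumption that the stub raises a grpc.RpcError whose code() returns a grpc.StatusCode member, which is how blocking grpcio stubs normally behave. The status is compared by name so the snippet runs without importing grpc; with grpcio installed you would more likely catch grpc.RpcError and compare against grpc.StatusCode.DEADLINE_EXCEEDED.

```python
from typing import Any, Callable, Optional


def await_or_none(call: Callable[..., Any], request: Any, timeout_s: float) -> Optional[Any]:
    """Run the RPC and return its result, or None if the deadline expired first."""
    try:
        return call(request, timeout=timeout_s)
    except Exception as exc:
        # Assumption: the error exposes code(), returning an enum-like
        # object with a .name attribute (as grpc.StatusCode members do).
        code = getattr(exc, "code", None)
        if callable(code) and getattr(code(), "name", "") == "DEADLINE_EXCEEDED":
            return None  # no WorkflowEvent arrived in time
        raise  # any other failure is re-raised unchanged
```

A call such as await_or_none(client.AwaitWorkflowEvent, await_workflow_event_request, timeout_s=1) would then return either the WorkflowEvent or None, while errors unrelated to the deadline still propagate.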
Ignoring WorkflowEvents
Since a single WfRun may throw multiple WorkflowEvents with the same WorkflowEventDefId, clients have the ability to "ignore" WorkflowEvents that have already been awaited. Any WorkflowEvent specified within the workflowEventsToIgnore field will be ignored.
- Java

    // The first WorkflowEvent awaited by the client
    WorkflowEvent event1 = client.awaitWorkflowEvent(AwaitWorkflowEventRequest.newBuilder()
            .setWfRunId(WfRunId.newBuilder().setId("your-workflow-run-id"))
            .addEventDefIds(WorkflowEventDefId.newBuilder().setName("my-workflow-event-def"))
            .build());
    // The second WorkflowEvent awaited by the client
    WorkflowEvent event2 = client.awaitWorkflowEvent(AwaitWorkflowEventRequest.newBuilder()
            .setWfRunId(WfRunId.newBuilder().setId("your-workflow-run-id"))
            .addEventDefIds(WorkflowEventDefId.newBuilder().setName("my-workflow-event-def"))
            // Ignore any WorkflowEvents matching the first one received
            .addWorkflowEventsToIgnore(event1.getId())
            .build());

- Python

    async def main() -> None:
        config = get_config()
        client = config.stub()
        await_workflow_event_request = AwaitWorkflowEventRequest(
            wf_run_id=WfRunId(id="your-wf-run-id"),
            event_def_ids=[WorkflowEventDefId(name="my-workflow-event-def")],
            workflow_events_to_ignore=None)
        event1: WorkflowEvent = client.AwaitWorkflowEvent(await_workflow_event_request)
        await_workflow_event_request_2 = AwaitWorkflowEventRequest(
            wf_run_id=WfRunId(id="your-wf-run-id"),
            event_def_ids=[WorkflowEventDefId(name="my-workflow-event-def")],
            # Ignore any WorkflowEvents matching the first one received
            workflow_events_to_ignore=[event1.id])
        event2: WorkflowEvent = client.AwaitWorkflowEvent(await_workflow_event_request_2)

- Go

    config := littlehorse.NewConfigFromEnv()
    client, err := config.GetGrpcClient()
    event1, err := (*client).AwaitWorkflowEvent(context.Background(),
        &lhproto.AwaitWorkflowEventRequest{
            WfRunId: &lhproto.WfRunId{
                Id: "your-workflow-run-id",
            },
            EventDefIds: []*lhproto.WorkflowEventDefId{
                &lhproto.WorkflowEventDefId{
                    Name: "my-workflow-event-def",
                },
            },
        },
    )
    event2, err := (*client).AwaitWorkflowEvent(context.Background(),
        &lhproto.AwaitWorkflowEventRequest{
            WfRunId: &lhproto.WfRunId{
                Id: "your-workflow-run-id",
            },
            EventDefIds: []*lhproto.WorkflowEventDefId{
                &lhproto.WorkflowEventDefId{
                    Name: "my-workflow-event-def",
                },
            },
            WorkflowEventsToIgnore: []*lhproto.WorkflowEventId{
                event1.Id,
            },
        },
    )
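The two-step pattern above generalizes to a loop: each newly received WorkflowEvent's ID is added to the ignore list before the next call. The following Python sketch shows that idea under stated assumptions. The collect_events function and its await_event parameter are hypothetical; await_event stands for any callable that builds an AwaitWorkflowEventRequest with the given IDs in workflow_events_to_ignore, sends it, and returns the resulting WorkflowEvent.

```python
from typing import Any, Callable, List


def collect_events(await_event: Callable[[List[Any]], Any], count: int) -> List[Any]:
    """Await `count` distinct events, ignoring the ones already received."""
    received: List[Any] = []
    for _ in range(count):
        # IDs of every event seen so far go into workflowEventsToIgnore
        ignore_ids = [event.id for event in received]
        received.append(await_event(ignore_ids))
    return received
```

Passing the callable in, rather than a client, keeps the loop independent of any particular SDK call and makes it easy to exercise with a fake in tests.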