Akka Algemeen Improve Scala17 september 2015

Akka Process Framework

Akka is a great framework when you want to write reactive applications. While writing applications with Akka I noticed
that it was somewhat difficult to write processes flows. In this blog post I will show why it is not trivial to write a
process with Akka and how the ProcessFramework (https://github.com/jgordijn/process) solves this.

The problem

Let’s start by writing a rather simple process for order processing. We do not allow orders to be lost. This means
process needs to be persistent. The application can restart the process after application restart (or rebalancing).

The flow is like this:

Basic_flow

Let’s try to implement this in plain Akka with AkkaPersistence. A process starts with some command (in our case this is the incoming order). We need to keep track of the progression of the process. Therefore every step which is performed should be persisted. When the process is restarted, all events should be replayed and the process should continue where it left before the restart. Fortunately AtLeastoncedelivery can help us with that. During recovery all messages are queued until all events are replayed. A pending message can be removed from the queue during recovery (read more at: Akka documentation). When recovery is finished any pending message will be send. While this mechanism is great, the consequence is that logic is moving to the updateState. From an evount-sourcing perspective, this is not desirable.

class OrderProcess(orderId: UUID, emailModule: ActorRef, fulfillment: ActorRef, paymentModule: ActorRef) extends PersistentActor with AtLeastOnceDelivery {
  import OrderProcess._
  override def persistenceId: String = s"order-$orderId"

  var state = Option.empty[Order]

  def receiveCommand: Receive = {
    case order: Order 
      persist(OrderAccepted(orderId, order)) { event 
        sender() ! Accepted
        updateState(event)
      }
    case EmailModule.ConfirmationEmailSent(id) 
      persist(ConfirmationEmailSent(id)) (updateState)
    case EmailModule.SendNotDeliveredEmail(id) =>
      persist(NotDeliveredEmailSent(id)) (updateState)
    case FulfillmentModule.Fulfilled(id, delivered) =>
      persist(Fulfilled(id, delivered)) (updateState)
    case FinanceModule.Updated(id) =>
      persist(FinanceUpdated(id)) (updateState)
  }

  def updateState: Receive = {
    case OrderAccepted(id, order) 
      state = Some(order)
      deliver(emailModule.path, { id  EmailModule.SendConfirmationEmail(id, order.email, order.product) })
    case ConfirmationEmailSent(id) 
      confirmDelivery(id)
      deliver(fulfillment.path, { id  FulfillmentModule.Fulfill(id, state.get.product) })
    case Fulfilled(id, fulfilled) 
      confirmDelivery(id)
      deliver(financeModule.path, { id  FinanceModule.UpdateFinance(id, state.get) })
  }

  def receiveRecover: Receive = updateState
}

As you can see it is quit hard to see the releation between the drawed process and the code. This makes code hard to maintain and bugs are harder to spot.

Process Framework

The process framework is aimed at getting the process image and code more in sync. Ultimately allowing non-technical people to look at the code and understand the process flow.

A process consists of two pieces:

  1. Steps which perform actions and have side effect
  2. The process flow definition which combines the individual steps into a flow.

Let’s look at how the flow in the picture above would look like when written with the ProcessFramework. We need to create instances of the steps which can be used in the process definition. Looking at def process we can correlate the image of the process above with the code below. You can clearly see what the process flow is.

class BasicFlow extends PersistentProcess[State] {
  val sendToProcess = super.unhandled _

  val init: ProcessStep[State] = new InitStep
  val sendMail: ProcessStep[State] = new EmailStep
  val startFulfillment: ProcessStep[State] = new FulfillmentStep
  val updateFinance: ProcessStep[State] = new UpdateFinanceStep

  var state = State()

  override def receiveCommand = {
    case order:Order =>
      val originalSender = sender()
      sendMail.onCompleteAsync( originalSender ! Ack )
      sendToProcess(order)
  }

  def process = init ~> sendMail ~> startFulfillment ~> updateFinance
}

The flow is now clearly defined, but we still need to implement the action that needs to be taken by steps. Let’s implement the EmailStep. A step follows the rules of event-sourcing, so performing the action and updating state is separated into different parts. A step has three functions you need to implement:

  1. The execute function executes the logic of the step and can use the state to do so.
  2. The receiveCommand receives events back from other actors as a result from the execute. This is the place to define the Event that needs to be stored.
  3. The updateState updates the state with the Event that was emitted by step 2. The changed state is stored in the process, so that it can be used in following steps.

class EmailStep(emailActor: ActorRef)(implicit val context: ActorContext) extends ProcessStep[State] {

  def execute()(implicit process: ActorRef): Execution = { state  emailActor ! EmailModule.SendMail(state.order.get.email) }
  def receiveCommand: CommandToEvent = {
    case EmailModule.Sent 
      MailSent
  }
  def updateState: UpdateFunction = {
    case MailSent  { state 
      markDone() // This marks the step as completed
      state.copy(mailed = true)
    }
  }
}

Conclusion

The ProcessFramework is aimed at making it easier to implement business processes with Akka. I’m very proud of the result and I think that the goal is achieved to make the process flow readable. It is currently used in production, so it is proven to work correctly. The intention is to add more functionality to it, so that it becomes more and more complete.

I am very interested in your opinion and invite you to leave a comment.