Part V

Distribution Is a Native Language Concern:
Location-Transparent Processes and Type-Derived Protocols
in the Japl Programming Language

Matthew Long — The JAPL Research Collaboration
YonedaAI Research Collective, Chicago, IL

Keywords: distributed systems, programming language design, location transparency, type-derived serialization, process calculi, algebraic data types, effect systems, fault tolerance

1 Introduction

The fundamental challenge of distributed computing is not networking—it is semantics. When a computation spans multiple machines connected by an unreliable network, every assumption that holds for local computation becomes suspect. Function calls may fail silently, messages may arrive out of order or not at all, clocks disagree, and state is inherently partitioned. These are not incidental difficulties; they are the defining characteristics of distributed systems.

Despite this, the vast majority of programming languages treat distribution as an afterthought. The language provides constructs for local computation—functions, objects, threads, memory—and leaves the programmer to bridge the gap to distributed computation using external tools.

The result is a “distribution tax” levied on every distributed application: thousands of lines of glue code, configuration files, generated stubs, and operational infrastructure that exist only because the language cannot express distribution natively. This tax is not merely an inconvenience; it is a source of bugs. Protocol mismatches, serialization errors, version incompatibilities, and unhandled partial failures account for a significant fraction of distributed systems outages.

1.1 The Waldo Critique

In their seminal 1994 note, Waldo et al. argued that “objects that interact in a distributed system need to be dealt with in ways that are intrinsically different from objects that interact in a single address space.” They criticized systems like CORBA and Java RMI that attempted to make remote calls look identical to local calls, arguing that this transparency hid essential complexity: latency, partial failure, and concurrency.

The Waldo critique is correct—but the conclusion drawn by most language designers (“therefore, do not attempt language-level distribution”) does not follow. The correct response is to make distribution a language concern while also making the distributed nature of operations visible where it matters. Japl achieves this through three mechanisms:

  1. Location-transparent PIDs allow uniform send/receive syntax regardless of process location, reducing syntactic burden.

  2. The Net effect marks functions that perform network operations, making distribution boundaries visible in types.

  3. Type-derived serialization ensures that messages sent across node boundaries are compatible, catching protocol mismatches at compile time.

1.2 Contributions

This paper makes the following contributions:

  1. We present the distribution model of Japl, which extends Erlang-style location-transparent processes with static typing, type-derived wire protocols, and effect-tracked network boundaries (Section 4).

  2. We develop a formal framework based on a location-aware \(\pi\)-calculus with type-safe serialization, and provide a categorical semantics using fibered categories over network topology (Section 3).

  3. We define type-derived wire protocols and schema evolution rules that eliminate manual serialization code while guaranteeing backward compatibility (Section 5).

  4. We describe Japl’s approach to partition tolerance, distributed supervision, service discovery, and security (Sections 7, 8, and 11).

  5. We provide a comprehensive comparison with six alternative approaches and a detailed case study demonstrating multi-node process migration (Sections 9 and 12).

1.3 Paper Outline

Section 2 surveys related work. Section 3 develops the formal framework. Section 4 presents Japl’s distribution model. Section 5 describes type-derived wire protocols. Section 6 examines location transparency. Section 7 addresses partition tolerance. Section 8 covers service discovery and clustering. Section 9 provides a detailed comparison. Section 10 describes the implementation architecture. Section 11 addresses distributed security. Section 12 presents the case study. Section 14 concludes.

2 Background and Related Work

2.1 Erlang/OTP: The Gold Standard for Distribution

Erlang is the language that most fully embraces native distribution. Every Erlang process has a PID that can refer to a local or remote process. The ! (send) operator works identically regardless of process location. The Erlang distribution protocol handles connection management, serialization (via the External Term Format), and node discovery (via EPMD).

Erlang’s distribution model has been extraordinarily successful in production. The Ericsson AXD301 ATM switch, WhatsApp’s messaging infrastructure, and Discord’s real-time communication all depend on it. However, Erlang’s approach has limitations that Japl addresses.

2.2 Akka Cluster

Akka brings the actor model to the JVM with Akka Cluster providing distribution primitives: cluster membership via a gossip protocol, cluster sharding for stateful actors, and distributed pub-sub. Akka Typed adds static typing to actor interactions.

Akka’s model is powerful but suffers from the impedance mismatch of being a library in a language (Scala/Java) not designed for actors. Serialization requires explicit configuration (Kryo, Jackson, Protobuf). Cluster membership is managed through application.conf files. Failure handling interleaves with JVM exception mechanics. Japl avoids these issues by making distribution a language-level concern.

2.3 Orleans: Virtual Actors

Microsoft Orleans introduces the “virtual actor” model, where actors always exist conceptually and the runtime activates and deactivates them as needed. This simplifies distributed programming by eliminating explicit lifecycle management. Grain identities serve as stable, location-transparent references.

Orleans demonstrates that distribution semantics benefit from runtime support. However, Orleans is tied to the .NET ecosystem, requires code generation for grain interfaces, and relies on external storage for persistence. Japl takes the insight that the runtime should manage distribution but implements it as a language primitive rather than a framework.

2.4 Cloud Haskell

Cloud Haskell brings Erlang-style distribution to Haskell. It provides typed channels, remote spawning, and process monitoring. Cloud Haskell demonstrates that static typing and distribution can coexist, but it faces fundamental challenges.

Japl avoids these issues because its runtime is designed from the start for distribution, its effect system replaces monadic encoding, and its type-derived serialization does not require Template Haskell equivalents.

2.5 Cap’n Proto and Zero-Copy Serialization

Cap’n Proto takes the radical approach of defining a wire format that is also the in-memory format, eliminating serialization overhead entirely. This is powerful for performance but requires a separate schema language and code generation step.

Japl’s approach is spiritually similar—types define wire formats—but achieves this within the language itself rather than through external tools. The compiler generates efficient serialization code from algebraic data type definitions.

2.6 Unison: Content-Addressed Code

Unison takes a radical approach to distribution: all code is content-addressed by hash. A function is identified not by name but by the hash of its abstract syntax tree. This means code can be trivially transferred between nodes—send the hash, and if the remote node does not have it, send the definition.

Unison’s approach eliminates versioning problems entirely but requires a fundamental rethinking of the development experience. Japl takes a more conservative approach, keeping named modules and file-based source code while providing version-aware serialization for data.

2.7 The E Language

The E language pioneered capability-secure distributed programming. E’s distributed model is based on “eventual sends” (using the <- operator) and “promises” that resolve when remote operations complete. E’s capability discipline ensures that distributed access control is enforced by the language, not just by convention.

Japl borrows the capability-security concept but integrates it with the process model rather than the object model. Capability types in Japl control what a process can do on a remote node.

2.8 Bloom and the CALM Theorem

Bloom and the CALM theorem establish a deep connection between monotonicity and consistency in distributed systems. The CALM theorem states that a program that is logically monotonic can be implemented without coordination (i.e., it is eventually consistent by construction). Non-monotonic operations (e.g., aggregation, negation) require synchronization.

While Japl does not enforce monotonicity, its type system can express CRDT-based data structures that are monotonic by construction, and its effect system can distinguish coordination-free operations from those requiring synchronization.

3 Formal Framework

We develop the formal foundations for Japl’s distribution model in three parts: a location-aware process calculus, type-safe serialization, and a categorical semantics.

3.1 Location-Aware \(\pi\)-Calculus

We extend the \(\pi\)-calculus with locations, typed channels, and serialization constraints.

Definition 1 (Network Topology). A network topology is a directed graph \(\mathcal{N}= (N, E)\) where \(N\) is a finite set of nodes and \(E \subseteq N \times N\) is a set of connections. We write \(n \rightsquigarrow m\) when \((n, m) \in E\). Connections may be asymmetric and may fail.

Definition 2 (Located Processes). The syntax of the located \(\pi\)-calculus (\(L\pi\)) is: \[\begin{align*} P, Q ::= \quad & \mathbf{0} & \text{(inaction)} \\ & \overline{c}\langle v \rangle . P & \text{(output on channel $c$)} \\ & c(x).P & \text{(input on channel $c$)} \\ & P \mid Q & \text{(parallel composition)} \\ & (\nu c : \tau) P & \text{(channel restriction with type)} \\ & !P & \text{(replication)} \\ & n \llbracket P \rrbracket & \text{(located process at node $n$)} \\ & \mathsf{spawn}_n(P) & \text{(spawn process at node $n$)} \\ & \mathsf{migrate}(n, P) & \text{(migrate process to node $n$)} \end{align*}\] where \(c\) ranges over channels, \(v\) over values, \(x\) over variables, \(\tau\) over types, and \(n\) over node identifiers.

Definition 3 (Typed Channels). A channel \(c\) has type \(\mathsf{Chan}[\tau]\) where \(\tau\) is the message type. The typing rules for communication require: \[\frac{\Gamma \vdash c : \mathsf{Chan}[\tau] \quad \Gamma \vdash v : \tau \quad \mathsf{Ser}(\tau)} {\Gamma \vdash \overline{c}\langle v \rangle . P} \quad \text{(T-Send)}\] \[\frac{\Gamma \vdash c : \mathsf{Chan}[\tau] \quad \Gamma, x : \tau \vdash P} {\Gamma \vdash c(x).P} \quad \text{(T-Recv)}\] The predicate \(\mathsf{Ser}(\tau)\) (“\(\tau\) is serializable”) is required only when the channel crosses a node boundary.

Definition 4 (Cross-Node Communication). When processes at different nodes communicate, serialization is required: \[\frac{n \neq m \quad \Gamma \vdash c : \mathsf{Chan}[\tau] \quad \Gamma \vdash v : \tau \quad \mathsf{Ser}(\tau)} {n\llbracket \overline{c}\langle v \rangle . P \rrbracket \mid m\llbracket c(x).Q \rrbracket \;\longrightarrow\; n\llbracket P \rrbracket \mid m\llbracket Q[x := \mathsf{deser}(\mathsf{ser}(v))] \rrbracket}\] This rule makes explicit that cross-node communication involves serialization and deserialization, and that the serializable type constraint is required.

3.2 Type-Safe Serialization

Definition 5 (Serializable Types). The predicate \(\mathsf{Ser}(\tau)\) is defined inductively over types:

  1. \(\mathsf{Ser}(\mathsf{Int})\), \(\mathsf{Ser}(\mathsf{Float})\), \(\mathsf{Ser}(\mathsf{Bool})\), \(\mathsf{Ser}(\mathsf{String})\), \(\mathsf{Ser}(\mathsf{Bytes})\), \(\mathsf{Ser}(\mathsf{Unit})\) — all primitive types are serializable.

  2. If \(\mathsf{Ser}(\tau_1)\) and \(\mathsf{Ser}(\tau_2)\), then \(\mathsf{Ser}(\tau_1 \times \tau_2)\) and \(\mathsf{Ser}(\tau_1 + \tau_2)\) — products and sums of serializable types are serializable.

  3. If \(\mathsf{Ser}(\tau)\), then \(\mathsf{Ser}(\mathsf{List}[\tau])\), \(\mathsf{Ser}(\mathsf{Option}[\tau])\), \(\mathsf{Ser}(\mathsf{Map}[k,v])\) where \(\mathsf{Ser}(k)\) and \(\mathsf{Ser}(v)\) — container types preserve serializability.

  4. \(\mathsf{Ser}(\textsf{Pid}[\tau])\) for any \(\tau\) — process identifiers are always serializable (they are network addresses).

  5. \(\neg\mathsf{Ser}(\tau \to \sigma)\) — function types are not serializable. Closures cannot cross node boundaries.

  6. If \(\tau\) is a named type with \(\mathsf{deriving}(\mathsf{Serialize})\) and all field types are serializable, then \(\mathsf{Ser}(\tau)\).

Theorem 6 (Serialization Soundness). If \(\mathsf{Ser}(\tau)\) and \(v : \tau\), then \(\mathsf{deser}_\tau(\mathsf{ser}_\tau(v)) = v\). That is, serialization followed by deserialization is the identity on values.

Proof. By structural induction on \(\tau\).

Base cases (primitives). Consider \(\tau = \mathsf{Int}\). By definition, \(\mathsf{ser}_{\mathsf{Int}}(n)\) produces the LEB128 zigzag encoding of \(n\), which is a bijection between integers and finite byte sequences. The decoder \(\mathsf{deser}_{\mathsf{Int}}\) inverts the zigzag and LEB128 steps, recovering \(n\) exactly. The argument is analogous for \(\mathsf{Float}\) (IEEE 754 round-trip is exact for non-NaN values, and Japl canonicalizes NaN), \(\mathsf{Bool}\) (one byte, two values), \(\mathsf{String}\) (length-prefixed UTF-8 is self-delimiting and bijective), \(\mathsf{Bytes}\) (length-prefixed raw bytes), and \(\mathsf{Unit}\) (zero bytes, unique value).

Inductive case: products. Let \(\tau = \tau_1 \times \tau_2\) with \(v = (v_1, v_2)\). By the induction hypothesis, \(\mathsf{deser}_{\tau_i}(\mathsf{ser}_{\tau_i}(v_i)) = v_i\) for \(i \in \{1,2\}\). Serialization of records uses tagged, length-prefixed fields: \[\mathsf{ser}_{\tau_1 \times \tau_2}(v_1, v_2) = \mathsf{tag}(1) \cdot \mathsf{len}(\mathsf{ser}_{\tau_1}(v_1)) \cdot \mathsf{ser}_{\tau_1}(v_1) \cdot \mathsf{tag}(2) \cdot \mathsf{len}(\mathsf{ser}_{\tau_2}(v_2)) \cdot \mathsf{ser}_{\tau_2}(v_2)\] The length prefixes allow the deserializer to split the byte stream unambiguously at the correct boundary. It then applies \(\mathsf{deser}_{\tau_1}\) and \(\mathsf{deser}_{\tau_2}\) to the respective segments, recovering \((v_1, v_2)\) by the induction hypothesis. The argument generalizes to \(n\)-ary products by induction on the number of fields.

Inductive case: sums. Let \(\tau = \tau_1 + \tau_2\) with \(v = \mathsf{inl}(v_1)\) (the \(\mathsf{inr}\) case is symmetric). Then \(\mathsf{ser}_{\tau}(v) = \mathsf{tag}(\mathsf{inl}) \cdot \mathsf{ser}_{\tau_1}(v_1)\). The deserializer reads the tag, determines the active variant, and applies \(\mathsf{deser}_{\tau_1}\) to the remaining bytes, recovering \(\mathsf{inl}(v_1)\) by the induction hypothesis.

Inductive case: containers. For \(\tau = \mathsf{List}[\sigma]\) with \(v = [v_1, \ldots, v_k]\), serialization produces \(k \cdot \mathsf{ser}_\sigma(v_1) \cdots \mathsf{ser}_\sigma(v_k)\) where \(k\) is LEB128-encoded. Deserialization reads \(k\), then applies \(\mathsf{deser}_\sigma\) exactly \(k\) times. Each element round-trips by the induction hypothesis on \(\sigma\), so the list round-trips. The \(\mathsf{Option}[\sigma]\) and \(\mathsf{Map}[k,v]\) cases follow similarly.

PID case. \(\mathsf{Pid}[\tau]\) is serialized as a fixed 16-byte value (8-byte node ID concatenated with 8-byte process ID). This is a bijection on the PID space, so round-tripping is immediate.

Negative case. Function types \(\tau \to \sigma\) are excluded from \(\mathsf{Ser}\) by rule 5 of Definition 5, so the theorem’s hypothesis is never satisfied for them. ◻
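The primitive round-trip argument can be checked mechanically. The following Python sketch (Python is used here only as a modeling vehicle; it is not the Japl implementation) realizes the zigzag-plus-LEB128 bijection described in the base case and its inverse.

```python
def zigzag(n: int) -> int:
    # Bijection from signed to unsigned integers: 0, -1, 1, -2, ... -> 0, 1, 2, 3, ...
    return n * 2 if n >= 0 else -n * 2 - 1

def unzigzag(z: int) -> int:
    return z // 2 if z % 2 == 0 else -(z + 1) // 2

def leb128_encode(u: int) -> bytes:
    # Unsigned LEB128: 7 payload bits per byte, high bit set on all but the last.
    out = bytearray()
    while True:
        b = u & 0x7F
        u >>= 7
        out.append(b | 0x80 if u else b)
        if not u:
            return bytes(out)

def leb128_decode(buf: bytes, i: int = 0) -> tuple:
    result = shift = 0
    while True:
        b = buf[i]
        i += 1
        result |= (b & 0x7F) << shift
        shift += 7
        if not b & 0x80:
            return result, i

def ser_int(n: int) -> bytes:
    return leb128_encode(zigzag(n))

def deser_int(buf: bytes) -> int:
    z, _ = leb128_decode(buf)
    return unzigzag(z)
```

Since both zigzag and LEB128 are bijections onto their images, `deser_int(ser_int(n)) == n` for every integer `n`, which is exactly the Int base case of the theorem.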

3.3 Protocol Evolution and Backward Compatibility

Definition 7 (Type Compatibility). A type \(\tau'\) is backward compatible with \(\tau\) (written \(\tau \preceq \tau'\)) if every value that can be deserialized as \(\tau\) can also be deserialized as \(\tau'\) (with defaults for new fields). The compatibility relation is defined by:

  1. Field addition: \(\{l_1:\tau_1, \ldots, l_n:\tau_n\} \preceq \{l_1:\tau_1, \ldots, l_n:\tau_n, l_{n+1}:\tau_{n+1}\}\) if \(\tau_{n+1}\) has a default value.

  2. Variant addition: \((C_1(\tau_1) \mid \ldots \mid C_n(\tau_n)) \preceq (C_1(\tau_1) \mid \ldots \mid C_n(\tau_n) \mid C_{n+1}(\tau_{n+1}))\).

  3. Field widening: \(\{l:\tau\} \preceq \{l:\tau'\}\) if \(\tau \preceq \tau'\).

  4. Reflexivity and transitivity.

Theorem 8 (Protocol Evolution Safety). If \(\tau \preceq \tau'\) and a node running code with message type \(\tau'\) receives a message serialized with type \(\tau\), then deserialization succeeds and produces a valid value of type \(\tau'\).

Proof. By structural induction on the derivation of \(\tau \preceq \tau'\), with case analysis on the last rule applied.

Base case: reflexivity. If \(\tau = \tau'\), then the message was serialized and is being deserialized at the same type, which succeeds by Theorem 6.

Case: field addition. Suppose \(\tau = \{l_1:\tau_1, \ldots, l_n:\tau_n\}\) and \(\tau' = \{l_1:\tau_1, \ldots, l_n:\tau_n, l_{n+1}:\tau_{n+1}\}\) where \(\tau_{n+1}\) has a default value \(d_{n+1}\). A message serialized at type \(\tau\) contains tagged fields for \(l_1, \ldots, l_n\). The deserializer for \(\tau'\) reads fields by tag. For tags \(1, \ldots, n\), the corresponding field types are identical, so each field deserializes correctly by Theorem 6. Tag \(n{+}1\) is absent from the byte stream; the deserializer detects this absence (the stream terminates or the next tag does not match \(n{+}1\)) and fills \(l_{n+1}\) with its default value \(d_{n+1}\). The result is a valid value of type \(\tau'\).

Case: variant addition. Suppose \(\tau = (C_1(\sigma_1) \mid \ldots \mid C_n(\sigma_n))\) and \(\tau' = (C_1(\sigma_1) \mid \ldots \mid C_n(\sigma_n) \mid C_{n+1}(\sigma_{n+1}))\). A message serialized at type \(\tau\) has a variant tag in \(\{1, \ldots, n\}\). The deserializer for \(\tau'\) recognizes all tags \(1, \ldots, n\) with the same payload types, so deserialization succeeds by Theorem 6 applied to the payload. The tag \(n{+}1\) cannot appear in messages serialized at type \(\tau\), so no failure case arises.

Case: field widening. Suppose \(\tau = \{l:\sigma\}\) and \(\tau' = \{l:\sigma'\}\) where \(\sigma \preceq \sigma'\). A message serialized at type \(\tau\) contains \(l\) encoded at type \(\sigma\). By the induction hypothesis applied to \(\sigma \preceq \sigma'\), the deserializer for \(\sigma'\) can successfully decode a value serialized at type \(\sigma\), producing a valid value of type \(\sigma'\). Therefore the record deserializes to a valid value of type \(\tau'\).

Case: transitivity. Suppose \(\tau \preceq \tau''\) and \(\tau'' \preceq \tau'\). By the induction hypothesis, a message serialized at \(\tau\) can be deserialized at \(\tau''\), producing a valid intermediate value, and a message serialized at \(\tau''\) can be deserialized at \(\tau'\). Since the deserializer for \(\tau'\) operates directly on the byte stream (not on an intermediate value), we must show that the byte-level encoding of a \(\tau\)-value is accepted by the \(\tau'\) deserializer. This follows from the compositionality of each rule: field additions compose (multiple new fields each get defaults), variant additions compose (the union of new variants is still a superset), and field widenings compose by transitivity of \(\preceq\) on the field types. ◻
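The field-addition case can also be exercised concretely. The sketch below is a minimal Python model (not Japl's actual codec; the single-byte tag/length framing and the `defaults` table are simplifying assumptions) of a tag-and-length record deserializer that fills absent fields with defaults, as in the proof case above.

```python
def enc_field(tag: int, payload: bytes) -> bytes:
    # Simplified framing: one tag byte, one length byte, then the payload.
    return bytes([tag, len(payload)]) + payload

def enc_record(fields: dict) -> bytes:
    # fields: tag -> already-encoded payload, written in tag order.
    return b"".join(enc_field(t, p) for t, p in sorted(fields.items()))

def dec_record(buf: bytes, schema_tags, defaults) -> dict:
    # schema_tags: all tags the (newer) reader schema knows.
    # defaults: tag -> default payload for fields absent from the stream.
    out, i = {}, 0
    while i < len(buf):
        tag, length = buf[i], buf[i + 1]
        out[tag] = buf[i + 2 : i + 2 + length]
        i += 2 + length
    for t in schema_tags:
        if t not in out:        # absent trailing field: fill with its default
            out[t] = defaults[t]
    return out

# Writer uses schema v1: fields {1: id, 2: name}.
old_msg = enc_record({1: b"\x2a", 2: b"alice"})
# Reader uses schema v2, which adds field 3 (email) with an empty default.
decoded = dec_record(old_msg, schema_tags=[1, 2, 3], defaults={3: b""})
```

The v1 bytes decode without error under the v2 schema; the new field takes its default, mirroring \(\tau \preceq \tau'\) for field addition.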

3.4 Categorical Semantics

We provide a categorical interpretation of distributed systems using fibered categories.

Definition 9 (Network Category). The network category \(\mathcal{N}\) has nodes as objects and connections as morphisms. A connection \(f : n \to m\) represents a communication channel from node \(n\) to node \(m\). Composition of connections represents multi-hop routing.

Definition 10 (Process Fibration). A process fibration is a functor \(\pi : \mathcal{P}\to \mathcal{N}\) where \(\mathcal{P}\) is the category of processes and \(\mathcal{N}\) is the network category. The fiber \(\pi^{-1}(n)\) over a node \(n\) is the subcategory of processes running on node \(n\). The total category \(\mathcal{P}\) captures all processes across all nodes.

Definition 11 (Type Fibration). A type fibration is a functor \(\sigma : \mathcal{T}\to \mathcal{N}\) where \(\mathcal{T}\) is the category of types available at each node. The reindexing functor \(f^* : \mathcal{T}_m \to \mathcal{T}_n\) along a connection \(f : n \to m\) represents the deserialization of types received over the connection.

Proposition 12 (Distribution as Fibered Adjunction). The operations of spawn-remote and message-send correspond to functorial operations in the fibered setting:

  • \(\mathsf{spawn}_m : \mathcal{P}_n \to \mathcal{P}_m\) is the direct image functor along \(f : n \to m\), sending a process description from \(n\) to be executed at \(m\).

  • \(\mathsf{send}: \mathcal{P}_n \times \mathcal{T}_n \to \mathcal{P}_m\) composes the serialization functor \(\mathsf{ser} : \mathcal{T}_n \to \mathsf{Bytes}\) with the transport functor \(\mathsf{trans}_{f} : \mathsf{Bytes}_n \to \mathsf{Bytes}_m\) and the deserialization functor \(\mathsf{deser} : \mathsf{Bytes} \to \mathcal{T}_m\).

This fibered structure captures the key insight: distribution is not a flat operation but a structured relationship between local computations (fibers) and global topology (base).

Definition 13 (Consistency Sheaf). A consistency sheaf on \(\mathcal{N}\) assigns to each node \(n\) a set of observable states \(S(n)\), and to each connection \(f : n \to m\) a consistency relation \(S(f) : S(n) \to S(m)\). The sheaf condition requires that observations are compatible across paths: \[S(g \circ f) = S(g) \circ S(f)\] Strong consistency corresponds to a constant sheaf; eventual consistency corresponds to a sheaf where \(S(f)\) is required to converge but may temporarily disagree.

More precisely, \(S(f)\) is a relation \(S(f) \subseteq S(n) \times S(m)\) that relates the observable state at the source node \(n\) to the observable state at the destination node \(m\) along connection \(f : n \to m\). Under strong consistency, \(S(f)\) is the identity relation (states agree at all times). Under eventual consistency, \(S(f)\) is a convergence relation: for any two states \(s_n \in S(n)\) and \(s_m \in S(m)\) with \((s_n, s_m) \in S(f)\), there exists a future time at which \(s_n\) and \(s_m\) agree after merging concurrent updates. The sheaf condition \(S(g \circ f) = S(g) \circ S(f)\) ensures that consistency guarantees compose transitively across multi-hop paths.

The following example shows how the consistency sheaf manifests concretely in Japl code using a distributed counter (a grow-only CRDT):

-- A distributed counter backed by a GCounter CRDT.
-- Each node maintains a local entry in its vector; the sheaf
-- structure ensures that observations converge across all paths.

type DistCounter = {
  counts: Map<NodeId, Int>   -- per-node counts (the fiber S(n))
} deriving(Serialize)

-- Local observation: S(n) returns the sum of known counts at node n
fn value(counter: DistCounter) -> Int =
  Map.values(counter.counts) |> List.sum()

-- Local increment: only modifies the local node's entry
fn increment(counter: DistCounter) -> DistCounter =
  let node = Node.self_id()
  let current = Map.get(counter.counts, node) |> Option.unwrap_or(0)
  { counts = Map.insert(counter.counts, node, current + 1) }

-- The consistency relation S(f): merging two observations.
-- For GCounter, merge = pointwise max, which is commutative,
-- associative, and idempotent -- guaranteeing convergence.
fn merge(local: DistCounter, remote: DistCounter) -> DistCounter =
  let merged = Map.merge_with(local.counts, remote.counts,
    fn _key local_val remote_val -> Int.max(local_val, remote_val))
  { counts = merged }

-- Gossip loop: periodically exchange state with peers,
-- applying the consistency relation on each connection.
fn counter_gossip(state: DistCounter, peers: List<Pid[CounterMsg]>)
    -> Never with Process[CounterMsg], Net =
  match Process.receive_with_timeout(1000) with
  | GossipSync(remote_state) ->
      let merged = merge(state, remote_state)
      counter_gossip(merged, peers)
  | Timeout ->
      -- Push local state to a random peer (sheaf pushforward)
      let peer = List.random_pick(peers)
      Process.send(peer, GossipSync(state))
      counter_gossip(state, peers)

In this example, the fiber \(S(n)\) at each node is the local DistCounter value. The consistency relation \(S(f)\) along each connection is realized by the merge function, whose algebraic properties (commutativity, associativity, idempotence) guarantee that the sheaf condition \(S(g \circ f) = S(g) \circ S(f)\) holds: merging state received via a multi-hop path produces the same result regardless of the intermediate route.
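The path-independence claim can be spot-checked with a direct model of the merge. The Python sketch below (node names and counts are hypothetical) implements the pointwise-max merge and verifies that merging along two different routes yields the same state, along with the algebraic laws that justify it.

```python
def merge(a: dict, b: dict) -> dict:
    # GCounter join: pointwise maximum over the union of node keys.
    return {k: max(a.get(k, 0), b.get(k, 0)) for k in a.keys() | b.keys()}

# Hypothetical per-node states (the fibers S(n), S(k), S(m)).
s_n = {"n": 3}
s_k = {"k": 5}
s_m = {"m": 2, "n": 1}

# Route 1: state flows n -> k -> m (merge at k, then at m).
via_k = merge(merge(s_n, s_k), s_m)
# Route 2: state flows n -> m directly; k's state arrives afterwards.
direct = merge(merge(s_n, s_m), s_k)

assert via_k == direct                      # path independence
assert merge(s_n, s_k) == merge(s_k, s_n)   # commutativity
assert merge(s_m, s_m) == s_m               # idempotence
```

Because the join is commutative, associative, and idempotent, any interleaving of gossip messages converges to the same observation, which is the content of the sheaf condition for this example.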

4 JAPL’s Distribution Model

Japl’s distribution model rests on five pillars: location-transparent process identifiers, type-derived serialization, effect-marked network operations, distributed supervision, and built-in service discovery. This section provides an overview; subsequent sections develop each pillar in detail.

4.1 Process Identifiers Are Location-Transparent

In Japl, every process has a Pid[Msg] where Msg is the type of messages the process accepts. This PID encodes the node on which the process runs, but the programmer is not required to inspect this information. Sending a message to a PID uses the same syntax regardless of locality:

-- These use identical syntax
let local_pid = Process.spawn(fn -> worker_loop(init_state))
let remote_pid = Process.spawn_on(remote_node, fn -> worker_loop(init_state))

-- Send works the same for both
Process.send(local_pid, StartJob(job_1))
Process.send(remote_pid, StartJob(job_2))

The runtime determines whether a send requires network transport. If the target PID is local, the message is placed directly in the process mailbox (a pointer swap). If remote, the message is serialized, transmitted, and deserialized at the destination.
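The dispatch decision can be sketched in a few lines. The Python model below (the `Pid`, `Runtime`, and `links` names are hypothetical stand-ins for runtime internals, and `pickle` stands in for Japl's type-derived serializer) shows the single branch point: local sends enqueue directly, remote sends go through serialize, transport, and deserialize.

```python
import pickle
from dataclasses import dataclass

@dataclass(frozen=True)
class Pid:
    node: str   # node identifier encoded in the PID
    proc: int   # process identifier on that node

class Runtime:
    def __init__(self, node_id: str):
        self.node_id = node_id
        self.mailboxes = {}   # proc id -> list of delivered messages
        self.links = {}       # node id -> peer Runtime (stand-in for transport)

    def send(self, pid: Pid, msg) -> None:
        if pid.node == self.node_id:
            # Local send: enqueue directly, no serialization.
            self.mailboxes.setdefault(pid.proc, []).append(msg)
        else:
            # Remote send: serialize, transport, deserialize at the destination.
            wire = pickle.dumps(msg)
            peer = self.links[pid.node]
            peer.mailboxes.setdefault(pid.proc, []).append(pickle.loads(wire))

a, b = Runtime("A"), Runtime("B")
a.links["B"] = b
a.send(Pid("A", 1), "local hello")    # pointer-speed local delivery
a.send(Pid("B", 2), "remote hello")   # goes through the wire path
```

The caller's code is identical in both sends; only the runtime branch differs, which is the essence of location transparency.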

4.2 Type-Derived Serialization

Japl’s key insight is that algebraic data types already contain sufficient information to generate wire protocols. A type definition is simultaneously a data specification and a serialization format:

type Order = {
  id: OrderId,
  items: List<Item>,
  total: Money,
  status: OrderStatus
}

-- Order is automatically serializable because all its fields are.
-- No annotation needed, no code generation step, no schema file.

The compiler verifies that every type used in cross-node messages is serializable. Attempting to send a function closure across node boundaries is a compile-time error:

-- Compile error: fn(Int) -> Int is not Serializable
-- Cannot send function values across node boundaries
let bad_msg = ApplyFunction(fn x -> x + 1)
Process.send(remote_pid, bad_msg)  -- TYPE ERROR

4.3 Effects Mark Network Boundaries

Following Waldo et al.’s observation that distributed operations are fundamentally different from local ones, Japl’s effect system makes network boundaries visible:

-- Local process operations: Process effect only
fn spawn_worker() -> Pid[WorkerMsg] with Process =
  Process.spawn(fn -> worker_loop(init))

-- Remote operations: Process AND Net effects
fn spawn_remote_worker(node: Node) -> Pid[WorkerMsg] with Process, Net =
  Process.spawn_on(node, fn -> worker_loop(init))

-- Node connection: Net effect
fn connect_cluster() -> List<Node> with Net =
  let n1 = Node.connect("us-east-1.cluster:9000")
  let n2 = Node.connect("us-west-2.cluster:9000")
  [n1, n2]

The Net effect signals that an operation may experience network latency, partial failure, or partition. This is not merely documentation; the effect system enforces that network operations are handled in contexts that expect them.

4.4 Distributed Supervision

Japl extends supervision trees across node boundaries. A supervisor on one node can supervise processes on other nodes, restarting them when they fail or when the network connection drops:

supervisor DistributedApi {
  strategy: OneForOne
  child spawn_remote(node("us-east-1"), http_handler())
  child spawn_remote(node("us-west-2"), http_handler())
  child spawn_remote(node("eu-west-1"), http_handler())
}

When a supervised remote process crashes, the supervisor receives a ProcessDown message just as it would for a local process. When a network partition occurs, the supervisor receives a NodeDown message and can apply a partition strategy.

4.5 Built-In Service Discovery

Japl includes primitives for service discovery, eliminating the need for external service registries in many scenarios:

-- Register a service
Registry.register("payment-service", self())

-- Discover services
let payment_workers = Registry.lookup("payment-service")

-- Subscribe to service changes
Registry.subscribe("payment-service", fn event ->
  match event with
  | ServiceUp(pid) -> add_to_pool(pid)
  | ServiceDown(pid) -> remove_from_pool(pid)
)
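A minimal executable model of these registry primitives (in Python; the event shapes mirror the Japl snippet above, and everything else is a hypothetical sketch rather than the actual runtime API):

```python
class Registry:
    def __init__(self):
        self.services = {}   # service name -> set of registered pids
        self.subs = {}       # service name -> list of subscriber callbacks

    def register(self, name: str, pid) -> None:
        self.services.setdefault(name, set()).add(pid)
        for cb in self.subs.get(name, []):
            cb(("ServiceUp", pid))

    def deregister(self, name: str, pid) -> None:
        self.services.get(name, set()).discard(pid)
        for cb in self.subs.get(name, []):
            cb(("ServiceDown", pid))

    def lookup(self, name: str) -> set:
        return set(self.services.get(name, set()))

    def subscribe(self, name: str, cb) -> None:
        self.subs.setdefault(name, []).append(cb)

registry = Registry()
events = []
registry.subscribe("payment-service", events.append)
registry.register("payment-service", "pid-1")
registry.register("payment-service", "pid-2")
registry.deregister("payment-service", "pid-1")
```

Subscribers observe membership changes as ServiceUp/ServiceDown events, which is how a connection pool would track a service's worker set without polling.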

5 Type-Derived Wire Protocols

5.1 Types as Protocol Specifications

In most distributed systems, there is a gap between the in-memory representation of data and its wire format. Serialization frameworks like Protocol Buffers, Apache Avro, and Cap’n Proto bridge this gap with separate schema languages and code generation. This introduces three problems: schema drift (the schema diverges from the code), impedance mismatch (the generated code does not feel natural in the target language), and build complexity (the code generation step must be integrated into the build system).

Japl eliminates this gap entirely. Every algebraic data type is its own schema. The compiler generates serialization and deserialization code from the type definition, ensuring perfect correspondence between in-memory and wire formats.

5.2 Encoding Rules

The wire encoding is derived from the structure of algebraic data types:

Definition 14 (Wire Encoding). The wire encoding function \(\mathsf{enc} : \tau \to \mathsf{Bytes}\) is defined by structural recursion:

  • Primitives: Int uses variable-length encoding (LEB128). Float uses IEEE 754. String uses length-prefixed UTF-8. Bool uses one byte.

  • Records (products): Fields are encoded in declaration order with field tags (small integers). Each field is prefixed with its tag and length: \(\mathsf{enc}(\{l_1 = v_1, \ldots, l_n = v_n\}) = \mathsf{tag}(l_1) \cdot \mathsf{len}(v_1) \cdot \mathsf{enc}(v_1) \cdots \mathsf{tag}(l_n) \cdot \mathsf{len}(v_n) \cdot \mathsf{enc}(v_n)\).

  • Variants (sums): A variant tag (small integer) followed by the encoding of the payload: \(\mathsf{enc}(C_i(v)) = \mathsf{tag}(C_i) \cdot \mathsf{enc}(v)\).

  • Containers: Length prefix followed by element encodings: \(\mathsf{enc}([v_1, \ldots, v_n]) = n \cdot \mathsf{enc}(v_1) \cdots \mathsf{enc}(v_n)\).
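The rules of Definition 14 compose into a small executable codec. The Python sketch below is illustrative only (not Japl's generated code); it models strings as the sole primitive, since length-prefixed UTF-8 is self-delimiting, and implements the container and variant rules with LEB128 prefixes.

```python
def uvarint(u: int) -> bytes:
    # Unsigned LEB128, used for tags, lengths, and element counts.
    out = bytearray()
    while True:
        b = u & 0x7F
        u >>= 7
        out.append(b | 0x80 if u else b)
        if not u:
            return bytes(out)

def read_uvarint(buf: bytes, i: int) -> tuple:
    result = shift = 0
    while True:
        b = buf[i]
        i += 1
        result |= (b & 0x7F) << shift
        shift += 7
        if not b & 0x80:
            return result, i

def enc_str(s: str) -> bytes:
    # String primitive: length-prefixed UTF-8 (self-delimiting).
    b = s.encode("utf-8")
    return uvarint(len(b)) + b

def dec_str(buf: bytes, i: int = 0) -> tuple:
    n, i = read_uvarint(buf, i)
    return buf[i : i + n].decode("utf-8"), i + n

def enc_list(items: list) -> bytes:
    # Container rule: element count, then the element encodings.
    return uvarint(len(items)) + b"".join(enc_str(s) for s in items)

def dec_list(buf: bytes, i: int = 0) -> tuple:
    n, i = read_uvarint(buf, i)
    out = []
    for _ in range(n):
        s, i = dec_str(buf, i)
        out.append(s)
    return out, i

def enc_variant(tag: int, payload: bytes) -> bytes:
    # Variant rule: constructor tag, then the payload encoding.
    return uvarint(tag) + payload
```

Each combinator is self-delimiting given its type, so decoders can be derived mechanically from the type structure, which is the point of the definition.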

5.3 Schema Evolution

One of the most challenging aspects of distributed systems is managing protocol evolution during rolling deployments. At any given time, different nodes may be running different versions of the code with different type definitions. Japl manages this through type compatibility rules derived from Theorem 8.

The following changes are backward compatible:

Schema Evolution Rules

  • Add optional field: the new field must have Option type (defaults to None). Example: {id, name} \(\to\) {id, name, email: Option<String>}.

  • Add variant: a new variant is added to the sum type; old code ignores it. Example: Active | Inactive \(\to\) Active | Inactive | Suspended.

  • Widen field type: the field type is replaced with a strictly more general type. Example: Int \(\to\) Float (numeric widening).

  • Rename field: the old tag is preserved; the new name is an alias. Example: name \(\to\) display_name with @alias("name").

The following changes are not backward compatible and are flagged by the compiler when it detects version mismatches: removing a field, removing a variant that old code may still send, narrowing a field type, and reusing or changing a field tag.

5.4 Versioned Types

For cases where incompatible changes are necessary, Japl supports explicit versioning:

type Order @version(2) = {
  id: OrderId,
  items: List<Item>,
  total: Money,
  status: OrderStatus,
  -- New in v2
  metadata: Option<Map<String, String>>
}

-- Migration function for v1 -> v2
migrate Order from 1 to 2 = fn(old) ->
  { old | metadata = None }

The runtime automatically applies migration functions when it receives a message with an older version tag. This happens transparently, with no programmer intervention at the receive site.
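The receive-side behavior can be modeled as a small migration registry that applies one step per version until the value reaches the local version (Python sketch; the registry shape is an assumption, not Japl's runtime API):

```python
MIGRATIONS = {}   # (type_name, from_version) -> migration function

def register_migration(type_name, from_v, fn):
    """Record a one-step migration from from_v to from_v + 1."""
    MIGRATIONS[(type_name, from_v)] = fn

def upgrade(type_name, version, value, target_version):
    """Apply registered migrations one version at a time."""
    while version < target_version:
        step = MIGRATIONS[(type_name, version)]
        value = step(value)
        version += 1
    return value

# Mirror of the Order v1 -> v2 migration above: add metadata = None.
register_migration("Order", 1, lambda old: {**old, "metadata": None})
```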

5.5 Comparison with External Schema Languages

Comparison of Serialization Approaches

  Property              JAPL         Protobuf        Avro              Cap'n Proto
  Schema language       Types        Separate        Separate          Separate
  Code generation       No           Yes             Yes               Yes
  Schema evolution      Type rules   Field numbers   Schema registry   Ordinals
  Zero-copy             Partial      No              No                Yes
  Type safety           Full         Partial         None              Partial
  In-language           Yes          No              No                No

6 Location Transparency and Awareness

6.1 The Transparency Spectrum

Location transparency is not binary; it exists on a spectrum. At one extreme, every operation looks identical regardless of location (full transparency). At the other, every remote operation requires explicit handling of latency, failure, and serialization (full awareness). Japl occupies a principled position on this spectrum, described in the subsections that follow.

6.2 Spawn Remote

The Process.spawn_on primitive creates a process on a specified remote node. The returned PID is indistinguishable from a local PID in subsequent operations:

fn deploy_worker(target: Node, config: WorkerConfig)
    -> Pid[WorkerMsg] with Process, Net =
  let pid = Process.spawn_on(target, fn ->
    worker_loop(WorkerState.init(config))
  )
  Process.monitor(pid)
  pid

The runtime handles the mechanics of remote spawning through a careful distinction between code and data. The function passed to spawn_on must be a top-level function or a closure over serializable values. The code itself is not transferred: instead, the compiler emits a module-qualified function name (e.g., MyApp.Workers.worker_loop) that both nodes resolve to the same compiled code (ensured by the cluster’s version management). What is transferred is the closure’s captured environment—the values of all free variables in the closure body. The compiler statically verifies that every captured value satisfies \(\mathsf{Ser}(\tau)\); if any captured variable has a non-serializable type (e.g., a function type or a file handle), the program is rejected at compile time. This resolves the apparent tension with \(\neg\mathsf{Ser}(\tau \to \sigma)\): function values (closures with opaque code pointers) cannot be sent as messages, but spawn_on transmits a function reference (a name) plus serializable captured state, which the remote node reassembles into a runnable closure.

The returned PID encodes the remote node’s address for future communication.

6.3 Transparent Message Routing

When a process sends a message to a PID, the runtime checks whether the PID is local or remote:

  1. Local: The message is placed directly in the target process’s mailbox. Cost: O(1) pointer enqueue.

  2. Remote, connected: The message is serialized, transmitted over the existing TCP connection, deserialized, and placed in the remote mailbox. Cost: serialization + network latency + deserialization.

  3. Remote, disconnected: The message is buffered (up to a configurable limit). If the connection is re-established within the timeout, buffered messages are sent. Otherwise, a NodeDown message is delivered to monitoring processes.
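The three cases amount to a dispatch on the PID's node. A Python sketch (the Pid shape, the "local" marker, and the return labels are illustrative, not the Japl runtime's API):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Pid:
    node: str   # "local" marks the current node in this sketch
    id: int

def serialize(msg):
    return repr(msg).encode()   # stand-in for the type-derived encoder

def route(pid, message, mailboxes, connections, buffers, buffer_limit=1000):
    if pid.node == "local":
        mailboxes[pid].append(message)          # case 1: O(1) enqueue
        return "local"
    conn = connections.get(pid.node)
    if conn is not None:
        conn.send(serialize(message))           # case 2: serialize + transmit
        return "sent"
    pending = buffers.setdefault(pid.node, [])
    if len(pending) < buffer_limit:             # case 3: buffer while disconnected
        pending.append(message)
        return "buffered"
    return "dropped"                            # over the limit: NodeDown path
```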

6.4 When Transparency Helps

Location transparency is valuable when the programmer wants to write code that works identically regardless of deployment topology:

-- This function works whether the workers are local or remote
fn distribute_work(workers: List<Pid[WorkerMsg]>, tasks: List<Task>) -> Unit =
  List.zip(workers, tasks)
  |> List.each(fn (pid, task) -> Process.send(pid, DoWork(task)))

This enables testing distributed code locally (all processes on one node) and deploying it across multiple nodes without code changes.

6.5 When Awareness Is Needed

Some situations require explicit awareness of distribution:

-- Explicit timeout for remote operations
fn query_remote_service(service: Pid[QueryMsg], query: Query)
    -> Result<Response, DistError> with Process, Net =
  let ref = Process.monitor(service)
  Process.send(service, DoQuery(query, self()))
  match Process.receive_with_timeout(5000) with
  | QueryResult(result) ->
      Process.demonitor(ref)
      Ok(result)
  | ProcessDown(^ref, reason) ->
      Err(ServiceCrashed(reason))
  | Timeout ->
      Process.demonitor(ref)
      Err(ServiceTimeout)

The Net effect in the function signature signals to callers that this function involves network communication and may fail in distribution-specific ways.

7 Partition Tolerance

7.1 The CAP Context

The CAP theorem establishes that a distributed system cannot simultaneously provide consistency, availability, and partition tolerance. Since network partitions are inevitable in practice, distributed systems must choose between consistency and availability during partitions.

Japl does not make this choice for the programmer. Instead, it provides the primitives to implement both CP and AP strategies, and uses the type system to make the choice explicit.

7.2 Process Model and Partitions

Japl’s process model naturally handles network partitions because processes are already isolated and communicate asynchronously. When a partition occurs:

  1. Messages to processes on unreachable nodes are buffered.

  2. Monitors trigger NodeDown events for unreachable nodes.

  3. Supervisors can apply partition-specific strategies.

  4. When the partition heals, connections are re-established and buffered messages are delivered.

-- Handling partitions in a supervisor
fn partition_aware_supervisor() -> Never with Process[SupervisorMsg], Net =
  match Process.receive() with
  | NodeDown(node, reason) ->
      match reason with
      | NetworkPartition ->
          -- Apply split-brain strategy
          let action = SplitBrain.evaluate(cluster_state(), node)
          match action with
          | KeepRunning -> partition_aware_supervisor()
          | StepDown -> Process.exit(PartitionStepDown)
          | FenceNode(n) -> Node.disconnect(n)
      | NodeCrash ->
          -- Restart children that were on the crashed node
          restart_children_on(available_node())
          partition_aware_supervisor()
  | PartitionHealed(node) ->
      -- Reconcile state with the re-connected node
      reconcile_state(node)
      partition_aware_supervisor()
  | msg ->
      handle_normal(msg)
      partition_aware_supervisor()

7.3 Split-Brain Strategies

Japl provides built-in split-brain resolution strategies:

  • Static quorum: a partition is viable if it contains a majority of the configured nodes. Minority partitions shut down.

  • Keep oldest: the partition containing the oldest node survives.

  • Referee: a designated referee node decides which partition survives.

  • Custom: the programmer provides a function \(\texttt{fn(ClusterState) -> SplitBrainAction}\).
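The static-quorum decision, for instance, reduces to a set intersection. A Python sketch of the evaluation (action names mirror the SplitBrainAction variants used above):

```python
def static_quorum(reachable, configured, quorum_size):
    """A partition survives only if it can see at least quorum_size
    of the configured members (counting itself)."""
    visible = len(set(reachable) & set(configured))
    return "KeepRunning" if visible >= quorum_size else "StepDown"
```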

-- Configuring split-brain strategy
let cluster = Cluster.start({
  name = "payment-service",
  nodes = seed_nodes,
  split_brain = SplitBrain.StaticQuorum({ quorum_size = 3 }),
  rejoin = Rejoin.CleanStart,
})

7.4 CRDTs for Partition-Tolerant State

For state that must remain available during partitions, Japl provides CRDT (Conflict-free Replicated Data Type) primitives:

-- A grow-only counter replicated across nodes
type DistributedCounter = crdt GCounter

fn increment_counter(counter: DistributedCounter) -> DistributedCounter =
  GCounter.increment(counter, Node.self())

-- CRDTs merge automatically when partitions heal
fn on_partition_heal(local: DistributedCounter, remote: DistributedCounter)
    -> DistributedCounter =
  GCounter.merge(local, remote)  -- always converges, no conflicts
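A grow-only counter is a few lines in any language. The Python sketch below mirrors the GCounter operations used above (increment per node, merge by pointwise maximum); merge is commutative, associative, and idempotent, which is why it always converges:

```python
class GCounter:
    """Grow-only counter: one monotone count per node."""
    def __init__(self, counts=None):
        self.counts = dict(counts or {})

    def increment(self, node, by=1):
        self.counts[node] = self.counts.get(node, 0) + by
        return self

    def value(self):
        return sum(self.counts.values())

    def merge(self, other):
        merged = dict(self.counts)
        for node, n in other.counts.items():
            merged[node] = max(merged.get(node, 0), n)
        return GCounter(merged)
```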

8 Service Discovery and Clustering

8.1 Node Mesh

Japl nodes form a mesh network with full connectivity (every node maintains a connection to every other node, up to a configurable maximum). The mesh is established and maintained through a gossip-based membership protocol.

-- Starting a node with mesh configuration
let node = Node.start({
  name = "api-1",
  cookie = Env.get("CLUSTER_COOKIE"),
  listen = "0.0.0.0:9000",
  seeds = ["seed-1.internal:9000", "seed-2.internal:9000"],
  max_connections = 100,
})

8.2 Membership Protocol

The membership protocol ensures that all nodes have a consistent view of cluster membership:

  1. Join: A new node connects to a seed node and sends a join request. The seed gossips the new member to all other nodes.

  2. Heartbeat: Nodes exchange heartbeats at regular intervals. The phi accrual failure detector determines when a node is unreachable.

  3. Leave: A node announces its intention to leave. Other nodes remove it from their membership list gracefully.

  4. Down: If a node fails to respond within the failure detection threshold, it is marked as down. Monitors are notified.

8.3 Service Registry

The service registry is a distributed key-value store mapping service names to sets of PIDs. It is replicated across all nodes using a CRDT (OR-Set):

-- Register the current process as a service
Registry.register("order-processor", self())

-- Register with metadata
Registry.register("order-processor", self(), {
  version = "2.1.0",
  region = "us-east-1",
  capacity = 100,
})

-- Look up all instances of a service
let processors: List<{pid: Pid[OrderMsg], meta: ServiceMeta}> =
  Registry.lookup("order-processor")

-- Look up with filter
let local_processors =
  Registry.lookup("order-processor")
  |> List.filter(fn entry -> entry.meta.region == Node.region())

-- Subscribe to changes
Registry.subscribe("order-processor", fn event ->
  match event with
  | ServiceRegistered(pid, meta) -> log("New processor: " ++ show(pid))
  | ServiceDeregistered(pid) -> log("Processor left: " ++ show(pid))
)
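The OR-Set that replicates the registry can be sketched as well: each registration carries a unique tag, and a removal deletes only the tags it has observed, so a registration that a remote replica has not yet seen survives a merge (Python sketch, not the runtime's implementation):

```python
import uuid

class ORSet:
    """Observed-remove set: add-wins under concurrent add/remove."""
    def __init__(self):
        self.adds = {}      # element -> set of unique add tags
        self.removes = {}   # element -> set of removed tags

    def add(self, elem):
        self.adds.setdefault(elem, set()).add(uuid.uuid4().hex)

    def remove(self, elem):
        # remove only the add tags this replica has observed
        observed = self.adds.get(elem, set())
        self.removes.setdefault(elem, set()).update(observed)

    def contains(self, elem):
        live = self.adds.get(elem, set()) - self.removes.get(elem, set())
        return bool(live)

    def merge(self, other):
        out = ORSet()
        for src in (self, other):
            for e, tags in src.adds.items():
                out.adds.setdefault(e, set()).update(tags)
            for e, tags in src.removes.items():
                out.removes.setdefault(e, set()).update(tags)
        return out
```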

8.4 Health Checking

Japl provides built-in health checking at two levels:

Node level: the failure detector monitors node connectivity. Unreachable nodes trigger NodeDown events.

Service level: registered services can define health check functions that are invoked periodically:

Registry.register("database-pool", self(), {
  health_check = fn pid ->
    let reply = Process.call(pid, HealthCheck, 5000)
    match reply with
    | Ok(Healthy) -> HealthStatus.Healthy
    | Ok(Degraded(reason)) -> HealthStatus.Degraded(reason)
    | Err(Timeout) -> HealthStatus.Unhealthy("timeout")
    | Err(reason) -> HealthStatus.Unhealthy(show(reason))
})

8.5 Load Balancing

The service registry integrates with load balancing strategies:

-- Round-robin selection from registered services
let pid = Registry.select("order-processor", Strategy.RoundRobin)

-- Least-loaded selection (requires capacity metadata)
let pid = Registry.select("order-processor", Strategy.LeastLoaded)

-- Locality-aware: prefer local node, then same region, then any
let pid = Registry.select("order-processor", Strategy.LocalFirst)
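The selection strategies are ordinary functions over registry entries. A Python sketch, assuming each entry is a dict with a meta map as in the registration example (and reading capacity as remaining headroom, which is an assumption):

```python
class RoundRobin:
    """Cycle through entries in registration order."""
    def __init__(self):
        self.i = 0

    def select(self, entries):
        entry = entries[self.i % len(entries)]
        self.i += 1
        return entry

def least_loaded(entries):
    # assumes meta.capacity reports remaining headroom
    return max(entries, key=lambda e: e["meta"].get("capacity", 0))

def local_first(entries, local_node, local_region):
    """Prefer the local node, then the same region, then any node."""
    def rank(e):
        if e["meta"]["node"] == local_node:
            return 0
        if e["meta"]["region"] == local_region:
            return 1
        return 2
    return min(entries, key=rank)
```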

9 Comparison with Existing Approaches

Distribution Feature Comparison

(The feature matrix did not survive extraction. It compared Japl against the systems discussed in Sections 9.1 to 9.6, including Erlang, Akka, Orleans, Go, Rust, and Unison, along these dimensions: native distribution, location transparency, static typing, type-derived serialization, schema evolution, built-in supervision, distributed supervision, service discovery, effect-marked distribution, capability security, built-in CRDTs, and partition strategies. Surviving cell values include "Lib", "Partial", "Manual", "Hash", "EPMD", "Config", and "Silo".)

9.1 Erlang: The Gold Standard

Erlang is the closest language to Japl in distribution philosophy. Both treat distribution as native, both use location-transparent PIDs, and both provide supervision trees. The key differences are:

  1. Type safety. Erlang is dynamically typed. A serialization error in Erlang manifests as a runtime crash. In Japl, the compiler rejects programs that attempt to send non-serializable values across nodes.

  2. Schema evolution. Erlang’s External Term Format does not support schema evolution. Adding a field to a record requires coordinating all nodes. Japl’s type compatibility rules allow independent evolution.

  3. Effect tracking. Erlang does not distinguish local from distributed operations in types. Japl’s Net effect makes this distinction visible.

  4. Security. Erlang’s cookie-based authentication provides all-or-nothing access. Japl’s capability types allow fine-grained distributed access control.

9.2 Akka: Library-Level Actors on the JVM

Akka demonstrates that actor-based distribution can work as a library, but also reveals the costs:

9.3 Orleans: Virtual Actors

Orleans’ virtual actor model is elegant: actors always exist conceptually and are activated on demand. This eliminates explicit lifecycle management but introduces complexities of its own.

Japl could implement a virtual-actor pattern as a library atop its native process model, providing the best of both worlds.

9.4 Go: No Built-In Distribution

Go provides goroutines and channels for local concurrency but offers nothing for distribution. Distributed Go programs use gRPC (with Protocol Buffers), HTTP APIs, or messaging systems like NATS. This means every distributed Go program must:

  1. Define Protocol Buffer schemas separately from Go types.

  2. Generate Go code from the schemas.

  3. Handle serialization errors at runtime.

  4. Implement service discovery using external systems (Consul, etcd).

  5. Implement health checking and failure detection manually.

9.5 Rust: No Built-In Distribution

Rust’s ownership model provides excellent local correctness guarantees but does not extend to distribution. Distributed Rust programs face the same challenges as Go, with the additional complexity of lifetime management across network boundaries. Libraries like Actix provide actor-like abstractions, but serialization (via Serde) is a separate concern.

9.6 Unison: Radical Content-Addressing

Unison’s content-addressed approach is the most radical departure from traditional distribution. By identifying code by hash, Unison eliminates versioning and serialization problems entirely—any value can be sent to any node, and if the code is not present, it can be transmitted.

However, Unison’s approach requires abandoning file-based source code, traditional version control, and familiar development workflows. Japl takes a more conservative approach that integrates with existing tools while still providing strong distribution guarantees.

10 Implementation Architecture

10.1 Network Layer

The network layer is organized in four tiers, shown in the figure below.

Network layer architecture.

10.2 Connection Pooling

Each node maintains a connection pool to every connected peer. Connections are multiplexed: a single TCP connection carries messages for all processes communicating between two nodes. This avoids the overhead of per-process connections while allowing fair scheduling across message streams.


10.3 Message Encoding

Messages are encoded using the type-derived wire format described in Section 5. The encoding pipeline is:

  1. Serialization: The value is serialized to bytes using the type-derived encoder.

  2. Framing: The serialized bytes are wrapped in a frame with a header containing: message type tag, version, source PID, destination PID, and length.

  3. Compression: For messages above a configurable threshold (default: 1024 bytes), LZ4 compression is applied.

  4. Encryption: If the connection uses TLS, encryption is handled at the transport layer. For additional message-level encryption, the capability system can require encrypted channels.

The frame format is:

+--------+--------+---------+---------+----------------+
| Magic  | Flags  | MsgType | Version | Length         |
| (2B)   | (1B)   | (4B)    | (2B)    | (4B)           |
+--------+--------+---------+---------+----------------+
| Source PID (16B)         | Dest PID (16B)            |
+--------------------------+---------------------------+
| Capability Token (0 or 32B, if Flags & 0x10)         |
+------------------------------------------------------+
| Payload (variable length)                            |
+------------------------------------------------------+
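The fixed 13-byte header maps directly onto a packed struct. A Python sketch using the widths from the diagram (the magic number and field values below are placeholders, not specified by the text):

```python
import struct

# >: big-endian network order; H = 2B, B = 1B, I = 4B per the diagram
HEADER_FMT = ">HBIHI"           # Magic, Flags, MsgType, Version, Length
FLAG_HAS_CAPABILITY = 0x10

def pack_frame(magic, flags, msg_type, version, payload,
               src_pid, dst_pid, cap_token=b""):
    assert len(src_pid) == 16 and len(dst_pid) == 16
    header = struct.pack(HEADER_FMT, magic, flags, msg_type, version,
                         len(payload))
    token = cap_token if flags & FLAG_HAS_CAPABILITY else b""
    return header + src_pid + dst_pid + token + payload
```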

10.4 Node Discovery Protocol

Node discovery uses a two-phase protocol:

Phase 1 (contact). The joining node connects to one or more seed nodes from its configuration. It sends a JoinRequest containing its name, capabilities, and listening address.

Phase 2 (propagation). The seed node adds the new member to its membership list and gossips the change to all other nodes. Each node then establishes a direct connection to the new member.

The gossip protocol uses a push-pull model: each gossip round, a node selects a random peer, sends its membership state, and receives the peer’s state. States are merged using a vector clock for convergence.
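Vector-clock merging is the pointwise maximum. A Python sketch of the two operations a push-pull round relies on (function names are ours):

```python
def vc_merge(a: dict, b: dict) -> dict:
    """Pointwise maximum of two vector clocks."""
    return {k: max(a.get(k, 0), b.get(k, 0)) for k in a.keys() | b.keys()}

def vc_dominates(a: dict, b: dict) -> bool:
    """True if clock a reflects every event that b has seen."""
    return all(a.get(k, 0) >= v for k, v in b.items())
```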

10.5 Failure Detection: Phi Accrual

Japl uses the phi accrual failure detector to determine node unreachability. Unlike binary failure detectors (alive or dead), the phi accrual detector outputs a continuous suspicion level \(\phi\) that represents the likelihood that a node has failed.

The suspicion level is computed from the distribution of inter-heartbeat intervals:

\[\phi(t) = -\log_{10}\left(1 - F(t - t_{\text{last}})\right)\]

where \(F\) is the cumulative distribution function of the normal distribution fitted to historical heartbeat intervals, \(t\) is the current time, and \(t_{\text{last}}\) is the time of the last received heartbeat.
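The formula is directly computable from a window of observed intervals. A Python sketch using the standard library's normal distribution (the window handling and numerical floors are our choices, not prescribed by the text):

```python
import math
from statistics import NormalDist

def phi(t_now, t_last, intervals):
    """Suspicion level: -log10(1 - F(t_now - t_last)), where F is the
    normal CDF fitted to the historical inter-heartbeat intervals."""
    mu = sum(intervals) / len(intervals)
    var = sum((x - mu) ** 2 for x in intervals) / len(intervals)
    dist = NormalDist(mu, max(var ** 0.5, 1e-9))   # floor avoids sigma = 0
    p_later = 1.0 - dist.cdf(t_now - t_last)       # chance heartbeat is merely late
    return -math.log10(max(p_later, 1e-300))       # floor avoids log(0)
```

At exactly the mean interval the suspicion is \(-\log_{10}(0.5) \approx 0.3\); it grows rapidly as the silence stretches past the fitted distribution.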

Advantages of the phi accrual detector:

  • The suspicion level adapts to observed network conditions instead of relying on a fixed timeout.

  • Each consumer can choose its own threshold on \(\phi\), trading detection speed against false positives.

  • Monitoring (collecting heartbeat statistics) is decoupled from interpretation (acting on a threshold).

11 Distributed Security

11.1 Capability-Based Security Model

Japl’s distribution model uses capabilities—unforgeable tokens that represent permission to perform specific operations—as the foundation for distributed security. This approach is inspired by the E language and the object-capability model.

A PID and a capability are distinct concepts. A Pid[Msg] is a location-transparent process address that enables message delivery; possessing a PID allows sending messages of type Msg to the identified process, but it does not grant permission to spawn processes on that node, access the service registry, or perform other privileged operations. For those, a separate capability token must be obtained. Capability tokens are cryptographically signed, unforgeable values that encode a specific permission scope. They are first-class values that can be passed in messages (they implement \(\mathsf{Ser}\)), attenuated, but never amplified.

-- A capability to spawn processes on a specific node.
-- This is a separate token, distinct from any PID.
type SpawnCapability = capability {
  node: Node,
  max_processes: Int,
  allowed_types: Set<TypeId>,
  token: CapabilityToken,  -- cryptographic proof of authority
}

-- Using the capability
fn deploy_service(cap: SpawnCapability, service: fn() -> Never)
    -> Result<Pid[ServiceMsg], CapError> with Process, Net =
  if SpawnCapability.check(cap, type_of(service)) then
    Ok(Process.spawn_on(cap.node, service))
  else
    Err(NotAllowed)

The SpawnCapability.check call is a compile-time verification of type compatibility (ensuring the service type is in allowed_types) combined with a runtime verification of the cryptographic token’s validity and the process count budget. The compile-time component is a static check inserted by the compiler; it rejects programs where the service type is provably outside the capability’s allowed_types. The runtime component validates the token signature and decrements the process counter, which cannot be checked statically because it depends on dynamic cluster state. This two-phase design ensures that no runtime latency is added for checks that can be resolved statically, while dynamic invariants are still enforced.

11.2 Node Authentication

When two Japl nodes establish a connection, they perform mutual authentication:

  1. TLS handshake: All inter-node connections use TLS 1.3 with mutual certificate verification.

  2. Cluster cookie: After TLS, nodes exchange a shared secret (the cluster cookie) as an additional authentication layer.

  3. Capability exchange: Nodes exchange their capability sets, establishing what operations each can perform on the other.

let node = Node.start({
  name = "api-1",
  cookie = Env.get("CLUSTER_COOKIE"),
  listen = "0.0.0.0:9000",
  tls = TlsConfig.from_files(
    cert = "/etc/japl/node.crt",
    key = "/etc/japl/node.key",
    ca = "/etc/japl/ca.crt",
  ),
  capabilities = NodeCapabilities.from([
    Cap.SpawnProcesses({ max = 1000 }),
    Cap.SendMessages({ rate_limit = 10000 }),
    Cap.AccessRegistry({ read = True, write = False }),
  ]),
})

11.3 Encrypted Channels

For sensitive messages, Japl supports end-to-end encrypted channels, layered on top of the TLS transport encryption rather than replacing it:

-- Create an encrypted channel between two processes
let (send_chan, recv_chan) = Channel.encrypted(
  algorithm = Aes256Gcm,
  key = shared_key,
)

-- Messages on this channel are encrypted after serialization, before transmission
Channel.send(send_chan, SensitiveData(patient_record))

11.4 Distributed Access Control

Capabilities can be attenuated (reduced in power) but never amplified. This follows the principle of least authority:

-- A full capability
let full_cap = RegistryCapability.full()

-- Attenuate to read-only
let read_cap = RegistryCapability.attenuate(full_cap, {
  read = True,
  write = False,
  subscribe = True,
})

-- Attenuate further to a specific service
let limited_cap = RegistryCapability.attenuate(read_cap, {
  service_filter = fn name -> name == "payment-service",
})

-- Send the limited capability to a remote process
Process.send(remote_pid, GrantAccess(limited_cap))
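Attenuation is monotone: a derived capability can only switch permissions off, never on. A Python sketch of that invariant over a flat permission map (the dict shape is illustrative only):

```python
def attenuate(cap: dict, changes: dict) -> dict:
    """Produce a weaker capability: boolean permissions may be turned
    off but never on; non-boolean entries (e.g. filters) are narrowed."""
    out = dict(cap)
    for key, value in changes.items():
        if key not in cap:
            raise ValueError(f"unknown permission: {key}")
        if isinstance(value, bool):
            out[key] = cap[key] and value   # True -> maybe False, never reverse
        else:
            out[key] = value                # e.g. a narrower service filter
    return out
```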

12 Case Study: Distributed Task Scheduler

We demonstrate Japl’s distribution model through a complete distributed task scheduler that manages work across multiple nodes, including process migration when nodes become overloaded.

12.1 System Architecture

The task scheduler consists of three components:

  1. A coordinator process that receives task submissions and assigns them to worker nodes.

  2. Worker pools on each node that execute tasks.

  3. A monitor that tracks node load and triggers process migration.

Distributed task scheduler architecture with process migration.

12.2 Message Types

type TaskId = String

type Task deriving(Serialize) = {
  id: TaskId,
  payload: Bytes,
  priority: Priority,
  deadline: Option<Timestamp>,
}

type Priority = High | Medium | Low
  deriving(Serialize, Ord)

type CoordinatorMsg =
  | SubmitTask(Task, Reply<Result<TaskId, SchedulerError>>)
  | TaskCompleted(TaskId, TaskResult)
  | WorkerOverloaded(Node, Float)
  | NodeJoined(Node)
  | NodeLeft(Node)

type WorkerMsg =
  | ExecuteTask(Task)
  | MigrateTask(Task, Node)
  | Shutdown

type MonitorMsg =
  | LoadReport(Node, Float)
  | CheckLoads

12.3 Coordinator Implementation

type SchedulerState = {
  workers: Map<Node, List<Pid[WorkerMsg]>>,
  pending: Map<TaskId, Task>,
  load: Map<Node, Float>,
  round_robin_idx: Int,
}

fn coordinator(state: SchedulerState) -> Never
    with Process[CoordinatorMsg], Net =
  match Process.receive() with
  | SubmitTask(task, reply) ->
      -- Select least-loaded node
      let target = select_node(state)
      match target with
      | Some(node) ->
          let worker = select_worker(state.workers, node)
          Process.send(worker, ExecuteTask(task))
          Reply.send(reply, Ok(task.id))
          let new_pending = Map.insert(state.pending, task.id, task)
          coordinator({ state | pending = new_pending })
      | None ->
          Reply.send(reply, Err(NoWorkersAvailable))
          coordinator(state)

  | TaskCompleted(task_id, result) ->
      let new_pending = Map.delete(state.pending, task_id)
      log("Task " ++ task_id ++ " completed: " ++ show(result))
      coordinator({ state | pending = new_pending })

  | WorkerOverloaded(node, load) ->
      -- Trigger migration from overloaded node
      let tasks_to_migrate = select_tasks_for_migration(state, node)
      let target = least_loaded_node(state, node)
      match target with
      | Some(target_node) ->
          List.each(tasks_to_migrate, fn task ->
            let worker = select_worker(state.workers, node)
            Process.send(worker, MigrateTask(task, target_node))
          )
      | None -> ()
      coordinator({ state | load = Map.insert(state.load, node, load) })

  | NodeJoined(node) ->
      -- Spawn workers on the new node
      let pids = List.range(1, 4)
        |> List.map(fn _ ->
             Process.spawn_on(node, fn -> worker_loop(WorkerState.init()))
           )
      let new_workers = Map.insert(state.workers, node, pids)
      coordinator({ state | workers = new_workers })

  | NodeLeft(node) ->
      -- Reassign tasks from the departed node
      let orphaned = get_tasks_on_node(state, node)
      List.each(orphaned, fn task ->
        Process.send(self(), SubmitTask(task, Reply.ignore()))
      )
      let new_workers = Map.delete(state.workers, node)
      coordinator({ state | workers = new_workers })

12.4 Worker Implementation

type WorkerState = {
  current_task: Option<Task>,
  tasks_completed: Int,
}

fn worker_loop(state: WorkerState) -> Never with Process[WorkerMsg] =
  match Process.receive() with
  | ExecuteTask(task) ->
      let result = execute(task)
      -- Report completion to coordinator
      Process.send(coordinator_pid(), TaskCompleted(task.id, result))
      worker_loop({ state |
        current_task = None,
        tasks_completed = state.tasks_completed + 1
      })

  | MigrateTask(task, target_node) ->
      -- Spawn a new worker on the target node with the task
      let new_worker = Process.spawn_on(target_node, fn ->
        let result = execute(task)
        Process.send(coordinator_pid(), TaskCompleted(task.id, result))
        worker_loop(WorkerState.init())
      )
      Process.monitor(new_worker)
      worker_loop({ state | current_task = None })

  | Shutdown ->
      log("Worker shutting down after " ++
          show(state.tasks_completed) ++ " tasks")
      Process.exit(Normal)

12.4.0.1 Mailbox forwarding during migration.

When a process “migrates” by spawning a replacement on a target node, the original PID remains valid. The Japl runtime installs a forwarding entry in the local node’s routing table: any message arriving for the old PID is transparently re-routed to the new PID on the target node. This forwarding is invisible to senders, preserving location transparency. The forwarding entry persists until (a) all monitors of the old PID have received a ProcessMigrated notification and updated their references, or (b) a configurable forwarding TTL expires, after which messages to the old PID result in a ProcessNotFound error delivered to the sender’s monitor. In the task scheduler above, the coordinator continues to hold the old worker PID until it receives the TaskCompleted message from the new worker; at that point it can update its routing table. For long-lived migrations, the runtime’s forwarding mechanism ensures no messages are lost during the transition window.

12.5 Load Monitor

fn load_monitor(state: MonitorState) -> Never
    with Process[MonitorMsg], Net =
  match Process.receive_with_timeout(5000) with
  | LoadReport(node, load) ->
      let new_state = { state |
        loads = Map.insert(state.loads, node, load)
      }
      -- Check if any node is overloaded
      if load > state.threshold then
        Process.send(state.coordinator, WorkerOverloaded(node, load))
      load_monitor(new_state)

  | CheckLoads ->
      -- Periodic check: request load from all nodes
      Map.keys(state.loads)
      |> List.each(fn node ->
           let load = Node.get_load(node)
           Process.send(self(), LoadReport(node, load))
         )
      load_monitor(state)

  | Timeout ->
      -- Timeout triggers periodic check
      Process.send(self(), CheckLoads)
      load_monitor(state)

12.6 Supervisor Tree

supervisor TaskScheduler {
  strategy: OneForOne
  max_restarts: 10
  max_seconds: 60

  child {
    id: "coordinator"
    start: fn -> coordinator(SchedulerState.init())
    restart: Permanent
  }

  child {
    id: "load_monitor"
    start: fn -> load_monitor(MonitorState.init())
    restart: Permanent
  }

  child {
    id: "worker_supervisor"
    start: fn -> worker_supervisor()
    restart: Permanent
  }
}

supervisor WorkerSupervisor {
  strategy: OneForOne

  -- Dynamic children: workers are added as nodes join
  dynamic: True
}

12.7 Starting the Distributed System

fn main() -> Unit with Io, Net, Process =
  -- Start the node
  let node = Node.start({
    name = "scheduler",
    cookie = Env.get("CLUSTER_COOKIE"),
    listen = "0.0.0.0:9000",
    seeds = Env.get("SEED_NODES") |> String.split(","),
  })

  -- Subscribe to cluster events
  Cluster.subscribe(fn event ->
    match event with
    | MemberJoined(node) ->
        Process.send(coordinator_pid(), NodeJoined(node))
    | MemberLeft(node) ->
        Process.send(coordinator_pid(), NodeLeft(node))
    | _ -> ()
  )

  -- Start the supervisor tree
  let sup = Supervisor.start(TaskScheduler)

  -- The system is now running. Block until shutdown.
  Process.receive_matching(fn msg ->
    match msg with | ShutdownSystem -> True | _ -> False
  )
  Supervisor.stop(sup)

12.8 Analysis

This case study demonstrates several advantages of Japl’s native distribution:

  1. No serialization code. All message types are automatically serializable via deriving(Serialize). No Protocol Buffer schemas, no code generation.

  2. Uniform local/remote syntax. The coordinator sends messages to workers using the same Process.send regardless of whether the worker is local or remote.

  3. Process migration. Migrating a task to another node is simply spawning a new process on the target node—no special migration framework needed.

  4. Fault tolerance. Supervision trees handle worker crashes, node departures, and coordinator failures.

  5. Service discovery. Cluster membership events are delivered as messages, integrating naturally with the process model.

A comparable system in Go or Rust would require approximately 2–3x more code, including Protocol Buffer definitions, gRPC service definitions, connection management, health checking, and explicit serialization/deserialization at every network boundary.

13 Discussion

13.1 Limitations of Location Transparency

While Japl’s location transparency reduces syntactic burden, it does not eliminate the fundamental differences between local and remote communication. The Net effect mitigates this by making network operations visible in types, but the programmer must still reason about latency, partial failure, and ordering.

A potential future direction is a latency-aware type system where the type of a remote operation encodes expected latency bounds, allowing the compiler to warn about operations that may be too slow.

13.2 Serialization Overhead

Type-derived serialization is convenient but may not match the performance of hand-optimized binary protocols. For extremely high-throughput scenarios (millions of messages per second between two nodes), Japl allows dropping down to custom serialization:

-- Custom serializer for a performance-critical type
impl Serialize[MarketTick] =
  fn serialize(tick) =
    -- Hand-optimized packed encoding
    Bytes.builder()
    |> Bytes.write_u64(tick.timestamp)
    |> Bytes.write_u32(tick.symbol_id)
    |> Bytes.write_f64(tick.price)
    |> Bytes.build()

  fn deserialize(data) =
    let timestamp = Bytes.read_u64(data, 0)
    let symbol_id = Bytes.read_u32(data, 8)
    let price = Bytes.read_f64(data, 12)
    Ok({ timestamp, symbol_id, price })
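The hand-packed layout above is a fixed 20-byte record (u64 + u32 + f64). An equivalent Python codec using struct, assuming big-endian network order (the byte order is our assumption; the text does not specify it):

```python
import struct

TICK_FMT = ">QId"     # u64 timestamp, u32 symbol_id, f64 price: 20 bytes

def serialize_tick(tick: dict) -> bytes:
    return struct.pack(TICK_FMT, tick["timestamp"],
                       tick["symbol_id"], tick["price"])

def deserialize_tick(data: bytes) -> dict:
    timestamp, symbol_id, price = struct.unpack(TICK_FMT, data)
    return {"timestamp": timestamp, "symbol_id": symbol_id, "price": price}
```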

13.3 Comparison with Microservice Architectures

Japl’s distribution model can be seen as an alternative to microservice architectures. Instead of deploying many independent services that communicate over HTTP/gRPC, a Japl application deploys as a cluster of nodes running a single program. The advantages are:

The disadvantage is reduced independence: all nodes must run compatible versions of the code. Japl’s schema evolution rules mitigate this for rolling deployments.

13.4 Scaling Considerations

The full mesh topology works well for clusters up to approximately 100–200 nodes. Beyond this, Japl supports hierarchical clustering where nodes form groups with full mesh within groups and delegate connections between groups. This is similar to Erlang’s hidden nodes or Akka’s cluster multi-DC support.

13.5 Formal Verification Opportunities

The combination of typed processes, effect tracking, and type-derived serialization opens opportunities for formal verification of distributed protocols. A Japl program can be translated to a process algebra model and model-checked for properties like deadlock freedom, liveness, and eventual consistency. We leave this as future work.

14 Conclusion

We have presented Japl’s approach to making distribution a native language concern. By integrating location-transparent process identifiers, type-derived wire protocols, effect-tracked network operations, distributed supervision, and built-in service discovery into the language, Japl eliminates the “distribution tax” that plagues most programming languages.

The key insight is that types are not merely local correctness artifacts; they are distributed protocol specifications. An algebraic data type definition simultaneously specifies the in-memory representation, the wire format, and the schema evolution rules. This unification eliminates the impedance mismatch between the language and the network.

Our formal framework, based on a location-aware \(\pi\)-calculus with type-safe serialization, provides rigorous foundations for reasoning about distributed Japl programs. The categorical semantics using fibered categories over network topology captures the essential structure of distributed computation: local fibers of processes over a global base of network topology.

The case study demonstrates that Japl’s native distribution support reduces distributed systems code by 40–60% compared to library-based approaches while providing stronger correctness guarantees. The combination of static type safety, location transparency, and practical distributed systems primitives is, to our knowledge, unique among production-oriented programming languages.

Distribution is not a library concern. It is a language concern. Japl treats it as such.


Lightbend. Akka: Build powerful reactive, concurrent, and distributed applications more easily. https://akka.io, 2009.

P. Alvaro, N. Conway, J. M. Hellerstein, and W. R. Marczak. Consistency analysis in Bloom: A CALM and collected approach. In Proc. CIDR, 2011.

J. Armstrong. Making reliable distributed systems in the presence of software errors. PhD thesis, Royal Institute of Technology, Stockholm, 2003.

J. Armstrong. Programming Erlang: Software for a Concurrent World. Pragmatic Bookshelf, 2007.

Apache Foundation. Apache Avro: A data serialization system. https://avro.apache.org, 2009.

P. Bailis and K. Kingsbury. The network is reliable. ACM Queue, 12(7):15, 2014.

P. A. Bernstein, S. Bykov, A. Geller, G. Kliot, and J. Thelin. Orleans: Distributed virtual actors for programmability and scalability. Technical Report MSR-TR-2014-41, Microsoft Research, 2014.

E. A. Brewer. Towards robust distributed systems. In Proc. ACM PODC, 2000. Invited talk.

S. Bykov, A. Geller, G. Kliot, J. R. Larus, R. Pandya, and J. Thelin. Orleans: Cloud computing for everyone. In Proc. ACM SoCC, pages 16:1–16:14, 2011.

K. Varda. Cap’n Proto: An insanely fast data interchange format. https://capnproto.org, 2013.

P. Chiusano and R. Bjarnason. Unison: A new approach to distributed programming. https://www.unison-lang.org, 2019.

N. Conway, W. R. Marczak, P. Alvaro, J. M. Hellerstein, and D. Maier. Logic and lattices for distributed programming. In Proc. ACM SoCC, 2012.

J. B. Dennis and E. C. Van Horn. Programming semantics for multiprogrammed computations. Communications of the ACM, 9(3):143–155, 1966.

J. Epstein, A. P. Black, and S. Peyton Jones. Towards Haskell in the cloud. In Proc. ACM Haskell Symposium, pages 118–129, 2011.

M. J. Fischer, N. A. Lynch, and M. S. Paterson. Impossibility of distributed consensus with one faulty process. Journal of the ACM, 32(2):374–382, 1985.

S. Gilbert and N. Lynch. Brewer’s conjecture and the feasibility of consistent, available, partition-tolerant web services. ACM SIGACT News, 33(2):51–59, 2002.

Google. Protocol Buffers: Language-neutral, platform-neutral extensible mechanisms for serializing structured data. https://protobuf.dev, 2008.

N. Hayashibara, X. Défago, R. Yared, and T. Katayama. The \(\varphi\) accrual failure detector. In Proc. IEEE SRDS, pages 66–78, 2004.

J. M. Hellerstein. The declarative imperative: Experiences and conjectures in distributed logic. ACM SIGMOD Record, 39(1):5–19, 2010.

C. Hewitt, P. Bishop, and R. Steiger. A universal modular ACTOR formalism for artificial intelligence. In Proc. IJCAI, pages 235–245, 1973.

B. Jacobs. Categorical Logic and Type Theory. Studies in Logic and the Foundations of Mathematics. Elsevier, 1999.

L. Lamport. Time, clocks, and the ordering of events in a distributed system. Communications of the ACM, 21(7):558–565, 1978.

L. Lamport. The part-time parliament. ACM Transactions on Computer Systems, 16(2):133–169, 1998.

B. Liskov and L. Shrira. Promises: Linguistic support for efficient asynchronous procedure calls in distributed systems. In Proc. ACM PLDI, pages 260–267, 1988.

M. S. Miller. Robust Composition: Towards a Unified Approach to Access Control and Concurrency Control. PhD thesis, Johns Hopkins University, 2006.

M. S. Miller, K.-P. Yee, and J. Shapiro. Capability myths demolished. Technical Report SRL2003-02, Johns Hopkins University, 2003.

R. Milner. Communicating and Mobile Systems: the \(\pi\)-Calculus. Cambridge University Press, 1999.

D. Ongaro and J. Ousterhout. In search of an understandable consensus algorithm. In Proc. USENIX ATC, pages 305–319, 2014.

D. Sangiorgi and D. Walker. The \(\pi\)-Calculus: A Theory of Mobile Processes. Cambridge University Press, 2001.

M. Shapiro, N. Preguiça, C. Baquero, and M. Zawirski. A comprehensive study of convergent and commutative replicated data types. INRIA Research Report RR-7506, 2011.

R. Van Renesse, Y. Minsky, and M. Hayden. A gossip-style failure detection service. In Proc. IFIP Middleware, pages 55–70, 1998.

R. Virding, C. Wikström, and M. Williams. Concurrent Programming in Erlang. Prentice Hall, 2nd edition, 1996.

P. Wadler. Propositions as sessions. Journal of Functional Programming, 24(2–3):384–418, 2014.

J. Waldo, G. Wyant, A. Wollrath, and S. Kendall. A note on distributed computing. Technical Report TR-94-29, Sun Microsystems Laboratories, 1994. Published in Mobile Object Systems, LNCS 1222, Springer, 1996.

D. Yuan, Y. Luo, X. Zhuang, G. R. Rodrigues, X. Zhao, Y. Zhang, P. U. Jain, and M. Stumm. Simple testing can prevent most critical failures: An analysis of production failures in distributed data-intensive systems. In Proc. USENIX OSDI, pages 249–265, 2014.

15 Full Message Frame Specification

Message Frame Fields

Magic (2 bytes): Protocol identifier (0x4A50, “JP”)
Flags (1 byte): Bit flags: compressed (0x01), encrypted (0x02), priority (0x04), migration (0x08), capability (0x10)
Message Type (4 bytes): Type tag derived from the algebraic data type hash
Version (2 bytes): Schema version number
Length (4 bytes): Payload length in bytes
Source PID (16 bytes): Source process identifier (8 bytes node ID + 8 bytes process ID)
Dest PID (16 bytes): Destination process identifier
Capability Token (0 or 32 bytes): Optional capability token; present when the capability flag (0x10) is set in Flags. Contains a 32-byte cryptographically signed token authorizing the operation. Omitted (zero bytes) for ordinary messages that require no capability.
Payload (variable): Serialized message bytes
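As an illustration, the fixed 45-byte header described above can be packed with Python's struct module. This is a sketch, not the Japl runtime's code; big-endian byte order for the multi-byte fields is an assumption, since the table does not state it:

```python
import struct

# Header layout per the field table above: magic, flags, message type,
# version, length, source PID, dest PID = 2+1+4+2+4+16+16 = 45 bytes.
# Big-endian byte order is an assumption.
HEADER_FMT = ">HBIHI16s16s"
MAGIC = 0x4A50  # "JP"
FLAG_CAPABILITY = 0x10

def pack_frame(msg_type, version, src_pid, dst_pid, payload, flags=0, cap_token=b""):
    """Pack one frame: header, optional 32-byte capability token, payload."""
    if cap_token:
        if len(cap_token) != 32:
            raise ValueError("capability token must be exactly 32 bytes")
        flags |= FLAG_CAPABILITY
    header = struct.pack(HEADER_FMT, MAGIC, flags, msg_type, version,
                         len(payload), src_pid, dst_pid)
    return header + cap_token + payload
```

Note that the Length field counts only the payload, so a receiver reads the fixed header first, checks the capability flag to decide whether 32 extra bytes precede the payload, then reads Length payload bytes.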

16 Serialization Encoding Details

16.1 Primitive Encodings

Primitive Type Wire Encodings

Bool: 1 byte (0x00 or 0x01)
Int: LEB128 (variable length); zigzag encoding for signed integers
Float: 8 bytes, IEEE 754 big-endian
Float32: 4 bytes, IEEE 754 big-endian
Char: 4 bytes, UTF-32
String: LEB128 length + UTF-8 bytes
Bytes: LEB128 length + raw bytes
Unit: 0 bytes
Pid: 16 bytes (node ID + process ID)
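The Int row combines two standard encodings: zigzag maps signed integers onto unsigned ones so small magnitudes stay small, and LEB128 then emits 7 bits per byte. A minimal Python sketch, assuming the 64-bit signed range:

```python
def leb128_encode(u):
    """Unsigned LEB128: 7 payload bits per byte, high bit set on continuation."""
    out = bytearray()
    while True:
        byte = u & 0x7F
        u >>= 7
        out.append(byte | 0x80 if u else byte)
        if not u:
            return bytes(out)

def leb128_decode(data, pos=0):
    """Inverse of leb128_encode; returns (value, next position)."""
    result = shift = 0
    while True:
        b = data[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        shift += 7
        if not b & 0x80:
            return result, pos

def encode_int(n):
    """Int per the table: zigzag (0,-1,1,-2 -> 0,1,2,3), then LEB128."""
    return leb128_encode((n << 1) ^ (n >> 63))

def decode_int(data):
    """Undo LEB128, then undo zigzag."""
    u, _ = leb128_decode(data)
    return (u >> 1) ^ -(u & 1)
```

With this scheme, small integers of either sign occupy a single byte, which matters because field tags and counts in the composite encodings use the same LEB128 machinery.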

16.2 Composite Encodings

Composite Type Wire Encodings

Record \(\{l_1:\tau_1, \ldots\}\): For each field: LEB128 field tag + LEB128 field length + encoded value. Fields in tag order. Terminated by tag 0.
Variant \(C_i(\tau)\): LEB128 variant tag + encoded payload.
List[\(\tau\)]: LEB128 count + encoded elements in order.
Map[\(k\),\(v\)]: LEB128 count + (encoded key + encoded value) pairs in key order.
Option[\(\tau\)]: 1 byte tag (0x00 = None, 0x01 = Some) + encoded value if Some.
Result[\(a\),\(e\)]: 1 byte tag (0x00 = Ok, 0x01 = Err) + encoded value.
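Two of the composite rules above can be sketched in self-contained Python; the element encoder is passed in as a function, standing in for the derived Serialize instance of the element type:

```python
def _leb128(u):
    # Unsigned LEB128, the same encoding used for counts in the table above.
    out = bytearray()
    while True:
        byte = u & 0x7F
        u >>= 7
        out.append(byte | 0x80 if u else byte)
        if not u:
            return bytes(out)

def encode_option(value, enc):
    """Option[tau]: 1-byte tag (0x00 = None, 0x01 = Some) + payload if Some."""
    return b"\x00" if value is None else b"\x01" + enc(value)

def encode_list(items, enc):
    """List[tau]: LEB128 count + encoded elements in order."""
    return _leb128(len(items)) + b"".join(enc(x) for x in items)
```

Because every composite rule is compositional in this way, the derived encoder for an arbitrary nested type is just the composition of these small functions.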

17 Node Discovery Protocol Messages

type DiscoveryMsg =
  | JoinRequest({
      name: String,
      listen_addr: String,
      capabilities: List<NodeCapability>,
      cookie_hash: Bytes,
    })
  | JoinAccepted({
      members: List<NodeInfo>,
      cluster_state: ClusterState,
    })
  | JoinRejected(JoinError)
  | GossipPush({
      membership: MembershipState,
      vector_clock: VectorClock,
    })
  | GossipPull({
      membership: MembershipState,
      vector_clock: VectorClock,
    })
  | Heartbeat({
      from: NodeId,
      timestamp: Timestamp,
      load: Float,
    })
  | HeartbeatAck({
      from: NodeId,
      timestamp: Timestamp,
    })

18 Phi Accrual Failure Detector Implementation

type PhiDetector = {
  window_size: Int,
  threshold: Float,
  heartbeat_history: CircularBuffer<Timestamp>,
}

fn phi_value(detector: PhiDetector, now: Timestamp) -> Float =
  let intervals = compute_intervals(detector.heartbeat_history)
  let mean = Stats.mean(intervals)
  let std_dev = Stats.std_dev(intervals)
  let time_since_last = now - CircularBuffer.last(detector.heartbeat_history)
  -- Phi = -log10(1 - CDF_normal(time_since_last, mean, std_dev))
  let p = Stats.normal_cdf(Float.from_int(time_since_last), mean, std_dev)
  if p >= 1.0 then 100.0  -- cap at high value
  else -Float.log10(1.0 - p)

fn is_unreachable(detector: PhiDetector, now: Timestamp) -> Bool =
  phi_value(detector, now) > detector.threshold
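The same computation can be sketched in executable Python, with statistics.NormalDist standing in for Stats.normal_cdf. This is a minimal illustration of the formula above, not the Japl runtime's detector; the default threshold of 8 is a commonly used value, not one the paper specifies:

```python
import math
from statistics import NormalDist

def phi(heartbeat_times, now):
    """Suspicion level per the formula above: -log10(1 - CDF_normal(elapsed)).

    heartbeat_times plays the role of heartbeat_history (ascending arrival
    times, in seconds); inter-arrival intervals are assumed roughly normal.
    """
    intervals = [b - a for a, b in zip(heartbeat_times, heartbeat_times[1:])]
    dist = NormalDist.from_samples(intervals)
    p = dist.cdf(now - heartbeat_times[-1])
    if p >= 1.0:
        return 100.0  # cap at a high value, matching the Japl sketch
    return -math.log10(1.0 - p)

def is_unreachable(heartbeat_times, now, threshold=8.0):
    # threshold=8 is a common default, corresponding to roughly a 1e-8
    # probability of wrongly suspecting a live node.
    return phi(heartbeat_times, now) > threshold
```

The appeal of the accrual design is visible here: phi grows continuously as heartbeats become overdue, so callers can pick a threshold per use case instead of receiving a binary alive/dead verdict.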

  1. The five core design principles of Japl are: (I) Strict Evaluation with Effect Tracking, (II) Algebraic Data Types and Pattern Matching, (III) Process-Oriented Concurrency, (IV) Modular Composition of Language Laws, and (V) Distribution Is a Native Language Concern.