Group

Eventually Consistent distributed process registry, process groups, lifecycle monitoring, and isolated subclusters for Elixir. No external dependencies.

Features

  • Registry — unique key-to-process mapping, cluster-wide. One process per key, enforced across all nodes.
  • Process groups — many processes per key with join/leave. Discoverable via members/2.
  • Lifecycle monitoring — pattern-based event subscriptions. Get notified when processes register, unregister, join, or leave anywhere in the cluster.
  • Named subclusters — isolate registries and groups into named clusters where only connected nodes participate.
  • Sharded writes — writes fan out across N GenServer shards to reduce contention. Reads go directly to ETS.

Installation

def deps do
  [{:group, "~> 0.1.0"}]
end

Quick Start

Start a Group instance under your supervision tree:

children = [
  {Group, name: :my_app}
]
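
For example, in a typical OTP application this goes in your Application.start/2 callback (a standard supervision sketch, nothing Group-specific):

def start(_type, _args) do
  children = [
    {Group, name: :my_app}
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
end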

Registry

# Register the calling process under a unique key
:ok = Group.register(:my_app, "user/123", %{name: "Alice"})

# Look up by key — returns {pid, meta} or nil
{pid, %{name: "Alice"}} = Group.lookup(:my_app, "user/123")

# Unregister (also happens automatically on process death)
:ok = Group.unregister(:my_app, "user/123")

Process Groups

# Join a group (many processes can join the same key)
:ok = Group.join(:my_app, "chat/room/42", %{role: :member})

# List all members — returns [{pid, meta}, ...]
members = Group.members(:my_app, "chat/room/42")

# Leave
:ok = Group.leave(:my_app, "chat/room/42")

members/2 returns joined processes for a key. Registered processes are not included — use lookup/2 for those. Keys ending with "/" perform a prefix query across all shards:

# All members in rooms under "chat/"
Group.members(:my_app, "chat/")

Monitoring

Subscribe to lifecycle events matching a pattern:

# Prefix match — all keys starting with "user/"
:ok = Group.monitor(:my_app, "user/")

# Exact match
:ok = Group.monitor(:my_app, "user/123")

# Everything
:ok = Group.monitor(:my_app, :all)

Events arrive as {:group, events, info} tuples in the monitoring process's mailbox:

def handle_info({:group, events, _info}, state) do
  Enum.each(events, fn
    %Group.Event{type: :registered, key: key, pid: pid, meta: meta} ->
      # a process registered at `key`
      :ok
    %Group.Event{type: :unregistered, key: key, meta: meta, reason: reason} ->
      # a registered process died or unregistered
      :ok
    %Group.Event{type: :joined, key: key, pid: pid, meta: meta} ->
      # a process joined the group at `key`
      :ok
    %Group.Event{type: :left, key: key, pid: pid, meta: meta, reason: reason} ->
      # a process left or died
      :ok
  end)
  {:noreply, state}
end

Single operations (register, join) produce one event per tuple. Bulk operations (nodedown, process death) batch all events from that operation into one tuple.

Dispatch

Send a message to all members of a key:

:ok = Group.dispatch(:my_app, "chat/room/42", {:new_message, "hello"})
:ok = Group.dispatch(:my_app, "chat/room/42", {:new_message, "hello"}, cluster: "servers_123")

Compared to Phoenix.PubSub, dispatch only broadcasts to nodes with at least one subscription, and it can be scoped to a given cluster.
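
Assuming the dispatched term arrives as a plain message in each member's mailbox, a member implemented as a GenServer can pick it up in handle_info/2 (sketch only):

# In a process that previously called Group.join(:my_app, "chat/room/42", %{role: :member})
def handle_info({:new_message, text}, state) do
  # react to the dispatched message, e.g. forward it to the connected client
  IO.puts("room 42: #{text}")
  {:noreply, state}
end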

Named Clusters

Isolate groups and registries into named subclusters. Only nodes that have called connect/2 for a cluster participate in that cluster's replication.

# Connect this node to a named cluster
:ok = Group.connect(:my_app, "game_servers_123")

# Or lease the connection while this node still has local interest in it
:ok = Group.connect(:my_app, "game_servers_123", ttl: 30_000)

# All operations accept a :cluster option
:ok = Group.join(:my_app, "room/1", %{}, cluster: "game_servers_123")
members = Group.members(:my_app, "room/1", cluster: "game_servers_123")
:ok = Group.monitor(:my_app, :all, cluster: "game_servers_123")

TTL leases are local policy only:

  • Group.connect(..., ttl: ms) still does the normal ETS membership check first, so repeated connects while already connected remain a cheap no-op and do not refresh the TTL.
  • When a TTL expires, Group only disconnects that named cluster if the local node has no cluster-scoped monitors, no local registrations, and no local group memberships in that cluster.
  • If local interest still exists, the next sweep extends the lease by one TTL interval and checks again later.
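
An end-to-end sketch of how a lease interacts with local interest (cluster and key names are illustrative):

# Lease the connection to a named cluster
:ok = Group.connect(:my_app, "game_servers_123", ttl: 30_000)

# This local membership counts as interest and keeps the lease alive
:ok = Group.join(:my_app, "room/1", %{}, cluster: "game_servers_123")

# After the process leaves (or dies) and no other local interest remains,
# the first sweep past the TTL disconnects the cluster on this node
:ok = Group.leave(:my_app, "room/1", cluster: "game_servers_123")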

Nodes

# All Group peers (nodes that completed peer discovery), excluding self
Group.nodes(:my_app)

# All nodes in a named cluster
Group.nodes(:my_app, "game_servers_123")

Runtime Log Level

Toggle verbose logging at runtime without restart:

Group.log_level(:my_app, :verbose)  # turn on verbose
Group.log_level(:my_app, :info)     # back to normal
Group.log_level(:my_app, false)     # silence all Group logs

Group.log_level/2 updates :persistent_term, so it should be used as an occasional admin control, not from a hot path.

Events

Events are delivered as {:group, events, %{name: name}} tuples containing %Group.Event{} structs:

%Group.Event{
  type: :registered | :unregistered | :joined | :left,
  supervisor: :my_app,
  cluster: nil | "cluster_name",
  key: "user/123",
  pid: #PID<0.150.0>,
  meta: %{},
  previous_meta: nil | %{},    # old meta on re-register/re-join
  reason: nil | term()          # exit reason on unregistered/left
}
Event          Trigger
:registered    register/4 — new or re-register (updates meta)
:unregistered  Process died or unregister/3 called
:joined        join/4 — new or re-join (updates meta)
:left          Process died or leave/3 called

Re-registering or re-joining an existing key updates the metadata in place and delivers an event with previous_meta set to the old value.
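
For illustration, re-registering the Quick Start key with new metadata delivers an event shaped roughly like this to any matching monitor:

:ok = Group.register(:my_app, "user/123", %{name: "Alice"})
:ok = Group.register(:my_app, "user/123", %{name: "Alice", away: true})

# A process that called Group.monitor(:my_app, "user/") then receives something like:
# {:group, [%Group.Event{type: :registered, key: "user/123",
#                        meta: %{name: "Alice", away: true},
#                        previous_meta: %{name: "Alice"}}], %{name: :my_app}}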

Consistency Model

All operations are eventually consistent:

  • Writes (register, join, etc.) return immediately after updating local ETS.
  • Changes replicate to other nodes asynchronously over Erlang distribution.
  • During network partitions, nodes may have divergent views.
  • When partitions heal, state is re-synced via cluster_state messages.
  • Registry conflicts (same key registered on two nodes during a partition) can be resolved with a configurable resolve_registry_conflict callback. The losing process is killed with {:group_registry_conflict, key, meta}.

Configuration

{Group,
  name: :my_app,
  shards: 8,                                   # number of write shards (default)
  log: :info,                                  # :info | :verbose | false
  resolve_registry_conflict: {MyResolver, :resolve, []},  # partition conflict resolver
  extract_meta: {MyApp, :extract_meta, []},    # transform meta on read
  replicated_pg_receiver_buffer_size: 64,      # buffered remote PG join/leave ops per shard
  replicated_pg_receiver_flush_interval: 5     # max buffer age in ms before flush
}

Options

  • name (required) — atom identifying this Group instance. Passed as the first argument to all API functions.
  • shards — number of GenServer shards for write operations. Defaults to 8. Must match across all nodes.
  • log — logging level. :info (default) logs peer discovery, node connects/disconnects, and cluster membership changes. :verbose additionally logs per-shard operations (register, join, leave, process deaths, replication). false disables all Group log output. All log output uses Logger.info. Can be changed at runtime with Group.log_level/2.
  • resolve_registry_conflict — {module, function, extra_args} callback invoked as apply(mod, fun, [name, key, {pid1, meta1, time1}, {pid2, meta2, time2} | extra_args]). Called when partition healing or concurrent registration finds the same key registered on two nodes. Must return the winning pid. Runs synchronously inside the shard GenServer — must return quickly and never block. A sketch follows this list.
  • extract_meta — {module, function, args} or fun(meta) applied to metadata on reads (lookup, members). Useful for stripping internal fields.
  • replicated_pg_receiver_buffer_size — max buffered replicated PG join/leave ops per shard before the receiver flushes immediately. Defaults to 64.
  • replicated_pg_receiver_flush_interval — max time in milliseconds a shard will buffer replicated PG join/leave ops before flushing. Defaults to 5.
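
A hypothetical resolver matching the contract above (MyResolver is not part of the library; this sketch keeps the earlier registration):

defmodule MyResolver do
  # Invoked as apply(MyResolver, :resolve, [name, key, {pid1, meta1, time1}, {pid2, meta2, time2}]).
  # Must return the winning pid quickly, without blocking the shard GenServer.
  def resolve(_name, _key, {pid1, _meta1, time1}, {pid2, _meta2, time2}) do
    if time1 <= time2, do: pid1, else: pid2
  end
end

# Configured as: resolve_registry_conflict: {MyResolver, :resolve, []}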

Architecture

Group.Supervisor (:"my_app_group_sup")
├── Group.Replica.Data        — owns all ETS tables, survives shard crashes
├── Group.Replica.Supervisor  — supervises N shard GenServers
│   ├── Group.Replica (shard 0)
│   ├── Group.Replica (shard 1)
│   └── ...
├── Registry                  — local monitor subscriptions (:"my_app_group_registry")
└── Group.ClusterLease        — local named-cluster TTL sweeper

Sharding

Keys are routed to shards via :erlang.phash2({cluster, key}, num_shards). Including the cluster in the hash avoids false contention between the default cluster and named clusters.
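
For intuition, the shard index is just that hash; nil stands in for the default cluster here (matching the cluster: nil shown in the event struct), though the internal representation may differ:

shards = 8
:erlang.phash2({nil, "user/123"}, shards)       # index in 0..7
:erlang.phash2({nil, "chat/room/42"}, shards)   # typically a different index, so unrelated writes don't contend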

Reads (lookup, members) go directly to ETS — no GenServer hop. This is the hot path and runs at millions of ops/sec.

Writes (register, join, etc.) go through the shard's GenServer, which updates ETS and broadcasts replication messages. Multiple shards reduce write contention for unrelated keys.

ETS Tables

Each shard owns 4 ETS tables:

Table       Type          Key                  Purpose
reg_by_key  :set          {cluster, key}       Registry lookup — O(1)
reg_by_pid  :ordered_set  {pid, cluster, key}  Reverse index for death cleanup
pg_by_key   :ordered_set  {cluster, key, pid}  Group membership lookup
pg_by_pid   :ordered_set  {pid, cluster, key}  Reverse index for death cleanup

Plus 3 shared tables:

  • cluster_nodes (:bag, cluster→nodes)
  • node_clusters (:bag, node→clusters)
  • cluster_leases (:set, cluster→{ttl_ms, expires_at}) for local connect(..., ttl: ms) policy

cluster_nodes / node_clusters remain the authoritative cluster-membership tables. cluster_leases is only local lease metadata used by the sweeper.

Group.Replica.Data owns all tables and is supervised with rest_for_one so tables survive shard crashes.

Peer Discovery

When Group starts (or a new Erlang node connects), shards exchange peer_connect / peer_connect_ack messages with their counterparts on other nodes. This handshake:

  1. Validates that shard counts match (raises on mismatch).
  2. Exchanges cluster membership lists.
  3. Sends cluster_state snapshots for shared clusters — the full registry and group data, delivered in a single message per cluster.

This is how a new node catches up to the existing cluster state.

Replication

After the initial sync, steady-state changes propagate through separate sender and receiver batching lanes:

  • Local writes enqueue outbound registry or PG replication ops in shard-local sender buffers.
  • A sender flush groups those ops by target node and sends one replicate_registry_batch or replicate_pg_batch message per remote node.
  • Remote shards buffer those replicated registry/PG ops receiver-side, apply them in FIFO order with bulk ETS operations, then take a bounded fairness turn before yielding back to the mailbox.

The sender flush timer is mainly a fallback for idle periods. Outbound buffers also flush immediately when they hit the configured size, when a new enqueue finds the buffer already past its flush interval, and before control or routing work such as cluster connect/disconnect or peer-protocol handling.

Named Cluster TTL Leases

Named-cluster TTLs are a local way to reduce replication fanout to nodes that no longer care about a cluster.

  • connect(..., ttl: ms) writes a lease row only when the cluster is newly connected.
  • A dedicated Group.ClusterLease process sweeps the local lease rows by nearest expiry.
  • On expiry, the sweeper extends the lease if the local node still has cluster-scoped monitors, local registry entries, or local PG memberships in that cluster.
  • Otherwise it runs the normal disconnect path, which removes the node from the named cluster and stops future replication for that cluster.

Process Death Cleanup

Shards monitor all registered/joined processes. On DOWN, the shard:

  1. Removes entries from both the primary and reverse-index ETS tables.
  2. Enqueues replicated unregister / leave ops into the sender batching lanes for peer nodes.
  3. Fires :unregistered / :left events to local monitors.

Node Disconnect

On nodedown, each shard purges all entries owned by the disconnected node from its ETS tables and fires events for each removed entry.

Testing

mix test

See test/README.md for details on the distributed test infrastructure.

Benchmarks

cd priv/bench

# Local (single-node)
./run_local.sh

# Distributed (3 separate BEAM VMs)
./run_distributed.sh
./run_distributed.sh --shards 4

See priv/bench/README.md for scenario descriptions.

License

MIT
