Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 28 additions & 18 deletions src/ice/agent.zig
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const failing_timeout: Io.Clock.Duration = .{ .clock = .awake, .raw = .fromSecon
pub const AgentConfig = struct {
on_connection_state_change: *const fn (*Agent, ice.ConnectionState) void,
on_data: *const fn (*Agent, []const u8) void,
on_candidate: *const fn (*Agent, candidate: ?Candidate) void,
/// Local credentials of the agent (ufrag and password)
///
/// Generated automatically if not provided
Expand All @@ -34,10 +35,12 @@ io: Io,
allocator: Allocator,
buffer_pool: std.heap.MemoryPool([max_message_size]u8),
connection_state: ice.ConnectionState = .new,
gathering_state: ice.GatheringState = .new,

// callbacks
on_connection_state_change: *const fn (*Agent, ice.ConnectionState) void,
on_data: *const fn (*Agent, []const u8) void,
on_candidate: *const fn (*Agent, candidate: ?Candidate) void,

// Stun related fields
role: ice.Role,
Expand All @@ -56,8 +59,7 @@ selected_pair: ?SelectedPair = null,
// This the final pair selected by this agent or the remote one.
nominated_pair: ?SelectedPair = null,

// Io handling
group: Io.Group = .init,
mutex: Io.Mutex = .init,

const SelectedPair = struct {
pair: CandidatePair,
Expand Down Expand Up @@ -103,13 +105,13 @@ pub fn init(io: Io, allocator: Allocator, config: AgentConfig) !Agent {
.credentials = credens,
.on_connection_state_change = config.on_connection_state_change,
.on_data = config.on_data,
.on_candidate = config.on_candidate,
};
}

pub fn deinit(agent: *Agent) void {
const io = agent.io;
const allocator = agent.allocator;
agent.group.cancel(io);

for (agent.sockets) |socket| socket.close(io);
allocator.free(agent.sockets);
Expand All @@ -127,7 +129,10 @@ pub fn deinit(agent: *Agent) void {
///
/// This function should be run concurrently (e.g. in a `Io.Group`).
pub fn startEventLoop(agent: *Agent) !void {
return agent.innerEventHandler();
return agent.innerEventHandler() catch |err| switch (err) {
error.Canceled => error.Canceled,
else => |e| std.log.err("error in event loop: {}", .{e}),
};
}

/// Set remote credentials
Expand Down Expand Up @@ -155,9 +160,13 @@ pub fn addRemoteCandidate(agent: *Agent, remote_candidate: Candidate) !void {
///
/// This function should be called first after initializing the agent.
pub fn gatherCandidates(agent: *Agent) !void {
agent.gathering_state = .gathering;
try agent.gatherHostCandidates();
agent.sockets = try initSockets(agent.io, agent.allocator, &agent.candidates);
try agent.group.concurrent(agent.io, innerEventHandlerWrapper, .{agent});

for (agent.candidates.items) |candidate| agent.on_candidate(agent, candidate);
agent.gathering_state = .complete;
agent.on_candidate(agent, null);
}

pub fn sendData(agent: *const Agent, data: []const u8) Socket.SendError!void {
Expand Down Expand Up @@ -226,7 +235,7 @@ fn doAddRemoteCandidate(agent: *Agent, remote_candidate: Candidate) Allocator.Er
if (pair.local.base.eql(&candidate.base) and pair.remote.address.eql(&remote_candidate.address))
continue :outer_loop;

try agent.pairs.append(agent.allocator, .{
try agent.appendCandidatePair(.{
.local = candidate,
.remote = remote_candidate,
.priority = calculatePairPriority(candidate.priority, remote_candidate.priority, agent.role),
Expand Down Expand Up @@ -258,7 +267,7 @@ fn handleRequest(agent: *Agent, msg: *const stun.Message, base_addr: IpAddress,
.priority = stun_req.priority,
};

try agent.pairs.append(agent.allocator, .{
try agent.appendCandidatePair(.{
.local = local,
.remote = remote,
.priority = calculatePairPriority(local.priority, remote.priority, agent.role),
Expand Down Expand Up @@ -311,7 +320,7 @@ fn handleSuccessResponse(agent: *Agent, msg: *const stun.Message, base_addr: IpA
return;
}

try agent.pairs.append(agent.allocator, .{
try agent.appendCandidatePair(.{
.local = local_candidate,
.remote = candidate_pair.remote,
.priority = calculatePairPriority(local_candidate.priority, candidate_pair.remote.priority, agent.role),
Expand Down Expand Up @@ -493,6 +502,12 @@ fn setConnectionState(agent: *Agent, new_state: ice.ConnectionState) void {
agent.on_connection_state_change(agent, new_state);
}

fn appendCandidatePair(agent: *Agent, candidate_pair: CandidatePair) !void {
agent.mutex.lockUncancelable(agent.io);
defer agent.mutex.unlock(agent.io);
try agent.pairs.append(agent.allocator, candidate_pair);
}

// ============== Io related functions ======================
const MessageError = (Allocator.Error || Socket.ReceiveTimeoutError);

Expand All @@ -510,13 +525,6 @@ const InnerEvent = union(enum) {
keep_alive: Io.Cancelable!void,
};

fn innerEventHandlerWrapper(agent: *Agent) !void {
agent.innerEventHandler() catch |err| switch (err) {
error.Canceled => return error.Canceled,
else => |e| std.log.err("Error occurred in event handler: {}", .{e}),
};
}

fn innerEventHandler(agent: *Agent) !void {
const io = agent.io;
const Select = Io.Select(InnerEvent);
Expand Down Expand Up @@ -574,15 +582,15 @@ fn innerEventHandler(agent: *Agent) !void {
defer agent.destroyPacket(resp);
try message.socket.send(io, &sender, resp);

const candidate_pair: ?CandidatePair = blk: {
const nominated_pair: ?CandidatePair = blk: {
if (agent.role == .controlling or agent.nominated_pair != null) break :blk null;
for (agent.pairs.items) |candidate_pair| if (candidate_pair.state.nominated) break :blk candidate_pair;
break :blk null;
};

if (candidate_pair != null) {
if (nominated_pair != null) {
agent.nominated_pair = .{
.pair = candidate_pair.?,
.pair = nominated_pair.?,
.socket = message.socket.*,
};
}
Expand Down Expand Up @@ -743,6 +751,7 @@ fn testNewAgent() !Agent {
return try .init(testing.io, testing.allocator, .{
.on_connection_state_change = undefined,
.on_data = undefined,
.on_candidate = undefined,
.role = .controlled,
});
}
Expand All @@ -769,6 +778,7 @@ test "init agent" {
var agent: Agent = try .init(testing.io, testing.allocator, .{
.on_connection_state_change = undefined,
.on_data = undefined,
.on_candidate = undefined,
});
defer agent.deinit();
}
Expand Down
4 changes: 3 additions & 1 deletion src/ice/ice.zig
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const Io = std.Io;

pub const ConnectionState = enum { new, checking, connected, completed, disconnected, failed, closed };

pub const GatheringState = enum { new, gathering, complete };

pub const Role = enum { controlling, controlled };

pub const CandidateType = enum {
Expand Down Expand Up @@ -115,7 +117,7 @@ pub const Candidate = struct {
}

pub fn format(self: @This(), writer: *std.Io.Writer) !void {
try writer.print("{d} {} {s} {} ", .{ self.foundation, self.component, "udp", self.priority });
try writer.print("{d} {} {s} {} ", .{ self.foundation, self.component, @tagName(self.transport), self.priority });
switch (self.address) {
.ip4 => |ip| try writer.print("{d}.{d}.{d}.{d} {d} ", .{
ip.bytes[0],
Expand Down
Loading