diff --git a/src/ice/agent.zig b/src/ice/agent.zig index bc4852b..2082f33 100644 --- a/src/ice/agent.zig +++ b/src/ice/agent.zig @@ -23,6 +23,7 @@ const failing_timeout: Io.Clock.Duration = .{ .clock = .awake, .raw = .fromSecon pub const AgentConfig = struct { on_connection_state_change: *const fn (*Agent, ice.ConnectionState) void, on_data: *const fn (*Agent, []const u8) void, + on_candidate: *const fn (*Agent, candidate: ?Candidate) void, /// Local credentials of the agent (ufrag and password) /// /// Generated automatically if not provided @@ -34,10 +35,12 @@ io: Io, allocator: Allocator, buffer_pool: std.heap.MemoryPool([max_message_size]u8), connection_state: ice.ConnectionState = .new, +gathering_state: ice.GatheringState = .new, // callbacks on_connection_state_change: *const fn (*Agent, ice.ConnectionState) void, on_data: *const fn (*Agent, []const u8) void, +on_candidate: *const fn (*Agent, candidate: ?Candidate) void, // Stun related fields role: ice.Role, @@ -56,8 +59,7 @@ selected_pair: ?SelectedPair = null, // This the final pair selected by this agent or the remote one. nominated_pair: ?SelectedPair = null, -// Io handling -group: Io.Group = .init, +mutex: Io.Mutex = .init, const SelectedPair = struct { pair: CandidatePair, @@ -103,13 +105,13 @@ pub fn init(io: Io, allocator: Allocator, config: AgentConfig) !Agent { .credentials = credens, .on_connection_state_change = config.on_connection_state_change, .on_data = config.on_data, + .on_candidate = config.on_candidate, }; } pub fn deinit(agent: *Agent) void { const io = agent.io; const allocator = agent.allocator; - agent.group.cancel(io); for (agent.sockets) |socket| socket.close(io); allocator.free(agent.sockets); @@ -127,7 +129,10 @@ pub fn deinit(agent: *Agent) void { /// /// This function should be run concurrently (e.g. in a `Io.Group`). pub fn startEventLoop(agent: *Agent) !void { - return agent.innerEventHandler(); + return agent.innerEventHandler() catch |err| switch (err) { + error.Canceled => error.Canceled, + else => |e| std.log.err("error in event loop: {}", .{e}), + }; } /// Set remote credentials @@ -155,9 +160,13 @@ pub fn addRemoteCandidate(agent: *Agent, remote_candidate: Candidate) !void { /// /// This function should be called first after initializing the agent. pub fn gatherCandidates(agent: *Agent) !void { + agent.gathering_state = .gathering; try agent.gatherHostCandidates(); agent.sockets = try initSockets(agent.io, agent.allocator, &agent.candidates); - try agent.group.concurrent(agent.io, innerEventHandlerWrapper, .{agent}); + + for (agent.candidates.items) |candidate| agent.on_candidate(agent, candidate); + agent.gathering_state = .complete; + agent.on_candidate(agent, null); } pub fn sendData(agent: *const Agent, data: []const u8) Socket.SendError!void { @@ -226,7 +235,7 @@ fn doAddRemoteCandidate(agent: *Agent, remote_candidate: Candidate) Allocator.Er if (pair.local.base.eql(&candidate.base) and pair.remote.address.eql(&remote_candidate.address)) continue :outer_loop; - try agent.pairs.append(agent.allocator, .{ + try agent.appendCandidatePair(.{ .local = candidate, .remote = remote_candidate, .priority = calculatePairPriority(candidate.priority, remote_candidate.priority, agent.role), @@ -258,7 +267,7 @@ fn handleRequest(agent: *Agent, msg: *const stun.Message, base_addr: IpAddress, .priority = stun_req.priority, }; - try agent.pairs.append(agent.allocator, .{ + try agent.appendCandidatePair(.{ .local = local, .remote = remote, .priority = calculatePairPriority(local.priority, remote.priority, agent.role), @@ -311,7 +320,7 @@ fn handleSuccessResponse(agent: *Agent, msg: *const stun.Message, base_addr: IpA return; } - try agent.pairs.append(agent.allocator, .{ + try agent.appendCandidatePair(.{ .local = local_candidate, .remote = candidate_pair.remote, .priority = calculatePairPriority(local_candidate.priority, candidate_pair.remote.priority, agent.role), @@ -493,6 +502,12 @@ fn setConnectionState(agent: *Agent, new_state: ice.ConnectionState) void { agent.on_connection_state_change(agent, new_state); } +fn appendCandidatePair(agent: *Agent, candidate_pair: CandidatePair) !void { + agent.mutex.lockUncancelable(agent.io); + defer agent.mutex.unlock(agent.io); + try agent.pairs.append(agent.allocator, candidate_pair); +} + // ============== Io related functions ====================== const MessageError = (Allocator.Error || Socket.ReceiveTimeoutError); @@ -510,13 +525,6 @@ const InnerEvent = union(enum) { keep_alive: Io.Cancelable!void, }; -fn innerEventHandlerWrapper(agent: *Agent) !void { - agent.innerEventHandler() catch |err| switch (err) { - error.Canceled => return error.Canceled, - else => |e| std.log.err("Error occurred in event handler: {}", .{e}), - }; -} - fn innerEventHandler(agent: *Agent) !void { const io = agent.io; const Select = Io.Select(InnerEvent); @@ -574,15 +582,15 @@ fn innerEventHandler(agent: *Agent) !void { defer agent.destroyPacket(resp); try message.socket.send(io, &sender, resp); - const candidate_pair: ?CandidatePair = blk: { + const nominated_pair: ?CandidatePair = blk: { if (agent.role == .controlling or agent.nominated_pair != null) break :blk null; for (agent.pairs.items) |candidate_pair| if (candidate_pair.state.nominated) break :blk candidate_pair; break :blk null; }; - if (candidate_pair != null) { + if (nominated_pair != null) { agent.nominated_pair = .{ - .pair = candidate_pair.?, + .pair = nominated_pair.?, .socket = message.socket.*, }; } @@ -743,6 +751,7 @@ fn testNewAgent() !Agent { return try .init(testing.io, testing.allocator, .{ .on_connection_state_change = undefined, .on_data = undefined, + .on_candidate = undefined, .role = .controlled, }); } @@ -769,6 +778,7 @@ test "init agent" { var agent: Agent = try .init(testing.io, testing.allocator, .{ .on_connection_state_change = undefined, .on_data = undefined, + .on_candidate = undefined, }); defer agent.deinit(); } diff --git a/src/ice/ice.zig b/src/ice/ice.zig index 1f180a8..8eea46a 100644 --- a/src/ice/ice.zig +++ b/src/ice/ice.zig @@ -6,6 +6,8 @@ const Io = std.Io; pub const ConnectionState = enum { new, checking, connected, completed, disconnected, failed, closed }; +pub const GatheringState = enum { new, gathering, complete }; + pub const Role = enum { controlling, controlled }; pub const CandidateType = enum { @@ -115,7 +117,7 @@ pub const Candidate = struct { } pub fn format(self: @This(), writer: *std.Io.Writer) !void { - try writer.print("{d} {} {s} {} ", .{ self.foundation, self.component, "udp", self.priority }); + try writer.print("{d} {} {s} {} ", .{ self.foundation, self.component, @tagName(self.transport), self.priority }); switch (self.address) { .ip4 => |ip| try writer.print("{d}.{d}.{d}.{d} {d} ", .{ ip.bytes[0],