Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 43 additions & 29 deletions lib/aikido/zen.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# frozen_string_literal: true

require_relative "zen/ipc"
require_relative "zen/helpers"
require_relative "zen/version"
require_relative "zen/errors"
Expand All @@ -10,10 +11,11 @@
require_relative "zen/worker"
require_relative "zen/agent"
require_relative "zen/api_client"
require_relative "zen/api_cache"
require_relative "zen/api_stream"
require_relative "zen/context"
require_relative "zen/current_context"
require_relative "zen/detached_agent"
require_relative "zen/worker_process"
require_relative "zen/middleware/middleware"
require_relative "zen/middleware/fork_detector"
require_relative "zen/middleware/context_setter"
Expand Down Expand Up @@ -86,6 +88,23 @@ def self.runtime_settings=(settings)
@runtime_settings = settings
end

def self.api_cache
@api_cache ||= APICache.new
end

def self.rate_limiter
@rate_limiter ||= RateLimiter.new
end

def self.calculate_rate_limits(request)
agent = @worker_process_client
agent ? agent.calculate_rate_limits(request) : rate_limiter.calculate_rate_limits(request)
end

def self.secret
@secret ||= SecureRandom.bytes(32)
end

# @return [Boolean] whether the Aikido agent is currently blocking requests.
# Blocking mode is configured at startup and can be controlled through the
# Aikido dashboard at runtime.
Expand Down Expand Up @@ -316,37 +335,29 @@ def self.load_sinks!
# Stop any background threads.
def self.stop!
@agent&.stop!
@detached_agent_server&.stop!
@worker_process_server&.stop!

@worker_process_client&.stop
end

# @!visibility private
# Starts the background agent if it has not been started yet.
def self.agent
@agent ||= Agent.start
@agent
end

def self.detached_agent
@detached_agent ||= DetachedAgent::Agent.new
def self.worker_process_server
@worker_process_server
end

def self.detached_agent_server
@detached_agent_server ||= DetachedAgent::Server.start
end
@has_started = Concurrent::AtomicBoolean.new(false)

class << self
# `agent` and `detached_agent` are started on the first method call.
# A mutex controls thread execution to prevent multiple attempts.
LOCK = Mutex.new

def start!
return unless start?

@pid = Process.pid
return unless @has_started.make_true

LOCK.synchronize do
agent
detached_agent_server
end
@worker_process_server = WorkerProcess::Agent::Server.start
@agent = Agent.start
end

def start?
Expand All @@ -355,18 +366,21 @@ def start?
config.debugging?
end

def check_and_handle_fork
handle_fork if forked?
end
def fork!
server = @worker_process_server
return unless server

def forked?
pid_changed = Process.pid != @pid
@pid = Process.pid
pid_changed
end
# TODO: Factor; stop the server and old client then start the new client

client = @worker_process_client
@worker_process_client = nil
client&.close

def handle_fork
@detached_agent&.handle_fork
client = WorkerProcess::Agent::Client.new(server.host, server.port)
client.start
@worker_process_client = client
rescue => err
config.logger.error("Forked worker process #{Process.pid}: failed to start worker process client: #{err.message}")
end
end

Expand Down
20 changes: 10 additions & 10 deletions lib/aikido/zen/actor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ def self.Actor(data)
class Actor
def self.from_json(data)
new(
id: data[:id],
name: data[:name],
ip: data[:lastIpAddress],
first_seen_at: Time.at(data[:firstSeenAt] / 1000),
last_seen_at: Time.at(data[:lastSeenAt] / 1000)
id: data["id"],
name: data["name"],
ip: data["lastIpAddress"],
first_seen_at: Time.at(data["firstSeenAt"] / 1000),
last_seen_at: Time.at(data["lastSeenAt"] / 1000)
)
end

Expand Down Expand Up @@ -135,11 +135,11 @@ def hash

def as_json
{
id: id,
name: name,
lastIpAddress: ip,
firstSeenAt: first_seen_at.to_i * 1000,
lastSeenAt: last_seen_at.to_i * 1000
"id" => id,
"name" => name,
"lastIpAddress" => ip,
"firstSeenAt" => first_seen_at.to_i * 1000,
"lastSeenAt" => last_seen_at.to_i * 1000
}.compact
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/aikido/zen/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ def self.start(**opts)
def initialize(
config: Aikido::Zen.config,
collector: Aikido::Zen.collector,
detached_agent: Aikido::Zen.detached_agent,
worker: Aikido::Zen::Worker.new(config: config),
api_client: Aikido::Zen::APIClient.new(config: config),
api_stream: Aikido::Zen::APIStream.new(config: config)
)
@config = config
@collector = collector
@detached_agent = detached_agent
@worker = worker
@api_client = api_client
@api_stream = api_stream
Expand Down Expand Up @@ -243,6 +241,7 @@ def heartbeats
def update_settings_from_runtime_config!(data)
return unless @runtime_config_update_mutex.try_lock
begin
Aikido::Zen.api_cache.runtime_config = data
Aikido::Zen.runtime_settings.update_from_runtime_config_json(data)
ensure
@runtime_config_update_mutex.unlock
Expand All @@ -254,6 +253,7 @@ def update_settings_from_runtime_config!(data)
def update_settings_from_runtime_firewall_lists!(data)
return unless @runtime_firewall_lists_update_mutex.try_lock
begin
Aikido::Zen.api_cache.runtime_firewall_lists = data
Aikido::Zen.runtime_settings.update_from_runtime_firewall_lists_json(data)
ensure
@runtime_firewall_lists_update_mutex.unlock
Expand Down
8 changes: 8 additions & 0 deletions lib/aikido/zen/api_cache.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

module Aikido::Zen
class APICache
attr_accessor :runtime_config
attr_accessor :runtime_firewall_lists
end
end
28 changes: 14 additions & 14 deletions lib/aikido/zen/attack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ def metadata

def as_json
{
kind: kind,
blocked: blocked?,
metadata: metadata,
operation: @operation,
stack: @stack
"kind" => kind,
"blocked" => blocked?,
"metadata" => metadata,
"operation" => @operation,
"stack" => @stack
}.compact.merge(input.as_json)
end

Expand All @@ -70,7 +70,7 @@ def initialize(input:, filepath:, **opts)

def metadata
{
filename: filepath
"filename" => filepath
}
end

Expand Down Expand Up @@ -107,7 +107,7 @@ def kind

def metadata
{
command: @command
"command" => @command
}
end

Expand Down Expand Up @@ -139,9 +139,9 @@ def kind

def metadata
{
sql: @query,
dialect: @dialect.name,
failedToTokenize: @failed_to_tokenize || nil
"sql" => @query,
"dialect" => @dialect.name,
"failedToTokenize" => @failed_to_tokenize || nil
}.compact
end

Expand Down Expand Up @@ -174,8 +174,8 @@ def exception(*)

def metadata
{
hostname: @request.uri.hostname,
port: @request.uri.port.to_s
"hostname" => @request.uri.hostname,
"port" => @request.uri.port.to_s
}
end
end
Expand Down Expand Up @@ -212,8 +212,8 @@ def input

def metadata
{
hostname: @hostname,
privateIP: @address
"hostname" => @hostname,
"privateIP" => @address
}
end
end
Expand Down
16 changes: 8 additions & 8 deletions lib/aikido/zen/attack_wave.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ def initialize(ip_address:, user_agent:, source:)

def as_json
{
ipAddress: @ip_address,
userAgent: @user_agent,
source: @source
"ipAddress" => @ip_address,
"userAgent" => @user_agent,
"source" => @source
}.compact
end

Expand Down Expand Up @@ -101,10 +101,10 @@ def initialize(samples:, user:)

def as_json
{
metadata: {
samples: @samples.as_json.to_json # The API only accepts string values in metadata
"metadata" => {
"samples" => @samples.as_json.to_json # The API only accepts string values in metadata
},
user: @user.as_json
"user" => @user.as_json
}.compact
end

Expand All @@ -130,8 +130,8 @@ def initialize(verb:, path:)

def as_json
{
method: @verb.as_json,
url: @path.as_json
"method" => @verb.as_json,
"url" => @path.as_json
}.compact
end

Expand Down
Loading
Loading