Envoy Network Model: A Source Code Walkthrough
I. Running Envoy's Main Entry Point
The execution flow of Envoy's main entry point
The core entry point of an Envoy run is InstanceImpl::initialize. What exactly does InstanceImpl::initialize do? The source code spells out each step in comments (some of these features I have not touched in my own work, so I cannot say what they are for):
1. Load the static configuration: parse the YAML config file into bootstrap.
Note that Envoy's bootstrap maps to a protobuf message (envoy::config::bootstrap::v3::Bootstrap):
// Handle configuration that needs to take place prior to the main configuration load.
InstanceUtil::loadBootstrapConfig(bootstrap_, options,
messageValidationContext().staticValidationVisitor(), *api_);
2. Create the cluster manager and all primary clusters.
3. Initialize the various services from the bootstrap configuration.
For example, the admin service:
if (initial_config.admin().address()) {
admin_->startHttpListener(initial_config.admin().accessLogs(), options.adminAddressPath(),
initial_config.admin().address(),
initial_config.admin().socketOptions(),
stats_store_.createScope("listener.admin."));
} else {
ENVOY_LOG(warn, "No admin address given, so no admin HTTP server started.");
}
config_tracker_entry_ = admin_->getConfigTracker().add(
"bootstrap", [this](const Matchers::StringMatcher&) { return dumpBootstrapConfig(); });
if (initial_config.admin().address()) {
admin_->addListenerToHandler(handler_.get());
}
4. Primary clusters initialize RTDS, using the runtime for overrides before xDS is served.
5. Initialize secondary clusters.
6. Reset the dynamic configuration that was provided.
Running the service:
The serving itself happens in run():
void InstanceImpl::run() {
// RunHelper exists primarily to facilitate testing of how we respond to early shutdown during
// startup (see RunHelperTest in server_test.cc).
const auto run_helper = RunHelper(*this, options_, *dispatcher_, clusterManager(),
access_log_manager_, init_manager_, overloadManager(), [this] {
notifyCallbacksForStage(Stage::PostInit);
startWorkers();
});
// Run the main dispatch loop waiting to exit.
ENVOY_LOG(info, "starting main dispatch loop");
auto watchdog = main_thread_guard_dog_->createWatchDog(api_->threadFactory().currentThreadId(),
"main_thread", *dispatcher_);
dispatcher_->post([this] { notifyCallbacksForStage(Stage::Startup); });
dispatcher_->run(Event::Dispatcher::RunType::Block);
ENVOY_LOG(info, "main dispatch loop exited");
main_thread_guard_dog_->stopWatching(watchdog);
watchdog.reset();
terminate();
}
Initialization and startup of the admin service:
admin_ = std::make_unique<AdminImpl>(initial_config.admin().profilePath(), *this);
admin_->startHttpListener(initial_config.admin().accessLogs(), options.adminAddressPath(),
initial_config.admin().address(),
initial_config.admin().socketOptions(),
stats_store_.createScope("listener.admin."));
The concrete implementation of startHttpListener:
void AdminImpl::startHttpListener(const std::list<AccessLog::InstanceSharedPtr>& access_logs,
const std::string& address_out_path,
Network::Address::InstanceConstSharedPtr address,
const Network::Socket::OptionsSharedPtr& socket_options,
Stats::ScopePtr&& listener_scope) {
for (const auto& access_log : access_logs) {
access_logs_.emplace_back(access_log);
}
null_overload_manager_.start();
socket_ = std::make_shared<Network::TcpListenSocket>(address, socket_options, true);
RELEASE_ASSERT(0 == socket_->ioHandle().listen(ENVOY_TCP_BACKLOG_SIZE).return_value_,
"listen() failed on admin listener");
socket_factory_ = std::make_unique<AdminListenSocketFactory>(socket_);
listener_ = std::make_unique<AdminListener>(*this, std::move(listener_scope));
ENVOY_LOG(info, "admin address: {}",
socket().connectionInfoProvider().localAddress()->asString());
if (!address_out_path.empty()) {
std::ofstream address_out_file(address_out_path);
if (!address_out_file) {
ENVOY_LOG(critical, "cannot open admin address output file {} for writing.",
address_out_path);
} else {
address_out_file << socket_->connectionInfoProvider().localAddress()->asString();
}
}
}
A few key statements stand out here:
// listen on the port
RELEASE_ASSERT(0 == socket_->ioHandle().listen(ENVOY_TCP_BACKLOG_SIZE).return_value_,
"listen() failed on admin listener");
// create the listener
listener_ = std::make_unique<AdminListener>(*this, std::move(listener_scope));
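As an aside, stripped of Envoy's wrappers these two statements boil down to the classic socket/bind/listen sequence. A minimal standalone sketch in plain POSIX sockets (illustrative only; the port is an arbitrary example, and 128 is an assumed value for the ENVOY_TCP_BACKLOG_SIZE constant):
#include <arpa/inet.h>
#include <cstdio>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

int main() {
  // Equivalent of constructing Network::TcpListenSocket: a TCP socket...
  int fd = socket(AF_INET, SOCK_STREAM, 0);
  if (fd < 0) { perror("socket"); return 1; }

  // ...bound to an address (9901 is an arbitrary example port).
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  addr.sin_port = htons(9901);
  if (bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) != 0) {
    perror("bind");
    return 1;
  }

  // Equivalent of ioHandle().listen(ENVOY_TCP_BACKLOG_SIZE): tell the kernel
  // to start queueing incoming connections, with the given backlog.
  if (listen(fd, 128) != 0) {
    perror("listen");
    return 1;
  }

  close(fd);
  return 0;
}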
What is underneath this listener?
class AdminListener : public Network::ListenerConfig
Underneath is ListenerConfig, a very important data structure.
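For orientation, here is an abridged sketch of that interface, reduced to the methods this walkthrough actually touches; the exact method set and signatures vary across Envoy versions, so treat this as a reading aid rather than the real declaration:
#include <cstdint>
#include <string>

// Stand-in forward declarations so the sketch is self-contained.
class ListenSocketFactory;
class ConnectionBalancer;
class UdpListenerConfigOptRef;

// Abridged from Envoy's Network::ListenerConfig -- only the methods that the
// code excerpts in this article call. The real interface has many more.
class ListenerConfig {
public:
  virtual ~ListenerConfig() = default;
  virtual ListenSocketFactory& listenSocketFactory() = 0;  // owns the per-worker listen sockets
  virtual bool bindToPort() = 0;                           // does this listener actually bind/listen?
  virtual UdpListenerConfigOptRef udpListenerConfig() = 0; // empty for TCP listeners
  virtual ConnectionBalancer& connectionBalancer() = 0;    // balances accepted connections across workers
  virtual uint64_t listenerTag() = 0;                      // unique tag used for updates/draining
  virtual const std::string& name() const = 0;
};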
II. What Is Envoy's Threading Model?
A good diagram of the model circulates online (the image is not reproduced here).
How is the number of Envoy threads determined?
ListenerManagerImpl::ListenerManagerImpl(Instance& server,
ListenerComponentFactory& listener_factory,
WorkerFactory& worker_factory,
bool enable_dispatcher_stats,
Quic::QuicStatNames& quic_stat_names)
: server_(server), factory_(listener_factory),
scope_(server.stats().createScope("listener_manager.")), stats_(generateStats(*scope_)),
config_tracker_entry_(server.admin().getConfigTracker().add(
"listeners",
[this](const Matchers::StringMatcher& name_matcher) {
return dumpListenerConfigs(name_matcher);
})),
enable_dispatcher_stats_(enable_dispatcher_stats), quic_stat_names_(quic_stat_names) {
for (uint32_t i = 0; i < server.options().concurrency(); i++) {
workers_.emplace_back(
worker_factory.createWorker(i, server.overloadManager(), absl::StrCat("worker_", i)));
}
}
We can see clearly that it is determined by the concurrency option.
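The concurrency value comes from the --concurrency command-line option; to my knowledge, when it is not given Envoy defaults it to the machine's hardware thread count. A tiny paraphrase of that defaulting logic (my sketch, not the actual OptionsImpl code):
#include <algorithm>
#include <cstdint>
#include <thread>

// Paraphrased defaulting (assumption: mirrors Envoy's behavior): 0 means
// "not set", so fall back to the hardware thread count, but never below 1.
uint32_t effectiveConcurrency(uint32_t requested) {
  return requested != 0 ? requested
                        : std::max(1u, std::thread::hardware_concurrency());
}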
Where are the worker threads created?
void ListenerManagerImpl::startWorkers(GuardDog& guard_dog, std::function<void()> callback) {
ENVOY_LOG(info, "all dependencies initialized. starting workers");
ASSERT(!workers_started_);
workers_started_ = true;
uint32_t i = 0;
absl::BlockingCounter workers_waiting_to_run(workers_.size());
Event::PostCb worker_started_running = [&workers_waiting_to_run]() {
workers_waiting_to_run.DecrementCount();
};
// We can not use "Cleanup" to simplify this logic here, because it results in a issue if Envoy is
// killed before workers are actually started. Specifically the AdminRequestGetStatsAndKill test
// case in main_common_test fails with ASAN error if we use "Cleanup" here.
const auto listeners_pending_init =
std::make_shared<std::atomic<uint64_t>>(workers_.size() * active_listeners_.size());
ASSERT(warming_listeners_.empty());
// We need to protect against inline deletion so have to use iterators directly.
for (auto listener_it = active_listeners_.begin(); listener_it != active_listeners_.end();) {
auto& listener = *listener_it;
listener_it++;
if (!doFinalPreWorkerListenerInit(*listener)) {
incListenerCreateFailureStat();
removeListenerInternal(listener->name(), false);
continue;
}
for (const auto& worker : workers_) {
addListenerToWorker(*worker, absl::nullopt, *listener,
[this, listeners_pending_init, callback]() {
if (--(*listeners_pending_init) == 0) {
stats_.workers_started_.set(1);
callback();
}
});
}
}
for (const auto& worker : workers_) {
ENVOY_LOG(debug, "starting worker {}", i);
worker->start(guard_dog, worker_started_running);
if (enable_dispatcher_stats_) {
worker->initializeStats(*scope_);
}
i++;
}
// Wait for workers to start running.
workers_waiting_to_run.Wait();
if (active_listeners_.empty()) {
stats_.workers_started_.set(1);
callback();
}
}
We can see very clearly that the worker threads are started here.
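The absl::BlockingCounter dance above is the standard "start N workers, block until every one reports it is running" handshake. A minimal standalone equivalent using C++20 std::latch (illustrative only, not Envoy code):
#include <algorithm>
#include <cstdio>
#include <latch>
#include <thread>
#include <vector>

int main() {
  const unsigned n = std::max(1u, std::thread::hardware_concurrency());
  std::latch workers_running(n); // counterpart of absl::BlockingCounter

  std::vector<std::jthread> workers;
  workers.reserve(n);
  for (unsigned i = 0; i < n; i++) {
    workers.emplace_back([&workers_running, i] {
      // ... per-worker setup (in Envoy: create the dispatcher, add listeners) ...
      std::printf("worker %u running\n", i);
      workers_running.count_down(); // counterpart of DecrementCount()
      // ... run the worker's event loop until shutdown ...
    });
  }

  workers_running.wait(); // counterpart of workers_waiting_to_run.Wait()
  std::printf("all %u workers have started\n", n);
  return 0;
}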
How is a listener bound to the threads?
for (auto listener_it = active_listeners_.begin(); listener_it != active_listeners_.end();) {
auto& listener = *listener_it;
listener_it++;
if (!doFinalPreWorkerListenerInit(*listener)) {
incListenerCreateFailureStat();
removeListenerInternal(listener->name(), false);
continue;
}
for (const auto& worker : workers_) {
addListenerToWorker(*worker, absl::nullopt, *listener,
[this, listeners_pending_init, callback]() {
if (--(*listeners_pending_init) == 0) {
stats_.workers_started_.set(1);
callback();
}
});
}
}
It loops over all the listeners and binds each individual listener to every worker thread. That implies the listener carries sockets, so where, and when, are a listener's sockets created?
Let's look here first:
// Now the configuration gets parsed. The configuration may start setting
// thread local data per above. See MainImpl::initialize() for why ConfigImpl
// is constructed as part of the InstanceImpl and then populated once
// cluster_manager_factory_ is available.
config_.initialize(bootstrap_, *this, *cluster_manager_factory_);
Stepping in:
void MainImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
Instance& server,
Upstream::ClusterManagerFactory& cluster_manager_factory) {
// In order to support dynamic configuration of tracing providers,
// a former server-wide HttpTracer singleton has been replaced by
// an HttpTracer instance per "envoy.filters.network.http_connection_manager" filter.
// Tracing configuration as part of bootstrap config is still supported,
// however, it's become mandatory to process it prior to static Listeners.
// Otherwise, static Listeners will be configured in assumption that
// tracing configuration is missing from the bootstrap config.
initializeTracers(bootstrap.tracing(), server);
const auto& secrets = bootstrap.static_resources().secrets();
ENVOY_LOG(info, "loading {} static secret(s)", secrets.size());
for (ssize_t i = 0; i < secrets.size(); i++) {
ENVOY_LOG(debug, "static secret #{}: {}", i, secrets[i].name());
server.secretManager().addStaticSecret(secrets[i]);
}
ENVOY_LOG(info, "loading {} cluster(s)", bootstrap.static_resources().clusters().size());
cluster_manager_ = cluster_manager_factory.clusterManagerFromProto(bootstrap);
const auto& listeners = bootstrap.static_resources().listeners();
ENVOY_LOG(info, "loading {} listener(s)", listeners.size());
for (ssize_t i = 0; i < listeners.size(); i++) {
ENVOY_LOG(debug, "listener #{}:", i);
server.listenerManager().addOrUpdateListener(listeners[i], "", false);
}
initializeWatchdogs(bootstrap, server);
initializeStatsConfig(bootstrap, server);
}
Here is the Envoy configuration file for reference:
static_resources:
  listeners:
  - address:
      socket_address:
        address: 0.0.0.0
        port_value: 8080
    filter_chains:
    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          codec_type: AUTO
          stat_prefix: ingress_http
          route_config:
            name: local_route
            virtual_hosts:
            - name: backend
              domains:
              - "*"
              routes:
              - match:
                  prefix: "/service/1"
                route:
                  cluster: service1
              - match:
                  prefix: "/service/2"
                route:
                  cluster: service2
          http_filters:
          - name: envoy.filters.http.router
This part is easy to follow: the config file has been parsed, and each listener is added:
for (ssize_t i = 0; i < listeners.size(); i++) {
ENVOY_LOG(debug, "listener #{}:", i);
server.listenerManager().addOrUpdateListener(listeners[i], "", false);
}
Back to the main thread of the analysis: now that we know where listeners are created, let's return to addListenerToWorker. The core is here:
void ConnectionHandlerImpl::addListener(absl::optional<uint64_t> overridden_listener,
Network::ListenerConfig& config) {
const bool support_udp_in_place_filter_chain_update = Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.udp_listener_updates_filter_chain_in_place");
if (support_udp_in_place_filter_chain_update && overridden_listener.has_value()) {
ActiveListenerDetailsOptRef listener_detail =
findActiveListenerByTag(overridden_listener.value());
ASSERT(listener_detail.has_value());
listener_detail->get().listener_->updateListenerConfig(config);
return;
}
ActiveListenerDetails details;
if (config.listenSocketFactory().socketType() == Network::Socket::Type::Stream) {
if (!support_udp_in_place_filter_chain_update && overridden_listener.has_value()) {
for (auto& listener : listeners_) {
if (listener.second.listener_->listenerTag() == overridden_listener) {
listener.second.tcpListener()->get().updateListenerConfig(config);
return;
}
}
NOT_REACHED_GCOVR_EXCL_LINE;
}
// worker_index_ doesn't have a value on the main thread for the admin server.
auto tcp_listener = std::make_unique<ActiveTcpListener>(
*this, config, worker_index_.has_value() ? *worker_index_ : 0);
details.typed_listener_ = *tcp_listener;
details.listener_ = std::move(tcp_listener);
} else {
ASSERT(config.udpListenerConfig().has_value(), "UDP listener factory is not initialized.");
ASSERT(worker_index_.has_value());
ConnectionHandler::ActiveUdpListenerPtr udp_listener =
config.udpListenerConfig()->listenerFactory().createActiveUdpListener(*worker_index_, *this,
dispatcher_, config);
details.typed_listener_ = *udp_listener;
details.listener_ = std::move(udp_listener);
}
if (disable_listeners_) {
details.listener_->pauseListening();
}
if (auto* listener = details.listener_->listener(); listener != nullptr) {
listener->setRejectFraction(listener_reject_fraction_);
}
listeners_.emplace_back(config.listenSocketFactory().localAddress(), std::move(details));
}
Now the key point:
auto tcp_listener = std::make_unique<ActiveTcpListener>(
*this, config, worker_index_.has_value() ? *worker_index_ : 0);
Stepping in, we see:
ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent,
Network::ListenerConfig& config, uint32_t worker_index)
: OwnedActiveStreamListenerBase(parent, parent.dispatcher(),
parent.dispatcher().createListener(
config.listenSocketFactory().getListenSocket(worker_index),
*this, config.bindToPort()),
config),
tcp_conn_handler_(parent) {
config.connectionBalancer().registerHandler(*this);
}
The critical line of code:
parent.dispatcher().createListener(
config.listenSocketFactory().getListenSocket(worker_index),
Following it further in:
Network::ListenerPtr DispatcherImpl::createListener(Network::SocketSharedPtr&& socket,
Network::TcpListenerCallbacks& cb,
bool bind_to_port) {
ASSERT(isThreadSafe());
return std::make_unique<Network::TcpListenerImpl>(*this, api_.randomGenerator(),
std::move(socket), cb, bind_to_port);
}
Reading this far, it becomes clear:
TcpListenerImpl::TcpListenerImpl(Event::DispatcherImpl& dispatcher, Random::RandomGenerator& random,
SocketSharedPtr socket, TcpListenerCallbacks& cb,
bool bind_to_port)
: BaseListenerImpl(dispatcher, std::move(socket)), cb_(cb), random_(random),
bind_to_port_(bind_to_port), reject_fraction_(0.0) {
if (bind_to_port) {
// Although onSocketEvent drains to completion, use level triggered mode to avoid potential
// loss of the trigger due to transient accept errors.
socket_->ioHandle().initializeFileEvent(
dispatcher, [this](uint32_t events) -> void { onSocketEvent(events); },
Event::FileTriggerType::Level, Event::FileReadyType::Read);
}
}
When the ActiveTcpListener is initialized, it registers the listening socket with libevent, as a level-triggered read event per the constructor above.
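To make "registered with libevent" concrete, here is a minimal standalone accept loop built from the same ingredients TcpListenerImpl uses: a non-blocking listening fd added to an event_base as a persistent, level-triggered read event (illustrative only; error handling omitted):
#include <arpa/inet.h>
#include <cstdio>
#include <event2/event.h>
#include <event2/util.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

static void on_readable(evutil_socket_t fd, short, void*) {
  // Level-triggered: we will be called again while connections remain queued.
  sockaddr_in peer{};
  socklen_t len = sizeof(peer);
  int conn = accept(fd, reinterpret_cast<sockaddr*>(&peer), &len);
  if (conn >= 0) {
    std::printf("accepted fd %d\n", conn);
    close(conn);
  }
}

int main() {
  int fd = socket(AF_INET, SOCK_STREAM, 0);
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  addr.sin_port = htons(8080); // example port from the config above
  bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
  listen(fd, 128);
  evutil_make_socket_nonblocking(fd);

  event_base* base = event_base_new();
  // EV_READ | EV_PERSIST without EV_ET is level-triggered, like
  // Event::FileTriggerType::Level in the Envoy code above.
  event* ev = event_new(base, fd, EV_READ | EV_PERSIST, on_readable, nullptr);
  event_add(ev, nullptr);
  event_base_dispatch(base); // the dispatcher's run loop

  event_free(ev);
  event_base_free(base);
  return 0;
}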
How is a listener scheduled?
We saw earlier that MainImpl::initialize calls addOrUpdateListener. Let's see what happens inside addOrUpdateListener:
bool ListenerManagerImpl::addOrUpdateListener(const envoy::config::listener::v3::Listener& config,
const std::string& version_info, bool added_via_api) {
RELEASE_ASSERT(
!config.address().has_envoy_internal_address(),
fmt::format("listener {} has envoy internal address {}. Internal address cannot be used by "
"listener yet",
config.name(), config.address().envoy_internal_address().DebugString()));
// TODO(junr03): currently only one ApiListener can be installed via bootstrap to avoid having to
// build a collection of listeners, and to have to be able to warm and drain the listeners. In the
// future allow multiple ApiListeners, and allow them to be created via LDS as well as bootstrap.
if (config.has_api_listener()) {
if (!api_listener_ && !added_via_api) {
// TODO(junr03): dispatch to different concrete constructors when there are other
// ApiListenerImplBase derived classes.
api_listener_ = std::make_unique<HttpApiListener>(config, *this, config.name());
return true;
} else {
ENVOY_LOG(warn, "listener {} can not be added because currently only one ApiListener is "
"allowed, and it can only be added via bootstrap configuration");
return false;
}
}
std::string name;
if (!config.name().empty()) {
name = config.name();
} else {
name = server_.api().randomGenerator().uuid();
}
auto it = error_state_tracker_.find(name);
TRY_ASSERT_MAIN_THREAD {
return addOrUpdateListenerInternal(config, version_info, added_via_api, name);
}
END_TRY
catch (const EnvoyException& e) {
if (it == error_state_tracker_.end()) {
it = error_state_tracker_.emplace(name, std::make_unique<UpdateFailureState>()).first;
}
TimestampUtil::systemClockToTimestamp(server_.api().timeSource().systemTime(),
*(it->second->mutable_last_update_attempt()));
it->second->set_details(e.what());
it->second->mutable_failed_configuration()->PackFrom(config);
throw e;
}
error_state_tracker_.erase(it);
return false;
}
Continue into addOrUpdateListenerInternal:
bool ListenerManagerImpl::addOrUpdateListenerInternal(
const envoy::config::listener::v3::Listener& config, const std::string& version_info,
bool added_via_api, const std::string& name) {
if (listenersStopped(config)) {
ENVOY_LOG(
debug,
"listener {} can not be added because listeners in the traffic direction {} are stopped",
name, envoy::config::core::v3::TrafficDirection_Name(config.traffic_direction()));
return false;
}
const uint64_t hash = MessageUtil::hash(config);
ENVOY_LOG(debug, "begin add/update listener: name={} hash={}", name, hash);
auto existing_active_listener = getListenerByName(active_listeners_, name);
auto existing_warming_listener = getListenerByName(warming_listeners_, name);
// The listener should be updated back to its original state and the warming listener should be
// removed.
if (existing_warming_listener != warming_listeners_.end() &&
existing_active_listener != active_listeners_.end() &&
(*existing_active_listener)->blockUpdate(hash)) {
warming_listeners_.erase(existing_warming_listener);
updateWarmingActiveGauges();
stats_.listener_modified_.inc();
return true;
}
// Do a quick blocked update check before going further. This check needs to be done against both
// warming and active.
if ((existing_warming_listener != warming_listeners_.end() &&
(*existing_warming_listener)->blockUpdate(hash)) ||
(existing_active_listener != active_listeners_.end() &&
(*existing_active_listener)->blockUpdate(hash))) {
ENVOY_LOG(debug, "duplicate/locked listener '{}'. no add/update", name);
return false;
}
ListenerImplPtr new_listener = nullptr;
// In place filter chain update depends on the active listener at worker.
if (existing_active_listener != active_listeners_.end() &&
(*existing_active_listener)->supportUpdateFilterChain(config, workers_started_)) {
ENVOY_LOG(debug, "use in place update filter chain update path for listener name={} hash={}",
name, hash);
new_listener =
(*existing_active_listener)->newListenerWithFilterChain(config, workers_started_, hash);
stats_.listener_in_place_updated_.inc();
} else {
ENVOY_LOG(debug, "use full listener update path for listener name={} hash={}", name, hash);
new_listener =
std::make_unique<ListenerImpl>(config, version_info, *this, name, added_via_api,
workers_started_, hash, server_.options().concurrency());
}
ListenerImpl& new_listener_ref = *new_listener;
bool added = false;
if (existing_warming_listener != warming_listeners_.end()) {
// In this case we can just replace inline.
ASSERT(workers_started_);
new_listener->debugLog("update warming listener");
if (!(*existing_warming_listener)->hasCompatibleAddress(*new_listener)) {
setNewOrDrainingSocketFactory(name, config.address(), *new_listener);
} else {
new_listener->setSocketFactory((*existing_warming_listener)->getSocketFactory().clone());
}
*existing_warming_listener = std::move(new_listener);
} else if (existing_active_listener != active_listeners_.end()) {
// In this case we have no warming listener, so what we do depends on whether workers
// have been started or not.
if (!(*existing_active_listener)->hasCompatibleAddress(*new_listener)) {
setNewOrDrainingSocketFactory(name, config.address(), *new_listener);
} else {
new_listener->setSocketFactory((*existing_active_listener)->getSocketFactory().clone());
}
if (workers_started_) {
new_listener->debugLog("add warming listener");
warming_listeners_.emplace_back(std::move(new_listener));
} else {
new_listener->debugLog("update active listener");
*existing_active_listener = std::move(new_listener);
}
} else {
// We have no warming or active listener so we need to make a new one. What we do depends on
// whether workers have been started or not.
setNewOrDrainingSocketFactory(name, config.address(), *new_listener);
if (workers_started_) {
new_listener->debugLog("add warming listener");
warming_listeners_.emplace_back(std::move(new_listener));
} else {
new_listener->debugLog("add active listener");
active_listeners_.emplace_back(std::move(new_listener));
}
added = true;
}
updateWarmingActiveGauges();
if (added) {
stats_.listener_added_.inc();
} else {
stats_.listener_modified_.inc();
}
new_listener_ref.initialize();
return true;
}
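The warming/active branching above is dense, so here is my condensed paraphrase of the placement decision (a sketch only; the real function also handles config hashes, socket-factory reuse, stats, and in-place filter-chain updates):
// Condensed paraphrase of addOrUpdateListenerInternal's placement logic.
enum class Placement { ReplacedWarming, AddedWarming, ReplacedActive, AddedActive };

Placement placeListener(bool has_warming, bool has_active, bool workers_started) {
  if (has_warming) {
    return Placement::ReplacedWarming; // replace the warming listener inline
  }
  if (has_active) {
    return workers_started ? Placement::AddedWarming    // warm alongside the active one
                           : Placement::ReplacedActive; // workers not started: swap in place
  }
  // Brand-new listener: warm it if workers run, otherwise activate directly.
  return workers_started ? Placement::AddedWarming : Placement::AddedActive;
}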
The core code is setNewOrDrainingSocketFactory. Let's continue there:
void ListenerManagerImpl::setNewOrDrainingSocketFactory(
const std::string& name, const envoy::config::core::v3::Address& proto_address,
ListenerImpl& listener) {
// Typically we catch address issues when we try to bind to the same address multiple times.
// However, for listeners that do not bind we must check to make sure we are not duplicating. This
// is an edge case and nothing will explicitly break, but there is no possibility that two
// listeners that do not bind will ever be used. Only the first one will be used when searched for
// by address. Thus we block it.
if (!listener.bindToPort() && (hasListenerWithCompatibleAddress(warming_listeners_, listener) ||
hasListenerWithCompatibleAddress(active_listeners_, listener))) {
const std::string message =
fmt::format("error adding listener: '{}' has duplicate address '{}' as existing listener",
name, listener.address()->asString());
ENVOY_LOG(warn, "{}", message);
throw EnvoyException(message);
}
// Search through draining listeners to see if there is a listener that has a socket factory for
// the same address we are configured for. This is an edge case, but
// may happen if a listener is removed and then added back with a same or different name and
// intended to listen on the same address. This should work and not fail.
const Network::ListenSocketFactory* draining_listen_socket_factory = nullptr;
auto existing_draining_listener = std::find_if(
draining_listeners_.cbegin(), draining_listeners_.cend(),
[&listener](const DrainingListener& draining_listener) {
return draining_listener.listener_->listenSocketFactory().getListenSocket(0)->isOpen() &&
listener.hasCompatibleAddress(*draining_listener.listener_);
});
if (existing_draining_listener != draining_listeners_.cend()) {
draining_listen_socket_factory = &existing_draining_listener->listener_->getSocketFactory();
}
listener.setSocketFactory(draining_listen_socket_factory != nullptr
? draining_listen_socket_factory->clone()
: createListenSocketFactory(proto_address, listener));
}
The key call is createListenSocketFactory. Continuing:
Network::ListenSocketFactoryPtr ListenerManagerImpl::createListenSocketFactory(
const envoy::config::core::v3::Address& proto_address, ListenerImpl& listener) {
Network::Socket::Type socket_type = Network::Utility::protobufAddressSocketType(proto_address);
ListenerComponentFactory::BindType bind_type = ListenerComponentFactory::BindType::NoBind;
if (listener.bindToPort()) {
bind_type = listener.reusePort() ? ListenerComponentFactory::BindType::ReusePort
: ListenerComponentFactory::BindType::NoReusePort;
}
TRY_ASSERT_MAIN_THREAD {
return std::make_unique<ListenSocketFactoryImpl>(
factory_, listener.address(), socket_type, listener.listenSocketOptions(), listener.name(),
listener.tcpBacklogSize(), bind_type, server_.options().concurrency());
}
END_TRY
catch (const EnvoyException& e) {
ENVOY_LOG(error, "listener '{}' failed to bind or apply socket options: {}", listener.name(),
e.what());
incListenerCreateFailureStat();
throw e;
}
}
The core code is:
return std::make_unique<ListenSocketFactoryImpl>(
factory_, listener.address(), socket_type, listener.listenSocketOptions(), listener.name(),
listener.tcpBacklogSize(), bind_type, server_.options().concurrency());
Note server_.options().concurrency(): this is the number of listen sockets the factory will create, one per worker-thread index. Let's look at the ListenSocketFactoryImpl constructor:
ListenSocketFactoryImpl::ListenSocketFactoryImpl(
ListenerComponentFactory& factory, Network::Address::InstanceConstSharedPtr address,
Network::Socket::Type socket_type, const Network::Socket::OptionsSharedPtr& options,
const std::string& listener_name, uint32_t tcp_backlog_size,
ListenerComponentFactory::BindType bind_type, uint32_t num_sockets)
: factory_(factory), local_address_(address), socket_type_(socket_type), options_(options),
listener_name_(listener_name), tcp_backlog_size_(tcp_backlog_size), bind_type_(bind_type) {
if (local_address_->type() == Network::Address::Type::Ip) {
if (socket_type == Network::Socket::Type::Datagram) {
ASSERT(bind_type_ == ListenerComponentFactory::BindType::ReusePort);
}
} else {
ASSERT(local_address_->type() == Network::Address::Type::Pipe);
// Listeners with Unix domain socket always use shared socket.
// TODO(mattklein123): This should be blocked at the config parsing layer instead of getting
// here and disabling reuse_port.
if (bind_type_ == ListenerComponentFactory::BindType::ReusePort) {
bind_type_ = ListenerComponentFactory::BindType::NoReusePort;
}
}
sockets_.push_back(createListenSocketAndApplyOptions(factory, socket_type, 0));
if (sockets_[0] != nullptr && local_address_->ip() && local_address_->ip()->port() == 0) {
local_address_ = sockets_[0]->connectionInfoProvider().localAddress();
}
ENVOY_LOG(debug, "Set listener {} socket factory local address to {}", listener_name,
local_address_->asString());
// Now create the remainder of the sockets that will be used by the rest of the workers.
for (uint32_t i = 1; i < num_sockets; i++) {
if (bind_type_ != ListenerComponentFactory::BindType::ReusePort && sockets_[0] != nullptr) {
sockets_.push_back(sockets_[0]->duplicate());
} else {
sockets_.push_back(createListenSocketAndApplyOptions(factory, socket_type, i));
}
}
ASSERT(sockets_.size() == num_sockets);
}
Now the most important point of all:
for (uint32_t i = 1; i < num_sockets; i++) {
if (bind_type_ != ListenerComponentFactory::BindType::ReusePort && sockets_[0] != nullptr) {
sockets_.push_back(sockets_[0]->duplicate());
} else {
sockets_.push_back(createListenSocketAndApplyOptions(factory, socket_type, i));
}
}
Here the factory ends up with one listen socket per worker (the count equals the number of worker threads). When the bind type is ReusePort, each worker binds its own socket with the SO_REUSEPORT option, which avoids the thundering-herd problem; otherwise the first socket's fd is duplicated and shared. Back to the code above:
config.listenSocketFactory().getListenSocket(worker_index),
*this, config.bindToPort())
We can see very clearly:
Network::SocketSharedPtr ListenSocketFactoryImpl::getListenSocket(uint32_t worker_index) {
// Per the TODO above, sockets at this point can never be null. That only happens in the
// config validation path.
ASSERT(worker_index < sockets_.size() && sockets_[worker_index] != nullptr);
return sockets_[worker_index];
}
At this point it is clear how a listener gets bound into each worker (the original post summarizes it with a flow chart, which is not reproduced here).
The listen sockets are created once per worker and handed out by worker index. With the SO_REUSEPORT option each worker's socket has its own accept queue and the kernel distributes incoming connections across them, so no thundering herd occurs: the classic blocking-accept thundering herd was already fixed around the Linux 2.6 kernels, and SO_REUSEPORT (added in Linux 3.9) goes further by letting the kernel load-balance connections across the per-worker sockets.
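At the socket level, the constructor above produces one of two layouts: duplicate() shares a single socket (one accept queue) among all workers, while the ReusePort path gives every worker its own independently bound socket with its own accept queue. A standalone sketch of the SO_REUSEPORT flavor (illustrative only; Linux-specific, error handling omitted):
#include <arpa/inet.h>
#include <cstdint>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <vector>

// Create one listening socket per worker, all bound to the same address.
// With SO_REUSEPORT each socket has its own accept queue and the kernel
// load-balances incoming connections across them -- the ReusePort bind
// type in the Envoy code above.
std::vector<int> createReusePortSockets(uint16_t port, unsigned num_workers) {
  std::vector<int> fds;
  for (unsigned i = 0; i < num_workers; i++) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    int on = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on));
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(port);
    bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
    listen(fd, 128);
    fds.push_back(fd);
  }
  return fds;
}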
III. How Do Threads Schedule Work and Communicate?
Pick any example from the code; the core is dispatcher->post:
server_.dispatcher().post([this, draining_it]() -> void {
if (--draining_it->workers_pending_removal_ == 0) {
draining_it->listener_->debugLog("draining listener removal complete");
draining_listeners_.erase(draining_it);
stats_.total_listeners_draining_.set(draining_listeners_.size());
}
});
Follow it down to the concrete implementation:
void DispatcherImpl::post(std::function<void()> callback) {
bool do_post;
{
Thread::LockGuard lock(post_lock_);
do_post = post_callbacks_.empty();
post_callbacks_.push_back(callback);
}
if (do_post) {
post_cb_->scheduleCallbackCurrentIteration();
}
}
We can see the callback is first appended, under post_lock_, to
std::list<std::function<void()>> post_callbacks_ ABSL_GUARDED_BY(post_lock_);
and then a wakeup is scheduled. Note that do_post is true only when the queue was previously empty, so a burst of posts triggers just one wakeup and the whole batch is drained together. The scheduling itself is a wakeup of the event loop:
void SchedulableCallbackImpl::scheduleCallbackCurrentIteration() {
if (enabled()) {
return;
}
// event_active directly adds the event to the end of the work queue so it executes in the current
// iteration of the event loop.
event_active(&raw_event_, EV_TIMEOUT, 0);
}
Under the hood, the cross-thread wakeup works by writing a byte to one end of a socketpair (an eventfd would serve the same purpose; the choice does not really matter). Let's look at the implementation of event_active.
The comment in the libevent source explains it:
/* Helper callback: wake an event_base from another thread. This version
* works by writing a byte to one end of a socketpair, so that the event_base
* listening on the other end will wake up as the corresponding event
* triggers */
void
event_active(struct event *ev, int res, short ncalls)
{
if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
event_warnx("%s: event has no event_base set.", __func__);
return;
}
EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
event_debug_assert_is_setup_(ev);
event_active_nolock_(ev, res, ncalls);
EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
}
The wakeup callback itself is set up when DispatcherImpl is initialized:
DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
Event::TimeSystem& time_system,
const Buffer::WatermarkFactorySharedPtr& watermark_factory)
: DispatcherImpl(
name, api, time_system,
[](Dispatcher& dispatcher) {
return std::make_unique<ScaledRangeTimerManagerImpl>(dispatcher);
},
watermark_factory) {}
DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
Event::TimeSystem& time_system,
const ScaledRangeTimerManagerFactory& scaled_timer_factory,
const Buffer::WatermarkFactorySharedPtr& watermark_factory)
: name_(name), api_(api),
buffer_factory_(watermark_factory != nullptr
? watermark_factory
: std::make_shared<Buffer::WatermarkBufferFactory>(
api.bootstrap().overload_manager().buffer_factory_config())),
scheduler_(time_system.createScheduler(base_scheduler_, base_scheduler_)),
thread_local_delete_cb_(
base_scheduler_.createSchedulableCallback([this]() -> void { runThreadLocalDelete(); })),
deferred_delete_cb_(base_scheduler_.createSchedulableCallback(
[this]() -> void { clearDeferredDeleteList(); })),
post_cb_(base_scheduler_.createSchedulableCallback([this]() -> void { runPostCallbacks(); })),
current_to_delete_(&to_delete_1_), scaled_timer_manager_(scaled_timer_factory(*this)) {
ASSERT(!name_.empty());
FatalErrorHandler::registerFatalErrorHandler(*this);
updateApproximateMonotonicTimeInternal();
base_scheduler_.registerOnPrepareCallback(
std::bind(&DispatcherImpl::updateApproximateMonotonicTime, this));
}
The core is here:
post_cb_(base_scheduler_.createSchedulableCallback([this]() -> void { runPostCallbacks(); })),
Each thread drains its pending callbacks whenever this schedulable callback fires, so they never pile up, and other threads can trigger that firing to wake the loop on demand:
SchedulableCallbackImpl::SchedulableCallbackImpl(Libevent::BasePtr& libevent,
std::function<void()> cb)
: cb_(cb) {
ASSERT(cb_);
evtimer_assign(
&raw_event_, libevent.get(),
[](evutil_socket_t, short, void* arg) -> void {
SchedulableCallbackImpl* cb = static_cast<SchedulableCallbackImpl*>(arg);
cb->cb_();
},
this);
}
void SchedulableCallbackImpl::scheduleCallbackCurrentIteration() {
if (enabled()) {
return;
}
// event_active directly adds the event to the end of the work queue so it executes in the current
// iteration of the event loop.
event_active(&raw_event_, EV_TIMEOUT, 0);
}
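Putting this section's pieces together: the whole cross-thread mechanism is just a mutex-guarded callback list plus a schedulable libevent event that any thread can fire with event_active. A compact standalone sketch of the same pattern (illustrative; the real DispatcherImpl and SchedulableCallbackImpl carry much more):
#include <event2/event.h>
#include <event2/event_struct.h>
#include <functional>
#include <list>
#include <mutex>

// A stripped-down DispatcherImpl::post(): callbacks are queued under a lock,
// and only the first post into an empty queue activates the wakeup event.
// NOTE: call evthread_use_pthreads() (from <event2/thread.h>) before creating
// the event_base so event_active is safe to call from other threads.
class PostQueue {
public:
  explicit PostQueue(event_base* base) {
    // evtimer_assign + event_active mirrors SchedulableCallbackImpl.
    evtimer_assign(&wakeup_, base,
                   [](evutil_socket_t, short, void* arg) {
                     static_cast<PostQueue*>(arg)->drain();
                   },
                   this);
  }

  void post(std::function<void()> cb) { // callable from any thread
    bool do_wakeup;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      do_wakeup = callbacks_.empty();
      callbacks_.push_back(std::move(cb));
    }
    if (do_wakeup) {
      event_active(&wakeup_, EV_TIMEOUT, 0); // wake the owning event loop
    }
  }

private:
  void drain() { // runs on the event-loop thread
    std::list<std::function<void()>> todo;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      todo.swap(callbacks_); // take the whole batch, like runPostCallbacks()
    }
    for (auto& cb : todo) cb();
  }

  event wakeup_;
  std::mutex mutex_;
  std::list<std::function<void()>> callbacks_;
};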
Finally, look at libevent's core data structure, struct event:
struct event {
struct event_callback ev_evcallback;
/* for managing timeouts */
union {
TAILQ_ENTRY(event) ev_next_with_common_timeout;
int min_heap_idx;
} ev_timeout_pos;
evutil_socket_t ev_fd;
struct event_base *ev_base;
union {
/* used for io events */
struct {
LIST_ENTRY (event) ev_io_next;
struct timeval ev_timeout;
} ev_io;
/* used by signal events */
struct {
LIST_ENTRY (event) ev_signal_next;
short ev_ncalls;
/* Allows deletes in callback */
short *ev_pncalls;
} ev_signal;
} ev_;
short ev_events;
short ev_res; /* result passed to event callback */
struct timeval ev_timeout;
};