上篇文章分析了UpdateEngine服务的启动,初始化了四种Action(InstallPlanAction, DownloadAction, FileSystemVerifierAction, PostinstallRunnerAction), 然后将四种Action添加到列表中,最后将Actions列表加入到ActionProcessor的工作任务中去。
ActionProcessor的启动
我们从ActionProcessor是如何启动的开始分析。在上篇分析的UpdateAttempterAndroid的ApplyPayload中,在调用了BuildUpdateActions(payload_url)之后,又调用了UpdateBootFlags(),这个就是启动ActionProcessor的关键。
// Entry point that eventually starts the ActionProcessor.
//
// On the first call this asks boot_control_ to mark the currently booted slot
// as successful and hands it CompleteUpdateBootFlags() as the completion
// callback. If the request is rejected, or the flags were already updated by
// an earlier call, CompleteUpdateBootFlags() is invoked directly instead.
void UpdateAttempterAndroid::UpdateBootFlags() {
  if (!updated_boot_flags_) {
    LOG(INFO) << "Marking booted slot as good.";
    if (!boot_control_->MarkBootSuccessfulAsync(
            Bind(&UpdateAttempterAndroid::CompleteUpdateBootFlags,
                 base::Unretained(this)))) {
      // The request could not be issued, so report the failure ourselves to
      // keep the flow moving.
      LOG(ERROR) << "Failed to mark current boot as successful.";
      CompleteUpdateBootFlags(false);
    }
    return;
  }

  // Nothing left to mark; continue straight to the completion step.
  LOG(INFO) << "Already updated boot flags. Skipping.";
  CompleteUpdateBootFlags(true);
}
// Completion step for UpdateBootFlags().
//
// Records that the boot flags were handled and schedules the action processor
// start. Note that |successful| is not consulted: the flag is set and
// processing is scheduled regardless of the reported result.
void UpdateAttempterAndroid::CompleteUpdateBootFlags(bool successful) {
  updated_boot_flags_ = true;
  ScheduleProcessingStart();
}
// Posts a task on the current message loop that calls StartProcessing() on
// processor_. The processor is therefore started from the posted task, not
// from inside this call.
void UpdateAttempterAndroid::ScheduleProcessingStart() {
  LOG(INFO) << "Scheduling an action processor start.";

  // The raw pointer is bound without ownership (base::Unretained).
  auto* const target_processor = processor_.get();
  brillo::MessageLoop::current()->PostTask(
      FROM_HERE,
      Bind([](ActionProcessor* processor) { processor->StartProcessing(); },
           base::Unretained(target_processor)));
}
// Starts the first queued action, if any.
//
// Must not be called while the processor is already running (enforced with a
// CHECK). When the queue is empty this is a no-op.
void ActionProcessor::StartProcessing() {
  CHECK(!IsRunning());
  if (actions_.empty())
    return;

  // Take the head of the queue as the current action, then run it.
  current_action_ = actions_.front();
  LOG(INFO) << "ActionProcessor: starting " << current_action_->Type();
  actions_.pop_front();
  current_action_->PerformAction();
}
从上面的代码中可以看到,在之前将各种Action加入到Actions列表,再遍历该列表把每个action加入到ActionProcessor之后,就调用UpdateBootFlags来启动ActionProcessor了。无论标记启动成功的结果如何,CompleteUpdateBootFlags都会调用ScheduleProcessingStart,后者通过PostTask的方式向消息循环投递一个任务,在该任务中调用StartProcessing()。启动之后取出队列中的第一个Action,调用它的PerformAction()开始干活。
InstallPlanAction的工作
上面ActionProcessor的启动中分析到,StartProcessing()会启动actions列表中的第一个action,也就是执行它的PerformAction()。之前我们分析过,在UpdateAttempterAndroid::BuildUpdateActions中依次将install_plan_action, download_action, filesystem_verifier_action, postinstall_runner_action添加到了ActionProcessor中。这些Action都是从AbstractAction类继承而来的。上节看到的调用第一个Action的PerformAction(),这里的第一个Action就是InstallPlanAction,来看看InstallPlanAction的PerformAction():
// Publishes install_plan_ on the output pipe (when one is attached) and
// immediately reports success to the processor; no other work is done here.
void PerformAction() override {
  if (HasOutputPipe())
    SetOutputObject(install_plan_);

  processor_->ActionComplete(this, ErrorCode::kSuccess);
}
// Called by the current action when it has finished with result |code|.
//
// |actionptr| must be the action currently being processed (CHECKed). The
// delegate is notified first, then the action itself is told it completed and
// is detached from this processor. On failure the remaining queue is dropped.
// If the processor is suspended, the result is stored in
// suspended_error_code_ and nothing else is started; otherwise the next action
// is started (or processing finishes).
void ActionProcessor::ActionComplete(AbstractAction* actionptr,
                                     ErrorCode code) {
  CHECK_EQ(actionptr, current_action_);
  if (delegate_)
    delegate_->ActionCompleted(this, actionptr, code);

  // Capture the type name before the action is detached, for logging.
  const string finished_type = current_action_->Type();
  current_action_->ActionCompleted(code);
  current_action_->SetProcessor(nullptr);
  current_action_ = nullptr;

  const char* const last_marker = actions_.empty() ? "last action " : "";
  const char* const suspended_marker = suspended_ ? " while suspended" : "";
  LOG(INFO) << "ActionProcessor: finished " << last_marker << finished_type
            << suspended_marker << " with code "
            << utils::ErrorCodeToString(code);

  // A failing action cancels everything still queued behind it.
  const bool failed = code != ErrorCode::kSuccess;
  if (failed && !actions_.empty()) {
    LOG(INFO) << "ActionProcessor: Aborting processing due to failure.";
    actions_.clear();
  }

  if (!suspended_) {
    StartNextActionOrFinish(code);
    return;
  }

  // Suspended: remember the result instead of moving on now.
  suspended_error_code_ = code;
}
// Runs the next queued action, or, when the queue is empty, reports
// ProcessingDone with |code| to the delegate (if one is set).
void ActionProcessor::StartNextActionOrFinish(ErrorCode code) {
  if (!actions_.empty()) {
    current_action_ = actions_.front();
    actions_.pop_front();
    LOG(INFO) << "ActionProcessor: starting " << current_action_->Type();
    current_action_->PerformAction();
    return;
  }

  // Nothing left to run.
  if (delegate_)
    delegate_->ProcessingDone(this, code);
}
分析完上面的代码可以看到,InstallPlanAction除了在有输出管道时把install_plan_设置为输出对象之外,没有做其他具体的事情,随后直接调用ActionProcessor的ActionComplete,然后就开始下一个Action了。当然在ActionProcessor的ActionComplete中,UpdateAttempterAndroid作为其委托(delegate),会先被回调ActionCompleted,这里面针对DownloadAction和PostinstallRunnerAction做一些特定的操作并保存其状态。
到这里InstallPlanAction就做完了,下一个Action行为就是DownloadAction行为。
DownloadAction 的工作
从上面就可以看到installPlanAction 执行完了就开始执行DownloadAction, 现在来看看DownloadAction的PerformAction():
// Prepares the download state from the incoming install plan and kicks off the
// download.
//
// Steps: register as the fetcher delegate, take the install plan from the
// input pipe (CHECKed to be present), reset the byte counters and sum the
// payload sizes, restore the resume position from prefs when resuming, pick
// the first payload if none is selected yet, mark the target slot unbootable
// (failure is only logged), and finally call StartDownloading().
void DownloadAction::PerformAction() {
  http_fetcher_->set_delegate(this);

  CHECK(HasInputObject());
  install_plan_ = GetInputObject();
  install_plan_.Dump();

  bytes_received_ = 0;
  bytes_received_previous_payloads_ = 0;
  bytes_total_ = 0;
  for (const auto& entry : install_plan_.payloads)
    bytes_total_ += entry.size;

  if (install_plan_.is_resume) {
    int64_t stored_index = 0;
    // Only trust the stored index when it was read successfully and refers to
    // an existing payload.
    if (prefs_->GetInt64(kPrefsUpdateStatePayloadIndex, &stored_index) &&
        static_cast<size_t>(stored_index) < install_plan_.payloads.size()) {
      resume_payload_index_ = stored_index;
      // Every payload before the resume point is flagged as already applied.
      for (int idx = 0; idx < stored_index; idx++)
        install_plan_.payloads[idx].already_applied = true;
    }
  }

  if (!payload_)
    payload_ = &install_plan_.payloads[0];

  LOG(INFO) << "Marking new slot as unbootable";
  if (!boot_control_->MarkSlotUnbootable(install_plan_.target_slot)) {
    LOG(WARNING) << "Unable to mark new slot "
                 << BootControlInterface::SlotName(install_plan_.target_slot)
                 << ". Proceeding with the update anyway.";
  }

  StartDownloading();
}
// Configures the fetcher's byte ranges for the current payload, sets up the
// writer that will consume the data, applies p2p-related adjustments when
// system_state_ is available, and begins the transfer of
// install_plan_.download_url.
void DownloadAction::StartDownloading() {
download_active_ = true;
http_fetcher_->ClearRanges();
// Resume path: only taken for the payload at resume_payload_index_.
if (install_plan_.is_resume &&
payload_ == &install_plan_.payloads[resume_payload_index_]) {
// Read the stored sizes; the GetInt64 return values are not checked here.
int64_t manifest_metadata_size = 0;
int64_t manifest_signature_size = 0;
prefs_->GetInt64(kPrefsManifestMetadataSize, &manifest_metadata_size);
prefs_->GetInt64(kPrefsManifestSignatureSize, &manifest_signature_size);
// First range: the metadata plus its signature, from the payload's base
// offset.
http_fetcher_->AddRange(base_offset_,
manifest_metadata_size + manifest_signature_size);
// Second range: continue after metadata + signature + the stored data offset.
int64_t next_data_offset = 0;
prefs_->GetInt64(kPrefsUpdateStateNextDataOffset, &next_data_offset);
uint64_t resume_offset =
manifest_metadata_size + manifest_signature_size + next_data_offset;
if (!payload_->size) {
// Payload size is 0: request a range with no explicit length.
// NOTE(review): presumably means "until end of stream" - confirm against
// the HttpFetcher interface.
http_fetcher_->AddRange(base_offset_ + resume_offset);
} else if (resume_offset < payload_->size) {
http_fetcher_->AddRange(base_offset_ + resume_offset,
payload_->size - resume_offset);
}
// If resume_offset >= payload_->size (and size is non-zero) no second range
// is added.
} else {
// Fresh download of this payload: a bounded range when the size is set,
// otherwise a range with no explicit length.
if (payload_->size) {
http_fetcher_->AddRange(base_offset_, payload_->size);
} else {
http_fetcher_->AddRange(base_offset_);
}
}
// Keep an existing writer_ that is not our own DeltaPerformer (the log message
// indicates this is the test configuration); otherwise create a new
// DeltaPerformer and write through it.
if (writer_ && writer_ != delta_performer_.get()) {
LOG(INFO) << "Using writer for test.";
} else {
delta_performer_.reset(new DeltaPerformer(prefs_,
boot_control_,
hardware_,
delegate_,
&install_plan_,
payload_,
is_interactive_));
writer_ = delta_performer_.get();
}
// p2p handling is skipped entirely when there is no system_state_.
if (system_state_ != nullptr) {
const PayloadStateInterface* payload_state = system_state_->payload_state();
string file_id = utils::CalculateP2PFileId(payload_->hash, payload_->size);
if (payload_state->GetUsingP2PForSharing()) {
// Sharing via p2p: remember the file id.
p2p_file_id_ = file_id;
LOG(INFO) << "p2p file id: " << p2p_file_id_;
} else {
// Not sharing: delete the p2p file for this id if the manager returns a
// non-empty path for it.
FilePath path = system_state_->p2p_manager()->FileGetPath(file_id);
if (!path.empty()) {
if (unlink(path.value().c_str()) != 0) {
PLOG(ERROR) << "Error deleting p2p file " << path.value();
} else {
LOG(INFO) << "Deleting partial p2p file " << path.value()
<< " since we're not using p2p to share.";
}
}
}
// When downloading from the p2p URL, apply the p2p-specific fetcher limits.
if (payload_state->GetUsingP2PForDownloading() &&
payload_state->GetP2PUrl() == install_plan_.download_url) {
LOG(INFO) << "Tweaking HTTP fetcher since we're downloading via p2p";
http_fetcher_->set_low_speed_limit(kDownloadP2PLowSpeedLimitBps,
kDownloadP2PLowSpeedTimeSeconds);
http_fetcher_->set_max_retry_count(kDownloadP2PMaxRetryCount);
http_fetcher_->set_connect_timeout(kDownloadP2PConnectTimeoutSeconds);
}
}
http_fetcher_->BeginTransfer(install_plan_.download_url);
}
上面主要是开始下载的工作,具体的http_fetcher_如何下载的,不是我们这篇的重点,我们后面有时间找一篇专门来分析。这个Action走完之后,就开始FilesystemVerifierAction的工作
FilesystemVerifierAction的工作
那直接分析FilesystemVerifierAction的PerformAction():
void FilesystemVerifierAction::PerformAction() {
ScopedActionCompleter abort_action_completer(processor_, this);
if (!HasInputObject()) {
LOG(ERROR) << "FilesystemVerifierAction missing input object.";
return;
}
install_plan_ = GetInputObject();
if (install_plan_.partitions.empty()) {
LOG(INFO) << "No partitions to verify.";
if (HasOutputPipe())
SetOutputObject(install_plan_);
abort_action_completer.set_code(ErrorCode::kSuccess);
return;
}
StartPartitionHashing();
abort_action_completer.set_should_complete(false);
}
void FilesystemVerifierAction::StartPartitionHashing() {
if (partition_index_ == install_plan_.partitions.size()) {
Cleanup(ErrorCode::kSuccess);
return;
}
InstallPlan::Partition& partition =
install_plan_.partitions[partition_index_];
string part_path;
switch (verifier_step_) {
case VerifierStep::kVerifySourceHash:
part_path = partition.source_path;
remaining_size_ = partition.source_size;
break;
case VerifierStep::kVerifyTargetHash:
part_path = partition.target_path;
remaining_size_ = partition.target_size;
break;
}
LOG(INFO) << "Hashing partition " << partition_index_ << " ("
<< partition.name << ") on device " << part_path;
if (part_path.empty())
return Cleanup(ErrorCode::kFilesystemVerifierError);
brillo::ErrorPtr error;
src_stream_ = brillo::FileStream::Open(
base::FilePath(part_path),
brillo::Stream::AccessMode::READ,
brillo::FileStream::Disposition::OPEN_EXISTING,
&error);
if (!src_stream_) {
LOG(ERROR) << "Unable to open " << part_path << " for reading";
return Cleanup(ErrorCode::kFilesystemVerifierError);
}
buffer_.resize(kReadFileBufferSize);
read_done_ = false;
hasher_.reset(new HashCalculator());
ScheduleRead();
}
// Schedules the next asynchronous read from src_stream_ into buffer_.
//
// The request is capped at the smaller of the buffer size and
// remaining_size_. When that is zero, OnReadDoneCallback(0) is invoked
// directly instead of issuing a read. If the read cannot be scheduled the
// action is cleaned up with kError.
void FilesystemVerifierAction::ScheduleRead() {
  const size_t chunk_size =
      std::min(static_cast<int64_t>(buffer_.size()), remaining_size_);
  if (chunk_size == 0) {
    OnReadDoneCallback(0);
    return;
  }

  if (src_stream_->ReadAsync(
          buffer_.data(),
          chunk_size,
          base::Bind(&FilesystemVerifierAction::OnReadDoneCallback,
                     base::Unretained(this)),
          base::Bind(&FilesystemVerifierAction::OnReadErrorCallback,
                     base::Unretained(this)),
          nullptr)) {
    return;
  }

  LOG(ERROR) << "Unable to schedule an asynchronous read from the stream.";
  Cleanup(ErrorCode::kError);
}
在上面的StartPartitionHashing中可以看到,校验分为两个阶段,一个是kVerifySourceHash,另一个是kVerifyTargetHash。默认的是kVerifyTargetHash,也就是从kVerifyTargetHash阶段开始;从后面的FinishPartitionHashing可以看到,只有当目标分区的hash校验失败并且source_hash不为空时,才会转到kVerifySourceHash阶段去校验源分区。打开了对应的分区路径之后,就开始了读取的过程,读取时又设置了两个回调,读取完成时回调OnReadDoneCallback,读取出错时回调OnReadErrorCallback。那我们继续来看OnReadDoneCallback。
// Completion callback for a read scheduled by ScheduleRead().
//
// |bytes_read| == 0 marks the stream as finished. Otherwise the bytes are
// subtracted from remaining_size_ and fed to the hasher. After that the
// cancellation flag is honoured, and then either another read is scheduled or
// the partition hash is finalized. Ending the stream while bytes are still
// expected is reported as kFilesystemVerifierError.
void FilesystemVerifierAction::OnReadDoneCallback(size_t bytes_read) {
  if (bytes_read != 0) {
    remaining_size_ -= bytes_read;
    CHECK(!read_done_);
    if (!hasher_->Update(buffer_.data(), bytes_read)) {
      LOG(ERROR) << "Unable to update the hash.";
      Cleanup(ErrorCode::kError);
      return;
    }
  } else {
    read_done_ = true;
  }

  if (cancelled_) {
    Cleanup(ErrorCode::kError);
    return;
  }

  // More data is expected and the stream has not ended: keep reading.
  if (!read_done_ && remaining_size_ != 0) {
    ScheduleRead();
    return;
  }

  if (remaining_size_ != 0) {
    // The stream ended before the expected number of bytes was read.
    LOG(ERROR) << "Failed to read the remaining " << remaining_size_
               << " bytes from partition "
               << install_plan_.partitions[partition_index_].name;
    Cleanup(ErrorCode::kFilesystemVerifierError);
    return;
  }

  FinishPartitionHashing();
}
一次读取结束后,先更新剩余大小并把读到的数据计入hash,然后根据剩余大小和本次读到的字节数判断当前分区是否已经读取结束。如果读取没有结束,就继续调度下一次读取;否则调用FinishPartitionHashing()来完成该分区的hash计算。
void FilesystemVerifierAction::FinishPartitionHashing() {
if (!hasher_->Finalize()) {
LOG(ERROR) << "Unable to finalize the hash.";
return Cleanup(ErrorCode::kError);
}
InstallPlan::Partition& partition =
install_plan_.partitions[partition_index_];
LOG(INFO) << "Hash of " << partition.name << ": "
<< Base64Encode(hasher_->raw_hash());
switch (verifier_step_) {
case VerifierStep::kVerifyTargetHash:
if (partition.target_hash != hasher_->raw_hash()) {
LOG(ERROR) << "New '" << partition.name
<< "' partition verification failed.";
if (partition.source_hash.empty()) {
return Cleanup(ErrorCode::kNewRootfsVerificationError);
}
verifier_step_ = VerifierStep::kVerifySourceHash;
} else {
partition_index_++;
}
break;
case VerifierStep::kVerifySourceHash:
if (partition.source_hash != hasher_->raw_hash()) {
return Cleanup(ErrorCode::kDownloadStateInitializationError);
}
return Cleanup(ErrorCode::kNewRootfsVerificationError);
}
hasher_.reset();
buffer_.clear();
src_stream_->CloseBlocking(nullptr);
StartPartitionHashing();
}
上面主要是针对FinishPartitionHashing()做的一些操作,首先结束hash的操作,然后获取当前分区信息,再根据当前验证阶段,是继续下一个分区的校验,还是结束校验。
有一点我们之前的分析中漏掉了,就是partition的信息是如何获取的。往前查找后发现,partition的信息是从manifest_里赋值出来的,但manifest_又是怎么来的呢?可以肯定的是从payload中获取的。那我们就先从ParsePayloadMetadata这个函数看起。
// Parses the payload header (once), validates the metadata size and metadata
// signature, and extracts the manifest from |payload| into manifest_.
//
// |payload| holds the payload bytes received so far. |error| is set to
// kSuccess on entry and to a specific code on failure.
// Returns kSuccess once the manifest is parsed, kInsufficientData when
// |payload| does not yet hold the full metadata plus signature, or an error
// result (any non-success result of ParsePayloadHeader is returned as is).
MetadataParseResult DeltaPerformer::ParsePayloadMetadata(
const brillo::Blob& payload, ErrorCode* error) {
*error = ErrorCode::kSuccess;
// The header is parsed only on the first pass; later calls reuse the cached
// sizes and version.
if (!IsHeaderParsed()) {
MetadataParseResult result = payload_metadata_.ParsePayloadHeader(
payload, supported_major_version_, error);
if (result != MetadataParseResult::kSuccess)
return result;
metadata_size_ = payload_metadata_.GetMetadataSize();
metadata_signature_size_ = payload_metadata_.GetMetadataSignatureSize();
major_payload_version_ = payload_metadata_.GetMajorVersion();
// With mandatory hash checks, the expected metadata size in payload_ must
// match the size found in the header.
if (install_plan_->hash_checks_mandatory) {
if (payload_->metadata_size != metadata_size_) {
LOG(ERROR) << "Mandatory metadata size in Omaha response ("
<< payload_->metadata_size
<< ") is missing/incorrect, actual = " << metadata_size_;
*error = ErrorCode::kDownloadInvalidMetadataSize;
return MetadataParseResult::kError;
}
}
}
// Wait until the whole metadata and its signature have been received.
if (payload.size() < metadata_size_ + metadata_signature_size_)
return MetadataParseResult::kInsufficientData;
// A size mismatch can only reach this point when checks are not mandatory, so
// it is merely logged.
if (payload_->metadata_size == metadata_size_) {
LOG(INFO) << "Manifest size in payload matches expected value from Omaha";
} else {
LOG(WARNING) << "Ignoring missing/incorrect metadata size ("
<< payload_->metadata_size
<< ") in Omaha response as validation is not mandatory. "
<< "Trusting metadata size in payload = " << metadata_size_;
}
// Use public_key_path_ by default, replaced by the key from
// GetPublicKeyFromResponse() when that call succeeds.
base::FilePath path_to_public_key(public_key_path_);
base::FilePath tmp_key;
if (GetPublicKeyFromResponse(&tmp_key))
path_to_public_key = tmp_key;
// NOTE(review): ScopedPathUnlinker presumably removes the temporary key file
// on scope exit; removal is disabled when no temporary key path was set.
ScopedPathUnlinker tmp_key_remover(tmp_key.value());
if (tmp_key.empty())
tmp_key_remover.set_should_remove(false);
*error = payload_metadata_.ValidateMetadataSignature(
payload, payload_->metadata_signature, path_to_public_key);
// A signature failure is fatal only when hash checks are mandatory; otherwise
// it is logged and the error is reset to kSuccess.
if (*error != ErrorCode::kSuccess) {
if (install_plan_->hash_checks_mandatory) {
LOG(ERROR) << "Mandatory metadata signature validation failed";
return MetadataParseResult::kError;
}
LOG(WARNING) << "Ignoring metadata signature validation failures";
*error = ErrorCode::kSuccess;
}
// Extract the manifest from the payload.
if (!payload_metadata_.GetManifest(payload, &manifest_)) {
LOG(ERROR) << "Unable to parse manifest in update file.";
*error = ErrorCode::kDownloadManifestParseError;
return MetadataParseResult::kError;
}
manifest_parsed_ = true;
return MetadataParseResult::kSuccess;
}
// Parses the manifest stored inside |payload| into |out_manifest|.
//
// Returns false when the manifest offset cannot be determined; otherwise
// returns the result of parsing manifest_size_ bytes starting at that offset.
// |payload| must be large enough to contain the whole manifest (CHECKed).
bool PayloadMetadata::GetManifest(const brillo::Blob& payload,
                                  DeltaArchiveManifest* out_manifest) const {
  uint64_t offset;
  if (GetManifestOffset(&offset)) {
    CHECK_GE(payload.size(), offset + manifest_size_);
    return out_manifest->ParseFromArray(&payload[offset], manifest_size_);
  }
  return false;
}
到这里为止,我们分析ActionProcessor里的前三项工作已经分析完了,还有最后一个工作PostinstallRunnerAction,留到下篇里来分析。
|