IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 三、区分存储对象的不同版本 -> 正文阅读

[大数据]三、区分存储对象的不同版本

架构

?(架构图源于参考书籍)

Elasticsearch

官方简介:Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎,能够解决不断涌现出的各种用例。 作为 Elastic Stack 的核心,它集中存储您的数据,帮助您发现意料之中以及意料之外的情况。

# 下载
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.15.1-linux-x86_64.tar.gz

# 解压
tar xzvf elasticsearch-7.15.1-linux-x86_64.tar.gz

# 以非 root 用户启动
cd /elasticsearch-7.15.1/bin/
./elasticsearch

# 检验是否启动成功,172.16.16.4 为 elasticsearch.yml 配置绑定的 IP 地址
curl 172.16.16.4:9200

若无法正常启动,则修改配置:

/home/sam/elasticsearch-7.15.1/config

修改 jvm.options 中内存配置:
-Xms256m
-Xmx256m

修改 vim elasticsearch.yml :
cluster.name: my-application
node.name: node-1
network.host: 172.16.16.4
http.port: 9200
discovery.seed_hosts: ["172.26.26.4", "::1"]
cluster.initial_master_nodes: ["node-1"]

创建映射

创建 metadata 索引以及 objects 类型的映射:

curl -H "Content-Type: application/json" -XPUT 172.16.16.4:9200/metadata?include_type_name=true -d'{"mappings":{"objects":{"properties":{"name":{"type":"text","index":"false"},"version":{"type":"integer"},"size":{"type":"integer"},"hash":{"type":"text"}}}}}'

ES包封装

该 ES 包封装了以 HTTP 访问 ES 的各种 API 的操作。

package es

/* 该 ES 包封装了以 HTTP 访问 ES 的各种 API 的操作 */
import (
	"demo/sys"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"strings"
)

/* 元数据结构体 */
type Metadata struct {
	Name    string
	Version int
	Size    int64
	Hash    string
}

type hit struct {
	Source Metadata `json:"_source"`
}

type searchResult struct {
	Hits struct {
		Total int
		Hits  []hit
	}
}

/*根据对象的名称和版本号来获取元数据*/
func getMetadata(name string, versionId int) (meta Metadata, e error) {
	// 索引为 metadata ,类型为 objects,文档 id 为对象名称和版本号的拼接
	url := fmt.Sprintf(sys.GetMetadataUrl, os.Getenv(sys.EsServer), name, versionId)
	// 通过 GET URL 可以直接获取该对象的元数据,免除了耗时的搜索操作
	r, e := http.Get(url)
	if e != nil {
		return
	}
	if r.StatusCode != http.StatusOK {
		e = fmt.Errorf(sys.FailToGetMetadata, name, versionId, r.StatusCode)
		return
	}
	result, _ := ioutil.ReadAll(r.Body)
	// 将请求结果反序列化为元数据结构
	json.Unmarshal(result, &meta)
	return
}

/*根据对象名称获取最新版本的元数据*/
func SearchLatestVersion(name string) (meta Metadata, e error) {
	// 构建 url 时需要将名称转移成 url 字符
	url := fmt.Sprintf(sys.SearchLatestVersionUrl, os.Getenv(sys.EsServer), url.PathEscape(name))
	r, e := http.Get(url)
	if e != nil {
		return
	}
	if r.StatusCode != http.StatusOK {
		e = fmt.Errorf(sys.FailToSearchLatestMetadata, r.StatusCode)
		return
	}
	result, _ := ioutil.ReadAll(r.Body)
	var sr searchResult
	// 请求结果反序列化
	json.Unmarshal(result, &sr)
	// 如果长度为 0 则没有搜索结果,直接返回
	if len(sr.Hits.Hits) != 0 {
		meta = sr.Hits.Hits[0].Source
	}
	return
}

/*根据对象的名称和版本号来获取元数据*/
func GetMetadata(name string, version int) (Metadata, error) {
	// 没有指定版本号时默认返回最新版本的元数据
	if version == 0 {
		return SearchLatestVersion(name)
	}
	return getMetadata(name, version)
}

/*向 ES 服务上传一个新的元数据*/
func PutMetadata(name string, version int, size int64, hash string) error {
	doc := fmt.Sprintf(sys.MetadataJson, name, version, size, hash)
	client := http.Client{}
	url := fmt.Sprintf(sys.PutMetadataUrl, os.Getenv(sys.EsServer), name, version)
	request, _ := http.NewRequest(http.MethodPut, url, strings.NewReader(doc))
	r, e := client.Do(request)
	if e != nil {
		return e
	}
	if r.StatusCode == http.StatusConflict {
		return PutMetadata(name, version+1, size, hash)
	}
	if r.StatusCode != http.StatusCreated {
		result, _ := ioutil.ReadAll(r.Body)
		return fmt.Errorf(sys.FailToPutMetadata, r.StatusCode, string(result))
	}
	return nil
}

/*版本号加一*/
func AddVersion(name, hash string, size int64) error {
	// 获取目前最新的版本
	version, e := SearchLatestVersion(name)
	if e != nil {
		return e
	}
	// 创建一个最新的版本号
	return PutMetadata(name, version.Version+1, size, hash)
}

/*搜索对象的全部版本*/
func SearchAllVersions(name string, from, size int) ([]Metadata, error) {
	// 不指定名字时则搜索全部对象的全部版本,指定名字时则搜索某个对象的全部版本
	url := fmt.Sprintf(sys.SearchAllVersionsUrl, os.Getenv(sys.EsServer), from, size)
	if name != "" {
		url += "&q=name:" + name
	}
	r, e := http.Get(url)
	if e != nil {
		return nil, e
	}
	metas := make([]Metadata, 0)
	result, _ := ioutil.ReadAll(r.Body)
	var sr searchResult
	json.Unmarshal(result, &sr)
	for i := range sr.Hits.Hits {
		metas = append(metas, sr.Hits.Hits[i].Source)
	}
	return metas, nil
}

/*删除指定的版本*/
func DelMetadata(name string, version int) {
	client := http.Client{}
	url := fmt.Sprintf(sys.DelMetadataUrl, os.Getenv(sys.EsServer), name, version)
	request, _ := http.NewRequest(http.MethodDelete, url, nil)
	client.Do(request)
}

type Bucket struct {
	Key         string
	Doc_count   int
	Min_version struct {
		Value float32
	}
}

type aggregateResult struct {
	Aggregations struct {
		Group_by_name struct {
			Buckets []Bucket
		}
	}
}

/*搜索版本状态*/
func SearchVersionStatus(min_doc_count int) ([]Bucket, error) {
	client := http.Client{}
	url := fmt.Sprintf(sys.SearchVersionStatusUrl, os.Getenv(sys.EsServer))
	body := fmt.Sprintf(sys.SearchVersionStatusJson, min_doc_count)
	request, _ := http.NewRequest(http.MethodGet, url, strings.NewReader(body))
	r, e := client.Do(request)
	if e != nil {
		return nil, e
	}
	b, _ := ioutil.ReadAll(r.Body)
	var ar aggregateResult
	json.Unmarshal(b, &ar)
	return ar.Aggregations.Group_by_name.Buckets, nil
}

func HasHash(hash string) (bool, error) {
	url := fmt.Sprintf(sys.HasHashUrl, os.Getenv(sys.EsServer), hash)
	r, e := http.Get(url)
	if e != nil {
		return false, e
	}
	b, _ := ioutil.ReadAll(r.Body)
	var sr searchResult
	json.Unmarshal(b, &sr)
	return sr.Hits.Total != 0, nil
}

func SearchHashSize(hash string) (size int64, e error) {
	url := fmt.Sprintf(sys.SearchHashSizeUrl, os.Getenv(sys.EsServer), hash)
	r, e := http.Get(url)
	if e != nil {
		return
	}
	if r.StatusCode != http.StatusOK {
		e = fmt.Errorf(sys.FailToSearchHashSize, r.StatusCode)
		return
	}
	result, _ := ioutil.ReadAll(r.Body)
	var sr searchResult
	json.Unmarshal(result, &sr)
	if len(sr.Hits.Hits) != 0 {
		size = sr.Hits.Hits[0].Source.Size
	}
	return
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-25 12:36:07  更:2021-10-25 12:37:22 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 2:56:32-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码