当前位置: 首页 > news >正文

优购物官方网站直播中国建设银行个人网上登录入口

优购物官方网站直播,中国建设银行个人网上登录入口,创建网站域名多少钱,wordpress倒序阅读MapReduce 写在前面 身为大数据专业的学生#xff0c;其实大学我也多多少少接触过mapreduce#xff0c;但是当时觉得这玩意太老了#xff0c;觉得这和php一样会被时代淘汰。只能说当时确实太年轻了#xff0c;没有好好珍惜那时候的学习资源… 现在回过头来看mapreduce其实大学我也多多少少接触过mapreduce但是当时觉得这玩意太老了觉得这和php一样会被时代淘汰。只能说当时确实太年轻了没有好好珍惜那时候的学习资源… 现在回过头来看mapreduce发现技术这东西和语言不一样技术万变不离其中而语言只是实现技术的一种方法而已用什么语言其实并不重要。 原论文地址MapReduce: Simplified Data Processing on Large Clusters 总览 这次 lab1 的 mapreduce其实是在 搜索引擎tangseng 的时候需要用来构建倒排索引。所以会和课程上所要求的不太一样这里也没有使用rpc调用而是为了与项目统一便改用了grpc进行调用。 这里需要注意几点 不同的Map任务之间不会进行通信不同的Reduce任务之间也不会发生任何信息交换所有的数据交换都是通过MapReduce框架自身去实现的 那么如何对 map tasks 和 reduce tasks 进行合理的协调呢这里我们就要引入两个角色master 和 worker在原论文中对这两者的并没有非常明确的定义但我们可以摘录并提炼原论文对这两个角色的描述 master : The master picks idle workers and assigns each one a map task or a reduce task. worker : The map worker who is assigned a map task reads the contents of the corresponding input split. The reduce worker iterates over the sorted intermediate data and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user’s Reduce function 这里我们先说一下几个状态枚举值 idle 空闲状态in-progress 进行状态completed 完成状态 这三个枚举值代表着每一个 map task 和 reduce task 的状态标识着这些 task 是未开始进行中还是已完成。 那么 master 其实就是选择空闲的 worker 节点为每一个空闲的 worker 节点分配 map task 或者 reduce task。而 worker 看似分成了 map worker 和 reduce worker但其实这两个 worker 都是一样只是看 master 分配的是 map task 还是 reduce task。这样我们的 map 和 reduce 的数据传送就非常清晰了。 接下来我们来详细讲解一下这几个重要的角色 Worker 首先我们先定义一个 MapReduce 的任务也就是我们 worker 需要用到参数 type MapReduceTask struct {Input string json:input // 输入的文件TaskState State json:task_state // 状态NReducer int json:n_reducer // reducer 数量TaskNumber int json:task_number // 任务数量Intermediates []string json:intermediates // map 之后的文件存储地址Output string json:output // output的输出地址 }接着再定义 State 枚举值 type MasterTaskStatus intconst (Idle MasterTaskStatus iota 1 // 未开始InProgress // 进行中Completed // 已完成 )接下来我们的 Worker 函数就很简单了 func Worker(ctx context.Context, mapf func(string, string) []*types.KeyValue, reducef func(string, []string) *roaring.Bitmap) {// 启动workerfor {task, err : getTask(ctx) // worker从master获取任务if err ! nil {log.LogrusObj.Error(Worker-getTask, err)return}// 拿到task之后根据task的statemap task交给mapper reduce task交给reducer// 额外加两个state让 worker 等待 或者 直接退出switch task.TaskState {case int64(types.Map):mapper(ctx, task, mapf)case int64(types.Reduce):reducer(ctx, task, reducef)case int64(types.Wait):time.Sleep(5 * time.Second)case int64(types.Exit):returndefault:return}} }至于 mapper 和 reducer 如何实现的先桥豆麻袋一下下文在 map 和 reduce 中会给出答案如何从 master 中拿到 task 呢这就涉及到 worker 和 master 的通信。本来打算用 RPC 通信的但为了项目的整体统一还是用了 gRPC 。 创建一个proto文件 syntaxproto3; option go_package /index_platform;;message MapReduceTask{// inject_tag:form:input uri:inputstring input 1;// inject_tag:form:task_state uri:task_stateint64 task_state 2;// inject_tag:form:n_reducer uri:n_reducerint64 n_reducer 3;// inject_tag:form:task_number uri:task_numberint64 task_number 4;// inject_tag:form:intermediates uri:intermediatesrepeated string intermediates 5;// inject_tag:form:output uri:outputstring output 6; }message MasterTaskCompletedResp {// inject_tag:form:code uri:codeint64 code1;// inject_tag:form:message uri:messagestring message2; }service MapReduceService {rpc MasterAssignTask(MapReduceTask) returns (MapReduceTask);rpc MasterTaskCompleted(MapReduceTask) returns (MasterTaskCompletedResp); }定义两个 RPC 函数MasterAssignTask 用来接受 master 分配的 task MasterTaskCompleted 完成 task 之后对这个 task 进行标识意味着该任务结束。 所以我们 worker 接受任务的通信如下 func getTask(ctx context.Context) (resp *mapreduce.MapReduceTask, err error) {// worker从master获取任务taskReq : mapreduce.MapReduceTask{}resp, err rpc.MapReduceClient.MasterAssignTask(ctx, taskReq)return }当完成任务时通过gRPC发送给master func TaskCompleted(ctx context.Context, task *mapreduce.MapReduceTask) (reply *mapreduce.MasterTaskCompletedResp, err error) {// 通过RPC把task信息发给masterreply, err rpc.MapReduceClient.MasterTaskCompleted(ctx, task)return }那么 master 是如何分配任务的接下来我们来介绍一下 master 节点。 Master 我们定义这么一个 Master 服务的结构体 type MasterSrv struct {TaskQueue chan *types.MapReduceTask // 等待执行的taskTaskMeta map[int]*types.MasterTask // 当前所有task的信息MasterPhase types.State // Master的阶段NReduce int // Reduce的数量InputFiles []string // 输入的文件Intermediates [][]string // Map任务产生的R个中间文件的信息mapreduce.UnimplementedMapReduceServiceServer // gRPC服务实现接口 }那么当我们 New 一个 Master 服务的时候顺便创建 map tasks 任务 func NewMaster(files []string, nReduce int) *MasterSrv {m : MasterSrv{TaskQueue: make(chan *types.MapReduceTask, int(math.Max(float64(nReduce), float64(len(files))))),TaskMeta: map[int]*types.MasterTask{},MasterPhase: types.Map,NReduce: nReduce,InputFiles: files,Intermediates: make([][]string, nReduce),}m.createMapTask()return m }创建 map task 任务 func (m *MasterSrv) createMapTask() {// 把输入的files都形成一个task元数据塞到queue中for idx, filename : range m.InputFiles { taskMeta : types.MapReduceTask{Input: filename,TaskState: types.Map, // map节点NReducer: m.NReduce,TaskNumber: idx,}m.TaskQueue - taskMetam.TaskMeta[idx] types.MasterTask{TaskStatus: types.Idle, // 状态为 idle 等待worker节点来领取 taskTaskReference: taskMeta,}} }创建 reduce task 任务 func (m *MasterSrv) createReduceTask() {m.TaskMeta map[int]*types.MasterTask{}for idx, files : range m.Intermediates {taskMeta : types.MapReduceTask{TaskState: types.Reduce, // reduce 阶段NReducer: m.NReduce,TaskNumber: idx,Intermediates: files,}m.TaskQueue - taskMetam.TaskMeta[idx] types.MasterTask{TaskStatus: types.Idle, // 找到空闲的 workerTaskReference: taskMeta,}} }MasterAssignTask 等待 worker 来领取 task func (m *MasterSrv) MasterAssignTask(ctx context.Context, req *mapreduce.MapReduceTask) (reply *mapreduce.MapReduceTask, err error) {mu.Lock()defer mu.Unlock()task : types.MapReduceTask{Input: req.Input,TaskState: types.State(req.TaskState),NReducer: int(req.NReducer),TaskNumber: int(req.TaskNumber),Intermediates: req.Intermediates,Output: req.Output,}if len(m.TaskQueue) 0 {// 如果queue中还有任务的话就发出去*task *-m.TaskQueuem.TaskMeta[task.TaskNumber].TaskStatus types.InProgress // 修改worker的状态为进行中m.TaskMeta[task.TaskNumber].StartTime time.Now() // 记录task的启动时间} else if m.MasterPhase types.Exit {*task types.MapReduceTask{TaskState: types.Exit,}} else {// 没有task就让worker等待*task types.MapReduceTask{TaskState: types.Wait}}// 返回该任务的状态因为发出去就是给task了这个状态已经改变了worker可以工作了reply mapreduce.MapReduceTask{Input: task.Input,TaskState: int64(task.TaskState),NReducer: int64(task.NReducer),TaskNumber: int64(task.TaskNumber),Intermediates: task.Intermediates,Output: task.Output,}return }那么如果 task 把任务都做完了master 应该怎么回应呢 func (m *MasterSrv) MasterTaskCompleted(ctx context.Context, req *mapreduce.MapReduceTask) (resp *mapreduce.MasterTaskCompletedResp, err error) {resp new(mapreduce.MasterTaskCompletedResp)resp.Code e.ERRORresp.Message map finish successfully// 更新task状态if req.TaskState ! int64(m.MasterPhase) || m.TaskMeta[int(req.TaskNumber)].TaskStatus types.Completed {// 因为worker写在同一个文件这次盘上对于重复的结果要丢弃return}m.TaskMeta[int(req.TaskNumber)].TaskStatus types.Completederr m.processTaskResult(req) // always success haha and hope u so :)if err ! nil {resp.Code e.ERRORresp.Message map finish failedreturn}return }处理任务的结果如果是 map 完成后就变成 reduce 阶段reduce 之后就是 all done. // processTaskResult 处理任务结果 func (m *MasterSrv) processTaskResult(task *mapreduce.MapReduceTask) (err error) {switch task.TaskState {case int64(types.Map):// 收集intermediate信息for reduceTaskId, filePath : range task.Intermediates {m.Intermediates[reduceTaskId] append(m.Intermediates[reduceTaskId], filePath)}if m.allTaskDone() {// 获取所有的map task后进入reduce阶段m.createReduceTask()m.MasterPhase types.Reduce}case int64(types.Reduce):if m.allTaskDone() {// 获得所有的reduce task后进去exit阶段m.MasterPhase types.Exit}}return }介绍完master之后我们具体来看一下map的具体行为。 Map 在 map 中我们抽离出一个 mapper具体的map函数可根据实际情况进行修改然后将map function传入mapper中进行实际的map动作我们读取每一个文件然后把输出的结果都放到 intermediates 中并且根据 task 所设定的 NReducer 也就是 reducer 数 进行hash 将结果均匀分到每个中间文件中。 func mapper(ctx context.Context, task *mapreduce.MapReduceTask, mapf func(string, string) []*types.KeyValue) {// 从文件名读取contentcontent, err : os.ReadFile(task.Input)if err ! nil {log.LogrusObj.Error(mapper, err)return}// 将content交给mapf缓存结果intermediates : mapf(task.Input, string(content))// 缓存后的结果会写到本地磁盘并切成R份// 切分方式是根据key做hashbuffer : make([][]*types.KeyValue, task.NReducer)for _, intermediate : range intermediates {slot : ihash(intermediate.Key) % task.NReducerbuffer[slot] append(buffer[slot], intermediate)}mapOutput : make([]string, 0)for i : 0; i int(task.NReducer); i {mapOutput append(mapOutput, writeToLocalFile(int(task.TaskNumber), i, buffer[i]))}// R个文件的位置发送给mastertask.Intermediates mapOutput_, err TaskCompleted(ctx, task) // 完成后给master发送消息map阶段结束if err ! nil {fmt.Println(mapper-TaskCompleted, err)}return }具体的 Map方法由于是用于搜索引擎所以这里是建立倒排索引 func Map(filename string, contents string) (res []*types.KeyValue) {res make([]*types.KeyValue, 0)lines : strings.Split(contents, \r\n) // 分行var inputData *model.InputDatafor _, line : range lines[1:] {docStruct, _ : doc2Struct(line) // 字符串转 doc structtokens, err : analyzer.GseCutForBuildIndex(docStruct.DocId, docStruct.Body)if err ! nil {return}for _, v : range tokens {res append(res, types.KeyValue{Key: v.Token, Value: cast.ToString(v.DocId)}) // token:docId 倒排索引}}return }至此map就已经完成了是不是很简单其实具体的map和reduce并不难难的是如何平衡调度接下来我们来看看reduce是如何怎么的。 Reduce 和map一样我们抽离出一个reducer然后把具体的 reduce 传进去当然还有一个shuffle过程这里进行排序会减少后面的reduce计算。可以少计算几次。 func reducer(ctx context.Context, task *mapreduce.MapReduceTask, reducef func(string, []string) *roaring.Bitmap) {// 先从filepath读取intermediate的KeyValueintermediate : *readFromLocalFile(task.Intermediates)// 根据kv排序 shuffle 过程sort.Sort(types.ByKey(intermediate))dir, _ : os.Getwd()outName : fmt.Sprintf(%s/mr-tmp-%d.%s,dir, task.TaskNumber, consts.InvertedBucket)invertedDB : storage.NewInvertedDB(outName)output : roaring.NewBitmap()var outByte []bytei : 0for i len(intermediate) {// 将相同的key放在一起分组合并j : i 1for j len(intermediate) intermediate[j].Key intermediate[i].Key {j}var values []stringfor k : i; k j; k {values append(values, intermediate[k].Value)}// 交给reducef拿到结果output reducef(intermediate[i].Key, values)// 落倒排索引库outByte, _ output.MarshalBinary()_ invertedDB.StoragePostings(intermediate[i].Key, outByte)i j}task.Output outName_, err : TaskCompleted(ctx, task) // 完成后给master发送消息reduce阶段结束if err ! nil {fmt.Println(reducer-TaskCompleted, err)return} }具体的Reduce其实就是把相同的key的value聚合在一起。比如 after map: {apple:1} {apple:2} {poizon:3}after reduce: {apple:{1,2}} {poizon:{3}}具体实现如下所示 func Reduce(key string, values []string) *roaring.Bitmap {docIds : roaring.New()for _, v : range values {docIds.AddInt(cast.ToInt(v))}return docIds }最终 output 输出 以上就是我对6.824这个课程的lab1的所有理解了并且运用到了 tangseng 搜索引擎中。 具体代码实现地址在 https://github.com/CocaineCong/tangseng/app/mapreduce 中。
http://www.lakalapos1.cn/news/9124/

相关文章:

  • 软件开发班网站关键词优化外包服务
  • 订阅号可以做微网站wordpress 自动退出
  • .电子商务网站建设的核心是学校网站建设计划
  • 不会建网站网站页尾信息
  • 网上做代卖的网站网站开发运营产品经理招聘
  • 防做网站视频新国标小区网络建设
  • 国内网站欣赏工程公司简介
  • 网站制作文章标签研发网站建设报价
  • 查询网站怎么做wordpress默认主题下载
  • 支付网站开发费分录wordpress 双主题
  • 百媚导航app入口app入口seo 技术优化
  • 免费做团购网站的软件好达州网站建设的公司
  • 购物网站的推广怎么找客户资源
  • 做服装零售上什么网站做我的世界缩略图的网站
  • 网站首页页面设计模板小程序开发制作工具
  • 加强网站建设 通知网站域名实名认证通知
  • 短视频网站php源码免费WordPress中国市场规模
  • 食堂承包技术支持 东莞网站建设商务网站建设注意事项
  • 建设银行网站功能程序员必备软件
  • 深圳网站模板建站河北关键词搜索排名公司
  • 网站建设经销商外贸网站风格
  • 高端网站建设价钱怎么看视频号的网址
  • 网站没有备案时网站建站管理系统
  • wordpress网站搭建教程视频网站收录是怎么回事
  • ps网站设计怎么做wordpress xiu底部广告
  • 网站空间数据库需要多大wordpress主题页脚如何修改
  • 大侠wordpress深圳seo优化关键词排名
  • 手机零售网站 关键词网站发展历程
  • 西安网站建设开发熊掌号图纸之家
  • 个人 网站 备案网站如何防止黑客攻击