正文
Go语言异步排序 go语言map排序
小程序:扫一扫查出行
【扫一扫了解最新限行尾号】
复制小程序
【扫一扫了解最新限行尾号】
复制小程序
Golang kafka简述和操作(sarama同步异步和消费组)
一、Kafka简述
1. 为什么需要用到消息队列
异步:对比以前的串行同步方式来说,可以在同一时间做更多的事情,提高效率;
解耦:在耦合太高的场景,多个任务要对同一个数据进行操作消费的时候,会导致一个任务的处理因为另一个任务对数据的操作变得及其复杂。
缓冲:当遇到突发大流量的时候,消息队列可以先把所有消息有序保存起来,避免直接作用于系统主体,系统主题始终以一个平稳的速率去消费这些消息。
2.为什么选择kafka呢?
这没有绝对的好坏,看个人需求来选择,我这里就抄了一段他人总结的的优缺点,可见原文
kafka的优点:
1.支持多个生产者和消费者2.支持broker的横向拓展3.副本集机制,实现数据冗余,保证数据不丢失4.通过topic将数据进行分类5.通过分批发送压缩数据的方式,减少数据传输开销,提高吞高量6.支持多种模式的消息7.基于磁盘实现数据的持久化8.高性能的处理信息,在大数据的情况下,可以保证亚秒级的消息延迟9.一个消费者可以支持多种topic的消息10.对CPU和内存的消耗比较小11.对网络开销也比较小12.支持跨数据中心的数据复制13.支持镜像集群
kafka的缺点:
1.由于是批量发送,所以数据达不到真正的实时2.对于mqtt协议不支持3.不支持物联网传感数据直接接入4.只能支持统一分区内消息有序,无法实现全局消息有序5.监控不完善,需要安装插件6.需要配合zookeeper进行元数据管理7.会丢失数据,并且不支持事务8.可能会重复消费数据,消息会乱序,可用保证一个固定的partition内部的消息是有序的,但是一个topic有多个partition的话,就不能保证有序了,需要zookeeper的支持,topic一般需要人工创建,部署和维护一般都比mq高
3. Golang 操作kafka
3.1. kafka的环境
网上有很多搭建kafka环境教程,这里就不再搭建,就展示一下kafka的环境,在kubernetes上进行的搭建,有需要的私我,可以发yaml文件
3.2. 第三方库
github.com/Shopify/sarama // kafka主要的库*github.com/bsm/sarama-cluster // kafka消费组
3.3. 消费者
单个消费者
funcconsumer(){varwg sync.WaitGroup consumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{ fmt.Println("Failed to start consumer: %s", err)return} partitionList, err := consumer.Partitions("test0")//获得该topic所有的分区iferr !=nil{ fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList { pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{ fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return} wg.Add(1)gofunc(sarama.PartitionConsumer){//为每个分区开一个go协程去取值formsg :=rangepc.Messages() {//阻塞直到有值发送过来,然后再继续等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value)) }deferpc.AsyncClose() wg.Done() }(pc) } wg.Wait()}funcmain(){ consumer()}
消费组
funcconsumerCluster(){ groupID :="group-1"config := cluster.NewConfig() config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Second config.Consumer.Offsets.Initial = sarama.OffsetNewest//初始从最新的offset开始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{ glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){ errors := c.Errors() noti := c.Notifications()for{select{caseerr := -errors: glog.Errorln(err)case-noti: } } }(c)formsg :=rangec.Messages() { fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value)) c.MarkOffset(msg,"")//MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offset}}funcmain(){goconsumerCluster()}
3.4. 生产者
同步生产者
packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){ config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll//赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner//写到随机分区中,默认设置8个分区config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{} msg.Topic =`test0`msg.Value = sarama.StringEncoder("Hello World!") client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{ fmt.Println("producer close err, ", err)return}deferclient.Close() pid, offset, err := client.SendMessage(msg)iferr !=nil{ fmt.Println("send message failed, ", err)return} fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)}
异步生产者
funcasyncProducer(){ config := sarama.NewConfig() config.Producer.Return.Successes =true//必须有这个选项config.Producer.Timeout =5* time.Second p, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//这个部分一定要写,不然通道会被堵塞gofunc(p sarama.AsyncProducer){ errors := p.Errors() success := p.Successes()for{select{caseerr := -errors:iferr !=nil{ glog.Errorln(err) }case-success: } } }(p)for{ v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000)) fmt.Fprintln(os.Stdout, v) msg := sarama.ProducerMessage{ Topic: topics, Value: sarama.ByteEncoder(v), } p.Input() - msg time.Sleep(time.Second *1) }}funcmain(){goasyncProducer()select{ }}
3.5. 结果展示-
同步生产打印:
分区ID:0,offset:90
消费打印:
Partition:0,Offset:90,key:,value:Hello World!
异步生产打印:
async:7272async:7616async:998
消费打印:
Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998
go语言:数组
数组是一个由 固定长度 的 特定类型元素 组成的序列,一个数组可以由零个或多个元素组成。 数组是值类型
数组的每个元素都可以通过索引下标来访问,索引下标的范围是从0开始到数组长度减1的位置,内置函数 len() 可以返回数组中元素的个数。
2.类型的打印,结果的第二种打印方式
3.对元素的修改或者赋值
4.判断数组是否相等:长度、类型
4.数组的地址:连续存储的空间
5.数组的赋值、地址、取值
6.数组的默认值
7.数组的初始化
8.数组的逆置
9.求数组的最大值、最小值、平均值
10.对数组字符串进行连接
11.冒泡排序法的实现
12.数组做函数的参数
13.二维数组:赋值和地址
14.二维数组:打印和输出
15. 指针数组,每一个元素都是地址
17.数组的内存分配
Go语言 排序与搜索切片
Go语言标准库中提供了sort包对整型,浮点型,字符串型切片进行排序,检查一个切片是否排好序,使用二分法搜索函数在一个有序切片中搜索一个元素等功能。
关于sort包内的函数说明与使用,请查看
在这里简单讲几个sort包中常用的函数
在Go语言中,对字符串的排序都是按照字节排序,也就是说在对字符串排序时是区分大小写的。
二分搜索算法
Go语言中提供了一个使用二分搜索算法的sort.Search(size,fn)方法:每次只需要比较㏒₂n个元素,其中n为切片中元素的总数。
sort.Search(size,fn)函数接受两个参数:所处理的切片的长度和一个将目标元素与有序切片的元素相比较的函数,该函数是一个闭包,如果该有序切片是升序排列,那么在判断时使用 有序切片的元素 = 目标元素。该函数返回一个int值,表示与目标元素相同的切片元素的索引。
在切片中查找出某个与目标字符串相同的元素索引
golang语言:for循环里面包含一个函数体的执行循序
go func是golang的协程,就像多线程,异步执行,所以,代码段1执行完3遍后,可能3次协成刚执行完。在代码段1中如果sleep一下应该就能给协程时间执行了。
Go语言如何给字符串排序
因为char *strings[]不是指针而是指针数组,那么
temp = strings[top];
strings[top] = strings[seek];
strings[seek] = temp;
这种交换交换的就是主调函数中的数组中的指针,把指向字符串的指针顺序改变了,当然按次序输出就达到排序目的了……
GO语言学习系列八——GO函数(func)的声明与使用
GO是编译性语言,所以函数的顺序是无关紧要的,为了方便阅读,建议入口函数 main 写在最前面,其余函数按照功能需要进行排列
GO的函数 不支持嵌套,重载和默认参数
GO的函数 支持 无需声明变量,可变长度,多返回值,匿名,闭包等
GO的函数用 func 来声明,且左大括号 { 不能另起一行
一个简单的示例:
输出为:
参数:可以传0个或多个值来供自己用
返回:通过用 return 来进行返回
输出为:
上面就是一个典型的多参数传递与多返回值
对例子的说明:
按值传递:是对某个变量进行复制,不能更改原变量的值
引用传递:相当于按指针传递,可以同时改变原来的值,并且消耗的内存会更少,只有4或8个字节的消耗
在上例中,返回值 (d int, e int, f int) { 是进行了命名,如果不想命名可以写成 (int,int,int){ ,返回的结果都是一样的,但要注意:
当返回了多个值,我们某些变量不想要,或实际用不到,我们可以使用 _ 来补位,例如上例的返回我们可以写成 d,_,f := test(a,b,c) ,我们不想要中间的返回值,可以以这种形式来舍弃掉
在参数后面以 变量 ... type 这种形式的,我们就要以判断出这是一个可变长度的参数
输出为:
在上例中, strs ...string 中, strs 的实际值是b,c,d,e,这就是一个最简单的传递可变长度的参数的例子,更多一些演变的形式,都非常类似
在GO中 defer 关键字非常重要,相当于面相对像中的析构函数,也就是在某个函数执行完成后,GO会自动这个;
如果在多层循环中函数里,都定义了 defer ,那么它的执行顺序是先进后出;
当某个函数出现严重错误时, defer 也会被调用
输出为
这是一个最简单的测试了,当然还有更复杂的调用,比如调试程序时,判断是哪个函数出了问题,完全可以根据 defer 打印出来的内容来进行判断,非常快速,这种留给你们去实现
一个函数在函数体内自己调用自己我们称之为递归函数,在做递归调用时,经常会将内存给占满,这是非常要注意的,常用的比如,快速排序就是用的递归调用
本篇重点介绍了GO函数(func)的声明与使用,下一篇将介绍GO的结构 struct
Go语言异步排序的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于go语言map排序、Go语言异步排序的信息别忘了在本站进行查找喔。