golang高并发限流操作 ping / telnet
需求
当需要同时ping/telnet多个ip时,可以通过引入ping包/telnet包实现,也可以通过go调用cmd命令实现,不过后者调用效率较差,所以这里选择ping包和telnet包
还有就是高并发的问题,可以通过shell脚本或者go实现高并发,所以我选择的用go自带的协程实现,但是如果要同时处理1000+个ip,考虑到机器的性能,需要ratelimit控制开辟的go协程数量,这里主要写一下我的建议和淌过的坑
ping
参考链接: https://github.com/sparrc/go-ping
import "github.com/sparrc/go-ping" import "time" func (p *Ping) doPing(timeout time.Duration, count int, ip string) (err error) { pinger, cmdErr := ping.NewPinger(ip) if cmdErr != nil { glog.Error("Failed to ping " + p.ipAddr) err = cmdErr return } pinger.Count = count pinger.Interval = time.Second pinger.Timeout = timeout // true的话,代表是标准的icmp包,false代表可以丢包类似udp pinger.SetPrivileged(false) // 执行 pinger.Run() // 获取ping后的返回信息 stats := pinger.Statistics() //延迟 latency = float64(stats.AvgRtt) // 标准的往返总时间 jitter = float64(stats.StdDevRtt) //丢包率 packetLoss = stats.PacketLoss return }
注意: pinger.Run() 这里执行的时候是阻塞的,如果并发量大的时候,程序会卡死在这里,所以当有高并发的需求时建议如下处理:
go pinger.Run()
time.Sleep(timeout)
telnet
package main import ( "github.com/reiver/go-telnet" ) func doTelnet(ip string, port int) { var caller telnet.Caller = telnet.StandardCaller address := ip + ":"+ strconv.Itoa(port) // DialToAndCall 检查连通性并且调用 telnet.DialToAndCall(address, caller) } }
bug出现报错:
lookup tcp/: nodename nor servname provided, or not known
解决:
修改string(port)为strconv.Itoa(port)
DialToAndCall这种方式telnet无法设置超时时间,默认的超时时间有1分钟,所以使用DialTimeout这个方式实现telnet
import "net" func doTelnet(ip string, ports []string) map[string]string { // 检查 emqx 1883, 8083, 8080, 18083 端口 results := make(map[string]string) for _, port := range ports { address := net.JoinHostPort(ip, port) // 3 秒超时 conn, err := net.DialTimeout("tcp", address, 3*time.Second) if err != nil { results[port] = "failed" } else { if conn != nil { results[port] = "success" _ = conn.Close() } else { results[port] = "failed" } } } return results }
shell高并发
本质就是读取ip.txt文件里的ip,然后调用ping方法,实现高并发也是借助&遍历所有的ip然后同一交给操作系统去处理高并发
while read ip do { doPing(ip) } & done < ip.txt
go高并发限速
import ( "context" "fmt" "log" "time" "sync" "golang.org/x/time/rate" ) func Limit(ips []string)([]string, []string, error) { //第一个参数是每秒钟最大的并发数,第二个参数是桶的容量,第一次的时候每秒可执行的数量就是桶的容量,建议这两个值都写成一样的 r := rate.NewLimiter(10, 10) ctx := context.Background() wg := sync.WaitGroup{} wg.Add(len(ips)) lock := sync.Mutex{} var success []string var fail []string defer wg.Done() for _,ip:=range ips{ //每次消耗2个,放入一个,消耗完了还会放进去,如果初始是5个,所以这段代码再执行到第4次的时候筒里面就空了,如果当前不够取两个了,本次就不取,再放一个进去,然后返回false err := r.WaitN(ctx, 2) if err != nil { log.Fatal(err) } go func(ip string) { defer func() { wg.Done() }() err := doPing(time.Second, 2, ip) lock.Lock() defer lock.Unlock() if err != nil { fail = append(fail, ip) return } else { success = append(success, ip) } }(ip) } // wait等待所有go协程结束 wg.wait() return success,fail,nil } func main() { ips := [2]string{"192.168.1.1","192.168.1.2"} success,fail,err := Limit(ips) if err != nil { fmt.Printf("ping error") } }
这里注意一个并发实现的坑,在for循环里使用goroutine时要把遍历的参数传进去才能保证每个遍历的参数都被执行,否则只能执行一次
(拓展)管道、死锁
先看个例子:
func main() { go print() // 启动一个goroutine print() } func print() { fmt.Println("*******************") }
输出结果:
*******************
没错,只有一行,因为当go开辟一个协程想去执行print方法时,主函数已经执行完print并打印出来,所以goroutine还没有机会执行程序就已经结束了,解决这个问题可是在主函数里加time.sleep让主函数等待goroutine执行完,也可以使用WaitGroup.wait等待goroutine执行完,还有一种就是信道
信道分无缓冲信道和缓冲信道
无缓冲信道
无缓冲信道也就是定义长度为0的信道,存入一个数据,从无缓冲信道取数据,若信道中无数据,就会阻塞,还可能引发死锁,同样数据进入无缓冲信道, 如果没有其他goroutine来拿走这个数据,也会阻塞,记住无缓冲数据并不存储数据
func main() { var channel chan string = make(chan string) go func(message string) { channel<- message // 存消息 }("Ping!") fmt.Println(<-messages) // 取消息 }
缓存信道
顾名思义,缓存信道可以存储数据,goroutine之间不会发生阻塞,for循环读取信道中的数据时,一定要判断当管道中不存在数据时的情况,否则会发生死锁,看个例子
channel := make(chan int, 3) channel <- 1 channel <- 2 channel <- 3 // 显式关闭信道 close(channel) for v := range channel { fmt.Println(v) // 如果现有数据量为0,跳出循环,与显式关闭隧道效果一样,选一个即可 if len(ch) <= 0 { break } }
但是这里有个问题,信道中数据是可以随时存入的,所以我们遍历的时候无法确定目前的个数就是信道的总个数,所以推荐使用select监听信道
// 创建一个计时信道 timeout := time.After(1 * time.Second) // 监听3个信道的数据 select { case v1 := <- c1: fmt.Printf("received %d from c1", v1) case v2 := <- c2: fmt.Printf("received %d from c2", v2) case v3 := <- c3: fmt.Printf("received %d from c3", v3) case <- timeout: is_timeout = true // 超时 break } }
以上为个人经验,希望能给大家一个参考,也希望大家多多支持。如有错误或未考虑完全的地方,望不吝赐教。
下一篇:golang接口IP限流,IP黑名单,IP白名单的实例