最近在学习golang, 写了一个简单的tcp服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package main

import (
"fmt"
"net"
"time"
"os"
"os/signal"
"sync"
)

const (
WorkerCount = 2
)

type Task struct {
Id int32
Message string
}

var wg sync.WaitGroup
var taskChannel = make(chan Task)
var signChannel = make(chan os.Signal, 1)
var exitChanel = make(chan int)

func main() {
go installSign()
tcpAddr, err := net.ResolveTCPAddr("tcp4", "127.0.0.1:2202")
if err != nil {
panic("解析ip地址失败: " + err.Error())
}
fmt.Println("Listening 127.0.0.1:2202 ....")
listener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
panic("监听TCP失败: " + err.Error())
}
fmt.Println("Listen success on 127.0.0.1:2202 with tcp4")
defer func() {
fmt.Println("Close listenning ....")
listener.Close()
fmt.Println("Shutdown")
}()

connChannel := make(chan net.Conn)

go accept(listener, connChannel)
go handleConn(connChannel)
go taskDispatch()

for {
select {
case <- signChannel:
fmt.Println("Get shutdown sign")
go notifyGoroutingExit()
goto EXIT
}
}

EXIT:
fmt.Println("Waiting gorouting exit ....")
wg.Wait()
}

func accept(listener * net.TCPListener, connChannel chan net.Conn) {
for {
connection, err := listener.AcceptTCP()
if err != nil {
fmt.Println("Accept 失败: " + err.Error())
} else {
connChannel <- connection
}
}
}

func handleConn(connChannel chan net.Conn) {
fmt.Println("Wating connection ....")
for {
select {
case conn := <- connChannel:
remoteAddr := conn.RemoteAddr()
fmt.Println("Client " + remoteAddr.String() + " connected")
readConn(&conn)
}
}

}

func readConn(conn *net.Conn) {
for {
(*conn).SetReadDeadline(time.Now().Add(5 * time.Second))
buf := make([]byte, 1024)
_, err := (*conn).Read(buf)
if err != nil {
fmt.Println("Read connection error: " + err.Error())
if err.Error() == "EOF" {
(*conn).Close();
fmt.Println("Close connection " + (*conn).RemoteAddr().String())
break
}
}
if buf != nil {
fmt.Printf("Read message from connect: %s", string(buf))
writeConn(conn, buf)
var task Task
task.Id = 1
task.Message = string(buf)
taskChannel <- task
}
}
}

func writeConn(conn *net.Conn, msg []byte) {
_, err := (*conn).Write(msg)
if err != nil {
fmt.Println("Write connection error: " + err.Error())
if err.Error() == "EOF" {
(*conn).Close();
fmt.Println("Close connection " + (*conn).RemoteAddr().String())
}
}
}


func taskDispatch() {
fmt.Println("Init task moniter ....")
for i := 0; i < WorkerCount; i ++ {
go loop()
}
fmt.Println("Init task moniter DONE!")
}

func loop() {
ticker := time.NewTicker(10 * time.Second)
wg.Add(1)
defer func() {
defer wg.Done()
defer ticker.Stop()
}()
for {
fmt.Println("Wating task ....")
select {
case task := <- taskChannel:
fmt.Println("Task comming: " + task.Message)
break;
case <- exitChanel:
fmt.Println("Woker get exit sign")
goto STOP
//default:
}
// Epoll, 去读任务数据, 不需要处理超时的情况
//select {
// case <- ticker.C:
// fmt.Println(time.Now().String() + " No task after 10 second")
// break;
//}
}
STOP:
//TODO: Clear undo task
}

func installSign() {
signal.Notify(signChannel, os.Interrupt, os.Kill)
}

func notifyGoroutingExit() {
for i := 0; i < WorkerCount; i ++ {
exitChanel <- 1
}
}