[转]golang 实现延迟消息原理与方法

实现延迟消息最主要的两个结构:

环形队列:通过golang中的数组实现,分成3600个slot。

任务集合:通过map[key]*Task,每个slot一个map,map的值就是我们要执行的任务。

原理图如下:

140030-20170726142613250-1793759044

实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package main;
import (
    "time"
    "errors"
    "fmt"
)
//延迟消息
type DelayMessage struct {
    //当前下标
    curIndex int;
    //环形槽
    slots [3600]map[string]*Task;
    //关闭
    closed chan bool;
    //任务关闭
    taskClose chan bool;
    //时间关闭
    timeClose chan bool;
    //启动时间
    startTime time.Time;
}
//执行的任务函数
type TaskFunc func(args ...interface{});
//任务
type Task struct {
    //循环次数
    cycleNum int;
    //执行的函数
    exec   TaskFunc;
    params []interface{};
}
//创建一个延迟消息
func NewDelayMessage() *DelayMessage {
    dm := &DelayMessage{
        curIndex:  0,
        closed:    make(chan bool),
        taskClose: make(chan bool),
        timeClose: make(chan bool),
        startTime: time.Now(),
    };
    for i := 0; i < 3600; i++ {
        dm.slots[i] = make(map[string]*Task);
    }
    return dm;
}
//启动延迟消息
func (dm *DelayMessage) Start() {
    go dm.taskLoop();
    go dm.timeLoop();
    select {
    case <-dm.closed:
        {
            dm.taskClose <- true;
            dm.timeClose <- true;
            break;
        }
    };
}
//关闭延迟消息
func (dm *DelayMessage) Close() {
    dm.closed <- true;
}
//处理每1秒的任务
func (dm *DelayMessage) taskLoop() {
    defer func() {
        fmt.Println("taskLoop exit");
    }();
    for {
        select {
        case <-dm.taskClose:
            {
                return;
            }
        default:
            {
                //取出当前的槽的任务
                tasks := dm.slots[dm.curIndex];
                if len(tasks) > 0 {
                    //遍历任务,判断任务循环次数等于0,则运行任务
                    //否则任务循环次数减1
                    for k, v := range tasks {
                        if v.cycleNum == 0 {
                            go v.exec(v.params...);
                            //删除运行过的任务
                            delete(tasks, k);
                        else {
                            v.cycleNum--;
                        }
                    }
                }
            }
        }
    }
}
//处理每1秒移动下标
func (dm *DelayMessage) timeLoop() {
    defer func() {
        fmt.Println("timeLoop exit");
    }();
    tick := time.NewTicker(time.Second);
    for {
        select {
        case <-dm.timeClose:
            {
                return;
            }
        case <-tick.C:
            {
                fmt.Println(time.Now().Format("2006-01-02 15:04:05"));
                //判断当前下标,如果等于3599则重置为0,否则加1
                if dm.curIndex == 3599 {
                    dm.curIndex = 0;
                else {
                    dm.curIndex++;
                }
            }
        }
    }
}
//添加任务
func (dm *DelayMessage) AddTask(t time.Time, key string, exec TaskFunc, params []interface{}) error {
    if dm.startTime.After(t) {
        return errors.New("时间错误");
    }
    //当前时间与指定时间相差秒数
    subSecond := t.Unix() - dm.startTime.Unix();
    //计算循环次数
    cycleNum := int(subSecond / 3600);
    //计算任务所在的slots的下标
    ix := subSecond % 3600;
    //把任务加入tasks中
    tasks := dm.slots[ix];
    if _, ok := tasks[key]; ok {
        return errors.New("该slots中已存在key为" + key + "的任务");
    }
    tasks[key] = &Task{
        cycleNum: cycleNum,
        exec:     exec,
        params:   params,
    };
    return nil;
}
func main() {
    //创建延迟消息
    dm := NewDelayMessage();
    //添加任务
    dm.AddTask(time.Now().Add(time.Second*10), "test1"func(args ...interface{}) {
        fmt.Println(args...);
    }, []interface{}{1, 2, 3});
    dm.AddTask(time.Now().Add(time.Second*10), "test2"func(args ...interface{}) {
        fmt.Println(args...);
    }, []interface{}{4, 5, 6});
    dm.AddTask(time.Now().Add(time.Second*20), "test3"func(args ...interface{}) {
        fmt.Println(args...);
    }, []interface{}{"hello""world""test"});
    dm.AddTask(time.Now().Add(time.Second*30), "test4"func(args ...interface{}) {
        sum := 0;
        for arg := range args {
            sum += arg;
        }
        fmt.Println("sum : ", sum);
    }, []interface{}{1, 2, 3});
    //40秒后关闭
    time.AfterFunc(time.Second*40, func() {
        dm.Close();
    });
    dm.Start();
}

测试结果如下:

[go]json处理的一些总结

https://colobu.com/2017/06/21/json-tricks-in-Go/

本文是基于 Go 官方和 https://eager.io/blog/go-and-json/ 进行翻译整理的

JSON 是一种轻量级的数据交换格式,常用作前后端数据交换,Go 在 encoding/json 包中提供了对 JSON 的支持。

序列化

把 Go struct 序列化成 JSON 对象,Go 提供了 Marshal 方法,正如其含义所示表示编排序列化,函数签名如下:

1
func Marshal(v interface{}) ([]byte, error)

举例来说,比如下面的 Go struct:

1
2
3
4
5
type Message struct {
    Name string
    Body string
    Time int64
}

使用 Marshal 序列化:

1
2
3
m := Message{"Alice", "Hello", 1294706395881547000}
b, err := json.Marshal(m) 
fmt.Println(b) //{"Name":"Alice","Body":"Hello","Time":1294706395881547000}

在 Go 中并不是所有的类型都能进行序列化:

  • JSON object key 只支持 string
  • Channel、complex、function 等 type 无法进行序列化
  • 数据中如果存在循环引用,则不能进行序列化,因为序列化时会进行递归
  • Pointer 序列化之后是其指向的值或者是 nil

还需要注意的是:只有 struct 中支持导出的 field 才能被 JSON package 序列化,即首字母大写的 field

反序列化

反序列化函数是 Unmarshal ,其函数签名如下:

1
func Unmarshal(data []byte, v interface{}) error

如果要进行反序列化,我们首先需要创建一个可以接受序列化数据的 Go struct:

1
2
var m Message
err := json.Unmarshal(b, &m)

JSON 对象一般都是小写表示,Marshal 之后 JSON 对象的首字母依然是大写,如果序列化之后名称想要改变如何实现,答案就是 struct tags

Struct Tag

Struct tag 可以决定 Marshal 和 Unmarshal 函数如何序列化和反序列化数据。

指定 JSON filed name

JSON object 中的 name 一般都是小写,我们可以通过 struct tag 来实现:

1
2
3
type MyStruct struct {
    SomeField string `json:"some_field"`
}

SomeField 序列化之后会变成 some_field。

指定 field 是 empty 时的行为

使用 omitempty 可以告诉 Marshal 函数如果 field 的值是对应类型的 zero-value,那么序列化之后的 JSON object 中不包含此 field:

1
2
3
4
5
6
type MyStruct struct {
    SomeField string `json:"some_field,omitempty"`
}

m := MyStruct{}
b, err := json.Marshal(m) //{}

如果 SomeField == “” ,序列化之后的对象就是 {}

跳过 field

Struct tag “-” 表示跳过指定的 filed:

1
2
3
4
5
6
type MyStruct struct {
    SomeField string `json:"some_field"`
    Passwd string `json:"-"`
}
m := MyStruct{}
b, err := json.Marshal(m) //{"some_feild":""}

即序列化的时候不输出,这样可以有效保护需要保护的字段不被序列化。

反序列化任意 JSON 数据

默认的 JSON 只支持以下几种 Go 类型:

  • bool for JSON booleans
  • float64 for JSON numbers
  • string for JSON strings
  • nil for JSON null

在序列化之前如果不知道 JSON 数据格式,我们使用 interface{} 来存储。interface {} 的作用详见本博的其他文章。

有如下的数据格式:

1
b := []byte(`{"Name":"Wednesday","Age":6,"Parents":["Gomez","Morticia"]}`)

如果我们序列化之前不知道其数据格式,我们可以使用 interface{} 来存储我们的 decode 之后的数据:

1
2
var f interface{}
err := json.Unmarshal(b, &f)

反序列化之后 f 应该是像下面这样:

1
2
3
4
5
6
7
8
f = map[string]interface{}{
    "Name": "Wednesday",
    "Age":  6,
    "Parents": []interface{}{
        "Gomez",
        "Morticia",
    },
}

key 是 string,value 是存储在 interface{} 内的。想要获得 f 中的数据,我们首先需要进行 type assertion,然后通过 range 迭代获得 f 中所有的 key :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
m := f.(map[string]interface{})
for k, v := range m {
    switch vv := v.(type) {
    case string:
        fmt.Println(k, "is string", vv)
    case float64:
        fmt.Println(k, "is float64", vv)
    case []interface{}:
        fmt.Println(k, "is an array:")
        for i, u := range vv {
            fmt.Println(i, u)
        }
    default:
        fmt.Println(k, "is of a type I don't know how to handle")
    }
}

反序列化对 slice、map、pointer 的处理

我们定义一个 struct 继续对上面例子中的 b 进行反序列化:

1
2
3
4
5
6
7
8
type FamilyMember struct {
    Name    string
    Age     int
    Parents []string
}

var m FamilyMember
err := json.Unmarshal(b, &m)

这个例子是能够正常工作的,你一定也注意到了,struct 中包含一个 slice Parents ,slice 默认是 nil,之所以反序列化可以正常进行就是因为 Unmarshal 在序列化时进行了对 slice Parents 做了初始化,同理,对 map 和 pointer 都会做类似的工作,比如序列化如果 Pointer 不是 nil 首先进行 dereference 获得其指向的值,然后再进行序列化,反序列化时首先对 nil pointer 进行初始化

Stream JSON

除了 marshal 和 unmarshal 函数,Go 还提供了 Decoder 和 Encoder 对 stream JSON 进行处理,常见 request 中的 Body、文件等:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
jsonFile, err := os.Open("post.json")
if err != nil {
    fmt.Println("Error opening json file:", err)
    return
}

defer jsonFile.Close()
decoder := json.NewDecoder(jsonFile)
for {
    var post Post
    err := decoder.Decode(&post)
    if err == io.EOF {
        break
    }

    if err != nil {
        fmt.Println("error decoding json:", err)
        return
    }

    fmt.Println(post)
}

嵌入式 struct 的序列化

Go 支持对 nested struct 进行序列化和反序列化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
type App struct {
	Id string `json:"id"`
}

type Org struct {
	Name string `json:"name"`
}

type AppWithOrg struct {
	App
	Org
}

func main() {
	data := []byte(`
        {
            "id": "k34rAT4",
            "name": "My Awesome Org"
        }
    `)

	var b AppWithOrg

	json.Unmarshal(data, &b)
	fmt.Printf("%#v", b)

	a := AppWithOrg{
		App: App{
			Id: "k34rAT4",
		},
		Org: Org{
			Name: "My Awesome Org",
		},
	}
	data, _ = json.Marshal(a)
	fmt.Println(string(data))
}

Nested struct 虽然看起来有点怪异,有些时候它将非常有用。

自定义序列化函数

Go JSON package 中定了两个 Interface Marshaler 和 Unmarshaler ,实现这两个 Interface 可以让你定义的 type 支持序列化操作。

错误处理

总是记得检查序列或反序列化的错误,可以让你的程序更健壮,而不是在出错之后带着错误继续执行下去。

参考资料

  1. 临时忽略struct空字段
  2. 临时添加额外的字段
  3. 临时粘合两个struct
  4. 一个json切分成两个struct
  5. 临时改名struct的字段
  6. 用字符串传递数字
  7. 容忍字符串和数字互转
  8. 容忍空数组作为对象
  9. 使用 MarshalJSON支持time.Time
  10. 使用 RegisterTypeEncoder支持time.Time
  11. 使用 MarshalText支持非字符串作为key的map
  12. 使用 json.RawMessage
  13. 使用 json.Number
  14. 统一更改字段的命名风格
  15. 使用私有的字段
  16. 忽略掉一些字段
  17. 忽略掉一些字段2

taowenjson-iterator的作者。 序列化和反序列化需要处理JSON和struct的关系,其中会用到一些技巧。 原文 Golang 中使用 JSON 的小技巧是他的经验之谈,介绍了一些struct解析成json的技巧,以及 json-iterator 库的一些便利的处理。

有的时候上游传过来的字段是string类型的,但是我们却想用变成数字来使用。 本来用一个json:”,string” 就可以支持了,如果不知道golang的这些小技巧,就要大费周章了。

参考文章:http://attilaolah.eu/2014/09/10/json-and-struct-composition-in-go/

临时忽略struct空字段

1
2
3
4
5
type User struct {
Email string `json:”email”`
Password string `json:”password”`
// many more fields…
}

如果想临时忽略掉空Password字段,可以用omitempty:

1
2
3
4
5
6
json.Marshal(struct {
*User
Password bool `json:”password,omitempty”`
}{
User: user,
})

临时添加额外的字段

1
2
3
4
5
type User struct {
Email string `json:”email”`
Password string `json:”password”`
// many more fields…
}

临时忽略掉空Password字段,并且添加token字段

1
2
3
4
5
6
7
8
json.Marshal(struct {
*User
Token string `json:”token”`
Password bool `json:”password,omitempty”`
}{
User: user,
Token: token,
})

临时粘合两个struct

通过嵌入struct的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type BlogPost struct {
URL string `json:”url”`
Title string `json:”title”`
}
type Analytics struct {
Visitors int `json:”visitors”`
PageViews int `json:”page_views”`
}
json.Marshal(struct{
*BlogPost
*Analytics
}{post, analytics})

一个json切分成两个struct

1
2
3
4
5
6
7
8
9
json.Unmarshal([]byte(`{
“url”: “attila@attilaolah.eu”,
“title”: “Attila’s Blog”,
“visitors”: 6,
“page_views”: 14
}`), &struct {
*BlogPost
*Analytics
}{&post, &analytics})

临时改名struct的字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type CacheItem struct {
Key string `json:”key”`
MaxAge int `json:”cacheAge”`
Value Value `json:”cacheValue”`
}
json.Marshal(struct{
*CacheItem
// Omit bad keys
OmitMaxAge omit `json:”cacheAge,omitempty”`
OmitValue omit `json:”cacheValue,omitempty”`
// Add nice keys
MaxAge int `json:”max_age”`
Value *Value `json:”value”`
}{
CacheItem: item,
// Set the int by value:
MaxAge: item.MaxAge,
// Set the nested struct by reference, avoid making a copy:
Value: &item.Value,
})

用字符串传递数字

1
2
3
type TestObject struct {
Field1 int `json:”,string”`
}

这个对应的json是 {"Field1": "100"}

如果json是 {"Field1": 100} 则会报错

容忍字符串和数字互转

如果你使用的是jsoniter,可以启动模糊模式来支持 PHP 传递过来的 JSON。

1
2
3
import “github.com/json-iterator/go/extra”
extra.RegisterFuzzyDecoders()

这样就可以处理字符串和数字类型不对的问题了。比如

1
2
var val string
jsoniter.UnmarshalFromString(`100`, &val)

又比如

1
2
var val float32
jsoniter.UnmarshalFromString(`”1.23″`, &val)

容忍空数组作为对象

PHP另外一个令人崩溃的地方是,如果 PHP array是空的时候,序列化出来是[]。但是不为空的时候,序列化出来的是{"key":"value"}。 我们需要把 [] 当成 {} 处理。

如果你使用的是jsoniter,可以启动模糊模式来支持 PHP 传递过来的 JSON。

1
2
3
import “github.com/json-iterator/go/extra”
extra.RegisterFuzzyDecoders()

这样就可以支持了

1
2
var val map[string]interface{}
jsoniter.UnmarshalFromString(`[]`, &val)

使用 MarshalJSON支持time.Time

golang 默认会把 time.Time 用字符串方式序列化。如果我们想用其他方式表示 time.Time,需要自定义类型并定义 MarshalJSON

1
2
3
4
5
6
type timeImplementedMarshaler time.Time
func (obj timeImplementedMarshaler) MarshalJSON() ([]byte, error) {
seconds := time.Time(obj).Unix()
return []byte(strconv.FormatInt(seconds, 10)), nil
}

序列化的时候会调用 MarshalJSON

1
2
3
4
5
6
7
8
9
type TestObject struct {
Field timeImplementedMarshaler
}
should := require.New(t)
val := timeImplementedMarshaler(time.Unix(123, 0))
obj := TestObject{val}
bytes, err := jsoniter.Marshal(obj)
should.Nil(err)
should.Equal(`{“Field”:123}`, string(bytes))

使用 RegisterTypeEncoder支持time.Time

jsoniter 能够对不是你定义的type自定义JSON编解码方式。比如对于 time.Time 可以用 epoch int64 来序列化

1
2
3
4
5
import “github.com/json-iterator/go/extra”
extra.RegisterTimeAsInt64Codec(time.Microsecond)
output, err := jsoniter.Marshal(time.Unix(1, 1002))
should.Equal(“1000001”, string(output))

如果要自定义的话,参见 RegisterTimeAsInt64Codec 的实现代码

使用 MarshalText支持非字符串作为key的map

虽然 JSON 标准里只支持 string 作为 key 的 map。但是 golang 通过 MarshalText() 接口,使得其他类型也可以作为 map 的 key。例如

1
2
3
4
f, _, _ := big.ParseFloat(“1”, 10, 64, big.ToZero)
val := map[*big.Float]string{f: “2”}
str, err := MarshalToString(val)
should.Equal(`{“1″:”2”}`, str)

其中 big.Float 就实现了 MarshalText()

使用 json.RawMessage

如果部分json文档没有标准格式,我们可以把原始的信息用[]byte保存下来。

1
2
3
4
5
6
7
type TestObject struct {
Field1 string
Field2 json.RawMessage
}
var data TestObject
json.Unmarshal([]byte(`{“field1”: “hello”, “field2”: [1,2,3]}`), &data)
should.Equal(` [1,2,3]`, string(data.Field2))

使用 json.Number

默认情况下,如果是 interface{} 对应数字的情况会是 float64 类型的。如果输入的数字比较大,这个表示会有损精度。所以可以 UseNumber() 启用 json.Number 来用字符串表示数字。

1
2
3
4
5
decoder1 := json.NewDecoder(bytes.NewBufferString(`123`))
decoder1.UseNumber()
var obj1 interface{}
decoder1.Decode(&obj1)
should.Equal(json.Number(“123”), obj1)

jsoniter 支持标准库的这个用法。同时,扩展了行为使得 Unmarshal 也可以支持 UseNumber 了。

1
2
3
4
json := Config{UseNumber:true}.Froze()
var obj interface{}
json.UnmarshalFromString(“123”, &obj)
should.Equal(json.Number(“123”), obj)

统一更改字段的命名风格

经常 JSON 里的字段名 Go 里的字段名是不一样的。我们可以用 field tag 来修改。

1
2
3
4
5
6
7
8
output, err := jsoniter.Marshal(struct {
UserName string `json:”user_name”`
FirstLanguage string `json:”first_language”`
}{
UserName: “taowen”,
FirstLanguage: “Chinese”,
})
should.Equal(`{“user_name”:”taowen”,”first_language”:”Chinese”}`, string(output))

但是一个个字段来设置,太麻烦了。如果使用 jsoniter,我们可以统一设置命名风格。

1
2
3
4
5
6
7
8
9
10
11
12
import “github.com/json-iterator/go/extra”
extra.SetNamingStrategy(LowerCaseWithUnderscores)
output, err := jsoniter.Marshal(struct {
UserName string
FirstLanguage string
}{
UserName: “taowen”,
FirstLanguage: “Chinese”,
})
should.Nil(err)
should.Equal(`{“user_name”:”taowen”,”first_language”:”Chinese”}`, string(output))

使用私有的字段

Go 的标准库只支持 public 的 field。jsoniter 额外支持了 private 的 field。需要使用 SupportPrivateFields() 来开启开关。

1
2
3
4
5
6
7
8
9
import “github.com/json-iterator/go/extra”
extra.SupportPrivateFields()
type TestObject struct {
field1 string
}
obj := TestObject{}
jsoniter.UnmarshalFromString(`{“field1″:”Hello”}`, &obj)
should.Equal(“Hello”, obj.field1)

下面是我补充的内容

忽略掉一些字段

原文中第一节有个错误,我更正过来了。omitempty不会忽略某个字段,而是忽略空的字段,当字段的值为空值的时候,它不会出现在JSON数据中。

如果想忽略某个字段,需要使用 json:"-"格式。

1
2
3
4
5
type User struct {
Email string `json:”email”`
Password string `json:”password”`
// many more fields…
}

如果想临时忽略掉空Password字段,可以用-:

1
2
3
4
5
6
json.Marshal(struct {
*User
Password bool `json:”-“`
}{
User: user,
})

忽略掉一些字段2

如果不想更改原struct,还可以使用下面的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type User struct {
Email string `json:”email”`
Password string `json:”password”`
// many more fields…
}
type omit *struct{}
type PublicUser struct {
*User
Password omit `json:”-“`
}
json.Marshal(PublicUser{
User: user,
})