这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos
java -jar swagger-codegen-cli-2.4.9.jar generate \
  -i ./openapiv2.json \
  -l go \
  -o swagger
复制
执行完命令后,控制台输出如下
查看swagger目录,发现已经生成了大量文件
➜ 001 tree swagger swagger ├── README.md ├── api │ └── swagger.yaml ├── api_consumers.go ├── api_default.go ├── api_producer.go ├── api_seek.go ├── api_topics.go ├── client.go ├── configuration.go ├── docs │ ├── AssignedTopicPartitions.md │ ├── BridgeInfo.md │ ├── Consumer.md │ ├── ConsumerRecord.md │ ├── ConsumerRecordList.md │ ├── ConsumersApi.md │ ├── CreatedConsumer.md │ ├── DefaultApi.md │ ├── KafkaHeader.md │ ├── KafkaHeaderList.md │ ├── ModelError.md │ ├── OffsetCommitSeek.md │ ├── OffsetCommitSeekList.md │ ├── OffsetRecordSent.md │ ├── OffsetRecordSentList.md │ ├── OffsetsSummary.md │ ├── Partition.md │ ├── PartitionMetadata.md │ ├── Partitions.md │ ├── ProducerApi.md │ ├── ProducerRecord.md │ ├── ProducerRecordList.md │ ├── ProducerRecordToPartition.md │ ├── ProducerRecordToPartitionList.md │ ├── Replica.md │ ├── SeekApi.md │ ├── SubscribedTopicList.md │ ├── TopicMetadata.md │ ├── Topics.md │ └── TopicsApi.md ├── git_push.sh ├── model_assigned_topic_partitions.go ├── model_bridge_info.go ├── model_consumer.go ├── model_consumer_record.go ├── model_consumer_record_list.go ├── model_created_consumer.go ├── model_error.go ├── model_kafka_header.go ├── model_kafka_header_list.go ├── model_offset_commit_seek.go ├── model_offset_commit_seek_list.go ├── model_offset_record_sent.go ├── model_offset_record_sent_list.go ├── model_offsets_summary.go ├── model_partition.go ├── model_partition_metadata.go ├── model_partitions.go ├── model_producer_record.go ├── model_producer_record_list.go ├── model_producer_record_to_partition.go ├── model_producer_record_to_partition_list.go ├── model_replica.go ├── model_subscribed_topic_list.go ├── model_topic_metadata.go ├── model_topics.go └── response.go 2 directories, 66 files
复制
go mod init sdkdemo
复制
go get golang.org/x/oauth2
go get github.com/antihax/optional
复制
将前面生成代码的swagger文件夹复制到sdkdemo的文件夹下面
现在sdkdemo的文件夹下面有这些东西
为了方便开发,接下来用IDE工具进行开发,我这里用的是goland,打开项目后新增名为main.go的文件
接下来咱们要面对的是一堆破绽百出的sdk代码,不过还好,可以拯救,咱们一起来拯救吧
// decode unmarshals the raw response body b into v, choosing the decoder from
// the response's content type.
//
// Only content types containing "application/xml" or "application/json" are
// recognized; any other value yields an "undefined response type" error. An
// unmarshalling failure is returned unchanged.
func (c *APIClient) decode(v interface{}, b []byte, contentType string) (err error) {
	switch {
	case strings.Contains(contentType, "application/xml"):
		return xml.Unmarshal(b, v)
	case strings.Contains(contentType, "application/json"):
		return json.Unmarshal(b, v)
	default:
		return errors.New("undefined response type")
	}
}
复制
// decode unmarshals the raw response body b into v, choosing the decoder from
// the response's content type.
//
// Content types containing "application/xml" are decoded as XML. Those
// containing "application/json", "application/vnd.kafka.v2+json" or
// "application/vnd.kafka.json.v2+json" are decoded as JSON. Any other value
// yields an "undefined response type" error. An unmarshalling failure is
// returned unchanged.
func (c *APIClient) decode(v interface{}, b []byte, contentType string) (err error) {
	switch {
	case strings.Contains(contentType, "application/xml"):
		return xml.Unmarshal(b, v)
	case strings.Contains(contentType, "application/json"),
		strings.Contains(contentType, "application/vnd.kafka.v2+json"),
		strings.Contains(contentType, "application/vnd.kafka.json.v2+json"):
		return json.Unmarshal(b, v)
	default:
		return errors.New("undefined response type")
	}
}
复制
// setBody serializes body into a fresh buffer for use as a request body.
//
// An io.Reader, []byte, string or *string is copied verbatim. Any other value
// is encoded as JSON or XML when contentType matches jsonCheck or xmlCheck
// respectively. A write or encoding failure is returned as-is. If nothing was
// written (unsupported body/content-type combination, empty input, or a nil
// *string) an "Invalid body type" error is returned.
func setBody(body interface{}, contentType string) (bodyBuf *bytes.Buffer, err error) {
	bodyBuf = &bytes.Buffer{}

	switch b := body.(type) {
	case io.Reader:
		_, err = bodyBuf.ReadFrom(b)
	case []byte:
		_, err = bodyBuf.Write(b)
	case string:
		_, err = bodyBuf.WriteString(b)
	case *string:
		// A nil pointer carries no payload; leave the buffer empty so it is
		// rejected below instead of dereferencing nil.
		if b != nil {
			_, err = bodyBuf.WriteString(*b)
		}
	default:
		switch {
		case jsonCheck.MatchString(contentType):
			err = json.NewEncoder(bodyBuf).Encode(body)
		case xmlCheck.MatchString(contentType):
			// Keep the encoder's error so an unencodable value is reported
			// with its real cause rather than as a partial or empty body.
			err = xml.NewEncoder(bodyBuf).Encode(body)
		}
	}
	if err != nil {
		return nil, err
	}

	if bodyBuf.Len() == 0 {
		return nil, fmt.Errorf("Invalid body type %s\n", contentType)
	}
	return bodyBuf, nil
}
复制
package swagger
// ConsumerRecordList is declared here as an empty struct: it has no fields,
// so decoding a response into it retains none of the payload.
type ConsumerRecordList struct {
}
复制
package swagger
// ConsumerRecordList is a slice of ConsumerRecord values, so a JSON array of
// records can be decoded into it directly.
type ConsumerRecordList []ConsumerRecord
复制
package swagger
// ProducerRecord, as declared here, carries only a partition number and an
// optional header list; it has no field for a record key or value.
type ProducerRecord struct {
// Partition is encoded under the JSON key "partition" and omitted when zero.
Partition int32 `json:"partition,omitempty"`
// Headers is encoded under the JSON key "headers" and omitted when nil.
Headers *KafkaHeaderList `json:"headers,omitempty"`
}
复制
package swagger
// ProducerRecord describes one record to be produced: its partition, value,
// key and optional headers.
type ProducerRecord struct {
// Partition is encoded under the JSON key "partition" and omitted when zero.
Partition int32 `json:"partition,omitempty"`
// Value is encoded under the JSON key "value"; it is always emitted, even when empty.
Value string `json:"value"`
// Key is encoded under the JSON key "key" and omitted when empty.
Key string `json:"key,omitempty"`
// Headers is encoded under the JSON key "headers" and omitted when nil.
Headers *KafkaHeaderList `json:"headers,omitempty"`
}
复制
最后一个问题,是在提交offset的时候,bridge后台不接受contentType,所以请打开文件api_consumers.go,修改如下,注释掉一行代码
坑已经填完了,开始验证SDK能不能用吧
const (
	// TEST_TOPIC is the topic used for testing.
	TEST_TOPIC = "bridge-quickstart-topic"
	// TEST_GROUP is the consumer group name used by the demo.
	TEST_GROUP = "client-sdk-group"
	// CONSUMER_NAME is the consumer name used by the demo.
	CONSUMER_NAME = "client-sdk-consumer-002"

	// BASE_PATH is the address of the strimzi bridge.
	BASE_PATH = "http://127.0.0.1:31331"
)
// client is the package-wide SDK client; it is assigned once in init.
var client *swagger.APIClient

// init builds a default SDK configuration, points it at BASE_PATH and creates
// the shared client from it.
func init() {
	cfg := swagger.NewConfiguration()
	cfg.BasePath = BASE_PATH

	client = swagger.NewAPIClient(cfg)
}
复制
// getAllTopics fetches the topic list through TopicsApi.ListTopics.
//
// On failure the error is logged and returned with a nil slice. On success
// the response is logged and the topic names are returned.
func getAllTopics() ([]string, error) {
	topics, resp, err := client.TopicsApi.ListTopics(context.Background())
	if err != nil {
		log.Printf("getAllTopics err: %v\n", err)
		return nil, err
	}

	log.Printf("response: %v", resp)

	return topics, nil
}
复制
// main lists all topics and prints them; when the lookup fails it exits
// without printing anything further (getAllTopics has already logged the error).
func main() {
	if topics, err := getAllTopics(); err == nil {
		fmt.Printf("topics: %v\n", topics)
	}
}
复制
2022/12/18 21:26:33 response: &{200 OK 200 HTTP/1.1 1 1 map[Content-Length:[109] Content-Type:[application/vnd.kafka.v2+json]] 0x140000e0300 109 [] false false map[] 0x14000118100 <nil>} topics: [__strimzi_store_topic bridge-quickstart-topic __strimzi-topic-operator-kstreams-topic-store-changelog] Process finished with the exit code 0
复制
// 发送消息(异步模式,不会收到offset返回)
func sendAsync(info string) error {
log.Print("send [" + info + "]")
_, response, err := client.ProducerApi.Send(context.Background(),
TEST_TOPIC,
swagger.ProducerRecordList{
Records: []swagger.ProducerRecord{
{Value: "message from go swagger SDK"},
},
},
&swagger.SendOpts{Async: optional.NewBool(true)},
)
if err != nil {
log.Printf("send err: %v\n", err)
return err
}
log.Printf("response: %v", response.StatusCode)
return nil
}
复制
// main sends ten numbered messages through sendAsync.
func main() {
	const total = 10

	for i := 0; i < total; i++ {
		msg := "message from go client " + strconv.Itoa(i)
		// sendAsync logs its own failures; the loop carries on regardless.
		_ = sendAsync(msg)
	}
}
复制
/private/var/folders/5v/p3bj9bzx2nd99y5l21nb1c080000gn/T/GoLand/___go_build_sdkdemo 2022/12/18 21:35:47 send [message from go client 0] 2022/12/18 21:35:47 response: 204 2022/12/18 21:35:47 send [message from go client 1] 2022/12/18 21:35:47 response: 204 2022/12/18 21:35:47 send [message from go client 2] 2022/12/18 21:35:47 response: 204 2022/12/18 21:35:47 send [message from go client 3] 2022/12/18 21:35:47 response: 204 2022/12/18 21:35:47 send [message from go client 4] 2022/12/18 21:35:47 response: 204 2022/12/18 21:35:47 send [message from go client 5] 2022/12/18 21:35:47 response: 204 2022/12/18 21:35:47 send [message from go client 6] 2022/12/18 21:35:47 response: 204 2022/12/18 21:35:47 send [message from go client 7] 2022/12/18 21:35:47 response: 204 2022/12/18 21:35:47 send [message from go client 8] 2022/12/18 21:35:47 response: 204 2022/12/18 21:35:47 send [message from go client 9] 2022/12/18 21:35:47 response: 204 Process finished with the exit code 0
复制
// getErrorMessage extracts a readable message from an error returned by the SDK.
//
// When err is a swagger.GenericSwaggerError, its Body is returned as a
// string. Any other error (for example a transport failure that never
// produced a response) falls back to err.Error() instead of panicking on the
// type assertion. A nil err yields the empty string.
func getErrorMessage(err error) string {
	if err == nil {
		return ""
	}

	if e, ok := err.(swagger.GenericSwaggerError); ok {
		return string(e.Body())
	}

	return err.Error()
}
// getBodyStr reads body to the end and returns what was read as a string.
//
// A nil body yields the empty string. The read is best-effort: if reading
// fails part-way, whatever was read before the failure is returned. The body
// is not closed here.
func getBodyStr(body io.ReadCloser) string {
	if body == nil {
		return ""
	}

	var buf bytes.Buffer
	// The result is only used for logging, so a read error is deliberately
	// ignored and the partial content is returned.
	_, _ = buf.ReadFrom(body)

	return buf.String()
}
复制
// CreateConsumer creates a consumer named consumerName in the given group
// through ConsumersApi.CreateConsumer.
//
// The consumer is requested with auto offset reset "latest", a fetch minimum
// of 16 bytes, a 300000 ms request timeout, auto-commit disabled and the
// "json" format. On failure the message extracted by getErrorMessage is
// logged and the error is returned. On success the response and the created
// consumer are logged and a pointer to the created consumer is returned.
func CreateConsumer(group string, consumerName string) (*swagger.CreatedConsumer, error) {
	settings := swagger.Consumer{
		Name:                     consumerName,
		AutoOffsetReset:          "latest",
		FetchMinBytes:            16,
		ConsumerRequestTimeoutMs: 300 * 1000,
		EnableAutoCommit:         false,
		Format:                   "json",
	}

	created, resp, err := client.ConsumersApi.CreateConsumer(context.Background(), group, settings)
	if err != nil {
		log.Printf("CreateConsumer error : %v", getErrorMessage(err))
		return nil, err
	}

	log.Printf("CreateConsumer response : %v, body [%v]", resp, getBodyStr(resp.Body))
	log.Printf("consumer : %v", created)

	return &created, nil
}
复制
// main creates the demo consumer.
func main() {
	// CreateConsumer logs both the outcome and any error, so its results are
	// not needed here.
	_, _ = CreateConsumer(TEST_GROUP, CONSUMER_NAME)
}
复制
// 订阅
func Subsciribe(topic string, consumerGroup string, consumerName string) error {
response, err := client.ConsumersApi.Subscribe(context.Background(),
swagger.Topics{Topics: []string{topic}},
consumerGroup,
consumerName,
)
if err != nil {
log.Printf("Subscribe error : %v", err)
return err
}
log.Printf("Subscribe response : %v", response)
return nil
}
复制
// main subscribes the demo consumer to TEST_TOPIC and prints any error.
func main() {
	if err := Subsciribe(TEST_TOPIC, TEST_GROUP, CONSUMER_NAME); err != nil {
		fmt.Printf("err : %v\n", err)
	}
}
复制
// Poll fetches messages for the consumer consumerName in consumerGroup
// through ConsumersApi.Poll.
//
// On failure the error is logged and returned; on success the response is
// logged, the record list is printed to stdout and nil is returned.
func Poll(consumerGroup string, consumerName string) error {
	// Argument order: ctx, groupid, name, localVarOptionals (*PollOpts).
	// No options are supplied, hence the trailing nil.
	records, resp, err := client.ConsumersApi.Poll(
		context.Background(),
		consumerGroup,
		consumerName,
		nil,
	)
	if err != nil {
		log.Printf("Poll error : %v", err)
		return err
	}

	log.Printf("Poll response : %v", resp)
	fmt.Printf("recordList: %v\n", records)

	return nil
}
复制
// main polls once for the demo consumer.
func main() {
	// Poll logs its own failures, so the returned error is not needed here.
	_ = Poll(TEST_GROUP, CONSUMER_NAME)
}
复制
/private/var/folders/5v/p3bj9bzx2nd99y5l21nb1c080000gn/T/GoLand/___go_build_sdkdemo 2022/12/18 21:43:16 Poll response : &{200 OK 200 HTTP/1.1 1 1 map[Content-Length:[2301] Content-Type:[application/vnd.kafka.json.v2+json]] 0x140000e0340 2301 [] false false map[] 0x1400011a100 <nil>} recordList: [{ 163468 0 bridge-quickstart-topic message from go swagger SDK <nil>} { 163469 0 bridge-quickstart-topic message from go swagger SDK <nil>} { 163470 0 bridge-quickstart-topic message from go swagger SDK <nil>} { 163471 0 bridge-quickstart-topic message from go swagger SDK <nil>} { 163472 0 bridge-quickstart-topic message from go swagger SDK <nil>} { 163473 0 bridge-quickstart-topic message from go swagger SDK <nil>} { 162246 2 bridge-quickstart-topic message from go swagger SDK <nil>} { 162247 2 bridge-quickstart-topic message from go swagger SDK <nil>} { 162248 2 bridge-quickstart-topic message from go swagger SDK <nil>} { 162249 2 bridge-quickstart-topic message from go swagger SDK <nil>} { 162250 2 bridge-quickstart-topic message from go swagger SDK <nil>} { 163669 1 bridge-quickstart-topic message from go swagger SDK <nil>} { 163670 1 bridge-quickstart-topic message from go swagger SDK <nil>} { 163671 1 bridge-quickstart-topic message from go swagger SDK <nil>} { 163672 1 bridge-quickstart-topic message from go swagger SDK <nil>} { 163146 3 bridge-quickstart-topic message from go swagger SDK <nil>} { 163147 3 bridge-quickstart-topic message from go swagger SDK <nil>} { 163148 3 bridge-quickstart-topic message from go swagger SDK <nil>} { 163149 3 bridge-quickstart-topic message from go swagger SDK <nil>} { 163150 3 bridge-quickstart-topic message from go swagger SDK <nil>}] Process finished with the exit code 0
复制
// Offset commits offsets for the consumer consumerName in consumerGroup
// through ConsumersApi.Commit, passing nil for the optional arguments.
//
// On failure the error is logged and returned; on success the response is
// logged and nil is returned.
func Offset(consumerGroup string, consumerName string) error {
	response, err := client.ConsumersApi.Commit(context.Background(),
		consumerGroup,
		consumerName, nil)
	if err != nil {
		// Name this operation in the log; the message previously said "Poll".
		log.Printf("Offset error : %v", err)
		return err
	}

	log.Printf("Offset response : %v", response)
	return nil
}
复制
// main commits offsets for the demo consumer and prints any error.
func main() {
	if err := Offset(TEST_GROUP, CONSUMER_NAME); err != nil {
		// The builtin print does not format an error value as its message;
		// use fmt so the actual error text is shown.
		fmt.Printf("err : %v\n", err)
	}
}
复制
/private/var/folders/5v/p3bj9bzx2nd99y5l21nb1c080000gn/T/GoLand/___go_build_sdkdemo 2022/12/18 22:07:38 Offset response : &{204 No Content 204 HTTP/1.1 1 1 map[] {} 0 [] false false map[] 0x1400011a100 <nil>} Process finished with the exit code 0
复制