From a0173f53fa2960eda74d722f2eb7a4c224fd349a Mon Sep 17 00:00:00 2001 From: Kang Date: Sat, 27 Apr 2024 23:55:17 +0800 Subject: [PATCH 1/7] add config log_request to log doris stream load response --- extension/beats/doris/client.go | 312 +++++++++++++++++--------------- extension/beats/doris/config.go | 128 ++++++------- extension/beats/doris/doris.go | 169 ++++++++--------- 3 files changed, 312 insertions(+), 297 deletions(-) diff --git a/extension/beats/doris/client.go b/extension/beats/doris/client.go index c6775dc321be80..28ab82771e310b 100644 --- a/extension/beats/doris/client.go +++ b/extension/beats/doris/client.go @@ -20,181 +20,193 @@ package doris import ( - "context" - "fmt" - "net/http" - "net/http/httputil" - "strings" - "time" - - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/libbeat/outputs" - "github.com/elastic/beats/v7/libbeat/outputs/codec" - "github.com/elastic/beats/v7/libbeat/publisher" + "context" + "fmt" + "net/http" + "net/http/httputil" + "strings" + "time" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/outputs/codec" + "github.com/elastic/beats/v7/libbeat/publisher" ) type client struct { - url string - httpClient *http.Client - headers map[string]string - beat beat.Info - codec codec.Codec - - database string - table string - labelPrefix string - lineDelimiter string - - observer outputs.Observer - logger *logp.Logger + url string + httpClient *http.Client + headers map[string]string + beat beat.Info + codec codec.Codec + + database string + table string + labelPrefix string + lineDelimiter string + logRequest bool + + observer outputs.Observer + logger *logp.Logger } type clientSettings struct { - URL string - Timeout time.Duration - Headers map[string]string - - Database string - Table string - LabelPrefix string - LineDelimiter string - - Beat beat.Info - Codec codec.Codec - Observer outputs.Observer - Logger *logp.Logger + URL string + Timeout time.Duration + Headers map[string]string + + Database string + Table string + LabelPrefix string + LineDelimiter string + LogRequest bool + + Beat beat.Info + Codec codec.Codec + Observer outputs.Observer + Logger *logp.Logger } func (s clientSettings) String() string { - return fmt.Sprintf("clientSettings{%s, %s, %s, %s}", s.URL, s.Timeout, s.LabelPrefix, s.Headers) + return fmt.Sprintf("clientSettings{%s, %s, %s, %s}", s.URL, s.Timeout, s.LabelPrefix, s.Headers) } func NewDorisClient(s clientSettings) (*client, error) { - s.Logger.Infof("Received settings: %s", s) - - client := &client{ - url: s.URL, - httpClient: &http.Client{ - Timeout: s.Timeout, - }, - headers: s.Headers, - - database: s.Database, - table: s.Table, - labelPrefix: s.LabelPrefix, - lineDelimiter: s.LineDelimiter, - - codec: s.Codec, - observer: s.Observer, - logger: s.Logger, - } - return client, nil + s.Logger.Infof("Received settings: %s", s) + + client := &client{ + url: s.URL, + httpClient: &http.Client{ + Timeout: s.Timeout, + }, + headers: s.Headers, + + database: s.Database, + table: s.Table, + labelPrefix: s.LabelPrefix, + lineDelimiter: s.LineDelimiter, + logRequest: s.LogRequest, + + codec: s.Codec, + observer: s.Observer, + logger: s.Logger, + } + return client, nil } func (client *client) Connect() error { - return nil + return nil } func (client *client) Close() error { - return nil + return nil } func (client *client) String() string { - return fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix, client.headers) + return fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix, client.headers) } func (client *client) Publish(_ context.Context, batch publisher.Batch) error { - events := batch.Events() - length := len(events) - client.logger.Debugf("Received events: %d", length) - - label := fmt.Sprintf("%s_%s_%s_%d", client.labelPrefix, client.database, client.table, time.Now().UnixMilli()) - rest, err := client.publishEvents(label, events) - - if len(rest) == 0 { - batch.ACK() - client.logger.Debugf("Success send events: %d", length) - } else { - client.observer.Failed(length) - batch.RetryEvents(rest) - client.logger.Warnf("Retry send events: %d", length) - } - return err + events := batch.Events() + length := len(events) + client.logger.Debugf("Received events: %d", length) + + label := fmt.Sprintf("%s_%s_%s_%d", client.labelPrefix, client.database, client.table, time.Now().UnixMilli()) + rest, err := client.publishEvents(label, events) + + if len(rest) == 0 { + batch.ACK() + client.logger.Debugf("Success send events: %d", length) + } else { + client.observer.Failed(length) + batch.RetryEvents(rest) + client.logger.Warnf("Retry send events: %d", length) + } + return err } func (client *client) publishEvents(lable string, events []publisher.Event) ([]publisher.Event, error) { - begin := time.Now() - - var logFirstEvent []byte - var stringBuilder strings.Builder - - dropped := 0 - for i := range events { - event := &events[i] - serializedEvent, err := client.codec.Encode(client.beat.Beat, &event.Content) - - if err != nil { - if event.Guaranteed() { - client.logger.Errorf("Failed to serialize the event: %+v", err) - } else { - client.logger.Warnf("Failed to serialize the event: %+v", err) - } - client.logger.Debugf("Failed event: %v", event) - - dropped++ - continue - } - - if logFirstEvent == nil { - logFirstEvent = serializedEvent - } - stringBuilder.Write(serializedEvent) - stringBuilder.WriteString(client.lineDelimiter) - } - request, requestErr := http.NewRequest(http.MethodPut, client.url, strings.NewReader(stringBuilder.String())) - if requestErr != nil { - client.logger.Errorf("Failed to create request: %s", requestErr) - return events, requestErr - } - - request.Header.Set("label", lable) - for k, v := range client.headers { - request.Header.Set(k, v) - } - response, responseErr := client.httpClient.Do(request) - if responseErr != nil { - client.logger.Errorf("Failed to stream-load request: %v", responseErr) - return events, responseErr - } - - defer response.Body.Close() - if response.StatusCode < 200 || response.StatusCode >= 300 { - client.logger.Errorf("Failed to stream-load request with status code %d", - response.StatusCode) - - reqBytes, reqErr := httputil.DumpRequestOut(request, false) - if reqErr != nil { - client.logger.Errorf("Failed to dump stream-load request: %v", reqErr) - } else { - client.logger.Errorf("Stream-Load request dump:\n%s\n first event: %s", - string(reqBytes), string(logFirstEvent)) - } - - respBytes, respErr := httputil.DumpResponse(response, true) - if respErr != nil { - client.logger.Errorf("Failed to dump stream-load response: %v", respErr) - } else { - client.logger.Errorf("Stream-Load response dump:\n%s", string(respBytes)) - } - return events, nil - } - - client.logger.Debugf("Stream-Load publish events: %d events have been published to doris in %v.", - len(events)-dropped, - time.Now().Sub(begin)) - - client.observer.Dropped(dropped) - client.observer.Acked(len(events) - dropped) - return nil, nil + begin := time.Now() + + var logFirstEvent []byte + var stringBuilder strings.Builder + + dropped := 0 + for i := range events { + event := &events[i] + serializedEvent, err := client.codec.Encode(client.beat.Beat, &event.Content) + + if err != nil { + if event.Guaranteed() { + client.logger.Errorf("Failed to serialize the event: %+v", err) + } else { + client.logger.Warnf("Failed to serialize the event: %+v", err) + } + client.logger.Debugf("Failed event: %v", event) + + dropped++ + continue + } + + if logFirstEvent == nil { + logFirstEvent = serializedEvent + } + stringBuilder.Write(serializedEvent) + stringBuilder.WriteString(client.lineDelimiter) + } + request, requestErr := http.NewRequest(http.MethodPut, client.url, strings.NewReader(stringBuilder.String())) + if requestErr != nil { + client.logger.Errorf("Failed to create request: %s", requestErr) + return events, requestErr + } + + request.Header.Set("label", lable) + for k, v := range client.headers { + request.Header.Set(k, v) + } + response, responseErr := client.httpClient.Do(request) + if responseErr != nil { + client.logger.Errorf("Failed to stream-load request: %v", responseErr) + return events, responseErr + } + + defer response.Body.Close() + if response.StatusCode < 200 || response.StatusCode >= 300 { + client.logger.Errorf("Failed to stream-load request with status code %d", + response.StatusCode) + + reqBytes, reqErr := httputil.DumpRequestOut(request, false) + if reqErr != nil { + client.logger.Errorf("Failed to dump stream-load request: %v", reqErr) + } else { + client.logger.Errorf("Stream-Load request dump:\n%s\n first event: %s", + string(reqBytes), string(logFirstEvent)) + } + + respBytes, respErr := httputil.DumpResponse(response, true) + if respErr != nil { + client.logger.Errorf("Failed to dump stream-load response: %v", respErr) + } else { + client.logger.Errorf("Stream-Load response dump:\n%s", string(respBytes)) + } + return events, nil + } + + if client.logRequest { + respBytes, respErr := httputil.DumpResponse(response, true) + if respErr != nil { + client.logger.Errorf("Failed to dump doris stream load response: %v, error: %v", response, respErr) + } else { + client.logger.Infof("doris stream load response response:\n%s", string(respBytes)) + } + } + + client.logger.Debugf("Stream-Load publish events: %d events have been published to doris in %v.", + len(events)-dropped, + time.Now().Sub(begin)) + + client.observer.Dropped(dropped) + client.observer.Acked(len(events) - dropped) + return nil, nil } diff --git a/extension/beats/doris/config.go b/extension/beats/doris/config.go index 6c374716466cd7..6db30fdbbe283e 100644 --- a/extension/beats/doris/config.go +++ b/extension/beats/doris/config.go @@ -20,87 +20,89 @@ package doris import ( - "encoding/base64" - "errors" - "strings" - "time" + "encoding/base64" + "errors" + "strings" + "time" - "github.com/elastic/beats/v7/libbeat/outputs/codec" + "github.com/elastic/beats/v7/libbeat/outputs/codec" ) type config struct { - Hosts []string `config:"fenodes" validate:"required"` - User string `config:"user"` - Password string `config:"password"` - Database string `config:"database" validate:"required"` - Table string `config:"table" validate:"required"` - LabelPrefix string `config:"label_prefix"` - LineDelimiter string `config:"line_delimiter"` + Hosts []string `config:"fenodes" validate:"required"` + User string `config:"user"` + Password string `config:"password"` + Database string `config:"database" validate:"required"` + Table string `config:"table" validate:"required"` + LabelPrefix string `config:"label_prefix"` + LineDelimiter string `config:"line_delimiter"` + logRequest bool `config:"log_request"` - Headers map[string]string `config:"headers"` + Headers map[string]string `config:"headers"` - CodecFormatString string `config:"codec_format_string"` - Codec codec.Config `config:"codec"` - Timeout time.Duration `config:"timeout"` - BulkMaxSize int `config:"bulk_max_size" validate:"min=1,nonzero"` - MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"` - Backoff backoff `config:"backoff"` + CodecFormatString string `config:"codec_format_string"` + Codec codec.Config `config:"codec"` + Timeout time.Duration `config:"timeout"` + BulkMaxSize int `config:"bulk_max_size" validate:"min=1,nonzero"` + MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"` + Backoff backoff `config:"backoff"` } type backoff struct { - Init time.Duration `config:"init"` - Max time.Duration `config:"max"` + Init time.Duration `config:"init"` + Max time.Duration `config:"max"` } func defaultConfig() config { - return config{ - Password: "", - LabelPrefix: "doris_beats", - LineDelimiter: "\n", + return config{ + Password: "", + LabelPrefix: "doris_beats", + LineDelimiter: "\n", + logRequest: true, - BulkMaxSize: 100000, - MaxRetries: 3, - Backoff: backoff{ - Init: 1 * time.Second, - Max: 60 * time.Second, - }, - } + BulkMaxSize: 100000, + MaxRetries: 3, + Backoff: backoff{ + Init: 1 * time.Second, + Max: 60 * time.Second, + }, + } } func (c *config) Validate() error { - if len(c.Hosts) == 0 { - return errors.New("no http_hosts configured") - } - if len(c.Database) == 0 { - return errors.New("no database configured") - } - if len(c.Table) == 0 { - return errors.New("no table configured") - } - if len(c.CodecFormatString) == 0 && &c.Codec == nil { - return errors.New("no codec_format_expression|codec configured") - } - return nil + if len(c.Hosts) == 0 { + return errors.New("no http_hosts configured") + } + if len(c.Database) == 0 { + return errors.New("no database configured") + } + if len(c.Table) == 0 { + return errors.New("no table configured") + } + if len(c.CodecFormatString) == 0 && &c.Codec == nil { + return errors.New("no codec_format_expression|codec configured") + } + return nil } func (c *config) createHeaders() map[string]string { - headers := make(map[string]string) - headers["Expect"] = "100-continue" - headers["Content-Type"] = "text/plain;charset=utf-8" - if len(c.User) != 0 { - headers["Authorization"] = "Basic " + base64.StdEncoding.EncodeToString([]byte(c.User+":"+c.Password)) - } - if len(c.LineDelimiter) != 0 && !strings.EqualFold(c.LineDelimiter, "\n") { - headers["line_delimiter"] = c.LineDelimiter - } + headers := make(map[string]string) + headers["Expect"] = "100-continue" + headers["Content-Type"] = "text/plain;charset=utf-8" + if len(c.User) != 0 { + headers["Authorization"] = "Basic " + base64.StdEncoding.EncodeToString([]byte(c.User+":"+c.Password)) + } + if len(c.LineDelimiter) != 0 && !strings.EqualFold(c.LineDelimiter, "\n") { + headers["line_delimiter"] = c.LineDelimiter + } - if c.Headers != nil && len(c.Headers) != 0 { - for k, v := range c.Headers { - if strings.EqualFold("line_delimiter", k) { - c.LineDelimiter = v - } - headers[k] = v - } - } - return headers + if c.Headers != nil && len(c.Headers) != 0 { + for k, v := range c.Headers { + if strings.EqualFold("line_delimiter", k) { + c.LineDelimiter = v + } + headers[k] = v + } + } + return headers } diff --git a/extension/beats/doris/doris.go b/extension/beats/doris/doris.go index a8d15915c48d09..f9bd16ae5bc0e2 100644 --- a/extension/beats/doris/doris.go +++ b/extension/beats/doris/doris.go @@ -20,101 +20,102 @@ package doris import ( - "fmt" + "fmt" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/libbeat/outputs" - "github.com/elastic/beats/v7/libbeat/outputs/codec" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/outputs/codec" ) const logSelector = "doris" func init() { - outputs.RegisterType("doris", makeDoris) + outputs.RegisterType("doris", makeDoris) } func makeDoris( - _ outputs.IndexManager, - beat beat.Info, - observer outputs.Observer, - cfg *common.Config, + _ outputs.IndexManager, + beat beat.Info, + observer outputs.Observer, + cfg *common.Config, ) (outputs.Group, error) { - logger := logp.NewLogger(logSelector) - - config := defaultConfig() - if err := cfg.Unpack(&config); err != nil { - return outputs.Fail(err) - } - if err := config.Validate(); err != nil { - return outputs.Fail(err) - } - - codecConfig := loadCodecConfig(config) - codec, err := codec.CreateEncoder(beat, codecConfig) - if err != nil { - return outputs.Fail(err) - } - - clients := make([]outputs.NetworkClient, len(config.Hosts)) - for i, host := range config.Hosts { - logger.Info("Making client for host: " + host) - url, err := parseURL(host) - if err != nil { - return outputs.Fail(err) - } - - streamLoadPath := fmt.Sprintf("/api/%s/%s/_stream_load", config.Database, config.Table) - hostURL, err := common.MakeURL(url.Scheme, streamLoadPath, host, 80) - if err != nil { - logger.Errorf("Invalid host param set: %s, Error: %+v", host, err) - return outputs.Fail(err) - } - logger.Infof("Final http connection endpoint: %s", hostURL) - - var client outputs.NetworkClient - client, err = NewDorisClient(clientSettings{ - URL: hostURL, - Timeout: config.Timeout, - Observer: observer, - Headers: config.createHeaders(), - Codec: codec, - LineDelimiter: config.LineDelimiter, - - LabelPrefix: config.LabelPrefix, - Database: config.Database, - Table: config.Table, - - Logger: logger, - }) - if err != nil { - return outputs.Fail(err) - } - - client = outputs.WithBackoff(client, config.Backoff.Init, config.Backoff.Max) - clients[i] = client - } - retry := 0 - if config.MaxRetries < 0 { - retry = -1 - } else { - retry = config.MaxRetries - } - return outputs.SuccessNet(true, config.BulkMaxSize, retry, clients) + logger := logp.NewLogger(logSelector) + + config := defaultConfig() + if err := cfg.Unpack(&config); err != nil { + return outputs.Fail(err) + } + if err := config.Validate(); err != nil { + return outputs.Fail(err) + } + + codecConfig := loadCodecConfig(config) + codec, err := codec.CreateEncoder(beat, codecConfig) + if err != nil { + return outputs.Fail(err) + } + + clients := make([]outputs.NetworkClient, len(config.Hosts)) + for i, host := range config.Hosts { + logger.Info("Making client for host: " + host) + url, err := parseURL(host) + if err != nil { + return outputs.Fail(err) + } + + streamLoadPath := fmt.Sprintf("/api/%s/%s/_stream_load", config.Database, config.Table) + hostURL, err := common.MakeURL(url.Scheme, streamLoadPath, host, 80) + if err != nil { + logger.Errorf("Invalid host param set: %s, Error: %+v", host, err) + return outputs.Fail(err) + } + logger.Infof("Final http connection endpoint: %s", hostURL) + + var client outputs.NetworkClient + client, err = NewDorisClient(clientSettings{ + URL: hostURL, + Timeout: config.Timeout, + Observer: observer, + Headers: config.createHeaders(), + Codec: codec, + LineDelimiter: config.LineDelimiter, + LogRequest: config.logRequest, + + LabelPrefix: config.LabelPrefix, + Database: config.Database, + Table: config.Table, + + Logger: logger, + }) + if err != nil { + return outputs.Fail(err) + } + + client = outputs.WithBackoff(client, config.Backoff.Init, config.Backoff.Max) + clients[i] = client + } + retry := 0 + if config.MaxRetries < 0 { + retry = -1 + } else { + retry = config.MaxRetries + } + return outputs.SuccessNet(true, config.BulkMaxSize, retry, clients) } func loadCodecConfig(config config) codec.Config { - if len(config.CodecFormatString) == 0 { - return config.Codec - } - - beatConfig := common.MustNewConfigFrom(map[string]interface{}{ - "format.string": config.CodecFormatString, - }) - codecConfig := codec.Config{} - if err := beatConfig.Unpack(&codecConfig); err != nil { - panic(err) - } - return codecConfig + if len(config.CodecFormatString) == 0 { + return config.Codec + } + + beatConfig := common.MustNewConfigFrom(map[string]interface{}{ + "format.string": config.CodecFormatString, + }) + codecConfig := codec.Config{} + if err := beatConfig.Unpack(&codecConfig); err != nil { + panic(err) + } + return codecConfig } From d9f681bf5f6ee814e77da4ffc2df13941e85b49d Mon Sep 17 00:00:00 2001 From: Kang Date: Fri, 10 May 2024 14:53:36 +0800 Subject: [PATCH 2/7] parse doris response status and check OK --- extension/beats/doris/client.go | 57 ++++++++++++++++++--------------- extension/beats/doris/doris.go | 1 + 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/extension/beats/doris/client.go b/extension/beats/doris/client.go index 28ab82771e310b..e6046437fadfd7 100644 --- a/extension/beats/doris/client.go +++ b/extension/beats/doris/client.go @@ -21,7 +21,9 @@ package doris import ( "context" + "encoding/json" "fmt" + "io/ioutil" "net/http" "net/http/httputil" "strings" @@ -68,6 +70,10 @@ type clientSettings struct { Logger *logp.Logger } +type responseStatus struct { + Status string `json:"Status"` +} + func (s clientSettings) String() string { return fmt.Sprintf("clientSettings{%s, %s, %s, %s}", s.URL, s.Timeout, s.LabelPrefix, s.Headers) } @@ -117,11 +123,11 @@ func (client *client) Publish(_ context.Context, batch publisher.Batch) error { if len(rest) == 0 { batch.ACK() - client.logger.Debugf("Success send events: %d", length) + client.logger.Debugf("Success send %d events", length) } else { client.observer.Failed(length) batch.RetryEvents(rest) - client.logger.Warnf("Retry send events: %d", length) + client.logger.Warnf("Retry send %d events", length) } return err } @@ -172,34 +178,33 @@ func (client *client) publishEvents(lable string, events []publisher.Event) ([]p } defer response.Body.Close() - if response.StatusCode < 200 || response.StatusCode >= 300 { - client.logger.Errorf("Failed to stream-load request with status code %d", - response.StatusCode) - - reqBytes, reqErr := httputil.DumpRequestOut(request, false) - if reqErr != nil { - client.logger.Errorf("Failed to dump stream-load request: %v", reqErr) - } else { - client.logger.Errorf("Stream-Load request dump:\n%s\n first event: %s", - string(reqBytes), string(logFirstEvent)) - } - respBytes, respErr := httputil.DumpResponse(response, true) - if respErr != nil { - client.logger.Errorf("Failed to dump stream-load response: %v", respErr) - } else { - client.logger.Errorf("Stream-Load response dump:\n%s", string(respBytes)) - } - return events, nil + responseBytes, responseErr := httputil.DumpResponse(response, true) + if responseErr != nil { + client.logger.Errorf("Failed to dump doris stream load response: %v, error: %v", response, responseErr) + return events, responseErr } if client.logRequest { - respBytes, respErr := httputil.DumpResponse(response, true) - if respErr != nil { - client.logger.Errorf("Failed to dump doris stream load response: %v, error: %v", response, respErr) - } else { - client.logger.Infof("doris stream load response response:\n%s", string(respBytes)) - } + client.logger.Infof("doris stream load response response:\n%s", string(responseBytes)) + } + + body, bodyErr := ioutil.ReadAll(response.Body) + if bodyErr != nil { + client.logger.Errorf("Failed to read doris stream load response body, error: %v, response:\n%v", bodyErr, string(responseBytes)) + return events, bodyErr + } + + var status responseStatus + parseErr := json.Unmarshal(body, &status) + if parseErr != nil { + client.logger.Errorf("Failed to parse doris stream load response to JSON, error: %v, response:\n%v", parseErr, string(responseBytes)) + return events, parseErr + } + + if status.Status != "OK" { + client.logger.Errorf("doris stream load status: %v is not 'OK', full response: %v", status.Status, string(responseBytes)) + return events, nil } client.logger.Debugf("Stream-Load publish events: %d events have been published to doris in %v.", diff --git a/extension/beats/doris/doris.go b/extension/beats/doris/doris.go index f9bd16ae5bc0e2..2095088411234a 100644 --- a/extension/beats/doris/doris.go +++ b/extension/beats/doris/doris.go @@ -102,6 +102,7 @@ func makeDoris( } else { retry = config.MaxRetries } + logger.Infof("config.MaxRetries: %v, retry: %v", config.MaxRetries, retry) return outputs.SuccessNet(true, config.BulkMaxSize, retry, clients) } From 593d6d3b3f9141ca7134e453f4aefdf050246478 Mon Sep 17 00:00:00 2001 From: Kang Date: Sat, 11 May 2024 14:36:15 +0800 Subject: [PATCH 3/7] parse doris response status and check Success --- extension/beats/doris/client.go | 18 ++++++++++-------- extension/beats/doris/doris.go | 2 +- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/extension/beats/doris/client.go b/extension/beats/doris/client.go index e6046437fadfd7..029c2ecf2041fd 100644 --- a/extension/beats/doris/client.go +++ b/extension/beats/doris/client.go @@ -70,14 +70,16 @@ type clientSettings struct { Logger *logp.Logger } -type responseStatus struct { - Status string `json:"Status"` -} - func (s clientSettings) String() string { return fmt.Sprintf("clientSettings{%s, %s, %s, %s}", s.URL, s.Timeout, s.LabelPrefix, s.Headers) } +type ResponseStatus struct { + Status string `json:"Status"` +} + +func (e *ResponseStatus) Error() string { return e.Status } + func NewDorisClient(s clientSettings) (*client, error) { s.Logger.Infof("Received settings: %s", s) @@ -195,16 +197,16 @@ func (client *client) publishEvents(lable string, events []publisher.Event) ([]p return events, bodyErr } - var status responseStatus + var status ResponseStatus parseErr := json.Unmarshal(body, &status) if parseErr != nil { client.logger.Errorf("Failed to parse doris stream load response to JSON, error: %v, response:\n%v", parseErr, string(responseBytes)) return events, parseErr } - if status.Status != "OK" { - client.logger.Errorf("doris stream load status: %v is not 'OK', full response: %v", status.Status, string(responseBytes)) - return events, nil + if status.Status != "Success" { + client.logger.Errorf("doris stream load status: '%v' is not 'Success', full response: %v", status.Status, string(responseBytes)) + return events, &status } client.logger.Debugf("Stream-Load publish events: %d events have been published to doris in %v.", diff --git a/extension/beats/doris/doris.go b/extension/beats/doris/doris.go index 2095088411234a..d6066b91c8823b 100644 --- a/extension/beats/doris/doris.go +++ b/extension/beats/doris/doris.go @@ -66,7 +66,7 @@ func makeDoris( } streamLoadPath := fmt.Sprintf("/api/%s/%s/_stream_load", config.Database, config.Table) - hostURL, err := common.MakeURL(url.Scheme, streamLoadPath, host, 80) + hostURL, err := common.MakeURL(url.Scheme, streamLoadPath, host, 8030) if err != nil { logger.Errorf("Invalid host param set: %s, Error: %+v", host, err) return outputs.Fail(err) From c4f69b7bcdc96b5cb5217727ea2ae3dfb412d972 Mon Sep 17 00:00:00 2001 From: Kang Date: Sun, 16 Jun 2024 18:33:38 +0800 Subject: [PATCH 4/7] add porgress reporter --- extension/beats/doris/client.go | 75 ++++++++++++++++++++++++++++++++- extension/beats/doris/config.go | 26 ++++++------ extension/beats/doris/doris.go | 11 +++-- extension/beats/go.mod | 1 + extension/beats/go.sum | 2 + 5 files changed, 99 insertions(+), 16 deletions(-) diff --git a/extension/beats/doris/client.go b/extension/beats/doris/client.go index 029c2ecf2041fd..54e6c9a7e2257f 100644 --- a/extension/beats/doris/client.go +++ b/extension/beats/doris/client.go @@ -27,6 +27,7 @@ import ( "net/http" "net/http/httputil" "strings" + "sync/atomic" "time" "github.com/elastic/beats/v7/libbeat/beat" @@ -34,6 +35,7 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/codec" "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/google/uuid" ) type client struct { @@ -50,6 +52,7 @@ type client struct { logRequest bool observer outputs.Observer + reporter *ProgressReporter logger *logp.Logger } @@ -67,6 +70,7 @@ type clientSettings struct { Beat beat.Info Codec codec.Codec Observer outputs.Observer + Reporter *ProgressReporter Logger *logp.Logger } @@ -80,6 +84,69 @@ type ResponseStatus struct { func (e *ResponseStatus) Error() string { return e.Status } +type ProgressReporter struct { + totalBytes int64 + totalRows int64 + failedRows int64 + interval time.Duration + logger *logp.Logger +} + +func NewProgressReporter(interval int, logger *logp.Logger) *ProgressReporter { + return &ProgressReporter{ + totalBytes: 0, + totalRows: 0, + failedRows: 0, + interval: time.Duration(interval) * time.Second, + logger: logger, + } +} + +func (reporter *ProgressReporter) IncrTotalBytes(bytes int64) { + atomic.AddInt64(&reporter.totalBytes, bytes) +} + +func (reporter *ProgressReporter) IncrTotalRows(rows int64) { + atomic.AddInt64(&reporter.totalRows, rows) +} + +func (reporter *ProgressReporter) IncrFailedRows(rows int64) { + atomic.AddInt64(&reporter.totalRows, rows) +} + +func (reporter *ProgressReporter) Report() { + init_time := time.Now().Unix() + last_time := init_time + last_bytes := atomic.LoadInt64(&reporter.totalBytes) + last_rows := atomic.LoadInt64(&reporter.totalRows) + + reporter.logger.Infof("start progress reporter with interval %v", reporter.interval) + for reporter.interval > 0 { + time.Sleep(reporter.interval) + + cur_time := time.Now().Unix() + cur_bytes := atomic.LoadInt64(&reporter.totalBytes) + cur_rows := atomic.LoadInt64(&reporter.totalRows) + total_time := cur_time - init_time + total_speed_mbps := cur_bytes / 1024 / 1024 / total_time + total_speed_rps := cur_rows / total_time + + inc_bytes := cur_bytes - last_bytes + inc_rows := cur_rows - last_rows + inc_time := cur_time - last_time + inc_speed_mbps := inc_bytes / 1024 / 1024 / inc_time + inc_speed_rps := inc_rows / inc_time + + reporter.logger.Infof("total %v MB %v ROWS, total speed %v MB/s %v R/s, last %v seconds speed %v MB/s %v R/s", + cur_bytes/1024/1024, cur_rows, total_speed_mbps, total_speed_rps, + inc_time, inc_speed_mbps, inc_speed_rps) + + last_time = cur_time + last_bytes = cur_bytes + last_rows = cur_rows + } +} + func NewDorisClient(s clientSettings) (*client, error) { s.Logger.Infof("Received settings: %s", s) @@ -98,6 +165,7 @@ func NewDorisClient(s clientSettings) (*client, error) { codec: s.Codec, observer: s.Observer, + reporter: s.Reporter, logger: s.Logger, } return client, nil @@ -120,7 +188,7 @@ func (client *client) Publish(_ context.Context, batch publisher.Batch) error { length := len(events) client.logger.Debugf("Received events: %d", length) - label := fmt.Sprintf("%s_%s_%s_%d", client.labelPrefix, client.database, client.table, time.Now().UnixMilli()) + label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, client.database, client.table, time.Now().UnixMilli(), uuid.New()) rest, err := client.publishEvents(label, events) if len(rest) == 0 { @@ -154,6 +222,7 @@ func (client *client) publishEvents(lable string, events []publisher.Event) ([]p client.logger.Debugf("Failed event: %v", event) dropped++ + client.reporter.IncrFailedRows(1) continue } @@ -215,5 +284,9 @@ func (client *client) publishEvents(lable string, events []publisher.Event) ([]p client.observer.Dropped(dropped) client.observer.Acked(len(events) - dropped) + + client.reporter.IncrTotalBytes(int64(stringBuilder.Len())) + client.reporter.IncrTotalRows(int64(len(events) - dropped)) + return nil, nil } diff --git a/extension/beats/doris/config.go b/extension/beats/doris/config.go index 6db30fdbbe283e..330820e85f3777 100644 --- a/extension/beats/doris/config.go +++ b/extension/beats/doris/config.go @@ -29,14 +29,15 @@ import ( ) type config struct { - Hosts []string `config:"fenodes" validate:"required"` - User string `config:"user"` - Password string `config:"password"` - Database string `config:"database" validate:"required"` - Table string `config:"table" validate:"required"` - LabelPrefix string `config:"label_prefix"` - LineDelimiter string `config:"line_delimiter"` - logRequest bool `config:"log_request"` + Hosts []string `config:"fenodes" validate:"required"` + User string `config:"user"` + Password string `config:"password"` + Database string `config:"database" validate:"required"` + Table string `config:"table" validate:"required"` + LabelPrefix string `config:"label_prefix"` + LineDelimiter string `config:"line_delimiter"` + LogRequest bool `config:"log_request"` + LogProgressInterval int `config:"log_progress_interval"` Headers map[string]string `config:"headers"` @@ -55,10 +56,11 @@ type backoff struct { func defaultConfig() config { return config{ - Password: "", - LabelPrefix: "doris_beats", - LineDelimiter: "\n", - logRequest: true, + Password: "", + LabelPrefix: "doris_beats", + LineDelimiter: "\n", + LogRequest: true, + LogProgressInterval: 10, BulkMaxSize: 100000, MaxRetries: 3, diff --git a/extension/beats/doris/doris.go b/extension/beats/doris/doris.go index d6066b91c8823b..ef34d40fdead10 100644 --- a/extension/beats/doris/doris.go +++ b/extension/beats/doris/doris.go @@ -57,6 +57,8 @@ func makeDoris( return outputs.Fail(err) } + reporter := NewProgressReporter(config.LogProgressInterval, logger) + clients := make([]outputs.NetworkClient, len(config.Hosts)) for i, host := range config.Hosts { logger.Info("Making client for host: " + host) @@ -81,13 +83,14 @@ func makeDoris( Headers: config.createHeaders(), Codec: codec, LineDelimiter: config.LineDelimiter, - LogRequest: config.logRequest, + LogRequest: config.LogRequest, LabelPrefix: config.LabelPrefix, Database: config.Database, Table: config.Table, - Logger: logger, + Reporter: reporter, + Logger: logger, }) if err != nil { return outputs.Fail(err) @@ -102,7 +105,9 @@ func makeDoris( } else { retry = config.MaxRetries } - logger.Infof("config.MaxRetries: %v, retry: %v", config.MaxRetries, retry) + + go reporter.Report() + return outputs.SuccessNet(true, config.BulkMaxSize, retry, clients) } diff --git a/extension/beats/go.mod b/extension/beats/go.mod index 2935cb33cf3f37..99f7365f592333 100644 --- a/extension/beats/go.mod +++ b/extension/beats/go.mod @@ -65,6 +65,7 @@ require ( github.com/google/flatbuffers v1.12.1 // indirect github.com/google/go-cmp v0.5.6 // indirect github.com/google/gofuzz v1.1.0 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/googleapis/gnostic v0.4.1 // indirect github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 // indirect github.com/gorilla/websocket v1.4.2 // indirect diff --git a/extension/beats/go.sum b/extension/beats/go.sum index 0c17d71a6c6ec4..29277236cae7e4 100644 --- a/extension/beats/go.sum +++ b/extension/beats/go.sum @@ -467,6 +467,8 @@ github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gnostic v0.4.1 h1:DLJCy1n/vrD4HPjOvYcT8aYQXpPIzoRZONaYwyycI+I= From 7bf7f359977d7d4066246180d632f49c28d57a7a Mon Sep 17 00:00:00 2001 From: Kang Date: Wed, 17 Jul 2024 10:16:47 +0800 Subject: [PATCH 5/7] make filebeat logstash config name consistent --- extension/beats/doris/config.go | 12 ++++++++---- extension/logstash/lib/logstash/outputs/doris.rb | 14 +++++++------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/extension/beats/doris/config.go b/extension/beats/doris/config.go index 330820e85f3777..46d772bb661fc3 100644 --- a/extension/beats/doris/config.go +++ b/extension/beats/doris/config.go @@ -29,8 +29,9 @@ import ( ) type config struct { - Hosts []string `config:"fenodes" validate:"required"` - User string `config:"user"` + Hosts []string `config:"fenodes"` + HttpHosts []string `config:"http_hosts"` + User string `config:"user" validate:"required"` Password string `config:"password"` Database string `config:"database" validate:"required"` Table string `config:"table" validate:"required"` @@ -57,13 +58,13 @@ type backoff struct { func defaultConfig() config { return config{ Password: "", - LabelPrefix: "doris_beats", + LabelPrefix: "beats", LineDelimiter: "\n", LogRequest: true, LogProgressInterval: 10, BulkMaxSize: 100000, - MaxRetries: 3, + MaxRetries: -1, Backoff: backoff{ Init: 1 * time.Second, Max: 60 * time.Second, @@ -72,6 +73,9 @@ func defaultConfig() config { } func (c *config) Validate() error { + if len(c.HttpHosts) != 0 { + c.Hosts = HttpHosts + } if len(c.Hosts) == 0 { return errors.New("no http_hosts configured") } diff --git a/extension/logstash/lib/logstash/outputs/doris.rb b/extension/logstash/lib/logstash/outputs/doris.rb index 34124f446bbb87..c52296bd10c0e7 100644 --- a/extension/logstash/lib/logstash/outputs/doris.rb +++ b/extension/logstash/lib/logstash/outputs/doris.rb @@ -69,11 +69,11 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base config :host_resolve_ttl_sec, :validate => :number, :default => 120 - config :retry_count, :validate => :number, :default => -1 + config :max_retries, :validate => :number, :default => -1 - config :log_request, :validate => :boolean, :default => false + config :log_request, :validate => :boolean, :default => true - config :log_speed_interval, :validate => :number, :default => 10 + config :log_progress_interval, :validate => :number, :default => 10 def print_plugin_info() @@ -107,9 +107,9 @@ def register last_time = @init_time last_bytes = @total_bytes.get last_rows = @total_rows.get - @logger.info("will report speed every #{@log_speed_interval} seconds") - while @log_speed_interval > 0 - sleep(@log_speed_interval) + @logger.info("will report speed every #{@log_progress_interval} seconds") + while @log_progress_interval > 0 + sleep(@log_progress_interval) cur_time = Time.now.to_i # seconds cur_bytes = @total_bytes.get @@ -210,7 +210,7 @@ def send_events(events) else @logger.warn("FAILED doris stream load response:\n#{response}") - if @retry_count >= 0 && req_count > @retry_count + if @max_retries >= 0 && req_count > @max_retries @logger.warn("DROP this batch after failed #{req_count} times.") if @save_on_failure @logger.warn("Try save to disk.Disk file path : #{save_dir}/#{table}_#{save_file}") From febeda9e8854569026301a240c2531bff9aaaab9 Mon Sep 17 00:00:00 2001 From: Kang Date: Sat, 3 Aug 2024 00:37:35 +0800 Subject: [PATCH 6/7] fix --- extension/beats/doris/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extension/beats/doris/config.go b/extension/beats/doris/config.go index 46d772bb661fc3..7dd9a921174c9e 100644 --- a/extension/beats/doris/config.go +++ b/extension/beats/doris/config.go @@ -74,7 +74,7 @@ func defaultConfig() config { func (c *config) Validate() error { if len(c.HttpHosts) != 0 { - c.Hosts = HttpHosts + c.Hosts = c.HttpHosts } if len(c.Hosts) == 0 { return errors.New("no http_hosts configured") From f1e39c21ac560780c87feabb56be9a809bf84a93 Mon Sep 17 00:00:00 2001 From: Kang Date: Thu, 5 Sep 2024 22:02:37 +0800 Subject: [PATCH 7/7] fix group_commit error --- extension/beats/doris/client.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/extension/beats/doris/client.go b/extension/beats/doris/client.go index 54e6c9a7e2257f..d9a88f0a4f830e 100644 --- a/extension/beats/doris/client.go +++ b/extension/beats/doris/client.go @@ -238,10 +238,17 @@ func (client *client) publishEvents(lable string, events []publisher.Event) ([]p return events, requestErr } - request.Header.Set("label", lable) + var groupCommit bool = false for k, v := range client.headers { request.Header.Set(k, v) + if k == "group_commit" && v != "off_mode" { + groupCommit = true + } + } + if !groupCommit { + request.Header.Set("label", lable) } + response, responseErr := client.httpClient.Do(request) if responseErr != nil { client.logger.Errorf("Failed to stream-load request: %v", responseErr)