hongalex

Repos: 39 · Followers: 34 · Following: 14

Repositories:

Google Cloud Client Libraries for Go.

Public interface definitions of Google APIs.

Sample apps and code written for Google Cloud in the Go programming language.

One-stop shop for all dice rolls, coin flips, and other random object generation.

Node.js samples for Google Cloud Platform products.

Events

switch publish span to const

Created 1 week ago
pubsub: extended processing appears to create memory leak

Client

PubSub

Environment

Docker on GKE

Go Environment

$ go version

go version go1.17.2 darwin/amd64

$ go env

GO111MODULE="on"
GOARCH="amd64"
GOBIN=""
GOCACHE="/Users/mike/Library/Caches/go-build"
GOENV="/Users/mike/Library/Application Support/go/env"
GOEXE=""
GOEXPERIMENT=""
GOFLAGS=""
GOHOSTARCH="amd64"
GOHOSTOS="darwin"
GOINSECURE=""
GOMODCACHE="/Users/mike/go/pkg/mod"
GONOPROXY=""
GONOSUMDB=""
GOOS="darwin"
GOPATH="/Users/mike/go"
GOPRIVATE=""
GOPROXY="https://proxy.golang.org,direct"
GOROOT="/Users/mike/go/go1.17.2"
GOSUMDB="sum.golang.org"
GOTMPDIR=""
GOTOOLDIR="/Users/mike/go/go1.17.2/pkg/tool/darwin_amd64"
GOVCS=""
GOVERSION="go1.17.2"
GCCGO="gccgo"
AR="ar"
CC="clang"
CXX="clang++"
CGO_ENABLED="1"
GOMOD="/Users/mike/Documents/event-handler/go.mod"
CGO_CFLAGS="-g -O2"
CGO_CPPFLAGS=""
CGO_CXXFLAGS="-g -O2"
CGO_FFLAGS="-g -O2"
CGO_LDFLAGS="-g -O2"
PKG_CONFIG="pkg-config"
GOGCCFLAGS="-fPIC -arch x86_64 -m64 -pthread -fno-caret-diagnostics -Qunused-arguments -fmessage-length=0 -fdebug-prefix-map=/var/folders/8l/0qhz2jl149bdfk71_vsx2t6r0000gn/T/go-build3621358332=/tmp/go-build -gno-record-gcc-switches -fno-common"

Code

package main

func main() {
	...
	sub := client.Subscription(config.subscription)
	sub.ReceiveSettings = pubsub.ReceiveSettings{
		MaxExtension:           2 * time.Minute,
		MaxExtensionPeriod:     10 * time.Second,
		MaxOutstandingMessages: config.maxBuffer,
		NumGoroutines:          10,
	}

	for {
		// Each iteration receives for at most 30 minutes, then tears down
		// and starts a fresh Receive call.
		tctx, cancel := context.WithTimeout(ctx, 30*time.Minute)
		tctx = context.WithValue(tctx, key("cancel"), &cancel)

		err = sub.Receive(tctx, processOneEvent)
		cancelWithFatalTimeout(cancel, 30*time.Second)
		if err != nil {
			logger.Fatal("Error during Pub/Sub receive", zap.Error(err))
		}
	}
}

Expected behavior

Related to my question at #5858: when processing takes an extended amount of time, I expect the eventual Ack() or Nack() on that message to be a no-op once its lease has expired, and the message to then be redelivered based on the subscription's backoff/retry settings.
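For reference, the backoff/retry behavior here lives on the subscription itself, not in ReceiveSettings. A minimal sketch of setting it from Go, assuming an existing *pubsub.Client; the subscription ID and durations are placeholders:

package example

import (
	"context"
	"time"

	"cloud.google.com/go/pubsub"
)

// Sketch only: update an existing subscription's retry policy. The ID and
// backoff values below are placeholders, not values from this issue.
func updateRetryPolicy(ctx context.Context, client *pubsub.Client) error {
	sub := client.Subscription("my-subscription")
	_, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
		RetryPolicy: &pubsub.RetryPolicy{
			MinimumBackoff: 10 * time.Second,
			MaximumBackoff: 5 * time.Minute,
		},
	})
	return err
}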

Actual behavior

When ReceiveSettings.MaxOutstandingMessages is very high (we started at 20k), the client appears to allocate and hold memory quickly. In the graph above you can see a restart on 03/30/2022 where memory almost immediately reached around 8GB, which is already well above the default ReceiveSettings.MaxOutstandingBytes (we do not set it). After lowering ReceiveSettings.MaxOutstandingMessages to 4500 and restarting again on 04/05/2022 (again, chart above), usage was much lower, only about 250MB initially, but it then started to grow again, which suggests a leak.

Again, my working theory is that some tasks take longer than the ReceiveSettings.MaxExtension value, and the late Ack() or Nack() calls cause objects to stick around.
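One way to bound the worst case while this is investigated is to cap flow control by bytes as well as by message count. A minimal sketch, with illustrative limits only:

package example

import (
	"time"

	"cloud.google.com/go/pubsub"
)

// Sketch only: the settings from the report, plus an explicit byte cap so the
// amount of undelivered payload held in memory is bounded. Values are
// illustrative, not recommendations.
func receiveSettingsWithByteCap(maxBuffer int) pubsub.ReceiveSettings {
	return pubsub.ReceiveSettings{
		MaxExtension:           2 * time.Minute,
		MaxExtensionPeriod:     10 * time.Second,
		MaxOutstandingMessages: maxBuffer,
		MaxOutstandingBytes:    500 * 1024 * 1024, // ~500MB of outstanding payloads
		NumGoroutines:          10,
	}
}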

Additional context

We are processing thousands of events per second, and some hit network errors or other intermittent failures that cause processing to time out. We typically average 2-30 timeouts per second out of the thousands per second that are successfully processed or marked as failed. For timeouts and failures we call Nack(); at the end of the processing func we Ack() the message like this:

func processOneEvent(ctx context.Context, m *pubsub.Message) {
...
	if m != nil {
		m.Ack()
	}
}

The nil check is because we have sometimes received nil items passed as messages - though it's pretty rare.
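To make that flow concrete, a handler shaped like the one described above might look roughly like this; processEvent and the 30-second deadline are hypothetical stand-ins:

package example

import (
	"context"
	"time"

	"cloud.google.com/go/pubsub"
)

// Sketch only: per-message deadline, Nack() on timeout or failure so Pub/Sub
// redelivers per the subscription's retry policy, Ack() on success.
func processOneEvent(ctx context.Context, m *pubsub.Message) {
	if m == nil {
		return
	}
	tctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	if err := processEvent(tctx, m.Data); err != nil {
		m.Nack()
		return
	}
	m.Ack()
}

// processEvent is a placeholder for the real per-event work.
func processEvent(ctx context.Context, data []byte) error { return nil }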

Created 2 weeks ago
pubsub: extended processing appears to create memory leak

Feel free to comment again if this is still an issue, and we'll continue to look into it.

Created 2 weeks ago
pubsub: ack called but message retried

Feel free to comment again if this is still an issue, and we'll continue to look into it.

Created 2 weeks ago
pubsub: ack called but message retried

Client

PubSub

Environment

Alpine on GKE

Go Environment

$ go version

go version go1.17.5

Code

e.g.

ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
	return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

sub := client.Subscription(subID)

return sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})

Expected behavior

Messages are acked and not retried.

Actual behavior

Some messages are retried despite being acked almost instantly

Additional context

Subscriptions config:

Type: Pull
Retention: 10min
Expiry: 31 days
Ack deadline: 300s (it should not matter in pull mode anyway)
Retry Policy: exponential backoff from 1s to 30s
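For completeness, those settings can be read back through the client to rule out configuration drift between environments. A sketch, assuming the same client and subID as in the snippet above:

package example

import (
	"context"
	"fmt"

	"cloud.google.com/go/pubsub"
)

// Sketch only: dump the server-side subscription config (ack deadline,
// retention, retry policy) to compare against the values listed above.
func dumpSubscriptionConfig(ctx context.Context, client *pubsub.Client, subID string) error {
	cfg, err := client.Subscription(subID).Config(ctx)
	if err != nil {
		return err
	}
	fmt.Println("ack deadline:", cfg.AckDeadline)
	fmt.Println("retention:", cfg.RetentionDuration)
	if cfg.RetryPolicy != nil {
		fmt.Println("min backoff:", cfg.RetryPolicy.MinimumBackoff)
		fmt.Println("max backoff:", cfg.RetryPolicy.MaximumBackoff)
	}
	return nil
}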

The problem appeared after a recent migration. The message count might be a factor, but we are far from what Pub/Sub can support. The CPU usage of the pod is really low, so I don't think that's an issue either.

We have tried a lot of things, but nothing has worked. We also have three deployments of this application in three different GCP projects, and only one of them has this issue.

I am not sure where the issue is, but if anyone has an idea, I'll take it.
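One low-tech check is to log the message ID per delivery: a repeated ID confirms a true redelivery, while distinct IDs carrying the same payload point at duplicate publishes. A sketch (Message.DeliveryAttempt is only populated when a dead-letter policy is configured, so it is not used here):

package example

import (
	"context"
	"log"
	"sync"

	"cloud.google.com/go/pubsub"
)

// Sketch only: count deliveries per message ID so redeliveries show up in logs.
func receiveWithRedeliveryLogging(ctx context.Context, sub *pubsub.Subscription) error {
	var mu sync.Mutex
	seen := make(map[string]int)

	return sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		mu.Lock()
		seen[msg.ID]++
		n := seen[msg.ID]
		mu.Unlock()

		if n > 1 {
			log.Printf("message %s delivered %d times (published %s)", msg.ID, n, msg.PublishTime)
		}
		msg.Ack()
	})
}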

Created 2 weeks ago
pubsub: ack handler should be settable for testing purposes

Is your feature request related to a problem? Please describe.

Writing unit tests for a pubsub receiver is challenging because there is no way to determine whether a message was acked or nacked. There are no fields or methods available to retrieve that information, and the ack handler (ackh) is an unexported field with no method to set it. Perhaps this data is available if you set up the test server, but that complexity shouldn't be required just to test a receiver.

For example:

func (r *pubsubReceiver) Receive(ctx context.Context, msg *pubsub.Message) {
  if someLogic() {
    msg.Ack()
  } else {
    msg.Nack()
  }
}

There is no way to write a test to determine what happened (as far as I can tell).

Describe the solution you'd like

I can see this being resolved in one of two ways:

  1. Provide an exported field or method on Message to determine if the message was acked or nacked. I suppose the challenge is differentiating between Ack() being called and the message actually being acked, but I don't think it would be that hard to make that clear.
  2. Provide a method to set your own handler for the ackh field, for testing purposes. See below for my example of what I'm doing now. If you go this route, you could also provide a simple mock handler.

Describe alternatives you've considered

The two approaches mentioned above are the alternatives I think are suitable. See below for the hack I am currently employing.

Additional context

To get around this issue, I employed this hack (slightly modified, as it is now outdated) to forcefully set the ackh field to my own implementation of AckHandler.

For example:

type AckStatus struct {
	acked  bool
	nacked bool
}

func (a *AckStatus) IsAcked() bool {
	return a.acked
}

func (a *AckStatus) IsNacked() bool {
	return a.nacked
}

func (a *AckStatus) OnAck() {
	a.acked = true
}

func (a *AckStatus) OnNack() {
	a.nacked = true
}

func (a *AckStatus) OnAckWithResult() *pubsub.AckResult {
	a.OnAck()
	return &pubsub.AckResult{}
}

func (a *AckStatus) OnNackWithResult() *pubsub.AckResult {
	a.OnNack()
	return &pubsub.AckResult{}
}
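
For illustration, a test built on that AckStatus might look like the following; newTestMessage is a hypothetical helper standing in for whatever mechanism (e.g. the linked hack) actually injects the handler into a *pubsub.Message, not an existing pubsub API:

package example

import (
	"context"
	"testing"

	"cloud.google.com/go/pubsub"
)

// newTestMessage is hypothetical: it stands in for the hack above that forces
// the unexported ackh field to point at the given AckStatus.
func newTestMessage(data []byte, status *AckStatus) *pubsub.Message {
	// ... wire status in as the message's ack handler ...
	return &pubsub.Message{Data: data}
}

func TestReceiveAcksMessage(t *testing.T) {
	status := &AckStatus{}
	msg := newTestMessage([]byte(`{"ok":true}`), status)

	r := &pubsubReceiver{} // the receiver under test from the example above
	r.Receive(context.Background(), msg)

	if !status.IsAcked() {
		t.Error("expected message to be acked")
	}
	if status.IsNacked() {
		t.Error("expected message not to be nacked")
	}
}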
Created 2 weeks ago
pubsub: ack handler should be settable for testing purposes

I also need this feature. It would also be useful for monitoring purposes (e.g., exporting ack metrics).

We do have some metrics exporting available via OpenCensus right now, including ack count. We plan to move this implementation towards using OpenTelemetry once the metrics SDK is stabilized across languages.
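A sketch of wiring that up, assuming the OpenCensus views currently exported by the pubsub package (AckCountView, NackCountView); an OpenCensus exporter still has to be registered separately:

package example

import (
	"cloud.google.com/go/pubsub"
	"go.opencensus.io/stats/view"
)

// Sketch only: register the subscriber-side ack/nack views so the counts are
// collected; pair this with whatever OpenCensus exporter you already use.
func registerPubSubAckViews() error {
	return view.Register(
		pubsub.AckCountView,
		pubsub.NackCountView,
	)
}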

Aside from that, the workarounds are listed in the comment above, so I'll be closing the issue.

Created 2 weeks ago