From a9dd40fb901249354e449efd1ca8fe13d170dd5d Mon Sep 17 00:00:00 2001 From: Michael Beaumont Date: Wed, 21 Jul 2021 09:27:24 +0200 Subject: [PATCH] Allow users to cancel requests --- ocpp1.6/charge_point.go | 5 +-- ocpp1.6/v16.go | 3 +- ocpp1.6_test/ocpp16_test.go | 3 +- ocpp2.0.1/charging_station.go | 5 +-- ocpp2.0.1/v2.go | 3 +- ocpp2.0.1_test/ocpp2_test.go | 3 +- ocppj/client.go | 9 ++++-- ocppj/dispatcher.go | 59 ++++++++++++++++++++++++----------- ocppj/queue.go | 2 ++ ocppj/server.go | 3 +- 10 files changed, 66 insertions(+), 29 deletions(-) diff --git a/ocpp1.6/charge_point.go b/ocpp1.6/charge_point.go index c30fd0b0..a61ef812 100644 --- a/ocpp1.6/charge_point.go +++ b/ocpp1.6/charge_point.go @@ -1,6 +1,7 @@ package ocpp16 import ( + "context" "fmt" "github.com/lorenzodonini/ocpp-go/internal/callbackqueue" @@ -238,7 +239,7 @@ func (cp *chargePoint) SendRequest(request ocpp.Request) (ocpp.Response, error) } } -func (cp *chargePoint) SendRequestAsync(request ocpp.Request, callback func(confirmation ocpp.Response, err error)) error { +func (cp *chargePoint) SendRequestAsync(ctx context.Context, request ocpp.Request, callback func(confirmation ocpp.Response, err error)) error { featureName := request.GetFeatureName() if _, found := cp.client.GetProfileForFeature(featureName); !found { return fmt.Errorf("feature %v is unsupported on charge point (missing profile), cannot send request", featureName) @@ -252,7 +253,7 @@ func (cp *chargePoint) SendRequestAsync(request ocpp.Request, callback func(conf } // Response will be retrieved asynchronously via asyncHandler send := func() error { - return cp.client.SendRequest(request) + return cp.client.SendRequestCtx(ctx, request) } err := cp.callbacks.TryQueue("main", send, callback) return err diff --git a/ocpp1.6/v16.go b/ocpp1.6/v16.go index 2382d2ad..d0e56ed0 100644 --- a/ocpp1.6/v16.go +++ b/ocpp1.6/v16.go @@ -2,6 +2,7 @@ package ocpp16 import ( + "context" "crypto/tls" "github.com/gorilla/websocket" @@ -86,7 +87,7 @@ type ChargePoint interface { // The central system will respond with a confirmation messages, or with an error if the request was invalid or could not be processed. // This result is propagated via a callback, called asynchronously. // In case of network issues (i.e. the remote host couldn't be reached), the function returns an error directly. In this case, the callback is never called. - SendRequestAsync(request ocpp.Request, callback func(confirmation ocpp.Response, protoError error)) error + SendRequestAsync(ctx context.Context, request ocpp.Request, callback func(confirmation ocpp.Response, protoError error)) error // Connects to the central system and starts the charge point routine. // The function doesn't block and returns right away, after having attempted to open a connection to the central system. // If the connection couldn't be opened, an error is returned. diff --git a/ocpp1.6_test/ocpp16_test.go b/ocpp1.6_test/ocpp16_test.go index 3b4d0e2b..a44b6f7c 100644 --- a/ocpp1.6_test/ocpp16_test.go +++ b/ocpp1.6_test/ocpp16_test.go @@ -1,6 +1,7 @@ package ocpp16_test import ( + "context" "crypto/tls" "fmt" "reflect" @@ -550,7 +551,7 @@ func testUnsupportedRequestFromChargePoint(suite *OcppV16TestSuite, request ocpp err := suite.chargePoint.Start(wsUrl) assert.Nil(t, err) // Run request test, expecting an error - err = suite.chargePoint.SendRequestAsync(request, func(confirmation ocpp.Response, err error) { + err = suite.chargePoint.SendRequestAsync(context.Background(), request, func(confirmation ocpp.Response, err error) { t.Fail() }) assert.Error(t, err) diff --git a/ocpp2.0.1/charging_station.go b/ocpp2.0.1/charging_station.go index fd118df0..e9d68d1b 100644 --- a/ocpp2.0.1/charging_station.go +++ b/ocpp2.0.1/charging_station.go @@ -1,6 +1,7 @@ package ocpp2 import ( + "context" "fmt" "github.com/lorenzodonini/ocpp-go/internal/callbackqueue" @@ -489,7 +490,7 @@ func (cs *chargingStation) SendRequest(request ocpp.Request) (ocpp.Response, err return asyncResult.r, asyncResult.e } -func (cs *chargingStation) SendRequestAsync(request ocpp.Request, callback func(response ocpp.Response, err error)) error { +func (cs *chargingStation) SendRequestAsync(ctx context.Context, request ocpp.Request, callback func(response ocpp.Response, err error)) error { featureName := request.GetFeatureName() if _, found := cs.client.GetProfileForFeature(featureName); !found { return fmt.Errorf("feature %v is unsupported on charging station (missing profile), cannot send request", featureName) @@ -526,7 +527,7 @@ func (cs *chargingStation) SendRequestAsync(request ocpp.Request, callback func( } // Response will be retrieved asynchronously via asyncHandler send := func() error { - return cs.client.SendRequest(request) + return cs.client.SendRequestCtx(ctx, request) } err := cs.callbacks.TryQueue("main", send, callback) return err diff --git a/ocpp2.0.1/v2.go b/ocpp2.0.1/v2.go index 6b6c30c7..2ba1718a 100644 --- a/ocpp2.0.1/v2.go +++ b/ocpp2.0.1/v2.go @@ -2,6 +2,7 @@ package ocpp2 import ( + "context" "crypto/tls" "github.com/gorilla/websocket" @@ -150,7 +151,7 @@ type ChargingStation interface { // This result is propagated via a callback, called asynchronously. // // In case of network issues (i.e. the remote host couldn't be reached), the function returns an error directly. In this case, the callback is never invoked. - SendRequestAsync(request ocpp.Request, callback func(confirmation ocpp.Response, protoError error)) error + SendRequestAsync(ctx context.Context, request ocpp.Request, callback func(confirmation ocpp.Response, protoError error)) error // Connects to the CSMS and starts the charging station routine. // The function doesn't block and returns right away, after having attempted to open a connection to the CSMS. // If the connection couldn't be opened, an error is returned. diff --git a/ocpp2.0.1_test/ocpp2_test.go b/ocpp2.0.1_test/ocpp2_test.go index fc8cf317..a72f2cb2 100644 --- a/ocpp2.0.1_test/ocpp2_test.go +++ b/ocpp2.0.1_test/ocpp2_test.go @@ -1,6 +1,7 @@ package ocpp2_test import ( + "context" "crypto/tls" "fmt" "reflect" @@ -984,7 +985,7 @@ func testUnsupportedRequestFromChargingStation(suite *OcppV2TestSuite, request o err := suite.chargingStation.Start(wsUrl) require.Nil(t, err) // Run request test - err = suite.chargingStation.SendRequestAsync(request, func(confirmation ocpp.Response, err error) { + err = suite.chargingStation.SendRequestAsync(context.Background(), request, func(confirmation ocpp.Response, err error) { t.Fail() }) require.Error(t, err) diff --git a/ocppj/client.go b/ocppj/client.go index 08ea779f..1e02394d 100644 --- a/ocppj/client.go +++ b/ocppj/client.go @@ -1,6 +1,7 @@ package ocppj import ( + "context" "fmt" "github.com/lorenzodonini/ocpp-go/ocpp" @@ -108,7 +109,7 @@ func (c *Client) Stop() { // - the endpoint doesn't support the feature // // - the output queue is full -func (c *Client) SendRequest(request ocpp.Request) error { +func (c *Client) SendRequestCtx(ctx context.Context, request ocpp.Request) error { if !c.dispatcher.IsRunning() { return fmt.Errorf("ocppj client is not started, couldn't send request") } @@ -125,7 +126,7 @@ func (c *Client) SendRequest(request ocpp.Request) error { return err } // Message will be processed by dispatcher. A dedicated mechanism allows to delegate the message queue handling. - if err := c.dispatcher.SendRequest(RequestBundle{Call: call, Data: jsonMessage}); err != nil { + if err := c.dispatcher.SendRequest(RequestBundle{Call: call, Data: jsonMessage, Ctx: ctx}); err != nil { log.Errorf("request %v - %v: %v", call.UniqueId, call.Action, err) return err } @@ -133,6 +134,10 @@ func (c *Client) SendRequest(request ocpp.Request) error { return nil } +func (c *Client) SendRequest(request ocpp.Request) error { + return c.SendRequestCtx(context.Background(), request) +} + // Sends an OCPP Response to the server. // The requestID parameter is required and identifies the previously received request. // diff --git a/ocppj/dispatcher.go b/ocppj/dispatcher.go index 1bb160db..b047d798 100644 --- a/ocppj/dispatcher.go +++ b/ocppj/dispatcher.go @@ -1,6 +1,7 @@ package ocppj import ( + "context" "fmt" "sync" "time" @@ -168,6 +169,20 @@ func (d *DefaultClientDispatcher) SendRequest(req RequestBundle) error { func (d *DefaultClientDispatcher) messagePump() { rdy := true // Ready to transmit at the beginning + cancelled := func() { + if d.pendingRequestState.HasPendingRequest() { + // Current request timed out. Removing request and triggering cancel callback + el := d.requestQueue.Peek() + bundle, _ := el.(RequestBundle) + d.CompleteRequest(bundle.Call.UniqueId) + if d.onRequestCancel != nil { + d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload) + } + } + // No request is currently pending -> set timer to high number + d.timer.Reset(defaultTimeoutTick) + } + var reqContextDone <-chan struct{} for { select { case _, ok := <-d.requestChannel: @@ -177,22 +192,15 @@ func (d *DefaultClientDispatcher) messagePump() { d.requestChannel = nil return } + case <-reqContextDone: + // user cancelled request + cancelled() case _, ok := <-d.timer.C: // Timeout elapsed if !ok { continue } - if d.pendingRequestState.HasPendingRequest() { - // Current request timed out. Removing request and triggering cancel callback - el := d.requestQueue.Peek() - bundle, _ := el.(RequestBundle) - d.CompleteRequest(bundle.Call.UniqueId) - if d.onRequestCancel != nil { - d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload) - } - } - // No request is currently pending -> set timer to high number - d.timer.Reset(defaultTimeoutTick) + cancelled() case rdy = <-d.readyForDispatch: // Ready flag set, keep going } @@ -206,22 +214,36 @@ func (d *DefaultClientDispatcher) messagePump() { } // Only dispatch request if able to send and request queue isn't empty if rdy && !d.requestQueue.IsEmpty() { - d.dispatchNextRequest() - rdy = false - // Set timer - if !d.timer.Stop() { - <-d.timer.C + if done, ok := d.dispatchNextRequest(); ok { + rdy = false + // Set timer + if !d.timer.Stop() { + <-d.timer.C + } + d.timer.Reset(d.timeout) + reqContextDone = done } - d.timer.Reset(d.timeout) } } } -func (d *DefaultClientDispatcher) dispatchNextRequest() { +func (d *DefaultClientDispatcher) dispatchNextRequest() (<-chan struct{}, bool) { // Get first element in queue el := d.requestQueue.Peek() bundle, _ := el.(RequestBundle) jsonMessage := bundle.Data + ctx := bundle.Ctx + if ctx == nil { + ctx = context.Background() + } + select { + case <-ctx.Done(): + if d.onRequestCancel != nil { + d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload) + } + return nil, false + default: + } d.pendingRequestState.AddPendingRequest(bundle.Call.UniqueId, bundle.Call.Payload) // Attempt to send over network err := d.network.Write(jsonMessage) @@ -232,6 +254,7 @@ func (d *DefaultClientDispatcher) dispatchNextRequest() { d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload) } } + return ctx.Done(), true } func (d *DefaultClientDispatcher) Pause() { diff --git a/ocppj/queue.go b/ocppj/queue.go index c4f004a7..b402b8c6 100644 --- a/ocppj/queue.go +++ b/ocppj/queue.go @@ -1,6 +1,7 @@ package ocppj import ( + "context" "fmt" "sync" ) @@ -10,6 +11,7 @@ import ( type RequestBundle struct { Call *Call Data []byte + Ctx context.Context } // RequestQueue can be arbitrarily implemented, as long as it conforms to the Queue interface. diff --git a/ocppj/server.go b/ocppj/server.go index d2d365e2..41e306f3 100644 --- a/ocppj/server.go +++ b/ocppj/server.go @@ -1,6 +1,7 @@ package ocppj import ( + "context" "fmt" "github.com/lorenzodonini/ocpp-go/ocpp" @@ -137,7 +138,7 @@ func (s *Server) SendRequest(clientID string, request ocpp.Request) error { return err } // Will not send right away. Queuing message and let it be processed by dedicated requestPump routine - if err := s.dispatcher.SendRequest(clientID, RequestBundle{call, jsonMessage}); err != nil { + if err := s.dispatcher.SendRequest(clientID, RequestBundle{call, jsonMessage, context.Background()}); err != nil { log.Errorf("request %v - %v for client %v: %v", call.UniqueId, call.Action, clientID, err) return err }