setElementConverter for HttpSinkRequestEntry is deprecated; what alternative method or code should we use? #61

Closed
someshwar1 opened this issue Jul 10, 2023 · 7 comments
Labels
question Further information is requested

Comments

@someshwar1

@Deprecated
@PublicEvolving
public HttpSinkBuilder setElementConverter(ElementConverter<InputT, HttpSinkRequestEntry> elementConverter) {
    this.elementConverter = elementConverter;
    return this;
}

Code:
HttpSink.builder()
    .setEndpointUrl("http://example.com/myendpoint")
    .setElementConverter(
        (s, _context) -> new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8)))
    .setProperty("gid.connector.http.sink.header.X-Content-Type-Options", "nosniff")
    .build();

@kristoffSC
Collaborator

kristoffSC commented Jul 10, 2023

Hi,
thanks for asking. Please use setElementConverter(SchemaLifecycleAwareElementConverter) instead.

SchemaLifecycleAwareElementConverter extends the ElementConverter interface by adding an open method to the API.

The information you are looking for is in the Javadoc of the deprecated method.

I see that the README and the examples were not updated... sorry :)

I've created an issue for this:
#62

@kristoffSC kristoffSC self-assigned this Jul 10, 2023
@kristoffSC kristoffSC added the question Further information is requested label Jul 10, 2023
@someshwar1
Author

After updating, I am getting a compilation error. I have added these dependencies:
org.apache.flink.flink-java
org.apache.flink.flink-clients
org.apache.flink.flink-connector-base

Can you please suggest a fix?

@kristoffSC
Collaborator

kristoffSC commented Jul 11, 2023

SchemaLifecycleAwareElementConverter is not a functional interface like ElementConverter was, so you cannot use a lambda expression to implement it.
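The difference can be reproduced with JDK-only stand-ins. This is a minimal sketch: `SingleMethodConverter`, `TwoMethodConverter`, and `LambdaDemo` are hypothetical names that only mimic the shape of the two connector interfaces, they are not the connector's real types.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the old interface: exactly one abstract method,
// so it is a functional interface and a lambda can implement it.
@FunctionalInterface
interface SingleMethodConverter {
    byte[] apply(String element);
}

// Hypothetical stand-in for the new interface: it inherits apply() and adds open(),
// so it has two abstract methods and is no longer a functional interface.
interface TwoMethodConverter extends SingleMethodConverter {
    void open();
}

final class LambdaDemo {
    // Compiles: a lambda is enough for a single abstract method.
    static final SingleMethodConverter AS_LAMBDA =
        element -> element.getBytes(StandardCharsets.UTF_8);

    // Would NOT compile, because TwoMethodConverter is not a functional interface:
    // static final TwoMethodConverter BROKEN = element -> element.getBytes(StandardCharsets.UTF_8);

    // Compiles: an anonymous class can implement both methods.
    static final TwoMethodConverter AS_ANONYMOUS_CLASS = new TwoMethodConverter() {
        @Override
        public void open() {
            // no-op
        }

        @Override
        public byte[] apply(String element) {
            return element.getBytes(StandardCharsets.UTF_8);
        }
    };

    public static void main(String[] args) {
        System.out.println(AS_LAMBDA.apply("abc").length);
        System.out.println(AS_ANONYMOUS_CLASS.apply("abc").length);
    }
}
```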

What you can do is implement the interface and pass an instance of it, or use an anonymous class like so:

private final SchemaLifecycleAwareElementConverter<String, HttpSinkRequestEntry>
        ELEMENT_CONVERTER_V2 = new SchemaLifecycleAwareElementConverter<>() {
            @Override
            public void open(InitContext context) {
                //noOp
            }

            @Override
            public HttpSinkRequestEntry apply(String element, Context context) {
                return new HttpSinkRequestEntry("POST", element.getBytes(StandardCharsets.UTF_8));
            }
        };

   HttpSink.<String>builder().setElementConverter(ELEMENT_CONVERTER_V2)...

The open method is called when SinkWriter is created.
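One likely use of such an open hook is to build state that cannot be serialized together with the converter. The sketch below assumes that use case and relies only on the JDK: `OpenableConverter`, `DigestConverter`, and `OpenDemo` are hypothetical names, and the serialization round trip merely imitates a converter being shipped to a worker before open is called.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical stand-in for a lifecycle-aware converter (not the connector's type).
interface OpenableConverter extends Serializable {
    void open();
    byte[] apply(String element);
}

final class DigestConverter implements OpenableConverter {
    // MessageDigest is not Serializable, so the field is transient and built in open().
    private transient MessageDigest digest;

    @Override
    public void open() {
        try {
            digest = MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    @Override
    public byte[] apply(String element) {
        return digest.digest(element.getBytes(StandardCharsets.UTF_8));
    }
}

final class OpenDemo {
    // Serialize and deserialize the converter, similar to shipping it to a worker.
    static OpenableConverter roundTrip(OpenableConverter converter) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(converter);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (OpenableConverter) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        OpenableConverter shipped = roundTrip(new DigestConverter());
        shipped.open(); // the transient digest is created here, after deserialization
        System.out.println(shipped.apply("hello").length);
    }
}
```

Calling apply before open in this sketch would fail with a NullPointerException, because the transient field is empty after deserialization.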

@someshwar1
Author

someshwar1 commented Jul 11, 2023

Thanks @kristoffSC for the suggestion. I tried it out and am now getting a serialization issue:

Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not serialize object for key serializedUDF.

Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: Error in serialization.
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:326)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:160)
at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1024)
at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56)
at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61)
at org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:105)
at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:82)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2197)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2084)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:68)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2058)

@kristoffSC
Collaborator

Hi @someshwar1
sorry, but I'm not sure whether this is caused by the HTTP connector or by the API changes we made.

Please post a job example that reproduces this issue so we can narrow it down.

@someshwar1
Author

someshwar1 commented Jul 12, 2023

Found the issue.
Instead of implementing SchemaLifecycleAwareElementConverter in the same class, I created a new class and used it as shown below, which fixed the issue:

public class ElementConverterCustom implements SchemaLifecycleAwareElementConverter<String, HttpSinkRequestEntry> {
    @Override
    public void open(InitContext context) {
        // noOp
    }

    @Override
    public HttpSinkRequestEntry apply(String element, Context context) {
        return new HttpSinkRequestEntry("GET", element.getBytes(StandardCharsets.UTF_8));
    }
}

var httpSink = HttpSink.<String>builder()
    .setEndpointUrl("http://example.com/myendpoint")
    .setElementConverter(new ElementConverterCustom())
    .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new)
    .setProperty(
        HttpConnectorConfigConstants.SINK_HEADER_PREFIX + "Content-Type",
        contentTypeHeader)
    .build();

@kristoffSC
Collaborator

kristoffSC commented Jul 12, 2023

Glad that worked. It seems like a straightforward Java <-> Flink case :)

instead implement SchemaLifecycleAwareElementConverter in same class

This will work, but your "new inner class" must be declared static. If it's not static, it holds a reference to the "outer" class, which is not serializable.

An anonymous class assigned to a static field also works when it is defined in the same class.

Anyway, the options are:

  1. use a static nested class, or an anonymous class assigned to a static field, if you wish to define SchemaLifecycleAwareElementConverter inside the same class
  2. implement SchemaLifecycleAwareElementConverter in a brand new class.

Ad 1:
This will work:

public class MyClass {
    private static class EC implements SchemaLifecycleAwareElementConverter<String, HttpSinkRequestEntry> {
        ....
    }
}

This will also work:

public class MyClass {

    private static final SchemaLifecycleAwareElementConverter<String, HttpSinkRequestEntry>
        EC = new SchemaLifecycleAwareElementConverter<>() {
            ....
        };
}

These will not work and cause a serialization exception:

public class MyClass {
    private class EC implements SchemaLifecycleAwareElementConverter<String, HttpSinkRequestEntry> {
        ....
    }
}

public class MyClass {

    private final SchemaLifecycleAwareElementConverter<String, HttpSinkRequestEntry>
        EC = new SchemaLifecycleAwareElementConverter<>() {
            ....
        };
}
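
The four variants above can be checked outside Flink with plain Java serialization, which is what Flink uses when it serializes user functions into the job graph. This is a minimal sketch under stated assumptions: `DemoConverter`, `JobClass`, and `SerializationDemo` are hypothetical names, and `DemoConverter` only imitates a serializable two-method converter interface.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a serializable converter interface with two methods.
interface DemoConverter extends Serializable {
    void open();
    byte[] apply(String element);
}

// Deliberately NOT Serializable, like a typical class that builds the job.
class JobClass {
    // Works: a static nested class has no hidden reference to a JobClass instance.
    static class StaticNested implements DemoConverter {
        @Override public void open() { }
        @Override public byte[] apply(String e) { return e.getBytes(StandardCharsets.UTF_8); }
    }

    // Works: an anonymous class created in a static context has no enclosing instance.
    static final DemoConverter STATIC_ANONYMOUS = new DemoConverter() {
        @Override public void open() { }
        @Override public byte[] apply(String e) { return e.getBytes(StandardCharsets.UTF_8); }
    };

    // Fails: a non-static inner class keeps a reference to the enclosing JobClass.
    class Inner implements DemoConverter {
        @Override public void open() { }
        @Override public byte[] apply(String e) { return e.getBytes(StandardCharsets.UTF_8); }
    }

    // Fails: an anonymous class created in an instance context also keeps that reference.
    final DemoConverter instanceAnonymous = new DemoConverter() {
        @Override public void open() { }
        @Override public byte[] apply(String e) { return e.getBytes(StandardCharsets.UTF_8); }
    };

    DemoConverter newInner() {
        return new Inner();
    }
}

final class SerializationDemo {
    // Returns true if Java serialization accepts the object, false on NotSerializableException.
    static boolean canSerialize(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        JobClass job = new JobClass();
        System.out.println("static nested:      " + canSerialize(new JobClass.StaticNested()));
        System.out.println("static anonymous:   " + canSerialize(JobClass.STATIC_ANONYMOUS));
        System.out.println("inner class:        " + canSerialize(job.newInner()));
        System.out.println("instance anonymous: " + canSerialize(job.instanceAnonymous));
    }
}
```

Making `JobClass` itself Serializable would also let the last two variants pass, but it would drag the whole outer object into the serialized converter, so the static forms are usually the simpler choice.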
