Apache Flink Series ⑮: Application Modules, I/O Components, Flink Connectors

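The source/sink I/O module lets you plug in existing or custom Flink connectors that are not yet integrated into a dedicated I/O module. For the full list of available connectors, and for details on how to build your own, see the official Apache Flink documentation. Connectors are plugged into the runtime through an embedded module.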

Dependency

To use a custom Flink connector, include the following dependency in your pom.xml.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>statefun-flink-io</artifactId>
    <version>3.2.0</version>
</dependency>

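Source Spec

A source function spec creates an ingress from a Flink source function that emits TypedValue. In addition, each ingress requires a router function, which receives every incoming message and routes it to one or more function instances.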

package org.apache.flink.statefun.docs.io.flink;

import java.util.Map;
import org.apache.flink.statefun.flink.io.datastream.SourceFunctionSpec;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;

public class ModuleWithSourceSpec implements StatefulFunctionModule {

    @Override
    public void configure(Map<String, String> globalConfiguration, Binder binder) {
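        // The identifier names the ingress and declares the type of the messages it produces.
        IngressIdentifier<TypedValue> id =
            new IngressIdentifier<>(TypedValue.class, "com.example", "custom-source");
        // FlinkSource stands for your own Flink source function; it is not defined in this snippet.
        IngressSpec<TypedValue> spec = new SourceFunctionSpec<>(id, new FlinkSource<>());
        binder.bindIngress(spec);
        // CustomRouter stands for your own router implementation; it is not defined in this snippet.
        binder.bindIngressRouter(id, new CustomRouter());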
    }
}
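
The module above refers to a `CustomRouter` that the snippet does not define. The sketch below illustrates only the routing idea, using JDK-only stand-in types so that it runs on its own. `Downstream`, `KeyPrefixRouter`, and `RouterSketch` are hypothetical names defined here and are not the StateFun interfaces; a real router would implement the SDK's router interface and forward `TypedValue` messages instead. The `key:payload` message layout and the `unknown` fallback id are likewise assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the runtime's downstream handle; not the StateFun API.
interface Downstream {
    void forward(String functionType, String id, String message);
}

// Routes "key:payload" messages to the function instance addressed by the key.
class KeyPrefixRouter {
    private final String functionType;

    KeyPrefixRouter(String functionType) {
        this.functionType = functionType;
    }

    void route(String message, Downstream downstream) {
        int sep = message.indexOf(':');
        // Assumption: messages without a key go to a shared "unknown" instance.
        String id = sep < 0 ? "unknown" : message.substring(0, sep);
        downstream.forward(functionType, id, message);
    }
}

class RouterSketch {
    // Collects the "type#id" target chosen for each message, for inspection.
    static List<String> targetsFor(String... messages) {
        List<String> targets = new ArrayList<>();
        KeyPrefixRouter router = new KeyPrefixRouter("com.example.fns/greeter");
        for (String m : messages) {
            router.route(m, (type, id, msg) -> targets.add(type + "#" + id));
        }
        return targets;
    }

    public static void main(String[] args) {
        System.out.println(targetsFor("alice:hello", "bob:hi", "no-key"));
    }
}
```

Keying the target id on something stable in the message (here the prefix before the colon) means every message for the same key reaches the same function instance, which is what lets that instance keep per-key state such as a counter.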

Sink Spec

A sink function spec creates an egress from a Flink sink function that consumes TypedValue.

package org.apache.flink.statefun.docs.io.flink;

import java.util.Map;
import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

public class ModuleWithSinkSpec implements StatefulFunctionModule {

    @Override
    public void configure(Map<String, String> globalConfiguration, Binder binder) {
        EgressIdentifier<TypedValue> id = new EgressIdentifier<>("com.example", "custom-sink", TypedValue.class);
        EgressSpec<TypedValue> spec = new SinkFunctionSpec<>(id, new FlinkSink<>());
        binder.bindEgress(spec);
    }
}

Functions then use the generic egress message builder to send messages to the egress.

Java

import java.util.concurrent.CompletableFuture;
import org.apache.flink.statefun.sdk.java.Context;
import org.apache.flink.statefun.sdk.java.StatefulFunction;
import org.apache.flink.statefun.sdk.java.TypeName;
import org.apache.flink.statefun.sdk.java.ValueSpec;
import org.apache.flink.statefun.sdk.java.message.Message;
import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;

public class GreeterFn implements StatefulFunction {

    static final TypeName TYPE = TypeName.typeNameFromString("com.example.fns/greeter");

    static final TypeName CUSTOM_EGRESS = TypeName.typeNameFromString("com.example/custom-sink");

    static final ValueSpec<Integer> SEEN = ValueSpec.named("seen").withIntType();

    @Override
    public CompletableFuture<Void> apply(Context context, Message message) {
        if (!message.is(User.TYPE)) {
            throw new IllegalStateException("Unknown type");
        }

        User user = message.as(User.TYPE);
        String name = user.getName();

        var storage = context.storage();
        var seen = storage.get(SEEN).orElse(0);
        storage.set(SEEN, seen + 1);

        context.send(
            EgressMessageBuilder.forEgress(CUSTOM_EGRESS)
                .withUtf8Value("Hello " + name + " for the " + seen + "th time!")
                .build());

        return context.done();
    }
}

Python

from statefun import *

functions = StatefulFunctions()


@functions.bind(
    typename='com.example.fns/greeter',
    specs=[ValueSpec(name='seen_count', type=IntType)])
async def greet(context, message):
    if not message.is_type(UserType):
        raise ValueError('Unknown type')

    user = message.as_type(UserType)
    name = user.name

    storage = context.storage
    seen = storage.seen_count or 0
    storage.seen_count = seen + 1

    context.send_egress(egress_message_builder(
        typename='com.example/custom-sink',
        value=f"Hello {name} for the {seen}th time!",
        value_type=StringType))

Golang

import (
	"fmt"

	"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun"
)

func (g *Greeter) Invoke(ctx statefun.Context, message statefun.Message) error {
	if !message.Is(UserType) {
		return fmt.Errorf("unknown type %s", message.ValueTypeName())
	}

	// Deserialize the payload from the incoming message.
	var user User
	if err := message.As(UserType, &user); err != nil {
		return err
	}

	storage := ctx.Storage()

	var seen int32
	storage.Get(g.SeenCount, &seen)
	seen += 1
	storage.Set(g.SeenCount, seen)

	// Send the greeting to the egress bound as com.example/custom-sink.
	ctx.SendEgress(&statefun.GenericEgressBuilder{
		Target:    statefun.TypeNameFrom("com.example/custom-sink"),
		Value:     fmt.Sprintf("Hello %s for the %d-th time!", user.Name, seen),
		ValueType: statefun.StringType,
	})

	return nil
}
