package org.apache.inlong.manager.client.api.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.InlongStream;
import org.apache.inlong.manager.client.api.InlongStreamBuilder;
import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
import org.apache.inlong.manager.client.api.inner.InnerStreamContext;
import org.apache.inlong.manager.client.api.util.StreamTransformTransfer;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
import org.apache.inlong.manager.common.util.JsonUtils;

/* loaded from: input_file:org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.class */
public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
    private final InlongStreamImpl inlongStream;
    private final InnerStreamContext streamContext;
    private final InnerInlongManagerClient managerClient;

    public DefaultInlongStreamBuilder(InlongStreamInfo inlongStreamInfo, InnerGroupContext innerGroupContext, InnerInlongManagerClient innerInlongManagerClient) {
        this.managerClient = innerInlongManagerClient;
        if (MapUtils.isEmpty(innerGroupContext.getStreamContextMap())) {
            innerGroupContext.setStreamContextMap(Maps.newHashMap());
        }
        InlongGroupInfo groupInfo = innerGroupContext.getGroupInfo();
        String inlongGroupId = groupInfo.getInlongGroupId();
        inlongStreamInfo.setInlongGroupId(inlongGroupId);
        inlongStreamInfo.setCreator(groupInfo.getCreator());
        InnerStreamContext innerStreamContext = new InnerStreamContext(inlongStreamInfo);
        innerGroupContext.setStreamContext(innerStreamContext);
        this.streamContext = innerStreamContext;
        this.inlongStream = new InlongStreamImpl(inlongGroupId, inlongStreamInfo.getInlongStreamId(), innerInlongManagerClient);
        if (CollectionUtils.isNotEmpty(inlongStreamInfo.getFieldList())) {
            this.inlongStream.setStreamFields(inlongStreamInfo.getFieldList());
        }
        innerGroupContext.setStream(this.inlongStream);
    }

    @Override // org.apache.inlong.manager.client.api.InlongStreamBuilder
    public InlongStreamBuilder source(StreamSource streamSource) {
        InlongStreamInfo streamInfo = this.streamContext.getStreamInfo();
        streamSource.setInlongGroupId(streamInfo.getInlongGroupId());
        streamSource.setInlongStreamId(streamInfo.getInlongStreamId());
        this.inlongStream.addSource(streamSource);
        this.streamContext.setSourceRequest(streamSource.genSourceRequest());
        return this;
    }

    @Override // org.apache.inlong.manager.client.api.InlongStreamBuilder
    public InlongStreamBuilder sink(StreamSink streamSink) {
        InlongStreamInfo streamInfo = this.streamContext.getStreamInfo();
        streamSink.setInlongGroupId(streamInfo.getInlongGroupId());
        streamSink.setInlongStreamId(streamInfo.getInlongStreamId());
        this.inlongStream.addSink(streamSink);
        this.streamContext.setSinkRequest(streamSink.genSinkRequest());
        return this;
    }

    @Override // org.apache.inlong.manager.client.api.InlongStreamBuilder
    public InlongStreamBuilder fields(List<StreamField> list) {
        this.inlongStream.setStreamFields(list);
        this.streamContext.updateStreamFields(list);
        return this;
    }

    @Override // org.apache.inlong.manager.client.api.InlongStreamBuilder
    public InlongStreamBuilder transform(StreamTransform streamTransform) {
        this.inlongStream.addTransform(streamTransform);
        this.streamContext.setTransformRequest(StreamTransformTransfer.createTransformRequest(streamTransform, this.streamContext.getStreamInfo()));
        return this;
    }

    @Override // org.apache.inlong.manager.client.api.InlongStreamBuilder
    public InlongStream init() {
        InlongStreamInfo streamInfo = this.streamContext.getStreamInfo();
        streamInfo.setExtParams(JsonUtils.toJsonString(this.inlongStream.createPipeline()));
        streamInfo.setId(this.managerClient.createStreamInfo(streamInfo));
        for (SourceRequest sourceRequest : Lists.newArrayList(this.streamContext.getSourceRequests().values())) {
            sourceRequest.setId(this.managerClient.createSource(sourceRequest));
        }
        for (SinkRequest sinkRequest : Lists.newArrayList(this.streamContext.getSinkRequests().values())) {
            sinkRequest.setId(this.managerClient.createSink(sinkRequest));
        }
        for (TransformRequest transformRequest : Lists.newArrayList(this.streamContext.getTransformRequests().values())) {
            transformRequest.setId(this.managerClient.createTransform(transformRequest).intValue());
        }
        return this.inlongStream;
    }

    @Override // org.apache.inlong.manager.client.api.InlongStreamBuilder
    public InlongStream initOrUpdate() {
        InlongStreamInfo streamInfo = this.streamContext.getStreamInfo();
        streamInfo.setExtParams(JsonUtils.toJsonString(this.inlongStream.createPipeline()));
        if (!this.managerClient.isStreamExists(streamInfo).booleanValue()) {
            return init();
        }
        Pair<Boolean, String> updateStreamInfo = this.managerClient.updateStreamInfo(streamInfo);
        if (!((Boolean) updateStreamInfo.getKey()).booleanValue()) {
            throw new RuntimeException(String.format("Update data stream failed:%s", updateStreamInfo.getValue()));
        }
        initOrUpdateTransform();
        initOrUpdateSource();
        initOrUpdateSink();
        return this.inlongStream;
    }

    private void initOrUpdateTransform() {
        Map<String, TransformRequest> transformRequests = this.streamContext.getTransformRequests();
        InlongStreamInfo streamInfo = this.streamContext.getStreamInfo();
        List<TransformResponse> listTransform = this.managerClient.listTransform(streamInfo.getInlongGroupId(), streamInfo.getInlongStreamId());
        ArrayList newArrayList = Lists.newArrayList();
        for (TransformResponse transformResponse : listTransform) {
            StreamTransform parseStreamTransform = StreamTransformTransfer.parseStreamTransform(transformResponse);
            String transformName = parseStreamTransform.getTransformName();
            int id = transformResponse.getId();
            if (transformRequests.get(transformName) == null) {
                TransformRequest createTransformRequest = StreamTransformTransfer.createTransformRequest(parseStreamTransform, streamInfo);
                if (!this.managerClient.deleteTransform(createTransformRequest)) {
                    throw new RuntimeException(String.format("Delete transform=%s failed", createTransformRequest));
                }
            } else {
                TransformRequest transformRequest = transformRequests.get(transformName);
                transformRequest.setId(id);
                Pair<Boolean, String> updateTransform = this.managerClient.updateTransform(transformRequest);
                if (!((Boolean) updateTransform.getKey()).booleanValue()) {
                    throw new RuntimeException(String.format("Update transform=%s failed with err=%s", transformRequest, updateTransform.getValue()));
                }
                transformRequest.setId(transformResponse.getId());
                newArrayList.add(transformName);
            }
        }
        for (Map.Entry<String, TransformRequest> entry : transformRequests.entrySet()) {
            if (!newArrayList.contains(entry.getKey())) {
                TransformRequest value = entry.getValue();
                value.setId(this.managerClient.createTransform(value).intValue());
            }
        }
    }

    private void initOrUpdateSource() {
        Map<String, SourceRequest> sourceRequests = this.streamContext.getSourceRequests();
        InlongStreamInfo streamInfo = this.streamContext.getStreamInfo();
        List<SourceListResponse> listSources = this.managerClient.listSources(streamInfo.getInlongGroupId(), streamInfo.getInlongStreamId());
        ArrayList newArrayList = Lists.newArrayList();
        for (SourceListResponse sourceListResponse : listSources) {
            String sourceName = sourceListResponse.getSourceName();
            int intValue = sourceListResponse.getId().intValue();
            if (sourceRequests.get(sourceName) != null) {
                SourceRequest sourceRequest = sourceRequests.get(sourceName);
                sourceRequest.setId(Integer.valueOf(intValue));
                Pair<Boolean, String> updateSource = this.managerClient.updateSource(sourceRequest);
                if (!((Boolean) updateSource.getKey()).booleanValue()) {
                    throw new RuntimeException(String.format("Update source=%s failed with err=%s", sourceRequest, updateSource.getValue()));
                }
                newArrayList.add(sourceName);
                sourceRequest.setId(sourceListResponse.getId());
            } else if (!this.managerClient.deleteSource(intValue)) {
                throw new RuntimeException(String.format("Delete source=%s failed", sourceListResponse));
            }
        }
        for (Map.Entry<String, SourceRequest> entry : sourceRequests.entrySet()) {
            if (!newArrayList.contains(entry.getKey())) {
                SourceRequest value = entry.getValue();
                value.setId(this.managerClient.createSource(value));
            }
        }
    }

    private void initOrUpdateSink() {
        Map<String, SinkRequest> sinkRequests = this.streamContext.getSinkRequests();
        InlongStreamInfo streamInfo = this.streamContext.getStreamInfo();
        List<SinkListResponse> listSinks = this.managerClient.listSinks(streamInfo.getInlongGroupId(), streamInfo.getInlongStreamId());
        ArrayList newArrayList = Lists.newArrayList();
        for (SinkListResponse sinkListResponse : listSinks) {
            String sinkName = sinkListResponse.getSinkName();
            int intValue = sinkListResponse.getId().intValue();
            if (sinkRequests.get(sinkName) != null) {
                SinkRequest sinkRequest = sinkRequests.get(sinkName);
                sinkRequest.setId(Integer.valueOf(intValue));
                Pair<Boolean, String> updateSink = this.managerClient.updateSink(sinkRequest);
                if (!((Boolean) updateSink.getKey()).booleanValue()) {
                    throw new RuntimeException(String.format("Update sink=%s failed with err=%s", sinkRequest, updateSink.getValue()));
                }
                newArrayList.add(sinkName);
                sinkRequest.setId(sinkListResponse.getId());
            } else if (!this.managerClient.deleteSink(intValue)) {
                throw new RuntimeException(String.format("Delete sink=%s failed", sinkListResponse));
            }
        }
        for (Map.Entry<String, SinkRequest> entry : sinkRequests.entrySet()) {
            if (!newArrayList.contains(entry.getKey())) {
                SinkRequest value = entry.getValue();
                value.setId(this.managerClient.createSink(value));
            }
        }
    }
}
