package org.apache.inlong.manager.client.api.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.InlongStream;
import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
import org.apache.inlong.manager.client.api.util.StreamTransformTransfer;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.pojo.stream.StreamNodeRelation;
import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
import org.apache.inlong.manager.common.util.AssertUtils;
import org.apache.inlong.manager.common.util.JsonUtils;

/* loaded from: input_file:org/apache/inlong/manager/client/api/impl/InlongStreamImpl.class */
public class InlongStreamImpl implements InlongStream {
    private InnerInlongManagerClient managerClient;
    private String inlongGroupId;
    private String inlongStreamId;
    private Map<String, StreamSource> streamSources;
    private Map<String, StreamSink> streamSinks;
    private Map<String, StreamTransform> streamTransforms;
    private List<StreamField> streamFields;

    public InlongStreamImpl(FullStreamResponse fullStreamResponse, InnerInlongManagerClient innerInlongManagerClient) {
        this.streamSources = Maps.newHashMap();
        this.streamSinks = Maps.newHashMap();
        this.streamTransforms = Maps.newHashMap();
        this.streamFields = Lists.newArrayList();
        InlongStreamInfo streamInfo = fullStreamResponse.getStreamInfo();
        this.managerClient = innerInlongManagerClient;
        this.inlongGroupId = streamInfo.getInlongGroupId();
        this.inlongStreamId = streamInfo.getInlongStreamId();
        List fieldList = streamInfo.getFieldList();
        if (CollectionUtils.isNotEmpty(fieldList)) {
            this.streamFields = (List) fieldList.stream().map(streamField -> {
                return new StreamField(streamField.getId().intValue(), streamField.getFieldType(), streamField.getFieldName(), streamField.getFieldComment(), streamField.getFieldValue(), streamField.getIsMetaField(), streamField.getMetaFieldName(), streamField.getOriginNodeName());
            }).collect(Collectors.toList());
        }
        List sinkInfo = fullStreamResponse.getSinkInfo();
        if (CollectionUtils.isNotEmpty(sinkInfo)) {
            this.streamSinks = (Map) sinkInfo.stream().collect(Collectors.toMap((v0) -> {
                return v0.getSinkName();
            }, streamSink -> {
                return streamSink;
            }, (streamSink2, streamSink3) -> {
                throw new RuntimeException(String.format("duplicate sinkName:%s in stream:%s", streamSink2.getSinkName(), this.inlongStreamId));
            }));
        }
        List sourceInfo = fullStreamResponse.getSourceInfo();
        if (CollectionUtils.isNotEmpty(sourceInfo)) {
            this.streamSources = (Map) sourceInfo.stream().collect(Collectors.toMap((v0) -> {
                return v0.getSourceName();
            }, streamSource -> {
                return streamSource;
            }, (streamSource2, streamSource3) -> {
                throw new RuntimeException(String.format("duplicate sourceName: %s in streamId: %s", streamSource2.getSourceName(), this.inlongStreamId));
            }));
        }
    }

    public InlongStreamImpl(String str, String str2, InnerInlongManagerClient innerInlongManagerClient) {
        this.streamSources = Maps.newHashMap();
        this.streamSinks = Maps.newHashMap();
        this.streamTransforms = Maps.newHashMap();
        this.streamFields = Lists.newArrayList();
        this.managerClient = innerInlongManagerClient;
        this.inlongGroupId = str;
        this.inlongStreamId = str2;
    }

    @Override // org.apache.inlong.manager.client.api.InlongStream
    public List<StreamField> getStreamFields() {
        return this.streamFields;
    }

    @Override // org.apache.inlong.manager.client.api.InlongStream
    public Map<String, StreamSource> getSources() {
        return this.streamSources;
    }

    @Override // org.apache.inlong.manager.client.api.InlongStream
    public Map<String, StreamSink> getSinks() {
        return this.streamSinks;
    }

    @Override // org.apache.inlong.manager.client.api.InlongStream
    public Map<String, StreamTransform> getTransforms() {
        return this.streamTransforms;
    }

    @Override // org.apache.inlong.manager.client.api.InlongStream
    public InlongStream addSource(StreamSource streamSource) {
        AssertUtils.notNull(streamSource.getSourceName(), "Source name should not be empty");
        String sourceName = streamSource.getSourceName();
        if (this.streamSources.get(sourceName) != null) {
            throw new IllegalArgumentException(String.format("StreamSource=%s has already be set", streamSource));
        }
        this.streamSources.put(sourceName, streamSource);
        return this;
    }

    @Override // org.apache.inlong.manager.client.api.InlongStream
    public InlongStream addSink(StreamSink streamSink) {
        AssertUtils.notNull(streamSink.getSinkName(), "Sink name should not be empty");
        String sinkName = streamSink.getSinkName();
        if (this.streamSinks.get(sinkName) != null) {
            throw new IllegalArgumentException(String.format("StreamSink=%s has already be set", streamSink));
        }
        this.streamSinks.put(sinkName, streamSink);
        return this;
    }

    @Override // org.apache.inlong.manager.client.api.InlongStream
    public InlongStream addTransform(StreamTransform streamTransform) {
        AssertUtils.notNull(streamTransform.getTransformName(), "Transform name should not be empty");
        String transformName = streamTransform.getTransformName();
        if (this.streamTransforms.get(transformName) != null) {
            throw new IllegalArgumentException(String.format("TransformName=%s has already be set", streamTransform));
        }
        this.streamTransforms.put(transformName, streamTransform);
        return this;
    }

    @Override // org.apache.inlong.manager.client.api.InlongStream
    public InlongStream deleteSource(String str) {
        this.streamSources.remove(str);
        return this;
    }

    @Override // org.apache.inlong.manager.client.api.InlongStream
    public InlongStream deleteSink(String str) {
        this.streamSinks.remove(str);
        return this;
    }

    @Override // org.apache.inlong.manager.client.api.InlongStream
    public InlongStream deleteTransform(String str) {
        this.streamTransforms.remove(str);
        return this;
    }

    @Override // org.apache.inlong.manager.client.api.InlongStream
    public InlongStream updateSource(StreamSource streamSource) {
        AssertUtils.notNull(streamSource.getSourceName(), "Source name should not be empty");
        this.streamSources.put(streamSource.getSourceName(), streamSource);
        return this;
    }

    @Override // org.apache.inlong.manager.client.api.InlongStream
    public InlongStream updateSink(StreamSink streamSink) {
        AssertUtils.notNull(streamSink.getSinkName(), "Sink name should not be empty");
        this.streamSinks.put(streamSink.getSinkName(), streamSink);
        return this;
    }

    @Override // org.apache.inlong.manager.client.api.InlongStream
    public InlongStream updateTransform(StreamTransform streamTransform) {
        AssertUtils.notNull(streamTransform.getTransformName(), "Transform name should not be empty");
        this.streamTransforms.put(streamTransform.getTransformName(), streamTransform);
        return this;
    }

    @Override // org.apache.inlong.manager.client.api.InlongStream
    public StreamPipeline createPipeline() {
        StreamPipeline streamPipeline = new StreamPipeline();
        if (MapUtils.isEmpty(this.streamTransforms)) {
            StreamNodeRelation streamNodeRelation = new StreamNodeRelation();
            streamNodeRelation.setInputNodes(this.streamSources.keySet());
            streamNodeRelation.setOutputNodes(this.streamSinks.keySet());
            streamPipeline.setPipeline(Lists.newArrayList(new StreamNodeRelation[]{streamNodeRelation}));
            return streamPipeline;
        }
        HashMap newHashMap = Maps.newHashMap();
        for (StreamTransform streamTransform : this.streamTransforms.values()) {
            String transformName = streamTransform.getTransformName();
            Set preNodes = streamTransform.getPreNodes();
            StreamNodeRelation streamNodeRelation2 = new StreamNodeRelation();
            streamNodeRelation2.setInputNodes(preNodes);
            streamNodeRelation2.setOutputNodes(Sets.newHashSet(new String[]{transformName}));
            Iterator it = preNodes.iterator();
            while (it.hasNext()) {
                StreamTransform streamTransform2 = this.streamTransforms.get((String) it.next());
                if (streamTransform2 != null) {
                    streamTransform2.addPost(transformName);
                }
            }
            ((List) newHashMap.computeIfAbsent(preNodes, set -> {
                return Lists.newArrayList();
            })).add(streamNodeRelation2);
        }
        for (StreamTransform streamTransform3 : this.streamTransforms.values()) {
            String transformName2 = streamTransform3.getTransformName();
            Set postNodes = streamTransform3.getPostNodes();
            HashSet newHashSet = Sets.newHashSet();
            Iterator it2 = postNodes.iterator();
            while (it2.hasNext()) {
                StreamSink streamSink = this.streamSinks.get((String) it2.next());
                if (streamSink != null) {
                    newHashSet.add(streamSink.getSinkName());
                }
            }
            if (CollectionUtils.isNotEmpty(newHashSet)) {
                StreamNodeRelation streamNodeRelation3 = new StreamNodeRelation();
                HashSet newHashSet2 = Sets.newHashSet(new String[]{transformName2});
                streamNodeRelation3.setInputNodes(newHashSet2);
                streamNodeRelation3.setOutputNodes(newHashSet);
                ((List) newHashMap.computeIfAbsent(newHashSet2, set2 -> {
                    return Lists.newArrayList();
                })).add(streamNodeRelation3);
            }
        }
        ArrayList newArrayList = Lists.newArrayList();
        Iterator it3 = newHashMap.entrySet().iterator();
        while (it3.hasNext()) {
            List list = (List) ((Map.Entry) it3.next()).getValue();
            if (list.size() == 1) {
                newArrayList.add(list.get(0));
            } else {
                StreamNodeRelation streamNodeRelation4 = (StreamNodeRelation) list.get(0);
                for (int i = 1; i < list.size(); i++) {
                    Set outputNodes = ((StreamNodeRelation) list.get(i)).getOutputNodes();
                    streamNodeRelation4.getClass();
                    outputNodes.forEach(streamNodeRelation4::addOutputNode);
                }
                newArrayList.add(streamNodeRelation4);
            }
        }
        streamPipeline.setPipeline(newArrayList);
        Pair hasCircle = streamPipeline.hasCircle();
        if (!((Boolean) hasCircle.getLeft()).booleanValue()) {
            return streamPipeline;
        }
        Pair pair = (Pair) hasCircle.getRight();
        throw new IllegalStateException(String.format("There is circle dependency in streamPipeline for node=%s and node=%s", pair.getLeft(), pair.getRight()));
    }

    @Override // org.apache.inlong.manager.client.api.InlongStream
    public InlongStream update() {
        InlongStreamInfo streamInfo = this.managerClient.getStreamInfo(this.inlongGroupId, this.inlongStreamId);
        if (streamInfo == null) {
            throw new IllegalArgumentException(String.format("Stream is not exists for group=%s and stream=%s", this.inlongGroupId, this.inlongStreamId));
        }
        streamInfo.setFieldList(this.streamFields);
        streamInfo.setExtParams(JsonUtils.toJsonString(createPipeline()));
        Pair<Boolean, String> updateStreamInfo = this.managerClient.updateStreamInfo(streamInfo);
        if (!((Boolean) updateStreamInfo.getKey()).booleanValue()) {
            throw new RuntimeException(String.format("Update data stream failed: %s", updateStreamInfo.getValue()));
        }
        initOrUpdateTransform(streamInfo);
        initOrUpdateSource(streamInfo);
        initOrUpdateSink(streamInfo);
        return this;
    }

    private void initOrUpdateTransform(InlongStreamInfo inlongStreamInfo) {
        List<TransformResponse> listTransform = this.managerClient.listTransform(this.inlongGroupId, this.inlongStreamId);
        ArrayList newArrayList = Lists.newArrayList();
        for (TransformResponse transformResponse : listTransform) {
            StreamTransform parseStreamTransform = StreamTransformTransfer.parseStreamTransform(transformResponse);
            String transformName = parseStreamTransform.getTransformName();
            int id = transformResponse.getId();
            if (this.streamTransforms.get(transformName) == null) {
                TransformRequest createTransformRequest = StreamTransformTransfer.createTransformRequest(parseStreamTransform, inlongStreamInfo);
                if (!this.managerClient.deleteTransform(createTransformRequest)) {
                    throw new RuntimeException(String.format("Delete transform=%s failed", createTransformRequest));
                }
            } else {
                TransformRequest createTransformRequest2 = StreamTransformTransfer.createTransformRequest(this.streamTransforms.get(transformName), inlongStreamInfo);
                createTransformRequest2.setId(id);
                Pair<Boolean, String> updateTransform = this.managerClient.updateTransform(createTransformRequest2);
                if (!((Boolean) updateTransform.getKey()).booleanValue()) {
                    throw new RuntimeException(String.format("Update transform=%s failed with err=%s", createTransformRequest2, updateTransform.getValue()));
                }
                newArrayList.add(transformName);
            }
        }
        for (Map.Entry<String, StreamTransform> entry : this.streamTransforms.entrySet()) {
            if (!newArrayList.contains(entry.getKey())) {
                this.managerClient.createTransform(StreamTransformTransfer.createTransformRequest(entry.getValue(), inlongStreamInfo));
            }
        }
    }

    private void initOrUpdateSource(InlongStreamInfo inlongStreamInfo) {
        List<SourceListResponse> listSources = this.managerClient.listSources(this.inlongGroupId, this.inlongStreamId);
        ArrayList newArrayList = Lists.newArrayList();
        for (SourceListResponse sourceListResponse : listSources) {
            String sourceName = sourceListResponse.getSourceName();
            int intValue = sourceListResponse.getId().intValue();
            if (this.streamSources.get(sourceName) != null) {
                StreamSource streamSource = this.streamSources.get(sourceName);
                streamSource.setId(Integer.valueOf(intValue));
                streamSource.setInlongGroupId(inlongStreamInfo.getInlongGroupId());
                streamSource.setInlongStreamId(inlongStreamInfo.getInlongStreamId());
                Pair<Boolean, String> updateSource = this.managerClient.updateSource(streamSource.genSourceRequest());
                if (!((Boolean) updateSource.getKey()).booleanValue()) {
                    throw new RuntimeException(String.format("Update source=%s failed with err=%s", streamSource, updateSource.getValue()));
                }
                newArrayList.add(sourceName);
            } else if (!this.managerClient.deleteSource(intValue)) {
                throw new RuntimeException(String.format("Delete source=%s failed", sourceListResponse));
            }
        }
        for (Map.Entry<String, StreamSource> entry : this.streamSources.entrySet()) {
            if (!newArrayList.contains(entry.getKey())) {
                StreamSource value = entry.getValue();
                value.setInlongGroupId(inlongStreamInfo.getInlongGroupId());
                value.setInlongStreamId(inlongStreamInfo.getInlongStreamId());
                this.managerClient.createSource(value.genSourceRequest());
            }
        }
    }

    private void initOrUpdateSink(InlongStreamInfo inlongStreamInfo) {
        List<SinkListResponse> listSinks = this.managerClient.listSinks(this.inlongGroupId, this.inlongStreamId);
        ArrayList newArrayList = Lists.newArrayList();
        for (SinkListResponse sinkListResponse : listSinks) {
            String sinkName = sinkListResponse.getSinkName();
            int intValue = sinkListResponse.getId().intValue();
            if (this.streamSinks.get(sinkName) != null) {
                StreamSink streamSink = this.streamSinks.get(sinkName);
                streamSink.setId(Integer.valueOf(intValue));
                streamSink.setInlongGroupId(inlongStreamInfo.getInlongGroupId());
                streamSink.setInlongStreamId(inlongStreamInfo.getInlongStreamId());
                Pair<Boolean, String> updateSink = this.managerClient.updateSink(streamSink.genSinkRequest());
                if (!((Boolean) updateSink.getKey()).booleanValue()) {
                    throw new RuntimeException(String.format("Update sink=%s failed with err=%s", streamSink, updateSink.getValue()));
                }
                newArrayList.add(sinkName);
            } else if (!this.managerClient.deleteSink(intValue)) {
                throw new RuntimeException(String.format("Delete sink=%s failed", sinkListResponse));
            }
        }
        for (Map.Entry<String, StreamSink> entry : this.streamSinks.entrySet()) {
            if (!newArrayList.contains(entry.getKey())) {
                StreamSink value = entry.getValue();
                value.setInlongGroupId(inlongStreamInfo.getInlongGroupId());
                value.setInlongStreamId(inlongStreamInfo.getInlongStreamId());
                this.managerClient.createSink(value.genSinkRequest());
            }
        }
    }

    public InnerInlongManagerClient getManagerClient() {
        return this.managerClient;
    }

    public String getInlongGroupId() {
        return this.inlongGroupId;
    }

    @Override // org.apache.inlong.manager.client.api.InlongStream
    public String getInlongStreamId() {
        return this.inlongStreamId;
    }

    public Map<String, StreamSource> getStreamSources() {
        return this.streamSources;
    }

    public Map<String, StreamSink> getStreamSinks() {
        return this.streamSinks;
    }

    public Map<String, StreamTransform> getStreamTransforms() {
        return this.streamTransforms;
    }

    public void setManagerClient(InnerInlongManagerClient innerInlongManagerClient) {
        this.managerClient = innerInlongManagerClient;
    }

    public void setInlongGroupId(String str) {
        this.inlongGroupId = str;
    }

    public void setInlongStreamId(String str) {
        this.inlongStreamId = str;
    }

    public void setStreamSources(Map<String, StreamSource> map) {
        this.streamSources = map;
    }

    public void setStreamSinks(Map<String, StreamSink> map) {
        this.streamSinks = map;
    }

    public void setStreamTransforms(Map<String, StreamTransform> map) {
        this.streamTransforms = map;
    }

    public void setStreamFields(List<StreamField> list) {
        this.streamFields = list;
    }

    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (!(obj instanceof InlongStreamImpl)) {
            return false;
        }
        InlongStreamImpl inlongStreamImpl = (InlongStreamImpl) obj;
        if (!inlongStreamImpl.canEqual(this)) {
            return false;
        }
        InnerInlongManagerClient managerClient = getManagerClient();
        InnerInlongManagerClient managerClient2 = inlongStreamImpl.getManagerClient();
        if (managerClient == null) {
            if (managerClient2 != null) {
                return false;
            }
        } else if (!managerClient.equals(managerClient2)) {
            return false;
        }
        String inlongGroupId = getInlongGroupId();
        String inlongGroupId2 = inlongStreamImpl.getInlongGroupId();
        if (inlongGroupId == null) {
            if (inlongGroupId2 != null) {
                return false;
            }
        } else if (!inlongGroupId.equals(inlongGroupId2)) {
            return false;
        }
        String inlongStreamId = getInlongStreamId();
        String inlongStreamId2 = inlongStreamImpl.getInlongStreamId();
        if (inlongStreamId == null) {
            if (inlongStreamId2 != null) {
                return false;
            }
        } else if (!inlongStreamId.equals(inlongStreamId2)) {
            return false;
        }
        Map<String, StreamSource> streamSources = getStreamSources();
        Map<String, StreamSource> streamSources2 = inlongStreamImpl.getStreamSources();
        if (streamSources == null) {
            if (streamSources2 != null) {
                return false;
            }
        } else if (!streamSources.equals(streamSources2)) {
            return false;
        }
        Map<String, StreamSink> streamSinks = getStreamSinks();
        Map<String, StreamSink> streamSinks2 = inlongStreamImpl.getStreamSinks();
        if (streamSinks == null) {
            if (streamSinks2 != null) {
                return false;
            }
        } else if (!streamSinks.equals(streamSinks2)) {
            return false;
        }
        Map<String, StreamTransform> streamTransforms = getStreamTransforms();
        Map<String, StreamTransform> streamTransforms2 = inlongStreamImpl.getStreamTransforms();
        if (streamTransforms == null) {
            if (streamTransforms2 != null) {
                return false;
            }
        } else if (!streamTransforms.equals(streamTransforms2)) {
            return false;
        }
        List<StreamField> streamFields = getStreamFields();
        List<StreamField> streamFields2 = inlongStreamImpl.getStreamFields();
        return streamFields == null ? streamFields2 == null : streamFields.equals(streamFields2);
    }

    protected boolean canEqual(Object obj) {
        return obj instanceof InlongStreamImpl;
    }

    public int hashCode() {
        InnerInlongManagerClient managerClient = getManagerClient();
        int hashCode = (1 * 59) + (managerClient == null ? 43 : managerClient.hashCode());
        String inlongGroupId = getInlongGroupId();
        int hashCode2 = (hashCode * 59) + (inlongGroupId == null ? 43 : inlongGroupId.hashCode());
        String inlongStreamId = getInlongStreamId();
        int hashCode3 = (hashCode2 * 59) + (inlongStreamId == null ? 43 : inlongStreamId.hashCode());
        Map<String, StreamSource> streamSources = getStreamSources();
        int hashCode4 = (hashCode3 * 59) + (streamSources == null ? 43 : streamSources.hashCode());
        Map<String, StreamSink> streamSinks = getStreamSinks();
        int hashCode5 = (hashCode4 * 59) + (streamSinks == null ? 43 : streamSinks.hashCode());
        Map<String, StreamTransform> streamTransforms = getStreamTransforms();
        int hashCode6 = (hashCode5 * 59) + (streamTransforms == null ? 43 : streamTransforms.hashCode());
        List<StreamField> streamFields = getStreamFields();
        return (hashCode6 * 59) + (streamFields == null ? 43 : streamFields.hashCode());
    }

    public String toString() {
        return "InlongStreamImpl(managerClient=" + getManagerClient() + ", inlongGroupId=" + getInlongGroupId() + ", inlongStreamId=" + getInlongStreamId() + ", streamSources=" + getStreamSources() + ", streamSinks=" + getStreamSinks() + ", streamTransforms=" + getStreamTransforms() + ", streamFields=" + getStreamFields() + ")";
    }
}
